Initial commit: Data Analysis application with FastAPI backend and Next.js frontend

This commit is contained in:
2026-01-11 21:54:33 +01:00
commit 7bdafb4fbf
549 changed files with 96211 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Any, Dict, Optional
import pandas as pd
import numpy as np
from app.core.engine.clean import detect_univariate_outliers, detect_multivariate_outliers, merge_outliers, merge_outliers_structured
from app.core.engine.stats import calculate_correlation_matrix, calculate_feature_importance, run_regression_analysis
router = APIRouter(prefix="/analysis", tags=["analysis"])
class TypeValidationRequest(BaseModel):
data: List[Any]
target_type: str
class OutlierDetectionRequest(BaseModel):
data: List[Dict[str, Optional[Any]]]
columns: List[str]
method: str = "both"
excluded_indices: List[int] = [] # Rows to exclude from outlier detection
class CorrelationRequest(BaseModel):
data: List[Dict[str, Optional[Any]]]
columns: List[str]
method: str = "pearson" # pearson, spearman, kendall
min_threshold: Optional[float] = None # Optional minimum correlation threshold
include_pvalues: bool = True
class FeatureImportanceRequest(BaseModel):
data: List[Dict[str, Optional[Any]]]
features: List[str]
target: str
class RegressionRequest(BaseModel):
data: List[Dict[str, Optional[Any]]]
x_features: List[str]
y_target: str
model_type: str = "linear"
# New Engineering Parameters
poly_degree: int = 1 # Default to linear
include_interactions: bool = False
@router.post("/validate-type")
async def validate_type_conversion(request: TypeValidationRequest):
s = pd.Series(request.data)
try:
if request.target_type == "numeric":
pd.to_numeric(s, errors='raise')
elif request.target_type == "date":
pd.to_datetime(s, errors='raise')
return {"status": "ok", "valid": True}
except Exception as e:
return {"status": "error", "valid": False, "message": str(e)}
@router.post("/detect-outliers")
async def detect_outliers(request: OutlierDetectionRequest):
if not request.data:
return {"outliers": []}
df = pd.DataFrame(request.data).fillna(np.nan)
# Pass excluded indices to detection functions
uni_results = detect_univariate_outliers(
df, request.columns, request.excluded_indices
) if request.method in ["univariate", "both"] else {}
multi_results = detect_multivariate_outliers(
df, request.columns, request.excluded_indices
) if request.method in ["multivariate", "both"] else {}
# Use the new structured merge function
structured = merge_outliers_structured(uni_results, multi_results)
return {
"status": "ok",
"total_count": len(structured["all"]),
"outliers": structured["all"], # Backwards compatibility
"univariate": structured["univariate"], # New: Column-specific outliers
"multivariate": structured["multivariate"] # New: Global outliers
}
@router.post("/correlation")
async def get_correlation(request: CorrelationRequest):
if not request.data or not request.columns:
return {
"status": "error",
"message": "Data and columns are required",
"result": {"matrix": [], "pvalues": [], "metadata": {}}
}
df = pd.DataFrame(request.data).fillna(np.nan)
# Validate method parameter
valid_methods = ['pearson', 'spearman', 'kendall']
if request.method not in valid_methods:
raise HTTPException(
status_code=400,
detail=f"Invalid method. Choose from: {', '.join(valid_methods)}"
)
try:
result = calculate_correlation_matrix(
df,
request.columns,
method=request.method,
min_threshold=request.min_threshold,
include_pvalues=request.include_pvalues
)
# Add summary statistics
from app.core.engine.stats import get_correlation_summary
summary = get_correlation_summary(result)
return {
"status": "ok",
"result": result,
"summary": summary
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Correlation calculation failed: {str(e)}")
@router.post("/feature-importance")
async def get_feature_importance(request: FeatureImportanceRequest):
if not request.data or not request.features or not request.target: return {"importances": []}
df = pd.DataFrame(request.data).fillna(np.nan)
return {"status": "ok", "importances": calculate_feature_importance(df, request.features, request.target)}
@router.post("/run-regression")
async def run_regression(request: RegressionRequest):
if not request.data or not request.x_features or not request.y_target:
raise HTTPException(status_code=400, detail="Incomplete parameters.")
df = pd.DataFrame(request.data).fillna(np.nan)
try:
results = run_regression_analysis(
df,
request.x_features,
request.y_target,
request.model_type,
request.poly_degree,
request.include_interactions
)
return {"status": "ok", "results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal Analysis Error: {str(e)}")

View File

@@ -0,0 +1,33 @@
from fastapi import APIRouter, HTTPException, Response
from pydantic import BaseModel
from typing import Dict, Any, List
from app.core.engine.reports import create_pdf_report
router = APIRouter(prefix="/reports", tags=["reporting"])
class ExportRequest(BaseModel):
project_name: str
results: Dict[str, Any]
audit_trail: Dict[str, Any]
@router.post("/export")
async def export_report(request: ExportRequest):
"""
Generates and returns a PDF report.
"""
try:
pdf_bytes = create_pdf_report(
request.project_name,
request.results,
request.audit_trail
)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={
"Content-Disposition": f"attachment; filename=Report_{request.project_name}.pdf"
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

View File

@@ -0,0 +1,44 @@
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse
import io
import json
from app.core.engine.ingest import parse_file, get_column_metadata, dataframe_to_arrow_stream
router = APIRouter(prefix="/upload", tags=["ingestion"])
@router.post("")
async def upload_file(file: UploadFile = File(...)):
"""
Endpoint to upload Excel/CSV files and receive an Apache Arrow stream.
Metadata about columns is sent in the X-Column-Metadata header.
"""
# 1. Validation
if not file.filename.endswith(('.xlsx', '.xls', '.csv')):
raise HTTPException(status_code=400, detail="Only .xlsx, .xls and .csv files are supported.")
try:
content = await file.read()
# 2. Parsing
df = parse_file(content, file.filename)
# 3. Metadata Extraction
metadata = get_column_metadata(df)
# 4. Conversion to Arrow
arrow_bytes = dataframe_to_arrow_stream(df)
# We use a StreamingResponse to send the binary Arrow data.
# Metadata is sent as a custom header (JSON stringified).
return StreamingResponse(
io.BytesIO(arrow_bytes),
media_type="application/vnd.apache.arrow.stream",
headers={
"X-Column-Metadata": json.dumps(metadata),
"Access-Control-Expose-Headers": "X-Column-Metadata"
}
)
except Exception as e:
# In a real app, we'd log this properly
raise HTTPException(status_code=400, detail=f"Error processing file: {str(e)}")

View File

@@ -0,0 +1,165 @@
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from typing import List, Dict, Any
def detect_univariate_outliers(df: pd.DataFrame, columns: List[str], excluded_indices: List[int] = None) -> Dict[int, List[str]]:
"""
Detects outliers in specific numeric columns using the Interquartile Range (IQR) method.
Args:
df: Input DataFrame
columns: List of column names to analyze
excluded_indices: List of row indices to exclude from detection
Returns:
Dictionary of {original_row_index: [reasons]}
"""
# Exclude specified rows if provided
if excluded_indices:
df = df[~df.index.isin(excluded_indices)]
outliers = {}
for col in columns:
if col not in df.columns:
continue
s = pd.to_numeric(df[col], errors='coerce')
q1 = s.quantile(0.25)
q3 = s.quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
flags = (s < lower_bound) | (s > upper_bound)
indices = df.index[flags].tolist()
for idx in indices:
val = df.at[idx, col]
reason = f"Column '{col}' value {val} is outside IQR bounds [{lower_bound:.2f}, {upper_bound:.2f}]"
if idx not in outliers:
outliers[idx] = []
outliers[idx].append(reason)
return outliers
def detect_multivariate_outliers(df: pd.DataFrame, columns: List[str], excluded_indices: List[int] = None) -> Dict[int, List[str]]:
"""
Detects anomalies across multiple numeric columns using Isolation Forest.
Args:
df: Input DataFrame
columns: List of column names to analyze
excluded_indices: List of row indices to exclude from detection (already filtered out)
Returns:
Dictionary of {original_row_index: [reasons]}
"""
# Store original indices
original_indices = df.index.tolist()
# Exclude specified rows if provided
if excluded_indices:
df = df[~df.index.isin(excluded_indices)]
# Select only relevant numeric columns
numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')
if numeric_df.empty:
return {}
# Get rows with all values present (no NaNs in selected columns)
valid_mask = numeric_df.notna().all(axis=1)
numeric_df_clean = numeric_df[valid_mask]
if numeric_df_clean.empty:
return {}
# Fit Isolation Forest
model = IsolationForest(contamination='auto', random_state=42)
preds = model.fit_predict(numeric_df_clean)
# IsolationForest returns -1 for outliers
# Get the indices from the clean DataFrame (these are the original indices)
outlier_indices = numeric_df_clean.index[preds == -1].tolist()
return {int(idx): ["Multivariate anomaly detected by Isolation Forest"] for idx in outlier_indices}
def merge_outliers(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> List[Dict[str, Any]]:
"""
Merges results into a flat list of outlier objects.
DEPRECATED: Use merge_outliers_structured instead for better type separation.
"""
all_indices = set(uni.keys()) | set(multi.keys())
results = []
for idx in sorted(all_indices):
reasons = uni.get(idx, []) + multi.get(idx, [])
results.append({
"index": int(idx),
"reasons": reasons
})
return results
def merge_outliers_structured(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> Dict[str, Any]:
"""
Merges and separates outliers by type for better frontend handling.
Returns:
Dictionary with:
- 'univariate': Dict mapping column names to their specific outliers
- 'multivariate': List of outliers that affect multiple columns
- 'all': Flat list of all outliers (for backwards compatibility)
This structure allows the frontend to:
1. Show column-specific outliers when clicking a column header
2. Show global/multivariate outliers in a separate view
3. Clearly distinguish between local and global anomalies
"""
# Extract column names from univariate reasons
column_outliers: Dict[str, List[Dict[str, Any]]] = {}
for idx, reasons in uni.items():
for reason in reasons:
# Extract column name from reason string
# Format: "Column 'Price' value 100 is outside..."
if "Column '" in reason:
col_start = reason.index("Column '") + 8
col_end = reason.index("'", col_start)
col_name = reason[col_start:col_end]
if col_name not in column_outliers:
column_outliers[col_name] = []
# Check if this row index already exists for this column
existing = next((x for x in column_outliers[col_name] if x["index"] == idx), None)
if not existing:
column_outliers[col_name].append({
"index": int(idx),
"reasons": [reason]
})
else:
existing["reasons"].append(reason)
# Prepare multivariate outliers
multivariate_list = [
{"index": int(idx), "reasons": reasons}
for idx, reasons in multi.items()
]
# Prepare legacy flat format (backwards compatibility)
all_indices = set(uni.keys()) | set(multi.keys())
all_outliers = []
for idx in sorted(all_indices):
reasons = uni.get(idx, []) + multi.get(idx, [])
all_outliers.append({
"index": int(idx),
"reasons": reasons
})
return {
"univariate": column_outliers,
"multivariate": multivariate_list,
"all": all_outliers
}

View File

@@ -0,0 +1,56 @@
import pandas as pd
import pyarrow as pa
import io
from typing import Tuple, Dict, Any
def parse_file(file_content: bytes, filename: str) -> pd.DataFrame:
"""
Parses the uploaded file (Excel or CSV) into a Pandas DataFrame.
"""
file_obj = io.BytesIO(file_content)
if filename.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_obj)
elif filename.endswith('.csv'):
# Attempt to detect common delimiters if needed, default to comma
df = pd.read_csv(file_obj)
else:
raise ValueError(f"Unsupported file format: {filename}")
# Basic hygiene: strip whitespace from headers
df.columns = [str(c).strip() for c in df.columns]
return df
def get_column_metadata(df: pd.DataFrame) -> list:
"""
Returns a list of column metadata (name and inferred type).
"""
metadata = []
for col in df.columns:
dtype = str(df[col].dtype)
# Simplify types for the frontend
inferred_type = "numeric"
if "object" in dtype or "string" in dtype:
inferred_type = "categorical"
elif "datetime" in dtype:
inferred_type = "date"
elif "bool" in dtype:
inferred_type = "boolean"
metadata.append({
"name": col,
"type": inferred_type,
"native_type": dtype
})
return metadata
def dataframe_to_arrow_stream(df: pd.DataFrame) -> bytes:
"""
Converts a Pandas DataFrame to an Apache Arrow IPC stream.
"""
table = pa.Table.from_pandas(df)
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, table.schema) as writer:
writer.write_table(table)
return sink.getvalue().to_pybytes()

View File

@@ -0,0 +1,223 @@
from fpdf import FPDF
from datetime import datetime
from io import BytesIO
import pandas as pd
import platform
import sklearn
import statsmodels
import os
import matplotlib
matplotlib.use('Agg') # Use non-GUI backend
import matplotlib.pyplot as plt
import tempfile
class AnalysisReport(FPDF):
def header(self):
try:
# Add Unicode font support for accented characters
self.set_font('Arial', 'B', 15)
self.set_text_color(79, 70, 229) # Indigo 600
self.cell(0, 10, 'Data_analysis - Rapport de Validation', 0, 1, 'L')
self.set_draw_color(226, 232, 240)
self.line(10, 22, 200, 22)
self.ln(10)
except Exception as e:
print(f"Header error: {e}")
def footer(self):
try:
self.set_y(-15)
self.set_font('Arial', 'I', 8)
self.set_text_color(148, 163, 184)
self.cell(0, 10, f'Page {self.page_no()} | Genere le {datetime.now().strftime("%Y-%m-%d %H:%M")}', 0, 0, 'C')
except Exception as e:
print(f"Footer error: {e}")
def create_pdf_report(project_name: str, results: dict, audit_trail: dict) -> bytes:
try:
pdf = AnalysisReport()
pdf.add_page()
# 1. Summary
pdf.set_font('Arial', 'B', 12)
pdf.set_text_color(51, 65, 85)
pdf.cell(0, 10, f"Projet : {project_name}", 0, 1)
pdf.ln(5)
# Handle missing model_type
model_type = results.get('model_type', 'Regression')
if isinstance(model_type, list):
model_type = model_type[0] if model_type else 'Regression'
pdf.set_font('Arial', '', 10)
pdf.cell(0, 8, f"Modele : {model_type}", 0, 1)
# Handle r_squared safely
r_squared = results.get('r_squared', 0)
if r_squared is None:
r_squared = 0
pdf.cell(0, 8, f"Precision (R²) : {float(r_squared):.4f}", 0, 1)
# Handle sample_size safely
sample_size = results.get('sample_size', 0)
if sample_size is None:
sample_size = 0
pdf.cell(0, 8, f"Taille de l'echantillon : {int(sample_size)}", 0, 1)
pdf.ln(10)
# 2. Coefficients Table
pdf.set_font('Arial', 'B', 11)
pdf.cell(0, 10, "Coefficients du Modele", 0, 1)
pdf.set_font('Arial', 'B', 9)
pdf.set_fill_color(248, 250, 252)
pdf.cell(80, 8, "Feature", 1, 0, 'L', True)
pdf.cell(50, 8, "Coefficient", 1, 0, 'R', True)
pdf.cell(50, 8, "P-Value", 1, 1, 'R', True)
# Get coefficients and p_values safely
coefficients = results.get('coefficients', {})
p_values = results.get('p_values', {})
if coefficients:
pdf.set_font('Arial', '', 9)
for name, coef in coefficients.items():
# Convert coef to float safely
try:
coef_val = float(coef)
except (TypeError, ValueError):
coef_val = 0.0
# Get p-value safely
p_val = p_values.get(name, 1.0)
try:
p_val = float(p_val)
except (TypeError, ValueError):
p_val = 1.0
pdf.cell(80, 8, str(name), 1)
pdf.cell(50, 8, f"{coef_val:.4f}", 1, 0, 'R')
if p_val < 0.05:
pdf.set_text_color(16, 185, 129) # Emerald
else:
pdf.set_text_color(244, 63, 94) # Rose
pdf.cell(50, 8, f"{p_val:.4f}", 1, 1, 'R')
pdf.set_text_color(51, 65, 85)
else:
pdf.set_font('Arial', '', 9)
pdf.cell(0, 8, "Aucun coefficient disponible", 0, 1)
pdf.ln(15)
# 3. Visualization Charts
if 'fit_plot' in results and len(results['fit_plot']) > 0:
pdf.set_font('Arial', 'B', 11)
pdf.cell(0, 10, "Courbe de Regression", 0, 1)
pdf.ln(5)
# Create fit plot
fit_data = results['fit_plot']
x_vals = [p['x'] for p in fit_data]
y_real = [p['real'] for p in fit_data]
y_pred = [p['pred'] for p in fit_data]
plt.figure(figsize=(10, 6))
plt.scatter(x_vals, y_real, alpha=0.6, color='#4f46e5', label='Données réelles', s=50)
plt.plot(x_vals, y_pred, color='#ef4444', linewidth=2, label='Courbe de régression')
plt.xlabel('Valeur X', fontsize=12)
plt.ylabel('Valeur Y', fontsize=12)
plt.title('Ajustement du Modèle', fontsize=14, fontweight='bold')
plt.legend()
plt.grid(True, alpha=0.3)
plt.tight_layout()
# Save plot to temp file and add to PDF
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
plt.savefig(tmp.name, dpi=150, bbox_inches='tight')
plt.close()
pdf.image(tmp.name, x=10, w=190)
os.unlink(tmp.name)
pdf.ln(10)
# Residuals plot
if 'diagnostic_plot' in results and len(results['diagnostic_plot']) > 0:
pdf.set_font('Arial', 'B', 11)
pdf.cell(0, 10, "Graphique des Residus", 0, 1)
pdf.ln(5)
residuals_data = results['diagnostic_plot']
fitted = [p['fitted'] for p in residuals_data]
residuals = [p['residual'] for p in residuals_data]
plt.figure(figsize=(10, 6))
plt.scatter(fitted, residuals, alpha=0.6, color='#4f46e5', s=50)
plt.axhline(y=0, color='#ef4444', linestyle='--', linewidth=2)
plt.xlabel('Valeurs Ajustees', fontsize=12)
plt.ylabel('Residus', fontsize=12)
plt.title('Graphique des Residus', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
plt.savefig(tmp.name, dpi=150, bbox_inches='tight')
plt.close()
pdf.image(tmp.name, x=10, w=190)
os.unlink(tmp.name)
pdf.ln(10)
# 4. Audit Trail (Reproducibility)
pdf.set_font('Arial', 'B', 11)
pdf.cell(0, 10, "Piste d'Audit & Reproductibilite", 0, 1)
pdf.set_font('Arial', '', 8)
pdf.set_text_color(100, 116, 139)
# Cleaning steps
excluded_count = audit_trail.get('excluded_rows_count', 0)
if excluded_count is None:
excluded_count = 0
pdf.multi_cell(0, 6, f"- Nettoyage : {int(excluded_count)} lignes exclues de l'analyse.")
# Environment
pdf.ln(5)
pdf.set_font('Arial', 'B', 8)
pdf.cell(0, 6, "Environnement Technique :", 0, 1)
pdf.set_font('Arial', '', 8)
pdf.cell(0, 5, f"- Python : {platform.python_version()}", 0, 1)
pdf.cell(0, 5, f"- Pandas : {pd.__version__}", 0, 1)
# Try to get sklearn version safely
try:
pdf.cell(0, 5, f"- Scikit-learn : {sklearn.__version__}", 0, 1)
except Exception:
pdf.cell(0, 5, "- Scikit-learn : Installé", 0, 1)
# Try to get statsmodels version safely
try:
pdf.cell(0, 5, f"- Statsmodels : {statsmodels.__version__}", 0, 1)
except Exception:
pdf.cell(0, 5, "- Statsmodels : Installé", 0, 1)
pdf.cell(0, 5, f"- Random Seed : 42 (Fixed)", 0, 1)
# Generate PDF bytes using BytesIO
pdf_buffer = BytesIO()
pdf.output(pdf_buffer)
return pdf_buffer.getvalue()
except Exception as e:
# Return error as PDF with message using BytesIO
error_pdf = FPDF()
error_pdf.add_page()
error_pdf.set_font('Arial', 'B', 16)
error_pdf.cell(0, 10, f"Erreur lors de la generation du PDF", 0, 1)
error_pdf.ln(10)
error_pdf.set_font('Arial', '', 12)
error_pdf.multi_cell(0, 10, f"Erreur: {str(e)}")
error_buffer = BytesIO()
error_pdf.output(error_buffer)
return error_buffer.getvalue()

View File

@@ -0,0 +1,430 @@
import pandas as pd
import numpy as np
import statsmodels.api as sm
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import PolynomialFeatures
from scipy import stats
from typing import List, Dict, Any, Tuple
import sympy as sp
def calculate_correlation_matrix(
df: pd.DataFrame,
columns: List[str],
method: str = 'pearson',
min_threshold: float = None,
include_pvalues: bool = True
) -> Dict[str, Any]:
"""
Calculate correlation matrix with optional p-values and filtering.
Args:
df: Input DataFrame
columns: List of column names to analyze
method: Correlation method ('pearson', 'spearman', 'kendall')
min_threshold: Minimum absolute correlation value to include (optional)
include_pvalues: Whether to calculate statistical significance
Returns:
Dictionary with matrix data, p-values, and metadata
"""
if not columns:
return {"matrix": [], "pvalues": [], "metadata": {}}
# Convert to numeric and handle missing values
numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')
# Remove columns with too many missing values (>50%)
missing_ratios = numeric_df.isnull().sum() / len(numeric_df)
valid_cols = missing_ratios[missing_ratios <= 0.5].index.tolist()
if len(valid_cols) < 2:
return {"matrix": [], "pvalues": [], "metadata": {"error": "Need at least 2 valid numeric columns"}}
# Use pairwise deletion for correlation (more robust than listwise)
clean_df = numeric_df[valid_cols]
# Calculate correlation matrix
corr_matrix = clean_df.corr(method=method)
# Calculate p-values if requested
pvalue_matrix = None
if include_pvalues:
pvalue_matrix = pd.DataFrame(np.zeros_like(corr_matrix),
index=corr_matrix.index,
columns=corr_matrix.columns)
for i, col1 in enumerate(corr_matrix.columns):
for j, col2 in enumerate(corr_matrix.index):
if i != j:
# Pairwise complete observations
valid_data = clean_df[[col1, col2]].dropna()
if len(valid_data) >= 3:
if method == 'pearson':
_, pval = stats.pearsonr(valid_data.iloc[:, 0], valid_data.iloc[:, 1])
elif method == 'spearman':
_, pval = stats.spearmanr(valid_data.iloc[:, 0], valid_data.iloc[:, 1])
elif method == 'kendall':
_, pval = stats.kendalltau(valid_data.iloc[:, 0], valid_data.iloc[:, 1])
else:
pval = np.nan
pvalue_matrix.iloc[i, j] = pval
# Build results
results = []
pvalue_results = []
for x in corr_matrix.columns:
for y in corr_matrix.index:
value = float(corr_matrix.at[y, x])
# Apply threshold filter if specified
if min_threshold is not None and abs(value) < min_threshold:
continue
results.append({
"x": x,
"y": y,
"value": value,
"abs_value": abs(value)
})
if include_pvalues and pvalue_matrix is not None:
pvalue_results.append({
"x": x,
"y": y,
"pvalue": float(pvalue_matrix.at[y, x]) if not pd.isna(pvalue_matrix.at[y, x]) else None,
"significant": bool((pvalue_matrix.at[y, x] or 1) < 0.05 if not pd.isna(pvalue_matrix.at[y, x]) else False)
})
# Calculate summary statistics
n_observations = len(clean_df)
return {
"matrix": results,
"pvalues": pvalue_results if include_pvalues else [],
"metadata": {
"method": method,
"n_observations": n_observations,
"n_variables": len(valid_cols),
"columns_analyzed": valid_cols,
"threshold_applied": min_threshold
}
}
def get_correlation_summary(correlation_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Generate summary statistics from correlation data.
Identifies strongest correlations (positive and negative).
"""
matrix = correlation_data.get("matrix", [])
# Filter out diagonal (self-correlation)
off_diagonal = [m for m in matrix if m["x"] != m["y"]]
if not off_diagonal:
return {"strongest": [], "weakest": []}
# Sort by absolute correlation value
sorted_by_abs = sorted(off_diagonal, key=lambda x: x["abs_value"], reverse=True)
# Get strongest correlations (top 5)
strongest = sorted_by_abs[:5]
# Get weakest correlations (bottom 5, but non-zero)
weakest = [m for m in sorted_by_abs if m["abs_value"] > 0][-5:]
weakest = sorted(weakest, key=lambda x: x["abs_value"])
return {
"strongest": strongest,
"weakest": weakest,
"total_pairs": len(off_diagonal)
}
def calculate_feature_importance(df: pd.DataFrame, features: List[str], target: str) -> List[Dict[str, Any]]:
if not features or not target: return []
df_clean = df.dropna(subset=[target])
X = df_clean[features].apply(pd.to_numeric, errors='coerce').fillna(0)
y = df_clean[target]
if y.dtype == 'object' or y.dtype == 'string': y = pd.factorize(y)[0]
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
result = permutation_importance(model, X, y, n_repeats=10, random_state=42, n_jobs=-1)
importances = result.importances_mean
results = [{"feature": name, "score": max(0, float(score))} for name, score in zip(features, importances)]
total = sum(r["score"] for r in results)
if total > 0:
for r in results: r["score"] /= total
return sorted(results, key=lambda x: x["score"], reverse=True)
def generate_equations(coefficients: Dict[str, float], model_type: str) -> Dict[str, str]:
"""
Generate equation strings in LaTeX, Python, and Excel formats.
Args:
coefficients: Dictionary of feature names to coefficient values
model_type: Type of regression model ('linear', 'polynomial', 'exponential', 'logistic')
Returns:
Dictionary with 'latex', 'python', and 'excel' equation strings
"""
from sympy import symbols, sympify, latex, Float, preorder_traversal, Mul, Pow
# Extract intercept
intercept = 0.0
feature_coefs = {}
for key, value in coefficients.items():
if key in ['const', 'intercept', '(Intercept)']:
intercept = float(value)
else:
feature_coefs[key] = float(value)
# Helper function to format number cleanly for Python/Excel
def format_number(num: float) -> str:
"""Format number with 3 decimal places max"""
if num == 0:
return "0"
abs_num = abs(num)
# Use scientific notation for very small or very large numbers
if abs_num >= 10000 or (abs_num < 0.001 and abs_num > 0):
return f"{num:.2e}"
# Regular decimal with 3 decimal places max
formatted = f"{num:.3f}"
# Remove trailing zeros
return formatted.rstrip('0').rstrip('.')
# Build LaTeX with sympy using scientific notation
# Create symbols for each variable
for name in feature_coefs.keys():
safe_name = name.replace(' ', '_').replace('^', '_pow_')
symbols(safe_name)
# Build expression string
expr_parts = []
intercept_str = f"{intercept:.10f}"
expr_parts.append(intercept_str)
for name, coef in feature_coefs.items():
safe_name = name.replace(' ', '_').replace('^', '_pow_')
coef_str = f"{coef:.10f}"
expr_parts.append(f"{coef_str}*{safe_name}")
expr_str = " + ".join(expr_parts)
expr = sympify(expr_str)
# Scientific notation rounding function
def scientific_round_expr(e, ndigits=2):
"""
Convert floats to scientific notation with specified decimal places.
Example: 12345.678 -> 1.23 × 10^4
"""
repl = {}
for node in preorder_traversal(e):
if isinstance(node, Float):
val = float(node.evalf(6)) # Get enough precision
abs_val = abs(val)
# Use scientific notation for large or small numbers
if abs_val >= 10000 or (abs_val < 0.01 and abs_val > 0):
sci_str = f"{val:.{ndigits}e}"
mantissa, exponent = sci_str.split('e')
# Reconstruct as: mantissa × 10^exponent
repl[node] = Mul(Float(mantissa), Pow(10, int(exponent)), evaluate=False)
else:
# Regular rounding for normal numbers
repl[node] = Float(round(val, ndigits))
return e.xreplace(repl)
# Apply scientific rounding
expr_sci = scientific_round_expr(expr, 2)
# Convert to LaTeX
latex_eq_raw = latex(expr_sci, fold_frac_powers=True, fold_short_frac=True, mul_symbol='times')
# Replace safe names with readable display names
for name in feature_coefs.keys():
safe_name = name.replace(' ', '_').replace('^', '_pow_')
display_name = name.replace('_', ' ')
latex_eq_raw = latex_eq_raw.replace(safe_name, f"\\mathrm{{{display_name}}}")
# Add "y = " prefix
latex_eq = f"y = {latex_eq_raw}"
# Build Python format
python_parts = []
for name, coef in feature_coefs.items():
coef_str = format_number(coef)
if coef >= 0:
python_parts.append(f"+ {coef_str}*{name}")
else:
python_parts.append(f"- {format_number(abs(coef))}*{name}")
intercept_str_clean = format_number(intercept)
python_eq = f"y = {intercept_str_clean} " + ' '.join(python_parts) if python_parts else f"y = {intercept_str_clean}"
# Generate Excel format
col_letters = {name: chr(65 + i) for i, name in enumerate(feature_coefs.keys())}
excel_parts = []
for name, coef in feature_coefs.items():
coef_str = format_number(coef)
col_letter = col_letters[name]
if coef >= 0:
excel_parts.append(f"+ {coef_str}*{col_letter}1")
else:
excel_parts.append(f"- {format_number(abs(coef))}*{col_letter}1")
excel_eq = f"={intercept_str_clean} " + ' '.join(excel_parts) if excel_parts else f"={intercept_str_clean}"
return {
"latex": latex_eq,
"python": python_eq,
"excel": excel_eq
}
def run_regression_analysis(df: pd.DataFrame, x_cols: List[str], y_col: str, model_type: str = "linear", poly_degree: int = 1, include_interactions: bool = False) -> Dict[str, Any]:
# 1. Prep Data
# Capture original X for plotting before transformation
X_original = df[x_cols].apply(pd.to_numeric, errors='coerce')
y_data = df[y_col]
# Align indices after dropna
data = pd.concat([X_original, y_data], axis=1).dropna()
if data.empty or len(data) < len(x_cols) + 1:
raise ValueError("Insufficient data.")
X_raw = data[x_cols] # Keep for plotting
y = pd.to_numeric(data[y_col], errors='coerce')
X = X_raw.copy() # Start with raw for modelling
# 2. Advanced Feature Engineering
if model_type == "polynomial" or include_interactions:
degree = poly_degree if model_type == "polynomial" else 2
interaction_only = include_interactions and model_type != "polynomial"
poly = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=False)
X_poly = poly.fit_transform(X)
poly_cols = poly.get_feature_names_out(X.columns)
X = pd.DataFrame(X_poly, columns=poly_cols, index=X.index)
# 3. Model Fitting
try:
model = None
y_pred = None
if model_type == "logistic":
X_const = sm.add_constant(X)
y_bin = (y > y.median()).astype(int)
model = sm.Logit(y_bin, X_const).fit(disp=0)
y_pred = model.predict(X_const)
y = y_bin
elif model_type == "exponential":
if (y <= 0).any(): raise ValueError("Exponential regression requires Y > 0.")
y_log = np.log(y)
X_const = sm.add_constant(X)
lin_model = sm.OLS(y_log, X_const).fit()
y_pred = np.exp(lin_model.predict(X_const))
model = lin_model
else: # Linear or Polynomial
X_const = sm.add_constant(X)
model = sm.OLS(y, X_const).fit()
y_pred = model.predict(X_const)
# 4. Construct Visualization Data
# Create fit plots for each original feature
fit_plots_by_feature = {}
residuals_vs_fitted = []
y_list = y.tolist()
pred_list = y_pred.tolist()
residuals = []
# Create a fit plot for each original feature
for feature_name in X_raw.columns:
x_feature_list = X_raw[feature_name].tolist()
feature_plot = []
for i in range(len(y_list)):
feature_plot.append({
"x": float(x_feature_list[i]),
"real": float(y_list[i]),
"pred": float(pred_list[i])
})
# Sort by X for proper curve rendering
feature_plot.sort(key=lambda item: item["x"])
fit_plots_by_feature[feature_name] = feature_plot
# Also create a single fit_plot using the first feature for backward compatibility
fit_plot = fit_plots_by_feature[X_raw.columns[0]] if len(X_raw.columns) > 0 else []
# Residuals plot
for i in range(len(y_list)):
res_val = y_list[i] - pred_list[i]
residuals.append(res_val)
residuals_vs_fitted.append({
"fitted": float(pred_list[i]),
"residual": res_val
})
# 5. Calculate Partial Regression Plots (Added Variable Plots)
# These show the isolated effect of each variable controlling for others
partial_regression_plots = {}
# Only calculate for multiple regression (more than 1 feature)
if len(X_raw.columns) > 1:
for feature_name in X_raw.columns:
# Get other features (all except current)
other_features = [col for col in X_raw.columns if col != feature_name]
if len(other_features) == 0:
continue
# Step 1: Regress Y on all features except current one
X_other = X_raw[other_features]
X_other_const = sm.add_constant(X_other)
model_y = sm.OLS(y, X_other_const).fit()
y_residuals = y - model_y.predict(X_other_const)
# Step 2: Regress current feature on other features
model_x = sm.OLS(X_raw[feature_name], X_other_const).fit()
x_residuals = X_raw[feature_name] - model_x.predict(X_other_const)
# Step 3: Create partial plot data
partial_plot = []
for i in range(len(y)):
partial_plot.append({
"x": float(x_residuals.iloc[i]),
"y": float(y_residuals.iloc[i])
})
# Sort by x for proper line rendering
partial_plot.sort(key=lambda item: item["x"])
partial_regression_plots[feature_name] = partial_plot
# Generate equation strings
equations = generate_equations(model.params.to_dict(), model_type)
summary = {
"r_squared": float(model.rsquared) if hasattr(model, 'rsquared') else float(model.prsquared),
"adj_r_squared": float(model.rsquared_adj) if hasattr(model, 'rsquared_adj') else None,
"aic": float(model.aic),
"bic": float(model.bic),
"coefficients": model.params.to_dict(),
"p_values": model.pvalues.to_dict(),
"std_errors": model.bse.to_dict(),
"sample_size": int(model.nobs),
"residuals": residuals,
"fit_plot": fit_plot, # Backward compatibility (first feature)
"fit_plots_by_feature": fit_plots_by_feature, # All features
"partial_regression_plots": partial_regression_plots, # Partial plots for multivariate
"diagnostic_plot": residuals_vs_fitted,
"equations": equations # LaTeX, Python, Excel formats
}
return summary
except Exception as e:
raise ValueError(f"Model calculation failed: {str(e)}")