2026-01-11 22:56:02 +01:00

431 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import numpy as np
import statsmodels.api as sm
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import PolynomialFeatures
from scipy import stats
from typing import List, Dict, Any, Tuple
import sympy as sp
def calculate_correlation_matrix(
    df: pd.DataFrame,
    columns: List[str],
    method: str = 'pearson',
    min_threshold: float = None,
    include_pvalues: bool = True
) -> Dict[str, Any]:
    """
    Calculate correlation matrix with optional p-values and filtering.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze
        method: Correlation method ('pearson', 'spearman', 'kendall')
        min_threshold: Minimum absolute correlation value to include (optional)
        include_pvalues: Whether to calculate statistical significance

    Returns:
        Dictionary with matrix data, p-values, and metadata.
        P-values are None (and 'significant' is False) for cells that were
        never computed: the diagonal, pairs with fewer than 3 complete
        observations, and unknown methods.
    """
    if not columns:
        return {"matrix": [], "pvalues": [], "metadata": {}}

    # Convert to numeric and handle missing values
    numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')

    # Remove columns with too many missing values (>50%)
    missing_ratios = numeric_df.isnull().sum() / len(numeric_df)
    valid_cols = missing_ratios[missing_ratios <= 0.5].index.tolist()
    if len(valid_cols) < 2:
        return {"matrix": [], "pvalues": [],
                "metadata": {"error": "Need at least 2 valid numeric columns"}}

    # Use pairwise deletion for correlation (more robust than listwise)
    clean_df = numeric_df[valid_cols]
    corr_matrix = clean_df.corr(method=method)

    # Calculate p-values if requested
    pvalue_matrix = None
    if include_pvalues:
        # Initialize with NaN, not zeros: a zeros_like() init previously
        # reported a fake, highly-significant-looking p=0.0 for cells that
        # were never computed (the diagonal and degenerate pairs).
        pvalue_matrix = pd.DataFrame(np.full(corr_matrix.shape, np.nan),
                                     index=corr_matrix.index,
                                     columns=corr_matrix.columns)
        test_funcs = {
            'pearson': stats.pearsonr,
            'spearman': stats.spearmanr,
            'kendall': stats.kendalltau,
        }
        test_func = test_funcs.get(method)
        for i, col1 in enumerate(corr_matrix.columns):
            for j, col2 in enumerate(corr_matrix.index):
                if i == j:
                    continue
                # Pairwise complete observations
                valid_data = clean_df[[col1, col2]].dropna()
                if len(valid_data) >= 3 and test_func is not None:
                    _, pval = test_func(valid_data.iloc[:, 0], valid_data.iloc[:, 1])
                    pvalue_matrix.iloc[i, j] = pval

    # Build results
    results = []
    pvalue_results = []
    for x in corr_matrix.columns:
        for y in corr_matrix.index:
            value = float(corr_matrix.at[y, x])
            # Apply threshold filter if specified (skips both result lists
            # so they stay aligned)
            if min_threshold is not None and abs(value) < min_threshold:
                continue
            results.append({
                "x": x,
                "y": y,
                "value": value,
                "abs_value": abs(value)
            })
            if include_pvalues and pvalue_matrix is not None:
                p_raw = pvalue_matrix.at[y, x]
                has_p = not pd.isna(p_raw)
                pvalue_results.append({
                    "x": x,
                    "y": y,
                    "pvalue": float(p_raw) if has_p else None,
                    # Direct comparison: the previous `(p or 1) < 0.05` hack
                    # treated an exact p of 0.0 (perfect correlation) as
                    # NOT significant.
                    "significant": bool(p_raw < 0.05) if has_p else False
                })

    # Calculate summary statistics
    n_observations = len(clean_df)
    return {
        "matrix": results,
        "pvalues": pvalue_results if include_pvalues else [],
        "metadata": {
            "method": method,
            "n_observations": n_observations,
            "n_variables": len(valid_cols),
            "columns_analyzed": valid_cols,
            "threshold_applied": min_threshold
        }
    }
def get_correlation_summary(correlation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Summarize a correlation result produced by calculate_correlation_matrix.

    Returns the five strongest and five weakest (non-zero) off-diagonal
    pairs ranked by absolute correlation, plus the total pair count.
    """
    pairs = [
        entry for entry in correlation_data.get("matrix", [])
        if entry["x"] != entry["y"]  # drop self-correlations
    ]
    if not pairs:
        return {"strongest": [], "weakest": []}

    # Rank by |r|, strongest first
    ranked = sorted(pairs, key=lambda entry: entry["abs_value"], reverse=True)
    top_five = ranked[:5]

    # Weakest: bottom five with non-zero correlation, reported ascending
    nonzero_tail = [entry for entry in ranked if entry["abs_value"] > 0][-5:]
    bottom_five = sorted(nonzero_tail, key=lambda entry: entry["abs_value"])

    return {
        "strongest": top_five,
        "weakest": bottom_five,
        "total_pairs": len(pairs)
    }
def calculate_feature_importance(df: pd.DataFrame, features: List[str], target: str) -> List[Dict[str, Any]]:
    """
    Rank features by permutation importance of a random-forest regressor.

    Scores are clipped at zero, normalized to sum to 1 when possible, and
    returned sorted descending. Returns [] when features or target is empty.
    """
    if not features or not target:
        return []

    frame = df.dropna(subset=[target])
    X = frame[features].apply(pd.to_numeric, errors='coerce').fillna(0)
    y = frame[target]
    # Encode a categorical target as integer codes so the regressor can fit it.
    if y.dtype == 'object' or y.dtype == 'string':
        y = pd.factorize(y)[0]

    forest = RandomForestRegressor(n_estimators=100, random_state=42)
    forest.fit(X, y)
    perm = permutation_importance(forest, X, y, n_repeats=10, random_state=42, n_jobs=-1)

    scores = [
        {"feature": name, "score": max(0, float(mean_drop))}
        for name, mean_drop in zip(features, perm.importances_mean)
    ]
    total = sum(entry["score"] for entry in scores)
    if total > 0:
        # Normalize so the reported scores sum to 1
        for entry in scores:
            entry["score"] /= total
    scores.sort(key=lambda entry: entry["score"], reverse=True)
    return scores
def generate_equations(coefficients: Dict[str, float], model_type: str) -> Dict[str, str]:
    """
    Generate equation strings in LaTeX, Python, and Excel formats.

    Args:
        coefficients: Dictionary of feature names to coefficient values;
            the keys 'const', 'intercept', '(Intercept)' are treated as
            the intercept term.
        model_type: Type of regression model ('linear', 'polynomial',
            'exponential', 'logistic'). Currently informational only: the
            rendered equation is always the linear combination of terms.

    Returns:
        Dictionary with 'latex', 'python', and 'excel' equation strings
    """
    from sympy import sympify, latex, Float, preorder_traversal, Mul, Pow

    # Split the intercept from the per-feature coefficients
    intercept = 0.0
    feature_coefs = {}
    for key, value in coefficients.items():
        if key in ['const', 'intercept', '(Intercept)']:
            intercept = float(value)
        else:
            feature_coefs[key] = float(value)

    # Helper function to format number cleanly for Python/Excel
    def format_number(num: float) -> str:
        """Format with 3 decimal places max; scientific notation for extremes."""
        if num == 0:
            return "0"
        abs_num = abs(num)
        # Use scientific notation for very small or very large numbers
        if abs_num >= 10000 or (abs_num < 0.001 and abs_num > 0):
            return f"{num:.2e}"
        # Regular decimal with 3 decimal places max, trailing zeros removed
        formatted = f"{num:.3f}"
        return formatted.rstrip('0').rstrip('.')

    # --- LaTeX via sympy ---
    # Use collision-free, letters-only placeholder symbols instead of the raw
    # feature names. Feeding (sanitized) feature names to sympy broke two ways:
    #   * sympy's LaTeX printer typesets 'a_b' as 'a_{b}' (and trailing digits
    #     as subscripts), so a later literal str.replace() of the sanitized
    #     name never matched for names containing spaces/underscores/'^';
    #   * names that are not valid identifiers (e.g. containing '-') made
    #     sympify() raise.
    def _placeholder(idx: int) -> str:
        # Letters only: trailing digits would be typeset as subscripts and
        # again defeat the literal substitution below.
        digit_letters = 'abcdefghij'
        return 'gvar' + ''.join(digit_letters[int(ch)] for ch in str(idx))

    placeholders = {name: _placeholder(i) for i, name in enumerate(feature_coefs)}

    # Build the full-precision expression string, then parse it once
    expr_parts = [f"{intercept:.10f}"]
    for name, coef in feature_coefs.items():
        expr_parts.append(f"{coef:.10f}*{placeholders[name]}")
    expr = sympify(" + ".join(expr_parts))

    def scientific_round_expr(e, ndigits=2):
        """
        Round all floats in the expression; render extreme values as
        mantissa x 10^exponent (e.g. 12345.678 -> 1.23 x 10^4).
        """
        repl = {}
        for node in preorder_traversal(e):
            if isinstance(node, Float):
                val = float(node.evalf(6))  # Get enough precision
                abs_val = abs(val)
                if abs_val >= 10000 or (abs_val < 0.01 and abs_val > 0):
                    sci_str = f"{val:.{ndigits}e}"
                    mantissa, exponent = sci_str.split('e')
                    # Reconstruct as: mantissa x 10^exponent, unevaluated
                    repl[node] = Mul(Float(mantissa), Pow(10, int(exponent)), evaluate=False)
                else:
                    # Regular rounding for normal numbers
                    repl[node] = Float(round(val, ndigits))
        return e.xreplace(repl)

    expr_sci = scientific_round_expr(expr, 2)
    latex_eq_raw = latex(expr_sci, fold_frac_powers=True, fold_short_frac=True, mul_symbol='times')

    # Substitute readable display names back in, longest placeholder first so
    # e.g. 'gvarba' (index 10) is not clobbered by its prefix 'gvarb' (index 1)
    for name, token in sorted(placeholders.items(), key=lambda kv: len(kv[1]), reverse=True):
        display_name = name.replace('_', ' ')
        latex_eq_raw = latex_eq_raw.replace(token, f"\\mathrm{{{display_name}}}")
    latex_eq = f"y = {latex_eq_raw}"

    # --- Python format ---
    python_parts = []
    for name, coef in feature_coefs.items():
        if coef >= 0:
            python_parts.append(f"+ {format_number(coef)}*{name}")
        else:
            python_parts.append(f"- {format_number(abs(coef))}*{name}")
    intercept_str_clean = format_number(intercept)
    python_eq = f"y = {intercept_str_clean} " + ' '.join(python_parts) if python_parts else f"y = {intercept_str_clean}"

    # --- Excel format ---
    # Features are mapped to spreadsheet columns A, B, C, ... in dict order;
    # values are read from row 1.
    col_letters = {name: chr(65 + i) for i, name in enumerate(feature_coefs.keys())}
    excel_parts = []
    for name, coef in feature_coefs.items():
        col_letter = col_letters[name]
        if coef >= 0:
            excel_parts.append(f"+ {format_number(coef)}*{col_letter}1")
        else:
            excel_parts.append(f"- {format_number(abs(coef))}*{col_letter}1")
    excel_eq = f"={intercept_str_clean} " + ' '.join(excel_parts) if excel_parts else f"={intercept_str_clean}"

    return {
        "latex": latex_eq,
        "python": python_eq,
        "excel": excel_eq
    }
def run_regression_analysis(df: pd.DataFrame, x_cols: List[str], y_col: str, model_type: str = "linear", poly_degree: int = 1, include_interactions: bool = False) -> Dict[str, Any]:
    """
    Fit a regression model and return statistics plus plot-ready data.

    Args:
        df: Input DataFrame containing x_cols and y_col.
        x_cols: Predictor column names (coerced to numeric).
        y_col: Target column name.
        model_type: 'linear', 'polynomial', 'exponential', or 'logistic'.
        poly_degree: Polynomial degree (used only when model_type == 'polynomial').
        include_interactions: Add pairwise interaction terms (degree-2,
            interaction-only when model_type != 'polynomial').

    Returns:
        Dict with fit statistics (r_squared, aic, bic, coefficients,
        p_values, std_errors, ...), per-feature fit plots, partial
        regression plots (multivariate only), residual diagnostics, and
        equation strings from generate_equations.

    Raises:
        ValueError: on insufficient data, non-positive Y for exponential,
            or any model-fitting failure (wrapped; see final except).
    """
    # 1. Prep Data
    # Capture original X for plotting before transformation
    X_original = df[x_cols].apply(pd.to_numeric, errors='coerce')
    y_data = df[y_col]
    # Align indices after dropna
    data = pd.concat([X_original, y_data], axis=1).dropna()
    # Require at least one more row than predictors
    if data.empty or len(data) < len(x_cols) + 1:
        raise ValueError("Insufficient data.")
    X_raw = data[x_cols]  # Keep for plotting
    # NOTE(review): if y_col holds non-numeric strings this coercion can
    # reintroduce NaN after the dropna above — confirm upstream validation.
    y = pd.to_numeric(data[y_col], errors='coerce')
    X = X_raw.copy()  # Start with raw for modelling
    # 2. Advanced Feature Engineering
    if model_type == "polynomial" or include_interactions:
        degree = poly_degree if model_type == "polynomial" else 2
        # interaction-only expansion when interactions are requested on a
        # non-polynomial model
        interaction_only = include_interactions and model_type != "polynomial"
        poly = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=False)
        X_poly = poly.fit_transform(X)
        poly_cols = poly.get_feature_names_out(X.columns)
        X = pd.DataFrame(X_poly, columns=poly_cols, index=X.index)
    # 3. Model Fitting
    try:
        model = None
        y_pred = None
        if model_type == "logistic":
            X_const = sm.add_constant(X)
            # Binarize the target at its median, then fit a Logit model;
            # y is rebound to the binary series so the plots below compare
            # like with like.
            y_bin = (y > y.median()).astype(int)
            model = sm.Logit(y_bin, X_const).fit(disp=0)
            y_pred = model.predict(X_const)
            y = y_bin
        elif model_type == "exponential":
            if (y <= 0).any(): raise ValueError("Exponential regression requires Y > 0.")
            # Fit OLS on log(y); predictions are back-transformed with exp,
            # but the reported statistics (R², AIC, BIC, coefficients) come
            # from the log-space fit.
            y_log = np.log(y)
            X_const = sm.add_constant(X)
            lin_model = sm.OLS(y_log, X_const).fit()
            y_pred = np.exp(lin_model.predict(X_const))
            model = lin_model
        else:  # Linear or Polynomial
            X_const = sm.add_constant(X)
            model = sm.OLS(y, X_const).fit()
            y_pred = model.predict(X_const)
        # 4. Construct Visualization Data
        # Create fit plots for each original feature
        fit_plots_by_feature = {}
        residuals_vs_fitted = []
        y_list = y.tolist()
        pred_list = y_pred.tolist()
        residuals = []
        # One scatter (actual vs predicted) per original (pre-expansion) feature
        for feature_name in X_raw.columns:
            x_feature_list = X_raw[feature_name].tolist()
            feature_plot = []
            for i in range(len(y_list)):
                feature_plot.append({
                    "x": float(x_feature_list[i]),
                    "real": float(y_list[i]),
                    "pred": float(pred_list[i])
                })
            # Sort by X for proper curve rendering
            feature_plot.sort(key=lambda item: item["x"])
            fit_plots_by_feature[feature_name] = feature_plot
        # Also create a single fit_plot using the first feature for backward compatibility
        fit_plot = fit_plots_by_feature[X_raw.columns[0]] if len(X_raw.columns) > 0 else []
        # Residuals plot (computed in the target's original/binary space,
        # not the model's internal space)
        for i in range(len(y_list)):
            res_val = y_list[i] - pred_list[i]
            residuals.append(res_val)
            residuals_vs_fitted.append({
                "fitted": float(pred_list[i]),
                "residual": res_val
            })
        # 5. Calculate Partial Regression Plots (Added Variable Plots)
        # These show the isolated effect of each variable controlling for others
        partial_regression_plots = {}
        # Only calculate for multiple regression (more than 1 feature)
        if len(X_raw.columns) > 1:
            for feature_name in X_raw.columns:
                # Get other features (all except current)
                other_features = [col for col in X_raw.columns if col != feature_name]
                if len(other_features) == 0:
                    continue
                # Step 1: Regress Y on all features except current one
                X_other = X_raw[other_features]
                X_other_const = sm.add_constant(X_other)
                model_y = sm.OLS(y, X_other_const).fit()
                y_residuals = y - model_y.predict(X_other_const)
                # Step 2: Regress current feature on other features
                model_x = sm.OLS(X_raw[feature_name], X_other_const).fit()
                x_residuals = X_raw[feature_name] - model_x.predict(X_other_const)
                # Step 3: Create partial plot data (residual vs residual)
                partial_plot = []
                for i in range(len(y)):
                    partial_plot.append({
                        "x": float(x_residuals.iloc[i]),
                        "y": float(y_residuals.iloc[i])
                    })
                # Sort by x for proper line rendering
                partial_plot.sort(key=lambda item: item["x"])
                partial_regression_plots[feature_name] = partial_plot
        # Generate equation strings
        equations = generate_equations(model.params.to_dict(), model_type)
        summary = {
            # Logit models expose McFadden's pseudo-R² (prsquared) instead
            # of rsquared, and have no adjusted R².
            "r_squared": float(model.rsquared) if hasattr(model, 'rsquared') else float(model.prsquared),
            "adj_r_squared": float(model.rsquared_adj) if hasattr(model, 'rsquared_adj') else None,
            "aic": float(model.aic),
            "bic": float(model.bic),
            "coefficients": model.params.to_dict(),
            "p_values": model.pvalues.to_dict(),
            "std_errors": model.bse.to_dict(),
            "sample_size": int(model.nobs),
            "residuals": residuals,
            "fit_plot": fit_plot,  # Backward compatibility (first feature)
            "fit_plots_by_feature": fit_plots_by_feature,  # All features
            "partial_regression_plots": partial_regression_plots,  # Partial plots for multivariate
            "diagnostic_plot": residuals_vs_fitted,
            "equations": equations  # LaTeX, Python, Excel formats
        }
        return summary
    except Exception as e:
        # NOTE(review): this broad catch rewraps every failure (including the
        # ValueErrors raised above) as a ValueError and drops the traceback
        # chain — consider `raise ... from e`.
        raise ValueError(f"Model calculation failed: {str(e)}")