431 lines
16 KiB
Python
431 lines
16 KiB
Python
# Standard library
from typing import Any, Dict, List, Optional, Tuple

# Third-party
import numpy as np
import pandas as pd
import statsmodels.api as sm
import sympy as sp
from scipy import stats
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import PolynomialFeatures
|
||
def calculate_correlation_matrix(
    df: pd.DataFrame,
    columns: List[str],
    method: str = 'pearson',
    min_threshold: Optional[float] = None,
    include_pvalues: bool = True
) -> Dict[str, Any]:
    """
    Calculate correlation matrix with optional p-values and filtering.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze
        method: Correlation method ('pearson', 'spearman', 'kendall')
        min_threshold: Minimum absolute correlation value to include (optional)
        include_pvalues: Whether to calculate statistical significance

    Returns:
        Dictionary with matrix data, p-values, and metadata. Pairs for which
        no p-value could be computed report ``pvalue: None`` and
        ``significant: False``.
    """
    if not columns:
        return {"matrix": [], "pvalues": [], "metadata": {}}

    # Convert to numeric and handle missing values
    numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')

    # Remove columns with too many missing values (>50%)
    missing_ratios = numeric_df.isnull().sum() / len(numeric_df)
    valid_cols = missing_ratios[missing_ratios <= 0.5].index.tolist()

    if len(valid_cols) < 2:
        return {"matrix": [], "pvalues": [], "metadata": {"error": "Need at least 2 valid numeric columns"}}

    # Use pairwise deletion for correlation (more robust than listwise)
    clean_df = numeric_df[valid_cols]

    # Calculate correlation matrix
    corr_matrix = clean_df.corr(method=method)

    # Calculate p-values if requested.
    # Fix: initialize with NaN (not zeros) so pairs that cannot be tested
    # (diagonal, <3 paired observations, unknown method) are reported as
    # "no p-value" instead of a misleading p = 0.0.
    pvalue_matrix = None
    if include_pvalues:
        pvalue_matrix = pd.DataFrame(np.full(corr_matrix.shape, np.nan),
                                     index=corr_matrix.index,
                                     columns=corr_matrix.columns)
        test_funcs = {
            'pearson': stats.pearsonr,
            'spearman': stats.spearmanr,
            'kendall': stats.kendalltau,
        }

        for col1 in corr_matrix.columns:
            for col2 in corr_matrix.index:
                if col1 == col2:
                    continue
                # Pairwise complete observations
                valid_data = clean_df[[col1, col2]].dropna()
                # All three tests are symmetric in their arguments, so a
                # single computation serves both (col1, col2) orderings.
                if len(valid_data) >= 3 and method in test_funcs:
                    _, pval = test_funcs[method](valid_data.iloc[:, 0],
                                                 valid_data.iloc[:, 1])
                    pvalue_matrix.at[col2, col1] = pval

    # Build results
    results = []
    pvalue_results = []

    for x in corr_matrix.columns:
        for y in corr_matrix.index:
            value = float(corr_matrix.at[y, x])

            # Apply threshold filter if specified
            if min_threshold is not None and abs(value) < min_threshold:
                continue

            results.append({
                "x": x,
                "y": y,
                "value": value,
                "abs_value": abs(value)
            })

            if include_pvalues and pvalue_matrix is not None:
                pval = pvalue_matrix.at[y, x]
                has_pval = not pd.isna(pval)
                pvalue_results.append({
                    "x": x,
                    "y": y,
                    "pvalue": float(pval) if has_pval else None,
                    # Fix: compare the p-value directly. The previous
                    # `(pval or 1) < 0.05` guard treated an exact p == 0.0
                    # (e.g. perfect correlation) as NOT significant.
                    "significant": bool(has_pval and pval < 0.05)
                })

    # Calculate summary statistics
    n_observations = len(clean_df)

    return {
        "matrix": results,
        "pvalues": pvalue_results if include_pvalues else [],
        "metadata": {
            "method": method,
            "n_observations": n_observations,
            "n_variables": len(valid_cols),
            "columns_analyzed": valid_cols,
            "threshold_applied": min_threshold
        }
    }
|
||
|
||
def get_correlation_summary(correlation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Summarize a correlation result set.

    Picks out the five strongest and five weakest (non-zero) off-diagonal
    correlations by absolute value, plus the total number of off-diagonal
    pairs considered.
    """
    pairs = correlation_data.get("matrix", [])

    # Self-correlations carry no information; keep only cross pairs.
    cross_pairs = [entry for entry in pairs if entry["x"] != entry["y"]]
    if not cross_pairs:
        return {"strongest": [], "weakest": []}

    # Rank all cross pairs from strongest to weakest |r|.
    ranked = sorted(cross_pairs, key=lambda entry: entry["abs_value"], reverse=True)

    # Top five by absolute correlation.
    top = ranked[:5]

    # Bottom five with a non-zero correlation, reported weakest-first.
    nonzero = [entry for entry in ranked if entry["abs_value"] > 0]
    bottom = sorted(nonzero[-5:], key=lambda entry: entry["abs_value"])

    return {
        "strongest": top,
        "weakest": bottom,
        "total_pairs": len(cross_pairs),
    }
|
||
|
||
def calculate_feature_importance(df: pd.DataFrame, features: List[str], target: str) -> List[Dict[str, Any]]:
    """
    Rank features by permutation importance from a random-forest fit.

    Returns a list of {"feature", "score"} dicts sorted by descending score.
    Negative raw importances are clipped to zero; when any positive score
    remains, scores are normalized to sum to 1.
    """
    # Nothing to rank without both a feature list and a target column.
    if not features or not target:
        return []

    # Keep rows where the target is present; coerce features to numeric,
    # filling coercion failures with 0.
    usable = df.dropna(subset=[target])
    feature_matrix = usable[features].apply(pd.to_numeric, errors='coerce').fillna(0)
    target_series = usable[target]

    # A string/object target is label-encoded so the regressor can fit it.
    if target_series.dtype == 'object' or target_series.dtype == 'string':
        target_series = pd.factorize(target_series)[0]

    forest = RandomForestRegressor(n_estimators=100, random_state=42)
    forest.fit(feature_matrix, target_series)

    perm = permutation_importance(
        forest, feature_matrix, target_series,
        n_repeats=10, random_state=42, n_jobs=-1,
    )

    scores = []
    for name, raw in zip(features, perm.importances_mean):
        scores.append({"feature": name, "score": max(0, float(raw))})

    # Normalize into a distribution when at least one score is positive.
    total = sum(entry["score"] for entry in scores)
    if total > 0:
        for entry in scores:
            entry["score"] /= total

    return sorted(scores, key=lambda entry: entry["score"], reverse=True)
|
||
|
||
def generate_equations(coefficients: Dict[str, float], model_type: str) -> Dict[str, str]:
    """
    Generate equation strings in LaTeX, Python, and Excel formats.

    Args:
        coefficients: Dictionary of feature names to coefficient values; the
            intercept may appear under 'const', 'intercept', or '(Intercept)'.
        model_type: Type of regression model ('linear', 'polynomial',
            'exponential', 'logistic'). NOTE(review): currently unused — all
            equations are rendered in linear (linearized) form; confirm
            whether exponential/logistic callers expect a transformed form.

    Returns:
        Dictionary with 'latex', 'python', and 'excel' equation strings
    """
    from sympy import sympify, latex, Float, preorder_traversal, Mul, Pow

    # Separate the intercept from the per-feature coefficients.
    intercept = 0.0
    feature_coefs: Dict[str, float] = {}

    for key, value in coefficients.items():
        if key in ['const', 'intercept', '(Intercept)']:
            intercept = float(value)
        else:
            feature_coefs[key] = float(value)

    # Helper function to format number cleanly for Python/Excel
    def format_number(num: float) -> str:
        """Format number with 3 decimal places max"""
        if num == 0:
            return "0"
        abs_num = abs(num)
        # Use scientific notation for very small or very large numbers
        if abs_num >= 10000 or (abs_num < 0.001 and abs_num > 0):
            return f"{num:.2e}"
        # Regular decimal with 3 decimal places max
        formatted = f"{num:.3f}"
        # Remove trailing zeros
        return formatted.rstrip('0').rstrip('.')

    def excel_column(idx: int) -> str:
        """0-based index -> Excel column letters (0 -> 'A', 25 -> 'Z', 26 -> 'AA')."""
        letters = ""
        idx += 1
        while idx > 0:
            idx, rem = divmod(idx - 1, 26)
            letters = chr(65 + rem) + letters
        return letters

    def sanitize(name: str) -> str:
        """Make a feature name safe to use as a sympy identifier."""
        return name.replace(' ', '_').replace('^', '_pow_')

    # Build the symbolic expression "intercept + c1*x1 + ..." for LaTeX.
    # (sympify creates the needed symbols itself; the old pre-creation loop
    # discarded its results and has been dropped.)
    expr_parts = [f"{intercept:.10f}"]
    for name, coef in feature_coefs.items():
        expr_parts.append(f"{coef:.10f}*{sanitize(name)}")

    expr = sympify(" + ".join(expr_parts))

    # Scientific notation rounding function
    def scientific_round_expr(e, ndigits=2):
        """
        Convert floats to scientific notation with specified decimal places.
        Example: 12345.678 -> 1.23 × 10^4
        """
        repl = {}
        for node in preorder_traversal(e):
            if isinstance(node, Float):
                val = float(node.evalf(6))  # Get enough precision
                abs_val = abs(val)
                # Use scientific notation for large or small numbers
                if abs_val >= 10000 or (abs_val < 0.01 and abs_val > 0):
                    mantissa, exponent = f"{val:.{ndigits}e}".split('e')
                    # Reconstruct as: mantissa × 10^exponent
                    repl[node] = Mul(Float(mantissa), Pow(10, int(exponent)), evaluate=False)
                else:
                    # Regular rounding for normal numbers
                    repl[node] = Float(round(val, ndigits))
        return e.xreplace(repl)

    # Apply scientific rounding, then convert to LaTeX.
    expr_sci = scientific_round_expr(expr, 2)
    latex_eq_raw = latex(expr_sci, fold_frac_powers=True, fold_short_frac=True, mul_symbol='times')

    # Replace safe names with readable display names.
    # NOTE(review): plain substring replacement can misfire if one sanitized
    # name is a substring of another — confirm feature names are distinct.
    for name in feature_coefs.keys():
        display_name = name.replace('_', ' ')
        latex_eq_raw = latex_eq_raw.replace(sanitize(name), f"\\mathrm{{{display_name}}}")

    # Add "y = " prefix
    latex_eq = f"y = {latex_eq_raw}"

    # Build Python format: explicit "+"/"-" terms following the intercept.
    python_parts = []
    for name, coef in feature_coefs.items():
        if coef >= 0:
            python_parts.append(f"+ {format_number(coef)}*{name}")
        else:
            python_parts.append(f"- {format_number(abs(coef))}*{name}")

    intercept_str_clean = format_number(intercept)
    python_eq = (f"y = {intercept_str_clean} " + ' '.join(python_parts)) if python_parts else f"y = {intercept_str_clean}"

    # Generate Excel format: map each feature to a spreadsheet cell in row 1.
    # Fix: proper base-26 column naming so more than 26 features no longer
    # produce invalid references like '[1' (chr(65 + 26)).
    excel_parts = []
    for i, (name, coef) in enumerate(feature_coefs.items()):
        cell = f"{excel_column(i)}1"
        if coef >= 0:
            excel_parts.append(f"+ {format_number(coef)}*{cell}")
        else:
            excel_parts.append(f"- {format_number(abs(coef))}*{cell}")

    excel_eq = (f"={intercept_str_clean} " + ' '.join(excel_parts)) if excel_parts else f"={intercept_str_clean}"

    return {
        "latex": latex_eq,
        "python": python_eq,
        "excel": excel_eq
    }
|
||
|
||
def run_regression_analysis(df: pd.DataFrame, x_cols: List[str], y_col: str, model_type: str = "linear", poly_degree: int = 1, include_interactions: bool = False) -> Dict[str, Any]:
    """
    Fit a regression model and build plotting/diagnostic payloads.

    Args:
        df: Input DataFrame containing predictors and target.
        x_cols: Predictor column names.
        y_col: Target column name.
        model_type: 'linear', 'polynomial', 'exponential', or 'logistic'.
        poly_degree: Polynomial degree (used when model_type == 'polynomial').
        include_interactions: Add pairwise interaction terms to the design matrix.

    Returns:
        Dict with fit statistics, coefficient/p-value/std-error tables,
        residuals, per-feature fit plots, partial regression plots, a
        residuals-vs-fitted diagnostic plot, and equation strings.

    Raises:
        ValueError: On insufficient data, or wrapping any model-fitting failure.
    """
    # 1. Prep Data
    # Capture original X for plotting before transformation
    X_original = df[x_cols].apply(pd.to_numeric, errors='coerce')
    y_data = df[y_col]

    # Align indices after dropna
    # NOTE(review): y is coerced to numeric only AFTER this dropna, so a
    # non-numeric y value survives the row filter and becomes NaN below —
    # confirm upstream guarantees y_col is numeric.
    data = pd.concat([X_original, y_data], axis=1).dropna()
    if data.empty or len(data) < len(x_cols) + 1:
        raise ValueError("Insufficient data.")

    X_raw = data[x_cols] # Keep for plotting
    y = pd.to_numeric(data[y_col], errors='coerce')

    X = X_raw.copy() # Start with raw for modelling

    # 2. Advanced Feature Engineering
    # Expand the design matrix with polynomial and/or interaction terms.
    if model_type == "polynomial" or include_interactions:
        degree = poly_degree if model_type == "polynomial" else 2
        interaction_only = include_interactions and model_type != "polynomial"
        poly = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=False)
        X_poly = poly.fit_transform(X)
        poly_cols = poly.get_feature_names_out(X.columns)
        X = pd.DataFrame(X_poly, columns=poly_cols, index=X.index)

    # 3. Model Fitting
    try:
        model = None
        y_pred = None

        if model_type == "logistic":
            # Binarize y at its median, then fit a Logit on the binary target.
            X_const = sm.add_constant(X)
            y_bin = (y > y.median()).astype(int)
            model = sm.Logit(y_bin, X_const).fit(disp=0)
            y_pred = model.predict(X_const)
            y = y_bin
        elif model_type == "exponential":
            # Linearize with log(y), fit OLS, then exponentiate predictions back.
            if (y <= 0).any(): raise ValueError("Exponential regression requires Y > 0.")
            y_log = np.log(y)
            X_const = sm.add_constant(X)
            lin_model = sm.OLS(y_log, X_const).fit()
            y_pred = np.exp(lin_model.predict(X_const))
            model = lin_model
        else: # Linear or Polynomial
            X_const = sm.add_constant(X)
            model = sm.OLS(y, X_const).fit()
            y_pred = model.predict(X_const)

        # 4. Construct Visualization Data
        # Create fit plots for each original feature
        fit_plots_by_feature = {}
        residuals_vs_fitted = []

        y_list = y.tolist()
        pred_list = y_pred.tolist()

        residuals = []

        # Create a fit plot for each original feature
        for feature_name in X_raw.columns:
            x_feature_list = X_raw[feature_name].tolist()
            feature_plot = []

            for i in range(len(y_list)):
                feature_plot.append({
                    "x": float(x_feature_list[i]),
                    "real": float(y_list[i]),
                    "pred": float(pred_list[i])
                })

            # Sort by X for proper curve rendering
            feature_plot.sort(key=lambda item: item["x"])
            fit_plots_by_feature[feature_name] = feature_plot

        # Also create a single fit_plot using the first feature for backward compatibility
        fit_plot = fit_plots_by_feature[X_raw.columns[0]] if len(X_raw.columns) > 0 else []

        # Residuals plot
        for i in range(len(y_list)):
            res_val = y_list[i] - pred_list[i]
            residuals.append(res_val)

            residuals_vs_fitted.append({
                "fitted": float(pred_list[i]),
                "residual": res_val
            })

        # 5. Calculate Partial Regression Plots (Added Variable Plots)
        # These show the isolated effect of each variable controlling for others
        partial_regression_plots = {}

        # Only calculate for multiple regression (more than 1 feature)
        if len(X_raw.columns) > 1:
            for feature_name in X_raw.columns:
                # Get other features (all except current)
                other_features = [col for col in X_raw.columns if col != feature_name]

                if len(other_features) == 0:
                    continue

                # Step 1: Regress Y on all features except current one
                X_other = X_raw[other_features]
                X_other_const = sm.add_constant(X_other)
                model_y = sm.OLS(y, X_other_const).fit()
                y_residuals = y - model_y.predict(X_other_const)

                # Step 2: Regress current feature on other features
                model_x = sm.OLS(X_raw[feature_name], X_other_const).fit()
                x_residuals = X_raw[feature_name] - model_x.predict(X_other_const)

                # Step 3: Create partial plot data
                partial_plot = []
                for i in range(len(y)):
                    partial_plot.append({
                        "x": float(x_residuals.iloc[i]),
                        "y": float(y_residuals.iloc[i])
                    })

                # Sort by x for proper line rendering
                partial_plot.sort(key=lambda item: item["x"])
                partial_regression_plots[feature_name] = partial_plot

        # Generate equation strings
        equations = generate_equations(model.params.to_dict(), model_type)

        summary = {
            # Logit models expose pseudo R^2 (prsquared) instead of rsquared.
            "r_squared": float(model.rsquared) if hasattr(model, 'rsquared') else float(model.prsquared),
            "adj_r_squared": float(model.rsquared_adj) if hasattr(model, 'rsquared_adj') else None,
            "aic": float(model.aic),
            "bic": float(model.bic),
            "coefficients": model.params.to_dict(),
            "p_values": model.pvalues.to_dict(),
            "std_errors": model.bse.to_dict(),
            "sample_size": int(model.nobs),
            "residuals": residuals,
            "fit_plot": fit_plot, # Backward compatibility (first feature)
            "fit_plots_by_feature": fit_plots_by_feature, # All features
            "partial_regression_plots": partial_regression_plots, # Partial plots for multivariate
            "diagnostic_plot": residuals_vs_fitted,
            "equations": equations # LaTeX, Python, Excel formats
        }
        return summary
    except Exception as e:
        # NOTE(review): this also re-wraps the "requires Y > 0" ValueError
        # raised above, keeping a uniform caller-facing exception type.
        raise ValueError(f"Model calculation failed: {str(e)}")
|