import pandas as pd
import numpy as np
import statsmodels.api as sm
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import PolynomialFeatures
from scipy import stats
from typing import List, Dict, Any, Optional, Tuple
import sympy as sp


def calculate_correlation_matrix(
    df: pd.DataFrame,
    columns: List[str],
    method: str = 'pearson',
    min_threshold: Optional[float] = None,
    include_pvalues: bool = True
) -> Dict[str, Any]:
    """
    Calculate correlation matrix with optional p-values and filtering.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze
        method: Correlation method ('pearson', 'spearman', 'kendall')
        min_threshold: Minimum absolute correlation value to include (optional)
        include_pvalues: Whether to calculate statistical significance

    Returns:
        Dictionary with matrix data, p-values, and metadata
    """
    if not columns:
        return {"matrix": [], "pvalues": [], "metadata": {}}

    # Convert to numeric; non-convertible cells become NaN
    numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')

    # Drop columns with more than 50% missing values
    missing_ratios = numeric_df.isnull().sum() / len(numeric_df)
    valid_cols = missing_ratios[missing_ratios <= 0.5].index.tolist()

    if len(valid_cols) < 2:
        return {"matrix": [], "pvalues": [],
                "metadata": {"error": "Need at least 2 valid numeric columns"}}

    # Pairwise deletion (pandas corr) is more robust than listwise dropna
    clean_df = numeric_df[valid_cols]
    corr_matrix = clean_df.corr(method=method)

    pvalue_matrix = None
    if include_pvalues:
        # NaN-initialized: the diagonal and pairs with too few observations
        # have no meaningful p-value.  (Previously this was zero-initialized,
        # which reported a fake p-value of 0.0 for those cells.)
        pvalue_matrix = pd.DataFrame(
            np.full(corr_matrix.shape, np.nan),
            index=corr_matrix.index,
            columns=corr_matrix.columns,
        )
        test_funcs = {
            'pearson': stats.pearsonr,
            'spearman': stats.spearmanr,
            'kendall': stats.kendalltau,
        }
        cols = list(corr_matrix.columns)
        for i, col1 in enumerate(cols):
            # p-values are symmetric; compute each unordered pair once
            for col2 in cols[i + 1:]:
                valid_data = clean_df[[col1, col2]].dropna()
                if len(valid_data) < 3:
                    continue  # too few paired observations for a test
                test = test_funcs.get(method)
                if test is None:
                    pval = np.nan
                else:
                    _, pval = test(valid_data.iloc[:, 0], valid_data.iloc[:, 1])
                pvalue_matrix.loc[col1, col2] = pval
                pvalue_matrix.loc[col2, col1] = pval

    # Build flat results lists
    results = []
    pvalue_results = []
    for x in corr_matrix.columns:
        for y in corr_matrix.index:
            value = float(corr_matrix.at[y, x])

            # Apply threshold filter if specified
            if min_threshold is not None and abs(value) < min_threshold:
                continue

            results.append({
                "x": x,
                "y": y,
                "value": value,
                "abs_value": abs(value),
            })

            if include_pvalues and pvalue_matrix is not None:
                raw_p = pvalue_matrix.at[y, x]
                has_p = not pd.isna(raw_p)
                pvalue_results.append({
                    "x": x,
                    "y": y,
                    "pvalue": float(raw_p) if has_p else None,
                    # Direct comparison so p == 0.0 counts as significant
                    # (the old `(p or 1) < 0.05` treated p == 0.0 as NOT
                    # significant because `0.0 or 1` evaluates to 1).
                    "significant": bool(raw_p < 0.05) if has_p else False,
                })

    n_observations = len(clean_df)

    return {
        "matrix": results,
        "pvalues": pvalue_results if include_pvalues else [],
        "metadata": {
            "method": method,
            "n_observations": n_observations,
            "n_variables": len(valid_cols),
            "columns_analyzed": valid_cols,
            "threshold_applied": min_threshold,
        },
    }


def get_correlation_summary(correlation_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate summary statistics from correlation data.

    Identifies strongest and weakest off-diagonal correlations (by absolute
    value) from the output of ``calculate_correlation_matrix``.

    Args:
        correlation_data: Dict with a "matrix" list of
            ``{"x", "y", "value", "abs_value"}`` entries.

    Returns:
        Dict with "strongest" (top 5), "weakest" (bottom 5 non-zero, ascending)
        and "total_pairs" (count of off-diagonal entries).
    """
    matrix = correlation_data.get("matrix", [])

    # Exclude self-correlations (the diagonal)
    off_diagonal = [m for m in matrix if m["x"] != m["y"]]

    if not off_diagonal:
        return {"strongest": [], "weakest": []}

    # Rank by absolute correlation, strongest first
    sorted_by_abs = sorted(off_diagonal, key=lambda m: m["abs_value"], reverse=True)

    strongest = sorted_by_abs[:5]

    # Bottom 5 non-zero correlations, reported in ascending order
    weakest = [m for m in sorted_by_abs if m["abs_value"] > 0][-5:]
    weakest = sorted(weakest, key=lambda m: m["abs_value"])

    return {
        "strongest": strongest,
        "weakest": weakest,
        "total_pairs": len(off_diagonal),
    }


def calculate_feature_importance(df: pd.DataFrame,
                                 features: List[str],
                                 target: str) -> List[Dict[str, Any]]:
    """
    Rank features by permutation importance using a random forest.

    Rows with a missing target are dropped; feature NaNs are filled with 0.
    A non-numeric target is factorized to integer codes before fitting.

    Args:
        df: Input DataFrame
        features: Feature column names
        target: Target column name

    Returns:
        List of ``{"feature", "score"}`` dicts, scores clipped at 0 and
        normalized to sum to 1, sorted descending by score.
    """
    if not features or not target:
        return []

    df_clean = df.dropna(subset=[target])
    X = df_clean[features].apply(pd.to_numeric, errors='coerce').fillna(0)
    y = df_clean[target]
    if y.dtype == 'object' or y.dtype == 'string':
        # Encode categorical targets as integer codes
        y = pd.factorize(y)[0]

    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X, y)

    result = permutation_importance(model, X, y, n_repeats=10,
                                    random_state=42, n_jobs=-1)
    importances = result.importances_mean

    # Negative importances are noise; clip to zero
    results = [{"feature": name, "score": max(0, float(score))}
               for name, score in zip(features, importances)]

    # Normalize so scores sum to 1 (skip if everything clipped to zero)
    total = sum(r["score"] for r in results)
    if total > 0:
        for r in results:
            r["score"] /= total

    return sorted(results, key=lambda r: r["score"], reverse=True)


def _excel_column_letter(index: int) -> str:
    """Convert a 0-based column index to an Excel column name (A..Z, AA, AB, ...)."""
    letters = ""
    index += 1
    while index > 0:
        index, rem = divmod(index - 1, 26)
        letters = chr(65 + rem) + letters
    return letters


def generate_equations(coefficients: Dict[str, float], model_type: str) -> Dict[str, str]:
    """
    Generate equation strings in LaTeX, Python, and Excel formats.

    Args:
        coefficients: Dictionary of feature names to coefficient values
        model_type: Type of regression model ('linear', 'polynomial',
            'exponential', 'logistic').
            NOTE(review): currently unused — all equations are rendered in
            linear form regardless of model type; confirm whether exponential/
            logistic callers expect a transformed equation.

    Returns:
        Dictionary with 'latex', 'python', and 'excel' equation strings
    """
    from sympy import sympify, latex, Float, preorder_traversal, Mul, Pow

    # Split the intercept term from the per-feature coefficients
    intercept = 0.0
    feature_coefs = {}
    for key, value in coefficients.items():
        if key in ['const', 'intercept', '(Intercept)']:
            intercept = float(value)
        else:
            feature_coefs[key] = float(value)

    def format_number(num: float) -> str:
        """Format a number with at most 3 decimals; sci-notation for extremes."""
        if num == 0:
            return "0"
        abs_num = abs(num)
        # Scientific notation for very small or very large magnitudes
        if abs_num >= 10000 or (abs_num < 0.001 and abs_num > 0):
            return f"{num:.2e}"
        formatted = f"{num:.3f}"
        # Trim trailing zeros and a dangling decimal point
        return formatted.rstrip('0').rstrip('.')

    # --- LaTeX: build a sympy expression, then round with sci-notation ---
    # Feature names are sanitized so sympify can parse them as symbols.
    expr_parts = [f"{intercept:.10f}"]
    for name, coef in feature_coefs.items():
        safe_name = name.replace(' ', '_').replace('^', '_pow_')
        expr_parts.append(f"{coef:.10f}*{safe_name}")
    expr = sympify(" + ".join(expr_parts))

    def scientific_round_expr(e, ndigits=2):
        """
        Convert floats to scientific notation with the given decimal places.
        Example: 12345.678 -> 1.23 × 10^4
        """
        repl = {}
        for node in preorder_traversal(e):
            if isinstance(node, Float):
                val = float(node.evalf(6))  # enough precision to re-round
                abs_val = abs(val)
                if abs_val >= 10000 or (abs_val < 0.01 and abs_val > 0):
                    sci_str = f"{val:.{ndigits}e}"
                    mantissa, exponent = sci_str.split('e')
                    # Keep mantissa × 10^exponent unevaluated for display
                    repl[node] = Mul(Float(mantissa),
                                     Pow(10, int(exponent)),
                                     evaluate=False)
                else:
                    repl[node] = Float(round(val, ndigits))
        return e.xreplace(repl)

    expr_sci = scientific_round_expr(expr, 2)
    latex_eq_raw = latex(expr_sci, fold_frac_powers=True,
                         fold_short_frac=True, mul_symbol='times')

    # Swap sanitized symbol names back to readable display names
    for name in feature_coefs.keys():
        safe_name = name.replace(' ', '_').replace('^', '_pow_')
        display_name = name.replace('_', ' ')
        latex_eq_raw = latex_eq_raw.replace(safe_name, f"\\mathrm{{{display_name}}}")
    latex_eq = f"y = {latex_eq_raw}"

    # --- Python format ---
    python_parts = []
    for name, coef in feature_coefs.items():
        if coef >= 0:
            python_parts.append(f"+ {format_number(coef)}*{name}")
        else:
            python_parts.append(f"- {format_number(abs(coef))}*{name}")
    intercept_str_clean = format_number(intercept)
    python_eq = (f"y = {intercept_str_clean} " + ' '.join(python_parts)
                 if python_parts else f"y = {intercept_str_clean}")

    # --- Excel format: each feature mapped to a spreadsheet column ---
    # _excel_column_letter handles >26 features (AA, AB, ...) instead of
    # producing non-letter characters.
    excel_parts = []
    for i, (name, coef) in enumerate(feature_coefs.items()):
        col_ref = f"{_excel_column_letter(i)}1"
        if coef >= 0:
            excel_parts.append(f"+ {format_number(coef)}*{col_ref}")
        else:
            excel_parts.append(f"- {format_number(abs(coef))}*{col_ref}")
    excel_eq = (f"={intercept_str_clean} " + ' '.join(excel_parts)
                if excel_parts else f"={intercept_str_clean}")

    return {
        "latex": latex_eq,
        "python": python_eq,
        "excel": excel_eq,
    }


def _compute_partial_regression_plots(X_raw: pd.DataFrame,
                                      y: pd.Series) -> Dict[str, List[Dict[str, float]]]:
    """
    Build partial regression (added-variable) plot data for each feature.

    For each feature: residuals of Y ~ other features are plotted against
    residuals of feature ~ other features, isolating that feature's effect.
    Returns an empty dict when there are fewer than 2 features.
    """
    partial_regression_plots: Dict[str, List[Dict[str, float]]] = {}
    if len(X_raw.columns) <= 1:
        return partial_regression_plots

    for feature_name in X_raw.columns:
        other_features = [col for col in X_raw.columns if col != feature_name]
        if not other_features:
            continue

        X_other_const = sm.add_constant(X_raw[other_features])

        # Step 1: residuals of Y regressed on the other features
        model_y = sm.OLS(y, X_other_const).fit()
        y_residuals = y - model_y.predict(X_other_const)

        # Step 2: residuals of the current feature regressed on the others
        model_x = sm.OLS(X_raw[feature_name], X_other_const).fit()
        x_residuals = X_raw[feature_name] - model_x.predict(X_other_const)

        # Step 3: pair up the residuals, sorted by x for line rendering
        partial_plot = [
            {"x": float(x_residuals.iloc[i]), "y": float(y_residuals.iloc[i])}
            for i in range(len(y))
        ]
        partial_plot.sort(key=lambda item: item["x"])
        partial_regression_plots[feature_name] = partial_plot

    return partial_regression_plots


def run_regression_analysis(df: pd.DataFrame,
                            x_cols: List[str],
                            y_col: str,
                            model_type: str = "linear",
                            poly_degree: int = 1,
                            include_interactions: bool = False) -> Dict[str, Any]:
    """
    Fit a regression model and assemble plot/diagnostic data.

    Args:
        df: Input DataFrame
        x_cols: Predictor column names
        y_col: Target column name
        model_type: 'linear', 'polynomial', 'exponential', or 'logistic'
        poly_degree: Degree for polynomial features (polynomial model only)
        include_interactions: Add pairwise interaction terms

    Returns:
        Dict with fit statistics, coefficients, residuals, per-feature fit
        plots, partial regression plots, diagnostics, and equation strings.

    Raises:
        ValueError: on insufficient data or any model-fitting failure.
    """
    # 1. Prep data.  Coerce BOTH X and y to numeric before the joint dropna
    # so rows with a non-numeric target are removed (previously y was coerced
    # after dropna, leaving NaNs that crashed the model fit).
    X_original = df[x_cols].apply(pd.to_numeric, errors='coerce')
    y_data = pd.to_numeric(df[y_col], errors='coerce')

    data = pd.concat([X_original, y_data], axis=1).dropna()
    if data.empty or len(data) < len(x_cols) + 1:
        raise ValueError("Insufficient data.")

    X_raw = data[x_cols]   # untransformed predictors, kept for plotting
    y = data[y_col]
    X = X_raw.copy()       # working copy for modelling

    # 2. Feature engineering: polynomial and/or interaction terms
    if model_type == "polynomial" or include_interactions:
        degree = poly_degree if model_type == "polynomial" else 2
        interaction_only = include_interactions and model_type != "polynomial"
        poly = PolynomialFeatures(degree=degree,
                                  interaction_only=interaction_only,
                                  include_bias=False)
        X_poly = poly.fit_transform(X)
        poly_cols = poly.get_feature_names_out(X.columns)
        X = pd.DataFrame(X_poly, columns=poly_cols, index=X.index)

    # 3. Model fitting
    try:
        model = None
        y_pred = None

        if model_type == "logistic":
            X_const = sm.add_constant(X)
            # Binarize the target at its median for Logit
            y_bin = (y > y.median()).astype(int)
            model = sm.Logit(y_bin, X_const).fit(disp=0)
            y_pred = model.predict(X_const)
            y = y_bin
        elif model_type == "exponential":
            if (y <= 0).any():
                raise ValueError("Exponential regression requires Y > 0.")
            # Fit OLS in log-space, predict back in original space
            y_log = np.log(y)
            X_const = sm.add_constant(X)
            lin_model = sm.OLS(y_log, X_const).fit()
            y_pred = np.exp(lin_model.predict(X_const))
            model = lin_model
        else:
            # Linear or polynomial OLS
            X_const = sm.add_constant(X)
            model = sm.OLS(y, X_const).fit()
            y_pred = model.predict(X_const)

        # 4. Visualization data: one fit plot per ORIGINAL feature
        fit_plots_by_feature = {}
        residuals_vs_fitted = []
        y_list = y.tolist()
        pred_list = y_pred.tolist()
        residuals = []

        for feature_name in X_raw.columns:
            x_feature_list = X_raw[feature_name].tolist()
            feature_plot = [
                {
                    "x": float(x_feature_list[i]),
                    "real": float(y_list[i]),
                    "pred": float(pred_list[i]),
                }
                for i in range(len(y_list))
            ]
            # Sort by X so curves render left-to-right
            feature_plot.sort(key=lambda item: item["x"])
            fit_plots_by_feature[feature_name] = feature_plot

        # Single fit_plot (first feature) kept for backward compatibility
        fit_plot = (fit_plots_by_feature[X_raw.columns[0]]
                    if len(X_raw.columns) > 0 else [])

        # Residuals vs fitted values
        for i in range(len(y_list)):
            res_val = y_list[i] - pred_list[i]
            residuals.append(res_val)
            residuals_vs_fitted.append({
                "fitted": float(pred_list[i]),
                "residual": res_val,
            })

        # 5. Partial regression (added-variable) plots for multivariate fits
        partial_regression_plots = _compute_partial_regression_plots(X_raw, y)

        # Equation strings in LaTeX / Python / Excel formats
        equations = generate_equations(model.params.to_dict(), model_type)

        summary = {
            # Logit exposes pseudo-R² (prsquared) instead of rsquared
            "r_squared": (float(model.rsquared) if hasattr(model, 'rsquared')
                          else float(model.prsquared)),
            "adj_r_squared": (float(model.rsquared_adj)
                              if hasattr(model, 'rsquared_adj') else None),
            "aic": float(model.aic),
            "bic": float(model.bic),
            "coefficients": model.params.to_dict(),
            "p_values": model.pvalues.to_dict(),
            "std_errors": model.bse.to_dict(),
            "sample_size": int(model.nobs),
            "residuals": residuals,
            "fit_plot": fit_plot,                              # backward compat (first feature)
            "fit_plots_by_feature": fit_plots_by_feature,      # all features
            "partial_regression_plots": partial_regression_plots,
            "diagnostic_plot": residuals_vs_fitted,
            "equations": equations,                            # LaTeX, Python, Excel
        }
        return summary

    except Exception as e:
        # Normalize every fitting failure to ValueError for callers
        raise ValueError(f"Model calculation failed: {str(e)}")