from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest


def detect_univariate_outliers(
    df: pd.DataFrame,
    columns: List[str],
    excluded_indices: Optional[List[int]] = None,
) -> Dict[int, List[str]]:
    """
    Detects outliers in specific numeric columns using the Interquartile Range (IQR) method.

    Each requested column is coerced to numeric (unparseable values become NaN
    and are therefore never flagged), and a value is reported when it lies
    below Q1 - 1.5 * IQR or above Q3 + 1.5 * IQR.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze; names that are not present
            in ``df`` are silently skipped
        excluded_indices: List of row indices to exclude from detection

    Returns:
        Dictionary of {original_row_index: [reasons]}. A row flagged in
        several columns carries one reason per column.
    """
    # Exclude specified rows if provided
    if excluded_indices:
        df = df[~df.index.isin(excluded_indices)]

    outliers: Dict[int, List[str]] = {}

    for col in columns:
        if col not in df.columns:
            continue

        s = pd.to_numeric(df[col], errors="coerce")

        q1 = s.quantile(0.25)
        q3 = s.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # Comparisons against NaN are False, so missing/unparseable cells
        # are never flagged.
        flags = (s < lower_bound) | (s > upper_bound)

        for idx in df.index[flags].tolist():
            # Report the raw cell value, not the coerced one.
            val = df.at[idx, col]
            reason = (
                f"Column '{col}' value {val} is outside IQR bounds "
                f"[{lower_bound:.2f}, {upper_bound:.2f}]"
            )
            outliers.setdefault(idx, []).append(reason)

    return outliers


def detect_multivariate_outliers(
    df: pd.DataFrame,
    columns: List[str],
    excluded_indices: Optional[List[int]] = None,
) -> Dict[int, List[str]]:
    """
    Detects anomalies across multiple numeric columns using Isolation Forest.

    Only rows that have a numeric value in every analyzed column take part in
    the fit; rows with any missing/unparseable value are ignored.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze; names that are not present
            in ``df`` are skipped, matching detect_univariate_outliers
        excluded_indices: List of row indices to exclude from detection

    Returns:
        Dictionary of {original_row_index: [reasons]}. Empty when no usable
        column or no complete row remains.
    """
    # Exclude specified rows if provided
    if excluded_indices:
        df = df[~df.index.isin(excluded_indices)]

    # Skip unknown columns instead of raising KeyError, so both detectors
    # accept the same column list.
    present_columns = [col for col in columns if col in df.columns]
    if not present_columns:
        return {}

    # Select only relevant numeric columns
    numeric_df = df[present_columns].apply(pd.to_numeric, errors="coerce")
    if numeric_df.empty:
        return {}

    # Keep rows with all values present (no NaNs in selected columns)
    valid_mask = numeric_df.notna().all(axis=1)
    numeric_df_clean = numeric_df[valid_mask]
    if numeric_df_clean.empty:
        return {}

    # Fixed random_state keeps results reproducible between runs
    model = IsolationForest(contamination="auto", random_state=42)
    preds = model.fit_predict(numeric_df_clean)

    # IsolationForest returns -1 for outliers. The filtered frame keeps the
    # original index labels, so they map straight back to the input rows.
    outlier_indices = numeric_df_clean.index[preds == -1].tolist()

    return {
        int(idx): ["Multivariate anomaly detected by Isolation Forest"]
        for idx in outlier_indices
    }


def _flatten_outliers(
    uni: Dict[int, List[str]],
    multi: Dict[int, List[str]],
) -> List[Dict[str, Any]]:
    """Return one {"index", "reasons"} entry per flagged row, sorted by index.

    Univariate reasons come before multivariate ones within each entry.
    """
    all_indices = set(uni) | set(multi)
    return [
        {"index": int(idx), "reasons": uni.get(idx, []) + multi.get(idx, [])}
        for idx in sorted(all_indices)
    ]


def _extract_column_name(reason: str) -> Optional[str]:
    """Return the column name embedded in a univariate reason, or None.

    Expected format: "Column 'Price' value 100 is outside...".
    """
    prefix = "Column '"
    start = reason.find(prefix)
    if start == -1:
        return None
    start += len(prefix)

    # Cut at the "' value " delimiter so that column names containing an
    # apostrophe (e.g. "it's") are not truncated at their first quote.
    end = reason.find("' value ", start)
    if end == -1:
        # Reason does not follow the full format; fall back to first quote.
        end = reason.find("'", start)
    if end == -1:
        return None

    return reason[start:end]


def merge_outliers(
    uni: Dict[int, List[str]],
    multi: Dict[int, List[str]],
) -> List[Dict[str, Any]]:
    """
    Merges results into a flat list of outlier objects.
    DEPRECATED: Use merge_outliers_structured instead for better type separation.

    Args:
        uni: Univariate results, {row_index: [reasons]}
        multi: Multivariate results, {row_index: [reasons]}

    Returns:
        List of {"index": int, "reasons": [...]} sorted by row index.
    """
    return _flatten_outliers(uni, multi)


def merge_outliers_structured(
    uni: Dict[int, List[str]],
    multi: Dict[int, List[str]],
) -> Dict[str, Any]:
    """
    Merges and separates outliers by type for better frontend handling.

    Args:
        uni: Univariate results, {row_index: [reasons]}; the column each
            reason belongs to is parsed from the reason text
        multi: Multivariate results, {row_index: [reasons]}

    Returns:
        Dictionary with:
        - 'univariate': Dict mapping column names to their specific outliers
        - 'multivariate': List of outliers that affect multiple columns
        - 'all': Flat list of all outliers (for backwards compatibility)

    This structure allows the frontend to:
    1. Show column-specific outliers when clicking a column header
    2. Show global/multivariate outliers in a separate view
    3. Clearly distinguish between local and global anomalies
    """
    # column name -> {row index -> entry}; the inner dict replaces a linear
    # scan per reason while preserving first-seen order.
    entries_by_column: Dict[str, Dict[Any, Dict[str, Any]]] = {}

    for idx, reasons in uni.items():
        for reason in reasons:
            col_name = _extract_column_name(reason)
            if col_name is None:
                continue

            rows = entries_by_column.setdefault(col_name, {})
            entry = rows.get(idx)
            if entry is None:
                rows[idx] = {"index": int(idx), "reasons": [reason]}
            else:
                entry["reasons"].append(reason)

    column_outliers: Dict[str, List[Dict[str, Any]]] = {
        col_name: list(rows.values())
        for col_name, rows in entries_by_column.items()
    }

    # Prepare multivariate outliers
    multivariate_list = [
        {"index": int(idx), "reasons": reasons}
        for idx, reasons in multi.items()
    ]

    return {
        "univariate": column_outliers,
        "multivariate": multivariate_list,
        # Legacy flat format (backwards compatibility)
        "all": _flatten_outliers(uni, multi),
    }