2026-01-11 22:56:02 +01:00

166 lines
5.6 KiB
Python

from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
def detect_univariate_outliers(
    df: pd.DataFrame,
    columns: List[str],
    excluded_indices: Optional[List[int]] = None,
) -> Dict[int, List[str]]:
    """
    Detect outliers in individual columns using the Interquartile Range (IQR) rule.

    Each requested column is coerced to numeric (non-convertible entries become
    NaN and are never flagged). A value is an outlier when it lies below
    ``Q1 - 1.5 * IQR`` or above ``Q3 + 1.5 * IQR``, where the quartiles are
    computed on the rows that remain after exclusion.

    Args:
        df: Input DataFrame.
        columns: Column names to analyze. Names not present in ``df`` are skipped.
        excluded_indices: Row index labels to drop before detection. ``None`` or
            an empty collection means no rows are excluded.

    Returns:
        Dictionary of {row_index_label: [reasons]}, one reason string per
        flagged (row, column) pair.
    """
    # Explicit None/length check: plain truthiness raises ValueError for a
    # multi-element numpy array or pandas Index.
    if excluded_indices is not None and len(excluded_indices) > 0:
        df = df[~df.index.isin(excluded_indices)]

    outliers: Dict[int, List[str]] = {}
    for col in columns:
        if col not in df.columns:
            continue

        raw = df[col]
        numeric = pd.to_numeric(raw, errors="coerce")
        q1 = numeric.quantile(0.25)
        q3 = numeric.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        out_of_range = (numeric < lower_bound) | (numeric > upper_bound)
        flagged = out_of_range.fillna(False).to_numpy(dtype=bool)

        # Work with positions rather than labels: a label lookup such as
        # df.at[label, col] returns a whole Series when index labels repeat,
        # which would corrupt the value shown in the reason text.
        positions = np.flatnonzero(flagged)
        labels = df.index[positions].tolist()
        for pos, idx in zip(positions, labels):
            val = raw.iat[pos]  # original (un-coerced) cell value
            reason = f"Column '{col}' value {val} is outside IQR bounds [{lower_bound:.2f}, {upper_bound:.2f}]"
            outliers.setdefault(idx, []).append(reason)
    return outliers
def detect_multivariate_outliers(
    df: pd.DataFrame,
    columns: List[str],
    excluded_indices: Optional[List[int]] = None,
) -> Dict[int, List[str]]:
    """
    Detect anomalies across several numeric columns using Isolation Forest.

    The requested columns are coerced to numeric; only rows with a value in
    every analyzed column are passed to the model. The model is fitted with
    ``contamination='auto'`` and a fixed ``random_state`` of 42.

    Args:
        df: Input DataFrame.
        columns: Column names to analyze. Names not present in ``df`` are
            skipped, matching ``detect_univariate_outliers``.
        excluded_indices: Row index labels to drop before detection. ``None`` or
            an empty collection means no rows are excluded.

    Returns:
        Dictionary of {row_index_label: [reasons]} for rows the model labels
        as outliers. Empty when there is nothing to fit (no usable columns,
        no rows, or no row with all values present).
    """
    # Explicit None/length check: plain truthiness raises ValueError for a
    # multi-element numpy array or pandas Index.
    if excluded_indices is not None and len(excluded_indices) > 0:
        df = df[~df.index.isin(excluded_indices)]

    # Ignore unknown columns instead of failing with KeyError.
    present = [col for col in columns if col in df.columns]
    if not present:
        return {}

    numeric_df = df[present].apply(pd.to_numeric, errors="coerce")
    if numeric_df.empty:
        return {}

    # Keep only rows without NaN in any of the selected columns.
    complete_rows = numeric_df[numeric_df.notna().all(axis=1)]
    if complete_rows.empty:
        return {}

    model = IsolationForest(contamination="auto", random_state=42)
    preds = model.fit_predict(complete_rows)

    # fit_predict labels outliers with -1; complete_rows still carries the
    # index labels of the input frame, so they map straight back to rows.
    outlier_indices = complete_rows.index[preds == -1].tolist()
    return {
        int(idx): ["Multivariate anomaly detected by Isolation Forest"]
        for idx in outlier_indices
    }
def merge_outliers(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> List[Dict[str, Any]]:
    """
    Combine univariate and multivariate findings into one flat list.

    DEPRECATED: Use merge_outliers_structured instead for better type separation.

    Args:
        uni: Mapping of row index to univariate reasons.
        multi: Mapping of row index to multivariate reasons.

    Returns:
        One ``{"index": ..., "reasons": [...]}`` entry per row present in
        either mapping, ordered by row index; univariate reasons come first.
    """
    row_ids = sorted({*uni, *multi})
    return [
        {
            "index": int(row),
            "reasons": [*uni.get(row, []), *multi.get(row, [])],
        }
        for row in row_ids
    ]
def merge_outliers_structured(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> Dict[str, Any]:
    """
    Merge outlier findings and separate them by type for frontend handling.

    Args:
        uni: Mapping of row index to univariate reasons. The column a reason
            belongs to is parsed from its text, which is expected to look like
            ``"Column '<name>' value <v> is outside ..."``. Reasons without a
            parsable column name are left out of the per-column view but still
            appear in ``'all'``.
        multi: Mapping of row index to multivariate reasons.

    Returns:
        Dictionary with:
        - 'univariate': Dict mapping column names to a list of
          ``{"index", "reasons"}`` entries for that column
        - 'multivariate': List of ``{"index", "reasons"}`` entries from ``multi``
        - 'all': Flat list of all outliers sorted by row index
          (for backwards compatibility)

    This structure allows the frontend to:
    1. Show column-specific outliers when clicking a column header
    2. Show global/multivariate outliers in a separate view
    3. Clearly distinguish between local and global anomalies
    """
    prefix = "Column '"
    value_marker = "' value "

    # column name -> {row index -> entry}; the inner dict gives O(1) lookup of
    # an existing entry while preserving first-seen order.
    by_column: Dict[str, Dict[int, Dict[str, Any]]] = {}
    for idx, reasons in uni.items():
        for reason in reasons:
            start = reason.find(prefix)
            if start == -1:
                continue
            start += len(prefix)
            # Cut at the LAST "' value " so column names that themselves
            # contain an apostrophe (e.g. "O'Brien") are kept intact.
            end = reason.rfind(value_marker, start)
            if end == -1:
                # Unexpected wording: fall back to the first closing quote.
                end = reason.find("'", start)
            if end == -1:
                continue  # no closing quote, cannot attribute to a column
            col_name = reason[start:end]

            rows = by_column.setdefault(col_name, {})
            key = int(idx)
            if key in rows:
                rows[key]["reasons"].append(reason)
            else:
                rows[key] = {"index": key, "reasons": [reason]}

    column_outliers: Dict[str, List[Dict[str, Any]]] = {
        col_name: list(rows.values()) for col_name, rows in by_column.items()
    }

    # Copy the reason lists so callers mutating the result do not alter `multi`.
    multivariate_list = [
        {"index": int(idx), "reasons": list(reasons)}
        for idx, reasons in multi.items()
    ]

    # Legacy flat format (backwards compatibility).
    all_outliers = [
        {"index": int(idx), "reasons": uni.get(idx, []) + multi.get(idx, [])}
        for idx in sorted(set(uni) | set(multi))
    ]

    return {
        "univariate": column_outliers,
        "multivariate": multivariate_list,
        "all": all_outliers,
    }