166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
|
def detect_univariate_outliers(df: pd.DataFrame, columns: List[str], excluded_indices: Optional[List[int]] = None) -> Dict[int, List[str]]:
    """
    Detects outliers in specific numeric columns using the Interquartile Range (IQR) method.

    Values below Q1 - 1.5*IQR or above Q3 + 1.5*IQR are flagged. Non-numeric
    cell values are coerced to NaN and never flagged; column names not present
    in the DataFrame are silently skipped.

    Args:
        df: Input DataFrame
        columns: List of column names to analyze
        excluded_indices: List of row indices to exclude from detection

    Returns:
        Dictionary of {original_row_index: [reasons]}
    """
    # Exclude specified rows if provided
    if excluded_indices:
        df = df[~df.index.isin(excluded_indices)]

    outliers: Dict[int, List[str]] = {}
    for col in columns:
        if col not in df.columns:
            continue

        s = pd.to_numeric(df[col], errors='coerce')
        q1 = s.quantile(0.25)
        q3 = s.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        # NaN bounds (e.g. an all-NaN column after coercion) make both
        # comparisons False, so nothing gets flagged in that case.
        flags = (s < lower_bound) | (s > upper_bound)

        for idx in df.index[flags]:
            # Report the original (uncoerced) cell value in the message.
            val = df.at[idx, col]
            reason = f"Column '{col}' value {val} is outside IQR bounds [{lower_bound:.2f}, {upper_bound:.2f}]"
            outliers.setdefault(idx, []).append(reason)

    return outliers
|
def detect_multivariate_outliers(df: pd.DataFrame, columns: List[str], excluded_indices: Optional[List[int]] = None) -> Dict[int, List[str]]:
    """
    Detects anomalies across multiple numeric columns using Isolation Forest.

    Rows containing NaN in any of the selected columns (after numeric
    coercion) are skipped rather than imputed. Raises KeyError if any name in
    `columns` is missing from the DataFrame (same as the column selection
    itself).

    Args:
        df: Input DataFrame
        columns: List of column names to analyze
        excluded_indices: List of row indices to exclude from detection (already filtered out)

    Returns:
        Dictionary of {original_row_index: [reasons]}
    """
    # Exclude specified rows if provided
    if excluded_indices:
        df = df[~df.index.isin(excluded_indices)]

    # Select only relevant columns and coerce to numeric (non-numeric -> NaN)
    numeric_df = df[columns].apply(pd.to_numeric, errors='coerce')
    if numeric_df.empty:
        return {}

    # Keep only rows with all values present (no NaNs in selected columns)
    numeric_df_clean = numeric_df[numeric_df.notna().all(axis=1)]
    if numeric_df_clean.empty:
        return {}

    # Fixed random_state so repeated runs flag the same rows
    model = IsolationForest(contamination='auto', random_state=42)
    preds = model.fit_predict(numeric_df_clean)

    # IsolationForest returns -1 for outliers; the clean frame retains the
    # original DataFrame indices, so no index bookkeeping is needed.
    outlier_indices = numeric_df_clean.index[preds == -1]
    return {int(idx): ["Multivariate anomaly detected by Isolation Forest"] for idx in outlier_indices}
|
def merge_outliers(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> List[Dict[str, Any]]:
    """
    Merges results into a flat list of outlier objects, ordered by row index.

    Univariate reasons come first for rows flagged by both detectors.

    DEPRECATED: Use merge_outliers_structured instead for better type separation.
    """
    flagged_rows = sorted(set(uni) | set(multi))
    return [
        {"index": int(row), "reasons": uni.get(row, []) + multi.get(row, [])}
        for row in flagged_rows
    ]
|
def merge_outliers_structured(uni: Dict[int, List[str]], multi: Dict[int, List[str]]) -> Dict[str, Any]:
    """
    Merges and separates outliers by type for better frontend handling.

    Returns:
        Dictionary with:
        - 'univariate': Dict mapping column names to their specific outliers
        - 'multivariate': List of outliers that affect multiple columns
        - 'all': Flat list of all outliers (for backwards compatibility)

    This structure allows the frontend to show column-specific outliers when a
    column header is clicked, show global/multivariate outliers in a separate
    view, and clearly distinguish local anomalies from global ones.
    """
    by_column: Dict[str, List[Dict[str, Any]]] = {}

    # Univariate reasons embed their column name, formatted as:
    # "Column '<name>' value ... is outside ..."
    marker = "Column '"
    for row, messages in uni.items():
        for message in messages:
            if marker not in message:
                continue
            start = message.index(marker) + len(marker)
            end = message.index("'", start)
            column = message[start:end]

            bucket = by_column.setdefault(column, [])
            # Append to an existing entry for this row, or start a new one.
            entry = next((e for e in bucket if e["index"] == row), None)
            if entry is None:
                bucket.append({"index": int(row), "reasons": [message]})
            else:
                entry["reasons"].append(message)

    # Multivariate outliers pass through unchanged, one entry per row.
    global_entries = [{"index": int(row), "reasons": msgs} for row, msgs in multi.items()]

    # Legacy flat format (backwards compatibility), ordered by row index.
    flat = [
        {"index": int(row), "reasons": uni.get(row, []) + multi.get(row, [])}
        for row in sorted(set(uni) | set(multi))
    ]

    return {
        "univariate": by_column,
        "multivariate": global_entries,
        "all": flat,
    }