chartbastan/backend/app/ml/energy_calculator.py
2026-02-01 09:31:38 +01:00

357 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Energy Calculator Module.
This module calculates collective energy scores based on sentiment analysis
from multiple sources (Twitter, Reddit, RSS) using a weighted formula.
Formula: Score = (Positive - Negative) × Volume × Virality
"""
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Dict, List, Optional
logger = getLogger(__name__)
# Relative influence of each data source on the blended energy score,
# as specified in the requirements. Values sum to 1.0; they are renormalized
# over the available sources when some feeds are missing (degraded mode).
SOURCE_WEIGHTS = {
    'twitter': 0.60,
    'reddit': 0.25,
    'rss': 0.15
}

# Temporal weighting parameters: a tweet's weight decays linearly from 1.0
# down to MIN_TEMPORAL_WEIGHT over TEMPORAL_DECAY_HOURS hours.
TEMPORAL_DECAY_HOURS = 48  # Window (hours) over which the linear decay runs
MIN_TEMPORAL_WEIGHT = 0.5  # Floor weight applied to tweets older than the window
def calculate_energy_score(
    match_id: int,
    team_id: int,
    twitter_sentiments: Optional[List[Dict[str, float]]] = None,
    reddit_sentiments: Optional[List[Dict[str, float]]] = None,
    rss_sentiments: Optional[List[Dict[str, float]]] = None,
    tweets_with_timestamps: Optional[List[Dict]] = None
) -> Dict[str, Any]:
    """
    Calculate the energy score for a team from multi-source sentiment data.

    Per-source scores use the formula (Positive - Negative) x Volume x Virality,
    are blended with SOURCE_WEIGHTS (renormalized when sources are missing),
    optionally amplified by tweet recency, and finally clamped to 0-100.

    Args:
        match_id: ID of the match (used for logging only).
        team_id: ID of the team (used for logging only).
        twitter_sentiments: Twitter sentiment dicts with 'positive'/'negative'
            (and optionally 'compound') keys.
        reddit_sentiments: Reddit sentiment dicts, same shape.
        rss_sentiments: RSS sentiment dicts, same shape.
        tweets_with_timestamps: Tweets carrying 'created_at' timestamps,
            used for temporal weighting.

    Returns:
        Dictionary containing:
            - 'score': final energy score clamped to 0-100
            - 'confidence': confidence level in [0, 1]
            - 'sources_used': names of the sources that contributed
    """
    # Treat None as "no data"; avoids the mutable-default-argument pitfall too.
    twitter_sentiments = twitter_sentiments or []
    reddit_sentiments = reddit_sentiments or []
    rss_sentiments = rss_sentiments or []
    tweets_with_timestamps = tweets_with_timestamps or []

    # Per-source raw energy via (Positive - Negative) x Volume x Virality.
    twitter_energy_score = _calculate_source_energy(twitter_sentiments)
    reddit_energy_score = _calculate_source_energy(reddit_sentiments)
    rss_energy_score = _calculate_source_energy(rss_sentiments)

    # A source counts as available only when it supplied at least one sentiment.
    available_sources = []
    if twitter_sentiments:
        available_sources.append('twitter')
    if reddit_sentiments:
        available_sources.append('reddit')
    if rss_sentiments:
        available_sources.append('rss')

    # No data at all: report a zero score with zero confidence.
    if not available_sources:
        logger.warning(f"No sentiment data available for match_id={match_id}, team_id={team_id}")
        return {
            'score': 0.0,
            'confidence': 0.0,
            'sources_used': []
        }

    # Blend per-source scores with weights renormalized over available sources.
    weighted_score = apply_source_weights(
        twitter_score=twitter_energy_score,
        reddit_score=reddit_energy_score,
        rss_score=rss_energy_score,
        available_sources=available_sources
    )

    # Recency amplification. available_sources is guaranteed non-empty here
    # (early return above), so no extra guard on it is needed.
    time_weighted_score = weighted_score
    if tweets_with_timestamps:
        time_weighted_score = apply_temporal_weighting(
            base_score=weighted_score,
            tweets_with_timestamps=tweets_with_timestamps
        )

    # Clamp to the 0-100 reporting range.
    final_score = normalize_score(time_weighted_score)

    # Confidence mirrors how much of the total source weight actually contributed.
    total_weight = sum(SOURCE_WEIGHTS[s] for s in available_sources)
    confidence = calculate_confidence(
        available_sources=available_sources,
        total_weight=total_weight
    )

    return {
        'score': final_score,
        'confidence': confidence,
        'sources_used': available_sources
    }
def _calculate_source_energy(sentiments: List[Dict[str, float]]) -> float:
    """
    Score a single source with: (Positive - Negative) x Volume x Virality.

    Args:
        sentiments: Sentiment dicts carrying optional 'positive', 'negative'
            and 'compound' entries (a missing key counts as 0).

    Returns:
        Raw energy for this source; may be negative. 0.0 when there is no data.
    """
    if not sentiments:
        return 0.0

    count = len(sentiments)

    # Accumulate all three aggregates in a single pass over the data.
    positive_total = 0
    negative_total = 0
    intensity_total = 0
    for entry in sentiments:
        positive_total += entry.get('positive', 0)
        negative_total += entry.get('negative', 0)
        intensity_total += abs(entry.get('compound', 0))

    positive_ratio = positive_total / count
    negative_ratio = negative_total / count
    # Virality: mean absolute compound score, i.e. sentiment intensity.
    virality = intensity_total / count

    # Volume is the raw message count.
    return (positive_ratio - negative_ratio) * count * virality
def apply_source_weights(
    twitter_score: float,
    reddit_score: float,
    rss_score: float,
    available_sources: List[str]
) -> float:
    """
    Blend per-source energy scores using the configured source weights.

    Weights are first renormalized over the available sources so that they
    still sum to 1.0 when some feeds are missing (degraded mode).

    Args:
        twitter_score: Energy score from Twitter.
        reddit_score: Energy score from Reddit.
        rss_score: Energy score from RSS.
        available_sources: Names of the sources that supplied data.

    Returns:
        Weighted energy score (0.0 when no sources are available).
    """
    if not available_sources:
        return 0.0

    weights = adjust_weights_for_degraded_mode(
        original_weights=SOURCE_WEIGHTS,
        available_sources=available_sources
    )

    # Fixed twitter/reddit/rss order keeps float accumulation deterministic.
    blended = 0.0
    for name, score in (
        ('twitter', twitter_score),
        ('reddit', reddit_score),
        ('rss', rss_score),
    ):
        if name in available_sources:
            blended += score * weights[name]
    return blended
def adjust_weights_for_degraded_mode(
    original_weights: Dict[str, float],
    available_sources: List[str]
) -> Dict[str, float]:
    """
    Renormalize source weights so the available ones sum to 1.0.

    Args:
        original_weights: Full weight table keyed by source name.
        available_sources: Names of the sources that supplied data.

    Returns:
        Weights for the available sources only, summing to 1.0
        (empty dict when no sources are available).
    """
    if not available_sources:
        return {}

    # Share of the original weight mass that is actually present.
    remaining = sum(original_weights[name] for name in available_sources)

    rescaled = {
        name: original_weights[name] / remaining
        for name in available_sources
    }
    logger.info(f"Adjusted weights for degraded mode: {rescaled}")
    return rescaled
def apply_temporal_weighting(
    base_score: float,
    tweets_with_timestamps: List[Dict]
) -> float:
    """
    Apply temporal weighting to an energy score based on tweet recency.

    Each tweet's weight decays linearly from 1.0 down to MIN_TEMPORAL_WEIGHT
    over TEMPORAL_DECAY_HOURS hours; the base score is then amplified by the
    recency-weighted average sentiment magnitude.

    Bug fix: the original used naive `datetime.utcnow()` while parsing
    'Z'-suffixed ISO strings into *aware* datetimes, so the subtraction
    raised TypeError (mixed naive/aware arithmetic). `utcnow()` is also
    deprecated since Python 3.12.

    Args:
        base_score: Base energy score to be amplified.
        tweets_with_timestamps: Tweets with a 'created_at' timestamp
            (ISO-8601 string or datetime) and optionally a 'compound' score.

    Returns:
        Temporally weighted energy score; base_score unchanged when no
        tweet carries a usable timestamp.
    """
    if not tweets_with_timestamps:
        return base_score

    # Aware UTC "now" so aware/naive arithmetic never mixes below.
    now = datetime.now(timezone.utc)
    weighted_sum = 0.0
    total_weight = 0.0

    for tweet in tweets_with_timestamps:
        created_at = tweet.get('created_at')
        if not created_at:
            # Skip tweets without a usable timestamp.
            continue

        if isinstance(created_at, str):
            # fromisoformat on older Pythons does not accept a trailing 'Z'.
            created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        if created_at.tzinfo is None:
            # Naive timestamps are assumed to be UTC — TODO confirm with producers.
            created_at = created_at.replace(tzinfo=timezone.utc)

        hours_ago = (now - created_at).total_seconds() / 3600

        # Linear decay from 1.0 down to the floor over the decay window.
        time_weight = max(MIN_TEMPORAL_WEIGHT, 1.0 - (hours_ago / TEMPORAL_DECAY_HOURS))

        weighted_sum += tweet.get('compound', 0) * time_weight
        total_weight += time_weight

    if total_weight > 0:
        # Amplify the base score by the average recency-weighted sentiment magnitude.
        temporal_factor = weighted_sum / total_weight
        return base_score * (1 + abs(temporal_factor))

    # No usable timestamps: leave the base score untouched.
    return base_score
def normalize_score(score: float) -> float:
    """
    Clamp a raw energy score into the 0-100 reporting range.

    Args:
        score: Raw (unbounded) energy score.

    Returns:
        The score, clipped to [0.0, 100.0].
    """
    lower_bound = 0.0
    upper_bound = 100.0
    # Same evaluation order as min/max clamping: cap first, then floor.
    capped = min(upper_bound, score)
    return max(lower_bound, capped)
def calculate_confidence(
    available_sources: List[str],
    total_weight: float
) -> float:
    """
    Derive a confidence level from the available sources.

    Confidence equals the combined weight of the sources that contributed:
    all three sources (0.60 + 0.25 + 0.15) -> 1.0, Twitter alone -> 0.6,
    RSS alone -> 0.15.

    Args:
        available_sources: Names of the sources that supplied data.
        total_weight: Combined weight of those sources.

    Returns:
        Confidence level between 0 and 1 (0.0 when no sources are available).
    """
    # No data means no confidence, regardless of the weight passed in.
    return total_weight if available_sources else 0.0
def calculate_energy_score_by_source(
    source: str,
    sentiments: List[Dict[str, float]]
) -> float:
    """
    Compute the raw energy score for one named source.

    Args:
        source: Source name ('twitter', 'reddit', or 'rss').
        sentiments: Sentiment dicts for that source.

    Returns:
        Energy score for the source; 0.0 (with a warning) for unknown sources.
    """
    # Only sources present in the weight table are recognized.
    if source in SOURCE_WEIGHTS:
        return _calculate_source_energy(sentiments)

    logger.warning(f"Unknown source: {source}")
    return 0.0
def get_source_weights() -> Dict[str, float]:
    """
    Return the configured source weights.

    Returns:
        A defensive (shallow) copy of the weight table, so callers
        cannot mutate the module-level configuration.
    """
    return dict(SOURCE_WEIGHTS)
def get_temporal_weighting_parameters() -> Dict[str, float]:
    """
    Expose the temporal-decay configuration as a plain dictionary.

    Returns:
        Dict with 'decay_hours' (length of the decay window) and
        'min_weight' (floor weight for old tweets).
    """
    params = dict(
        decay_hours=TEMPORAL_DECAY_HOURS,
        min_weight=MIN_TEMPORAL_WEIGHT,
    )
    return params