"""
Energy Calculator Module.

This module calculates collective energy scores based on sentiment analysis
from multiple sources (Twitter, Reddit, RSS) using a weighted formula.

Formula: Score = (Positive - Negative) × Volume × Virality
"""

from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from logging import getLogger

logger = getLogger(__name__)

# Source weights as specified in requirements
SOURCE_WEIGHTS = {
    'twitter': 0.60,
    'reddit': 0.25,
    'rss': 0.15
}

# Temporal weighting parameters.
# The weight ramps down linearly as 1 - hours_ago / TEMPORAL_DECAY_HOURS and is
# floored at MIN_TEMPORAL_WEIGHT, so with the values below the floor (0.5) is
# reached after 24 hours; 48 hours is where the unclamped ramp would hit zero.
TEMPORAL_DECAY_HOURS = 48
MIN_TEMPORAL_WEIGHT = 0.5  # Minimum weight for old tweets


def calculate_energy_score(
    match_id: int,
    team_id: int,
    twitter_sentiments: Optional[List[Dict[str, float]]] = None,
    reddit_sentiments: Optional[List[Dict[str, float]]] = None,
    rss_sentiments: Optional[List[Dict[str, float]]] = None,
    tweets_with_timestamps: Optional[List[Dict]] = None
) -> Dict[str, Any]:
    """
    Calculate energy score for a team based on multi-source sentiment data.

    Args:
        match_id: ID of the match (only used in the log message)
        team_id: ID of the team (only used in the log message)
        twitter_sentiments: List of Twitter sentiment scores
        reddit_sentiments: List of Reddit sentiment scores
        rss_sentiments: List of RSS sentiment scores
        tweets_with_timestamps: List of tweets with timestamps for temporal
            weighting

    Returns:
        Dictionary containing:
        - score: Final energy score, clamped to 0-100
        - confidence: Confidence level (0-1)
        - sources_used: List of sources used in calculation
    """
    # Treat None (and any other falsy value) as "no data for this source".
    sentiments_by_source = {
        'twitter': twitter_sentiments or [],
        'reddit': reddit_sentiments or [],
        'rss': rss_sentiments or [],
    }
    tweets_with_timestamps = tweets_with_timestamps or []

    # A source counts as available only if it supplied at least one sentiment.
    available_sources = [
        source for source, items in sentiments_by_source.items() if items
    ]

    if not available_sources:
        logger.warning(
            "No sentiment data available for match_id=%s, team_id=%s",
            match_id,
            team_id,
        )
        return {
            'score': 0.0,
            'confidence': 0.0,
            'sources_used': []
        }

    # Apply source weights (with degraded mode adjustment)
    weighted_score = apply_source_weights(
        twitter_score=_calculate_source_energy(sentiments_by_source['twitter']),
        reddit_score=_calculate_source_energy(sentiments_by_source['reddit']),
        rss_score=_calculate_source_energy(sentiments_by_source['rss']),
        available_sources=available_sources
    )

    # Apply temporal weighting if tweets with timestamps are available
    time_weighted_score = weighted_score
    if tweets_with_timestamps:
        time_weighted_score = apply_temporal_weighting(
            base_score=weighted_score,
            tweets_with_timestamps=tweets_with_timestamps
        )

    final_score = normalize_score(time_weighted_score)

    # Confidence uses the ORIGINAL (unadjusted) weights of the sources present.
    total_weight = sum(SOURCE_WEIGHTS[s] for s in available_sources)
    confidence = calculate_confidence(
        available_sources=available_sources,
        total_weight=total_weight
    )

    return {
        'score': final_score,
        'confidence': confidence,
        'sources_used': available_sources
    }


def _calculate_source_energy(sentiments: List[Dict[str, float]]) -> float:
    """
    Calculate energy score for a single source using the formula:
    Score = (Positive - Negative) × Volume × Virality

    Args:
        sentiments: List of sentiment scores; the 'positive', 'negative' and
            'compound' keys are read, each defaulting to 0 when missing

    Returns:
        Energy score for the source (can be negative or positive)
    """
    if not sentiments:
        return 0.0

    total_count = len(sentiments)
    positive_ratio = sum(s.get('positive', 0) for s in sentiments) / total_count
    negative_ratio = sum(s.get('negative', 0) for s in sentiments) / total_count

    # Volume: total number of sentiments
    volume = total_count

    # Virality: average absolute compound score (intensity of sentiment)
    virality = sum(abs(s.get('compound', 0)) for s in sentiments) / total_count

    return (positive_ratio - negative_ratio) * volume * virality


def apply_source_weights(
    twitter_score: float,
    reddit_score: float,
    rss_score: float,
    available_sources: List[str]
) -> float:
    """
    Apply source weights to calculate weighted score.

    Args:
        twitter_score: Energy score from Twitter
        reddit_score: Energy score from Reddit
        rss_score: Energy score from RSS
        available_sources: List of available sources

    Returns:
        Weighted energy score (0.0 when no source is available)
    """
    if not available_sources:
        return 0.0

    # Re-scale the weights so the available sources sum to 1.0
    adjusted_weights = adjust_weights_for_degraded_mode(
        original_weights=SOURCE_WEIGHTS,
        available_sources=available_sources
    )

    weighted_score = 0.0
    if 'twitter' in available_sources:
        weighted_score += twitter_score * adjusted_weights['twitter']
    if 'reddit' in available_sources:
        weighted_score += reddit_score * adjusted_weights['reddit']
    if 'rss' in available_sources:
        weighted_score += rss_score * adjusted_weights['rss']

    return weighted_score


def adjust_weights_for_degraded_mode(
    original_weights: Dict[str, float],
    available_sources: List[str]
) -> Dict[str, float]:
    """
    Adjust weights proportionally when sources are unavailable.

    Args:
        original_weights: Original source weights
        available_sources: List of available sources

    Returns:
        Adjusted weights that sum to 1.0 (empty dict when no source is
        available)

    Raises:
        KeyError: If an available source has no entry in original_weights
    """
    if not available_sources:
        return {}

    total_weight = sum(original_weights[s] for s in available_sources)

    if total_weight == 0:
        # Proportional scaling is undefined when every available weight is
        # zero; fall back to equal weights instead of dividing by zero.
        equal_share = 1.0 / len(available_sources)
        adjusted_weights = {source: equal_share for source in available_sources}
    else:
        adjusted_weights = {
            source: original_weights[source] / total_weight
            for source in available_sources
        }

    logger.info("Adjusted weights for degraded mode: %s", adjusted_weights)

    return adjusted_weights


def _temporal_weight(created_at: Any, now: datetime) -> float:
    """Return the recency weight of one timestamp relative to the aware *now*."""
    if isinstance(created_at, str):
        # Older fromisoformat() versions reject a trailing 'Z'; spell it as an
        # explicit UTC offset instead.
        created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))

    # Naive timestamps are taken to be UTC so they can be subtracted from the
    # timezone-aware "now" without a TypeError.
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)

    hours_ago = (now - created_at).total_seconds() / 3600
    decayed = 1.0 - (hours_ago / TEMPORAL_DECAY_HOURS)

    # Floor for old tweets; cap at 1.0 so a future-dated tweet (clock skew)
    # cannot outweigh a tweet posted right now.
    return min(1.0, max(MIN_TEMPORAL_WEIGHT, decayed))


def apply_temporal_weighting(
    base_score: float,
    tweets_with_timestamps: List[Dict]
) -> float:
    """
    Apply temporal weighting to energy score based on tweet recency.

    Each tweet's weight falls linearly from 1.0 (posted now) by
    1 / TEMPORAL_DECAY_HOURS per hour and is floored at MIN_TEMPORAL_WEIGHT.
    The base score is then multiplied by (1 + |weighted average compound|).

    Args:
        base_score: Base energy score
        tweets_with_timestamps: List of tweets with 'created_at' timestamps
            (ISO-8601 string or datetime; naive values are treated as UTC)
            and an optional 'compound' score (defaults to 0). Tweets without
            a 'created_at' value are skipped.

    Returns:
        Temporally weighted energy score; base_score unchanged when no tweet
        carries a timestamp

    Raises:
        ValueError: If a 'created_at' string is not a valid ISO-8601 timestamp
    """
    if not tweets_with_timestamps:
        return base_score

    now = datetime.now(timezone.utc)
    weighted_sum = 0.0
    total_weight = 0.0

    for tweet in tweets_with_timestamps:
        created_at = tweet.get('created_at')
        if not created_at:
            continue

        time_weight = _temporal_weight(created_at, now)

        # Weight the tweet's contribution by its temporal weight
        weighted_sum += tweet.get('compound', 0) * time_weight
        total_weight += time_weight

    if total_weight <= 0:
        return base_score

    # Recency-weighted average sentiment; its magnitude amplifies the score.
    temporal_factor = weighted_sum / total_weight
    return base_score * (1 + abs(temporal_factor))


def normalize_score(score: float) -> float:
    """
    Clamp energy score to the 0-100 range.

    No rescaling is performed: negative scores become 0.0 and scores above
    100 become 100.0.

    Args:
        score: Raw energy score

    Returns:
        Score between 0 and 100
    """
    return max(0.0, min(100.0, score))


def calculate_confidence(
    available_sources: List[str],
    total_weight: float
) -> float:
    """
    Calculate confidence level based on available sources.

    Args:
        available_sources: List of available sources
        total_weight: Total weight of available sources

    Returns:
        total_weight as the confidence level, or 0.0 when no source is
        available
    """
    if not available_sources:
        return 0.0

    # Confidence is the total original weight of the available sources:
    # All sources: 0.6 + 0.25 + 0.15 = 1.0 → confidence ~1.0
    # Single source (Twitter): 0.6 → confidence ~0.6
    # Single source (RSS): 0.15 → confidence ~0.15
    return total_weight


def calculate_energy_score_by_source(
    source: str,
    sentiments: List[Dict[str, float]]
) -> float:
    """
    Calculate energy score for a single source.

    Args:
        source: Source name ('twitter', 'reddit', or 'rss')
        sentiments: List of sentiment scores

    Returns:
        Energy score for the source, or 0.0 (with a warning logged) when the
        source name is not in SOURCE_WEIGHTS
    """
    if source not in SOURCE_WEIGHTS:
        logger.warning("Unknown source: %s", source)
        return 0.0

    return _calculate_source_energy(sentiments)


def get_source_weights() -> Dict[str, float]:
    """
    Get the current source weights.

    Returns:
        A copy of the source weights dictionary
    """
    return SOURCE_WEIGHTS.copy()


def get_temporal_weighting_parameters() -> Dict[str, float]:
    """
    Get the current temporal weighting parameters.

    Returns:
        Dictionary with 'decay_hours' and 'min_weight'
    """
    return {
        'decay_hours': TEMPORAL_DECAY_HOURS,
        'min_weight': MIN_TEMPORAL_WEIGHT
    }