357 lines
10 KiB
Python
357 lines
10 KiB
Python
"""
|
||
Energy Calculator Module.
|
||
|
||
This module calculates collective energy scores based on sentiment analysis
|
||
from multiple sources (Twitter, Reddit, RSS) using a weighted formula.
|
||
|
||
Formula: Score = (Positive - Negative) × Volume × Virality
|
||
"""
|
||
|
||
from datetime import datetime, timezone
from logging import getLogger
from typing import Any, Dict, List, Optional
|
||
|
||
logger = getLogger(__name__)
|
||
|
||
# Source weights as specified in requirements; the three weights sum to 1.0
# so a fully-available run needs no re-normalization.
SOURCE_WEIGHTS = {
    'twitter': 0.60,
    'reddit': 0.25,
    'rss': 0.15
}

# Temporal weighting parameters for apply_temporal_weighting().
TEMPORAL_DECAY_HOURS = 48  # Full decay over 48 hours
MIN_TEMPORAL_WEIGHT = 0.5  # Minimum weight for old tweets
|
||
|
||
|
||
def calculate_energy_score(
    match_id: int,
    team_id: int,
    twitter_sentiments: Optional[List[Dict[str, float]]] = None,
    reddit_sentiments: Optional[List[Dict[str, float]]] = None,
    rss_sentiments: Optional[List[Dict[str, float]]] = None,
    tweets_with_timestamps: Optional[List[Dict]] = None
) -> Dict[str, Any]:
    """
    Calculate energy score for a team based on multi-source sentiment data.

    Args:
        match_id: ID of the match (used for logging only).
        team_id: ID of the team (used for logging only).
        twitter_sentiments: List of Twitter sentiment scores.
        reddit_sentiments: List of Reddit sentiment scores.
        rss_sentiments: List of RSS sentiment scores.
        tweets_with_timestamps: List of tweets with timestamps for temporal
            weighting.

    Returns:
        Dictionary containing:
        - score: Final energy score (0-100)
        - confidence: Confidence level (0-1)
        - sources_used: List of sources used in calculation
    """
    # Normalize None inputs to empty lists so the rest of the flow is uniform.
    twitter_sentiments = twitter_sentiments or []
    reddit_sentiments = reddit_sentiments or []
    rss_sentiments = rss_sentiments or []
    tweets_with_timestamps = tweets_with_timestamps or []

    # Per-source raw energy: (Positive - Negative) x Volume x Virality.
    twitter_energy_score = _calculate_source_energy(twitter_sentiments)
    reddit_energy_score = _calculate_source_energy(reddit_sentiments)
    rss_energy_score = _calculate_source_energy(rss_sentiments)

    # A source counts as "available" only if it delivered any sentiment data.
    available_sources = []
    if twitter_sentiments:
        available_sources.append('twitter')
    if reddit_sentiments:
        available_sources.append('reddit')
    if rss_sentiments:
        available_sources.append('rss')

    # No data at all: return a neutral, zero-confidence result.
    if not available_sources:
        logger.warning(f"No sentiment data available for match_id={match_id}, team_id={team_id}")
        return {
            'score': 0.0,
            'confidence': 0.0,
            'sources_used': []
        }

    # Blend the per-source scores using (possibly degraded-mode) weights.
    weighted_score = apply_source_weights(
        twitter_score=twitter_energy_score,
        reddit_score=reddit_energy_score,
        rss_score=rss_energy_score,
        available_sources=available_sources
    )

    # Boost the score by tweet recency when timestamped tweets are available.
    time_weighted_score = weighted_score
    if tweets_with_timestamps and available_sources:
        time_weighted_score = apply_temporal_weighting(
            base_score=weighted_score,
            tweets_with_timestamps=tweets_with_timestamps
        )

    # Clamp into the 0-100 display range.
    final_score = normalize_score(time_weighted_score)

    # Confidence reflects how much of the total source weight contributed.
    total_weight = sum(SOURCE_WEIGHTS[s] for s in available_sources)
    confidence = calculate_confidence(
        available_sources=available_sources,
        total_weight=total_weight
    )

    return {
        'score': final_score,
        'confidence': confidence,
        'sources_used': available_sources
    }
|
||
|
||
|
||
def _calculate_source_energy(sentiments: List[Dict[str, float]]) -> float:
|
||
"""
|
||
Calculate energy score for a single source using the formula:
|
||
Score = (Positive - Negative) × Volume × Virality
|
||
|
||
Args:
|
||
sentiments: List of sentiment scores with 'positive' and 'negative' keys
|
||
|
||
Returns:
|
||
Energy score for the source (can be negative or positive)
|
||
"""
|
||
if not sentiments:
|
||
return 0.0
|
||
|
||
# Calculate aggregated metrics
|
||
total_count = len(sentiments)
|
||
positive_ratio = sum(s.get('positive', 0) for s in sentiments) / total_count
|
||
negative_ratio = sum(s.get('negative', 0) for s in sentiments) / total_count
|
||
|
||
# Volume: total number of sentiments
|
||
volume = total_count
|
||
|
||
# Virality: average absolute compound score (intensity of sentiment)
|
||
virality = sum(abs(s.get('compound', 0)) for s in sentiments) / total_count
|
||
|
||
# Apply the energy formula
|
||
energy = (positive_ratio - negative_ratio) * volume * virality
|
||
|
||
return energy
|
||
|
||
|
||
def apply_source_weights(
    twitter_score: float,
    reddit_score: float,
    rss_score: float,
    available_sources: List[str]
) -> float:
    """
    Combine per-source energy scores into one weighted score.

    Args:
        twitter_score: Energy score from Twitter.
        reddit_score: Energy score from Reddit.
        rss_score: Energy score from RSS.
        available_sources: Names of the sources that provided data.

    Returns:
        Weighted energy score (0.0 when no sources are available).
    """
    if not available_sources:
        return 0.0

    # Re-normalize the configured weights over the sources we actually have.
    weights = adjust_weights_for_degraded_mode(
        original_weights=SOURCE_WEIGHTS,
        available_sources=available_sources
    )

    scores_by_source = {
        'twitter': twitter_score,
        'reddit': reddit_score,
        'rss': rss_score,
    }

    # Each available source contributes its score scaled by its adjusted weight.
    return sum(
        score * weights[source]
        for source, score in scores_by_source.items()
        if source in available_sources
    )
|
||
|
||
|
||
def adjust_weights_for_degraded_mode(
    original_weights: Dict[str, float],
    available_sources: List[str]
) -> Dict[str, float]:
    """
    Re-normalize source weights when some sources are unavailable.

    The weights of the available sources are scaled proportionally so the
    returned mapping sums to 1.0.

    Args:
        original_weights: Full configured source-weight mapping.
        available_sources: Names of the sources that provided data.

    Returns:
        Adjusted weights that sum to 1.0 (empty dict when no sources).
    """
    if not available_sources:
        return {}

    # Pool of configured weight held by the sources that are present.
    weight_pool = sum(original_weights[source] for source in available_sources)

    # Scale each remaining weight by the pool so the set re-sums to 1.0.
    adjusted_weights = {
        source: original_weights[source] / weight_pool
        for source in available_sources
    }

    logger.info(f"Adjusted weights for degraded mode: {adjusted_weights}")

    return adjusted_weights
|
||
|
||
|
||
def apply_temporal_weighting(
    base_score: float,
    tweets_with_timestamps: List[Dict]
) -> float:
    """
    Apply temporal weighting to an energy score based on tweet recency.

    Each tweet's 'compound' sentiment is weighted by a recency factor that
    decays linearly from 1.0 (just posted) down to MIN_TEMPORAL_WEIGHT over
    TEMPORAL_DECAY_HOURS hours. The base score is then amplified by the
    weighted-average absolute sentiment.

    Args:
        base_score: Base energy score.
        tweets_with_timestamps: List of tweets with 'created_at' timestamps
            (ISO-8601 strings, optionally with trailing 'Z', or datetime
            objects) and optional 'compound' sentiment values.

    Returns:
        Temporally weighted energy score; base_score unchanged when no
        usable timestamps are present.
    """
    if not tweets_with_timestamps:
        return base_score

    # Use an aware "now": parsed 'Z' timestamps are timezone-aware, and the
    # original naive utcnow() raised TypeError when subtracting them.
    now = datetime.now(timezone.utc)
    weighted_sum = 0.0
    total_weight = 0.0

    for tweet in tweets_with_timestamps:
        created_at = tweet.get('created_at')
        if not created_at:
            # Tweets without a timestamp cannot be weighted; skip them.
            continue

        # Accept Twitter-style ISO strings where 'Z' denotes UTC.
        if isinstance(created_at, str):
            created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00'))

        # Normalize naive datetimes to UTC so arithmetic is always
        # aware-vs-aware and never raises TypeError.
        if created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)

        hours_ago = (now - created_at).total_seconds() / 3600

        # Linear decay from 1.0 to MIN_TEMPORAL_WEIGHT over the decay window.
        time_weight = max(MIN_TEMPORAL_WEIGHT, 1.0 - (hours_ago / TEMPORAL_DECAY_HOURS))

        # Weight the tweet's sentiment contribution by its recency.
        sentiment_contribution = tweet.get('compound', 0)
        weighted_sum += sentiment_contribution * time_weight
        total_weight += time_weight

    if total_weight > 0:
        # Amplify the base score by the average sentiment magnitude.
        temporal_factor = weighted_sum / total_weight
        return base_score * (1 + abs(temporal_factor))

    # No tweet carried a usable timestamp: leave the score untouched.
    return base_score
|
||
|
||
|
||
def normalize_score(score: float) -> float:
    """
    Clamp a raw energy score into the 0-100 display range.

    Args:
        score: Raw energy score (any float).

    Returns:
        The score limited to the inclusive range [0.0, 100.0].
    """
    # Cap the upper bound first, then floor the result at zero.
    upper_bounded = min(100.0, score)
    return max(0.0, upper_bounded)
|
||
|
||
|
||
def calculate_confidence(
    available_sources: List[str],
    total_weight: float
) -> float:
    """
    Derive a confidence level from sentiment-source coverage.

    The confidence equals the combined configured weight of the sources
    that contributed data: all three sources (0.6 + 0.25 + 0.15) -> 1.0,
    Twitter alone -> 0.6, RSS alone -> 0.15.

    Args:
        available_sources: Sources that provided sentiment data.
        total_weight: Combined configured weight of those sources.

    Returns:
        Confidence level between 0 and 1 (0.0 when no sources).
    """
    # With no contributing sources there is nothing to be confident about.
    if not available_sources:
        return 0.0

    # Coverage weight doubles as the confidence estimate.
    return total_weight
|
||
|
||
|
||
def calculate_energy_score_by_source(
    source: str,
    sentiments: List[Dict[str, float]]
) -> float:
    """
    Calculate the raw energy score for one named source.

    Args:
        source: Source name ('twitter', 'reddit', or 'rss').
        sentiments: List of sentiment scores for that source.

    Returns:
        Raw energy score, or 0.0 for an unrecognized source name.
    """
    # Reject unknown feed names before doing any computation.
    if source not in SOURCE_WEIGHTS:
        logger.warning(f"Unknown source: {source}")
        return 0.0

    return _calculate_source_energy(sentiments)
|
||
|
||
|
||
def get_source_weights() -> Dict[str, float]:
    """
    Return a copy of the configured source weights.

    Returns:
        Dictionary mapping source name to weight; mutating the returned
        dict does not affect the module-level configuration.
    """
    # Return a shallow copy so callers cannot mutate the shared mapping.
    return dict(SOURCE_WEIGHTS)
|
||
|
||
|
||
def get_temporal_weighting_parameters() -> Dict[str, float]:
    """
    Return the temporal weighting configuration.

    Returns:
        Dictionary with 'decay_hours' (full linear-decay window in hours)
        and 'min_weight' (the floor weight applied to old tweets).
    """
    parameters = {
        'decay_hours': TEMPORAL_DECAY_HOURS,
        'min_weight': MIN_TEMPORAL_WEIGHT,
    }
    return parameters
|