""" Energy calculation worker module. This module provides a worker that consumes energy calculation tasks from RabbitMQ and executes energy calculation operations. """ import logging from typing import Dict, List, Optional from sqlalchemy.orm import Session from app.services.energy_service import ( calculate_and_store_energy_score, get_energy_score_by_match_and_team ) from app.schemas.energy_score import EnergyScoreCalculationRequest from app.ml.energy_calculator import ( calculate_energy_score, get_source_weights ) logger = logging.getLogger(__name__) class EnergyWorker: """ Worker for processing energy calculation tasks. Features: - Consumes tasks from energy_calculation_tasks queue - Executes energy score calculations - Publishes results to results queue - Handles errors with retries - Structured logging """ def __init__(self): """Initialize energy calculation worker.""" # No initialization needed for energy worker # Energy calculator is initialized in ml/energy_calculator pass def execute_energy_calculation_task( self, task: Dict, db: Session ) -> Dict: """ Execute an energy calculation task. Args: task: Energy calculation task data db: Database session Returns: Dictionary with energy calculation results """ match_id = task.get('match_id') team_id = task.get('team_id') twitter_sentiments = task.get('twitter_sentiments', []) reddit_sentiments = task.get('reddit_sentiments', []) rss_sentiments = task.get('rss_sentiments', []) tweets_with_timestamps = task.get('tweets_with_timestamps', []) logger.info( f"🔧 Executing energy calculation task: " f"match_id={match_id}, team_id={team_id}" ) try: # Check if energy score already exists existing_score = get_energy_score_by_match_and_team( db, match_id, team_id ) if existing_score: logger.info( f"â„šī¸ Energy score already exists for " f"match {match_id}, team {team_id}" ) return { 'energy_score': existing_score.score, 'confidence': existing_score.confidence, 'sources_used': existing_score.sources_used, 'status': 'success', 'metadata': { 'match_id': match_id, 'team_id': team_id, 'updated_existing': True } } # Create calculation request request = EnergyScoreCalculationRequest( match_id=match_id, team_id=team_id, twitter_sentiments=twitter_sentiments, reddit_sentiments=reddit_sentiments, rss_sentiments=rss_sentiments, tweets_with_timestamps=tweets_with_timestamps ) # Calculate and store energy score energy_score = calculate_and_store_energy_score(db, request) logger.info( f"✅ Energy calculation completed: " f"score={energy_score.score:.2f}, " f"confidence={energy_score.confidence:.2f}" ) return { 'energy_score': energy_score.score, 'confidence': energy_score.confidence, 'sources_used': energy_score.sources_used, 'status': 'success', 'metadata': { 'match_id': match_id, 'team_id': team_id, 'twitter_score': energy_score.twitter_score, 'reddit_score': energy_score.reddit_score, 'rss_score': energy_score.rss_score, 'temporal_factor': energy_score.temporal_factor } } except Exception as e: logger.error(f"❌ Energy calculation task failed: {e}") return { 'energy_score': 0.0, 'confidence': 0.0, 'sources_used': [], 'status': 'error', 'error': str(e) } def calculate_mock_energy( self, twitter_sentiments: List[Dict], reddit_sentiments: List[Dict], rss_sentiments: List[Dict] = None, tweets_with_timestamps: List[Dict] = None ) -> Dict: """ Calculate energy score without storing to database (for testing). Args: twitter_sentiments: List of Twitter sentiment scores reddit_sentiments: List of Reddit sentiment scores rss_sentiments: Optional list of RSS sentiment scores tweets_with_timestamps: Optional list of tweets with timestamps Returns: Dictionary with energy calculation results """ try: result = calculate_energy_score( match_id=0, team_id=0, twitter_sentiments=twitter_sentiments, reddit_sentiments=reddit_sentiments, rss_sentiments=rss_sentiments or [], tweets_with_timestamps=tweets_with_timestamps or [] ) return { 'energy_score': result['score'], 'confidence': result['confidence'], 'sources_used': result['sources_used'], 'status': 'success' } except Exception as e: logger.error(f"❌ Mock energy calculation failed: {e}") return { 'energy_score': 0.0, 'confidence': 0.0, 'sources_used': [], 'status': 'error', 'error': str(e) } def create_energy_worker() -> EnergyWorker: """ Factory function to create an energy calculation worker. Returns: Configured EnergyWorker instance """ return EnergyWorker()