chartbastan/backend/app/workers/energy_worker.py
2026-02-01 09:31:38 +01:00

192 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Energy calculation worker module.
This module provides a worker that consumes energy calculation tasks
from RabbitMQ and executes energy calculation operations.
"""
import logging
from typing import Dict, List, Optional
from sqlalchemy.orm import Session
from app.services.energy_service import (
calculate_and_store_energy_score,
get_energy_score_by_match_and_team
)
from app.schemas.energy_score import EnergyScoreCalculationRequest
from app.ml.energy_calculator import (
calculate_energy_score,
get_source_weights
)
logger = logging.getLogger(__name__)
class EnergyWorker:
"""
Worker for processing energy calculation tasks.
Features:
- Consumes tasks from energy_calculation_tasks queue
- Executes energy score calculations
- Publishes results to results queue
- Handles errors with retries
- Structured logging
"""
def __init__(self):
"""Initialize energy calculation worker."""
# No initialization needed for energy worker
# Energy calculator is initialized in ml/energy_calculator
pass
def execute_energy_calculation_task(
self,
task: Dict,
db: Session
) -> Dict:
"""
Execute an energy calculation task.
Args:
task: Energy calculation task data
db: Database session
Returns:
Dictionary with energy calculation results
"""
match_id = task.get('match_id')
team_id = task.get('team_id')
twitter_sentiments = task.get('twitter_sentiments', [])
reddit_sentiments = task.get('reddit_sentiments', [])
rss_sentiments = task.get('rss_sentiments', [])
tweets_with_timestamps = task.get('tweets_with_timestamps', [])
logger.info(
f"🔧 Executing energy calculation task: "
f"match_id={match_id}, team_id={team_id}"
)
try:
# Check if energy score already exists
existing_score = get_energy_score_by_match_and_team(
db, match_id, team_id
)
if existing_score:
logger.info(
f" Energy score already exists for "
f"match {match_id}, team {team_id}"
)
return {
'energy_score': existing_score.score,
'confidence': existing_score.confidence,
'sources_used': existing_score.sources_used,
'status': 'success',
'metadata': {
'match_id': match_id,
'team_id': team_id,
'updated_existing': True
}
}
# Create calculation request
request = EnergyScoreCalculationRequest(
match_id=match_id,
team_id=team_id,
twitter_sentiments=twitter_sentiments,
reddit_sentiments=reddit_sentiments,
rss_sentiments=rss_sentiments,
tweets_with_timestamps=tweets_with_timestamps
)
# Calculate and store energy score
energy_score = calculate_and_store_energy_score(db, request)
logger.info(
f"✅ Energy calculation completed: "
f"score={energy_score.score:.2f}, "
f"confidence={energy_score.confidence:.2f}"
)
return {
'energy_score': energy_score.score,
'confidence': energy_score.confidence,
'sources_used': energy_score.sources_used,
'status': 'success',
'metadata': {
'match_id': match_id,
'team_id': team_id,
'twitter_score': energy_score.twitter_score,
'reddit_score': energy_score.reddit_score,
'rss_score': energy_score.rss_score,
'temporal_factor': energy_score.temporal_factor
}
}
except Exception as e:
logger.error(f"❌ Energy calculation task failed: {e}")
return {
'energy_score': 0.0,
'confidence': 0.0,
'sources_used': [],
'status': 'error',
'error': str(e)
}
def calculate_mock_energy(
self,
twitter_sentiments: List[Dict],
reddit_sentiments: List[Dict],
rss_sentiments: List[Dict] = None,
tweets_with_timestamps: List[Dict] = None
) -> Dict:
"""
Calculate energy score without storing to database (for testing).
Args:
twitter_sentiments: List of Twitter sentiment scores
reddit_sentiments: List of Reddit sentiment scores
rss_sentiments: Optional list of RSS sentiment scores
tweets_with_timestamps: Optional list of tweets with timestamps
Returns:
Dictionary with energy calculation results
"""
try:
result = calculate_energy_score(
match_id=0,
team_id=0,
twitter_sentiments=twitter_sentiments,
reddit_sentiments=reddit_sentiments,
rss_sentiments=rss_sentiments or [],
tweets_with_timestamps=tweets_with_timestamps or []
)
return {
'energy_score': result['score'],
'confidence': result['confidence'],
'sources_used': result['sources_used'],
'status': 'success'
}
except Exception as e:
logger.error(f"❌ Mock energy calculation failed: {e}")
return {
'energy_score': 0.0,
'confidence': 0.0,
'sources_used': [],
'status': 'error',
'error': str(e)
}
def create_energy_worker() -> EnergyWorker:
"""
Factory function to create an energy calculation worker.
Returns:
Configured EnergyWorker instance
"""
return EnergyWorker()