""" Message consumers module. This module provides functions to consume and process tasks from RabbitMQ queues. """ import json import logging from typing import Callable, Dict from sqlalchemy.orm import Session from app.queues.rabbitmq_client import RabbitMQClient logger = logging.getLogger(__name__) def consume_scraping_tasks( client: RabbitMQClient, callback: Callable[[Dict], Dict], db_session_factory: Callable[[], Session] ) -> None: """ Consume scraping tasks from queue and process them. Args: client: RabbitMQ client instance callback: Function to process scraping tasks db_session_factory: Factory function to create DB sessions """ def on_message(ch, method, properties, body): try: # Parse message message = json.loads(body) task_data = message.get('data', {}) logger.info( f"📥 Received scraping task: match_id={task_data.get('match_id')}, " f"source={task_data.get('source')}" ) # Create database session db = db_session_factory() try: # Process task result = callback(task_data, db) # Publish result from app.queues.producers import publish_scraping_result publish_scraping_result( client=client, match_id=task_data.get('match_id'), source=task_data.get('source'), collected_count=result.get('collected_count', 0), metadata=result.get('metadata', {}) ) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) logger.info(f"✅ Completed scraping task for match {task_data.get('match_id')}") except Exception as e: logger.error(f"❌ Error processing scraping task: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) finally: db.close() except Exception as e: logger.error(f"❌ Error parsing scraping task message: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) # Start consuming client.consume_messages(queue_name='scraping_tasks', callback=on_message) def consume_sentiment_analysis_tasks( client: RabbitMQClient, callback: Callable[[Dict], Dict], db_session_factory: Callable[[], Session] ) -> None: """ Consume sentiment analysis tasks from queue and process them. Args: client: RabbitMQ client instance callback: Function to process sentiment analysis tasks db_session_factory: Factory function to create DB sessions """ def on_message(ch, method, properties, body): try: # Parse message message = json.loads(body) task_data = message.get('data', {}) logger.info( f"📥 Received sentiment analysis task: " f"match_id={task_data.get('match_id')}, " f"source={task_data.get('source')}" ) # Create database session db = db_session_factory() try: # Process task result = callback(task_data, db) # Publish result from app.queues.producers import publish_sentiment_analysis_result publish_sentiment_analysis_result( client=client, match_id=task_data.get('match_id'), source=task_data.get('source'), analyzed_count=result.get('analyzed_count', 0), metrics=result.get('metrics', {}), metadata=result.get('metadata', {}) ) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) logger.info( f"✅ Completed sentiment analysis task for " f"match {task_data.get('match_id')}" ) except Exception as e: logger.error(f"❌ Error processing sentiment analysis task: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) finally: db.close() except Exception as e: logger.error(f"❌ Error parsing sentiment analysis task message: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) # Start consuming client.consume_messages( queue_name='sentiment_analysis_tasks', callback=on_message ) def consume_energy_calculation_tasks( client: RabbitMQClient, callback: Callable[[Dict], Dict], db_session_factory: Callable[[], Session] ) -> None: """ Consume energy calculation tasks from queue and process them. Args: client: RabbitMQ client instance callback: Function to process energy calculation tasks db_session_factory: Factory function to create DB sessions """ def on_message(ch, method, properties, body): try: # Parse message message = json.loads(body) task_data = message.get('data', {}) logger.info( f"📥 Received energy calculation task: " f"match_id={task_data.get('match_id')}, " f"team_id={task_data.get('team_id')}" ) # Create database session db = db_session_factory() try: # Process task result = callback(task_data, db) # Publish result from app.queues.producers import publish_energy_calculation_result publish_energy_calculation_result( client=client, match_id=task_data.get('match_id'), team_id=task_data.get('team_id'), energy_score=result.get('energy_score', 0.0), confidence=result.get('confidence', 0.0), sources_used=result.get('sources_used', []), metadata=result.get('metadata', {}) ) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) logger.info( f"✅ Completed energy calculation task for " f"match {task_data.get('match_id')}, " f"team {task_data.get('team_id')}" ) except Exception as e: logger.error(f"❌ Error processing energy calculation task: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) finally: db.close() except Exception as e: logger.error(f"❌ Error parsing energy calculation task message: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) # Start consuming client.consume_messages( queue_name='energy_calculation_tasks', callback=on_message ) def consume_results( client: RabbitMQClient, callback: Callable[[Dict], None] ) -> None: """ Consume results from results queue. Args: client: RabbitMQ client instance callback: Function to process results """ def on_message(ch, method, properties, body): try: # Parse message message = json.loads(body) result_data = message.get('data', {}) logger.info( f"📥 Received result: type={result_data.get('result_type')}, " f"match_id={result_data.get('data', {}).get('match_id')}" ) # Process result callback(result_data) # Acknowledge message ch.basic_ack(delivery_tag=method.delivery_tag) logger.info(f"✅ Processed {result_data.get('result_type')} result") except Exception as e: logger.error(f"❌ Error processing result: {e}") ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False) # Start consuming client.consume_messages(queue_name='results', callback=on_message)