2026-02-01 09:31:38 +01:00

247 lines
8.5 KiB
Python

"""
Message consumers module.
This module provides functions to consume and process tasks from RabbitMQ queues.
"""
import json
import logging
from typing import Callable, Dict
from sqlalchemy.orm import Session
from app.queues.rabbitmq_client import RabbitMQClient
logger = logging.getLogger(__name__)
def consume_scraping_tasks(
    client: RabbitMQClient,
    callback: Callable[[Dict, Session], Dict],
    db_session_factory: Callable[[], Session]
) -> None:
    """
    Consume scraping tasks from the 'scraping_tasks' queue and process them.

    For every delivery, the message's 'data' field is handed to ``callback``
    together with a fresh DB session, and the returned summary is published
    through ``publish_scraping_result``. Each delivery is settled exactly
    once: acknowledged on success, rejected without requeue on any failure.

    Args:
        client: RabbitMQ client instance
        callback: Function to process scraping tasks. Called as
            ``callback(task_data, db)``; must return a dict, from which
            'collected_count' and 'metadata' are read.
        db_session_factory: Factory function to create DB sessions
    """

    def on_message(ch, method, properties, body):
        delivery_tag = method.delivery_tag

        # Stage 1: decode the message. Anything that is not a JSON object
        # with a dict-like 'data' field fails here (including inside the
        # log call below) and is rejected as unparseable.
        try:
            message = json.loads(body)
            task_data = message.get('data', {})
            logger.info(
                f"📥 Received scraping task: match_id={task_data.get('match_id')}, "
                f"source={task_data.get('source')}"
            )
        except Exception as exc:
            logger.exception(f"❌ Error parsing scraping task message: {exc}")
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
            return

        # Stage 2: process and publish. The ack lives in the `else` branch so
        # that a failing ack is never followed by a reject for the same
        # delivery tag (settling a delivery twice is a channel-level error).
        db = None
        try:
            db = db_session_factory()
            result = callback(task_data, db)

            # Imported at call time, as in the original code (presumably to
            # avoid a circular import between consumers and producers).
            from app.queues.producers import publish_scraping_result
            publish_scraping_result(
                client=client,
                match_id=task_data.get('match_id'),
                source=task_data.get('source'),
                collected_count=result.get('collected_count', 0),
                metadata=result.get('metadata', {})
            )
        except Exception as exc:
            logger.exception(f"❌ Error processing scraping task: {exc}")
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=delivery_tag)
            logger.info(f"✅ Completed scraping task for match {task_data.get('match_id')}")
        finally:
            # The delivery is already settled at this point; a failing close
            # is logged but must not trigger a second ack/reject.
            if db is not None:
                try:
                    db.close()
                except Exception as exc:
                    logger.exception(
                        f"❌ Error closing DB session after scraping task: {exc}"
                    )

    # Start consuming
    client.consume_messages(queue_name='scraping_tasks', callback=on_message)
def consume_sentiment_analysis_tasks(
    client: RabbitMQClient,
    callback: Callable[[Dict, Session], Dict],
    db_session_factory: Callable[[], Session]
) -> None:
    """
    Consume sentiment analysis tasks from the 'sentiment_analysis_tasks' queue.

    For every delivery, the message's 'data' field is handed to ``callback``
    together with a fresh DB session, and the returned summary is published
    through ``publish_sentiment_analysis_result``. Each delivery is settled
    exactly once: acknowledged on success, rejected without requeue on any
    failure.

    Args:
        client: RabbitMQ client instance
        callback: Function to process sentiment analysis tasks. Called as
            ``callback(task_data, db)``; must return a dict, from which
            'analyzed_count', 'metrics' and 'metadata' are read.
        db_session_factory: Factory function to create DB sessions
    """

    def on_message(ch, method, properties, body):
        delivery_tag = method.delivery_tag

        # Stage 1: decode the message. Anything that is not a JSON object
        # with a dict-like 'data' field fails here (including inside the
        # log call below) and is rejected as unparseable.
        try:
            message = json.loads(body)
            task_data = message.get('data', {})
            logger.info(
                f"📥 Received sentiment analysis task: "
                f"match_id={task_data.get('match_id')}, "
                f"source={task_data.get('source')}"
            )
        except Exception as exc:
            logger.exception(
                f"❌ Error parsing sentiment analysis task message: {exc}"
            )
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
            return

        # Stage 2: process and publish. The ack lives in the `else` branch so
        # that a failing ack is never followed by a reject for the same
        # delivery tag (settling a delivery twice is a channel-level error).
        db = None
        try:
            db = db_session_factory()
            result = callback(task_data, db)

            # Imported at call time, as in the original code (presumably to
            # avoid a circular import between consumers and producers).
            from app.queues.producers import publish_sentiment_analysis_result
            publish_sentiment_analysis_result(
                client=client,
                match_id=task_data.get('match_id'),
                source=task_data.get('source'),
                analyzed_count=result.get('analyzed_count', 0),
                metrics=result.get('metrics', {}),
                metadata=result.get('metadata', {})
            )
        except Exception as exc:
            logger.exception(f"❌ Error processing sentiment analysis task: {exc}")
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=delivery_tag)
            logger.info(
                f"✅ Completed sentiment analysis task for "
                f"match {task_data.get('match_id')}"
            )
        finally:
            # The delivery is already settled at this point; a failing close
            # is logged but must not trigger a second ack/reject.
            if db is not None:
                try:
                    db.close()
                except Exception as exc:
                    logger.exception(
                        f"❌ Error closing DB session after sentiment analysis task: {exc}"
                    )

    # Start consuming
    client.consume_messages(
        queue_name='sentiment_analysis_tasks',
        callback=on_message
    )
def consume_energy_calculation_tasks(
    client: RabbitMQClient,
    callback: Callable[[Dict, Session], Dict],
    db_session_factory: Callable[[], Session]
) -> None:
    """
    Consume energy calculation tasks from the 'energy_calculation_tasks' queue.

    For every delivery, the message's 'data' field is handed to ``callback``
    together with a fresh DB session, and the returned summary is published
    through ``publish_energy_calculation_result``. Each delivery is settled
    exactly once: acknowledged on success, rejected without requeue on any
    failure.

    Args:
        client: RabbitMQ client instance
        callback: Function to process energy calculation tasks. Called as
            ``callback(task_data, db)``; must return a dict, from which
            'energy_score', 'confidence', 'sources_used' and 'metadata'
            are read.
        db_session_factory: Factory function to create DB sessions
    """

    def on_message(ch, method, properties, body):
        delivery_tag = method.delivery_tag

        # Stage 1: decode the message. Anything that is not a JSON object
        # with a dict-like 'data' field fails here (including inside the
        # log call below) and is rejected as unparseable.
        try:
            message = json.loads(body)
            task_data = message.get('data', {})
            logger.info(
                f"📥 Received energy calculation task: "
                f"match_id={task_data.get('match_id')}, "
                f"team_id={task_data.get('team_id')}"
            )
        except Exception as exc:
            logger.exception(
                f"❌ Error parsing energy calculation task message: {exc}"
            )
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
            return

        # Stage 2: process and publish. The ack lives in the `else` branch so
        # that a failing ack is never followed by a reject for the same
        # delivery tag (settling a delivery twice is a channel-level error).
        db = None
        try:
            db = db_session_factory()
            result = callback(task_data, db)

            # Imported at call time, as in the original code (presumably to
            # avoid a circular import between consumers and producers).
            from app.queues.producers import publish_energy_calculation_result
            publish_energy_calculation_result(
                client=client,
                match_id=task_data.get('match_id'),
                team_id=task_data.get('team_id'),
                energy_score=result.get('energy_score', 0.0),
                confidence=result.get('confidence', 0.0),
                sources_used=result.get('sources_used', []),
                metadata=result.get('metadata', {})
            )
        except Exception as exc:
            logger.exception(f"❌ Error processing energy calculation task: {exc}")
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=delivery_tag)
            logger.info(
                f"✅ Completed energy calculation task for "
                f"match {task_data.get('match_id')}, "
                f"team {task_data.get('team_id')}"
            )
        finally:
            # The delivery is already settled at this point; a failing close
            # is logged but must not trigger a second ack/reject.
            if db is not None:
                try:
                    db.close()
                except Exception as exc:
                    logger.exception(
                        f"❌ Error closing DB session after energy calculation task: {exc}"
                    )

    # Start consuming
    client.consume_messages(
        queue_name='energy_calculation_tasks',
        callback=on_message
    )
def consume_results(
    client: RabbitMQClient,
    callback: Callable[[Dict], None]
) -> None:
    """
    Consume results from the 'results' queue.

    For every delivery, the message's 'data' field is passed to ``callback``.
    Each delivery is settled exactly once: acknowledged when ``callback``
    returns, rejected without requeue when decoding or ``callback`` fails.

    Args:
        client: RabbitMQ client instance
        callback: Function to process results. Called as
            ``callback(result_data)``; its return value is ignored.
    """

    def on_message(ch, method, properties, body):
        delivery_tag = method.delivery_tag

        try:
            # Parse message
            message = json.loads(body)
            result_data = message.get('data', {})

            # The match id is only needed for the log line. Look it up
            # defensively so a null or non-dict nested 'data' cannot cause an
            # otherwise processable result to be rejected.
            nested = result_data.get('data')
            match_id = nested.get('match_id') if isinstance(nested, dict) else None
            logger.info(
                f"📥 Received result: type={result_data.get('result_type')}, "
                f"match_id={match_id}"
            )

            # Process result
            callback(result_data)
        except Exception as exc:
            logger.exception(f"❌ Error processing result: {exc}")
            ch.basic_reject(delivery_tag=delivery_tag, requeue=False)
        else:
            # Ack outside the `try` body: if the ack itself fails, the
            # delivery must not also be rejected (settling a delivery twice
            # is a channel-level error).
            ch.basic_ack(delivery_tag=delivery_tag)
            logger.info(f"✅ Processed {result_data.get('result_type')} result")

    # Start consuming
    client.consume_messages(queue_name='results', callback=on_message)