247 lines
8.5 KiB
Python
247 lines
8.5 KiB
Python
"""
|
|
Message consumers module.
|
|
|
|
This module provides functions to consume and process tasks from RabbitMQ queues.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Callable, Dict
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.queues.rabbitmq_client import RabbitMQClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def consume_scraping_tasks(
|
|
client: RabbitMQClient,
|
|
callback: Callable[[Dict], Dict],
|
|
db_session_factory: Callable[[], Session]
|
|
) -> None:
|
|
"""
|
|
Consume scraping tasks from queue and process them.
|
|
|
|
Args:
|
|
client: RabbitMQ client instance
|
|
callback: Function to process scraping tasks
|
|
db_session_factory: Factory function to create DB sessions
|
|
"""
|
|
def on_message(ch, method, properties, body):
|
|
try:
|
|
# Parse message
|
|
message = json.loads(body)
|
|
task_data = message.get('data', {})
|
|
|
|
logger.info(
|
|
f"📥 Received scraping task: match_id={task_data.get('match_id')}, "
|
|
f"source={task_data.get('source')}"
|
|
)
|
|
|
|
# Create database session
|
|
db = db_session_factory()
|
|
|
|
try:
|
|
# Process task
|
|
result = callback(task_data, db)
|
|
|
|
# Publish result
|
|
from app.queues.producers import publish_scraping_result
|
|
publish_scraping_result(
|
|
client=client,
|
|
match_id=task_data.get('match_id'),
|
|
source=task_data.get('source'),
|
|
collected_count=result.get('collected_count', 0),
|
|
metadata=result.get('metadata', {})
|
|
)
|
|
|
|
# Acknowledge message
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
logger.info(f"✅ Completed scraping task for match {task_data.get('match_id')}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing scraping task: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error parsing scraping task message: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
|
|
# Start consuming
|
|
client.consume_messages(queue_name='scraping_tasks', callback=on_message)
|
|
|
|
|
|
def consume_sentiment_analysis_tasks(
|
|
client: RabbitMQClient,
|
|
callback: Callable[[Dict], Dict],
|
|
db_session_factory: Callable[[], Session]
|
|
) -> None:
|
|
"""
|
|
Consume sentiment analysis tasks from queue and process them.
|
|
|
|
Args:
|
|
client: RabbitMQ client instance
|
|
callback: Function to process sentiment analysis tasks
|
|
db_session_factory: Factory function to create DB sessions
|
|
"""
|
|
def on_message(ch, method, properties, body):
|
|
try:
|
|
# Parse message
|
|
message = json.loads(body)
|
|
task_data = message.get('data', {})
|
|
|
|
logger.info(
|
|
f"📥 Received sentiment analysis task: "
|
|
f"match_id={task_data.get('match_id')}, "
|
|
f"source={task_data.get('source')}"
|
|
)
|
|
|
|
# Create database session
|
|
db = db_session_factory()
|
|
|
|
try:
|
|
# Process task
|
|
result = callback(task_data, db)
|
|
|
|
# Publish result
|
|
from app.queues.producers import publish_sentiment_analysis_result
|
|
publish_sentiment_analysis_result(
|
|
client=client,
|
|
match_id=task_data.get('match_id'),
|
|
source=task_data.get('source'),
|
|
analyzed_count=result.get('analyzed_count', 0),
|
|
metrics=result.get('metrics', {}),
|
|
metadata=result.get('metadata', {})
|
|
)
|
|
|
|
# Acknowledge message
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
logger.info(
|
|
f"✅ Completed sentiment analysis task for "
|
|
f"match {task_data.get('match_id')}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing sentiment analysis task: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error parsing sentiment analysis task message: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
|
|
# Start consuming
|
|
client.consume_messages(
|
|
queue_name='sentiment_analysis_tasks',
|
|
callback=on_message
|
|
)
|
|
|
|
|
|
def consume_energy_calculation_tasks(
|
|
client: RabbitMQClient,
|
|
callback: Callable[[Dict], Dict],
|
|
db_session_factory: Callable[[], Session]
|
|
) -> None:
|
|
"""
|
|
Consume energy calculation tasks from queue and process them.
|
|
|
|
Args:
|
|
client: RabbitMQ client instance
|
|
callback: Function to process energy calculation tasks
|
|
db_session_factory: Factory function to create DB sessions
|
|
"""
|
|
def on_message(ch, method, properties, body):
|
|
try:
|
|
# Parse message
|
|
message = json.loads(body)
|
|
task_data = message.get('data', {})
|
|
|
|
logger.info(
|
|
f"📥 Received energy calculation task: "
|
|
f"match_id={task_data.get('match_id')}, "
|
|
f"team_id={task_data.get('team_id')}"
|
|
)
|
|
|
|
# Create database session
|
|
db = db_session_factory()
|
|
|
|
try:
|
|
# Process task
|
|
result = callback(task_data, db)
|
|
|
|
# Publish result
|
|
from app.queues.producers import publish_energy_calculation_result
|
|
publish_energy_calculation_result(
|
|
client=client,
|
|
match_id=task_data.get('match_id'),
|
|
team_id=task_data.get('team_id'),
|
|
energy_score=result.get('energy_score', 0.0),
|
|
confidence=result.get('confidence', 0.0),
|
|
sources_used=result.get('sources_used', []),
|
|
metadata=result.get('metadata', {})
|
|
)
|
|
|
|
# Acknowledge message
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
logger.info(
|
|
f"✅ Completed energy calculation task for "
|
|
f"match {task_data.get('match_id')}, "
|
|
f"team {task_data.get('team_id')}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing energy calculation task: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error parsing energy calculation task message: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
|
|
# Start consuming
|
|
client.consume_messages(
|
|
queue_name='energy_calculation_tasks',
|
|
callback=on_message
|
|
)
|
|
|
|
|
|
def consume_results(
|
|
client: RabbitMQClient,
|
|
callback: Callable[[Dict], None]
|
|
) -> None:
|
|
"""
|
|
Consume results from results queue.
|
|
|
|
Args:
|
|
client: RabbitMQ client instance
|
|
callback: Function to process results
|
|
"""
|
|
def on_message(ch, method, properties, body):
|
|
try:
|
|
# Parse message
|
|
message = json.loads(body)
|
|
result_data = message.get('data', {})
|
|
|
|
logger.info(
|
|
f"📥 Received result: type={result_data.get('result_type')}, "
|
|
f"match_id={result_data.get('data', {}).get('match_id')}"
|
|
)
|
|
|
|
# Process result
|
|
callback(result_data)
|
|
|
|
# Acknowledge message
|
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
logger.info(f"✅ Processed {result_data.get('result_type')} result")
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Error processing result: {e}")
|
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
|
|
|
|
# Start consuming
|
|
client.consume_messages(queue_name='results', callback=on_message)
|