269 lines
6.9 KiB
Python
269 lines
6.9 KiB
Python
"""
|
|
Message producers module.
|
|
|
|
This module provides functions to publish tasks and results to RabbitMQ queues.
|
|
"""
|
|
|
|
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional

from app.queues.rabbitmq_client import RabbitMQClient
|
|
|
|
# Module-level logger, named after this module via __name__; used by every
# publish_* helper below to record what was published.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def publish_scraping_task(
    client: RabbitMQClient,
    match_id: int,
    source: str,
    keywords: Optional[List[str]] = None,
    priority: str = "normal"
) -> None:
    """
    Publish a scraping task to the 'scraping_tasks' queue.

    Builds the task payload, passes it to ``client.publish_message`` with the
    event type "scraping.task.created", then logs the publication.

    Args:
        client: RabbitMQ client instance used to publish the message
        match_id: Match identifier
        source: Source to scrape ('twitter', 'reddit', 'rss')
        keywords: Optional list of keywords for filtering; a falsy value
            (None or an empty list) is sent as an empty list
        priority: Task priority ('low', 'normal', 'high', 'vip')
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized timestamp keeps the same naive
    # ISO-8601 shape (no "+00:00" suffix) that was published before.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    task = {
        "task_type": "scraping",
        "match_id": match_id,
        "source": source,
        "keywords": keywords or [],
        "priority": priority,
        "created_at": created_at,
    }

    client.publish_message(
        queue_name='scraping_tasks',
        data=task,
        event_type="scraping.task.created"
    )

    # Lazy %-style arguments: the message is only rendered if INFO is enabled.
    logger.info(
        "📤 Published scraping task for match %s (source: %s, priority: %s)",
        match_id,
        source,
        priority,
    )
|
|
|
|
|
|
def publish_sentiment_analysis_task(
    client: RabbitMQClient,
    match_id: int,
    source: str,
    entity_ids: List[str],
    texts: Optional[List[str]] = None
) -> None:
    """
    Publish a sentiment analysis task to the 'sentiment_analysis_tasks' queue.

    Builds the task payload, passes it to ``client.publish_message`` with the
    event type "sentiment_analysis.task.created", then logs the publication
    together with the number of entity IDs.

    Args:
        client: RabbitMQ client instance used to publish the message
        match_id: Match identifier
        source: Source type ('twitter', 'reddit')
        entity_ids: List of entity IDs to analyze
        texts: Optional list of texts (if not fetched from DB); a falsy
            value (None or an empty list) is sent as an empty list
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized timestamp keeps the same naive
    # ISO-8601 shape (no "+00:00" suffix) that was published before.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    task = {
        "task_type": "sentiment_analysis",
        "match_id": match_id,
        "source": source,
        "entity_ids": entity_ids,
        "texts": texts or [],
        "created_at": created_at,
    }

    client.publish_message(
        queue_name='sentiment_analysis_tasks',
        data=task,
        event_type="sentiment_analysis.task.created"
    )

    # Lazy %-style arguments: the message is only rendered if INFO is enabled.
    logger.info(
        "📤 Published sentiment analysis task for match %s "
        "(source: %s, entities: %d)",
        match_id,
        source,
        len(entity_ids),
    )
|
|
|
|
|
|
def publish_energy_calculation_task(
    client: RabbitMQClient,
    match_id: int,
    team_id: int,
    twitter_sentiments: Optional[List[Dict]] = None,
    reddit_sentiments: Optional[List[Dict]] = None,
    rss_sentiments: Optional[List[Dict]] = None,
    tweets_with_timestamps: Optional[List[Dict]] = None
) -> None:
    """
    Publish an energy calculation task to the 'energy_calculation_tasks' queue.

    Builds the task payload, passes it to ``client.publish_message`` with the
    event type "energy_calculation.task.created", then logs the publication.
    Every optional list that is falsy (None or empty) is sent as an empty
    list.

    Args:
        client: RabbitMQ client instance used to publish the message
        match_id: Match identifier
        team_id: Team identifier
        twitter_sentiments: Optional list of Twitter sentiment scores
        reddit_sentiments: Optional list of Reddit sentiment scores
        rss_sentiments: Optional list of RSS sentiment scores
        tweets_with_timestamps: Optional list of tweets with timestamps
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized timestamp keeps the same naive
    # ISO-8601 shape (no "+00:00" suffix) that was published before.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    task = {
        "task_type": "energy_calculation",
        "match_id": match_id,
        "team_id": team_id,
        "twitter_sentiments": twitter_sentiments or [],
        "reddit_sentiments": reddit_sentiments or [],
        "rss_sentiments": rss_sentiments or [],
        "tweets_with_timestamps": tweets_with_timestamps or [],
        "created_at": created_at,
    }

    client.publish_message(
        queue_name='energy_calculation_tasks',
        data=task,
        event_type="energy_calculation.task.created"
    )

    # Lazy %-style arguments: the message is only rendered if INFO is enabled.
    logger.info(
        "📤 Published energy calculation task for match %s, team %s",
        match_id,
        team_id,
    )
|
|
|
|
|
|
def publish_result(
    client: RabbitMQClient,
    result_type: str,
    data: Dict
) -> None:
    """
    Publish a result to the 'results' queue.

    Wraps ``data`` in an envelope carrying the result type and a creation
    timestamp, passes it to ``client.publish_message`` with the event type
    "result.published", then logs the publication.

    Args:
        client: RabbitMQ client instance used to publish the message
        result_type: Type of result ('scraping', 'sentiment', 'energy')
        data: Result data, embedded unchanged under the "data" key
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the serialized timestamp keeps the same naive
    # ISO-8601 shape (no "+00:00" suffix) that was published before.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    result = {
        "result_type": result_type,
        "data": data,
        "created_at": created_at,
    }

    client.publish_message(
        queue_name='results',
        data=result,
        event_type="result.published"
    )

    # Lazy %-style arguments: the message is only rendered if INFO is enabled.
    logger.info("📤 Published %s result to results queue", result_type)
|
|
|
|
|
|
def publish_scraping_result(
    client: RabbitMQClient,
    match_id: int,
    source: str,
    collected_count: int,
    metadata: Optional[Dict] = None
) -> None:
    """
    Report the outcome of a scraping run via ``publish_result``.

    The payload always carries the status "success" and is published with
    the result type "scraping".

    Args:
        client: RabbitMQ client instance
        match_id: Match identifier
        source: Source scraped ('twitter', 'reddit', 'rss')
        collected_count: Number of items collected
        metadata: Optional additional metadata; a falsy value (None or an
            empty dict) is sent as an empty dict
    """
    extra = metadata if metadata else {}

    # Keyword construction keeps the same keys in the same insertion order.
    payload = dict(
        match_id=match_id,
        source=source,
        collected_count=collected_count,
        status="success",
        metadata=extra,
    )

    publish_result(client=client, result_type="scraping", data=payload)
|
|
|
|
|
|
def publish_sentiment_analysis_result(
    client: RabbitMQClient,
    match_id: int,
    source: str,
    analyzed_count: int,
    metrics: Dict,
    metadata: Optional[Dict] = None
) -> None:
    """
    Report the outcome of a sentiment analysis run via ``publish_result``.

    The payload always carries the status "success" and is published with
    the result type "sentiment".

    Args:
        client: RabbitMQ client instance
        match_id: Match identifier
        source: Source analyzed ('twitter', 'reddit')
        analyzed_count: Number of items analyzed
        metrics: Aggregated sentiment metrics
        metadata: Optional additional metadata; a falsy value (None or an
            empty dict) is sent as an empty dict
    """
    extra = metadata if metadata else {}

    # Keyword construction keeps the same keys in the same insertion order.
    payload = dict(
        match_id=match_id,
        source=source,
        analyzed_count=analyzed_count,
        metrics=metrics,
        status="success",
        metadata=extra,
    )

    publish_result(client=client, result_type="sentiment", data=payload)
|
|
|
|
|
|
def publish_energy_calculation_result(
    client: RabbitMQClient,
    match_id: int,
    team_id: int,
    energy_score: float,
    confidence: float,
    sources_used: List[str],
    metadata: Optional[Dict] = None
) -> None:
    """
    Report the outcome of an energy calculation via ``publish_result``.

    The payload always carries the status "success" and is published with
    the result type "energy".

    Args:
        client: RabbitMQ client instance
        match_id: Match identifier
        team_id: Team identifier
        energy_score: Calculated energy score
        confidence: Confidence level
        sources_used: List of sources used in calculation
        metadata: Optional additional metadata; a falsy value (None or an
            empty dict) is sent as an empty dict
    """
    extra = metadata if metadata else {}

    # Keyword construction keeps the same keys in the same insertion order.
    payload = dict(
        match_id=match_id,
        team_id=team_id,
        energy_score=energy_score,
        confidence=confidence,
        sources_used=sources_used,
        status="success",
        metadata=extra,
    )

    publish_result(client=client, result_type="energy", data=payload)
|