# chartbastan/backend/app/workers/scraping_worker.py
"""
Scraping worker module.
This module provides a worker that consumes scraping tasks
from RabbitMQ and executes scraping operations.
"""
import logging
from typing import Dict, List, Optional

from sqlalchemy.orm import Session

from app.scrapers.twitter_scraper import TwitterScraper, create_twitter_scraper
from app.scrapers.reddit_scraper import RedditScraper, create_reddit_scraper

logger = logging.getLogger(__name__)


class ScrapingWorker:
    """
    Worker for processing scraping tasks.

    Features:
    - Consumes tasks from the scraping_tasks queue
    - Executes Twitter and Reddit scraping
    - Publishes results to the results queue
    - Handles errors with retries
    - Structured logging
    """

    def __init__(
        self,
        twitter_bearer_token: str,
        reddit_client_id: str,
        reddit_client_secret: str
    ):
        """
        Initialize the scraping worker.

        Args:
            twitter_bearer_token: Twitter API bearer token
            reddit_client_id: Reddit API client ID
            reddit_client_secret: Reddit API client secret
        """
        self.twitter_bearer_token = twitter_bearer_token
        self.reddit_client_id = reddit_client_id
        self.reddit_client_secret = reddit_client_secret

        # Scrapers are created lazily on first use.
        self.twitter_scraper: Optional[TwitterScraper] = None
        self.reddit_scraper: Optional[RedditScraper] = None
    def _get_twitter_scraper(self) -> TwitterScraper:
        """Get or create the Twitter scraper instance."""
        if self.twitter_scraper is None:
            self.twitter_scraper = create_twitter_scraper(
                bearer_token=self.twitter_bearer_token,
                vip_match_ids=[]
            )
        return self.twitter_scraper

    def _get_reddit_scraper(self) -> RedditScraper:
        """Get or create the Reddit scraper instance."""
        if self.reddit_scraper is None:
            self.reddit_scraper = create_reddit_scraper(
                client_id=self.reddit_client_id,
                client_secret=self.reddit_client_secret
            )
        return self.reddit_scraper
    def execute_scraping_task(
        self,
        task: Dict,
        db: Session
    ) -> Dict:
        """
        Execute a scraping task.

        Args:
            task: Scraping task data
            db: Database session

        Returns:
            Dictionary with scraping results
        """
        source = task.get('source')
        match_id = task.get('match_id')
        keywords = task.get('keywords', [])
        priority = task.get('priority', 'normal')

        logger.info(
            f"🔧 Executing scraping task: match_id={match_id}, "
            f"source={source}, priority={priority}"
        )

        try:
            if source == 'twitter':
                return self._execute_twitter_scraping(match_id, keywords, db)
            elif source == 'reddit':
                return self._execute_reddit_scraping(match_id, keywords, db)
            else:
                logger.error(f"❌ Unknown scraping source: {source}")
                return {
                    'collected_count': 0,
                    'status': 'error',
                    'error': f'Unknown source: {source}'
                }
        except Exception as e:
            logger.error(f"❌ Scraping task failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }
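
    # Expected task payload shape, inferred from the keys read above in
    # execute_scraping_task (values are illustrative, not taken from a
    # real queue message):
    #
    #     {
    #         "source": "twitter",          # or "reddit"
    #         "match_id": 42,
    #         "keywords": ["Barcelona", "El Clasico"],
    #         "priority": "high",           # defaults to "normal"
    #     }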
    def _execute_twitter_scraping(
        self,
        match_id: int,
        keywords: List[str],
        db: Session
    ) -> Dict:
        """
        Execute Twitter scraping.

        Args:
            match_id: Match identifier
            keywords: Search keywords
            db: Database session

        Returns:
            Dictionary with scraping results
        """
        try:
            scraper = self._get_twitter_scraper()

            # Scrape and save tweets
            tweets = scraper.scrape_and_save(
                match_id=match_id,
                keywords=keywords,
                db=db,
                max_results=100
            )

            logger.info(
                f"✅ Twitter scraping completed: {len(tweets)} tweets collected"
            )

            return {
                'collected_count': len(tweets),
                'status': 'success',
                'metadata': {
                    'source': 'twitter',
                    'match_id': match_id,
                    'keywords': keywords
                }
            }
        except Exception as e:
            logger.error(f"❌ Twitter scraping failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }
    def _execute_reddit_scraping(
        self,
        match_id: int,
        keywords: List[str],
        db: Session
    ) -> Dict:
        """
        Execute Reddit scraping.

        Args:
            match_id: Match identifier
            keywords: Search keywords
            db: Database session

        Returns:
            Dictionary with scraping results
        """
        try:
            scraper = self._get_reddit_scraper()

            # Scrape and save Reddit posts
            result = scraper.scrape_and_save(
                match_id=match_id,
                db=db,
                keywords=keywords,
                scrape_comments=True
            )

            posts = result.get('posts', [])
            comments = result.get('comments', [])

            logger.info(
                f"✅ Reddit scraping completed: "
                f"{len(posts)} posts, {len(comments)} comments collected"
            )

            return {
                'collected_count': len(posts) + len(comments),
                'status': 'success',
                'metadata': {
                    'source': 'reddit',
                    'match_id': match_id,
                    'keywords': keywords,
                    'posts_count': len(posts),
                    'comments_count': len(comments)
                }
            }
        except Exception as e:
            logger.error(f"❌ Reddit scraping failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }

def create_scraping_worker(
    twitter_bearer_token: str,
    reddit_client_id: str,
    reddit_client_secret: str
) -> ScrapingWorker:
    """
    Factory function to create a scraping worker.

    Args:
        twitter_bearer_token: Twitter API bearer token
        reddit_client_id: Reddit API client ID
        reddit_client_secret: Reddit API client secret

    Returns:
        Configured ScrapingWorker instance
    """
    return ScrapingWorker(
        twitter_bearer_token=twitter_bearer_token,
        reddit_client_id=reddit_client_id,
        reddit_client_secret=reddit_client_secret
    )
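

if __name__ == "__main__":
    # Minimal consumer-loop sketch showing how the factory and
    # execute_scraping_task fit together. Assumptions not confirmed by this
    # module: pika as the RabbitMQ client, the queue names from the class
    # docstring ("scraping_tasks" in, "results" out), credentials supplied
    # via environment variables, and a SessionLocal session factory at
    # app.db.session (hypothetical import path).
    import json
    import os

    import pika
    from app.db.session import SessionLocal  # hypothetical session factory

    worker = create_scraping_worker(
        twitter_bearer_token=os.environ["TWITTER_BEARER_TOKEN"],
        reddit_client_id=os.environ["REDDIT_CLIENT_ID"],
        reddit_client_secret=os.environ["REDDIT_CLIENT_SECRET"],
    )

    def on_message(ch, method, properties, body):
        """Decode one task, execute it, publish the result, then ack."""
        task = json.loads(body)
        db = SessionLocal()
        try:
            result = worker.execute_scraping_task(task, db)
        finally:
            db.close()
        ch.basic_publish(
            exchange="",
            routing_key="results",
            body=json.dumps(result)
        )
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="scraping_tasks", durable=True)
    channel.queue_declare(queue="results", durable=True)
    channel.basic_consume(queue="scraping_tasks", on_message_callback=on_message)
    logger.info("Waiting for scraping tasks...")
    channel.start_consuming()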