"""
|
|
Scraping worker module.
|
|
|
|
This module provides a worker that consumes scraping tasks
|
|
from RabbitMQ and executes scraping operations.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Dict, List
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.scrapers.twitter_scraper import TwitterScraper, create_twitter_scraper
|
|
from app.scrapers.reddit_scraper import RedditScraper, create_reddit_scraper
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ScrapingWorker:
    """
    Worker for processing scraping tasks.

    Features:
    - Consumes tasks from scraping_tasks queue
    - Executes Twitter and Reddit scraping
    - Publishes results to results queue
    - Handles errors with retries
    - Structured logging
    """

    def __init__(
        self,
        twitter_bearer_token: str,
        reddit_client_id: str,
        reddit_client_secret: str
    ):
        """
        Initialize scraping worker.

        Args:
            twitter_bearer_token: Twitter API bearer token
            reddit_client_id: Reddit API client ID
            reddit_client_secret: Reddit API client secret
        """
        self.twitter_bearer_token = twitter_bearer_token
        self.reddit_client_id = reddit_client_id
        self.reddit_client_secret = reddit_client_secret

        # Scrapers are created on first use (lazy initialization), so the
        # worker can be constructed without touching either API.
        self.twitter_scraper: Optional[TwitterScraper] = None
        self.reddit_scraper: Optional[RedditScraper] = None

    def _get_twitter_scraper(self) -> TwitterScraper:
        """Get or create Twitter scraper instance."""
        if self.twitter_scraper is None:
            self.twitter_scraper = create_twitter_scraper(
                bearer_token=self.twitter_bearer_token,
                vip_match_ids=[]
            )
        return self.twitter_scraper

    def _get_reddit_scraper(self) -> RedditScraper:
        """Get or create Reddit scraper instance."""
        if self.reddit_scraper is None:
            self.reddit_scraper = create_reddit_scraper(
                client_id=self.reddit_client_id,
                client_secret=self.reddit_client_secret
            )
        return self.reddit_scraper

    def execute_scraping_task(
        self,
        task: Dict,
        db: Session
    ) -> Dict:
        """
        Execute a scraping task.

        Args:
            task: Scraping task data with keys 'source' and 'match_id',
                plus optional 'keywords' (defaults to []) and 'priority'
                (defaults to 'normal')
            db: Database session

        Returns:
            Dictionary with scraping results: 'collected_count', 'status',
            and either 'metadata' (on success) or 'error' (on failure)
        """
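        # Example task payload (illustrative values; the key names follow
        # the task.get() calls below):
        # {
        #     'source': 'twitter',             # or 'reddit'
        #     'match_id': 12345,
        #     'keywords': ['TeamA', 'TeamB'],
        #     'priority': 'high'               # defaults to 'normal'
        # }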
        source = task.get('source')
        match_id = task.get('match_id')
        keywords = task.get('keywords', [])
        priority = task.get('priority', 'normal')

        logger.info(
            f"🔧 Executing scraping task: match_id={match_id}, "
            f"source={source}, priority={priority}"
        )

        try:
            if source == 'twitter':
                return self._execute_twitter_scraping(match_id, keywords, db)
            elif source == 'reddit':
                return self._execute_reddit_scraping(match_id, keywords, db)
            else:
                logger.error(f"❌ Unknown scraping source: {source}")
                return {
                    'collected_count': 0,
                    'status': 'error',
                    'error': f'Unknown source: {source}'
                }

        except Exception as e:
            logger.error(f"❌ Scraping task failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }

    def _execute_twitter_scraping(
        self,
        match_id: int,
        keywords: List[str],
        db: Session
    ) -> Dict:
        """
        Execute Twitter scraping.

        Args:
            match_id: Match identifier
            keywords: Search keywords
            db: Database session

        Returns:
            Dictionary with scraping results
        """
        try:
            scraper = self._get_twitter_scraper()

            # Scrape and save tweets
            tweets = scraper.scrape_and_save(
                match_id=match_id,
                keywords=keywords,
                db=db,
                max_results=100
            )

            logger.info(
                f"✅ Twitter scraping completed: {len(tweets)} tweets collected"
            )

            return {
                'collected_count': len(tweets),
                'status': 'success',
                'metadata': {
                    'source': 'twitter',
                    'match_id': match_id,
                    'keywords': keywords
                }
            }

        except Exception as e:
            logger.error(f"❌ Twitter scraping failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }

    def _execute_reddit_scraping(
        self,
        match_id: int,
        keywords: List[str],
        db: Session
    ) -> Dict:
        """
        Execute Reddit scraping.

        Args:
            match_id: Match identifier
            keywords: Search keywords
            db: Database session

        Returns:
            Dictionary with scraping results
        """
        try:
            scraper = self._get_reddit_scraper()

            # Scrape and save Reddit posts
            result = scraper.scrape_and_save(
                match_id=match_id,
                db=db,
                keywords=keywords,
                scrape_comments=True
            )

            posts = result.get('posts', [])
            comments = result.get('comments', [])

            logger.info(
                f"✅ Reddit scraping completed: "
                f"{len(posts)} posts, {len(comments)} comments collected"
            )

            return {
                'collected_count': len(posts) + len(comments),
                'status': 'success',
                'metadata': {
                    'source': 'reddit',
                    'match_id': match_id,
                    'keywords': keywords,
                    'posts_count': len(posts),
                    'comments_count': len(comments)
                }
            }

        except Exception as e:
            logger.error(f"❌ Reddit scraping failed: {e}")
            return {
                'collected_count': 0,
                'status': 'error',
                'error': str(e)
            }


def create_scraping_worker(
    twitter_bearer_token: str,
    reddit_client_id: str,
    reddit_client_secret: str
) -> ScrapingWorker:
    """
    Factory function to create a scraping worker.

    Args:
        twitter_bearer_token: Twitter API bearer token
        reddit_client_id: Reddit API client ID
        reddit_client_secret: Reddit API client secret

    Returns:
        Configured ScrapingWorker instance
    """
    return ScrapingWorker(
        twitter_bearer_token=twitter_bearer_token,
        reddit_client_id=reddit_client_id,
        reddit_client_secret=reddit_client_secret
    )
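

# Example: wiring the worker to RabbitMQ.
#
# A minimal consumer sketch, not part of this module's public API. It assumes
# pika for the RabbitMQ connection and a SQLAlchemy session factory; the
# `SessionLocal` import, environment-variable names, and connection host are
# hypothetical. The queue names ('scraping_tasks', 'results') follow the
# ScrapingWorker docstring.
if __name__ == "__main__":
    import json
    import os

    import pika

    from app.database import SessionLocal  # hypothetical session factory

    worker = create_scraping_worker(
        twitter_bearer_token=os.environ["TWITTER_BEARER_TOKEN"],
        reddit_client_id=os.environ["REDDIT_CLIENT_ID"],
        reddit_client_secret=os.environ["REDDIT_CLIENT_SECRET"],
    )

    def on_message(ch, method, properties, body):
        """Execute one task, publish the result, then ack the message."""
        task = json.loads(body)
        db = SessionLocal()
        try:
            result = worker.execute_scraping_task(task, db)
            ch.basic_publish(
                exchange="",
                routing_key="results",
                body=json.dumps(result),
            )
            # A basic_nack with requeue=True on 'error' results would be one
            # way to implement the retry behaviour the class docstring names.
            ch.basic_ack(delivery_tag=method.delivery_tag)
        finally:
            db.close()

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="scraping_tasks", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue="scraping_tasks", on_message_callback=on_message)
    channel.start_consuming()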