# chartbastan/backend/workers/run_scraping_worker.py
"""
Scraping worker script.
This script runs the scraping worker that consumes tasks
from the scraping_tasks queue and processes them.
"""
import logging
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
sys_path = str(Path(__file__).parent.parent)
if sys_path not in sys.path:
sys.path.insert(0, sys_path)
from app.queues.rabbitmq_client import create_rabbitmq_client
from app.queues.consumers import consume_scraping_tasks
from app.workers.scraping_worker import create_scraping_worker
from app.database import get_db
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def main():
    """Run the scraping worker until interrupted.

    Loads broker and scraper credentials from environment variables,
    connects to RabbitMQ, and blocks consuming tasks from the
    scraping_tasks queue, delegating each task to the scraping worker.

    Exits with status 1 when no usable scraper credentials are
    configured or when the RabbitMQ connection cannot be established.
    """
    # Load configuration from environment variables.
    rabbitmq_url = os.getenv(
        'RABBITMQ_URL',
        'amqp://guest:guest@localhost:5672'
    )
    twitter_bearer_token = os.getenv('TWITTER_BEARER_TOKEN', '')
    reddit_client_id = os.getenv('REDDIT_CLIENT_ID', '')
    reddit_client_secret = os.getenv('REDDIT_CLIENT_SECRET', '')

    # Validate configuration: a scraper is usable only with its FULL
    # credential set (Reddit needs both client id and secret).
    twitter_enabled = bool(twitter_bearer_token)
    reddit_enabled = bool(reddit_client_id and reddit_client_secret)
    if not twitter_enabled:
        logger.warning("⚠️ TWITTER_BEARER_TOKEN not set, Twitter scraping disabled")
    if not reddit_enabled:
        logger.warning("⚠️ Reddit credentials not set, Reddit scraping disabled")
    # Fail fast when EVERY scraper is disabled. (Bug fix: the previous
    # check only looked at REDDIT_CLIENT_ID, so an id without a secret —
    # and no Twitter token — started a worker with all scrapers disabled
    # instead of exiting.)
    if not twitter_enabled and not reddit_enabled:
        logger.error("❌ No scraper credentials configured")
        sys.exit(1)

    # Create RabbitMQ client; abort if the broker is unreachable.
    try:
        logger.info("🔗 Connecting to RabbitMQ...")
        client = create_rabbitmq_client(rabbitmq_url=rabbitmq_url)
        client.connect()
        logger.info("✅ Connected to RabbitMQ")
    except Exception as e:
        # Lazy %-formatting so the message is only built when emitted.
        logger.error("❌ Failed to connect to RabbitMQ: %s", e)
        sys.exit(1)

    # Create scraping worker with whatever credentials are available;
    # individual scrapers stay disabled if their credentials are empty.
    worker = create_scraping_worker(
        twitter_bearer_token=twitter_bearer_token,
        reddit_client_id=reddit_client_id,
        reddit_client_secret=reddit_client_secret
    )

    def task_callback(task_data, db):
        """Delegate a single queued scraping task to the worker."""
        return worker.execute_scraping_task(task_data, db)

    # Consume tasks until interrupted; always close the broker connection.
    try:
        logger.info("🚀 Scraping worker started")
        logger.info("📥 Waiting for tasks from scraping_tasks queue...")
        consume_scraping_tasks(
            client=client,
            callback=task_callback,
            db_session_factory=get_db
        )
    except KeyboardInterrupt:
        logger.info("⏹️ Scraping worker stopped by user")
    except Exception as e:
        logger.error("❌ Scraping worker error: %s", e)
    finally:
        logger.info("🔌 Closing RabbitMQ connection...")
        client.close()
        logger.info("👋 Scraping worker shutdown complete")


if __name__ == '__main__':
    main()