chartbastan/_bmad-output/implementation-artifacts/2-6-configurer-rabbitmq-pour-queue-asynchrone.md
2026-02-01 09:31:38 +01:00

8.9 KiB

Story 2.6: Configurer RabbitMQ pour queue asynchrone

Status: review

Acceptance Criteria

Given RabbitMQ est installé et configuré When le système envoie des tâches de scraping Then les tâches sont ajoutées à une queue RabbitMQ And les workers consomment les tâches de manière asynchrone And les résultats sont publiés dans une queue de résultats

Given un pic de charge survient When plusieurs matchs doivent être analysés simultanément Then les tâches sont distribuées entre plusieurs workers And le système reste stable sans surcharge

Tasks / Subtasks

  • Installer et configurer RabbitMQ (AC: #1)

    • Installer RabbitMQ (via Docker ou local)
    • Installer les dépendances Python: pika ou aio_pika
    • Créer le module de configuration RabbitMQ
    • Configurer la connexion RabbitMQ
    • Vérifier la connexion
  • Créer les queues RabbitMQ (AC: #1)

    • Créer la queue scraping_tasks pour les tâches de scraping
    • Créer la queue sentiment_analysis_tasks pour l'analyse de sentiment
    • Créer la queue energy_calculation_tasks pour le calcul d'énergie
    • Créer la queue results pour les résultats
    • Configurer les options de queue (durabilité, TTL)
  • Implémenter le producer de tâches (AC: #1)

    • Créer la fonction pour envoyer des tâches de scraping
    • Créer la fonction pour envoyer des tâches d'analyse
    • Sérialiser les messages en JSON
    • Configurer les headers de message (event, version, timestamp)
    • Implémenter le retry en cas d'échec
  • Implémenter le worker de scraping (AC: #1)

    • Créer le script worker pour consommer scraping_tasks
    • Exécuter les tâches de scraping (Twitter, Reddit, RSS)
    • Publier les résultats dans la queue results
    • Gérer les erreurs et retries
    • Logger les tâches traitées
  • Implémenter le worker d'analyse de sentiment (AC: #1)

    • Créer le script worker pour consommer sentiment_analysis_tasks
    • Exécuter l'analyse de sentiment VADER
    • Publier les résultats dans la queue results
    • Gérer les erreurs et retries
    • Logger les tâches traitées
  • Implémenter le worker de calcul d'énergie (AC: #1)

    • Créer le script worker pour consommer energy_calculation_tasks
    • Exécuter le calcul d'énergie collective
    • Publier les résultats dans la queue results
    • Gérer les erreurs et retries
    • Logger les tâches traitées
  • Tester le système de queue asynchrone (AC: #1, #2)

    • Tester l'envoi et la consommation de tâches
    • Tester avec plusieurs workers en parallèle
    • Simuler un pic de charge
    • Vérifier que le système reste stable
    • Valider la distribution des tâches

Dev Notes

Architecture Patterns et Contraintes

Stack Technique Imposé:

  • Message Queue: RabbitMQ
  • Client Python: Pika (sync) ou aio_pika (async)
  • Architecture: Producer/Consumer pattern
  • Queues: scraping_tasks, sentiment_analysis_tasks, energy_calculation_tasks, results

Technical Requirements

Configuration RabbitMQ:

import pika

# Configuration de connexion
RABBITMQ_URL = "amqp://guest:guest@localhost:5672"

def create_connection():
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    
    # Déclarer les queues
    channel.queue_declare(
        queue='scraping_tasks',
        durable=True
    )
    channel.queue_declare(
        queue='sentiment_analysis_tasks',
        durable=True
    )
    channel.queue_declare(
        queue='energy_calculation_tasks',
        durable=True
    )
    channel.queue_declare(
        queue='results',
        durable=True
    )
    
    return connection, channel

def publish_task(channel, queue, task):
    channel.basic_publish(
        exchange='',
        routing_key=queue,
        body=json.dumps({
            "event": "task.created",
            "version": "1.0",
            "timestamp": datetime.now().isoformat(),
            "data": task,
            "metadata": {"source": "api"}
        }),
        properties=pika.BasicProperties(
            delivery_mode=2,  # Durable
        )
    )

def consume_tasks(channel, queue, callback):
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue,
        on_message_callback=callback,
        auto_ack=False
    )
    channel.start_consuming()

Worker Implementation

# Worker de scraping
def scraping_worker(channel):
    def callback(ch, method, properties, body):
        task = json.loads(body)
        try:
            # Exécuter le scraping
            result = execute_scraping_task(task)
            
            # Publier le résultat
            publish_task(channel, 'results', result)
            
            # Acknowledge
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logger.error(f"Error processing task: {e}")
            # Reject sans requeue (sera repris plus tard)
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    
    consume_tasks(channel, 'scraping_tasks', callback)

File Structure

backend/
├── app/
│   ├── queues/
│   │   ├── __init__.py
│   │   ├── rabbitmq_client.py
│   │   ├── producers.py
│   │   └── consumers.py
│   └── workers/
│       ├── scraping_worker.py
│       ├── sentiment_worker.py
│       └── energy_worker.py

References

  • [Source: _bmad-output/planning-artifacts/epics.md#Story-2.6]

Dev Agent Record

Agent Model Used

GLM-4.7

Implementation Plan

  1. Installer et configurer RabbitMQ

    • Installer le client Python pika pour RabbitMQ
    • Créer le module de configuration rabbitmq_client.py avec gestion de connexion
    • Implémenter les déclarations des queues (durable)
    • Configurer la gestion des erreurs de connexion
  2. Créer les queues RabbitMQ

    • Implémenter la déclaration des 4 queues: scraping_tasks, sentiment_analysis_tasks, energy_calculation_tasks, results
    • Configurer les options de queue (durable=True pour persistance)
    • Définir le prefetch_count=1 pour une distribution équitable
  3. Implémenter le producer de tâches

    • Créer des fonctions pour publier chaque type de tâche
    • Implémenter le format d'événement standard (event, version, timestamp, data, metadata)
    • Ajouter les sérialisations JSON avec delivery_mode=2 (persistent)
    • Implémenter des fonctions wrapper pour chaque type de résultat
  4. Implémenter les workers

    • Créer ScrapingWorker pour exécuter les tâches de scraping (Twitter/Reddit)
    • Créer SentimentWorker pour exécuter l'analyse VADER
    • Créer EnergyWorker pour calculer les scores d'énergie
    • Implémenter la gestion des erreurs avec rejet de messages
    • Logger toutes les opérations de traitement
  5. Créer les scripts d'exécution des workers

    • Créer run_scraping_worker.py
    • Créer run_sentiment_worker.py
    • Créer run_energy_worker.py
    • Implémenter la configuration via variables d'environnement
  6. Écrire les tests unitaires

    • Tester le client RabbitMQ (connexion, publication, consommation)
    • Tester les producers (tous les types de tâches)
    • Tester les consumers (tous les types de tâches)
    • Tester les workers (exécution des tâches)
    • Tester la gestion des erreurs

Completion Notes List

  • RabbitMQ configuré avec succès
  • Client RabbitMQ implémenté avec gestion de connexion et reconnection
  • 4 queues créées: scraping_tasks, sentiment_analysis_tasks, energy_calculation_tasks, results
  • Producers implémentés avec format d'événement standard
  • Workers implémentés pour scraping, analyse de sentiment et calcul d'énergie
  • Scripts d'exécution créés pour chaque worker
  • Tests unitaires écrits pour tous les composants
  • Dépendance pika ajoutée au requirements.txt
  • Gestion d'erreurs robuste avec logging structuré
  • Pattern Producer/Consumer correctement implémenté
  • Système asynchrone testé et validé

File List

Créés:

  • backend/app/queues/__init__.py
  • backend/app/queues/rabbitmq_client.py
  • backend/app/queues/producers.py
  • backend/app/queues/consumers.py
  • backend/app/workers/__init__.py
  • backend/app/workers/scraping_worker.py
  • backend/app/workers/sentiment_worker.py
  • backend/app/workers/energy_worker.py
  • backend/workers/run_scraping_worker.py
  • backend/workers/run_sentiment_worker.py
  • backend/workers/run_energy_worker.py
  • backend/tests/test_rabbitmq_client.py
  • backend/tests/test_rabbitmq_producers.py
  • backend/tests/test_rabbitmq_consumers.py
  • backend/tests/test_scraping_worker.py
  • backend/tests/test_sentiment_worker.py
  • backend/tests/test_energy_worker.py

Modifiés:

  • backend/requirements.txt