Save France
Backend

Symfony Messenger

Configuration et utilisation du bus de messages asynchrones (Symfony Messenger)

Introduction

Symfony Messenger est utilisé pour découpler les traitements longs ou fragiles du cycle de vie d'une requête HTTP. Le cas d'usage principal est la création de session de paiement Stripe après la signature d'un contrat YouSign.

Architecture

flowchart LR
    A[Controller\nWebhook] -->|dispatch| B[(messenger_messages\nPostgreSQL)]
    B -->|consume| C[Worker\nContainer Docker]
    C -->|traitement| D[Handler\nCreateStripePaymentHandler]
    D -->|succès| E[Stripe API]
    D -->|échec x3| F[(failed queue)]
    
    style B fill:#336791,color:#fff
    style C fill:#2196F3,color:#fff
    style F fill:#FF5722,color:#fff

Configuration

Transport

Le transport utilise Doctrine (PostgreSQL existant), sans infrastructure supplémentaire (pas de Redis ni RabbitMQ).

Fichier : config/packages/messenger.yaml

framework:
    messenger:
        failure_transport: failed

        transports:
            async:
                dsn: 'doctrine://default'
                options:
                    queue_name: default
                retry_strategy:
                    max_retries: 3
                    delay: 1000      # 1 seconde
                    multiplier: 2    # exponentiel : 1s, 2s, 4s
                    max_delay: 0
            failed: 'doctrine://default?queue_name=failed'

        routing:
            'App\Message\CreateStripePaymentMessage': async

Table en base de données

La migration crée automatiquement la table messenger_messages lors du déploiement :

docker compose exec api php bin/console doctrine:migrations:migrate

Worker

Démarrage en développement

docker compose up -d worker

Le service worker est défini dans compose.yml et utilise le même build que l'API.

Configuration du service

# compose.yml
worker:
  build:
    context: api
    target: base
    dockerfile: Dockerfile
  command: "php bin/console messenger:consume --limit=20 --time-limit=3600 --memory-limit=256M --failure-limit=1 -vv async"
  depends_on:
    api:
      condition: service_healthy
  restart: unless-stopped

Options de la commande :

OptionValeurRôle
--limit20Redémarre le process après 20 messages (évite les fuites mémoire)
--time-limit3600Redémarre après 1h maximum
--memory-limit256MRedémarre si la mémoire dépasse 256 Mo
--failure-limit1Stoppe dès le premier échec non récupérable

Combiné à restart: unless-stopped (dev) ou restart_policy: condition: any (prod), le worker redémarre automatiquement.

Messages

CreateStripePaymentMessage

Fichier : src/Message/CreateStripePaymentMessage.php

Transporté après chaque signature de contrat YouSign réussie.

final readonly class CreateStripePaymentMessage
{
    public function __construct(
        public string $requestId,
    ) {}
}

CreateStripePaymentHandler

Fichier : src/MessageHandler/CreateStripePaymentHandler.php

Crée la session de paiement Stripe correspondant au type d'offre (ONE_SHOT ou SUBSCRIPTION).

Logique d'idempotence : si $request->getPaymentId() !== null, le handler skip silencieusement (protection contre le double traitement).

#[AsMessageHandler]
final class CreateStripePaymentHandler
{
    public function __invoke(CreateStripePaymentMessage $message): void
    {
        $request = $this->requestRepository->find($message->requestId);

        if ($request === null || $request->getPaymentId() !== null) {
            return; // Request absente ou déjà traitée
        }

        // Création Stripe selon le type de paiement...
        // Si Stripe lance une exception → Messenger réessaie automatiquement
    }
}

Monitoring

Voir les messages en file d'attente

# Nombre de messages en attente (transport async)
docker compose exec api php bin/console messenger:stats

# Logs du worker en temps réel
docker compose logs -f worker

Gérer les messages en échec

# Lister les messages en échec
docker compose exec api php bin/console messenger:failed:show

# Voir le détail d'un message
docker compose exec api php bin/console messenger:failed:show {id} --show-messages

# Rejouer un message en échec
docker compose exec api php bin/console messenger:failed:retry {id}

# Rejouer tous les messages en échec
docker compose exec api php bin/console messenger:failed:retry --all

Requête SQL directe

-- Messages en attente
SELECT queue_name, COUNT(*) as total
FROM messenger_messages
WHERE delivered_at IS NULL
GROUP BY queue_name;

-- Messages en échec
SELECT id, body, error, failed_at
FROM messenger_messages
WHERE queue_name = 'failed'
ORDER BY failed_at DESC;

Workflow complet (post-signature)

sequenceDiagram
    participant YS as YouSign
    participant C as ContractSignatureController
    participant DB as PostgreSQL
    participant W as Worker
    participant S as Stripe API

    YS->>C: Webhook POST (contrat signé)
    C->>C: Vérification HMAC
    C->>YS: Téléchargement PDF
    C->>DB: Contrat=SIGNED, Request=WAITING_PAYMENT
    C->>DB: INSERT messenger_messages
    C-->>YS: 200 OK (immédiat)

    W->>DB: SELECT message (polling)
    W->>S: Création session paiement
    alt Succès
        S-->>W: Session (url, id)
        W->>DB: UPDATE paymentUrl, paymentId
    else Échec Stripe (retry 1/3)
        W->>DB: UPDATE available_at (+1s)
    else Échec Stripe (retry 2/3)
        W->>DB: UPDATE available_at (+2s)
    else Échec Stripe (retry 3/3)
        W->>DB: MOVE TO failed queue
    end

Procédures d'urgence

Le worker est arrêté (messages en attente)

# Vérifier l'état du worker
docker compose ps worker

# Redémarrer le worker
docker compose restart worker

# Consommer manuellement les messages en attente
docker compose exec api php bin/console messenger:consume async --limit=50

Un paiement Stripe n'a pas été créé

  1. Vérifier si un message est en échec :
    docker compose exec api php bin/console messenger:failed:show
    
  2. Corriger la cause (ex. clé API Stripe invalide, quota dépassé)
  3. Rejouer le message :
    docker compose exec api php bin/console messenger:failed:retry --all
    
  4. Vérifier en base que payment_id et payment_url sont maintenant renseignés :
    SELECT id, status, payment_id, payment_url
    FROM request
    WHERE status = 'WAITING_PAYMENT'
    AND payment_id IS NULL;
    

Liens utiles