[Messenger] SendMessageToTransportsEvent is not dispatched #32375

@aaitimov

Symfony version(s) affected: 4.3.2

Description

Hi.

I was trying to use Messenger events at work, and SendMessageToTransportsEvent was never dispatched. To rule out a problem with my configuration, I created an empty Symfony project at home to test it. The event is not dispatched in the test project either.

How to reproduce

The event subscriber class is below. It simply logs a debug message when each event occurs.

<?php

namespace App\EventSubscriber;

use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;

class TestSubscriber implements EventSubscriberInterface
{
    /** @var LoggerInterface $logger */
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function onSendToTransport(SendMessageToTransportsEvent $event)
    {
        $this->logger->debug('Message dispatched');
    }

    public function onFailed(WorkerMessageFailedEvent $event)
    {
        $this->logger->debug('Message failed');

        if ($event->willRetry()) {
            $this->logger->debug('Message will be retried');
        } else {
            $this->logger->debug('Message will not be retried');
        }
    }

    public function onReceived(WorkerMessageReceivedEvent $event)
    {
        $this->logger->debug('Received message');
    }

    public function onHandled(WorkerMessageHandledEvent $event)
    {
        $this->logger->debug('Task handled');
    }

    public static function getSubscribedEvents()
    {
        return [
            SendMessageToTransportsEvent::class => 'onSendToTransport',
            WorkerMessageReceivedEvent::class => 'onReceived',
            WorkerMessageFailedEvent::class => 'onFailed',
            WorkerMessageHandledEvent::class => 'onHandled'
        ];
    }
}

Message:

<?php

namespace App\Message;

class TestMessage implements AsyncMessageInterface
{
}
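
TestMessage implements App\Message\AsyncMessageInterface, which is not shown in this report. Presumably it is an empty marker interface that serves only as the routing key in the Messenger config. A minimal sketch under that assumption (the definition below is hypothetical, not taken from the original project):

```php
<?php

namespace App\Message;

// Assumed definition (not included in the original report): an empty marker
// interface, so that a routing rule keyed on App\Message\AsyncMessageInterface
// applies to every message class that implements it.
interface AsyncMessageInterface
{
}
```

With this in place, TestMessage is routed to the async transport because Messenger routing also matches on the interfaces a message implements.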

Dispatching with controller:

<?php

namespace App\Controller;

use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Routing\Annotation\Route;
use App\Message\TestMessage;

class TestController extends AbstractController
{
    /**
     * @Route("/test", name="test")
     */
    public function index()
    {
        $this->dispatchMessage(new TestMessage());

        return $this->json([
            'message' => 'Welcome to your new controller!',
            'path' => 'src/Controller/TestController.php',
        ]);
    }
}

Message handler:

<?php

namespace App\MessageHandler;

use App\Message\TestMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Psr\Log\LoggerInterface;

class TestMessageHandler implements MessageHandlerInterface
{
    /** @var LoggerInterface $logger */
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function __invoke(TestMessage $message)
    {
        // Simulate five seconds of work.
        sleep(5);
    }
}

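Log output is below. "Message dispatched" (logged by onSendToTransport) never appears, while the worker events "Received message" and "Task handled" are logged: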

[2019-07-04 16:19:12] request.INFO: Matched route "test". {"route":"test","route_parameters":{"_route":"test","_controller":"App\\Controller\\TestController::index"},"request_uri":"http://localhost:8000/test","method":"GET"} []
[2019-07-04 16:19:12] messenger.INFO: Sending message App\Message\TestMessage with Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport {"message":"[object] (App\\Message\\TestMessage: {})","class":"App\\Message\\TestMessage","sender":"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpTransport"} []
[2019-07-04 16:19:12] app.DEBUG: Received message [] []
[2019-07-04 16:19:12] messenger.INFO: Received message App\Message\TestMessage {"message":"[object] (App\\Message\\TestMessage: {})","class":"App\\Message\\TestMessage"} []
[2019-07-04 16:19:17] messenger.INFO: Message App\Message\TestMessage handled by App\MessageHandler\TestMessageHandler::__invoke {"message":"[object] (App\\Message\\TestMessage: {})","class":"App\\Message\\TestMessage","handler":"App\\MessageHandler\\TestMessageHandler::__invoke"} []
[2019-07-04 16:19:17] app.DEBUG: Task handled [] []
[2019-07-04 16:19:17] messenger.INFO: App\Message\TestMessage was handled successfully (acknowledging to transport). {"message":"[object] (App\\Message\\TestMessage: {})","class":"App\\Message\\TestMessage"} []

Messenger config:

framework:
    messenger:
        # Uncomment this (and the failed transport below) to send failed messages to this transport for later handling.
        # failure_transport: failed

        transports:
            # https://symfony.com/doc/current/messenger.html#transport-configuration
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
            # failed: 'doctrine://default?queue_name=failed'
            # sync: 'sync://'

        routing:
            # Route your messages to the transports
            'App\Message\AsyncMessageInterface': async

Messages are dispatched to RabbitMQ:

MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
