
Consumer Integration test #35038

@vladanzivanovic

Symfony version(s) affected: 4.4.1

Description
After upgrading the Messenger component to version 4.4.1, my integration test fails.
After digging into the code, I found that the StopWorkerOnMessageLimitListener is causing the problem. In the test I run two consume commands with two different transports. For the first transport I set the limit to 1, and for the second to 3.
The problem is that when the consumer starts consuming from the second transport, the EventDispatcher already holds the StopWorkerOnMessageLimitListener that was added for the first transport. That listener has a message limit of 1, and that breaks the second transport.
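
To illustrate the suspected mechanism, here is a minimal sketch in plain PHP. It does not use Symfony: `TinyDispatcher`, `LimitListener`, and `consume()` are hypothetical stand-ins for the event dispatcher, the limit listener, and the consume command, and the sketch assumes each run adds a new limit listener to a dispatcher that outlives the run.

```php
<?php

// Hypothetical stand-in for an event dispatcher that keeps every listener it is given.
final class TinyDispatcher
{
    /** @var callable[] */
    private $listeners = [];

    public function addListener(callable $listener): void
    {
        $this->listeners[] = $listener;
    }

    // Calls every listener; returns true if any of them asks the worker to stop.
    public function dispatch(): bool
    {
        $stop = false;
        foreach ($this->listeners as $listener) {
            if ($listener()) {
                $stop = true;
            }
        }

        return $stop;
    }
}

// Hypothetical stand-in for a "stop after N messages" listener.
final class LimitListener
{
    /** @var int */
    private $limit;
    /** @var int */
    private $received = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    public function __invoke(): bool
    {
        return ++$this->received >= $this->limit;
    }
}

// Hypothetical stand-in for one consume run: registers its own limit listener,
// then handles messages until a listener requests a stop.
function consume(TinyDispatcher $dispatcher, int $limit, int $available): int
{
    $dispatcher->addListener(new LimitListener($limit));
    $handled = 0;
    while ($handled < $available) {
        ++$handled;
        if ($dispatcher->dispatch()) {
            break;
        }
    }

    return $handled;
}

$shared = new TinyDispatcher();
$first = consume($shared, 1, 5);               // 1: stops after one message, as requested
$second = consume($shared, 3, 5);              // 1: the leftover limit-1 listener fires first
$fresh = consume(new TinyDispatcher(), 3, 5);  // 3: a dispatcher without leftovers behaves as expected

echo $first, ' ', $second, ' ', $fresh, PHP_EOL;
```

In this sketch the second run stops after one message even though it asked for three, which matches what I see in the test.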

How to reproduce

This is the first transport:

        self::bootKernel();
        $this->loadFixtures();
        $receiver = 'marketplace_core_entity_account_membership';

        $command = self::$container->get('console.command.messenger_consume_messages');
        $output = new BufferedOutput();
        $command->run(new StringInput($receiver . ' --bus=app.kafka_consumer_bus --limit=1'), $output);

This is the second transport:

        self::bootKernel();
        $this->loadFixtures();
        $receiver = 'marketplace_core_entity_account_membership';

        $command = self::$container->get('console.command.messenger_consume_messages');
        $output = new BufferedOutput();
        $command->run(new StringInput($receiver . ' --bus=app.kafka_consumer_bus --limit=3'), $output);

Possible Solution

Additional context
When the consumer starts for the second transport, $maximumNumberOfMessages in StopWorkerOnMessageLimitListener::onWorkerRunning is 1 at first.
