Skip to content

[Messenger] Only subscribe to a given bus from the MessageSubscriber #28275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
$handlerBuses = (array) ($tag['bus'] ?? $busIds);

foreach ($handles as $messageClass => $method) {
$buses = $handlerBuses;
if (\is_int($messageClass)) {
$messageClass = $method;
$method = '__invoke';
Expand All @@ -110,8 +111,30 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
}

if (\is_array($method)) {
$messagePriority = $method[1];
$method = $method[0];
if (isset($method[0]) && isset($method[1])) {
$messagePriority = $method[1];
$method = $method[0];
} elseif (isset($method['method']) || isset($method['bus'])) {
if (isset($method['bus'])) {
if (!\in_array($method['bus'], $busIds)) {
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);

throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageClassLocation, $messageClass, $method['bus']));
}

$buses = array($method['bus']);
}

if (isset($method['priority'])) {
$messagePriority = $method['priority'];
}

$method = $method['method'] ?? '__invoke';
} else {
$messageClassLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : $r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);

throw new RuntimeException(sprintf('Invalid configuration %s for message "%s".', $messageClassLocation, $messageClass));
}
}

if (!\class_exists($messageClass)) {
Expand All @@ -132,7 +155,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
$definitionId = $serviceId;
}

foreach ($handlerBuses as $handlerBus) {
foreach ($buses as $handlerBus) {
$handlersByBusAndMessage[$handlerBus][$messageClass][$messagePriority][] = $definitionId;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,48 @@ public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
$this->assertEquals(array(new Reference(HandlerWithGenerators::class), 'secondMessage'), $container->getDefinition($secondReference)->getArgument(0));
}

public function testItRegistersHandlersOnDifferentBuses()
{
$container = $this->getContainerBuilder($eventsBusId = 'event_bus');
$container->register($commandsBusId = 'command_bus', MessageBusInterface::class)->addTag('messenger.bus')->setArgument(0, array());

$container
->register(HandlerOnSpecificBuses::class, HandlerOnSpecificBuses::class)
->addTag('messenger.message_handler')
;

(new MessengerPass())->process($container);

$eventsHandlerLocatorDefinition = $container->getDefinition($container->getDefinition($eventsBusId.'.messenger.handler_resolver')->getArgument(0));
$eventsHandlerMapping = $eventsHandlerLocatorDefinition->getArgument(0);

$this->assertEquals(array('handler.'.DummyMessage::class), array_keys($eventsHandlerMapping));
$firstReference = $eventsHandlerMapping['handler.'.DummyMessage::class]->getValues()[0];
$this->assertEquals(array(new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForEvents'), $container->getDefinition($firstReference)->getArgument(0));

$commandsHandlerLocatorDefinition = $container->getDefinition($container->getDefinition($commandsBusId.'.messenger.handler_resolver')->getArgument(0));
$commandsHandlerMapping = $commandsHandlerLocatorDefinition->getArgument(0);

$this->assertEquals(array('handler.'.DummyMessage::class), array_keys($commandsHandlerMapping));
$firstReference = $commandsHandlerMapping['handler.'.DummyMessage::class]->getValues()[0];
$this->assertEquals(array(new Reference(HandlerOnSpecificBuses::class), 'dummyMethodForCommands'), $container->getDefinition($firstReference)->getArgument(0));
}

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid configuration returned by method "Symfony\Component\Messenger\Tests\DependencyInjection\HandlerOnUndefinedBus::getHandledMessages()" for message "Symfony\Component\Messenger\Tests\Fixtures\DummyMessage": bus "some_undefined_bus" does not exist.
*/
public function testItThrowsAnExceptionOnUnknownBus()
{
$container = $this->getContainerBuilder();
$container
->register(HandlerOnUndefinedBus::class, HandlerOnUndefinedBus::class)
->addTag('messenger.message_handler')
;

(new MessengerPass())->process($container);
}

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\RuntimeException
* @expectedExceptionMessage Invalid sender "app.messenger.sender": class "Symfony\Component\Messenger\Tests\DependencyInjection\InvalidSender" must implement interface "Symfony\Component\Messenger\Transport\SenderInterface".
Expand Down Expand Up @@ -747,6 +789,35 @@ public function secondMessage()
}
}

class HandlerOnSpecificBuses implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
yield DummyMessage::class => array('method' => 'dummyMethodForEvents', 'bus' => 'event_bus');
yield DummyMessage::class => array('method' => 'dummyMethodForCommands', 'bus' => 'command_bus');
}

public function dummyMethodForEvents()
{
}

public function dummyMethodForCommands()
{
}
}

class HandlerOnUndefinedBus implements MessageSubscriberInterface
{
public static function getHandledMessages(): iterable
{
yield DummyMessage::class => array('method' => 'dummyMethodForSomeBus', 'bus' => 'some_undefined_bus');
}

public function dummyMethodForSomeBus()
{
}
}

class UselessMiddleware implements MiddlewareInterface
{
public function handle($message, callable $next)
Expand Down