-
-
Notifications
You must be signed in to change notification settings - Fork 9.7k
[Messenger] Refactoring failure to FailedMessage & allowing for requeue #31397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,9 +40,9 @@ | |
use Symfony\Component\Console\Application; | ||
use Symfony\Component\Console\Command\Command; | ||
use Symfony\Component\DependencyInjection\Alias; | ||
use Symfony\Component\DependencyInjection\Argument\IteratorArgument; | ||
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument; | ||
use Symfony\Component\DependencyInjection\ChildDefinition; | ||
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass; | ||
use Symfony\Component\DependencyInjection\ContainerBuilder; | ||
use Symfony\Component\DependencyInjection\ContainerInterface; | ||
use Symfony\Component\DependencyInjection\Definition; | ||
|
@@ -74,6 +74,7 @@ | |
use Symfony\Component\Lock\Store\StoreFactory; | ||
use Symfony\Component\Lock\StoreInterface; | ||
use Symfony\Component\Mailer\Mailer; | ||
use Symfony\Component\Messenger\Failure\FailedMessage; | ||
use Symfony\Component\Messenger\Handler\MessageHandlerInterface; | ||
use Symfony\Component\Messenger\MessageBus; | ||
use Symfony\Component\Messenger\MessageBusInterface; | ||
|
@@ -286,9 +287,6 @@ public function load(array $configs, ContainerBuilder $container) | |
$container->removeDefinition('console.command.messenger_debug'); | ||
$container->removeDefinition('console.command.messenger_stop_workers'); | ||
$container->removeDefinition('console.command.messenger_setup_transports'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_retry'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_show'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_remove'); | ||
} | ||
|
||
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']); | ||
|
@@ -1745,19 +1743,41 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder | |
|
||
$messageToSendersMapping = []; | ||
$messagesToSendAndHandle = []; | ||
if ($config['failure_transport']) { | ||
$failureTransport = $config['failure_transport']; | ||
|
||
$loader->load('messenger_failure.xml'); | ||
$container->getDefinition('console.command.messenger_failed_messages_retry') | ||
->replaceArgument(0, $failureTransport) | ||
->replaceArgument(4, $transportRetryReferences[$failureTransport] ?? null); | ||
$container->getDefinition('console.command.messenger_failed_messages_show') | ||
->replaceArgument(0, $failureTransport); | ||
$container->getDefinition('console.command.messenger_failed_messages_remove') | ||
->replaceArgument(0, $failureTransport); | ||
|
||
// push routing for FailedMessage to the failure transport | ||
if (!isset($messageToSendersMapping[FailedMessage::class])) { | ||
$messageToSendersMapping[FailedMessage::class] = []; | ||
} | ||
$messageToSendersMapping[FailedMessage::class][] = $failureTransport; | ||
|
||
// in case this is a tagged sender, make sure it's in the aliases | ||
if (!isset($senderAliases[$failureTransport])) { | ||
$senderAliases[$failureTransport] = $failureTransport; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This section was mostly just moved from below. The |
||
|
||
foreach ($config['routing'] as $message => $messageConfiguration) { | ||
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { | ||
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message)); | ||
} | ||
$senders = []; | ||
|
||
// make sure senderAliases contains all senders | ||
foreach ($messageConfiguration['senders'] as $sender) { | ||
if (!isset($senderAliases[$sender])) { | ||
$senderAliases[$sender] = $sender; | ||
} | ||
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender); | ||
} | ||
|
||
$messageToSendersMapping[$message] = $messageConfiguration['senders']; | ||
$messageToSendersMapping[$message] = new IteratorArgument($senders); | ||
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle']; | ||
} | ||
|
||
|
@@ -1768,29 +1788,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder | |
|
||
$container->getDefinition('messenger.senders_locator') | ||
->replaceArgument(0, $messageToSendersMapping) | ||
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences)) | ||
->replaceArgument(2, $messagesToSendAndHandle) | ||
->replaceArgument(1, $messagesToSendAndHandle) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the logic from |
||
; | ||
|
||
$container->getDefinition('messenger.retry_strategy_locator') | ||
->replaceArgument(0, $transportRetryReferences); | ||
|
||
if ($config['failure_transport']) { | ||
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') | ||
->replaceArgument(1, $config['failure_transport']); | ||
$container->getDefinition('console.command.messenger_failed_messages_retry') | ||
->replaceArgument(0, $config['failure_transport']) | ||
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null); | ||
$container->getDefinition('console.command.messenger_failed_messages_show') | ||
->replaceArgument(0, $config['failure_transport']); | ||
$container->getDefinition('console.command.messenger_failed_messages_remove') | ||
->replaceArgument(0, $config['failure_transport']); | ||
} else { | ||
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_retry'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_show'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_remove'); | ||
} | ||
} | ||
|
||
private function registerCacheConfiguration(array $config, ContainerBuilder $container) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -114,31 +114,6 @@ | |
<tag name="console.command" command="messenger:stop-workers" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
<argument type="service" id="messenger.routable_message_bus" /> | ||
<argument type="service" id="event_dispatcher" /> | ||
<argument /> <!-- Retry strategy --> | ||
<argument type="service" id="logger" /> | ||
|
||
<tag name="console.command" command="messenger:failed:retry" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
|
||
<tag name="console.command" command="messenger:failed:show" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
|
||
<tag name="console.command" command="messenger:failed:remove" /> | ||
</service> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved into a new file so we can conditionally import that file (instead importing, then removing if the failure transport is disabled). |
||
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand"> | ||
<argument type="service" id="router" /> | ||
<argument type="service" id="debug.file_link_formatter" on-invalid="null" /> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,7 @@ | |
|
||
<!-- Asynchronous --> | ||
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator"> | ||
<argument type="collection" /> <!-- Per message senders map --> | ||
<argument /> <!-- senders locator --> | ||
<argument type="collection" /> <!-- Per message sender iterators --> | ||
<argument type="collection" /> <!-- Messages to send and handle --> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert of #30970 |
||
</service> | ||
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware"> | ||
|
@@ -93,15 +92,6 @@ | |
<argument /> <!-- max delay ms --> | ||
</service> | ||
|
||
<!-- failed handling --> | ||
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener"> | ||
<tag name="kernel.event_subscriber" /> | ||
<tag name="monolog.logger" channel="messenger" /> | ||
<argument type="service" id="messenger.routable_message_bus" /> | ||
<argument /> <!-- Failure transport name --> | ||
<argument type="service" id="logger" on-invalid="ignore" /> | ||
</service> | ||
|
||
<!-- routable message bus --> | ||
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus"> | ||
<argument /> <!-- Message bus locator --> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
<?xml version="1.0" ?> | ||
|
||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd"> | ||
|
||
<services> | ||
<defaults public="false" /> | ||
|
||
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\Failure\SendFailedMessageToFailureTransportListener"> | ||
<tag name="kernel.event_subscriber" /> | ||
<tag name="monolog.logger" channel="messenger" /> | ||
<argument type="service" id="messenger.routable_message_bus" /> | ||
<argument /> <!-- Failure transport name --> | ||
<argument type="service" id="logger" on-invalid="ignore" /> | ||
</service> | ||
|
||
<service id="messenger.failure.failed_message_handler" class="Symfony\Component\Messenger\Failure\FailedMessageHandler"> | ||
<tag name="messenger.message_handler" handles="Symfony\Component\Messenger\Failure\FailedMessage" method="__invoke" /> | ||
<tag name="monolog.logger" channel="messenger" /> | ||
<argument type="service" id="messenger.routable_message_bus" /> | ||
<argument type="service" id="logger" on-invalid="ignore" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
<argument type="service" id="messenger.routable_message_bus" /> | ||
<argument type="service" id="event_dispatcher" /> | ||
<argument /> <!-- Retry strategy --> | ||
<argument type="service" id="logger" /> | ||
|
||
<tag name="console.command" command="messenger:failed:retry" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
|
||
<tag name="console.command" command="messenger:failed:show" /> | ||
</service> | ||
|
||
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand"> | ||
<argument /> <!-- Receiver name --> | ||
<argument /> <!-- Receiver locator --> | ||
|
||
<tag name="console.command" command="messenger:failed:remove" /> | ||
</service> | ||
</services> | ||
</container> |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -721,16 +721,12 @@ public function testMessengerRouting() | |
'*' => false, | ||
]; | ||
|
||
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2)); | ||
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1)); | ||
$sendersMapping = $senderLocatorDefinition->getArgument(0); | ||
$this->assertEquals([ | ||
'amqp', | ||
'audit', | ||
], $sendersMapping[DummyMessage::class]); | ||
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1)); | ||
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0))); | ||
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]); | ||
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]); | ||
'amqp' => new Reference('messenger.transport.amqp'), | ||
'audit' => new Reference('audit'), | ||
], $sendersMapping[DummyMessage::class]->getValues()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert of #30970 |
||
} | ||
|
||
public function testMessengerTransportConfiguration() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line could be in the
if
:)