Skip to content

[Messenger] Refactoring failure to FailedMessage & allowing for requeue #31397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -74,6 +74,7 @@
use Symfony\Component\Lock\Store\StoreFactory;
use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Mailer\Mailer;
use Symfony\Component\Messenger\Failure\FailedMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
Expand Down Expand Up @@ -286,9 +287,6 @@ public function load(array $configs, ContainerBuilder $container)
$container->removeDefinition('console.command.messenger_debug');
$container->removeDefinition('console.command.messenger_stop_workers');
$container->removeDefinition('console.command.messenger_setup_transports');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}

$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
Expand Down Expand Up @@ -1745,19 +1743,41 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$messageToSendersMapping = [];
$messagesToSendAndHandle = [];
if ($config['failure_transport']) {
$failureTransport = $config['failure_transport'];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line could be in the if :)


$loader->load('messenger_failure.xml');
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $failureTransport)
->replaceArgument(4, $transportRetryReferences[$failureTransport] ?? null);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $failureTransport);
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $failureTransport);

// push routing for FailedMessage to the failure transport
if (!isset($messageToSendersMapping[FailedMessage::class])) {
$messageToSendersMapping[FailedMessage::class] = [];
}
$messageToSendersMapping[FailedMessage::class][] = $failureTransport;

// in case this is a tagged sender, make sure it's in the aliases
if (!isset($senderAliases[$failureTransport])) {
$senderAliases[$failureTransport] = $failureTransport;
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section was mostly just moved from below. The $messagesToSendersMapping is the new part: it adds routing for the FailedMessage class to whatever the failure transport is.


foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
$senders = [];

// make sure senderAliases contains all senders
foreach ($messageConfiguration['senders'] as $sender) {
if (!isset($senderAliases[$sender])) {
$senderAliases[$sender] = $sender;
}
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
}

$messageToSendersMapping[$message] = $messageConfiguration['senders'];
$messageToSendersMapping[$message] = new IteratorArgument($senders);
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}

Expand All @@ -1768,29 +1788,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
->replaceArgument(2, $messagesToSendAndHandle)
->replaceArgument(1, $messagesToSendAndHandle)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of the logic from $senders = [] is a revert of #30970

;

$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);

if ($config['failure_transport']) {
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(1, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport'])
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
} else {
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
}
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
25 changes: 0 additions & 25 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,31 +114,6 @@
<tag name="console.command" command="messenger:stop-workers" />
</service>

<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="event_dispatcher" />
<argument /> <!-- Retry strategy -->
<argument type="service" id="logger" />

<tag name="console.command" command="messenger:failed:retry" />
</service>

<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:show" />
</service>

<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:remove" />
</service>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved into a new file so we can conditionally import that file (instead importing, then removing if the failure transport is disabled).

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

<!-- Asynchronous -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message senders map -->
<argument /> <!-- senders locator -->
<argument type="collection" /> <!-- Per message sender iterators -->
<argument type="collection" /> <!-- Messages to send and handle -->
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert of #30970

</service>
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
Expand Down Expand Up @@ -93,15 +92,6 @@
<argument /> <!-- max delay ms -->
</service>

<!-- failed handling -->
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" />
<argument /> <!-- Failure transport name -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<!-- routable message bus -->
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
<argument /> <!-- Message bus locator -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd">

<services>
<defaults public="false" />

<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\Failure\SendFailedMessageToFailureTransportListener">
<tag name="kernel.event_subscriber" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" />
<argument /> <!-- Failure transport name -->
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="messenger.failure.failed_message_handler" class="Symfony\Component\Messenger\Failure\FailedMessageHandler">
<tag name="messenger.message_handler" handles="Symfony\Component\Messenger\Failure\FailedMessage" method="__invoke" />
<tag name="monolog.logger" channel="messenger" />
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="logger" on-invalid="ignore" />
</service>

<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->
<argument type="service" id="messenger.routable_message_bus" />
<argument type="service" id="event_dispatcher" />
<argument /> <!-- Retry strategy -->
<argument type="service" id="logger" />

<tag name="console.command" command="messenger:failed:retry" />
</service>

<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:show" />
</service>

<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
<argument /> <!-- Receiver name -->
<argument /> <!-- Receiver locator -->

<tag name="console.command" command="messenger:failed:remove" />
</service>
</services>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -721,16 +721,12 @@ public function testMessengerRouting()
'*' => false,
];

$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
$sendersMapping = $senderLocatorDefinition->getArgument(0);
$this->assertEquals([
'amqp',
'audit',
], $sendersMapping[DummyMessage::class]);
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
'amqp' => new Reference('messenger.transport.amqp'),
'audit' => new Reference('audit'),
], $sendersMapping[DummyMessage::class]->getValues());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert of #30970

}

public function testMessengerTransportConfiguration()
Expand Down
2 changes: 0 additions & 2 deletions src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ CHANGELOG
4.3.0
-----

* [BC BREAK] `SendersLocatorInterface` has an additional method:
`getSenderByAlias()`.
* A new `ListableReceiverInterface` was added, which a receiver
can implement (when applicable) to enable listing and fetching
individual messages by id (used in the new "Failed Messages" commands).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
use Symfony\Component\Console\Helper\Dumper;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
use Symfony\Component\Messenger\Failure\FailedMessage;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
Expand Down Expand Up @@ -59,27 +60,27 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
{
$io->title('Failed Message Details');

/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
$message = $envelope->getMessage();
if (!$message instanceof FailedMessage) {
$io->warning('Message does not appear to have been sent to this transport after failing');

return;
}

$rows = [
['Class', \get_class($envelope->getMessage())],
['Class', \get_class($message->getFailedEnvelope()->getMessage())],
];

if (null !== $id = $this->getMessageId($envelope)) {
$rows[] = ['Message Id', $id];
}

if (null === $sentToFailureTransportStamp) {
$io->warning('Message does not appear to have been sent to this transport after failing');
} else {
$rows = array_merge($rows, [
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
]);
}
$rows = array_merge($rows, [
['Failed at', $message->getFailedAt()->format('Y-m-d H:i:s')],
['Error', $message->getExceptionMessage()],
['Error Class', $message->getFlattenException() ? $message->getFlattenException()->getClass() : '(unknown)'],
['Transport', $this->getOriginalTransportName($message->getFailedEnvelope())],
]);

$io->table([], $rows);

Expand All @@ -88,7 +89,7 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
$dump = new Dumper($io);
$io->writeln($dump($envelope->getMessage()));
$io->title('Exception:');
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
$io->writeln($message->getFlattenException()->getTraceAsString());
} else {
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
}
Expand All @@ -109,4 +110,12 @@ protected function getReceiver(): ReceiverInterface
{
return $this->receiver;
}

private function getOriginalTransportName(Envelope $envelope): ?string
{
/** @var ReceivedStamp $receivedStamp */
$receivedStamp = $envelope->last(ReceivedStamp::class);

return null === $receivedStamp ? null : $receivedStamp->getTransportName();
}
}
Loading