Skip to content

Commit 7327919

Browse files
committed
feature #38468 Messenger multiple failed transports (monteiro)
This PR was squashed before being merged into the 5.3-dev branch. Discussion ---------- Messenger multiple failed transports | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tickets | Fix #34911 | License | MIT | Doc PR | symfony/symfony-docs#13489 <!-- symfony/symfony-docs#13489 Replace this notice by a short README for your feature/bugfix. This will help people understand your PR and can be used as a start for the documentation. Additionally (see https://symfony.com/roadmap): - Always add tests and ensure they pass. - Never break backward compatibility (see https://symfony.com/bc). - Bug fixes must be submitted against the lowest maintained branch where they apply (lowest branches are regularly merged to upper ones so they get the fixes too.) - Features and deprecations must be submitted against branch master. --> ## Strategy applied - Pass a map of transports and failed transports to the `SendFailedMessageToFailureTransportListener`. This way we re-use the same listener. - Local failed transport has more priority than a global failed transport defined. ## Configuration example ```yaml # config/packages/messenger.yaml framework: # no need to set failed transport globally if I want a specific behaviour per transport. failure_transport: failed # all transports have this failed transport messenger: transports: failed: 'doctrine://default?queue_name=failed' failed_important: 'doctrine://default?queue_name=failed_important' async: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' failure_transport: failed # takes precedence over the global defined "failed_transport" retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 async_no_failure_transport: # it will use the global defined transport if no one is defined. dsn: '%env(MESSENGER_TRANSPORT_DSN)%' retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 async_send_specific_failure_queue: dsn: '%env(MESSENGER_TRANSPORT_DSN)%' failed_transport: failed_important # takes precedence over the global defined "failed_transport" retry_strategy: max_retries: 3 delay: 1000 multiplier: 2 ``` You can test this feature easily on a [demo project](https://github.com/monteiro/messenger-multiple-failed-transports-pr34979). Just follow the [README](https://github.com/monteiro/messenger-multiple-failed-transports-pr34979). **More information on issue #34911** ## What needs to be done so this can be merged: - [x] validate strategy - [ ] update src/**/CHANGELOG.md files - [x] update tests to cover all cases - [x] create doc PR Commits ------- 5810b6c Messenger multiple failed transports
2 parents d7b6805 + 5810b6c commit 7327919

24 files changed

+1498
-71
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,10 @@ function ($a) {
13131313
->prototype('variable')
13141314
->end()
13151315
->end()
1316+
->scalarNode('failure_transport')
1317+
->defaultNull()
1318+
->info('Transport name to send failed messages to (after all retries have failed).')
1319+
->end()
13161320
->arrayNode('retry_strategy')
13171321
->addDefaultsIfNotSet()
13181322
->beforeNormalization()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1916,15 +1916,38 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19161916
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
19171917
}
19181918

1919+
$failureTransports = [];
1920+
if ($config['failure_transport']) {
1921+
if (!isset($config['transports'][$config['failure_transport']])) {
1922+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1923+
}
1924+
1925+
$container->setAlias('messenger.failure_transports.default', 'messenger.transport.'.$config['failure_transport']);
1926+
$failureTransports[] = $config['failure_transport'];
1927+
}
1928+
1929+
$failureTransportsByName = [];
1930+
foreach ($config['transports'] as $name => $transport) {
1931+
if ($transport['failure_transport']) {
1932+
$failureTransports[] = $transport['failure_transport'];
1933+
$failureTransportsByName[$name] = $transport['failure_transport'];
1934+
} elseif ($config['failure_transport']) {
1935+
$failureTransportsByName[$name] = $config['failure_transport'];
1936+
}
1937+
}
1938+
19191939
$senderAliases = [];
19201940
$transportRetryReferences = [];
19211941
foreach ($config['transports'] as $name => $transport) {
19221942
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
1923-
19241943
$transportDefinition = (new Definition(TransportInterface::class))
19251944
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
19261945
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1927-
->addTag('messenger.receiver', ['alias' => $name])
1946+
->addTag('messenger.receiver', [
1947+
'alias' => $name,
1948+
'is_failure_transport' => \in_array($name, $failureTransports),
1949+
]
1950+
)
19281951
;
19291952
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
19301953
$senderAliases[$name] = $transportId;
@@ -1955,6 +1978,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19551978
$senderReferences[$serviceId] = new Reference($serviceId);
19561979
}
19571980

1981+
foreach ($config['transports'] as $name => $transport) {
1982+
if ($transport['failure_transport']) {
1983+
if (!isset($senderReferences[$transport['failure_transport']])) {
1984+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
1985+
}
1986+
}
1987+
}
1988+
1989+
$failureTransportReferencesByTransportName = array_map(function ($failureTransportName) use ($senderReferences) {
1990+
return $senderReferences[$failureTransportName];
1991+
}, $failureTransportsByName);
1992+
19581993
$messageToSendersMapping = [];
19591994
foreach ($config['routing'] as $message => $messageConfiguration) {
19601995
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -1985,19 +2020,17 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19852020
$container->getDefinition('messenger.retry_strategy_locator')
19862021
->replaceArgument(0, $transportRetryReferences);
19872022

1988-
if ($config['failure_transport']) {
1989-
if (!isset($senderReferences[$config['failure_transport']])) {
1990-
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1991-
}
1992-
1993-
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1994-
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
2023+
if (\count($failureTransports) > 0) {
19952024
$container->getDefinition('console.command.messenger_failed_messages_retry')
19962025
->replaceArgument(0, $config['failure_transport']);
19972026
$container->getDefinition('console.command.messenger_failed_messages_show')
19982027
->replaceArgument(0, $config['failure_transport']);
19992028
$container->getDefinition('console.command.messenger_failed_messages_remove')
20002029
->replaceArgument(0, $config['failure_transport']);
2030+
2031+
$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportReferencesByTransportName);
2032+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
2033+
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
20012034
} else {
20022035
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
20032036
$container->removeDefinition('console.command.messenger_failed_messages_retry');

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@
165165

166166
->set('console.command.messenger_failed_messages_retry', FailedMessagesRetryCommand::class)
167167
->args([
168-
abstract_arg('Receiver name'),
169-
abstract_arg('Receiver'),
168+
abstract_arg('Default failure receiver name'),
169+
abstract_arg('Receivers'),
170170
service('messenger.routable_message_bus'),
171171
service('event_dispatcher'),
172172
service('logger'),
@@ -175,15 +175,15 @@
175175

176176
->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
177177
->args([
178-
abstract_arg('Receiver name'),
179-
abstract_arg('Receiver'),
178+
abstract_arg('Default failure receiver name'),
179+
abstract_arg('Receivers'),
180180
])
181181
->tag('console.command')
182182

183183
->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
184184
->args([
185-
abstract_arg('Receiver name'),
186-
abstract_arg('Receiver'),
185+
abstract_arg('Default failure receiver name'),
186+
abstract_arg('Receivers'),
187187
])
188188
->tag('console.command')
189189

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172

173173
->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
174174
->args([
175-
abstract_arg('failure transport'),
175+
abstract_arg('failure transports'),
176176
service('logger')->ignoreOnInvalid(),
177177
])
178178
->tag('kernel.event_subscriber')

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@
491491
<xsd:attribute name="name" type="xsd:string" />
492492
<xsd:attribute name="serializer" type="xsd:string" />
493493
<xsd:attribute name="dsn" type="xsd:string" />
494+
<xsd:attribute name="failure-transport" type="xsd:string" />
494495
</xsd:complexType>
495496

496497
<xsd:complexType name="messenger_retry_strategy">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'transports' => [
6+
'transport_1' => [
7+
'dsn' => 'null://',
8+
'failure_transport' => 'failure_transport_1'
9+
],
10+
'transport_2' => 'null://',
11+
'transport_3' => [
12+
'dsn' => 'null://',
13+
'failure_transport' => 'failure_transport_3'
14+
],
15+
'failure_transport_1' => 'null://',
16+
'failure_transport_3' => 'null://'
17+
],
18+
],
19+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'failure_transport' => 'failure_transport_global',
6+
'transports' => [
7+
'transport_1' => [
8+
'dsn' => 'null://',
9+
'failure_transport' => 'failure_transport_1'
10+
],
11+
'transport_2' => 'null://',
12+
'transport_3' => [
13+
'dsn' => 'null://',
14+
'failure_transport' => 'failure_transport_3'
15+
],
16+
'failure_transport_global' => 'null://',
17+
'failure_transport_1' => 'null://',
18+
'failure_transport_3' => 'null://',
19+
],
20+
],
21+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_1" dsn="null://" />
14+
<framework:transport name="failure_transport_3" dsn="null://" />
15+
</framework:messenger>
16+
</framework:config>
17+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger failure-transport="failure_transport_global">
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_global" dsn="null://" />
14+
<framework:transport name="failure_transport_1" dsn="null://" />
15+
<framework:transport name="failure_transport_3" dsn="null://" />
16+
</framework:messenger>
17+
</framework:config>
18+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
framework:
2+
messenger:
3+
transports:
4+
transport_1:
5+
dsn: 'null://'
6+
failure_transport: failure_transport_1
7+
transport_2: 'null://'
8+
transport_3:
9+
dsn: 'null://'
10+
failure_transport: failure_transport_3
11+
failure_transport_1: 'null://'
12+
failure_transport_3: 'null://'

0 commit comments

Comments
 (0)