Skip to content

[Messenger] Move RejectRedeliveredMessageMiddleware to AMQP Package #41749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions UPGRADE-5.4.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
UPGRADE FROM 5.3 to 5.4
=======================

Messenger
---------
* Deprecate `Middleware\RejectRedeliveredMessageMiddleware`. Install `symfony/amqp-messenger` and use same class from there.

=======
Cache
-----

Expand Down
2 changes: 1 addition & 1 deletion UPGRADE-6.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ Mailer

Messenger
---------

* Removed `Middleware\RejectRedeliveredMessageMiddleware`. Use same class from `symfony/amqp-messenger`.
* Removed AmqpExt transport. Run `composer require symfony/amqp-messenger` to keep the transport in your application.
* Removed Doctrine transport. Run `composer require symfony/doctrine-messenger` to keep the transport in your application.
* Removed RedisExt transport. Run `composer require symfony/redis-messenger` to keep the transport in your application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
use Symfony\Component\Messenger\Bridge\Amqp\Middleware\RejectRedeliveredMessageMiddleware;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
Expand All @@ -27,7 +28,6 @@
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware;
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
Expand All @@ -44,8 +44,8 @@
use Symfony\Component\Messenger\Transport\TransportFactory;

return static function (ContainerConfigurator $container) {
$container->services()
->alias(SerializerInterface::class, 'messenger.default_serializer')
$services = $container->services();
$services->alias(SerializerInterface::class, 'messenger.default_serializer')

// Asynchronous
->set('messenger.senders_locator', SendersLocator::class)
Expand Down Expand Up @@ -91,11 +91,13 @@
->set('messenger.middleware.validation', ValidationMiddleware::class)
->args([
service('validator'),
])
]);

->set('messenger.middleware.reject_redelivered_message_middleware', RejectRedeliveredMessageMiddleware::class)
if (class_exists(RejectRedeliveredMessageMiddleware::class)) {
$services->set('messenger.middleware.reject_redelivered_message_middleware', RejectRedeliveredMessageMiddleware::class);
}

->set('messenger.middleware.failed_message_processing_middleware', FailedMessageProcessingMiddleware::class)
$services->set('messenger.middleware.failed_message_processing_middleware', FailedMessageProcessingMiddleware::class)

->set('messenger.middleware.traceable', TraceableMiddleware::class)
->abstract()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Amqp\Middleware;

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp as LegacyAmqpReceivedStamp;

/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @author Tobias Schultze <http://tobion.de>
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}

// Legacy code to support symfony/messenger < 5.1
$amqpReceivedStamp = $envelope->last(LegacyAmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof LegacyAmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}

return $stack->next()->handle($envelope, $stack);
}
}
2 changes: 1 addition & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CHANGELOG

5.4
---

* Deprecate `Middleware\RejectRedeliveredMessageMiddleware`. Install `symfony/amqp-messenger` and use same class from there.
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.

5.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,9 @@

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp as LegacyAmqpReceivedStamp;

/**
* Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP.
*
* The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly.
* The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy.
*
* AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out
* or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the
* redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent
* infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry
* limit and potential delay.
*
* @author Tobias Schultze <http://tobion.de>
* @deprecated since Symfony 5.4, to be removed in 6.0. Use Symfony\Component\Messenger\Bridge\Amqp\Middleware\RejectRedeliveredMessageMiddleware instead.
*/
class RejectRedeliveredMessageMiddleware implements MiddlewareInterface
class RejectRedeliveredMessageMiddleware extends \Symfony\Component\Messenger\Bridge\Amqp\Middleware\RejectRedeliveredMessageMiddleware
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}

// Legacy code to support symfony/messenger < 5.1
$amqpReceivedStamp = $envelope->last(LegacyAmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof LegacyAmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) {
throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.');
}

return $stack->next()->handle($envelope, $stack);
}
}