Skip to content

WIP: [Messenger] Infinite Loop Receiver #28547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\Transport\Enhancers;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Tests\Fixtures\CallbackReceiver;
use Symfony\Component\Messenger\Transport\Enhancers\InfiniteLoopReceiver;

class InfiniteLoopReceiverTest extends TestCase
{
public function testReceiverReceivesUntilStopIsCalled()
{
$i = 0;
$decoratedReceiver = null;
$receiver = new CallbackReceiver(function ($handler) use (&$i, &$decoratedReceiver) {
$i += 1;
if ($i === 3) {
$decoratedReceiver->stop();
}
});

$decoratedReceiver = new InfiniteLoopReceiver($receiver);
$decoratedReceiver->receive(function() {});
$this->assertEquals(3, $i);
}

public function testReceiverDelegatesStopToInnerReceiver()
{
$decoratedReceiver = null;
$receiver = new class($decoratedReceiver) implements ReceiverInterface {
public $wasStopped = 0;
private $decoratedReceiver;

public function __construct(&$decoratedReceiver) {
$this->decoratedReceiver = &$decoratedReceiver;
}

public function receive(callable $handle): void {
$this->decoratedReceiver->stop();
}
public function stop(): void {
$this->wasStopped += 1;
}
};
$decoratedReceiver = new InfiniteLoopReceiver($receiver);
$decoratedReceiver->receive(function() {});
$this->assertEquals(1, $receiver->wasStopped);
}

/**
* @expectedException Exception
* @expectedExceptionMessage test
*/
public function testReceiverRethrowsAnyExceptions()
{
$decoratedReceiver = null;
$receiver = new CallbackReceiver(function ($handler) {
throw new \Exception('test');
});

$decoratedReceiver = new InfiniteLoopReceiver($receiver);
$decoratedReceiver->receive(function() {});
}
}
48 changes: 19 additions & 29 deletions src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class AmqpReceiver implements ReceiverInterface
{
private $serializer;
private $connection;
private $shouldStop;

public function __construct(Connection $connection, SerializerInterface $serializer = null)
{
Expand All @@ -38,44 +37,35 @@ public function __construct(Connection $connection, SerializerInterface $seriali
*/
public function receive(callable $handler): void
{
while (!$this->shouldStop) {
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
$handler(null);
$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
$handler(null);

usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);

continue;
}
return;
}

try {
$handler($this->serializer->decode(array(
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
)));
try {
$handler($this->serializer->decode(array(
'body' => $AMQPEnvelope->getBody(),
'headers' => $AMQPEnvelope->getHeaders(),
)));

$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($AMQPEnvelope);
$this->connection->ack($AMQPEnvelope);
} catch (RejectMessageExceptionInterface $e) {
$this->connection->reject($AMQPEnvelope);

throw $e;
} catch (\Throwable $e) {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
throw $e;
} catch (\Throwable $e) {
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);

throw $e;
} finally {
if (\function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}
throw $e;
}
}

public function stop(): void
{
$this->shouldStop = true;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Enhancers\InfiniteLoopReceiver;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand Down Expand Up @@ -58,7 +59,7 @@ public function send(Envelope $envelope): void

private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->connection, $this->serializer);
return $this->receiver = new InfiniteLoopReceiver(new AmqpReceiver($this->connection, $this->serializer));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the only issue I can see here: it's more painful to create the receiver and it's very much error-prone for whoever is using these objects directly. Can't the AmqpReceiver use the InfiniteLoopReceiver instead? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sroze I'm not sure, the only way for Amqp to utilize this would be if we provided a utility method like:

function performOnInfiniteLoop(callable $shouldStop, callable $handler) {
  while (!$shouldStop()) {
     $handler();
  }
}

Also, are we 100% sure it's a bc break? because it's an internal class that's not really exposed to the outside since the TransportFactory is responsible for creating the actual receiver.

}

private function getSender()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Symfony\Component\Messenger\Transport\Enhancers;

use Symfony\Component\Messenger\Transport\ReceiverInterface;

/**
* @author RJ Garcia <rj@bighead.net>
*/
class InfiniteLoopReceiver implements ReceiverInterface
{
private $receiver;
private $shouldStop;

public function __construct(ReceiverInterface $receiver)
{
$this->receiver = $receiver;
$this->shouldStop = false;
}

public function receive(callable $handler): void
{
while (!$this->shouldStop) {
try {
$this->receiver->receive($handler);
} catch (\Throwable $t) {}

if (\function_exists('pcntl_signal_dispatch')) {
\pcntl_signal_dispatch();
}

if ($t ?? null) {
throw $t;
}
}
}

public function stop(): void
{
$this->shouldStop = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you should call $this->receiver->stop() as well here.

$this->receiver->stop();
}
}