Skip to content

Commit e355a4e

Browse files
Refractor using redis streams
1 parent 828f018 commit e355a4e

File tree

7 files changed

+185
-170
lines changed

7 files changed

+185
-170
lines changed

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 53 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,49 @@
1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

1414
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516

1617
/**
18+
* A Redis connection.
19+
*
1720
* @author Antoine Bluchet <soyuka@gmail.com>
21+
* @author Alexander Schranz <alexander@sulu.io>
22+
*
23+
* @final
24+
*
25+
* @experimental in 4.3
1826
*/
1927
class Connection
2028
{
21-
const PROCESSING_QUEUE_SUFFIX = '_processing';
22-
const DEFAULT_CONNECTION_CREDENTIALS = array('host' => '127.0.0.1', 'port' => 6379);
23-
const DEFAULT_REDIS_OPTIONS = array('serializer' => \Redis::SERIALIZER_PHP, 'processing_ttl' => 10000, 'blocking_timeout' => 1000);
24-
25-
/**
26-
* @var \Redis
27-
*/
2829
private $connection;
30+
private $stream;
31+
private $group;
32+
private $consumer;
33+
private $blockingTimeout;
2934

30-
/**
31-
* @var string
32-
*/
33-
private $queue;
34-
35-
public function __construct(string $queue, array $connectionCredentials = self::DEFAULT_CONNECTION_CREDENTIALS, array $redisOptions = self::DEFAULT_REDIS_OPTIONS)
35+
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [])
3636
{
3737
$this->connection = new \Redis();
3838
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
3939
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
40-
// We force this because we rely on the fact that redis doesn't timeout with bRPopLPush
41-
$this->connection->setOption(\Redis::OPT_READ_TIMEOUT, -1);
42-
$this->queue = $queue;
43-
$this->processingTtl = $redisOptions['processing_ttl'] ?? self::DEFAULT_REDIS_OPTIONS['processing_ttl'];
44-
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? self::DEFAULT_REDIS_OPTIONS['blocking_timeout'];
40+
$this->stream = $configuration['stream'] ?? 'messages';
41+
$this->group = $configuration['group'] ?? 'symfony';
42+
$this->consumer = $configuration['consumer'] ?? 'consumer';
43+
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
4544
}
4645

47-
public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_REDIS_OPTIONS): self
46+
public static function fromDsn(string $dsn, array $redisOptions = []): self
4847
{
4948
if (false === $parsedUrl = parse_url($dsn)) {
5049
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
5150
}
5251

53-
$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : $redisOptions['queue'] ?? 'messages';
52+
$pathParts = explode('/', $parsedUrl['path']);
53+
54+
$stream = $pathParts[1] ?? '';
55+
$group = $pathParts[2] ?? '';
56+
$consumer = $pathParts[3] ?? '';
57+
5458
$connectionCredentials = array(
5559
'host' => $parsedUrl['host'] ?? '127.0.0.1',
5660
'port' => $parsedUrl['port'] ?? 6379,
@@ -61,96 +65,53 @@ public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_
6165
$redisOptions = array_replace_recursive($redisOptions, $parsedQuery);
6266
}
6367

64-
return new self($queue, $connectionCredentials, $redisOptions);
68+
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions);
6569
}
6670

67-
/**
68-
* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
69-
* Also sets a key with TTL that will be checked by the `doCheck` method.
70-
*/
71-
public function waitAndGet(): ?array
71+
public function get(): iterable
7272
{
73-
$this->doCheck();
74-
$value = $this->connection->bRPopLPush($this->queue, $this->queue.self::PROCESSING_QUEUE_SUFFIX, $this->blockingTimeout);
73+
$messages = $this->connection->xreadgroup(
74+
$this->group,
75+
$this->consumer,
76+
[$this->stream => '>'],
77+
1,
78+
$this->blockingTimeout
79+
);
7580

76-
// false in case of timeout
77-
if (false === $value) {
78-
return null;
81+
if (false === $messages) {
82+
throw new LogicException(
83+
$this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
84+
);
7985
}
8086

81-
$key = md5($value['body']);
82-
$this->connection->set($key, 1, array('px' => $this->processingTtl));
83-
84-
return $value;
85-
}
87+
foreach ($messages[$this->stream] as $key => $message) {
88+
$redisEnvelope = \json_decode($message, true);
8689

87-
/**
88-
* Acknowledge the message:
89-
* 1. Remove the ttl key
90-
* 2. LREM the message from the processing list.
91-
*/
92-
public function ack($message)
93-
{
94-
$key = md5($message['body']);
95-
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
96-
$this->connection->multi()
97-
->lRem($processingQueue, $message)
98-
->del($key)
99-
->exec();
90+
yield [
91+
'id' => $key,
92+
'body' => $redisEnvelope['headers'],
93+
'headers' => $redisEnvelope['headers'],
94+
];
95+
}
10096
}
10197

102-
/**
103-
* Reject the message: we acknowledge it, means we remove it form the queues.
104-
*
105-
* @TODO: log something?
106-
*/
107-
public function reject($message)
98+
public function ack(string $id): bool
10899
{
109-
$this->ack($message);
100+
$this->connection->xack($this->stream, $this->group, [$id]);
110101
}
111102

112-
/**
113-
* Requeue - add it back to the queue
114-
* All we have to do is to make our key expire and let the `doCheck` system manage it.
115-
*/
116-
public function requeue($message)
103+
public function reject(string $id): bool
117104
{
118-
$key = md5($message['body']);
119-
$this->connection->expire($key, -1);
105+
$this->connection->xdel($this->stream, [$id]);
120106
}
121107

122-
/**
123-
* Add item at the tail of list.
124-
*/
125-
public function add($message)
108+
public function add(array $message, int $delay)
126109
{
127-
$this->connection->lpush($this->queue, $message);
110+
$this->connection->xadd($this->stream, '*', ['content' => json_encode($message)]);
128111
}
129112

130-
/**
131-
* The check:
132-
* 1. Get the processing queue items
133-
* 2. Check if the TTL is over
134-
* 3. If it is, rpush back the message to the origin queue.
135-
*/
136-
private function doCheck()
113+
public function setup(): void
137114
{
138-
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
139-
$pending = $this->connection->lRange($processingQueue, 0, -1);
140-
141-
foreach ($pending as $temp) {
142-
$key = md5($temp['body']);
143-
144-
if ($this->connection->ttl($key) > 0) {
145-
continue;
146-
}
147-
148-
$this->connection
149-
->multi()
150-
->del($key)
151-
->lRem($processingQueue, $temp, 1)
152-
->rPush($this->queue, $temp)
153-
->exec();
154-
}
115+
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
155116
}
156117
}

src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php

Lines changed: 0 additions & 25 deletions
This file was deleted.
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\RedisExt;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
/**
17+
* @author Alexander Schranz <alexander@sulu.io>
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class RedisReceivedStamp implements StampInterface
22+
{
23+
private $id;
24+
25+
public function __construct(string $id)
26+
{
27+
$this->id = $id;
28+
}
29+
30+
public function getId(): string
31+
{
32+
return $this->id;
33+
}
34+
}

src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,61 +11,78 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\LogicException;
16+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
18+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1619
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1720

1821
/**
1922
* @author Antoine Bluchet <soyuka@gmail.com>
23+
* @author Alexander Schranz <alexander@sulu.io>
2024
*/
2125
class RedisReceiver implements ReceiverInterface
2226
{
2327
private $connection;
2428
private $serializer;
25-
private $shouldStop = false;
2629

27-
public function __construct(Connection $connection, SerializerInterface $serializer)
30+
public function __construct(Connection $connection, SerializerInterface $serializer = null)
2831
{
2932
$this->connection = $connection;
30-
$this->serializer = $serializer;
33+
$this->serializer = $serializer ?? new PhpSerializer();
3134
}
3235

3336
/**
3437
* {@inheritdoc}
3538
*/
36-
public function receive(callable $handler): void
39+
public function get(): iterable
3740
{
38-
while (!$this->shouldStop) {
39-
if (null === $message = $this->connection->waitAndGet()) {
40-
$handler(null);
41-
if (\function_exists('pcntl_signal_dispatch')) {
42-
pcntl_signal_dispatch();
43-
}
41+
$redisEnvelope = $this->connection->get();
4442

45-
continue;
46-
}
47-
48-
try {
49-
$handler($this->serializer->decode($message));
50-
$this->connection->ack($message);
51-
} catch (RejectMessageExceptionInterface $e) {
52-
$this->connection->reject($message);
43+
if (null === $redisEnvelope) {
44+
return [];
45+
}
5346

54-
throw $e;
55-
} catch (\Throwable $e) {
56-
$this->connection->requeue($message);
47+
try {
48+
$envelope = $this->serializer->decode([
49+
'body' => $redisEnvelope['body'],
50+
'headers' => $redisEnvelope['headers'],
51+
]);
52+
} catch (MessageDecodingFailedException $exception) {
53+
$this->connection->reject($redisEnvelope['id']);
5754

58-
throw $e;
59-
} finally {
60-
if (\function_exists('pcntl_signal_dispatch')) {
61-
pcntl_signal_dispatch();
62-
}
63-
}
55+
throw $exception;
6456
}
57+
58+
yield $envelope->with(new RedisReceivedStamp($redisEnvelope['id']));
59+
}
60+
61+
/**
62+
* {@inheritdoc}
63+
*/
64+
public function ack(Envelope $envelope): void
65+
{
66+
$this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
6567
}
6668

67-
public function stop(): void
69+
/**
70+
* {@inheritdoc}
71+
*/
72+
public function reject(Envelope $envelope): void
6873
{
69-
$this->shouldStop = true;
74+
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
75+
}
76+
77+
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
78+
{
79+
/** @var RedisReceivedStamp|null $redisReceivedStamp */
80+
$redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
81+
82+
if (null === $redisReceivedStamp) {
83+
throw new LogicException('No RedisReceivedStamp found on the Envelope.');
84+
}
85+
86+
return $redisReceivedStamp;
7087
}
7188
}

0 commit comments

Comments
 (0)