Skip to content

Commit 6fb05dc

Browse files
committed
bug #31545 [Messenger] Fix redis Connection::get() should be non blocking by default (chalasr)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] Fix redis Connection::get() should be non blocking by default | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | n/a | License | MIT | Doc PR | todo The `\Redis::xreadgroup()` method waits until a message arrives or the specified timeout is reached before returning, which means that `RedisExt\Connection::get()` is blocking. That's inconsistent with other transports which all returns immediately in case there is no message, for instance the AMQP transport uses `\Amqp::get()` instead of `\Amqp::consume()` for this reason. It also short-circuits the worker's stop logic: both the `--time-limit` option of the `messenger:consume` command and the `messenger:stop-workers` don't work with the redis transport. This returns early in case the message count is 0 and no blocking timeout has been configured. Commits ------- 229502a [Messenger] Make redis Connection::get() non blocking by default
2 parents 2ecad3f + 229502a commit 6fb05dc

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ public function testFromDsnWithOptions()
4646
'host' => 'localhost',
4747
'port' => 6379,
4848
], [
49-
'blocking_timeout' => 30,
49+
'serializer' => 2,
5050
]),
51-
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['blocking_timeout' => 30])
51+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['serializer' => 2])
5252
);
5353
}
5454

@@ -59,9 +59,9 @@ public function testFromDsnWithQueryOptions()
5959
'host' => 'localhost',
6060
'port' => 6379,
6161
], [
62-
'blocking_timeout' => 30,
62+
'serializer' => 2,
6363
]),
64-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
64+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
6565
);
6666
}
6767

@@ -134,16 +134,20 @@ public function testGetAfterReject()
134134
$redis->del('messenger-rejectthenget');
135135
}
136136

137-
public function testBlockingTimeout()
137+
public function testGetNonBlocking()
138138
{
139139
$redis = new \Redis();
140-
$connection = Connection::fromDsn('redis://localhost/messenger-blockingtimeout', ['blocking_timeout' => 1], $redis);
140+
141+
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
141142
try {
142143
$connection->setup();
143144
} catch (TransportException $e) {
144145
}
145146

146-
$this->assertNull($connection->get());
147-
$redis->del('messenger-blockingtimeout');
147+
$this->assertNull($connection->get()); // no message, should return null immediately
148+
$connection->add('1', []);
149+
$this->assertNotEmpty($message = $connection->get());
150+
$connection->reject($message['id']);
151+
$redis->del('messenger-getnonblocking');
148152
}
149153
}

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ class Connection
3131
private $stream;
3232
private $group;
3333
private $consumer;
34-
private $blockingTimeout;
3534
private $couldHavePendingMessages = true;
3635

3736
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -42,7 +41,6 @@ public function __construct(array $configuration, array $connectionCredentials =
4241
$this->stream = $configuration['stream'] ?? '' ?: 'messages';
4342
$this->group = $configuration['group'] ?? '' ?: 'symfony';
4443
$this->consumer = $configuration['consumer'] ?? '' ?: 'consumer';
45-
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
4644
}
4745

4846
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
@@ -83,8 +81,7 @@ public function get(): ?array
8381
$this->group,
8482
$this->consumer,
8583
[$this->stream => $messageId],
86-
1,
87-
$this->blockingTimeout
84+
1
8885
);
8986
} catch (\RedisException $e) {
9087
}
@@ -142,7 +139,7 @@ public function reject(string $id): void
142139
}
143140
}
144141

145-
public function add(string $body, array $headers)
142+
public function add(string $body, array $headers): void
146143
{
147144
$e = null;
148145
try {

0 commit comments

Comments
 (0)