Skip to content

Commit 652c020

Browse files
[Messenger] fix Redis support on 32b arch
1 parent 6d688f6 commit 652c020

File tree

3 files changed

+59
-14
lines changed

3 files changed

+59
-14
lines changed

.appveyor.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ install:
2121
- cd ext
2222
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_apcu-5.1.18-7.1-ts-vc14-x86.zip
2323
- 7z x php_apcu-5.1.18-7.1-ts-vc14-x86.zip -y >nul
24+
- appveyor DownloadFile https://github.com/symfony/binary-utils/releases/download/v0.1/php_redis-5.1.1-7.1-ts-vc14-x86.zip
25+
- 7z x php_redis-5.1.1-7.1-ts-vc14-x86.zip -y >nul
2426
- cd ..
2527
- copy /Y php.ini-development php.ini-min
2628
- echo memory_limit=-1 >> php.ini-min
@@ -36,6 +38,7 @@ install:
3638
- echo opcache.enable_cli=1 >> php.ini-max
3739
- echo extension=php_openssl.dll >> php.ini-max
3840
- echo extension=php_apcu.dll >> php.ini-max
41+
- echo extension=php_redis.dll >> php.ini-max
3942
- echo apc.enable_cli=1 >> php.ini-max
4043
- echo extension=php_intl.dll >> php.ini-max
4144
- echo extension=php_mbstring.dll >> php.ini-max
@@ -54,6 +57,7 @@ install:
5457
- SET COMPOSER_ROOT_VERSION=%SYMFONY_VERSION%.x-dev
5558
- php composer.phar update --no-progress --ansi
5659
- php phpunit install
60+
- choco install memurai-developer
5761

5862
test_script:
5963
- SET X=0

src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ public function testGetAfterReject()
220220
{
221221
$redis = new \Redis();
222222
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
223+
$connection->cleanup();
223224

224225
$connection->add('1', []);
225226
$connection->add('2', []);
@@ -230,20 +231,38 @@ public function testGetAfterReject()
230231
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
231232
$this->assertNotNull($connection->get());
232233

233-
$redis->del('messenger-rejectthenget');
234+
$connection->cleanup();
234235
}
235236

236237
public function testGetNonBlocking()
237238
{
238239
$redis = new \Redis();
239240

240241
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
242+
$connection->cleanup();
241243

242244
$this->assertNull($connection->get()); // no message, should return null immediately
243245
$connection->add('1', []);
244246
$this->assertNotEmpty($message = $connection->get());
245247
$connection->reject($message['id']);
246-
$redis->del('messenger-getnonblocking');
248+
249+
$connection->cleanup();
250+
}
251+
252+
public function testGetDelayed()
253+
{
254+
$redis = new \Redis();
255+
256+
$connection = Connection::fromDsn('redis://localhost/messenger-delayed', [], $redis);
257+
$connection->cleanup();
258+
259+
$connection->add('1', [], 100);
260+
$this->assertNull($connection->get());
261+
usleep(300000);
262+
$this->assertNotEmpty($message = $connection->get());
263+
$connection->reject($message['id']);
264+
265+
$connection->cleanup();
247266
}
248267

249268
public function testJsonError()

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -141,30 +141,47 @@ public function get(): ?array
141141
if ($this->autoSetup) {
142142
$this->setup();
143143
}
144+
$now = microtime();
145+
$now = substr($now, 11).substr($now, 2, 3);
144146

145147
try {
146-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
148+
$queuedMessageCount = $this->connection->rawCommand('ZCOUNT', $this->queue, 0, $now);
147149
} catch (\RedisException $e) {
148150
throw new TransportException($e->getMessage(), 0, $e);
149151
}
150152

151153
if ($queuedMessageCount) {
152154
for ($i = 0; $i < $queuedMessageCount; ++$i) {
153155
try {
154-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
156+
$queuedMessages = $this->connection->rawCommand('ZPOPMIN', $this->queue, 1) ?: [];
155157
} catch (\RedisException $e) {
156158
throw new TransportException($e->getMessage(), 0, $e);
157159
}
158160

159-
foreach ($queuedMessages as $queuedMessage => $time) {
161+
$i = \count($queuedMessages);
162+
while (2 <= $i) {
163+
$expiry = $queuedMessages[--$i];
164+
$queuedMessage = $queuedMessages[--$i];
165+
166+
if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
167+
if (!$this->connection->rawCommand('ZADD', $this->queue, 'NX', $expiry, $queuedMessage)) {
168+
if ($error = $this->connection->getLastError() ?: null) {
169+
$this->connection->clearLastError();
170+
}
171+
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
172+
}
173+
174+
continue;
175+
}
176+
160177
$queuedMessage = json_decode($queuedMessage, true);
161178
// if a futured placed message is actually popped because of a race condition with
162179
// another running message consumer, the message is readded to the queue by add function
163180
// else its just added stream and will be available for all stream consumers
164181
$this->add(
165182
$queuedMessage['body'],
166183
$queuedMessage['headers'],
167-
$time - $this->getCurrentTimeInMilliseconds()
184+
0
168185
);
169186
}
170187
}
@@ -255,7 +272,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255272
}
256273

257274
try {
258-
if ($delayInMs > 0) { // the delay could be smaller 0 in a queued message
275+
if ($delayInMs > 0) { // the delay is <= 0 for queued messages
259276
$message = json_encode([
260277
'body' => $body,
261278
'headers' => $headers,
@@ -267,8 +284,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267284
throw new TransportException(json_last_error_msg());
268285
}
269286

270-
$score = $this->getCurrentTimeInMilliseconds() + $delayInMs;
271-
$added = $this->connection->zadd($this->queue, ['NX'], $score, $message);
287+
$now = explode(' ', microtime(), 2);
288+
$now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
289+
if (3 < \strlen($now[0])) {
290+
$now[1] += substr($now[0], 0, -3);
291+
$now[0] = substr($now[0], -3);
292+
293+
if (\is_float($now[1])) {
294+
throw new TransportException("Message delay is too big: {$delayInMs}ms.");
295+
}
296+
}
297+
298+
$added = $this->connection->rawCommand('ZADD', $this->queue, 'NX', $now[1].$now[0], $message);
272299
} else {
273300
$message = json_encode([
274301
'body' => $body,
@@ -316,11 +343,6 @@ public function setup(): void
316343
$this->autoSetup = false;
317344
}
318345

319-
private function getCurrentTimeInMilliseconds(): int
320-
{
321-
return (int) (microtime(true) * 1000);
322-
}
323-
324346
public function cleanup(): void
325347
{
326348
$this->connection->del($this->stream);

0 commit comments

Comments
 (0)