@@ -141,30 +141,47 @@ public function get(): ?array
141
141
if ($ this ->autoSetup ) {
142
142
$ this ->setup ();
143
143
}
144
+ $ now = microtime ();
145
+ $ now = substr ($ now , 11 ).substr ($ now , 2 , 3 );
144
146
145
147
try {
146
- $ queuedMessageCount = $ this ->connection ->zcount ( $ this ->queue , 0 , $ this -> getCurrentTimeInMilliseconds () );
148
+ $ queuedMessageCount = $ this ->connection ->rawCommand ( ' ZCOUNT ' , $ this ->queue , 0 , $ now );
147
149
} catch (\RedisException $ e ) {
148
150
throw new TransportException ($ e ->getMessage (), 0 , $ e );
149
151
}
150
152
151
153
if ($ queuedMessageCount ) {
152
154
for ($ i = 0 ; $ i < $ queuedMessageCount ; ++$ i ) {
153
155
try {
154
- $ queuedMessages = $ this ->connection ->zpopmin ( $ this ->queue , 1 );
156
+ $ queuedMessages = $ this ->connection ->rawCommand ( ' ZPOPMIN ' , $ this ->queue , 1 ) ?: [] ;
155
157
} catch (\RedisException $ e ) {
156
158
throw new TransportException ($ e ->getMessage (), 0 , $ e );
157
159
}
158
160
159
- foreach ($ queuedMessages as $ queuedMessage => $ time ) {
161
+ $ i = \count ($ queuedMessages );
162
+ while (2 <= $ i ) {
163
+ $ expiry = $ queuedMessages [--$ i ];
164
+ $ queuedMessage = $ queuedMessages [--$ i ];
165
+
166
+ if (\strlen ($ expiry ) === \strlen ($ now ) ? $ expiry > $ now : \strlen ($ expiry ) < \strlen ($ now )) {
167
+ if (!$ this ->connection ->rawCommand ('ZADD ' , $ this ->queue , 'NX ' , $ expiry , $ queuedMessage )) {
168
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
169
+ $ this ->connection ->clearLastError ();
170
+ }
171
+ throw new TransportException ($ error ?? 'Could not add a message to the redis stream. ' );
172
+ }
173
+
174
+ continue ;
175
+ }
176
+
160
177
$ queuedMessage = json_decode ($ queuedMessage , true );
161
178
// if a futured placed message is actually popped because of a race condition with
162
179
// another running message consumer, the message is readded to the queue by add function
163
180
// else its just added stream and will be available for all stream consumers
164
181
$ this ->add (
165
182
$ queuedMessage ['body ' ],
166
183
$ queuedMessage ['headers ' ],
167
- $ time - $ this -> getCurrentTimeInMilliseconds ()
184
+ 0
168
185
);
169
186
}
170
187
}
@@ -255,7 +272,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255
272
}
256
273
257
274
try {
258
- if ($ delayInMs > 0 ) { // the delay could be smaller 0 in a queued message
275
+ if ($ delayInMs > 0 ) { // the delay is <= 0 for queued messages
259
276
$ message = json_encode ([
260
277
'body ' => $ body ,
261
278
'headers ' => $ headers ,
@@ -267,8 +284,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267
284
throw new TransportException (json_last_error_msg ());
268
285
}
269
286
270
- $ score = $ this ->getCurrentTimeInMilliseconds () + $ delayInMs ;
271
- $ added = $ this ->connection ->zadd ($ this ->queue , ['NX ' ], $ score , $ message );
287
+ $ now = explode (' ' , microtime (), 2 );
288
+ $ now [0 ] = str_pad ($ delayInMs + substr ($ now [0 ], 2 , 3 ), 3 , '0 ' , \STR_PAD_LEFT );
289
+ if (3 < \strlen ($ now [0 ])) {
290
+ $ now [1 ] += substr ($ now [0 ], 0 , -3 );
291
+ $ now [0 ] = substr ($ now [0 ], -3 );
292
+
293
+ if (\is_float ($ now [1 ])) {
294
+ throw new TransportException ("Message delay is too big: {$ delayInMs }ms. " );
295
+ }
296
+ }
297
+
298
+ $ added = $ this ->connection ->rawCommand ('ZADD ' , $ this ->queue , 'NX ' , $ now [1 ].$ now [0 ], $ message );
272
299
} else {
273
300
$ message = json_encode ([
274
301
'body ' => $ body ,
@@ -316,11 +343,6 @@ public function setup(): void
316
343
$ this ->autoSetup = false ;
317
344
}
318
345
319
- private function getCurrentTimeInMilliseconds (): int
320
- {
321
- return (int ) (microtime (true ) * 1000 );
322
- }
323
-
324
346
public function cleanup (): void
325
347
{
326
348
$ this ->connection ->del ($ this ->stream );
0 commit comments