@@ -260,71 +260,69 @@ public function testItRetriesTheMessage()
260
260
{
261
261
$ amqpConnection = $ this ->getMockBuilder (\AMQPConnection::class)->disableOriginalConstructor ()->getMock ();
262
262
$ amqpChannel = $ this ->getMockBuilder (\AMQPChannel::class)->disableOriginalConstructor ()->getMock ();
263
- $ retryQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
263
+ $ delayQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
264
264
265
265
$ factory = $ this ->getMockBuilder (AmqpFactory::class)->getMock ();
266
266
$ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
267
267
$ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
268
- $ factory ->method ('createQueue ' )->willReturn ($ retryQueue );
268
+ $ factory ->method ('createQueue ' )->willReturn ($ delayQueue );
269
269
$ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
270
- $ retryExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
270
+ $ delayExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
271
271
$ amqpExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock ()
272
272
));
273
273
274
274
$ amqpExchange ->expects ($ this ->once ())->method ('setName ' )->with ('messages ' );
275
275
$ amqpExchange ->method ('getName ' )->willReturn ('messages ' );
276
276
277
- $ retryExchange ->expects ($ this ->once ())->method ('setName ' )->with ('retry ' );
278
- $ retryExchange ->expects ($ this ->once ())->method ('declareExchange ' );
279
- $ retryExchange ->method ('getName ' )->willReturn ('retry ' );
277
+ $ delayExchange ->expects ($ this ->once ())->method ('setName ' )->with ('delay ' );
278
+ $ delayExchange ->expects ($ this ->once ())->method ('declareExchange ' );
279
+ $ delayExchange ->method ('getName ' )->willReturn ('delay ' );
280
280
281
- $ retryQueue ->expects ($ this ->once ())->method ('setName ' )->with ('retry_queue_1 ' );
282
- $ retryQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
283
- 'x-message-ttl ' => 10000 ,
281
+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_queue_5000 ' );
282
+ $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
283
+ 'x-message-ttl ' => 5000 ,
284
284
'x-dead-letter-exchange ' => 'messages ' ,
285
285
]);
286
286
287
- $ retryQueue ->expects ($ this ->once ())->method ('declareQueue ' );
288
- $ retryQueue ->expects ($ this ->once ())->method ('bind ' )->with ('retry ' , 'attempt_1 ' );
287
+ $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
288
+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delay ' , 'delay_5000 ' );
289
289
290
290
$ envelope = $ this ->getMockBuilder (\AMQPEnvelope::class)->getMock ();
291
291
$ envelope ->method ('getHeader ' )->with ('symfony-messenger-attempts ' )->willReturn (false );
292
292
$ envelope ->method ('getHeaders ' )->willReturn (['x-some-headers ' => 'foo ' ]);
293
293
$ envelope ->method ('getBody ' )->willReturn ('{} ' );
294
294
295
- $ retryExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'attempt_1 ' , AMQP_NOPARAM , ['headers ' => ['x-some-headers ' => 'foo ' , 'symfony-messenger-attempts ' => 1 ]]);
295
+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_5000 ' , AMQP_NOPARAM , ['headers ' => ['x-some-headers ' => 'foo ' , 'symfony-messenger-attempts ' => 1 ]]);
296
296
297
- $ connection = Connection::fromDsn ('amqp://localhost/%2f/messages ' , [' retry ' => [ ' attempts ' => 3 ] ], $ factory );
298
- $ connection ->publishForRetry ($ envelope );
297
+ $ connection = Connection::fromDsn ('amqp://localhost/%2f/messages ' , [], $ factory );
298
+ $ connection ->publishForRetry ($ envelope, 5000 );
299
299
}
300
300
301
301
public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs ()
302
302
{
303
303
$ amqpConnection = $ this ->getMockBuilder (\AMQPConnection::class)->disableOriginalConstructor ()->getMock ();
304
304
$ amqpChannel = $ this ->getMockBuilder (\AMQPChannel::class)->disableOriginalConstructor ()->getMock ();
305
- $ retryQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
305
+ $ delayQueue = $ this ->getMockBuilder (\AMQPQueue::class)->disableOriginalConstructor ()->getMock ();
306
306
307
307
$ factory = $ this ->getMockBuilder (AmqpFactory::class)->getMock ();
308
308
$ factory ->method ('createConnection ' )->willReturn ($ amqpConnection );
309
309
$ factory ->method ('createChannel ' )->willReturn ($ amqpChannel );
310
- $ factory ->method ('createQueue ' )->willReturn ($ retryQueue );
310
+ $ factory ->method ('createQueue ' )->willReturn ($ delayQueue );
311
311
$ factory ->method ('createExchange ' )->will ($ this ->onConsecutiveCalls (
312
- $ retryExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
312
+ $ delayExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock (),
313
313
$ amqpExchange = $ this ->getMockBuilder (\AMQPExchange::class)->disableOriginalConstructor ()->getMock ()
314
314
));
315
315
316
316
$ amqpExchange ->expects ($ this ->once ())->method ('setName ' )->with ('messages ' );
317
317
$ amqpExchange ->method ('getName ' )->willReturn ('messages ' );
318
318
319
- $ retryExchange ->expects ($ this ->once ())->method ('setName ' )->with ('retry ' );
320
- $ retryExchange ->expects ($ this ->once ())->method ('declareExchange ' );
321
- $ retryExchange ->method ('getName ' )->willReturn ('retry ' );
319
+ $ delayExchange ->expects ($ this ->once ())->method ('setName ' )->with ('delay ' );
320
+ $ delayExchange ->expects ($ this ->once ())->method ('declareExchange ' );
321
+ $ delayExchange ->method ('getName ' )->willReturn ('delay ' );
322
322
323
323
$ connectionOptions = [
324
324
'retry ' => [
325
- 'attempts ' => 3 ,
326
325
'dead_routing_key ' => 'my_dead_routing_key ' ,
327
- 'ttl ' => [30000 , 60000 , 120000 ],
328
326
],
329
327
];
330
328
@@ -335,17 +333,17 @@ public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs()
335
333
$ messageRetriedTwice ->method ('getHeaders ' )->willReturn (['symfony-messenger-attempts ' => '2 ' ]);
336
334
$ messageRetriedTwice ->method ('getBody ' )->willReturn ('{} ' );
337
335
338
- $ retryQueue ->expects ($ this ->once ())->method ('setName ' )->with ('retry_queue_3 ' );
339
- $ retryQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
336
+ $ delayQueue ->expects ($ this ->once ())->method ('setName ' )->with ('delay_queue_120000 ' );
337
+ $ delayQueue ->expects ($ this ->once ())->method ('setArguments ' )->with ([
340
338
'x-message-ttl ' => 120000 ,
341
339
'x-dead-letter-exchange ' => 'messages ' ,
342
340
]);
343
341
344
- $ retryQueue ->expects ($ this ->once ())->method ('declareQueue ' );
345
- $ retryQueue ->expects ($ this ->once ())->method ('bind ' )->with ('retry ' , 'attempt_3 ' );
342
+ $ delayQueue ->expects ($ this ->once ())->method ('declareQueue ' );
343
+ $ delayQueue ->expects ($ this ->once ())->method ('bind ' )->with ('delay ' , 'delay_120000 ' );
346
344
347
- $ retryExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'attempt_3 ' , AMQP_NOPARAM , ['headers ' => ['symfony-messenger-attempts ' => 3 ]]);
348
- $ connection ->publishForRetry ($ messageRetriedTwice );
345
+ $ delayExchange ->expects ($ this ->once ())->method ('publish ' )->with ('{} ' , 'delay_120000 ' , AMQP_NOPARAM , ['headers ' => ['symfony-messenger-attempts ' => 3 ]]);
346
+ $ connection ->publishForRetry ($ messageRetriedTwice, 120000 );
349
347
}
350
348
}
351
349
0 commit comments