12
12
namespace Symfony \Component \Messenger \Transport \RedisExt ;
13
13
14
14
use Symfony \Component \Messenger \Exception \InvalidArgumentException ;
15
+ use Symfony \Component \Messenger \Exception \LogicException ;
15
16
16
17
/**
18
+ * A Redis connection.
19
+ *
17
20
* @author Antoine Bluchet <soyuka@gmail.com>
21
+ * @author Alexander Schranz <alexander@sulu.io>
22
+ *
23
+ * @final
24
+ *
25
+ * @experimental in 4.3
18
26
*/
19
27
class Connection
20
28
{
21
- const PROCESSING_QUEUE_SUFFIX = '_processing ' ;
22
- const DEFAULT_CONNECTION_CREDENTIALS = array ('host ' => '127.0.0.1 ' , 'port ' => 6379 );
23
- const DEFAULT_REDIS_OPTIONS = array ('serializer ' => \Redis::SERIALIZER_PHP , 'processing_ttl ' => 10000 , 'blocking_timeout ' => 1000 );
24
-
25
- /**
26
- * @var \Redis
27
- */
28
29
private $ connection ;
30
+ private $ stream ;
31
+ private $ group ;
32
+ private $ consumer ;
33
+ private $ blockingTimeout ;
29
34
30
- /**
31
- * @var string
32
- */
33
- private $ queue ;
34
-
35
- public function __construct (string $ queue , array $ connectionCredentials = self ::DEFAULT_CONNECTION_CREDENTIALS , array $ redisOptions = self ::DEFAULT_REDIS_OPTIONS )
35
+ public function __construct (array $ configuration , array $ connectionCredentials = [], array $ redisOptions = [])
36
36
{
37
37
$ this ->connection = new \Redis ();
38
38
$ this ->connection ->connect ($ connectionCredentials ['host ' ] ?? '127.0.0.1 ' , $ connectionCredentials ['port ' ] ?? 6379 );
39
39
$ this ->connection ->setOption (\Redis::OPT_SERIALIZER , $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP );
40
- // We force this because we rely on the fact that redis doesn't timeout with bRPopLPush
41
- $ this ->connection ->setOption (\Redis::OPT_READ_TIMEOUT , -1 );
42
- $ this ->queue = $ queue ;
43
- $ this ->processingTtl = $ redisOptions ['processing_ttl ' ] ?? self ::DEFAULT_REDIS_OPTIONS ['processing_ttl ' ];
44
- $ this ->blockingTimeout = $ redisOptions ['blocking_timeout ' ] ?? self ::DEFAULT_REDIS_OPTIONS ['blocking_timeout ' ];
40
+ $ this ->stream = $ configuration ['stream ' ] ?? 'messages ' ;
41
+ $ this ->group = $ configuration ['group ' ] ?? 'symfony ' ;
42
+ $ this ->consumer = $ configuration ['consumer ' ] ?? 'consumer ' ;
43
+ $ this ->blockingTimeout = $ redisOptions ['blocking_timeout ' ] ?? null ;
45
44
}
46
45
47
- public static function fromDsn (string $ dsn , array $ redisOptions = self :: DEFAULT_REDIS_OPTIONS ): self
46
+ public static function fromDsn (string $ dsn , array $ redisOptions = [] ): self
48
47
{
49
48
if (false === $ parsedUrl = parse_url ($ dsn )) {
50
49
throw new InvalidArgumentException (sprintf ('The given Redis DSN "%s" is invalid. ' , $ dsn ));
51
50
}
52
51
53
- $ queue = isset ($ parsedUrl ['path ' ]) ? trim ($ parsedUrl ['path ' ], '/ ' ) : $ redisOptions ['queue ' ] ?? 'messages ' ;
52
+ $ pathParts = explode ('/ ' , $ parsedUrl ['path ' ]);
53
+
54
+ $ stream = $ pathParts [1 ] ?? '' ;
55
+ $ group = $ pathParts [2 ] ?? '' ;
56
+ $ consumer = $ pathParts [3 ] ?? '' ;
57
+
54
58
$ connectionCredentials = array (
55
59
'host ' => $ parsedUrl ['host ' ] ?? '127.0.0.1 ' ,
56
60
'port ' => $ parsedUrl ['port ' ] ?? 6379 ,
@@ -61,96 +65,53 @@ public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_
61
65
$ redisOptions = array_replace_recursive ($ redisOptions , $ parsedQuery );
62
66
}
63
67
64
- return new self ($ queue , $ connectionCredentials , $ redisOptions );
68
+ return new self ([ ' stream ' => $ stream , ' group ' => $ group , ' consumer ' => $ consumer ] , $ connectionCredentials , $ redisOptions );
65
69
}
66
70
67
- /**
68
- * Takes last element (tail) of the list and add it to the processing queue (head - blocking)
69
- * Also sets a key with TTL that will be checked by the `doCheck` method.
70
- */
71
- public function waitAndGet (): ?array
71
+ public function get (): iterable
72
72
{
73
- $ this ->doCheck ();
74
- $ value = $ this ->connection ->bRPopLPush ($ this ->queue , $ this ->queue .self ::PROCESSING_QUEUE_SUFFIX , $ this ->blockingTimeout );
73
+ $ messages = $ this ->connection ->xreadgroup (
74
+ $ this ->group ,
75
+ $ this ->consumer ,
76
+ [$ this ->stream => '> ' ],
77
+ 1 ,
78
+ $ this ->blockingTimeout
79
+ );
75
80
76
- // false in case of timeout
77
- if (false === $ value ) {
78
- return null ;
81
+ if (false === $ messages ) {
82
+ throw new LogicException (
83
+ $ this ->connection ->getLastError () ?: 'Unexpected redis stream error happened. '
84
+ );
79
85
}
80
86
81
- $ key = md5 ($ value ['body ' ]);
82
- $ this ->connection ->set ($ key , 1 , array ('px ' => $ this ->processingTtl ));
83
-
84
- return $ value ;
85
- }
87
+ foreach ($ messages [$ this ->stream ] as $ key => $ message ) {
88
+ $ redisEnvelope = \json_decode ($ message , true );
86
89
87
- /**
88
- * Acknowledge the message:
89
- * 1. Remove the ttl key
90
- * 2. LREM the message from the processing list.
91
- */
92
- public function ack ($ message )
93
- {
94
- $ key = md5 ($ message ['body ' ]);
95
- $ processingQueue = $ this ->queue .self ::PROCESSING_QUEUE_SUFFIX ;
96
- $ this ->connection ->multi ()
97
- ->lRem ($ processingQueue , $ message )
98
- ->del ($ key )
99
- ->exec ();
90
+ yield [
91
+ 'id ' => $ key ,
92
+ 'body ' => $ redisEnvelope ['headers ' ],
93
+ 'headers ' => $ redisEnvelope ['headers ' ],
94
+ ];
95
+ }
100
96
}
101
97
102
- /**
103
- * Reject the message: we acknowledge it, means we remove it form the queues.
104
- *
105
- * @TODO: log something?
106
- */
107
- public function reject ($ message )
98
+ public function ack (string $ id ): bool
108
99
{
109
- $ this ->ack ( $ message );
100
+ $ this ->connection -> xack ( $ this -> stream , $ this -> group , [ $ id ] );
110
101
}
111
102
112
- /**
113
- * Requeue - add it back to the queue
114
- * All we have to do is to make our key expire and let the `doCheck` system manage it.
115
- */
116
- public function requeue ($ message )
103
+ public function reject (string $ id ): bool
117
104
{
118
- $ key = md5 ($ message ['body ' ]);
119
- $ this ->connection ->expire ($ key , -1 );
105
+ $ this ->connection ->xdel ($ this ->stream , [$ id ]);
120
106
}
121
107
122
- /**
123
- * Add item at the tail of list.
124
- */
125
- public function add ($ message )
108
+ public function add (array $ message , int $ delay )
126
109
{
127
- $ this ->connection ->lpush ($ this ->queue , $ message );
110
+ $ this ->connection ->xadd ($ this ->stream , ' * ' , [ ' content ' => json_encode ( $ message)] );
128
111
}
129
112
130
- /**
131
- * The check:
132
- * 1. Get the processing queue items
133
- * 2. Check if the TTL is over
134
- * 3. If it is, rpush back the message to the origin queue.
135
- */
136
- private function doCheck ()
113
+ public function setup (): void
137
114
{
138
- $ processingQueue = $ this ->queue .self ::PROCESSING_QUEUE_SUFFIX ;
139
- $ pending = $ this ->connection ->lRange ($ processingQueue , 0 , -1 );
140
-
141
- foreach ($ pending as $ temp ) {
142
- $ key = md5 ($ temp ['body ' ]);
143
-
144
- if ($ this ->connection ->ttl ($ key ) > 0 ) {
145
- continue ;
146
- }
147
-
148
- $ this ->connection
149
- ->multi ()
150
- ->del ($ key )
151
- ->lRem ($ processingQueue , $ temp , 1 )
152
- ->rPush ($ this ->queue , $ temp )
153
- ->exec ();
154
- }
115
+ $ this ->connection ->xgroup ('CREATE ' , $ this ->stream , $ this ->group , 0 , true );
155
116
}
156
117
}
0 commit comments