@@ -29,6 +29,7 @@ class Connection
29
29
* Configuration of the connection.
30
30
*
31
31
* * table_name: name of the table
32
+ * * id_strategy: Strategy for the id field. uuid or auto_increment: Default: auto_increment
32
33
* * connection: name of the Doctrine's entity manager
33
34
* * queue_name: name of the queue
34
35
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
@@ -39,11 +40,18 @@ class Connection
39
40
private $ driverConnection ;
40
41
const DEFAULT_OPTIONS = [
41
42
'table_name ' => 'messenger_messages ' ,
43
+ 'id_strategy ' => self ::ID_STRATEGY_AUTO_INCREMENT ,
42
44
'queue_name ' => 'default ' ,
43
45
'redeliver_timeout ' => 3600 ,
44
46
'loop_sleep ' => 200000 ,
45
47
'auto_setup ' => true ,
46
48
];
49
+ const ID_STRATEGY_AUTO_INCREMENT = 'auto_increment ' ;
50
+ const ID_STRATEGY_UUID = 'uuid ' ;
51
+ const ID_STRATEGIES = [
52
+ self ::ID_STRATEGY_AUTO_INCREMENT ,
53
+ self ::ID_STRATEGY_UUID ,
54
+ ];
47
55
48
56
public function __construct (array $ configuration , DBALConnection $ driverConnection )
49
57
{
@@ -71,12 +79,17 @@ public static function buildConfiguration($dsn, array $options = [])
71
79
$ configuration = [
72
80
'connection ' => $ components ['host ' ],
73
81
'table_name ' => $ options ['table_name ' ] ?? ($ query ['table_name ' ] ?? self ::DEFAULT_OPTIONS ['table_name ' ]),
82
+ 'id_strategy ' => $ options ['id_strategy ' ] ?? ($ query ['id_strategy ' ] ?? self ::DEFAULT_OPTIONS ['id_strategy ' ]),
74
83
'queue_name ' => $ options ['queue_name ' ] ?? ($ query ['queue_name ' ] ?? self ::DEFAULT_OPTIONS ['queue_name ' ]),
75
84
'redeliver_timeout ' => $ options ['redeliver_timeout ' ] ?? ($ query ['redeliver_timeout ' ] ?? self ::DEFAULT_OPTIONS ['redeliver_timeout ' ]),
76
85
'loop_sleep ' => $ options ['loop_sleep ' ] ?? ($ query ['loop_sleep ' ] ?? self ::DEFAULT_OPTIONS ['loop_sleep ' ]),
77
86
'auto_setup ' => $ options ['auto_setup ' ] ?? ($ query ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ]),
78
87
];
79
88
89
+ if (!\in_array ($ configuration ['id_strategy ' ], self ::ID_STRATEGIES )) {
90
+ throw new TransportException (sprintf ('Unknown id_strategy "%s". Supported strategies are [%s] ' , $ configuration ['id_strategy ' ], implode (', ' , self ::ID_STRATEGIES )));
91
+ }
92
+
80
93
// check for extra keys in options
81
94
$ optionsExtraKeys = array_diff (array_keys ($ options ), array_keys ($ configuration ));
82
95
if (0 < \count ($ optionsExtraKeys )) {
@@ -102,15 +115,19 @@ public function send(string $body, array $headers, int $delay = 0): void
102
115
$ now = (\DateTime::createFromFormat ('U.u ' , microtime (true )));
103
116
$ availableAt = (clone $ now )->modify (sprintf ('+%d seconds ' , $ delay / 1000 ));
104
117
118
+ $ values = [
119
+ 'body ' => ':body ' ,
120
+ 'headers ' => ':headers ' ,
121
+ 'queue_name ' => ':queue_name ' ,
122
+ 'created_at ' => ':created_at ' ,
123
+ 'available_at ' => ':available_at ' ,
124
+ ];
125
+ if (self ::ID_STRATEGY_UUID === $ this ->configuration ['id_strategy ' ]) {
126
+ $ values ['id ' ] = $ this ->driverConnection ->getDatabasePlatform ()->getGuidExpression ();
127
+ }
105
128
$ queryBuilder = $ this ->driverConnection ->createQueryBuilder ()
106
129
->insert ($ this ->configuration ['table_name ' ])
107
- ->values ([
108
- 'body ' => ':body ' ,
109
- 'headers ' => ':headers ' ,
110
- 'queue_name ' => ':queue_name ' ,
111
- 'created_at ' => ':created_at ' ,
112
- 'available_at ' => ':available_at ' ,
113
- ]);
130
+ ->values ($ values );
114
131
115
132
$ this ->executeQuery ($ queryBuilder ->getSQL (), [
116
133
':body ' => $ body ,
@@ -223,11 +240,23 @@ private function getSchema(): Schema
223
240
{
224
241
$ schema = new Schema ();
225
242
$ table = $ schema ->createTable ($ this ->configuration ['table_name ' ]);
226
- $ table ->addColumn ('id ' , Type::BIGINT )
227
- ->setAutoincrement (true )
228
- ->setNotnull (true );
229
- $ table ->addColumn ('body ' , Type::TEXT )
243
+ switch ($ this ->configuration ['id_strategy ' ]) {
244
+ case self ::ID_STRATEGY_UUID :
245
+ $ table ->addColumn ('id ' , Type::GUID )
246
+ ->setNotnull (true );
247
+ break ;
248
+ case self ::ID_STRATEGY_AUTO_INCREMENT :
249
+ $ table ->addColumn ('id ' , Type::BIGINT )
250
+ ->setAutoincrement (true )
251
+ ->setNotnull (true );
252
+ break ;
253
+ default :
254
+ throw new TransportException (sprintf ('Unknown id_strategy "%s". Supported strategies are [%s] ' , $ this ->configuration ['id_strategy ' ], self ::ID_STRATEGIES ));
255
+ }
256
+ if ($ this ->configuration ['id_strategy ' ]) {
257
+ $ table ->addColumn ('body ' , Type::TEXT )
230
258
->setNotnull (true );
259
+ }
231
260
$ table ->addColumn ('headers ' , Type::JSON )
232
261
->setNotnull (true );
233
262
$ table ->addColumn ('queue_name ' , Type::STRING )
0 commit comments