Skip to content

Commit 2a31b47

Browse files
committed
[Messenger] Doctrine transport - add an option for the id strategy
1 parent 1e083a0 commit 2a31b47

File tree

3 files changed

+65
-16
lines changed

3 files changed

+65
-16
lines changed

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,12 @@ public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
211211
{
212212
Connection::buildConfiguration('doctrine://default?new_option=woops');
213213
}
214+
215+
/**
216+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
217+
*/
218+
public function testItThrowsAnExceptionIfTheIdStrategyIsNotSupported()
219+
{
220+
Connection::buildConfiguration('doctrine://default?id_strategy=not_supported');
221+
}
214222
}

src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ protected function setUp()
4343
public function testConnectionSendAndGet()
4444
{
4545
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
46-
$encoded = $this->connection->get();
47-
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
48-
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
46+
$doctrineEnvelop = $this->connection->get();
47+
$this->assertEquals('{"message": "Hi"}', $doctrineEnvelop['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelop['headers']);
4949
}
5050

5151
public function testSendWithDelay()
@@ -96,8 +96,8 @@ public function testItRetrieveTheFirstAvailableMessage()
9696
'available_at' => '2019-03-15 12:30:00',
9797
]);
9898

99-
$encoded = $this->connection->get();
100-
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
99+
$doctrineEnvelop = $this->connection->get();
100+
$this->assertEquals('{"message": "Hi available"}', $doctrineEnvelop['body']);
101101
}
102102

103103
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
@@ -124,4 +124,16 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
124124
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
125125
$this->connection->reject($next['id']);
126126
}
127+
128+
public function testUuidStrategy()
129+
{
130+
$connection = new Connection(['id_strategy' => Connection::ID_STRATEGY_UUID], $this->driverConnection);
131+
$this->driverConnection->exec('DROP TABLE messenger_messages');
132+
$connection->setup();
133+
$connection->send('{"message": "Hi uuid"}', ['type' => DummyMessage::class]);
134+
$message = $this->connection->get();
135+
$this->assertEquals(1, preg_match('/[a-f0-9]{8}\-[a-f0-9]{4}\-[a-f0-9]{4}\-(8|9|a|b)[a-f0-9]{3}\-[a-f0-9]{12}/', $message['id']));
136+
$connection->reject($message['id']);
137+
$this->driverConnection->exec('DROP TABLE messenger_messages');
138+
}
127139
}

src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Connection
2929
* Configuration of the connection.
3030
*
3131
* * table_name: name of the table
32+
* * id_strategy: Strategy for the id field. uuid or auto_increment: Default: auto_increment
3233
* * connection: name of the Doctrine's entity manager
3334
* * queue_name: name of the queue
3435
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
@@ -39,11 +40,18 @@ class Connection
3940
private $driverConnection;
4041
const DEFAULT_OPTIONS = [
4142
'table_name' => 'messenger_messages',
43+
'id_strategy' => self::ID_STRATEGY_AUTO_INCREMENT,
4244
'queue_name' => 'default',
4345
'redeliver_timeout' => 3600,
4446
'loop_sleep' => 200000,
4547
'auto_setup' => true,
4648
];
49+
const ID_STRATEGY_AUTO_INCREMENT = 'auto_increment';
50+
const ID_STRATEGY_UUID = 'uuid';
51+
const ID_STRATEGIES = [
52+
self::ID_STRATEGY_AUTO_INCREMENT,
53+
self::ID_STRATEGY_UUID,
54+
];
4755

4856
public function __construct(array $configuration, DBALConnection $driverConnection)
4957
{
@@ -71,12 +79,17 @@ public static function buildConfiguration($dsn, array $options = [])
7179
$configuration = [
7280
'connection' => $components['host'],
7381
'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']),
82+
'id_strategy' => $options['id_strategy'] ?? ($query['id_strategy'] ?? self::DEFAULT_OPTIONS['id_strategy']),
7483
'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']),
7584
'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']),
7685
'loop_sleep' => $options['loop_sleep'] ?? ($query['loop_sleep'] ?? self::DEFAULT_OPTIONS['loop_sleep']),
7786
'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
7887
];
7988

89+
if (!\in_array($configuration['id_strategy'], self::ID_STRATEGIES)) {
90+
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $configuration['id_strategy'], implode(', ', self::ID_STRATEGIES)));
91+
}
92+
8093
// check for extra keys in options
8194
$optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
8295
if (0 < \count($optionsExtraKeys)) {
@@ -102,15 +115,19 @@ public function send(string $body, array $headers, int $delay = 0): void
102115
$now = (\DateTime::createFromFormat('U.u', microtime(true)));
103116
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
104117

118+
$values = [
119+
'body' => ':body',
120+
'headers' => ':headers',
121+
'queue_name' => ':queue_name',
122+
'created_at' => ':created_at',
123+
'available_at' => ':available_at',
124+
];
125+
if (self::ID_STRATEGY_UUID === $this->configuration['id_strategy']) {
126+
$values['id'] = $this->driverConnection->getDatabasePlatform()->getGuidExpression();
127+
}
105128
$queryBuilder = $this->driverConnection->createQueryBuilder()
106129
->insert($this->configuration['table_name'])
107-
->values([
108-
'body' => ':body',
109-
'headers' => ':headers',
110-
'queue_name' => ':queue_name',
111-
'created_at' => ':created_at',
112-
'available_at' => ':available_at',
113-
]);
130+
->values($values);
114131

115132
$this->executeQuery($queryBuilder->getSQL(), [
116133
':body' => $body,
@@ -223,11 +240,23 @@ private function getSchema(): Schema
223240
{
224241
$schema = new Schema();
225242
$table = $schema->createTable($this->configuration['table_name']);
226-
$table->addColumn('id', Type::BIGINT)
227-
->setAutoincrement(true)
228-
->setNotnull(true);
229-
$table->addColumn('body', Type::TEXT)
243+
switch ($this->configuration['id_strategy']) {
244+
case self::ID_STRATEGY_UUID:
245+
$table->addColumn('id', Type::GUID)
246+
->setNotnull(true);
247+
break;
248+
case self::ID_STRATEGY_AUTO_INCREMENT:
249+
$table->addColumn('id', Type::BIGINT)
250+
->setAutoincrement(true)
251+
->setNotnull(true);
252+
break;
253+
default:
254+
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $this->configuration['id_strategy'], self::ID_STRATEGIES));
255+
}
256+
if ($this->configuration['id_strategy']) {
257+
$table->addColumn('body', Type::TEXT)
230258
->setNotnull(true);
259+
}
231260
$table->addColumn('headers', Type::JSON)
232261
->setNotnull(true);
233262
$table->addColumn('queue_name', Type::STRING)

0 commit comments

Comments
 (0)