Skip to content

[Messenger] Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers #42345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Result as DriverResult;
Expand All @@ -23,8 +24,11 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\TableDiff;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Types;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -402,4 +406,54 @@ public function providePlatformSql(): iterable
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];
}

/**
* @dataProvider setupIndicesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());

$schemaManager = $this->createMock(AbstractSchemaManager::class);
$schema = new Schema();
$expectedTable = $schema->createTable('messenger_messages');
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);

$platformMock = $this->createMock($platformClass);
$platformMock
->expects(self::once())
->method('getAlterTableSQL')
->with(self::callback(static function (TableDiff $tableDiff): bool {
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
}))
->willReturn([]);
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);

$connection = new Connection([], $driverConnection);
$connection->setup();
}

public function setupIndicesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\LockMode;
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\Comparator;
Expand Down Expand Up @@ -386,7 +387,6 @@ private function getSchema(): Schema
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(true);
Expand All @@ -395,8 +395,11 @@ private function getSchema(): Schema
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
$table->addIndex(['delivered_at']);

return $schema;
Expand Down