-
-
Notifications
You must be signed in to change notification settings - Fork 9.7k
[Lock] Add MysqlStore that use GET_LOCK #25578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2a944df
4408fc6
74bf72e
828ac17
2b1a0e5
c0f10a4
7334a1f
51e5e7f
1a0946c
3ae4cce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ cache: | |
services: | ||
- memcached | ||
- mongodb | ||
- mysql | ||
- redis-server | ||
|
||
before_install: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,11 @@ | ||
CHANGELOG | ||
========= | ||
|
||
4.1.0 | ||
----- | ||
|
||
* added Mysql store using GET_LOCK | ||
|
||
3.4.0 | ||
----- | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Lock\Store; | ||
|
||
use Doctrine\DBAL\Connection; | ||
use Symfony\Component\Lock\Exception\InvalidArgumentException; | ||
use Symfony\Component\Lock\Exception\LockAcquiringException; | ||
use Symfony\Component\Lock\Exception\LockConflictedException; | ||
use Symfony\Component\Lock\Key; | ||
use Symfony\Component\Lock\StoreInterface; | ||
|
||
/** | ||
* MysqlStore is a StoreInterface implementation using MySQL/MariaDB GET_LOCK function. | ||
* | ||
* @author Jérôme TAMARELLE <jerome@tamarelle.net> | ||
*/ | ||
class MysqlStore implements StoreInterface | ||
{ | ||
/** | ||
* @var \PDO|Connection | ||
*/ | ||
private $connection; | ||
private $waitTimeout; | ||
|
||
/** | ||
* @param \PDO|Connection $connection | ||
* @param int $waitTimeout Time in seconds to wait for a lock to be released, for non-blocking lock. | ||
*/ | ||
public function __construct($connection, $waitTimeout = 0) | ||
{ | ||
if ($connection instanceof \PDO) { | ||
if ('mysql' !== $driver = $connection->getAttribute(\PDO::ATTR_DRIVER_NAME)) { | ||
throw new InvalidArgumentException(sprintf('%s requires a "mysql" connection. "%s" given.', __CLASS__, $driver)); | ||
} | ||
} elseif ($connection instanceof Connection) { | ||
if ('pdo_mysql' !== $driver = $connection->getDriver()->getName()) { | ||
throw new InvalidArgumentException(sprintf('%s requires a "pdo_mysql" connection. "%s" given.', __CLASS__, $driver)); | ||
} | ||
} else { | ||
throw new InvalidArgumentException(sprintf('"%s" requires PDO or Doctrine\DBAL\Connection instance, "%s" given.', __CLASS__, is_object($connection) ? get_class($connection) : gettype($connection))); | ||
} | ||
|
||
if ($waitTimeout < 0) { | ||
throw new InvalidArgumentException(sprintf('"%s" requires a positive wait timeout, "%d" given. For infine wait, acquire a "blocking" lock.', __CLASS__, $waitTimeout)); | ||
} | ||
|
||
$this->connection = $connection; | ||
$this->waitTimeout = $waitTimeout; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function save(Key $key) | ||
{ | ||
$this->lock($key, false); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function waitAndSave(Key $key) | ||
{ | ||
$this->lock($key, true); | ||
} | ||
|
||
private function lock(Key $key, bool $blocking) | ||
{ | ||
// the lock is maybe already acquired. | ||
if ($key->hasState(__CLASS__)) { | ||
return; | ||
} | ||
|
||
// no timeout for impatient | ||
$timeout = $blocking ? -1 : $this->waitTimeout; | ||
|
||
// Hash the key to guarantee it contains between 1 and 64 characters | ||
$storedKey = hash('sha256', $key); | ||
|
||
$stmt = $this->connection->prepare('SELECT IF(IS_USED_LOCK(:key) = CONNECTION_ID(), -1, GET_LOCK(:key, :timeout))'); | ||
$stmt->bindValue(':key', $storedKey, \PDO::PARAM_STR); | ||
$stmt->bindValue(':timeout', $timeout, \PDO::PARAM_INT); | ||
$stmt->setFetchMode(\PDO::FETCH_COLUMN, 0); | ||
$stmt->execute(); | ||
|
||
// 1: Lock successful | ||
// 0: Already locked by another session | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Null, process killed by admin, or machine out of memory |
||
// -1: Already locked by the same session | ||
$success = $stmt->fetchColumn(); | ||
|
||
if ('-1' === $success) { | ||
throw new LockConflictedException('Lock already acquired with by same connection.'); | ||
} | ||
|
||
if ('1' !== $success) { | ||
throw new LockConflictedException(); | ||
} | ||
|
||
$key->setState(__CLASS__, $storedKey); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function putOffExpiration(Key $key, $ttl) | ||
{ | ||
// the GET_LOCK locks forever, until the session terminates. | ||
$stmt = $this->connection->prepare('SET SESSION wait_timeout=GREATEST(@@wait_timeout, :ttl)'); | ||
$stmt->bindValue(':ttl', $ttl, \PDO::PARAM_INT); | ||
$stmt->execute(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function delete(Key $key) | ||
{ | ||
if (!$key->hasState(__CLASS__)) { | ||
return; | ||
} | ||
|
||
$storedKey = $key->getState(__CLASS__); | ||
|
||
$stmt = $this->connection->prepare('DO RELEASE_LOCK(:key)'); | ||
$stmt->bindValue(':key', $storedKey, \PDO::PARAM_STR); | ||
$stmt->execute(); | ||
|
||
$key->removeState(__CLASS__); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function exists(Key $key) | ||
{ | ||
if (!$key->hasState(__CLASS__)) { | ||
return false; | ||
} | ||
|
||
$storedKey = $key->getState(__CLASS__); | ||
|
||
$stmt = $this->connection->prepare('SELECT IF(IS_USED_LOCK(:key) = CONNECTION_ID(), 1, 0)'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @jderusse This query insure that: 1. the connection is still alive & 2. the lock is still assigned to the current connection. |
||
$stmt->bindValue(':key', $storedKey, \PDO::PARAM_STR); | ||
$stmt->setFetchMode(\PDO::FETCH_COLUMN, 0); | ||
$stmt->execute(); | ||
|
||
return '1' === $stmt->fetchColumn(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,8 @@ trait BlockingStoreTestTrait | |
{ | ||
/** | ||
* @see AbstractStoreTest::getStore() | ||
* | ||
* @return StoreInterface | ||
*/ | ||
abstract protected function getStore(); | ||
|
||
|
@@ -38,8 +40,6 @@ public function testBlockingLocks() | |
// Amount a microsecond used to order async actions | ||
$clockDelay = 50000; | ||
|
||
/** @var StoreInterface $store */ | ||
$store = $this->getStore(); | ||
$key = new Key(uniqid(__METHOD__, true)); | ||
$parentPID = posix_getpid(); | ||
|
||
|
@@ -50,6 +50,7 @@ public function testBlockingLocks() | |
// Wait the start of the child | ||
pcntl_sigwaitinfo(array(SIGHUP), $info); | ||
|
||
$store = $this->getStore(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch |
||
try { | ||
// This call should failed given the lock should already by acquired by the child | ||
$store->save($key); | ||
|
@@ -71,6 +72,8 @@ public function testBlockingLocks() | |
} else { | ||
// Block SIGHUP signal | ||
pcntl_sigprocmask(SIG_BLOCK, array(SIGHUP)); | ||
|
||
$store = $this->getStore(); | ||
try { | ||
$store->save($key); | ||
// send the ready signal to the parent | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Lock\Tests\Store; | ||
|
||
use Doctrine\DBAL\DriverManager; | ||
use Symfony\Component\Lock\Key; | ||
use Symfony\Component\Lock\Store\MysqlStore; | ||
|
||
/** | ||
* @author Jérôme TAMARELLE <jerome@tamarelle.net> | ||
*/ | ||
class MysqlStoreTest extends AbstractStoreTest | ||
{ | ||
use BlockingStoreTestTrait; | ||
|
||
private $connectionCase = 'pdo'; | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getStore() | ||
{ | ||
switch ($this->connectionCase) { | ||
case 'pdo': | ||
$connection = new \PDO('mysql:host='.getenv('MYSQL_HOST'), getenv('MYSQL_USERNAME'), getenv('MYSQL_PASSWORD')); | ||
$connection->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); | ||
break; | ||
|
||
case 'dbal': | ||
$connection = DriverManager::getConnection(array( | ||
'driver' => 'pdo_mysql', | ||
'user' => getenv('MYSQL_USERNAME'), | ||
'password' => getenv('MYSQL_PASSWORD'), | ||
'host' => getenv('MYSQL_HOST'), | ||
)); | ||
break; | ||
} | ||
|
||
return new MysqlStore($connection); | ||
} | ||
|
||
public function testSaveWithDoctrineDBAL() | ||
{ | ||
if (!class_exists(DriverManager::class)) { | ||
$this->markTestSkipped('Package doctrine/dbal is required.'); | ||
} | ||
|
||
$this->connectionCase = 'dbal'; | ||
|
||
parent::testSave(); | ||
} | ||
|
||
/** | ||
* @expectedException \InvalidArgumentException | ||
* @expectedExceptionMessage Symfony\Component\Lock\Store\MysqlStore requires a "mysql" connection. "sqlite" given. | ||
*/ | ||
public function testOnlyMySQLDatabaseIsSupported() | ||
{ | ||
$connection = new \PDO('sqlite::memory:'); | ||
|
||
return new MysqlStore($connection); | ||
} | ||
|
||
/** | ||
* @expectedException \InvalidArgumentException | ||
* @expectedExceptionMessage Symfony\Component\Lock\Store\MysqlStore requires a "pdo_mysql" connection. "pdo_sqlite" given. | ||
*/ | ||
public function testOnlyMySQLDatabaseIsSupportedWithDoctrineDBAL() | ||
{ | ||
if (!class_exists(DriverManager::class)) { | ||
$this->markTestSkipped('Package doctrine/dbal is required.'); | ||
} | ||
|
||
$connection = DriverManager::getConnection(array( | ||
'driver' => 'pdo_sqlite', | ||
)); | ||
|
||
return new MysqlStore($connection); | ||
} | ||
|
||
/** | ||
* @expectedException \InvalidArgumentException | ||
* @expectedExceptionMessage "Symfony\Component\Lock\Store\MysqlStore" requires a positive wait timeout, "-1" given. For infine wait, acquire a "blocking" lock. | ||
*/ | ||
public function testOnlyPositiveWaitTimeoutIsSupported() | ||
{ | ||
$connection = $this->createMock(\PDO::class); | ||
$connection->method('getAttribute')->willReturn('mysql'); | ||
|
||
return new MysqlStore($connection, -1); | ||
} | ||
|
||
/** | ||
* @expectedException \Symfony\Component\Lock\Exception\LockConflictedException | ||
* @expectedExceptionMessage Lock already acquired with by same connection. | ||
*/ | ||
public function testWaitTheSameResourceOnTheSameConnectionIsNotSupported() | ||
{ | ||
$store = $this->getStore(); | ||
|
||
$resource = uniqid(__METHOD__, true); | ||
$key1 = new Key($resource); | ||
$key2 = new Key($resource); | ||
|
||
$store->save($key1); | ||
$store->waitAndSave($key2); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be acquired, and not to be released right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be released after the timeout so that sounds right to me. You could also say "The time in seconds the lock is acquired for".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the documentaiton the parameter timeout, define the maximum amount of time allowed to acquire the lock. Once you get it, you can keep it as long as you want.
T0: ask for acquiring a new lock (wait for other process to release the lock for instance)
T1: lock is acquired
T2: lock is released
Here we are talking about the time between T1 and T0.