[Cache] fix eventual consistency when using RedisTagAwareAdapter with a cluster #41804

Merged 1 commit on Jun 23, 2021
148 changes: 70 additions & 78 deletions src/Symfony/Component/Cache/Adapter/RedisTagAwareAdapter.php
@@ -23,17 +23,13 @@
use Symfony\Component\Cache\Traits\RedisTrait;

/**
* Stores tag id <> cache id relationship as a Redis Set, lookup on invalidation using RENAME+SMEMBERS.
* Stores tag id <> cache id relationship as a Redis Set.
*
* Set (tag relation info) is stored without expiry (non-volatile), while cache always gets an expiry (volatile) even
if not set by caller. Thus if you configure Redis with the right eviction policy you can be sure this tag <> cache
* relationship survives eviction (cache cleanup when Redis runs out of memory).
*
* Requirements:
* - Client: PHP Redis or Predis
* Note: Due to lack of RENAME support it is NOT recommended to use Cluster on Predis, instead use phpredis.
* - Server: Redis 2.8+
* Configured with any `volatile-*` eviction policy, OR `noeviction` if it will NEVER fill up memory
* Redis server 2.8+ with any `volatile-*` eviction policy, OR `noeviction` if you're sure memory will NEVER fill up
*
* Design limitations:
* - Max 4 billion cache keys per cache tag as limited by Redis Set datatype.
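The eviction-policy requirement above is enforced at runtime by doSave() via getRedisEvictionPolicy() (see the hunk below). For reference, a minimal sketch of checking and applying a compliant policy with the phpredis client; the connection details are placeholders, and CONFIG SET may be disallowed on managed Redis services:

    $redis = new \Redis();
    $redis->connect('127.0.0.1');

    // CONFIG GET returns an associative array, e.g. ['maxmemory-policy' => 'noeviction'].
    $policy = $redis->config('GET', 'maxmemory-policy')['maxmemory-policy'];

    // Same acceptance rule as the adapter: 'noeviction' or any 'volatile-*' policy.
    if ('noeviction' !== $policy && 0 !== strpos($policy, 'volatile-')) {
        $redis->config('SET', 'maxmemory-policy', 'volatile-lru');
    }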
@@ -49,11 +45,6 @@ class RedisTagAwareAdapter extends AbstractTagAwareAdapter
{
use RedisTrait;

/**
* Limits for how many keys are deleted in batch.
*/
private const BULK_DELETE_LIMIT = 10000;

/**
* On cache items without a lifetime set, we set it to 100 days. This is to make sure cache items are
* preferred to be evicted over tag Sets, if eviction policy is configured according to requirements.
@@ -96,7 +87,7 @@ protected function doSave(array $values, int $lifetime, array $addTagData = [],
{
$eviction = $this->getRedisEvictionPolicy();
if ('noeviction' !== $eviction && 0 !== strpos($eviction, 'volatile-')) {
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
throw new LogicException(sprintf('Redis maxmemory-policy setting "%s" is *not* supported by RedisTagAwareAdapter, use "noeviction" or "volatile-*" eviction policies.', $eviction));
}

// serialize values
@@ -159,15 +150,9 @@ protected function doDeleteYieldTags(array $ids): iterable
return v:sub(14, 13 + v:byte(13) + v:byte(12) * 256 + v:byte(11) * 65536)
EOLUA;

if ($this->redis instanceof \Predis\ClientInterface) {
$evalArgs = [$lua, 1, &$id];
} else {
$evalArgs = [$lua, [&$id], 1];
}

$results = $this->pipeline(function () use ($ids, &$id, $evalArgs) {
$results = $this->pipeline(function () use ($ids, $lua) {
foreach ($ids as $id) {
yield 'eval' => $evalArgs;
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id] : [$lua, [$id], 1];
}
});

@@ -185,12 +170,15 @@ protected function doDeleteYieldTags(array $ids): iterable
*/
protected function doDeleteTagRelations(array $tagData): bool
{
$this->pipeline(static function () use ($tagData) {
$results = $this->pipeline(static function () use ($tagData) {
foreach ($tagData as $tagId => $idList) {
array_unshift($idList, $tagId);
yield 'sRem' => $idList;
}
})->rewind();
});
foreach ($results as $result) {
// no-op
}

return true;
}
@@ -200,77 +188,81 @@ protected function doDeleteTagRelations(array $tagData): bool
*/
protected function doInvalidate(array $tagIds): bool
{
if (!$this->redis instanceof \Predis\ClientInterface || !$this->redis->getConnection() instanceof PredisCluster) {
$movedTagSetIds = $this->renameKeys($this->redis, $tagIds);
} else {
$clusterConnection = $this->redis->getConnection();
$tagIdsByConnection = new \SplObjectStorage();
$movedTagSetIds = [];
// This script scans the set of items linked to tag: it empties the set
// and removes the linked items. When the set is still not empty after
// the scan, it means we're in cluster mode and that the linked items
// are on other nodes: we move the links to a temporary set and we
// garbage collect that set from the client side.

foreach ($tagIds as $id) {
$connection = $clusterConnection->getConnectionByKey($id);
$slot = $tagIdsByConnection[$connection] ?? $tagIdsByConnection[$connection] = new \ArrayObject();
$slot[] = $id;
}
$lua = <<<'EOLUA'
local cursor = '0'
local id = KEYS[1]
repeat
local result = redis.call('SSCAN', id, cursor, 'COUNT', 5000);
cursor = result[1];
local rems = {}

for _, v in ipairs(result[2]) do
local ok, _ = pcall(redis.call, 'DEL', ARGV[1]..v)
if ok then
table.insert(rems, v)
end
end
if 0 < #rems then
redis.call('SREM', id, unpack(rems))
end
until '0' == cursor;

redis.call('SUNIONSTORE', '{'..id..'}'..id, id)
redis.call('DEL', id)

return redis.call('SSCAN', '{'..id..'}'..id, '0', 'COUNT', 5000)
EOLUA;

foreach ($tagIdsByConnection as $connection) {
$slot = $tagIdsByConnection[$connection];
$movedTagSetIds = array_merge($movedTagSetIds, $this->renameKeys(new $this->redis($connection, $this->redis->getOptions()), $slot->getArrayCopy()));
$results = $this->pipeline(function () use ($tagIds, $lua) {
if ($this->redis instanceof \Predis\ClientInterface) {
$prefix = $this->redis->getOptions()->prefix ? $this->redis->getOptions()->prefix->getPrefix() : '';
} elseif (\is_array($prefix = $this->redis->getOption(\Redis::OPT_PREFIX) ?? '')) {
$prefix = current($prefix);
}
}

// No Sets found
if (!$movedTagSetIds) {
return false;
}

// Now safely take the time to read the keys in each set and collect ids we need to delete
$tagIdSets = $this->pipeline(static function () use ($movedTagSetIds) {
foreach ($movedTagSetIds as $movedTagId) {
yield 'sMembers' => [$movedTagId];
foreach ($tagIds as $id) {
yield 'eval' => $this->redis instanceof \Predis\ClientInterface ? [$lua, 1, $id, $prefix] : [$lua, [$id, $prefix], 1];
}
});

// Return combination of the temporary Tag Set ids and their values (cache ids)
$ids = array_merge($movedTagSetIds, ...iterator_to_array($tagIdSets, false));
$lua = <<<'EOLUA'
local id = KEYS[1]
local cursor = table.remove(ARGV)
redis.call('SREM', '{'..id..'}'..id, unpack(ARGV))

// Delete cache in chunks to avoid overloading the connection
foreach (array_chunk(array_unique($ids), self::BULK_DELETE_LIMIT) as $chunkIds) {
$this->doDelete($chunkIds);
}
return redis.call('SSCAN', '{'..id..'}'..id, cursor, 'COUNT', 5000)
EOLUA;

return true;
}
foreach ($results as $id => [$cursor, $ids]) {
while ($ids || '0' !== $cursor) {
$this->doDelete($ids);

/**
* Renames several keys in order to be able to operate on them without risk of race conditions.
*
* Filters out keys that do not exist before returning new keys.
*
* @see https://redis.io/commands/rename
* @see https://redis.io/topics/cluster-spec#keys-hash-tags
*
* @return array Filtered list of the valid moved keys (only those that existed)
*/
private function renameKeys($redis, array $ids): array
{
$newIds = [];
$uniqueToken = bin2hex(random_bytes(10));
$evalArgs = [$id, $cursor];
array_splice($evalArgs, 1, 0, $ids);

$results = $this->pipeline(static function () use ($ids, $uniqueToken) {
foreach ($ids as $id) {
yield 'rename' => [$id, '{'.$id.'}'.$uniqueToken];
}
}, $redis);
if ($this->redis instanceof \Predis\ClientInterface) {
array_unshift($evalArgs, $lua, 1);
} else {
$evalArgs = [$lua, $evalArgs, 1];
}

foreach ($results as $id => $result) {
if (true === $result || ($result instanceof Status && Status::get('OK') === $result)) {
// Only take into account if ok (key existed), will be false on phpredis if it did not exist
$newIds[] = '{'.$id.'}'.$uniqueToken;
$results = $this->pipeline(function () use ($evalArgs) {
yield 'eval' => $evalArgs;
});

foreach ($results as [$cursor, $ids]) {
// no-op
}
}
}

return $newIds;
return true;
}

private function getRedisEvictionPolicy(): string
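The rewrite above drops the RENAME+SMEMBERS invalidation strategy, the BULK_DELETE_LIMIT batching and the client-side renameKeys() helper in favor of a Lua script that SSCANs each tag Set server-side and deletes the linked cache items in place; entries that hash to other cluster nodes are moved to a '{tag}tag' temporary Set and garbage collected from the client. Wrapping the id in braces keeps that temporary Set on the same hash slot as the tag key, which is what makes the fallback work on Redis Cluster. The adapter's public API is untouched; as a reminder, a minimal usage sketch (DSN, key and tag names are placeholders):

    use Symfony\Component\Cache\Adapter\RedisAdapter;
    use Symfony\Component\Cache\Adapter\RedisTagAwareAdapter;

    $client = RedisAdapter::createConnection('redis://localhost');
    $cache = new RedisTagAwareAdapter($client);

    $item = $cache->getItem('user.1.profile');
    $item->set(['name' => 'Jane']);
    $item->tag(['users']);               // recorded in the "users" tag Set
    $cache->save($item);

    // Runs the SSCAN-based Lua script on the "users" Set and deletes every linked cache id.
    $cache->invalidateTags(['users']);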
@@ -19,6 +19,7 @@
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Cache\Adapter\TagAwareAdapter;
use Symfony\Component\Cache\LockRegistry;
use Symfony\Component\Cache\Tests\Fixtures\PrunableAdapter;

/**
@@ -199,6 +200,8 @@ public function testGetItemReturnsCacheMissWhenPoolDoesNotHaveItemAndOnlyHasTags

public function testLog()
{
$lockFiles = LockRegistry::setFiles([__FILE__]);

$logger = $this->createMock(LoggerInterface::class);
$logger
->expects($this->atLeastOnce())
@@ -209,6 +212,8 @@ public function testLog()

// Computing will produce at least one log
$cache->get('foo', static function (): string { return 'ccc'; });

LockRegistry::setFiles($lockFiles);
}

/**
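The testLog() change above pins LockRegistry to a known lock file for the duration of the test and restores the previous pool afterwards, presumably so the contracts-level get() call locks on a predictable file. The same save/restore pattern, sketched outside the test suite ($cache here is any adapter using the contracts get(); the surrounding code is hypothetical):

    use Symfony\Component\Cache\LockRegistry;

    // setFiles() returns the previously configured pool of lock files so it can be restored.
    $previousFiles = LockRegistry::setFiles([__FILE__]);

    try {
        $value = $cache->get('foo', static function (): string {
            return 'computed';
        });
    } finally {
        LockRegistry::setFiles($previousFiles);
    }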
3 changes: 3 additions & 0 deletions src/Symfony/Component/Cache/Tests/LockRegistryTest.php
@@ -18,6 +18,9 @@ class LockRegistryTest extends TestCase
{
public function testFiles()
{
if ('\\' === \DIRECTORY_SEPARATOR) {
$this->markTestSkipped('LockRegistry is disabled on Windows');
}
$lockFiles = LockRegistry::setFiles([]);
LockRegistry::setFiles($lockFiles);
$expected = array_map('realpath', glob(__DIR__.'/../Adapter/*'));
18 changes: 10 additions & 8 deletions src/Symfony/Component/Cache/Traits/RedisTrait.php
@@ -363,12 +363,6 @@ protected function doHave($id)
protected function doClear($namespace)
{
$cleared = true;
if ($this->redis instanceof \Predis\ClientInterface) {
$evalArgs = [0, $namespace];
} else {
$evalArgs = [[$namespace], 0];
}

$hosts = $this->getHosts();
$host = reset($hosts);
if ($host instanceof \Predis\Client && $host->getConnection() instanceof ReplicationInterface) {
@@ -385,17 +379,20 @@ protected function doClear($namespace)
$info = $host->info('Server');
$info = $info['Server'] ?? $info;

$pattern = $namespace.'*';

if (!version_compare($info['redis_version'], '2.8', '>=')) {
// As documented in Redis documentation (http://redis.io/commands/keys) using KEYS
// can hang your server when it is executed against large databases (millions of items).
// Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above.
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared;
$args = $this->redis instanceof \Predis\ClientInterface ? [0, $pattern] : [[$pattern], 0];
$cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]) for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $args[0], $args[1]) && $cleared;
continue;
}

$cursor = null;
do {
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000);
$keys = $host instanceof \Predis\ClientInterface ? $host->scan($cursor, 'MATCH', $pattern, 'COUNT', 1000) : $host->scan($cursor, $pattern, 1000);
if (isset($keys[1]) && \is_array($keys[1])) {
$cursor = $keys[0];
$keys = $keys[1];
@@ -507,6 +504,11 @@ private function pipeline(\Closure $generator, $redis = null): \Generator
$results = $redis->exec();
}

if (!$redis instanceof \Predis\ClientInterface && 'eval' === $command && $redis->getLastError()) {
$e = new \RedisException($redis->getLastError());
$results = array_map(function ($v) use ($e) { return false === $v ? $e : $v; }, $results);
}

foreach ($ids as $k => $id) {
yield $id => $results[$k];
}
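The pipeline() addition above surfaces phpredis EVAL failures to callers: a failed script no longer comes back as a bare false (which is also a legitimate return value) but as a \RedisException instance in the yielded results. A sketch of how a consumer of the generator might tell the two apart; the handling shown is illustrative, not part of the PR:

    foreach ($results as $id => $result) {
        if ($result instanceof \RedisException) {
            // The Lua script failed for this key; rethrow (or log) instead of
            // silently treating it like a boolean result.
            throw $result;
        }

        // ... handle a successful result for $id
    }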