Skip to content

[Cache] add integration with Messenger to allow computing cached values in a worker #30572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,9 @@ private function addCacheSection(ArrayNodeDefinition $rootNode)
->scalarNode('provider')
->info('Overwrite the setting from the default provider for this adapter.')
->end()
->scalarNode('early_expiration_message_bus')
->example('"messenger.default_bus" to send early expiration events to the default Messenger bus.')
->end()
->scalarNode('clearer')->end()
->end()
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
use Symfony\Component\Yaml\Command\LintCommand as BaseYamlLintCommand;
use Symfony\Component\Yaml\Yaml;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\CallbackInterface;
use Symfony\Contracts\Cache\TagAwareCacheInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\Service\ResetInterface;
Expand Down Expand Up @@ -436,6 +437,8 @@ public function load(array $configs, ContainerBuilder $container)
->addTag('container.env_var_loader');
$container->registerForAutoconfiguration(EnvVarProcessorInterface::class)
->addTag('container.env_var_processor');
$container->registerForAutoconfiguration(CallbackInterface::class)
->addTag('container.reversible');
$container->registerForAutoconfiguration(ServiceLocator::class)
->addTag('container.service_locator');
$container->registerForAutoconfiguration(ServiceSubscriberInterface::class)
Expand Down
7 changes: 7 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Symfony\Component\Cache\Adapter\RedisTagAwareAdapter;
use Symfony\Component\Cache\Adapter\TagAwareAdapter;
use Symfony\Component\Cache\Marshaller\DefaultMarshaller;
use Symfony\Component\Cache\Messenger\EarlyExpirationHandler;
use Symfony\Component\HttpKernel\CacheClearer\Psr6CacheClearer;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\TagAwareCacheInterface;
Expand Down Expand Up @@ -212,6 +213,12 @@
null, // use igbinary_serialize() when available
])

->set('cache.early_expiration_handler', EarlyExpirationHandler::class)
->args([
service('reverse_container'),
])
->tag('messenger.message_handler')

->set('cache.default_clearer', Psr6CacheClearer::class)
->args([
[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
<xsd:attribute name="public" type="xsd:boolean" />
<xsd:attribute name="default-lifetime" type="xsd:integer" />
<xsd:attribute name="provider" type="xsd:string" />
<xsd:attribute name="early-expiration-message-bus" type="xsd:string" />
<xsd:attribute name="clearer" type="xsd:string" />
</xsd:complexType>

Expand Down
5 changes: 5 additions & 0 deletions src/Symfony/Component/Cache/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.2.0
-----

* added integration with Messenger to allow computing cached values in a worker

5.1.0
-----

Expand Down
28 changes: 26 additions & 2 deletions src/Symfony/Component/Cache/DependencyInjection/CachePoolPass.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
use Symfony\Component\Cache\Adapter\ChainAdapter;
use Symfony\Component\Cache\Messenger\EarlyExpirationDispatcher;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand All @@ -32,15 +33,21 @@ class CachePoolPass implements CompilerPassInterface
private $cachePoolClearerTag;
private $cacheSystemClearerId;
private $cacheSystemClearerTag;
private $reverseContainerId;
private $reversibleTag;
private $messageHandlerId;

public function __construct(string $cachePoolTag = 'cache.pool', string $kernelResetTag = 'kernel.reset', string $cacheClearerId = 'cache.global_clearer', string $cachePoolClearerTag = 'cache.pool.clearer', string $cacheSystemClearerId = 'cache.system_clearer', string $cacheSystemClearerTag = 'kernel.cache_clearer')
public function __construct(string $cachePoolTag = 'cache.pool', string $kernelResetTag = 'kernel.reset', string $cacheClearerId = 'cache.global_clearer', string $cachePoolClearerTag = 'cache.pool.clearer', string $cacheSystemClearerId = 'cache.system_clearer', string $cacheSystemClearerTag = 'kernel.cache_clearer', string $reverseContainerId = 'reverse_container', string $reversibleTag = 'container.reversible', string $messageHandlerId = 'cache.early_expiration_handler')
{
$this->cachePoolTag = $cachePoolTag;
$this->kernelResetTag = $kernelResetTag;
$this->cacheClearerId = $cacheClearerId;
$this->cachePoolClearerTag = $cachePoolClearerTag;
$this->cacheSystemClearerId = $cacheSystemClearerId;
$this->cacheSystemClearerTag = $cacheSystemClearerTag;
$this->reverseContainerId = $reverseContainerId;
$this->reversibleTag = $reversibleTag;
$this->messageHandlerId = $messageHandlerId;
}

/**
Expand All @@ -55,13 +62,15 @@ public function process(ContainerBuilder $container)
$seed .= '.'.$container->getParameter('kernel.container_class');
}

$needsMessageHandler = false;
$allPools = [];
$clearers = [];
$attributes = [
'provider',
'name',
'namespace',
'default_lifetime',
'early_expiration_message_bus',
'reset',
];
foreach ($container->findTaggedServiceIds($this->cachePoolTag) as $id => $tags) {
Expand Down Expand Up @@ -155,13 +164,24 @@ public function process(ContainerBuilder $container)
if ($tags[0][$attr]) {
$pool->addTag($this->kernelResetTag, ['method' => $tags[0][$attr]]);
}
} elseif ('early_expiration_message_bus' === $attr) {
$needsMessageHandler = true;
$pool->addMethodCall('setCallbackWrapper', [(new Definition(EarlyExpirationDispatcher::class))
->addArgument(new Reference($tags[0]['early_expiration_message_bus']))
->addArgument(new Reference($this->reverseContainerId))
->addArgument((new Definition('callable'))
->setFactory([new Reference($id), 'setCallbackWrapper'])
->addArgument(null)
),
]);
$pool->addTag($this->reversibleTag);
} elseif ('namespace' !== $attr || ArrayAdapter::class !== $class) {
$pool->replaceArgument($i++, $tags[0][$attr]);
}
unset($tags[0][$attr]);
}
if (!empty($tags[0])) {
throw new InvalidArgumentException(sprintf('Invalid "%s" tag for service "%s": accepted attributes are "clearer", "provider", "name", "namespace", "default_lifetime" and "reset", found "%s".', $this->cachePoolTag, $id, implode('", "', array_keys($tags[0]))));
throw new InvalidArgumentException(sprintf('Invalid "%s" tag for service "%s": accepted attributes are "clearer", "provider", "name", "namespace", "default_lifetime", "early_expiration_message_bus" and "reset", found "%s".', $this->cachePoolTag, $id, implode('", "', array_keys($tags[0]))));
}

if (null !== $clearer) {
Expand All @@ -171,6 +191,10 @@ public function process(ContainerBuilder $container)
$allPools[$name] = new Reference($id, $container::IGNORE_ON_UNINITIALIZED_REFERENCE);
}

if (!$needsMessageHandler) {
$container->removeDefinition($this->messageHandlerId);
}

$notAliasedCacheClearerId = $this->cacheClearerId;
while ($container->hasAlias($this->cacheClearerId)) {
$this->cacheClearerId = (string) $container->getAlias($this->cacheClearerId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Cache\Messenger;

use Psr\Log\LoggerInterface;
use Symfony\Component\Cache\Adapter\AdapterInterface;
use Symfony\Component\Cache\CacheItem;
use Symfony\Component\DependencyInjection\ReverseContainer;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;

/**
* Sends the computation of cached values to a message bus.
*/
class EarlyExpirationDispatcher
{
private $bus;
private $reverseContainer;
private $callbackWrapper;

public function __construct(MessageBusInterface $bus, ReverseContainer $reverseContainer, callable $callbackWrapper = null)
{
$this->bus = $bus;
$this->reverseContainer = $reverseContainer;
$this->callbackWrapper = $callbackWrapper;
}

public function __invoke(callable $callback, CacheItem $item, bool &$save, AdapterInterface $pool, \Closure $setMetadata, LoggerInterface $logger = null)
{
if (!$item->isHit() || null === $message = EarlyExpirationMessage::create($this->reverseContainer, $callback, $item, $pool)) {
// The item is stale or the callback cannot be reversed: we must compute the value now
$logger && $logger->info('Computing item "{key}" online: '.($item->isHit() ? 'callback cannot be reversed' : 'item is stale'), ['key' => $item->getKey()]);

return null !== $this->callbackWrapper ? ($this->callbackWrapper)($callback, $item, $save, $pool, $setMetadata, $logger) : $callback($item, $save);
}

$envelope = $this->bus->dispatch($message);

if ($logger) {
if ($envelope->last(HandledStamp::class)) {
$logger->info('Item "{key}" was computed online', ['key' => $item->getKey()]);
} else {
$logger->info('Item "{key}" sent for recomputation', ['key' => $item->getKey()]);
}
}

// The item's value is not stale, no need to write it to the backend
$save = false;

return $message->getItem()->get() ?? $item->get();
}
}
80 changes: 80 additions & 0 deletions src/Symfony/Component/Cache/Messenger/EarlyExpirationHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Cache\Messenger;

use Symfony\Component\Cache\CacheItem;
use Symfony\Component\DependencyInjection\ReverseContainer;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

/**
* Computes cached values sent to a message bus.
*/
class EarlyExpirationHandler implements MessageHandlerInterface
{
private $reverseContainer;
private $processedNonces = [];

public function __construct(ReverseContainer $reverseContainer)
{
$this->reverseContainer = $reverseContainer;
}

public function __invoke(EarlyExpirationMessage $message)
{
$item = $message->getItem();
$metadata = $item->getMetadata();
$expiry = $metadata[CacheItem::METADATA_EXPIRY] ?? 0;
$ctime = $metadata[CacheItem::METADATA_CTIME] ?? 0;

if ($expiry && $ctime) {
// skip duplicate or expired messages

$processingNonce = [$expiry, $ctime];
$pool = $message->getPool();
$key = $item->getKey();

if (($this->processedNonces[$pool][$key] ?? null) === $processingNonce) {
return;
}

if (microtime(true) >= $expiry) {
return;
}

$this->processedNonces[$pool] = [$key => $processingNonce] + ($this->processedNonces[$pool] ?? []);

if (\count($this->processedNonces[$pool]) > 100) {
array_pop($this->processedNonces[$pool]);
}
}

static $setMetadata;

$setMetadata = $setMetadata ?? \Closure::bind(
function (CacheItem $item, float $startTime) {
if ($item->expiry > $endTime = microtime(true)) {
$item->newMetadata[CacheItem::METADATA_EXPIRY] = $item->expiry;
$item->newMetadata[CacheItem::METADATA_CTIME] = (int) ceil(1000 * ($endTime - $startTime));
}
},
null,
CacheItem::class
);

$startTime = microtime(true);
$pool = $message->findPool($this->reverseContainer);
$callback = $message->findCallback($this->reverseContainer);
$value = $callback($item);
$setMetadata($item, $startTime);
$pool->save($item->set($value));
}
}
97 changes: 97 additions & 0 deletions src/Symfony/Component/Cache/Messenger/EarlyExpirationMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Cache\Messenger;

use Symfony\Component\Cache\Adapter\AdapterInterface;
use Symfony\Component\Cache\CacheItem;
use Symfony\Component\DependencyInjection\ReverseContainer;

/**
* Conveys a cached value that needs to be computed.
*/
final class EarlyExpirationMessage
{
private $item;
private $pool;
private $callback;

public static function create(ReverseContainer $reverseContainer, callable $callback, CacheItem $item, AdapterInterface $pool): ?self
{
try {
$item = clone $item;
$item->set(null);
} catch (\Exception $e) {
return null;
}

$pool = $reverseContainer->getId($pool);

if (\is_object($callback)) {
if (null === $id = $reverseContainer->getId($callback)) {
return null;
}

$callback = '@'.$id;
} elseif (!\is_array($callback)) {
$callback = (string) $callback;
} elseif (!\is_object($callback[0])) {
$callback = [(string) $callback[0], (string) $callback[1]];
} else {
if (null === $id = $reverseContainer->getId($callback[0])) {
return null;
}

$callback = ['@'.$id, (string) $callback[1]];
}

return new self($item, $pool, $callback);
}

public function getItem(): CacheItem
{
return $this->item;
}

public function getPool(): string
{
return $this->pool;
}

public function getCallback()
{
return $this->callback;
}

public function findPool(ReverseContainer $reverseContainer): AdapterInterface
{
return $reverseContainer->getService($this->pool);
}

public function findCallback(ReverseContainer $reverseContainer): callable
{
if (\is_string($callback = $this->callback)) {
return '@' === $callback[0] ? $reverseContainer->getService(substr($callback, 1)) : $callback;
}
if ('@' === $callback[0][0]) {
$callback[0] = $reverseContainer->getService(substr($callback[0], 1));
}

return $callback;
}

private function __construct(CacheItem $item, string $pool, $callback)
{
$this->item = $item;
$this->pool = $pool;
$this->callback = $callback;
}
}
Loading