-
-
Notifications
You must be signed in to change notification settings - Fork 9.7k
Description
Description
This proposal introduces a feature into Symfony Messenger which allows for messages to be rate limited, similar to Laravel's RateLimited
middleware: https://laravel.com/docs/10.x/queues#rate-limiting
In Laravel you define your limiter as such:
RateLimiter::for('backups', function (object $job) {
return Limit::perHour(1);
});
class CreateBackup implements ShouldQueue
{
//
public function middleware(): array
{
return [new RateLimited('backups')];
}
}
When you dispatch e.g. 10 CreateBackup
jobs, it will immediately run the first job. The next 9 jobs will be re-dispatched and made available at T + 1 hour, limiting job executions to 1 per hour.
Our specific use case
In our app our jobs interact a lot with external APIs, specifically to pull data for our clients. For one client there are might be thousands of such jobs in the system. The external API has rate limits in place which we must adhere to. For each client we have an API key for that external API, and rate limits are defined per API key. I would like to be able to elegantly define these rate limits for our messages and redispatch messages if rate limits are (about to be) exceeded.
Proposal
I've tried to implement this myself using a Messenger middleware. It's a rough draft, but this works alright:
class RateLimitMiddleware implements MiddlewareInterface
{
public function __construct(private RateLimiterFactory $anonymousApiLimiter, private MessageBusInterface $messageBus, private LoggerInterface $logger)
{
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
if (false === $envelope->getMessage() instanceof ShouldBeRateLimited || !$envelope->last(ReceivedStamp::class)) {
return $stack->next()->handle($envelope, $stack);
}
/** @var ShouldBeRateLimited $message */
$message = $envelope->getMessage();
$limiter = $this->anonymousApiLimiter->create($message->rateLimitKey());
$limit = $limiter->consume(1);
if (false === $limit->isAccepted()) {
$this->logger->info('Rate limit exceeded', [
'retry-after' => $limit->getRetryAfter()->getTimestamp(),
'limit' => $limit->getLimit(),
]);
$this->messageBus->dispatch(
(new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter()))
);
return $envelope;
}
return $stack->next()->handle($envelope, $stack);
}
}
ShouldBeRateLimited
looks like this:
interface ShouldBeRateLimited
{
public function rateLimitKey(): string;
}
So a message might look like this:
class ProcessPodcast implements ShouldBeRateLimited
{
private $podcastId;
public function __construct(string $podcastId)
{
$this->podcastId = $podcastId;
}
public function getPodcastId(): string
{
return $this->podcastId;
}
public function rateLimitKey(): string
{
return 'process_podcast:' . $this->getPodcastId();
}
}
Known issue with this implementation: rate limiter factory can't be customised per message.
Alternative solutions to the same problem
Putting that logic in the handler
public function __invoke(ProcessPodcast $message)
{
$limit = $this->anonymousApiLimiter->create('process-podcast:' . $message->getPodcastId())->consume(1);
if (!$limit->isAccepted()) {
$this->logger->info('Rate limit exceeded', [
'retry-after' => $limit->getRetryAfter()->getTimestamp(),
'limit' => $limit->getLimit(),
]);
$this->messageBus->dispatch((new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter())));
return;
}
// do process podcast logic
}
This works alright, except it gets really tedious when you have to add this to many message handlers, especially when you do more than rate limiting, such as: preventing overlapping jobs, checking circuit breakers etc...