[RFC] Rate limiting message handling in Symfony Messenger #50465

@dejagersh

Description

This proposal introduces a feature into Symfony Messenger which allows for messages to be rate limited, similar to Laravel's RateLimited middleware: https://laravel.com/docs/10.x/queues#rate-limiting

In Laravel you define your limiter as such:

RateLimiter::for('backups', function (object $job) {
    return Limit::perHour(1);
});

class CreateBackup implements ShouldQueue
{
    //

    public function middleware(): array
    {
        return [new RateLimited('backups')];
    }
}

When you dispatch, say, 10 CreateBackup jobs, the first one runs immediately. The other 9 are re-dispatched and made available at T + 1 hour, so job executions are limited to 1 per hour.
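To make the arithmetic above concrete, here is a minimal plain-PHP sketch of the net effect of such a limit. It does not use Laravel or Symfony. The function name `executionOffsets` is hypothetical, and the sketch assumes all jobs are dispatched at the same moment and that a rejected job is retried at the start of the next window. Real retry times depend on the queue and the limiter policy.

```php
<?php
declare(strict_types=1);

/**
 * Returns, for each job in dispatch order, the offset in seconds (from the
 * first dispatch) at which it would run under a fixed-window limit.
 *
 * Assumption: every job is dispatched at T and rejected jobs are retried
 * at the start of the following window.
 *
 * @return list<int>
 */
function executionOffsets(int $jobCount, int $limitPerWindow, int $windowSeconds): array
{
    $offsets = [];
    for ($i = 0; $i < $jobCount; $i++) {
        // Jobs 0..limit-1 run in window 0, the next batch in window 1, and so on.
        $offsets[] = intdiv($i, $limitPerWindow) * $windowSeconds;
    }

    return $offsets;
}

// 10 jobs, 1 per hour: the last job runs 9 hours after the first.
$offsets = executionOffsets(10, 1, 3600);
echo implode(', ', array_map(fn (int $s): string => 'T+' . intdiv($s, 3600) . 'h', $offsets)), PHP_EOL;
// prints: T+0h, T+1h, T+2h, T+3h, T+4h, T+5h, T+6h, T+7h, T+8h, T+9h
```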

Our specific use case

In our app, jobs interact a lot with external APIs, specifically to pull data for our clients. For one client there might be thousands of such jobs in the system. The external API has rate limits in place which we must adhere to. We have one API key per client for that external API, and rate limits are defined per API key. I would like to be able to elegantly define these rate limits for our messages and redispatch messages if rate limits are (about to be) exceeded.

Proposal

I've tried to implement this myself using a Messenger middleware. It's a rough draft, but this works alright:

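use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\RateLimiter\RateLimiterFactory;

class RateLimitMiddleware implements MiddlewareInterface
{
    public function __construct(
        private RateLimiterFactory $anonymousApiLimiter,
        private MessageBusInterface $messageBus,
        private LoggerInterface $logger,
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $message = $envelope->getMessage();

        // Only rate limit opted-in messages, and only on the consuming side
        // (a ReceivedStamp means the message came from a transport).
        if (!$message instanceof ShouldBeRateLimited || !$envelope->last(ReceivedStamp::class)) {
            return $stack->next()->handle($envelope, $stack);
        }

        $limiter = $this->anonymousApiLimiter->create($message->rateLimitKey());

        $limit = $limiter->consume(1);

        if (false === $limit->isAccepted()) {
            $this->logger->info('Rate limit exceeded', [
                'retry-after' => $limit->getRetryAfter()->getTimestamp(),
                'limit' => $limit->getLimit(),
            ]);

            // Re-dispatch the message in a fresh envelope (existing stamps are
            // dropped), delayed until the limiter allows another attempt.
            $this->messageBus->dispatch(
                (new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter()))
            );

            // Skip the handler; the original message is treated as handled.
            return $envelope;
        }

        return $stack->next()->handle($envelope, $stack);
    }
}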

ShouldBeRateLimited looks like this:

interface ShouldBeRateLimited
{
    public function rateLimitKey(): string;
}

So a message might look like this:

class ProcessPodcast implements ShouldBeRateLimited
{
    private $podcastId;

    public function __construct(string $podcastId)
    {
        $this->podcastId = $podcastId;
    }

    public function getPodcastId(): string
    {
        return $this->podcastId;
    }

    public function rateLimitKey(): string
    {
        return 'process_podcast:' . $this->getPodcastId();
    }
}

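Known issue with this implementation: the middleware is constructed with a single RateLimiterFactory, so the rate limiter factory can't be customised per message. Only the key passed to create() varies.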
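One possible way around this limitation, shown only as a sketch and not as part of the proposal: let a message optionally name the limiter it wants, and have the middleware look the factory up in a small registry. `SelectsRateLimiter`, `rateLimiterName()` and `LimiterFactoryRegistry` are hypothetical names introduced for illustration. The registry stores plain objects so the sketch runs without Symfony installed; in a real app the values would be RateLimiterFactory instances.

```php
<?php
declare(strict_types=1);

// Redeclared from the proposal so the sketch is self-contained.
interface ShouldBeRateLimited
{
    public function rateLimitKey(): string;
}

// Hypothetical opt-in interface: a message names the limiter it wants.
interface SelectsRateLimiter extends ShouldBeRateLimited
{
    public function rateLimiterName(): string;
}

// Hypothetical registry mapping limiter names to factories. In a Symfony
// app the values would be RateLimiterFactory instances.
final class LimiterFactoryRegistry
{
    /** @param array<string, object> $factories limiter name => factory */
    public function __construct(private array $factories, private string $defaultName)
    {
    }

    public function factoryFor(ShouldBeRateLimited $message): object
    {
        // Messages that do not pick a limiter fall back to the default one.
        $name = $message instanceof SelectsRateLimiter
            ? $message->rateLimiterName()
            : $this->defaultName;

        return $this->factories[$name]
            ?? throw new InvalidArgumentException(sprintf('No rate limiter named "%s".', $name));
    }
}
```

Under these assumptions, the middleware would take the registry instead of a single factory and call `$this->registry->factoryFor($message)->create($message->rateLimitKey())` where it currently calls `$this->anonymousApiLimiter->create(...)`.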

Alternative solutions to the same problem

Putting that logic in the handler

public function __invoke(ProcessPodcast $message)
{
    $limit = $this->anonymousApiLimiter->create('process-podcast:' . $message->getPodcastId())->consume(1);

    if (!$limit->isAccepted()) {
        $this->logger->info('Rate limit exceeded', [
            'retry-after' => $limit->getRetryAfter()->getTimestamp(),
            'limit' => $limit->getLimit(),
        ]);

        $this->messageBus->dispatch((new Envelope($message))->with(DelayStamp::delayUntil($limit->getRetryAfter())));

        return;
    }

    // do process podcast logic
}

This works alright, but it gets really tedious when you have to repeat it in many message handlers, especially when they do more than rate limiting, e.g. preventing overlapping jobs or checking circuit breakers.
