AWS EventBridge Scheduler as a workaround to SQS FIFO queue limitations

Cvetanov Goce
Trustpilot Technology
5 min read · Apr 22, 2024


Intro

At Trustpilot, we rely heavily on AWS SQS to make our systems more robust and to process whatever operations we can asynchronously. In most of these scenarios, a producer pushes a message to a queue, and a consumer picks it up immediately and starts processing it. In other cases, we want some time to pass between the moment the message is produced and the moment it is consumed.

AWS offers the concept of delay queues, where one can configure an SQS queue to delay the initial processing of every message by keeping it invisible to consumers for a set period. This setting applies at the queue level. If one needs to delay only a subset of the messages in a queue, one can use message timers, where the producer sets a delivery delay on individual messages.
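To make the message timer idea concrete, here is a minimal sketch of the parameters a producer might build for a standard queue. It is an illustration under assumptions, not our production code: `buildDelayedSend`, the queue URL and the payload are hypothetical, and the AWS SDK is left out so the sketch stays standalone. In real code the returned object would be passed to `SendMessageCommand` from `@aws-sdk/client-sqs`.

```typescript
// SQS accepts a delivery delay between 0 and 900 seconds (15 minutes).
const MAX_DELAY_SECONDS = 900;

// Subset of the SendMessage input that matters for a message timer.
interface DelayedSendParams {
  QueueUrl: string;
  MessageBody: string;
  DelaySeconds: number;
}

// Hypothetical helper: builds the SendMessage parameters for one delayed message.
function buildDelayedSend(
  queueUrl: string,
  body: unknown,
  delaySeconds: number,
): DelayedSendParams {
  // Clamp to the supported range and drop fractions, since DelaySeconds is an integer.
  const clamped = Math.min(MAX_DELAY_SECONDS, Math.max(0, Math.floor(delaySeconds)));
  return {
    QueueUrl: queueUrl,
    MessageBody: JSON.stringify(body),
    DelaySeconds: clamped,
  };
}

// Hypothetical queue URL and payload, for illustration only.
const delayedParams = buildDelayedSend(
  "https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue",
  { id: "review-42" },
  60,
);
console.log(delayedParams.DelaySeconds); // 60
```

Only this one message is delayed; other messages sent to the same queue without `DelaySeconds` are delivered according to the queue's own delay setting.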

Problem

The problem we ran into was a use-case where we wanted to delay the processing of some messages that were being produced to a FIFO queue. Setting the delay on the entire queue was not an option, as these use-cases were rare and we did not want to penalize the rest of the traffic. We were also unable to use individual message delays, because this feature is not available for FIFO queues.

FIFO queues don’t support timers on individual messages. [SQS docs]

One might argue that this use-case does not really fit an architecture built around a FIFO queue, but that’s beyond the scope of this blog post. Let’s just say that in these cases we do not care about message ordering and simply want to get the message through to the consumer, but with a delay.

Solution

EventBridge Scheduler

EventBridge Scheduler sounded like the perfect solution to our problem. We need to post a message to a FIFO queue at a specific time, and we are not that concerned with message ordering. The one-time schedule type is exactly what we need.
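A one-time schedule is expressed as `at(yyyy-mm-ddThh:mm:ss)`. As a minimal sketch (the helper name `toAtExpression` is hypothetical), the expression can be derived from the current time and the desired delay. The sketch assumes no schedule time zone is set on the schedule, in which case the timestamp is interpreted as UTC.

```typescript
// Builds a one-time schedule expression of the form at(yyyy-mm-ddThh:mm:ss).
function toAtExpression(now: Date, delayInMs: number): string {
  const scheduledAt = new Date(now.getTime() + delayInMs);
  // toISOString() returns UTC as YYYY-MM-DDTHH:mm:ss.sssZ;
  // keep the first 19 characters to drop the milliseconds and the trailing "Z".
  return `at(${scheduledAt.toISOString().slice(0, 19)})`;
}

const atExpression = toAtExpression(new Date("2024-04-22T10:00:00.000Z"), 60000);
console.log(atExpression); // at(2024-04-22T10:01:00)
```

The expression has a resolution of one second, so delays shorter than that cannot be represented this way.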

EventBridge Scheduler has some quotas that one needs to keep in mind. In our case, we considered the following quotas:

  • A quota of 50 requests per second for creating schedules. This did not bother us at all, because our use-case is a rare one and we will not come even close to this limit.
  • A quota of a million schedules per region in total. We will not come close to hitting this anytime soon either, but in any case, since we have proper monitoring in place in the consumer, we were completely fine with deleting the schedules once they are complete (i.e. once they have sent the message to the queue).

We set up a ScheduleGroup so that all of the schedules we create for this use-case are grouped together. In our case this might not have been necessary, because the schedules are deleted immediately after completion, but it might still come in handy when looking at Cost Explorer, for example, or if we introduce another use-case for the scheduler.

At the end of the article, you can find some code snippets showing how we set up and used the AWS resources mentioned. These snippets are there for illustration and guidance only.

Other

We had some other ideas for solving our problem and were ready to explore them further, but since the EventBridge Scheduler solution got the job done, we did not pursue them. Nevertheless, they might be useful for similar problems, so here they are.

Introduce a new (standard) queue for the rare use-case producer to send messages to (with the proper delays). Then we either add a new Lambda that consumes the messages from the standard queue and forwards them to the FIFO queue, or we update the FIFO queue consumer to consume messages from both queues.
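We did not build this, but the forwarding Lambda could be sketched roughly as follows. Everything here is an assumption made for illustration: the `toFifoEntries` helper, the record shape (reduced to the two fields used), the JSON body with an optional string `id`, and the fallback group id. The returned entries have the shape expected by a `SendMessageBatch` call against the FIFO queue; the SDK call itself is omitted to keep the sketch standalone.

```typescript
// Minimal shape of the SQS records a Lambda receives; only the fields used here.
interface IncomingRecord {
  messageId: string;
  body: string;
}

// Shape of one SendMessageBatch entry for a FIFO queue.
interface FifoBatchEntry {
  Id: string;
  MessageBody: string;
  MessageGroupId: string;
  MessageDeduplicationId: string;
}

// Hypothetical helper: maps delayed standard-queue records to FIFO batch entries.
function toFifoEntries(
  records: IncomingRecord[],
  fallbackGroupId = "delayed",
): FifoBatchEntry[] {
  return records.map((record) => {
    // Assumption: the body is JSON and may carry a string `id` to group by.
    const parsed = JSON.parse(record.body) as { id?: unknown };
    return {
      Id: record.messageId,
      MessageBody: record.body,
      MessageGroupId: typeof parsed.id === "string" ? parsed.id : fallbackGroupId,
      // Reusing the source message id means a redelivered standard-queue
      // message is dropped by FIFO deduplication instead of forwarded twice.
      MessageDeduplicationId: record.messageId,
    };
  });
}

const fifoEntries = toFifoEntries([
  { messageId: "msg-1", body: JSON.stringify({ id: "review-42" }) },
  { messageId: "msg-2", body: JSON.stringify({ note: "no id" }) },
]);
console.log(fifoEntries.map((entry) => entry.MessageGroupId)); // [ 'review-42', 'delayed' ]
```

Standard queues deliver at least once, so tying the deduplication id to the source message id is one way to keep the occasional duplicate delivery from reaching the FIFO consumer twice within the deduplication window.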

Add a new field to the body of the messages sent to the FIFO queue and have the FIFO queue consumer handle the delay. In our specific case the delays are small (a minute), so we can have the consumer sleep for that amount of time, or adjust the visibility timeout of the message so that it returns to the queue for a retry.
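As a rough sketch of the consumer-side variant (again something we did not implement), the consumer could compute how long a message still has to wait. The `notBefore` field and the `remainingDelaySeconds` helper are hypothetical names chosen for this example.

```typescript
interface DelayableMessage {
  // Hypothetical field: epoch milliseconds before which the message must not be processed.
  notBefore?: number;
}

// Returns how many whole seconds the consumer still has to wait (0 means process now).
function remainingDelaySeconds(message: DelayableMessage, nowMs: number): number {
  if (message.notBefore === undefined) return 0;
  // Round up so the message is never processed early.
  return Math.max(0, Math.ceil((message.notBefore - nowMs) / 1000));
}

const waitSeconds = remainingDelaySeconds({ notBefore: 61000 }, 1000);
console.log(waitSeconds); // 60
```

If the result is greater than zero, the consumer could either sleep for that long or set the message's visibility timeout to that value and skip it for now. One thing to keep in mind with a FIFO queue is that while a message is in flight, later messages in the same message group are held back, so the delay effectively applies to the whole group.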

Code

Code snippets for the EventBridge Scheduler solution.

CDK code for setting up the AWS resources:

import * as cdk from "aws-cdk-lib";
import * as iam from "aws-cdk-lib/aws-iam";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as scheduler from "aws-cdk-lib/aws-scheduler";
import * as sqs from "aws-cdk-lib/aws-sqs";

// Set up EventBridge Scheduler-related resources
const scheduleGroup = new scheduler.CfnScheduleGroup(this, 'SchedulerGroup', {
  name: 'SQSMessageDelayer',
});
const schedulerRole = new iam.Role(this, 'SchedulerRole', {
  assumedBy: new iam.ServicePrincipal('scheduler.amazonaws.com'),
});
// DLQ for messages that the Scheduler cannot deliver
const schedulerDlq = new sqs.Queue(this, 'SchedulerDLQ', {
  retentionPeriod: cdk.Duration.days(14),
});
schedulerDlq.grantSendMessages(schedulerRole);

// Pre-existing FIFO queue, grant send permissions to the scheduler role
const queue = new sqs.Queue(this, 'Queue', {
  fifo: true,
  contentBasedDeduplication: true,
});
queue.grantSendMessages(schedulerRole);

// Lambda function that handles our rare use-case
const rareUseCaseProducer = new lambda.Function(this, 'RareUseCaseProducer', {
  // ... other props
  environment: {
    QUEUE_ARN: queue.queueArn,
    SCHEDULER_ROLE_ARN: schedulerRole.roleArn,
    SCHEDULER_DLQ_ARN: schedulerDlq.queueArn,
  },
});
rareUseCaseProducer.role?.addManagedPolicy(
  iam.ManagedPolicy.fromAwsManagedPolicyName(
    'AmazonEventBridgeSchedulerFullAccess'
  )
);

Code in our rare use-case producer:

import { randomUUID } from "crypto";
import { SchedulerClient, CreateScheduleCommand } from "@aws-sdk/client-scheduler";
import { config } from "./config";

// Assumed shape: the original snippet does not show this type.
// Only `id` is required by the code below.
interface MessageBody {
  id: string;
  [key: string]: unknown;
}

const scheduler = new SchedulerClient({
  region: config.region,
  maxAttempts: 3,
});

export async function scheduleMessage(
  messageBody: MessageBody,
  delayInMs: number = 1000 * 60,
) {
  const now = new Date();
  const scheduledAt = new Date(now.getTime() + delayInMs);
  const scheduledAtIso = scheduledAt.toISOString();
  // Drop the trailing ".sssZ": at() expects yyyy-mm-ddThh:mm:ss
  const scheduledAtWithoutMS = scheduledAtIso.substring(0, scheduledAtIso.length - 5);

  const message = JSON.stringify({
    ...messageBody,
    // Random value that makes every body unique, so content-based
    // deduplication on the queue never drops a scheduled message
    dedup: randomUUID(),
  });

  const scheduledCommand = new CreateScheduleCommand({
    Name: `Delayed-message-${messageBody.id}-${scheduledAt.getTime()}`,
    // Must match the name of the ScheduleGroup created in the CDK code
    GroupName: "SQSMessageDelayer",
    // One-time schedule; interpreted as UTC since no time zone is set
    ScheduleExpression: `at(${scheduledAtWithoutMS})`,
    FlexibleTimeWindow: {
      Mode: "OFF",
    },
    Target: {
      Arn: config.queueArn,
      RoleArn: config.schedulerRoleArn,
      Input: message,
      SqsParameters: {
        MessageGroupId: messageBody.id,
      },
      DeadLetterConfig: {
        Arn: config.schedulerDlqArn,
      },
    },
    // Remove the schedule once it has fired
    ActionAfterCompletion: "DELETE",
  });
  await scheduler.send(scheduledCommand);
}
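One detail worth guarding against: schedule names are limited to 64 characters and may only contain letters, digits, hyphens, underscores and dots, so a long or unusual message id could make the `CreateSchedule` call fail. The helper below is a hypothetical sketch of how the name could be built defensively; `buildScheduleName` is not part of the snippet above.

```typescript
// EventBridge Scheduler limits schedule names to 64 characters.
const MAX_SCHEDULE_NAME_LENGTH = 64;

// Hypothetical helper: builds a schedule name that satisfies the naming rules.
function buildScheduleName(messageId: string, scheduledAtMs: number): string {
  const prefix = "Delayed-message-";
  const suffix = `-${scheduledAtMs}`;
  // Replace anything outside the allowed character set with "_".
  const safeId = messageId.replace(/[^0-9a-zA-Z\-_.]/g, "_");
  // Truncate the id so that prefix + id + suffix fits within the limit.
  const room = MAX_SCHEDULE_NAME_LENGTH - prefix.length - suffix.length;
  return `${prefix}${safeId.slice(0, Math.max(0, room))}${suffix}`;
}

console.log(buildScheduleName("review 42", 1)); // Delayed-message-review_42-1
```

Truncating the id means two different ids with the same leading characters and the same timestamp would map to the same name, so this is only a starting point if ids are long.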
