AWS Simple Queue Service's Super Powers


The Simple Queue Service (SQS) is one of the most popular and versatile services in the AWS catalog. It is often used as glue between services and integrations. Although its primary task is to deliver messages, SQS has some features that can be used in creative ways, saving time and money and reducing code complexity across the whole project.

Sub-Minute Scheduler

The first SQS feature that we’ll take advantage of is delayed messages: each message can carry a delay of up to 15 minutes, and SQS postpones its delivery until that delay has expired. Combined with batch sending, this lets a task run many times as short executions, saving compute time ($$$$) compared with one long-running operation that sleeps between executions.
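To make the arithmetic concrete, here is a minimal sketch of the delay schedule for one batch, assuming each message is delayed by one more interval than the previous one. The helper name buildDelaySchedule and its parameters are illustrative assumptions, not part of the SDK; the two limits it checks (at most 10 entries per batch, at most 900 seconds of delay) are SQS quotas.

```typescript
// Hypothetical helper: computes the DelaySeconds value for each message of one batch.
const MAX_BATCH_ENTRIES = 10;  // SQS quota for SendMessageBatch
const MAX_DELAY_SECONDS = 900; // SQS quota for a per-message delay (15 minutes)

function buildDelaySchedule(count: number, intervalSeconds: number): number[] {
    if (!Number.isInteger(count) || count < 1 || count > MAX_BATCH_ENTRIES) {
        throw new RangeError(`count must be an integer between 1 and ${MAX_BATCH_ENTRIES}`);
    }
    if (!Number.isInteger(intervalSeconds) || intervalSeconds < 0) {
        throw new RangeError('intervalSeconds must be a non-negative integer');
    }

    const delays: number[] = [];
    for (let i = 1; i <= count; i++) {
        const delay = i * intervalSeconds;
        if (delay > MAX_DELAY_SECONDS) {
            throw new RangeError(`delay of ${delay}s exceeds the ${MAX_DELAY_SECONDS}s limit`);
        }
        delays.push(delay);
    }
    return delays;
}

// Ten messages spaced 6 seconds apart cover one minute: delays 6, 12, ..., 60.
console.log(buildDelaySchedule(10, 6));
```

With a Lambda function scheduled once per minute, a schedule like this would yield one task execution roughly every 6 seconds.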

We can wrap this logic into a service:

import { v4 } from 'uuid';
import { SQSClient, SendMessageBatchCommand, SendMessageBatchRequestEntry } from "@aws-sdk/client-sqs";

export class SchedulingService {
    sqsClient: SQSClient;

    constructor(private queueUrl: string, private batchSize: number) {
        this.sqsClient = new SQSClient({});
    }

    async dispatch(taskPrefix: string, delayInterval: number): Promise<void> {
        const messages: SendMessageBatchRequestEntry[] = [];

        for (let i = 1; i <= this.batchSize; i++) {
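            // Step index; the delay below grows by one interval per step.
            // SQS accepts at most 10 entries per batch and at most 900 seconds of delay.
            const step = i;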
            const id = v4();

            messages.push({
                Id: `${taskPrefix}-${id}`,
                MessageBody: JSON.stringify({
                    StepId: step
                }),
                DelaySeconds: step * delayInterval
            });
        }

        const command = new SendMessageBatchCommand({
            QueueUrl: this.queueUrl,
            Entries: messages
        });
        await this.sqsClient.send(command);
    }

}

In a scheduled Lambda function, the service dispatches the messages, spaced by a fixed time interval:

import { SchedulingService } from '.../scheduling.service';

// Variables passed by environment
const queueUrl = ...;
const batchSize = ...;
const taskPrefix = ...;

const scheduler = new SchedulingService(queueUrl, batchSize);

export async function schedulingHandler(_event: any) {

    await scheduler.dispatch(taskPrefix, 6);
}

A single scheduling service can drive many tasks, and a project can also hold several scheduling services, one for each task.

The tasks should be registered as receivers for the queues configured in the service. Each execution is started by receiving a message.
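The receiving side can be sketched as follows, assuming the task is a Lambda function subscribed to the queue. The SqsEventLike type is a local, trimmed stand-in for the event that Lambda delivers (a Records array whose items carry the message body as a string), and extractStepIds and scheduledTaskHandler are hypothetical names.

```typescript
// Local, trimmed stand-in for the event that Lambda delivers from an SQS queue.
interface SqsRecordLike {
    messageId: string;
    body: string;
}

interface SqsEventLike {
    Records: SqsRecordLike[];
}

// Hypothetical helper: reads the StepId that the scheduling service wrote into each message body.
function extractStepIds(event: SqsEventLike): number[] {
    const steps: number[] = [];
    for (const record of event.Records) {
        const payload = JSON.parse(record.body) as { StepId?: unknown };
        if (typeof payload.StepId === 'number') {
            steps.push(payload.StepId);
        }
    }
    return steps;
}

// Each delivered message starts one short execution of the task.
async function scheduledTaskHandler(event: SqsEventLike): Promise<void> {
    for (const step of extractStepIds(event)) {
        console.log(`Running scheduled step ${step}`);
        // The actual task logic would go here.
    }
}
```

When Lambda is triggered by the queue and the handler returns without an error, the delivered messages are removed from the queue automatically.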

Exercise
As an exercise for the readers, the scheduling service can be extended to send data to the tasks.

Semaphores

Since the messages are independent of one another, an instance of a task can still be running when the next instance starts. To solve this problem, we can develop a semaphore that allows only one instance of the task to execute at a time. As this is a well-known concept used by many big systems, there are many open source and commercial options available, but we’ll propose a simpler approach.

The most important aspect of a semaphore is that it must be updated and queried atomically, so that operations cannot interfere with each other. Even for two sequential updates, the second one must fail if it requests the same state that the first one already set.
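The rule can be illustrated with a small in-memory model. This is only a sketch of the semantics, under the assumption of a single process: a real semaphore needs the check and the write to happen as one atomic operation in a shared store. The class name BinarySemaphoreModel is hypothetical.

```typescript
// In-memory model of a binary semaphore: every change is a conditional state transition.
class BinarySemaphoreModel {
    private locked = false;

    // Succeeds only if the semaphore goes from open to closed.
    acquire(): boolean {
        return this.transition(false, true);
    }

    // Succeeds only if the semaphore goes from closed to open.
    release(): boolean {
        return this.transition(true, false);
    }

    // Applies the new state only when the current state matches the expected one.
    private transition(expected: boolean, next: boolean): boolean {
        if (this.locked !== expected) {
            return false;
        }
        this.locked = next;
        return true;
    }
}

const model = new BinarySemaphoreModel();
console.log(model.acquire()); // true: open -> closed
console.log(model.acquire()); // false: already closed, the second update is refused
console.log(model.release()); // true: closed -> open
console.log(model.release()); // false: already open
```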

We’ll implement a binary semaphore, which has only two states: open and closed. The most common implementation uses a database that supports atomic operations. In the AWS catalog, the most common options are DynamoDB and Redis (through Amazon ElastiCache). For DynamoDB there is the DynamoDB Lock Client library, which is written in Java. For our TypeScript example, we’ll implement our Semaphore Service using DynamoDB.

Note
Almost every application needs the support of a database, so using DynamoDB should not be an additional cost.

Exercise
As an exercise, the semaphore service can also be implemented using Redis. However, care must be taken with clustering, as replication has a delay due to the communication between instances.

import { DynamoDBClient, UpdateItemCommand } from "@aws-sdk/client-dynamodb";

export class SemaphoreService {
    dynamoClient: DynamoDBClient;

    constructor(private tableName: string, private keyName: string) {
        this.dynamoClient = new DynamoDBClient({});
    }

    // Returns true if the semaphore was open and is now held by the caller.
    async acquire(): Promise<boolean> {
        return this.tryTransition(true);
    }

    // Returns true if the semaphore was closed and is now open again.
    async release(): Promise<boolean> {
        return this.tryTransition(false);
    }

    private async tryTransition(requestLock: boolean): Promise<boolean> {
        try {
            await this.dispatchCommand(requestLock);
            return true;
        } catch (error) {
            // A failed condition means the semaphore is already in the requested state.
            if ((error as { name?: string }).name === 'ConditionalCheckFailedException') {
                return false;
            }
            // Anything else (missing table, permissions, throttling) is a real failure.
            throw error;
        }
    }

    // The item must already exist with Value = 0 (open); otherwise the condition fails.
    private async dispatchCommand(requestLock: boolean): Promise<any> {
        const cmd = new UpdateItemCommand({
            TableName: this.tableName,
            Key: {
                PK: { S: this.keyName }
            },
            // "Value" is a DynamoDB reserved word, so it is referenced through an alias.
            UpdateExpression: 'SET #Value = :NewValue',
            ConditionExpression: '#Value = :OldValue',
            ExpressionAttributeNames: { '#Value': 'Value' },
            ExpressionAttributeValues: {
                ':NewValue': { N: requestLock ? '1' : '0' },
                ':OldValue': { N: requestLock ? '0' : '1' },
            }
        });

        return await this.dynamoClient.send(cmd);
    }
}

The semaphore is then called at the beginning of the task Lambda function. As soon as the process begins, we try to acquire the semaphore; if that fails, we can simply skip the execution and exit, because the next iteration will be triggered a few seconds later. When the task finishes, the semaphore has to be released so that later executions can run.

import { SemaphoreService } from '.../semaphore.service';

// Variables passed by environment
const tableName = ...;
const semaphoreName = ...;

const semaphore = new SemaphoreService(tableName, semaphoreName);

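export async function taskHandler(_event: any) {
    const acquired = await semaphore.acquire();
    if (!acquired) {
        // Another instance holds the semaphore; the next scheduled message will try again.
        return;
    }

    try {
        // Proceed with the task
        ...
    } finally {
        // Always release, otherwise every later execution would be skipped.
        await semaphore.release();
    }
}

Counter Semaphores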

A counter semaphore is needed when you have to limit the number of simultaneous calls to a service or execution flow. It’s implemented by limiting the number of locks available in the semaphore. Given our semaphore example, it’s a simple modification, but I’ll leave this task as an exercise to the readers.
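For readers who want a starting point for that exercise, here is one possible sketch of the two conditional updates involved. It only builds plain request objects in the shape of a DynamoDB UpdateItem input, and it assumes the same table layout as the binary semaphore (a PK key and a numeric Value attribute initialized to 0). The CounterUpdateInput type and the buildCounterUpdate helper are hypothetical names.

```typescript
// Local stand-in for the shape of a DynamoDB UpdateItem request (no SDK import needed here).
interface CounterUpdateInput {
    TableName: string;
    Key: { PK: { S: string } };
    UpdateExpression: string;
    ConditionExpression: string;
    ExpressionAttributeNames: { [alias: string]: string };
    ExpressionAttributeValues: { [placeholder: string]: { N: string } };
}

// Hypothetical helper: builds the request that takes (acquire) or returns (release) one slot.
function buildCounterUpdate(
    tableName: string,
    keyName: string,
    limit: number,
    acquire: boolean
): CounterUpdateInput {
    if (!Number.isInteger(limit) || limit < 1) {
        throw new RangeError('limit must be a positive integer');
    }
    return {
        TableName: tableName,
        Key: { PK: { S: keyName } },
        // Increment the counter on acquire, decrement it on release.
        UpdateExpression: acquire ? 'SET #Value = #Value + :One' : 'SET #Value = #Value - :One',
        // Acquire only while below the limit; release only while at least one slot is taken.
        ConditionExpression: acquire ? '#Value < :Bound' : '#Value > :Bound',
        // "Value" is a DynamoDB reserved word, so it is referenced through an alias.
        ExpressionAttributeNames: { '#Value': 'Value' },
        ExpressionAttributeValues: {
            ':One': { N: '1' },
            ':Bound': { N: acquire ? String(limit) : '0' },
        },
    };
}

// Example: a semaphore that admits at most 3 concurrent executions.
console.log(buildCounterUpdate('Semaphores', 'report-task', 3, true).ConditionExpression);
```

The resulting object could be passed to an UpdateItemCommand in the same way as in the binary semaphore; a failed condition then means that no slot is available.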

FIFO Lined Up Data

There’s an interesting example in this sample project that uses an SQS queue to store players waiting to be matched in a game session.

When a player requests a game session, the handler first tries to read a message from an SQS queue; each message contains the connection info for a game server. If no message is available (i.e. there is no player waiting), the handler sends a message to the SQS queue containing the server’s connection info. Below is a snippet from requestgamesession.py:

def lambda_handler(event, context):

    try:
        # Check if the SQS queue has any player waiting
        resp = sqs_client.receive_message(
            QueueUrl=...,
            ...
        )

        # Delete message from SQS queue
        sqs_client.delete_message(
            QueueUrl=...,
            ReceiptHandle=...
        )

        # Send existing server connection to the requesting player
        connection_info = resp['Messages'][0]['Body']
        ...
        ...

    except:
        print('No player waiting')

    # Request a server from GameLift
    connection_info = ...

    # Send server information to the SQS queue
    resp = sqs_client.send_message(
        QueueUrl=...,
        MessageBody=(
            connection_info
        )
    )

    # Send new server connection to the requesting player
    ...

Two points are worth noticing. First, because the Lambda function actively reads from the SQS queue instead of being triggered by it, the message has to be deleted explicitly. Second, the queue can be created as a FIFO queue, so that sessions are matched in the order they were added to it; whether to do so is up to the developer or the system requirements.
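If the queue is created as FIFO, sending changes slightly: FIFO queue names end with .fifo, every message needs a MessageGroupId, and a MessageDeduplicationId is required unless content-based deduplication is enabled on the queue. The sketch below builds such a request as a plain object; the FifoSendInput type, the buildFifoSendInput helper, the 'matchmaking' group name, and the sample queue URL are all assumptions made for illustration.

```typescript
// Local stand-in for the parameters of an SQS send-message request to a FIFO queue.
interface FifoSendInput {
    QueueUrl: string;
    MessageBody: string;
    MessageGroupId: string;
    MessageDeduplicationId: string;
}

// Hypothetical helper: builds the request for queueing a waiting player's server connection.
function buildFifoSendInput(queueUrl: string, connectionInfo: string, requestId: string): FifoSendInput {
    if (!queueUrl.endsWith('.fifo')) {
        throw new Error('FIFO queue names must end with .fifo');
    }
    return {
        QueueUrl: queueUrl,
        MessageBody: connectionInfo,
        // A single group keeps all waiting players in one strictly ordered line.
        MessageGroupId: 'matchmaking',
        // A unique ID per request prevents a retried send from queueing the same player twice.
        MessageDeduplicationId: requestId,
    };
}

// Sample values only; the account number and queue name are made up.
const sampleUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/waiting-players.fifo';
console.log(buildFifoSendInput(sampleUrl, '10.0.0.12:7777', 'request-0001'));
```

Using one message group gives strict ordering across all players; the trade-off is that messages within a group are processed one after another.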

Global Accelerator

As the last important feature, SQS can be integrated with Global Accelerator, making a single queue available from other regions more easily than with a much more complex cross-region replication setup.

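Currently, associating a Global Accelerator endpoint with an SQS queue can only be done by using the AWS CDK framework. The example below is from an AWS Community Builders post on dev.to and is reproduced as published; it mixes CDK v1 package imports (@aws-cdk/...) with the v2 aws-cdk-lib import, and its construct names should be checked against the current CDK API reference before use: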

import * as cdk from 'aws-cdk-lib';
import * as globalaccelerator from '@aws-cdk/aws-globalaccelerator';
import * as sqs from '@aws-cdk/aws-sqs';

export class MyStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create a new Global Accelerator
    const accelerator = new globalaccelerator.GlobalAccelerator(this, 'Accelerator', {
      acceleratorType: globalaccelerator.AcceleratorType.STATIC,
      enabled: true,
      regions: [globalaccelerator.Regions.US_EAST_1, globalaccelerator.Regions.US_WEST_2]
    });

    // Create an SQS queue
    const queue = new sqs.Queue(this, 'Queue', {
      visibilityTimeout: cdk.Duration.seconds(300)
    });

    // Create an accelerator endpoint for the queue
    const endpoint = new globalaccelerator.AcceleratorEndpoint(this, 'Endpoint', {
      accelerator,
      port: 80,
      protocol: globalaccelerator.Protocol.TCP,
      resource: queue
    });

    // Create a listener for the endpoint
    new globalaccelerator.Listener(this, 'Listener', {
      accelerator,
      port: 80,
      protocol: globalaccelerator.Protocol.TCP,
      endpointGroups: [{
        endpoints: [endpoint]
      }]
    });
  }
}

Conclusion

All the examples above are non-intuitive ways to work with SQS, but they can save development time and reduce the complexity of your projects by using a well-established API in creative ways. Feel free to use and evolve the ideas presented here in your own projects.
