Conteúdos

Super Poderes do AWS Simple Queue Service


O Simple Queue Service é um dos serviços mais populares e versáteis do catálogo da AWS. É usado como uma cola entre muitos serviços e integrações. Mas, embora sua função principal seja entregar mensagens, o SQS possui alguns recursos que podem ser usados de várias maneiras criativas, economizando tempo e dinheiro, além de reduzir a complexidade do código de todo o projeto.

Sub-Minute Scheduler

O primeiro recurso do SQS que vamos aproveitar são as delayed messages, postergando a entrega até que o tempo de atraso tenha expirado. Juntamente com o envio de mensagens em lote, uma tarefa será chamada várias vezes em execuções curtas, economizando computação ($$$$) de uma operação de longa execução com algumas chamadas de suspensão entre as execuções.

Podemos agrupar essa lógica em um serviço:

import { v4 } from 'uuid';
import { SQSClient, SendMessageBatchCommand } from "@aws-sdk/client-sqs";

export class SchedulingService {
    sqsClient: SQSClient;

    constructor(private queueUrl: string, private batchSize: number) {
        this.sqsClient = new SQSClient();
    }

    async dispatch(taskPrefix: string, delayInterval: number): Promise<void> {
        const messages: SendMessageBatchRequestEntry[] = [];

        for (let i = 1; i <= this.batchSize; i++) {
            const step = i * this.batchSize;
            const id = v4();

            message.push({
                Id: `${taskPrefix}-${id}`,
                MessageBody: JSON.stringify({
                    StepId: step
                }),
                DelaySeconds: step * delayInterval
            });
        }

        const command = new SendMessageBatchCommand({
            QueueUrl: this.queueUrl,
            Entries: messages
        });
        await this.sqsClient.send(command)
    }

}

Em uma função Lambda agendada, você chama seu serviço para despachar mensagens dentro de um intervalo de tempo específico:

import { SchedulingService } from '.../scheduling.service';

// Variables passed by environment
const queueUrl = ...;
const batchSize = ...;
const taskPrefix = ...;

const scheduler = new SchedulingService(queueUrl, batchSize);

export async function schedulingHandler(_event: any) {

    await scheduler.dispatch(taskPrefix, 6);
}

O serviço de agendamento pode ser utilizado em diversas tarefas, assim como uma tarefa pode possuir vários serviços de agendamento, um para cada tarefa.

As tarefas devem ser cadastradas como receptoras das filas configuradas no serviço. Cada execução é iniciada ao receber uma mensagem.

Exercise
Como exercício para os leitores, o serviço de agendamento pode ser estendido para enviar dados para as tarefas.
Semáforos

Como as mensagens são independentes entre si, uma instância de uma tarefa ainda pode estar em execução quando a próxima instância for iniciada. Para resolver esse problema, podemos desenvolver um semáforo para permitir que apenas uma tarefa seja executada por vez. Como este é um conceito bem conhecido usado por muitos grandes sistemas, existem muitas opções de código aberto e comerciais disponíveis, mas vamos propor uma abordagem mais simples.

O aspecto mais importante para um semáforo é que ele deve ser atualizado ou consultado atomicamente; cada operação não interfere entre si, e mesmo para duas atualizações sequenciais a segunda atualização não deve acontecer se o estado solicitado for o mesmo da primeira.

Implementaremos um semáforo binário, que possui apenas os estados aberto e fechado. A implementação mais comum é usar um banco de dados que suporte operações atômicas. No catálogo da AWS, as opções mais comuns são DynamoDB e Redis (através do AWS Elasticache). Para DynamoDB existe a biblioteca DynamoDB Lock Client que é escrita em Java. Para nosso exemplo de Typescript, implementaremos nosso Semaphore Service usando o DynamoDB.

Note
Como quase todos os aplicativos precisam do suporte de um banco de dados, o uso do DynamoDB não deve ser um custo adicional.
Exercise
Um serviço de semáforo implementado usando Redis também pode ser implementado como um exercício. Porém, deve se tomar cuidado com cluster, pois a replicação tem um atraso devido à replicação entre as instâncias.
import { DynamoDBClient, UpdateItemCommand } from "@aws-sdk/client-dynamodb";

export class SemaphoreService {
    dynamoClient: DynamoDBClient;

    constructor(private tableName: string, private keyName: string) {
        this.dynamoClient = new DynamoDBClient({});
    }

    async acquire(): Promise<boolean> {
        try {
            await this.dispathCommand(true);
        } catch (error) {
            return Promise.resolve(false);
        }

        return Promise.resolve(true);
    }

    async release(): Promise<boolean> {
        try {
            await this.dispathCommand(false);
        } catch (error) {
            return Promise.resolve(false);
        }

        return Promise.resolve(true);
    }

    private async dispathCommand(requestLock: boolean): Promise<any> {
        const cmd = new UpdateItemCommand({
            TableName: this.tableName,
            Key: {
                PK: { S: this.keyName }
            },
            UpdateExpression: 'SET Value = :NewValue',
            ConditionExpression: 'Value = :OldValue'
        });

        if (requestLock) {
            cmd.input.ExpressionAttributeValues = {
                ':NewValue': { N: '1' },
                ':OldValue': { N: '0' },
            }
        } else {
            cmd.input.ExpressionAttributeValues = {
                ':NewValue': { N: '0' },
                ':OldValue': { N: '1' },
            }
        }

        return await this.dynamoClient.send(cmd);
    }
}

O semáforo é então chamado no início da tarefa da função Lambda. Após o início do processo, tentamos adquirir o semáforo; em caso de falha, podemos simplesmente pular a execução e sair, pois a próxima iteração será chamada após alguns segundos.

import { SemaphoreService } from '.../semaphore.service';

// Variables passed by environment
const tableName = ...;
const semaphoreName = ...;

const semaphore = new SemaphoreService(tableName, semaphoreName);

export async function taskHandler(_event: any) {
    const acquired = await semaphore.acquire();
    if (!acquired) {
        return;
    }

    // Process with the task
    ...
}
Semáforo de Contador

Um semáforo de contador é necessário quando você precisa limitar a quantidade de chamadas simultâneas para um serviço ou fluxo de execução. É implementado limitando a quantidade de bloqueios disponíveis no semáforo. Dado nosso exemplo de semáforo, é uma modificação simples, mas deixarei esta tarefa como um exercício para os leitores.

Dados enfileirados em uma fila FIFO

Há um exemplo interessante neste projeto de amostra que usa uma fila SQS para armazenar jogadores esperando para serem correspondidos em um jogo sessão.

Quando o jogador solicita uma sessão de jogo, o manipulador primeiro tenta ler uma mensagem de uma fila SQS (que contém informações de conexão para o servidor do jogo). Se nenhuma mensagem estiver disponível (ou seja, não há jogador esperando), o manipulador envia uma mensagem para a fila SQS contendo as informações de conexão do servidor. Abaixo está um trecho do requestgamesession.py:

def lambda_handler(event, context):

    try:
        # Check if the SQS queue has any player waiting
        resp = sqs_client.receive_message(
            QueueUrl=...,
            ...
        )

        # Delete message from SQS queue
        sqs_client.delete_message(
            QueueUrl=...,
            ReceiptHandle=...
        )

        # Send existing server connection to the requesting player
        connection_info = resp['Messages'][0]['Body']
        ...
        ...

    except:
        print('No player waiting')

    # Request a server from GameLift
    connection_info = ...

    # Send server information to the SQS queue
    resp = sqs_client.send_message(
        QueueUrl=...,
        MessageBody=(
            connection_info
        )
    )

    # Send new server connection to the requesting player
    ...

Dois pontos importantes a serem observados: primeiro é que, como a função Lambda está lendo ativamente da fila SQS, a mensagem deve ser excluída ‘manualmente’. Em segundo lugar, o SQS pode ser criado como FIFO, portanto, as sessões serão correspondidas na ordem em que foram adicionadas à fila do SQS, mas isso é definido pelo desenvolvedor/sistema.

Global Accelerator

Como último recurso importante, o SQS pode ser integrado ao Global Accelerator, permitindo que uma única fila esteja disponível de outras regiões mais facilmente do que uma replicação entre regiões muito mais complexa.

Atualmente, associar um endpoint Global Accelerator a uma fila SQS só pode ser feito usando o AWS CDK framework (o exemplo abaixo é de uma postagem do AWS Community Builders de dev.to ):

import * as cdk from 'aws-cdk-lib';
import * as globalaccelerator from '@aws-cdk/aws-globalaccelerator';
import * as sqs from '@aws-cdk/aws-sqs';

export class MyStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create a new Global Accelerator
    const accelerator = new globalaccelerator.GlobalAccelerator(this, 'Accelerator', {
      acceleratorType: globalaccelerator.AcceleratorType.STATIC,
      enabled: true,
      regions: [globalaccelerator.Regions.US_EAST_1, globalaccelerator.Regions.US_WEST_2]
    });

    // Create an SQS queue
    const queue = new sqs.Queue(this, 'Queue', {
      visibilityTimeout: cdk.Duration.seconds(300)
    });

    // Create an accelerator endpoint for the queue
    const endpoint = new globalaccelerator.AcceleratorEndpoint(this, 'Endpoint', {
      accelerator,
      port: 80,
      protocol: globalaccelerator.Protocol.TCP,
      resource: queue
    });

    // Create a listener for the endpoint
    new globalaccelerator.Listener(this, 'Listener', {
      accelerator,
      port: 80,
      protocol: globalaccelerator.Protocol.TCP,
      endpointGroups: [{
        endpoints: [endpoint]
      }]
    });
  }
}

Conclusão

Todos os exemplos acima são formas não intuitivas de trabalhar com SQS, mas podem economizar tempo de desenvolvimento e reduzir a complexidade de seus projetos, usando uma API bem estabelecida de maneiras criativas. Sinta-se à vontade para usar e evoluir as ideias aqui apresentadas em seus próprios projetos.

Referências