Conteúdos

EZMissile System Parte 1: Actor Cache


No meu novo projeto, tenho um requisito específico para preencher todo o estágio com projéteis, mísseis e bombas. Para isso, é necessário um sistema de cache para gerenciar a quantidade de atores na tela.

Se você tiver alguma dúvida se isso é realmente necessário, pode verificar o código para UWorld::SpawnActor para a quantidade de validação que esta função executa. Além disso, há muitas postagens falando sobre reduzir o número de atores que executam o evento Tick cada quadro (embora não falemos sobre isso neste post).

Além dos problemas que precisamos resolver, o escopo deste plugin requer adição e remoção frequentes de objetos na tela. Além disso, os atores requerem componentes como sistemas de partículas e detecção de colisão, então o uso de instanced static mesh components não parece uma boa ideia.

Aqui falaremos sobre as iterações durante a criação do sistema de cache e como ele evoluiu de uma implementação simples (que ajudou muito) para uma biblioteca altamente performática. Ele deve ajudar no gerenciamento de centenas (se não milhares) de atores, então o desempenho é crítico.

Primeira iteração: usando TMultiMap

Comecei com uma ideia muito simples de uma classe contendo um TMultiMap mantendo uma lista de Atores por seus tipos de classe.

#pragma once

#include "CoreMinimal.h"
#include "Containers/Map.h"
#include "Containers/Set.h"
#include "Templates/SubclassOf.h"
#include "Algo/Find.h"
#include "Kismet/KismetSystemLibrary.h"

template <typename InElementType>
class TActorPool
{
public:
	//static_assert(TIsDerivedFrom<InElementType, AActor>::IsDerived, "ElementType must be derived from AActor");

	using KeyType = TSubclassOf<InElementType>;
	using MapType = TMultiMap<KeyType, InElementType*>;
	using ArrayType = TArray<InElementType*>;
	using SetType = TSet<InElementType*>;
	using StatsType = TMap<KeyType, int32>;

	TActorPool() = default;
	TActorPool(TActorPool&&) = delete;
	TActorPool(const TActorPool&) = delete;
	~TActorPool() = default;

	// Extract key from item
	FORCEINLINE KeyType ParseKeyFromItem(InElementType* Item) { return Item->GetClass(); }

	void Add(InElementType* NewObject)
	{
		KeyType Key = ParseKeyFromItem(NewObject);
		PerClassMap.AddUnique(Key, NewObject);
	}

	void Remove(InElementType* NewObject)
	{
		KeyType Key = ParseKeyFromItem(NewObject);
		PerClassMap.RemoveSingle(Key, NewObject);
	}

	int32 Num() const
	{
		return PerClassMap.Num();
	}

	int32 Num(KeyType Key) const
	{
		return PerClassMap.Num(Key);
	}

	InElementType* FindByPredicate(TSubclassOf<InElementType> Type, TFunctionRef<bool(const InElementType*)> Predicate) const
	{
		TArray<InElementType*> FoundObjects;
		PerClassMap.MultiFind(Type, FoundObjects);

		InElementType** FoundObject = Algo::FindByPredicate(FoundObjects, [&Predicate](const InElementType* Item)
			{
				// Check if missile is valid, for safety
				if (!UKismetSystemLibrary::IsValid(Item))
				{
					// Problem with this instance, destroy it
					UE_LOG(LogTemp, Display, TEXT("TMissilePool::FindByPredicate (line %d) -> %s is invalid"), __LINE__, *UKismetSystemLibrary::GetDisplayName(Item));
					return false;
				}

				return Predicate(Item);
			});

		return FoundObject ? *FoundObject : nullptr;
	}

	/* Updates and return information about cache size for each subtype */
	StatsType& QueryPoolStats() 
	{
		TSet<KeyType> Keys;
		PerClassMap.GetKeys(Keys);
		for (KeyType Key : Keys)
		{
			InternalStats.Add(Key, PerClassMap.Num(Key));
		}

		return InternalStats;
	}

	/* Iterates by whole pool to remove invalid items */
	void CheckPool(const ArrayType& InvalidItems)
	{
		for (auto It = PerClassMap.CreateIterator(); It; ++It)
		{
			if (!UKismetSystemLibrary::IsValid(It.Value()))
			{
				InvalidItems.Add(It.RemoveCurrent());
			}
		}
	}

private:
	MapType PerClassMap;
	StatsType InternalStats;
};

Esta primeira versão funcionou muito bem reduzindo o número de chamadas de spawn. Mas temos um problema oculto na função FindByPredicate. Suponha que o cache tenha 200 itens, com 160 mísseis em uso na tela, e um personagem precisa encontrar um míssil de uso livre. Mesmo com o melhor algoritmo, há uma chance de que uma boa parte de toda a coleção seja percorrida, e para cada item o predicado será chamado. E ainda temos o problema de que as chamadas para FindByPredicate não são thread-safe, então uma corrida pode ocorrer se vários personagens precisarem encontrar um míssil de uso livre.

Segunda iteração: usando TQueue

Para resolver ambos os problemas, TQueue se encaixa muito bem. O item é inserido em uma extremidade da fila e é removido na outra para ser usado na cena. Embora TQueue NÃO seja thread-safe para remoção de itens, é seguro para adição de itens.

Aqui está o código para esta versão:

#pragma once

#include "CoreMinimal.h"
#include "Containers/Map.h"
#include "Containers/Set.h"
#include "Containers/MpscQueue.h"
#include "Templates/SubclassOf.h"
#include "Algo/Find.h"
#include "Kismet/KismetSystemLibrary.h"

template <typename InElementType>
class TActorPoolQueue
{
public:
	//static_assert(TIsDerivedFrom<InElementType, AActor>::IsDerived, "ElementType must be derived from AActor");

	using KeyType = TSubclassOf<InElementType>;
	using QueueType = TMpscQueue<InElementType*>;
	using MapType = TMultiMap<KeyType, QueueType*>;
	using StatsType = TMap<KeyType, int32>;

	TActorPoolQueue() = default;
	TActorPoolQueue(TActorPoolQueue&&) = delete;
	TActorPoolQueue(const TActorPoolQueue&) = delete;
	~TActorPoolQueue() = default;

	/**
	 * Add or return an element to the appropriate queue
	 */
	void Add(InElementType* NewObject)
	{
		KeyType Key = ParseKeyFromItem(NewObject);
		QueueType* Queue = FindPerKeyContainer(Key);

		Queue->Enqueue(NewObject);
	}

	/**
	 * Remove an element from the queue
	 */
	bool Remove(KeyType ObjectType, InElementType*& NewObject)
	{
		QueueType* Queue = FindPerKeyContainer(ObjectType);

		Queue->Dequeue(NewObject);
		return NewObject != nullptr;
	}

	// TODO refactor to count all items from queues
	int32 Num() const
	{
		return PerClassMap.Num();
	}

	// TODO refactor to count all items from queues
	int32 Num(KeyType Key) const
	{
		return PerClassMap.Num(Key);
	}

	/**
	 *
	 */
	InElementType* FindByPredicate(TSubclassOf<InElementType> Type, TFunctionRef<bool(const InElementType*)> Predicate) const
	{
		return nullptr;
	}

	/**
	 * Updates and return information about cache size for each subtype 
	 */
	StatsType& QueryPoolStats() 
	{
		TSet<KeyType> Keys;
		PerClassMap.GetKeys(Keys);
		for (KeyType Key : Keys)
		{
			InternalStats.Add(Key, PerClassMap.Num(Key));
		}

		return InternalStats;
	}

private:
	MapType PerClassMap;
	StatsType InternalStats;

	// Extract key from item
	FORCEINLINE KeyType ParseKeyFromItem(InElementType* Item) { return Item->GetClass(); }

	// Find queue by key type
	FORCEINLINE QueueType* FindPerKeyContainer(KeyType Key)
	{
		QueueType** Queue = PerClassMap.Find(Key);
		if (Queue == nullptr)
		{
			QueueType* NewQueue = new QueueType();
			PerClassMap.Add(Key, NewQueue);
			Queue = PerClassMap.Find(Key);
		}

		return *Queue;
	}
};

Esta implementação simplificou o gerenciamento das instâncias e é muito rápida. No nosso caso, os mísseis expirados são inseridos na cabeça da fila, e os jogadores solicitantes obtêm mísseis da cauda da fila.

Esta versão é suficiente para nossos propósitos. Mas eu queria tentar algo mais avançado e, ao mesmo tempo, evitar alguns possíveis problemas.

A primeira pergunta é sobre o manuseio do míssil. Enquanto em uso, o míssil fica fora do cache, sem nenhum conhecimento de seu status. Não é assim que um cache deve se comportar.

A segunda pergunta é que, mesmo tendo um desempenho melhor do que pesquisar um objeto de uso gratuito em um TArray, o TQueue aloca e desaloca memória internamente porque internamente é uma lista vinculada. Embora o objeto do nó seja muito pequeno, as alocações são feitas no heap e, devido ao comportamento desta implementação, podem causar fragmentação de memória e atrasos durante a inserção e remoção. E sim, isso é de nível muito baixo, mas para obter melhor desempenho é necessário salvar todos os ciclos possíveis.

Pesquisa de campo

Neste ponto, inspecionei muitos grupos de atores desenvolvidos para a Unreal Engine, tanto plugins de código aberto quanto pagos. A grande maioria usa uma combinação de TMap cuja chave é o tipo de classe (um objeto TSubclassOf) e o valor sendo tuplas contendo dois arrays, um para os objetos ativos (na tela) e outro para os inativos.

Todos funcionam muito bem, mas tenho dúvidas sobre o desempenho sob os requisitos de alta frequência de adição e remoção. No final do dia, estamos movendo memória de um lugar para outro e manipulando memória, e isso tem um custo.

Pensei em uma maneira de não mover nenhuma memória, mas apenas verificar o estado dessa instância do objeto. Como cada objeto terá apenas dois estados (ativo e inativo), um bit para cada instância seria o suficiente. Então, podemos usar uma combinação de TArray contendo as instâncias do ator e um bitmap para armazenar o status de cada instância, ambos controlados por um contêiner de armazenamento para cada tipo de classe. E os contêineres serão mapeados por um TMap, porque o TMultiMap não suporta valores de lista de acesso por índice, apenas a lista inteira.

Terceira iteração: criando o contêiner de armazenamento

A parte difícil aqui é a lógica para analisar e detectar a primeira instância livre na lista de instâncias. Usaremos os mesmos recursos que os bancos de dados usam e criaremos o que é chamado de índice de bitmap. Os índices de bitmap não são usados ​​para classificação, mas para atribuir um status que mapeia para outra lista. Dessa forma, podemos consultar facilmente o índice de algumas ou todas as instâncias que têm o status que desejamos.

Pesquisar instâncias não utilizadas

Para nosso índice de bitmap, usaremos um TArray<int64>, onde cada bit de cada elemento representa o status de um item no array de instâncias. Para pesquisar a primeira instância livre no índice, o algoritmo é o seguinte:

// Bitmap Index array
TArray<int64> UsageArray;
constexpr int32 IndicesPerInt64 = 64;
...
...
int32 FindFreeIndex() const
{
	// For each item in the index list...
    for (int32 i = 0; i < UsageArray.Num(); ++i)
    {
		// This is a GUARD: if whole item is 1 (i.e.: no available instance)...
        if (UsageArray[i] == ~0LL)
        {
			// Then continue to the next item
			continue;
		}

		// For each bit in the item...
		for (int32 j = 0; j < IndicesPerInt64; ++j)
		{
			If the bit has value 0...
			if ((UsageArray[i] & (1LL << j)) == 0)
			{
				// Return bit index combined with item index
				return i * IndicesPerInt64 + j;
			}
		}
    }
    return INDEX_NONE;
}

Como nota lateral, eu gosto muito de guard. Sair cedo se a condição não for atendida simplifica muito o fluxo do código (não tenho certeza se isso é performático do ponto de vista da CPU).

Do ponto de vista do desempenho, o algoritmo acima tem espaço para melhorias. Primeiro, há loops aninhados, que podem ser dimensionados pelo número de itens na matriz. Segundo, o loop interno é usado para comparar bit a bit, o que não parece ser um problema, mas há instruções feitas exatamente para nos ajudar. A função FMath::CountTrailingZeros64 se encaixa perfeitamente nesse fluxo, verificando o número de bits não definidos (= 0) até encontrar o primeiro bit definido.

No nosso caso, FindFreeIndex busca os primeiros bits não definidos. Então, precisamos inverter o bitmap e usar a função FMath::CountTrailingZeros64 corretamente.

// Bitmap Index array
TArray<int64> UsageArray;
constexpr int32 IndicesPerInt64 = 64;
...
...
int32 FindFreeIndex() const
{
	// For each item in the index list...
    for (int32 i = 0; i < UsageArray.Num(); ++i)
    {
		// For each bit in the item...
		int32 FreeBitIndex = FMath::CountTrailingZeros64(~UsageArray[i]);
		if (FreeBitIndex < 64)
		{
			If the bit has value 0...
			if ((UsageArray[i] & (1LL << j)) == 0)
			{
				// Return bit index combined with item index
				return i * IndicesPerInt64 + j;
			}
		}
    }
    return INDEX_NONE;
}

Uma otimização inesperada é que podemos remover a cláusula de guarda, porque quando verificamos se o resultado de FMath::CountTrailingZeros64 é menor que 64 é o mesmo quando verificamos se UsageArray[i] == ~0LL (lembre-se de que negamos o valor de entrada).

Atualizando índices

Após selecionar o índice disponível em nossa estrutura de índice de bitmap, precisamos atualizar a entrada para marcá-la como utilizável. E ao retornar o item para o cache, marque-o como livre.

void MarkAsUsed(int32 Index)
{
    int32 UsageArrayIndex = Index / IndicesPerInt64;
    int32 BitIndex = Index % IndicesPerInt64;
    UsageArray[UsageArrayIndex] |= (1LL << BitIndex);
}

void MarkAsFree(int32 Index)
{
    int32 UsageArrayIndex = Index / IndicesPerInt64;
    int32 BitIndex = Index % IndicesPerInt64;
    UsageArray[UsageArrayIndex] &= ~(1LL << BitIndex);
}

Não há segredo aqui. Basicamente, a tarefa é encontrar o item e os índices de bits fornecidos a nós em FindFreeIndex, e usar algumas operações bit a bit para alterar o valor do bit.

Para marcar como usado, fazemos uma operação OR entre o item de índice e o número 1 deslocado pelo valor do índice de bits. Como exemplo, para atualizar o bit 19:

Index item		0100101010010010101001001010100100101010010010101001001010100110
Shift value		0000000000000000000000000000000000000000000001000000000000000000  OR
--------------------------------------------------------------------------------
Index updated	0100101010010010101001001010100100101010010011101001001010100110

Unmark the bit it a little trickier, as we need to do an AND operation between the index item and the number 1 shifted by the value of bit index NEGATED. As an example, to update the bit 19:

Shift value		0000000000000000000000000000000000000000000001000000000000000000 NOT
--------------------------------------------------------------------------------
Shift updated	1111111111111111111111111111111111111111111110111111111111111111

Index item		0100101010010010101001001010100100101010010011101001001010100110
Shift updated	1111111111111111111111111111111111111111111110111111111111111111  AND
--------------------------------------------------------------------------------
Index updated	0100101010010010101001001010100100101010010010101001001010100110

Como o único bit diferente de 1 é aquele que queremos desmarcar, ele mudará seu valor. Qualquer outro valor permanecerá inalterado.

Observando atentamente o cálculo usado para recuperar os índices de item e bit, sabemos que IndicesPerInt64 é e sempre será um múltiplo de dois, então podemos simplesmente substituir os operadores de divisão (lento) e módulo (mais lento) por operações bitwise (muito, muito rápidas). E como o código para analisar os índices é o mesmo, também podemos passar para outra função.

FORCEINLINE void ParceInternalIndexes(int32 Index, int32& ItemIndex, int32& BitIndex)
{
	ItemIndex = Index >> FMath::CeilLogTwo(IndicesPerInt64);	// Optimized division
    BitIndex = Index & (IndicesPerInt64 - 1);					// Optimized modulo
}

FORCEINLINE void MarkAsUsed(int32 Index)
{
    int32 UsageArrayIndex = 0, BitIndex = 0;
	ParceInternalIndexes(Index, UsageArrayIndex, BitIndex);
    UsageArray[UsageArrayIndex] |= (1LL << BitIndex);
}

FORCEINLINE void MarkAsFree(int32 Index)
{
    int32 UsageArrayIndex = 0, BitIndex = 0;
	ParceInternalIndexes(Index, UsageArrayIndex, BitIndex);
    UsageArray[UsageArrayIndex] &= ~(1LL << BitIndex);
}

Com esses métodos, agora podemos nos aprofundar mais verificando o fluxo entre FindFreeIndex e a função que obteremos da instância livre do cache:

AActor* GetFreeInstance()
{
	int32 InstanceIndex = FindFreeIndex();
	MarkAsUsed(InstanceIndex);
	return InstanceArray[InstanceIndex];
}

Você percebeu que em FindFreeIndex estamos analisando índices da lista de índices e do deslocamento de bits, e então obtendo ambos os índices analisando o valor dentro da função MarkAsUsed. Podemos obter ambos os índices e marcar a instância como usada desta forma:

int32 FindFreeIndex()
{
    for (int32 It = 0; It < UsageArray.Num(); ++It)
	{
		// Optimized search through bits
		int32 FreeBitIndex = FMath::CountTrailingZeros64(~UsageArray[It]);
		if (FreeBitIndex < 64)
		{
			// Mark item as "in use"
			UsageArray[It] |= (1LL << FreeBitIndex);

			// Return instance index
			return It * IndicesPerInt64 + FreeBitIndex;
		}
	}

	return INDEX_NONE;
}

Precisamos remover o modificador const nesta função, porque estamos alterando campos do proprietário da função. Além disso, podemos remover a chamada MarkAsUsed de GetFreeInstance.

AActor* GetFreeInstance()
{
	int32 InstanceIndex = FindFreeIndex();
	return InstanceArray[InstanceIndex];
}

Código Final e Resultados

E quanto à comparação entre todas as implementações? Usando um teste de unidade para simular o uso no jogo adicionando e removendo instâncias aleatoriamente do pool, podemos estimar o desempenho. O teste de unidade foi executado 15 vezes, cada vez executando 10.000 operações com adição e remoção aleatórias, e os tempos médios estão na tabela abaixo:

Implementação Tempo médio de execução do teste (em segundos)
TMultiMap c/ FindByPredicate 0,0392638
Fila 0,0028125
Bitmap sem otimizações 0,0015458
Bitmap otimizado 0,0013150

É importante perceber que nenhum tempo de execução é ruim, mas o tempo gasto por cada implementação é notavelmente melhor. No meu ponto de vista, implementações de fila ou bitmap podem ser usadas.

Espero que este artigo seja útil. Usei algum conhecimento de desenvolvimento corporativo para melhorar o desempenho nessas tarefas de baixo nível. Falarei sobre processamento em lote para movimentação de mísseis e projéteis na segunda parte desta série.