402 lines
11 KiB
C++
402 lines
11 KiB
C++
|
|
// Fill out your copyright notice in the Description page of Project Settings.
|
|||
|
|
|
|||
|
|
#include "DTFluxQueuedManager.h"
|
|||
|
|
#include "DTFluxNetworkModule.h"
|
|||
|
|
#include "JsonObjectConverter.h"
|
|||
|
|
|
|||
|
|
|
|||
|
|
/**
 * Builds the JSON payload for this queued request.
 *
 * A request struct matching RequestType is created (from ContestId, StageId and SplitId
 * where that struct's constructor takes them) and converted to a JSON string with
 * FJsonObjectConverter::UStructToJsonObjectString.
 *
 * @return The JSON string written by the converter; the string stays empty when
 *         RequestType is not one of the handled request types.
 */
const FString FDTFluxQueuedRequest::Serialize() const
{
	FString Payload;

	// Shared conversion step: writes the given request struct into Payload.
	// The converter's return value is deliberately not inspected, matching the
	// behavior of every case in this switch.
	const auto WriteJson = [&Payload](const auto& RequestStruct)
	{
		FJsonObjectConverter::UStructToJsonObjectString(RequestStruct, Payload);
	};

	switch (RequestType)
	{
	case EDTFluxRequestType::RaceData:
		{
			// Race data request carries no identifiers.
			FDTFluxRaceDataRequest RaceDataRequest;
			WriteJson(RaceDataRequest);
			break;
		}

	case EDTFluxRequestType::TeamList:
		{
			// Team list request carries no identifiers.
			const FDTFluxTeamListRequest TeamListRequest;
			WriteJson(TeamListRequest);
			break;
		}

	case EDTFluxRequestType::ContestRanking:
		{
			FDTFluxContestRankingRequest ContestRankingRequest(ContestId);
			WriteJson(ContestRankingRequest);
			break;
		}

	case EDTFluxRequestType::StageRanking:
		{
			FDTFluxStageRankingRequest StageRankingRequest(ContestId, StageId);
			WriteJson(StageRankingRequest);
			break;
		}

	case EDTFluxRequestType::SplitRanking:
		{
			FDTFluxSplitRankingRequest SplitRankingRequest(ContestId, StageId, SplitId);
			WriteJson(SplitRankingRequest);
			break;
		}

	default:
		// Unhandled request type: no payload.
		Payload = "";
		break;
	}

	return Payload;
}
|
|||
|
|
|
|||
|
|
/**
 * Default constructor.
 *
 * The manager starts uninitialized (Initialize() must be called before Tick() does any
 * work), with a timeout-check interval of 0.5 (compared against the DeltaTime accumulated
 * in Tick()) and no time accumulated since the last check.
 */
UDTFluxQueuedManager::UDTFluxQueuedManager()
	: bIsInitialized(false), CheckInterval(0.5f), TimeSinceLastCheck(0.0f)
{
}
|
|||
|
|
|
|||
|
|
/** Destructor: discards every request still held in the pending, completed and timed-out queues. */
UDTFluxQueuedManager::~UDTFluxQueuedManager()
{
	// Drain all queues so no request outlives the manager.
	ClearAllRequests();
}
|
|||
|
|
|
|||
|
|
void UDTFluxQueuedManager::Initialize()
|
|||
|
|
{
|
|||
|
|
if (!bIsInitialized)
|
|||
|
|
{
|
|||
|
|
UE_LOG(logDTFluxNetwork, Log, TEXT("Initializing DTFluxQueuedManager"));
|
|||
|
|
bIsInitialized = true;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
 * Creates a request from the given parameters and appends it to the pending queue.
 *
 * @param RequestType Kind of request to queue.
 * @param ContestId   Contest identifier passed to the request.
 * @param StageId     Stage identifier passed to the request.
 * @param SplitId     Split identifier passed to the request.
 * @param RawMessage  Text stored in the request's RawResponse field.
 * @return The RequestId of the newly queued request.
 */
FGuid UDTFluxQueuedManager::QueueRequest(EDTFluxRequestType RequestType, int32 ContestId, int32 StageId, int32 SplitId,
                                         const FString& RawMessage)
{
	// Build the request from the existing request struct and attach the raw message.
	FDTFluxQueuedRequest QueuedRequest(RequestType, ContestId, StageId, SplitId);
	QueuedRequest.RawResponse = RawMessage;

	// Remember the identifier before handing a copy of the request to the queue.
	const FGuid QueuedGuid = QueuedRequest.RequestId;

	// Append to the queue of requests awaiting a response.
	PendingRequestsQueue.Enqueue(QueuedRequest);

	UE_LOG(logDTFluxNetwork, Verbose, TEXT("Queued request %s: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
	       *QueuedGuid.ToString(), static_cast<int32>(RequestType), ContestId, StageId, SplitId);

	return QueuedGuid;
}
|
|||
|
|
|
|||
|
|
bool UDTFluxQueuedManager::MarkRequestAsResponded(const FGuid& TargetRequestGuid)
|
|||
|
|
{
|
|||
|
|
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
|
|||
|
|
bool bFoundMatch = false;
|
|||
|
|
|
|||
|
|
// Parcourir toutes les requêtes en attente
|
|||
|
|
FDTFluxQueuedRequest Request;
|
|||
|
|
while (PendingRequestsQueue.Dequeue(Request))
|
|||
|
|
{
|
|||
|
|
if (!bFoundMatch && Request.RequestId == TargetRequestGuid)
|
|||
|
|
{
|
|||
|
|
// Marquer comme ayant reçu une réponse
|
|||
|
|
Request.bHasReceivedResponse = true;
|
|||
|
|
bFoundMatch = true;
|
|||
|
|
|
|||
|
|
// Ajouter à la queue des requêtes terminées
|
|||
|
|
CompletedRequestsQueue.Enqueue(Request);
|
|||
|
|
|
|||
|
|
UE_LOG(logDTFluxNetwork, Verbose,
|
|||
|
|
TEXT("Marked request %s as responded: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
|
|||
|
|
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
|
|||
|
|
Request.SplitId);
|
|||
|
|
}
|
|||
|
|
else
|
|||
|
|
{
|
|||
|
|
// Remettre dans la queue temporaire
|
|||
|
|
TempQueue.Enqueue(Request);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Remettre les requêtes non traitées dans la queue principale
|
|||
|
|
while (TempQueue.Dequeue(Request))
|
|||
|
|
{
|
|||
|
|
PendingRequestsQueue.Enqueue(Request);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return bFoundMatch;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bool UDTFluxQueuedManager::MarkRequestAsResponded(const FDTFluxQueuedRequest& TargetRequest)
|
|||
|
|
{
|
|||
|
|
return MarkRequestAsResponded(TargetRequest.RequestId);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
 * Tells whether the pending queue holds a request for which
 * Matches(RequestType, ContestId, StageId, SplitId) returns true.
 *
 * The pending queue is drained and rebuilt in the same relative order; no request is
 * removed or modified.
 *
 * @param RequestType Data type forwarded to FDTFluxQueuedRequest::Matches.
 * @param ContestId   Contest identifier forwarded to Matches.
 * @param StageId     Stage identifier forwarded to Matches.
 * @param SplitId     Split identifier forwarded to Matches.
 * @return true if at least one pending request matches, false otherwise.
 */
bool UDTFluxQueuedManager::IsRequestPending(EDTFluxApiDataType RequestType, int32 ContestId, int32 StageId,
                                            int32 SplitId)
{
	TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> Visited;
	bool bIsPending = false;

	// Walk through every pending request.
	FDTFluxQueuedRequest Current;
	while (PendingRequestsQueue.Dequeue(Current))
	{
		// Short-circuit: once a match is known, Matches() is not evaluated again.
		bIsPending = bIsPending || Current.Matches(RequestType, ContestId, StageId, SplitId);

		// Every request is kept, matching or not.
		Visited.Enqueue(Current);
	}

	// Restore all requests into the main pending queue.
	while (Visited.Dequeue(Current))
	{
		PendingRequestsQueue.Enqueue(Current);
	}

	return bIsPending;
}
|
|||
|
|
|
|||
|
|
/**
 * Looks up the first pending request with the given type and identifiers.
 *
 * TQueue gives no access to elements other than by dequeuing them, so the pending queue
 * is drained and rebuilt in the same relative order, and the match is returned as a COPY
 * held in function-static storage. (The previous implementation returned the address of
 * a local variable, which was always dangling once the function returned.)
 *
 * Consequences for callers:
 *  - the returned pointer stays valid until the next call to this function;
 *  - writing through the pointer does not modify the request stored in the queue;
 *  - the function is not re-entrant. NOTE(review): it drains an Mpsc queue, so it is
 *    presumably only ever called from the single consumer thread — confirm.
 *
 * @param RequestType Request type to look for.
 * @param ContestId   Contest identifier to look for.
 * @param StageId     Stage identifier to look for.
 * @param SplitId     Split identifier to look for.
 * @return Pointer to a copy of the first matching pending request, or nullptr if none matches.
 */
FDTFluxQueuedRequest* UDTFluxQueuedManager::GetRequestPending(EDTFluxRequestType RequestType, int32 ContestId,
                                                              int32 StageId, int32 SplitId)
{
	// Storage that outlives this call, so the returned pointer is never dangling.
	static FDTFluxQueuedRequest FoundSnapshot;

	TQueue<FDTFluxQueuedRequest> TempQueue;
	bool bFound = false;

	FDTFluxQueuedRequest Item;
	while (PendingRequestsQueue.Dequeue(Item))
	{
		const bool bMatches = Item.RequestType == RequestType
			&& Item.ContestId == ContestId
			&& Item.StageId == StageId
			&& Item.SplitId == SplitId;

		if (!bFound && bMatches)
		{
			// Copy now: Item is overwritten by the next Dequeue.
			FoundSnapshot = Item;
			bFound = true;
		}

		// Keep every request, matching or not.
		TempQueue.Enqueue(Item);
	}

	// Restore all requests into the pending queue.
	while (TempQueue.Dequeue(Item))
	{
		PendingRequestsQueue.Enqueue(Item);
	}

	return bFound ? &FoundSnapshot : nullptr;
}
|
|||
|
|
|
|||
|
|
/**
 * Looks up a request by identifier in the pending, completed and timed-out queues
 * (searched in that order).
 *
 * TQueue gives no access to elements other than by dequeuing them, so each searched queue
 * is drained and rebuilt in the same relative order, and the match is returned as a COPY
 * held in function-static storage. (The previous implementation returned the address of
 * a local variable, which was always dangling once the function returned.)
 *
 * Consequences for callers:
 *  - the returned pointer stays valid until the next call to this function;
 *  - the function is not re-entrant. NOTE(review): it drains Mpsc queues, so it is
 *    presumably only ever called from the single consumer thread — confirm.
 *
 * @param SearchedGuid RequestId of the request to find.
 * @return Pointer to a copy of the matching request, or nullptr if no queue holds it.
 */
const FDTFluxQueuedRequest* UDTFluxQueuedManager::GetRequest(const FGuid& SearchedGuid)
{
	// Storage that outlives this call, so the returned pointer is never dangling.
	static FDTFluxQueuedRequest FoundSnapshot;

	// Scans one queue, copies the first request whose RequestId matches into OutMatch,
	// and restores the queue content. Returns true when a match was copied.
	const auto CopyMatchFromQueue = [&SearchedGuid](TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc>& Queue,
	                                               FDTFluxQueuedRequest& OutMatch) -> bool
	{
		TQueue<FDTFluxQueuedRequest> TempQueue;
		bool bFound = false;

		FDTFluxQueuedRequest Item;
		while (Queue.Dequeue(Item))
		{
			if (!bFound && Item.RequestId == SearchedGuid)
			{
				// Copy now: Item is overwritten by the next Dequeue.
				OutMatch = Item;
				bFound = true;
			}

			// Keep every request, matching or not.
			TempQueue.Enqueue(Item);
		}

		// Restore all requests into the searched queue.
		while (TempQueue.Dequeue(Item))
		{
			Queue.Enqueue(Item);
		}

		return bFound;
	};

	// Search each queue in turn; later queues are left untouched once a match is found.
	if (CopyMatchFromQueue(PendingRequestsQueue, FoundSnapshot)
		|| CopyMatchFromQueue(CompletedRequestsQueue, FoundSnapshot)
		|| CopyMatchFromQueue(TimedOutRequestsQueue, FoundSnapshot))
	{
		return &FoundSnapshot;
	}

	return nullptr;
}
|
|||
|
|
|
|||
|
|
/**
 * Counts the requests currently in the pending queue.
 *
 * The queue is drained and rebuilt in the same relative order to obtain the count.
 *
 * @return Number of pending requests.
 */
int32 UDTFluxQueuedManager::GetPendingRequestCount()
{
	TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> Counted;
	int32 PendingCount = 0;

	// Move every pending request aside, counting as we go.
	FDTFluxQueuedRequest Current;
	while (PendingRequestsQueue.Dequeue(Current))
	{
		Counted.Enqueue(Current);
		++PendingCount;
	}

	// Restore all requests into the main pending queue.
	while (Counted.Dequeue(Current))
	{
		PendingRequestsQueue.Enqueue(Current);
	}

	return PendingCount;
}
|
|||
|
|
|
|||
|
|
/**
 * Moves every pending request for which HasTimedOut() returns true to the timed-out
 * queue and logs a warning for each.
 *
 * Requests that have not timed out are put back into the pending queue in the same
 * relative order.
 *
 * @return Number of requests moved to the timed-out queue.
 */
int32 UDTFluxQueuedManager::CleanupTimedOutRequests()
{
	TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> StillPending;
	int32 ExpiredCount = 0;

	// Walk through every pending request.
	FDTFluxQueuedRequest Current;
	while (PendingRequestsQueue.Dequeue(Current))
	{
		// Still within its deadline: keep it for re-queuing.
		if (!Current.HasTimedOut())
		{
			StillPending.Enqueue(Current);
			continue;
		}

		// Expired: hand it over to the timed-out queue.
		TimedOutRequestsQueue.Enqueue(Current);
		++ExpiredCount;

		UE_LOG(logDTFluxNetwork, Warning,
		       TEXT("Request %s timed out: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
		       *Current.RequestId.ToString(), static_cast<int32>(Current.RequestType), Current.ContestId,
		       Current.StageId, Current.SplitId);
	}

	// Put the non-expired requests back into the main pending queue.
	while (StillPending.Dequeue(Current))
	{
		PendingRequestsQueue.Enqueue(Current);
	}

	return ExpiredCount;
}
|
|||
|
|
|
|||
|
|
/**
 * Drops completed requests whose cached response has expired.
 *
 * A completed request is dropped only when it is cacheable, has received a response, and
 * its age (FDateTime::Now() minus CreatedAt, in seconds) exceeds CachedValidity. Every
 * other completed request is put back into the completed queue in the same relative order.
 *
 * Log output goes to the logDTFluxNetwork category, like the rest of this manager
 * (it previously went to LogTemp).
 *
 * NOTE(review): the age uses FDateTime::Now() (local time); this presumably matches how
 * CreatedAt is stamped — confirm against FDTFluxQueuedRequest.
 *
 * @return Number of expired cached requests that were dropped.
 */
int32 UDTFluxQueuedManager::CleanCashedRequests()
{
	int32 CleanedRequestsCount = 0;

	// Temporary queue holding the requests that survive the cleanup.
	TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> KeptRequests;

	// Walk through every completed request.
	FDTFluxQueuedRequest CompletedRequest;
	while (CompletedRequestsQueue.Dequeue(CompletedRequest))
	{
		// Non-cacheable or unanswered requests are never expired here.
		if (!CompletedRequest.bIsCacheable || !CompletedRequest.bHasReceivedResponse)
		{
			KeptRequests.Enqueue(CompletedRequest);
			continue;
		}

		// Age of the request in seconds; GetTotalSeconds() yields a double, narrowed explicitly.
		const float RequestAge =
			static_cast<float>((FDateTime::Now() - CompletedRequest.CreatedAt).GetTotalSeconds());

		// Cache still valid: keep the request.
		if (RequestAge <= CompletedRequest.CachedValidity)
		{
			KeptRequests.Enqueue(CompletedRequest);
			continue;
		}

		// Cache expired: the request is not re-queued, which drops it.
		++CleanedRequestsCount;

		UE_LOG(logDTFluxNetwork, Verbose,
		       TEXT("DTFluxQueuedManager: Cleaned expired cached request %s (Age: %.2fs, Validity: %.2fs)"),
		       *CompletedRequest.RequestId.ToString(), RequestAge, CompletedRequest.CachedValidity);
	}

	// Rebuild the completed queue with only the surviving requests.
	while (KeptRequests.Dequeue(CompletedRequest))
	{
		CompletedRequestsQueue.Enqueue(CompletedRequest);
	}

	// Summarize only when something was actually cleaned.
	if (CleanedRequestsCount > 0)
	{
		UE_LOG(logDTFluxNetwork, Log, TEXT("DTFluxQueuedManager: Cleaned %d expired cached requests"),
		       CleanedRequestsCount);
	}

	return CleanedRequestsCount;
}
|
|||
|
|
|
|||
|
|
void UDTFluxQueuedManager::ClearAllRequests()
|
|||
|
|
{
|
|||
|
|
// Vider toutes les queues
|
|||
|
|
FDTFluxQueuedRequest DummyRequest;
|
|||
|
|
while (PendingRequestsQueue.Dequeue(DummyRequest))
|
|||
|
|
{
|
|||
|
|
}
|
|||
|
|
while (CompletedRequestsQueue.Dequeue(DummyRequest))
|
|||
|
|
{
|
|||
|
|
}
|
|||
|
|
while (TimedOutRequestsQueue.Dequeue(DummyRequest))
|
|||
|
|
{
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
UE_LOG(logDTFluxNetwork, Log, TEXT("Cleared all pending requests"));
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
void UDTFluxQueuedManager::Tick(float DeltaTime)
|
|||
|
|
{
|
|||
|
|
if (!bIsInitialized)
|
|||
|
|
{
|
|||
|
|
return;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Incrémenter le temps écoulé
|
|||
|
|
TimeSinceLastCheck += DeltaTime;
|
|||
|
|
|
|||
|
|
// Vérifier si c'est le moment de nettoyer les requêtes expirées
|
|||
|
|
if (TimeSinceLastCheck >= CheckInterval)
|
|||
|
|
{
|
|||
|
|
TimeSinceLastCheck = 0.0f;
|
|||
|
|
CleanupTimedOutRequests();
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Traiter les requêtes expirées
|
|||
|
|
FDTFluxQueuedRequest TimedOutRequest;
|
|||
|
|
while (TimedOutRequestsQueue.Dequeue(TimedOutRequest))
|
|||
|
|
{
|
|||
|
|
// Déclencher l'événement pour chaque requête expirée
|
|||
|
|
OnRequestTimedOut.Broadcast(TimedOutRequest);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
bool UDTFluxQueuedManager::IsTickable() const
|
|||
|
|
{
|
|||
|
|
return bIsInitialized;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
/**
 * Provides the stat identifier for this manager.
 *
 * @return The cycle stat declared for UDTFluxQueuedManager in the STATGROUP_Tickables group.
 */
TStatId UDTFluxQueuedManager::GetStatId() const
{
	RETURN_QUICK_DECLARE_CYCLE_STAT(UDTFluxQueuedManager, STATGROUP_Tickables);
}
|