Files
DTFluxAPI/Source/DTFluxNetwork/Private/DTFluxQueuedManager.cpp

440 lines
12 KiB
C++
Raw Normal View History

// Fill out your copyright notice in the Description page of Project Settings.
#include "DTFluxQueuedManager.h"
#include "DTFluxNetworkModule.h"
#include "JsonObjectConverter.h"
const FString FDTFluxQueuedRequest::Serialize() const
{
FString JSONString;
switch (RequestType)
{
case EDTFluxRequestType::RaceData:
{
FDTFluxRaceDataRequest RaceData;
FJsonObjectConverter::UStructToJsonObjectString(RaceData, JSONString);
break;
}
case EDTFluxRequestType::TeamList:
{
const FDTFluxTeamListRequest TeamList;
FJsonObjectConverter::UStructToJsonObjectString(TeamList, JSONString);
break;
}
case EDTFluxRequestType::ContestRanking:
{
FDTFluxContestRankingRequest ContestRanking(ContestId);
FJsonObjectConverter::UStructToJsonObjectString(ContestRanking, JSONString);
break;
}
case EDTFluxRequestType::StageRanking:
{
FDTFluxStageRankingRequest StageRanking(ContestId, StageId);
FJsonObjectConverter::UStructToJsonObjectString(StageRanking, JSONString);
break;
}
case EDTFluxRequestType::SplitRanking:
{
FDTFluxSplitRankingRequest SplitRanking(ContestId, StageId, SplitId);
FJsonObjectConverter::UStructToJsonObjectString(SplitRanking, JSONString);
break;
}
default:
JSONString = "";
break;
}
return JSONString;
}
UDTFluxQueuedManager::UDTFluxQueuedManager()
: bIsInitialized(false)
, CheckInterval(0.5f)
, TimeSinceLastCheck(0.0f)
{
}
UDTFluxQueuedManager::~UDTFluxQueuedManager()
{
ClearAllRequests();
}
void UDTFluxQueuedManager::Initialize()
{
if (!bIsInitialized)
{
UE_LOG(logDTFluxNetwork, Log, TEXT("Initializing DTFluxQueuedManager"));
bIsInitialized = true;
}
}
FGuid UDTFluxQueuedManager::QueueRequest(EDTFluxRequestType RequestType, int32 ContestId, int32 StageId, int32 SplitId,
const FString& RawMessage)
{
// Créer la requête avec les structs existants
FDTFluxQueuedRequest NewRequest(RequestType, ContestId, StageId, SplitId);
NewRequest.RawResponse = RawMessage;
// Ajouter à la queue des requêtes en attente
PendingRequestsQueue.Enqueue(NewRequest);
UE_LOG(logDTFluxNetwork, Verbose, TEXT("Queued request %s: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*NewRequest.RequestId.ToString(), (int32)RequestType, ContestId, StageId, SplitId);
return NewRequest.RequestId;
}
bool UDTFluxQueuedManager::MarkRequestAsError(const FGuid& TargetRequestGuid)
{
// TODO: Implement a retry mechanism
// For now we simply suppress the request and log a message
bool bFoundMatch = false;
FDTFluxQueuedRequest Request;
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
while (PendingRequestsQueue.Dequeue(Request))
{
if (Request.RequestId == TargetRequestGuid)
{
UE_LOG(logDTFluxNetwork, Error,
TEXT("Marked request %s as error: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
else
{
TempQueue.Enqueue(Request);
}
}
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
if (bFoundMatch)
{
UE_LOG(logDTFluxNetwork, Error, TEXT("No Request Found with GUID %s"), *TargetRequestGuid.ToString());
}
return true;
}
bool UDTFluxQueuedManager::MarkRequestAsResponded(const FGuid& TargetRequestGuid)
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
bool bFoundMatch = false;
// Parcourir toutes les requêtes en attente
FDTFluxQueuedRequest Request;
while (PendingRequestsQueue.Dequeue(Request))
{
if (!bFoundMatch && Request.RequestId == TargetRequestGuid)
{
// Marquer comme ayant reçu une réponse
Request.bHasReceivedResponse = true;
bFoundMatch = true;
// Ajouter à la queue des requêtes terminées
CompletedRequestsQueue.Enqueue(Request);
UE_LOG(logDTFluxNetwork, Verbose,
TEXT("Marked request %s as responded: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
else
{
// Remettre dans la queue temporaire
TempQueue.Enqueue(Request);
}
}
// Remettre les requêtes non traitées dans la queue principale
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
return bFoundMatch;
}
bool UDTFluxQueuedManager::MarkRequestAsResponded(const FDTFluxQueuedRequest& TargetRequest)
{
return MarkRequestAsResponded(TargetRequest.RequestId);
}
bool UDTFluxQueuedManager::IsRequestPending(FGuid& OutRequestId, EDTFluxApiDataType RequestType, int32 ContestId,
int32 StageId,
int32 SplitId)
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
bool bFoundMatch = false;
// Parcourir toutes les requêtes en attente
FDTFluxQueuedRequest Request;
while (PendingRequestsQueue.Dequeue(Request))
{
// Vérifier si cette requête correspond
if (!bFoundMatch && Request.Matches(RequestType, ContestId, StageId, SplitId))
{
bFoundMatch = true;
OutRequestId = Request.RequestId;
UE_LOG(logDTFluxNetwork, Verbose,
TEXT("Found pending request %s: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
// Remettre dans la queue temporaire
TempQueue.Enqueue(Request);
}
// Remettre toutes les requêtes dans la queue principale
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
return bFoundMatch;
}
FDTFluxQueuedRequest* UDTFluxQueuedManager::GetRequestPending(EDTFluxRequestType RequestType, int32 ContestId,
int32 StageId, int32 SplitId)
{
auto SearchInQueue = [&RequestType, ContestId, StageId, SplitId](
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc>& Queue) -> FDTFluxQueuedRequest*
{
// Copie temporaire de la queue pour la recherche
TQueue<FDTFluxQueuedRequest> TempQueue;
FDTFluxQueuedRequest* FoundItem = nullptr;
FDTFluxQueuedRequest Item;
while (Queue.Dequeue(Item))
{
if (Item.RequestType == RequestType && Item.ContestId == ContestId && Item.StageId == StageId && Item.
SplitId == SplitId) // Assuming RequestId is your GUID field
{
FoundItem = &Item;
}
// Remettre dans la queue temporaire
TempQueue.Enqueue(Item);
}
while (TempQueue.Dequeue(Item))
{
Queue.Enqueue(Item);
}
return FoundItem;
};
return SearchInQueue(PendingRequestsQueue);
}
const FDTFluxQueuedRequest* UDTFluxQueuedManager::GetRequest(const FGuid& SearchedGuid)
{
auto SearchInQueue = [&SearchedGuid](TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc>& Queue) -> FDTFluxQueuedRequest*
{
// Copie temporaire de la queue pour la recherche
TQueue<FDTFluxQueuedRequest> TempQueue;
FDTFluxQueuedRequest* FoundItem = nullptr;
FDTFluxQueuedRequest Item;
while (Queue.Dequeue(Item))
{
if (Item.RequestId == SearchedGuid) // Assuming RequestId is your GUID field
{
// Trouver l'élément dans la queue originale
// On doit refaire une copie car on ne peut pas retourner l'adresse de 'Item'
FoundItem = &Item;
}
// Remettre dans la queue temporaire
TempQueue.Enqueue(Item);
}
while (TempQueue.Dequeue(Item))
{
Queue.Enqueue(Item);
}
return FoundItem;
};
// Chercher dans chaque queue
if (FDTFluxQueuedRequest* Found = SearchInQueue(PendingRequestsQueue))
return Found;
if (const FDTFluxQueuedRequest* Found = SearchInQueue(CompletedRequestsQueue))
return Found;
if (const FDTFluxQueuedRequest* Found = SearchInQueue(TimedOutRequestsQueue))
return Found;
return nullptr;
}
int32 UDTFluxQueuedManager::GetPendingRequestCount()
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
int32 Count = 0;
// Compter les requêtes en attente
FDTFluxQueuedRequest Request;
while (PendingRequestsQueue.Dequeue(Request))
{
Count++;
TempQueue.Enqueue(Request);
}
// Remettre toutes les requêtes dans la queue principale
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
return Count;
}
int32 UDTFluxQueuedManager::CleanupTimedOutRequests()
{
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> TempQueue;
int32 TimeoutCount = 0;
// Parcourir toutes les requêtes en attente
FDTFluxQueuedRequest Request;
while (PendingRequestsQueue.Dequeue(Request))
{
if (Request.HasTimedOut())
{
// Ajouter à la queue des requêtes expirées
TimedOutRequestsQueue.Enqueue(Request);
TimeoutCount++;
UE_LOG(logDTFluxNetwork, Warning,
TEXT("Request %s timed out: Type=%d, ContestId=%d, StageId=%d, SplitId=%d"),
*Request.RequestId.ToString(), (int32)Request.RequestType, Request.ContestId, Request.StageId,
Request.SplitId);
}
else
{
// Remettre dans la queue temporaire
TempQueue.Enqueue(Request);
}
}
// Remettre les requêtes non expirées dans la queue principale
while (TempQueue.Dequeue(Request))
{
PendingRequestsQueue.Enqueue(Request);
}
return TimeoutCount;
}
int32 UDTFluxQueuedManager::CleanCashedRequests()
{
int32 CleanedRequestsCount = 0;
// Queue temporaire pour stocker les requêtes encore valides
TQueue<FDTFluxQueuedRequest, EQueueMode::Mpsc> ValidCompletedRequests;
// Traiter toutes les requêtes terminées
FDTFluxQueuedRequest CompletedRequest;
while (CompletedRequestsQueue.Dequeue(CompletedRequest))
{
// Vérifier si la requête est cacheable et a reçu une réponse
if (CompletedRequest.bIsCacheable && CompletedRequest.bHasReceivedResponse)
{
// Calculer l'âge de la requête en secondes
float RequestAge = (FDateTime::Now() - CompletedRequest.CreatedAt).GetTotalSeconds();
// Vérifier si le cache est encore valide
if (RequestAge <= CompletedRequest.CachedValidity)
{
// Le cache est encore valide, conserver la requête
ValidCompletedRequests.Enqueue(CompletedRequest);
}
else
{
// Le cache a expiré, compter cette requête comme nettoyée
CleanedRequestsCount++;
UE_LOG(LogTemp, Verbose,
TEXT("DTFluxQueuedManager: Cleaned expired cached request %s (Age: %.2fs, Validity: %.2fs)"),
*CompletedRequest.RequestId.ToString(), RequestAge, CompletedRequest.CachedValidity);
}
}
else
{
// Requête non cacheable ou sans réponse, la conserver
ValidCompletedRequests.Enqueue(CompletedRequest);
}
}
// Restaurer la queue avec uniquement les requêtes valides
while (ValidCompletedRequests.Dequeue(CompletedRequest))
{
CompletedRequestsQueue.Enqueue(CompletedRequest);
}
// Log du résultat si des requêtes ont été nettoyées
if (CleanedRequestsCount > 0)
{
UE_LOG(LogTemp, Log, TEXT("DTFluxQueuedManager: Cleaned %d expired cached requests"), CleanedRequestsCount);
}
return CleanedRequestsCount;
}
void UDTFluxQueuedManager::ClearAllRequests()
{
// Vider toutes les queues
FDTFluxQueuedRequest DummyRequest;
while (PendingRequestsQueue.Dequeue(DummyRequest))
{
}
while (CompletedRequestsQueue.Dequeue(DummyRequest))
{
}
while (TimedOutRequestsQueue.Dequeue(DummyRequest))
{
}
UE_LOG(logDTFluxNetwork, Log, TEXT("Cleared all pending requests"));
}
void UDTFluxQueuedManager::Tick(float DeltaTime)
{
if (!bIsInitialized)
{
return;
}
// Incrémenter le temps écoulé
TimeSinceLastCheck += DeltaTime;
// Vérifier si c'est le moment de nettoyer les requêtes expirées
if (TimeSinceLastCheck >= CheckInterval)
{
TimeSinceLastCheck = 0.0f;
CleanupTimedOutRequests();
}
// Traiter les requêtes expirées
FDTFluxQueuedRequest TimedOutRequest;
while (TimedOutRequestsQueue.Dequeue(TimedOutRequest))
{
// Déclencher l'événement pour chaque requête expirée
OnRequestTimedOut.Broadcast(TimedOutRequest);
}
}
bool UDTFluxQueuedManager::IsTickable() const
{
return bIsInitialized;
}
TStatId UDTFluxQueuedManager::GetStatId() const
{
RETURN_QUICK_DECLARE_CYCLE_STAT(UDTFluxQueuedManager, STATGROUP_Tickables);
}