#include "DTFluxAsyncParser.h" #include "DTFluxNetworkModule.h" #include "Struct/DTFluxServerResponseStruct.h" #include "Async/AsyncWork.h" // ================================================================================================ // IMPLÉMENTATION DE LA TÂCHE DE PARSING // ================================================================================================ DECLARE_STATS_GROUP(TEXT("DTFlux"), STATGROUP_DTFlux, STATCAT_Advanced); DECLARE_CYCLE_STAT(TEXT("DTFlux Parsing Task"), STAT_FDTFluxParsingTask, STATGROUP_DTFlux); DECLARE_CYCLE_STAT(TEXT("DTFlux Parsing Task DoWork"), STAT_FDTFluxParsingTask_DoWork, STATGROUP_DTFlux); FDTFluxParsingTask::FDTFluxParsingTask( const FGuid& InRequestId, const FString& InRawJsonData, FOnParsingCompleted InOnCompleted, FOnParsingFailed InOnFailed ) : RequestId(InRequestId) , RawJsonData(InRawJsonData) , OnCompleted(InOnCompleted) , OnFailed(InOnFailed) , StartTime(FPlatformTime::Seconds()) { } void FDTFluxParsingTask::DoTask(ENamedThreads::Type CurrentThread, const FGraphEventRef& MyCompletionGraphEvent) { SCOPE_CYCLE_COUNTER(STAT_FDTFluxParsingTask_DoWork); UE_LOG(logDTFluxNetwork, VeryVerbose, TEXT("Starting async parsing for request %s"), *RequestId.ToString()); TSharedPtr ParsedResponse; bool bParsingSuccess = false; FString ErrorMessage; try { // === PARSING SUR LE THREAD WORKER === EDTFluxResponseStatus Status; ParsedResponse = MakeShared(RawJsonData, Status, false); // Pas de logs sur worker thread if (Status == EDTFluxResponseStatus::Success) { bParsingSuccess = true; UE_LOG(logDTFluxNetwork, VeryVerbose, TEXT("Async parsing successful for request %s"), *RequestId.ToString()); } else { ErrorMessage = FString::Printf(TEXT("Parsing failed with status: %s"), *UEnum::GetValueAsString(Status)); UE_LOG(logDTFluxNetwork, Warning, TEXT("Async parsing failed for request %s: %s"), *RequestId.ToString(), *ErrorMessage); } } catch (const std::exception& e) { ErrorMessage = FString::Printf(TEXT("Exception during parsing: %s"), ANSI_TO_TCHAR(e.what())); UE_LOG(logDTFluxNetwork, Error, TEXT("Exception during async parsing for request %s: %s"), *RequestId.ToString(), *ErrorMessage); } catch (...) { ErrorMessage = TEXT("Unknown exception during parsing"); UE_LOG(logDTFluxNetwork, Error, TEXT("Unknown exception during async parsing for request %s"), *RequestId.ToString()); } const float ParsingTime = (FPlatformTime::Seconds() - StartTime) * 1000.0f; // En millisecondes // === PROGRAMMER LA CALLBACK SUR LE MAIN THREAD === FFunctionGraphTask::CreateAndDispatchWhenReady( [this, ParsedResponse, bParsingSuccess, ErrorMessage, ParsingTime]() { // Cette lambda s'exécute sur le main thread if (bParsingSuccess && ParsedResponse.IsValid()) { OnCompleted.ExecuteIfBound(RequestId, ParsedResponse, true); } else { OnFailed.ExecuteIfBound(RequestId, ErrorMessage); } }, TStatId(), nullptr, ENamedThreads::GameThread // Forcer l'exécution sur le main thread ); } // ================================================================================================ // IMPLÉMENTATION DU PARSER ASYNCHRONE // ================================================================================================ FDTFluxAsyncParser::FDTFluxAsyncParser() { UE_LOG(logDTFluxNetwork, Log, TEXT("AsyncParser initialized")); } FDTFluxAsyncParser::~FDTFluxAsyncParser() { CancelAllParsing(); UE_LOG(logDTFluxNetwork, Log, TEXT("AsyncParser destroyed")); } void FDTFluxAsyncParser::ParseResponseAsync( const FGuid& RequestId, const FString& RawJsonData, FOnParsingCompleted OnCompleted, FOnParsingFailed OnFailed) { if (RawJsonData.IsEmpty()) { OnFailed.ExecuteIfBound(RequestId, TEXT("Empty JSON data")); return; } // Créer la tâche de parsing FGraphEventRef Task = FFunctionGraphTask::CreateAndDispatchWhenReady( [RequestId, RawJsonData, OnCompleted, OnFailed]() { // Ce code s'exécute sur le worker thread const double StartTime = FPlatformTime::Seconds(); TSharedPtr ParsedResponse; bool bParsingSuccess = false; FString ErrorMessage; try { EDTFluxResponseStatus Status; ParsedResponse = MakeShared(RawJsonData, Status, false); if (Status == EDTFluxResponseStatus::Success) { bParsingSuccess = true; } else { ErrorMessage = FString::Printf(TEXT("Parsing failed with status: %s"), *UEnum::GetValueAsString(Status)); } } catch (const std::exception& e) { ErrorMessage = FString::Printf(TEXT("Exception during parsing: %s"), ANSI_TO_TCHAR(e.what())); } catch (...) { ErrorMessage = TEXT("Unknown exception during parsing"); } const float ParsingTime = (FPlatformTime::Seconds() - StartTime) * 1000.0f; FFunctionGraphTask::CreateAndDispatchWhenReady( [RequestId, ParsedResponse, bParsingSuccess, ErrorMessage, OnCompleted, OnFailed]() { // Cette lambda s'exécute sur le main thread if (bParsingSuccess && ParsedResponse.IsValid()) { OnCompleted.ExecuteIfBound(RequestId, ParsedResponse, true); } else { OnFailed.ExecuteIfBound(RequestId, ErrorMessage); } }, TStatId(), nullptr, ENamedThreads::GameThread // Forcer main thread ); }, TStatId(), nullptr, ENamedThreads::AnyBackgroundThreadNormalTask ); // Tracker la tâche { FScopeLock Lock(&TasksLock); ActiveTasks.Add(Task); } UE_LOG(logDTFluxNetwork, Verbose, TEXT("Queued async parsing task for request %s"), *RequestId.ToString()); } TSharedPtr FDTFluxAsyncParser::ParseResponseSync( const FString& RawJsonData, float TimeoutSeconds) { if (RawJsonData.IsEmpty()) { return nullptr; } // Variables pour la synchronisation TSharedPtr Result; std::atomic bCompleted{false}; // Lancer le parsing async avec callback sync FOnParsingCompleted OnCompleted = FOnParsingCompleted::CreateLambda( [&Result, &bCompleted](const FGuid& RequestId, TSharedPtr ParsedResponse, bool bSuccess) { if (bSuccess) { Result = ParsedResponse; } bCompleted.store(true); } ); FOnParsingFailed OnFailed = FOnParsingFailed::CreateLambda( [&bCompleted](const FGuid& RequestId, const FString& ErrorMessage) { UE_LOG(logDTFluxNetwork, Warning, TEXT("Sync parsing failed: %s"), *ErrorMessage); bCompleted.store(true); } ); FGuid TempId = FGuid::NewGuid(); ParseResponseAsync(TempId, RawJsonData, OnCompleted, OnFailed); // Attendre avec timeout const double StartTime = FPlatformTime::Seconds(); while (!bCompleted.load() && (FPlatformTime::Seconds() - StartTime) < TimeoutSeconds) { FPlatformProcess::Sleep(0.001f); // 1ms } return Result; } void FDTFluxAsyncParser::CancelAllParsing() { FScopeLock Lock(&TasksLock); for (const FGraphEventRef& Task : ActiveTasks) { // Note: On ne peut pas vraiment "cancel" une tâche TaskGraph en cours, // mais on peut marquer qu'on ne veut plus les résultats } ActiveTasks.Empty(); UE_LOG(logDTFluxNetwork, Log, TEXT("Cancelled all pending parsing tasks")); } FDTFluxAsyncParser::FParsingStats FDTFluxAsyncParser::GetStats() const { FScopeLock StatsLock_Local(&StatsLock); FScopeLock TasksLock_Local(&TasksLock); FParsingStats Stats; Stats.TasksInProgress = ActiveTasks.Num(); Stats.TasksCompleted = TasksCompletedCount; Stats.TasksFailed = TasksFailedCount; if (ParsingTimes.Num() > 0) { float Sum = 0.0f; for (float Time : ParsingTimes) { Sum += Time; } Stats.AverageParsingTimeMs = Sum / ParsingTimes.Num(); } return Stats; } void FDTFluxAsyncParser::ResetStats() { FScopeLock Lock(&StatsLock); TasksCompletedCount = 0; TasksFailedCount = 0; ParsingTimes.Empty(); } void FDTFluxAsyncParser::OnTaskCompleted(bool bSuccess, float ParsingTimeMs) { FScopeLock Lock(&StatsLock); if (bSuccess) { TasksCompletedCount++; } else { TasksFailedCount++; } ParsingTimes.Add(ParsingTimeMs); // Garder seulement les 100 derniers temps pour la moyenne if (ParsingTimes.Num() > 100) { ParsingTimes.RemoveAt(0); } } void FDTFluxAsyncParser::CleanupCompletedTasks() { FScopeLock Lock(&TasksLock); for (auto It = ActiveTasks.CreateIterator(); It; ++It) { const FGraphEventRef& Task = *It; if (Task.IsValid() && Task->IsComplete()) { It.RemoveCurrent(); // Supprime l'élément actuel de manière sécurisée } } }