From 89f623eefbc7b9240346e6a52b4b2044ee4e8a74 Mon Sep 17 00:00:00 2001 From: Anthony Rossi <41394064+anrossi@users.noreply.github.com> Date: Wed, 17 Dec 2025 20:02:34 -0800 Subject: [PATCH 1/5] Drain Work Queues in ExecutionDelete --- src/platform/platform_worker.c | 54 ++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index 1ac0e1dc2d..e2ca9bbd84 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -160,6 +160,16 @@ UpdatePollCompletion( CxPlatUpdateExecutionContexts(Worker); } +void +CxPlatWorkerPoolWorkerDrainEvents( + _In_ CXPLAT_WORKER* Worker + ); + +void +CxPlatProcessEvents( + _In_ CXPLAT_WORKER* Worker + ); + BOOLEAN CxPlatWorkerPoolInitWorker( _Inout_ CXPLAT_WORKER* Worker, @@ -449,6 +459,17 @@ CxPlatWorkerPoolDelete( #else UNREFERENCED_PARAMETER(RefType); #endif + + if (RefType == CXPLAT_WORKER_POOL_REF_EXTERNAL) { + // + // In the case of external execution, it's possible for ExecutionDelete + // to run before all the queues have been drained of internal cleanup work. + // Run all the workers until there's nothing left to do here. + // + for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { + CxPlatWorkerPoolWorkerDrainEvents(&WorkerPool->Workers[i]); + } + } CxPlatRundownReleaseAndWait(&WorkerPool->Rundown); #if DEBUG @@ -666,6 +687,39 @@ CxPlatWorkerPoolWorkerPoll( return Worker->State.WaitTime; } +void +CxPlatWorkerPoolWorkerDrainEvents( + _In_ CXPLAT_WORKER* Worker + ) +{ + uint32_t Iterations = 0; + do { + Worker->State.TimeNow = CxPlatTimeUs64(); + Worker->State.ThreadID = CxPlatCurThreadID(); + + CxPlatRunExecutionContexts(Worker); + if (Worker->State.WaitTime && InterlockedFetchAndClearBoolean(&Worker->Running)) { + Worker->State.TimeNow = CxPlatTimeUs64(); + CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions + } + + // + // Set the wait time to zero here to process as soon as possible. + // Otherwise, CxPlatProcessEvents may wait this many milliseconds. + // + Worker->State.WaitTime = 0; + + // + // Assume there is no work to do, and this will update to zero if work was done. + // + Worker->State.NoWorkCount = 1; + CxPlatProcessEvents(Worker); + + ++Iterations; + CXPLAT_DBG_ASSERTMSG(Iterations < 10, "Is the library still active?"); + } while (Worker->State.NoWorkCount == 0); +} + #define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second #define DYNAMIC_POOL_PRUNE_COUNT 8 From b4c324d543b39b4c992ec9be965a1460d60b4103 Mon Sep 17 00:00:00 2001 From: Anthony Rossi <41394064+anrossi@users.noreply.github.com> Date: Wed, 17 Dec 2025 20:22:11 -0800 Subject: [PATCH 2/5] Wrap debug variable in debug checks --- src/platform/platform_worker.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index e2ca9bbd84..83c49ea90c 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -692,7 +692,9 @@ CxPlatWorkerPoolWorkerDrainEvents( _In_ CXPLAT_WORKER* Worker ) { +#if DEBUG uint32_t Iterations = 0; +#endif do { Worker->State.TimeNow = CxPlatTimeUs64(); Worker->State.ThreadID = CxPlatCurThreadID(); @@ -715,8 +717,9 @@ CxPlatWorkerPoolWorkerDrainEvents( Worker->State.NoWorkCount = 1; CxPlatProcessEvents(Worker); - ++Iterations; - CXPLAT_DBG_ASSERTMSG(Iterations < 10, "Is the library still active?"); +#if DEBUG + CXPLAT_DBG_ASSERTMSG(++Iterations < 10, "Is the library still active?"); +#endif } while (Worker->State.NoWorkCount == 0); } From 231abf13f720982bcfa9d8098efb147bd37fd89c Mon Sep 17 00:00:00 2001 From: Anthony Rossi <41394064+anrossi@users.noreply.github.com> Date: Thu, 8 Jan 2026 18:29:43 -0800 Subject: [PATCH 3/5] Refactor --- src/platform/platform_worker.c | 73 +++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 32 deletions(-) diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index 83c49ea90c..0f4f04257a 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -92,6 +92,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { BOOLEAN StoppingThread : 1; BOOLEAN StoppedThread : 1; BOOLEAN DestroyedThread : 1; + BOOLEAN DrainingEvents : 1; #if DEBUG // Debug flags - Must not be in the bitfield. BOOLEAN ThreadStarted; BOOLEAN ThreadFinished; @@ -108,6 +109,7 @@ typedef struct CXPLAT_WORKER_POOL { CXPLAT_RUNDOWN_REF Rundown; uint32_t WorkerCount; + BOOLEAN External; #if DEBUG // @@ -161,8 +163,8 @@ UpdatePollCompletion( } void -CxPlatWorkerPoolWorkerDrainEvents( - _In_ CXPLAT_WORKER* Worker +CxPlatWorkerPoolDrainEvents( + _In_ CXPLAT_WORKER_POOL* WorkerPool ); void @@ -312,6 +314,7 @@ CxPlatWorkerPoolCreate( } CxPlatZeroMemory(WorkerPool, WorkerPoolSize); WorkerPool->WorkerCount = ProcessorCount; + WorkerPool->External = FALSE; // // Build up the configuration for creating the worker threads. @@ -405,6 +408,7 @@ CxPlatWorkerPoolCreateExternal( } CxPlatZeroMemory(WorkerPool, WorkerPoolSize); WorkerPool->WorkerCount = Count; + WorkerPool->External = TRUE; // // Set up each worker thread with the configuration initialized above. Also @@ -460,15 +464,13 @@ CxPlatWorkerPoolDelete( UNREFERENCED_PARAMETER(RefType); #endif - if (RefType == CXPLAT_WORKER_POOL_REF_EXTERNAL) { + if (WorkerPool->External) { // // In the case of external execution, it's possible for ExecutionDelete // to run before all the queues have been drained of internal cleanup work. // Run all the workers until there's nothing left to do here. // - for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { - CxPlatWorkerPoolWorkerDrainEvents(&WorkerPool->Workers[i]); - } + CxPlatWorkerPoolDrainEvents(WorkerPool); } CxPlatRundownReleaseAndWait(&WorkerPool->Rundown); @@ -687,40 +689,30 @@ CxPlatWorkerPoolWorkerPoll( return Worker->State.WaitTime; } +#define CXPLAT_WORKER_DRAINING_ITERATION_COUNT 10 + void -CxPlatWorkerPoolWorkerDrainEvents( - _In_ CXPLAT_WORKER* Worker +CxPlatWorkerPoolDrainEvents( + _In_ CXPLAT_WORKER_POOL* WorkerPool ) { -#if DEBUG - uint32_t Iterations = 0; -#endif + BOOLEAN MoreWork = FALSE; do { - Worker->State.TimeNow = CxPlatTimeUs64(); - Worker->State.ThreadID = CxPlatCurThreadID(); + MoreWork = FALSE; - CxPlatRunExecutionContexts(Worker); - if (Worker->State.WaitTime && InterlockedFetchAndClearBoolean(&Worker->Running)) { - Worker->State.TimeNow = CxPlatTimeUs64(); - CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions - } + for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { + CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; + Worker->DrainingEvents = TRUE; + Worker->StoppedThread = FALSE; - // - // Set the wait time to zero here to process as soon as possible. - // Otherwise, CxPlatProcessEvents may wait this many milliseconds. - // - Worker->State.WaitTime = 0; + CxPlatWorkerThread(Worker); - // - // Assume there is no work to do, and this will update to zero if work was done. - // - Worker->State.NoWorkCount = 1; - CxPlatProcessEvents(Worker); + if (Worker->State.NoWorkCount < CXPLAT_WORKER_DRAINING_ITERATION_COUNT) { + MoreWork = TRUE; + } + } -#if DEBUG - CXPLAT_DBG_ASSERTMSG(++Iterations < 10, "Is the library still active?"); -#endif - } while (Worker->State.NoWorkCount == 0); + } while (MoreWork); } #define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second @@ -868,6 +860,16 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions } + if (Worker->DrainingEvents) { + if (Worker->State.WaitTime == UINT32_MAX) { + // + // Don't wait forever for new events, just check if any events are present and + // continue. + // + Worker->State.WaitTime = 0; + } + } + CxPlatProcessEvents(Worker); if (Worker->State.NoWorkCount == 0) { @@ -881,6 +883,13 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatProcessDynamicPoolAllocators(Worker); Worker->State.LastPoolProcessTime = Worker->State.TimeNow; } + + if (Worker->DrainingEvents) { + // + // Don't run indefinitely; Just do one iteration and stop. + // + Worker->StoppedThread = TRUE; + } } Worker->Running = FALSE; From 544ca9d1aba0298aa623200d527965e0ae20b493 Mon Sep 17 00:00:00 2001 From: Anthony Rossi <41394064+anrossi@users.noreply.github.com> Date: Fri, 9 Jan 2026 20:05:49 -0800 Subject: [PATCH 4/5] Create cleanup threads and transfer workers to run on them --- src/platform/platform_worker.c | 92 +++++++++++++--------------------- 1 file changed, 36 insertions(+), 56 deletions(-) diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index 0f4f04257a..58beaf9107 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -18,8 +18,9 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // - // Thread used to drive the worker. Only set when the worker is created and - // managed internally (default case). + // Thread used to drive the worker, when the worker is created and + // managed internally (default case). In the external execution case, + // this thread is the cleanup thread. // CXPLAT_THREAD Thread; @@ -68,6 +69,12 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { // CXPLAT_SLIST_ENTRY* ExecutionContexts; + // + // The event for the cleanup worker to wait on to start cleanup. + // NULL in the internally-managed execution case. + // + CXPLAT_EVENT* CleanupEvent; + #if DEBUG // Debug statistics uint64_t LoopCount; uint64_t EcPollCount; @@ -92,7 +99,6 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { BOOLEAN StoppingThread : 1; BOOLEAN StoppedThread : 1; BOOLEAN DestroyedThread : 1; - BOOLEAN DrainingEvents : 1; #if DEBUG // Debug flags - Must not be in the bitfield. BOOLEAN ThreadStarted; BOOLEAN ThreadFinished; @@ -108,6 +114,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER { typedef struct CXPLAT_WORKER_POOL { CXPLAT_RUNDOWN_REF Rundown; + CXPLAT_EVENT CleanupEvent; // Only used with external workers. uint32_t WorkerCount; BOOLEAN External; @@ -162,11 +169,6 @@ UpdatePollCompletion( CxPlatUpdateExecutionContexts(Worker); } -void -CxPlatWorkerPoolDrainEvents( - _In_ CXPLAT_WORKER_POOL* WorkerPool - ); - void CxPlatProcessEvents( _In_ CXPLAT_WORKER* Worker @@ -175,9 +177,10 @@ CxPlatProcessEvents( BOOLEAN CxPlatWorkerPoolInitWorker( _Inout_ CXPLAT_WORKER* Worker, + _In_ CXPLAT_WORKER_POOL* WorkerPool, _In_ uint16_t IdealProcessor, _In_opt_ CXPLAT_EVENTQ* EventQ, // Only for external workers - _In_opt_ CXPLAT_THREAD_CONFIG* ThreadConfig // Only for internal workers + _In_ CXPLAT_THREAD_CONFIG* ThreadConfig ) { CxPlatLockInitialize(&Worker->ECLock); @@ -186,6 +189,7 @@ CxPlatWorkerPoolInitWorker( Worker->IdealProcessor = IdealProcessor; Worker->State.WaitTime = UINT32_MAX; Worker->State.ThreadID = UINT32_MAX; + Worker->CleanupEvent = &WorkerPool->CleanupEvent; if (EventQ != NULL) { Worker->EventQ = *EventQ; @@ -351,7 +355,7 @@ CxPlatWorkerPoolCreate( CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; if (!CxPlatWorkerPoolInitWorker( - Worker, IdealProcessor, NULL, &ThreadConfig)) { + Worker, WorkerPool, IdealProcessor, NULL, &ThreadConfig)) { goto Error; } } @@ -381,6 +385,15 @@ CxPlatWorkerPoolCreate( return NULL; } +CXPLAT_THREAD_CALLBACK(CxPlatExecutionCleanupThread, Context) +{ + CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context; + + CxPlatEventWaitForever(*Worker->CleanupEvent); + + return CxPlatWorkerThread(Context); +} + _Success_(return != NULL) CXPLAT_WORKER_POOL* CxPlatWorkerPoolCreateExternal( @@ -410,6 +423,16 @@ CxPlatWorkerPoolCreateExternal( WorkerPool->WorkerCount = Count; WorkerPool->External = TRUE; + CxPlatEventInitialize(&WorkerPool->CleanupEvent, TRUE, FALSE); + + CXPLAT_THREAD_CONFIG ThreadConfig = { + CXPLAT_THREAD_FLAG_SET_IDEAL_PROC, + 0, + "cxplat_exec_cleanup", + CxPlatExecutionCleanupThread, + NULL + }; + // // Set up each worker thread with the configuration initialized above. Also // creates the event queue and all the SQEs used to shutdown, wake and poll @@ -421,7 +444,7 @@ CxPlatWorkerPoolCreateExternal( CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; if (!CxPlatWorkerPoolInitWorker( - Worker, IdealProcessor, Configs[i].EventQ, NULL)) { + Worker, WorkerPool, IdealProcessor, Configs[i].EventQ, &ThreadConfig)) { goto Error; } Executions[i] = (QUIC_EXECUTION*)Worker; @@ -468,9 +491,9 @@ CxPlatWorkerPoolDelete( // // In the case of external execution, it's possible for ExecutionDelete // to run before all the queues have been drained of internal cleanup work. - // Run all the workers until there's nothing left to do here. + // Allow the cleanup threads to run until there's nothing left to do. // - CxPlatWorkerPoolDrainEvents(WorkerPool); + CxPlatEventSet(WorkerPool->CleanupEvent); } CxPlatRundownReleaseAndWait(&WorkerPool->Rundown); @@ -689,32 +712,6 @@ CxPlatWorkerPoolWorkerPoll( return Worker->State.WaitTime; } -#define CXPLAT_WORKER_DRAINING_ITERATION_COUNT 10 - -void -CxPlatWorkerPoolDrainEvents( - _In_ CXPLAT_WORKER_POOL* WorkerPool - ) -{ - BOOLEAN MoreWork = FALSE; - do { - MoreWork = FALSE; - - for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) { - CXPLAT_WORKER* Worker = &WorkerPool->Workers[i]; - Worker->DrainingEvents = TRUE; - Worker->StoppedThread = FALSE; - - CxPlatWorkerThread(Worker); - - if (Worker->State.NoWorkCount < CXPLAT_WORKER_DRAINING_ITERATION_COUNT) { - MoreWork = TRUE; - } - } - - } while (MoreWork); -} - #define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second #define DYNAMIC_POOL_PRUNE_COUNT 8 @@ -860,16 +857,6 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions } - if (Worker->DrainingEvents) { - if (Worker->State.WaitTime == UINT32_MAX) { - // - // Don't wait forever for new events, just check if any events are present and - // continue. - // - Worker->State.WaitTime = 0; - } - } - CxPlatProcessEvents(Worker); if (Worker->State.NoWorkCount == 0) { @@ -883,13 +870,6 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context) CxPlatProcessDynamicPoolAllocators(Worker); Worker->State.LastPoolProcessTime = Worker->State.TimeNow; } - - if (Worker->DrainingEvents) { - // - // Don't run indefinitely; Just do one iteration and stop. - // - Worker->StoppedThread = TRUE; - } } Worker->Running = FALSE; From e00e0c69486fc8a8c58bfee1b371d138b93083b7 Mon Sep 17 00:00:00 2001 From: Anthony Rossi <41394064+anrossi@users.noreply.github.com> Date: Sun, 11 Jan 2026 18:56:21 -0800 Subject: [PATCH 5/5] Cleanup event on uninitialize. Update comment. --- src/platform/platform_worker.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/platform/platform_worker.c b/src/platform/platform_worker.c index 58beaf9107..b3e2e83f93 100644 --- a/src/platform/platform_worker.c +++ b/src/platform/platform_worker.c @@ -169,11 +169,6 @@ UpdatePollCompletion( CxPlatUpdateExecutionContexts(Worker); } -void -CxPlatProcessEvents( - _In_ CXPLAT_WORKER* Worker - ); - BOOLEAN CxPlatWorkerPoolInitWorker( _Inout_ CXPLAT_WORKER* Worker, @@ -469,6 +464,7 @@ CxPlatWorkerPoolCreateExternal( CxPlatWorkerPoolDestroyWorker(Worker); } + CxPlatEventUninitialize(WorkerPool->CleanupEvent); CXPLAT_FREE(WorkerPool, QUIC_POOL_PLATFORM_WORKER); return NULL; @@ -491,7 +487,9 @@ CxPlatWorkerPoolDelete( // // In the case of external execution, it's possible for ExecutionDelete // to run before all the queues have been drained of internal cleanup work. - // Allow the cleanup threads to run until there's nothing left to do. + // By calling ExecutionDelete, the app has indicated it is done with MsQuic, + // so take ownership of the workers and allow them to run on cleanup threads + // until there's nothing left to do. // CxPlatEventSet(WorkerPool->CleanupEvent); } @@ -509,6 +507,9 @@ CxPlatWorkerPoolDelete( } CxPlatRundownUninitialize(&WorkerPool->Rundown); + if (WorkerPool->External) { + CxPlatEventUninitialize(WorkerPool->CleanupEvent); + } CXPLAT_FREE(WorkerPool, QUIC_POOL_PLATFORM_WORKER); } }