Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {

//
// Thread used to drive the worker. Only set when the worker is created and
// managed internally (default case).
// Thread used to drive the worker, when the worker is created and
// managed internally (default case). In the external execution case,
// this thread is the cleanup thread.
//
CXPLAT_THREAD Thread;

Expand Down Expand Up @@ -68,6 +69,12 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
//
CXPLAT_SLIST_ENTRY* ExecutionContexts;

//
// The event for the cleanup worker to wait on to start cleanup.
// NULL in the internally-managed execution case.
//
CXPLAT_EVENT* CleanupEvent;

#if DEBUG // Debug statistics
uint64_t LoopCount;
uint64_t EcPollCount;
Expand Down Expand Up @@ -107,7 +114,9 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
typedef struct CXPLAT_WORKER_POOL {

CXPLAT_RUNDOWN_REF Rundown;
CXPLAT_EVENT CleanupEvent; // Only used with external workers.
uint32_t WorkerCount;
BOOLEAN External;

#if DEBUG
//
Expand Down Expand Up @@ -163,9 +172,10 @@ UpdatePollCompletion(
BOOLEAN
CxPlatWorkerPoolInitWorker(
_Inout_ CXPLAT_WORKER* Worker,
_In_ CXPLAT_WORKER_POOL* WorkerPool,
_In_ uint16_t IdealProcessor,
_In_opt_ CXPLAT_EVENTQ* EventQ, // Only for external workers
_In_opt_ CXPLAT_THREAD_CONFIG* ThreadConfig // Only for internal workers
_In_ CXPLAT_THREAD_CONFIG* ThreadConfig
)
{
CxPlatLockInitialize(&Worker->ECLock);
Expand All @@ -174,6 +184,7 @@ CxPlatWorkerPoolInitWorker(
Worker->IdealProcessor = IdealProcessor;
Worker->State.WaitTime = UINT32_MAX;
Worker->State.ThreadID = UINT32_MAX;
Worker->CleanupEvent = &WorkerPool->CleanupEvent;

if (EventQ != NULL) {
Worker->EventQ = *EventQ;
Expand Down Expand Up @@ -302,6 +313,7 @@ CxPlatWorkerPoolCreate(
}
CxPlatZeroMemory(WorkerPool, WorkerPoolSize);
WorkerPool->WorkerCount = ProcessorCount;
WorkerPool->External = FALSE;

//
// Build up the configuration for creating the worker threads.
Expand Down Expand Up @@ -338,7 +350,7 @@ CxPlatWorkerPoolCreate(

CXPLAT_WORKER* Worker = &WorkerPool->Workers[i];
if (!CxPlatWorkerPoolInitWorker(
Worker, IdealProcessor, NULL, &ThreadConfig)) {
Worker, WorkerPool, IdealProcessor, NULL, &ThreadConfig)) {
goto Error;
}
}
Expand Down Expand Up @@ -368,6 +380,15 @@ CxPlatWorkerPoolCreate(
return NULL;
}

CXPLAT_THREAD_CALLBACK(CxPlatExecutionCleanupThread, Context)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function needs comments, it isn't easily understandable without context.
Outside of this PR, it would be very hard to understand why we would ever need to start the worker thread on cleanup (and it already takes some looking around to know that CxPlatWorkerThread starts the worker thread, and doesn't, for instance, returns the worker thread handle from the context).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment could echo what the comment lower ("In the case of external execution...") says, as in "When we take ownership of the execution context after the app call ExecutionDelete, this drive the execution ... yadayada... to execute the cleanup"

{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context;

CxPlatEventWaitForever(*Worker->CleanupEvent);

return CxPlatWorkerThread(Context);
}

_Success_(return != NULL)
CXPLAT_WORKER_POOL*
CxPlatWorkerPoolCreateExternal(
Expand Down Expand Up @@ -395,6 +416,17 @@ CxPlatWorkerPoolCreateExternal(
}
CxPlatZeroMemory(WorkerPool, WorkerPoolSize);
WorkerPool->WorkerCount = Count;
WorkerPool->External = TRUE;

CxPlatEventInitialize(&WorkerPool->CleanupEvent, TRUE, FALSE);

CXPLAT_THREAD_CONFIG ThreadConfig = {
CXPLAT_THREAD_FLAG_SET_IDEAL_PROC,
0,
"cxplat_exec_cleanup",
CxPlatExecutionCleanupThread,
NULL
};

//
// Set up each worker thread with the configuration initialized above. Also
Expand All @@ -407,7 +439,7 @@ CxPlatWorkerPoolCreateExternal(

CXPLAT_WORKER* Worker = &WorkerPool->Workers[i];
if (!CxPlatWorkerPoolInitWorker(
Worker, IdealProcessor, Configs[i].EventQ, NULL)) {
Worker, WorkerPool, IdealProcessor, Configs[i].EventQ, &ThreadConfig)) {
goto Error;
}
Executions[i] = (QUIC_EXECUTION*)Worker;
Expand All @@ -432,6 +464,7 @@ CxPlatWorkerPoolCreateExternal(
CxPlatWorkerPoolDestroyWorker(Worker);
}

CxPlatEventUninitialize(WorkerPool->CleanupEvent);
CXPLAT_FREE(WorkerPool, QUIC_POOL_PLATFORM_WORKER);

return NULL;
Expand All @@ -449,6 +482,17 @@ CxPlatWorkerPoolDelete(
#else
UNREFERENCED_PARAMETER(RefType);
#endif

if (WorkerPool->External) {
//
// In the case of external execution, it's possible for ExecutionDelete
// to run before all the queues have been drained of internal cleanup work.
// By calling ExecutionDelete, the app has indicated it is done with MsQuic,
// so take ownership of the workers and allow them to run on cleanup threads
// until there's nothing left to do.
//
CxPlatEventSet(WorkerPool->CleanupEvent);
}
CxPlatRundownReleaseAndWait(&WorkerPool->Rundown);

#if DEBUG
Expand All @@ -463,6 +507,9 @@ CxPlatWorkerPoolDelete(
}

CxPlatRundownUninitialize(&WorkerPool->Rundown);
if (WorkerPool->External) {
CxPlatEventUninitialize(WorkerPool->CleanupEvent);
}
CXPLAT_FREE(WorkerPool, QUIC_POOL_PLATFORM_WORKER);
}
}
Expand Down
Loading