Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {
BOOLEAN StoppingThread : 1;
BOOLEAN StoppedThread : 1;
BOOLEAN DestroyedThread : 1;
BOOLEAN DrainingEvents : 1;
#if DEBUG // Debug flags - Must not be in the bitfield.
BOOLEAN ThreadStarted;
BOOLEAN ThreadFinished;
Expand All @@ -108,6 +109,7 @@ typedef struct CXPLAT_WORKER_POOL {

CXPLAT_RUNDOWN_REF Rundown;
uint32_t WorkerCount;
BOOLEAN External;

#if DEBUG
//
Expand Down Expand Up @@ -160,6 +162,16 @@ UpdatePollCompletion(
CxPlatUpdateExecutionContexts(Worker);
}

void
CxPlatWorkerPoolDrainEvents(
_In_ CXPLAT_WORKER_POOL* WorkerPool
);

void
CxPlatProcessEvents(
_In_ CXPLAT_WORKER* Worker
);

BOOLEAN
CxPlatWorkerPoolInitWorker(
_Inout_ CXPLAT_WORKER* Worker,
Expand Down Expand Up @@ -302,6 +314,7 @@ CxPlatWorkerPoolCreate(
}
CxPlatZeroMemory(WorkerPool, WorkerPoolSize);
WorkerPool->WorkerCount = ProcessorCount;
WorkerPool->External = FALSE;

//
// Build up the configuration for creating the worker threads.
Expand Down Expand Up @@ -395,6 +408,7 @@ CxPlatWorkerPoolCreateExternal(
}
CxPlatZeroMemory(WorkerPool, WorkerPoolSize);
WorkerPool->WorkerCount = Count;
WorkerPool->External = TRUE;

//
// Set up each worker thread with the configuration initialized above. Also
Expand Down Expand Up @@ -449,6 +463,15 @@ CxPlatWorkerPoolDelete(
#else
UNREFERENCED_PARAMETER(RefType);
#endif

if (WorkerPool->External) {
//
// In the case of external execution, it's possible for ExecutionDelete
// to run before all the queues have been drained of internal cleanup work.
// Run all the workers until there's nothing left to do here.
//
CxPlatWorkerPoolDrainEvents(WorkerPool);
}
CxPlatRundownReleaseAndWait(&WorkerPool->Rundown);

#if DEBUG
Expand Down Expand Up @@ -666,6 +689,32 @@ CxPlatWorkerPoolWorkerPoll(
return Worker->State.WaitTime;
}

#define CXPLAT_WORKER_DRAINING_ITERATION_COUNT 10

void
CxPlatWorkerPoolDrainEvents(
_In_ CXPLAT_WORKER_POOL* WorkerPool
)
{
BOOLEAN MoreWork = FALSE;
do {
MoreWork = FALSE;

for (uint32_t i = 0; i < WorkerPool->WorkerCount; ++i) {
CXPLAT_WORKER* Worker = &WorkerPool->Workers[i];
Worker->DrainingEvents = TRUE;
Worker->StoppedThread = FALSE;

CxPlatWorkerThread(Worker);

if (Worker->State.NoWorkCount < CXPLAT_WORKER_DRAINING_ITERATION_COUNT) {
MoreWork = TRUE;
}
}

} while (MoreWork);
}

#define DYNAMIC_POOL_PROCESSING_PERIOD 1000000 // 1 second
#define DYNAMIC_POOL_PRUNE_COUNT 8

Expand Down Expand Up @@ -811,6 +860,16 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
CxPlatRunExecutionContexts(Worker); // Run once more to handle race conditions
}

if (Worker->DrainingEvents) {
if (Worker->State.WaitTime == UINT32_MAX) {
//
// Don't wait forever for new events, just check if any events are present and
// continue.
//
Worker->State.WaitTime = 0;
}
}

CxPlatProcessEvents(Worker);

if (Worker->State.NoWorkCount == 0) {
Expand All @@ -824,6 +883,13 @@ CXPLAT_THREAD_CALLBACK(CxPlatWorkerThread, Context)
CxPlatProcessDynamicPoolAllocators(Worker);
Worker->State.LastPoolProcessTime = Worker->State.TimeNow;
}

if (Worker->DrainingEvents) {
//
// Don't run indefinitely; Just do one iteration and stop.
//
Worker->StoppedThread = TRUE;
}
}

Worker->Running = FALSE;
Expand Down
Loading