Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/inc/quic_platform_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,22 @@ CxPlatCqeGetSqe(

#endif

//
// WCP compatibility layer for non-Windows platforms.
// On POSIX, WCP (Wait Completion Packet) doesn't exist, so we map
// WCP-specific types and functions to their standard equivalents.
//
typedef CXPLAT_SQE CXPLAT_SQE_WCP;

#define CxPlatEventQEnqueueWcp(queue, sqe) \
CxPlatEventQEnqueue(queue, (CXPLAT_SQE*)(sqe))

#define CxPlatSqeInitializeWcp(queue, completion, sqe) \
CxPlatSqeInitialize(queue, completion, (CXPLAT_SQE*)(sqe))

#define CxPlatSqeCleanupWcp(queue, sqe) \
CxPlatSqeCleanup(queue, (CXPLAT_SQE*)(sqe))

//
// Thread Interfaces.
//
Expand Down
163 changes: 162 additions & 1 deletion src/inc/quic_platform_winuser.h
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,52 @@ typedef struct CXPLAT_SQE {
#endif
} CXPLAT_SQE;

// Extended SQE with Wait Completion Packet support for manual events
typedef struct CXPLAT_SQE_WCP {
CXPLAT_SQE BaseSqe; // Base SQE must be first for correct casting.
HANDLE WcpEvent; // Manual-reset event for wake packets
HANDLE WaitCompletionPacket; // Wait completion packet bound to Event
} CXPLAT_SQE_WCP;
//
// Wait Completion Packet functions from ntdll.dll.
//
typedef NTSTATUS (NTAPI *FuncNtCreateWaitCompletionPacket)(
_Out_ PHANDLE WaitCompletionPacketHandle,
_In_ ACCESS_MASK DesiredAccess,
_In_opt_ PVOID ObjectAttributes
);

typedef NTSTATUS (NTAPI *FuncNtAssociateWaitCompletionPacket)(
_In_ HANDLE WaitCompletionPacketHandle,
_In_ HANDLE IoCompletionHandle,
_In_ HANDLE TargetObjectHandle,
_In_opt_ PVOID KeyContext,
_In_opt_ PVOID ApcContext,
_In_ NTSTATUS IoStatus,
_In_ ULONG_PTR IoStatusInformation,
_Out_opt_ PBOOLEAN TargetApcInvoked
);

typedef NTSTATUS (NTAPI *FuncNtCancelWaitCompletionPacket)(
_In_ HANDLE WaitCompletionPacketHandle,
_In_ BOOLEAN RemoveSignaledPacket
);

// Global function pointers (initialized in CxPlatInitialize)
extern FuncNtCreateWaitCompletionPacket NtCreateWaitCompletionPacket;
extern FuncNtAssociateWaitCompletionPacket NtAssociateWaitCompletionPacket;
extern FuncNtCancelWaitCompletionPacket NtCancelWaitCompletionPacket;

// Helper to check if WCP APIs are available
QUIC_INLINE
BOOLEAN
CxPlatWcpAvailable(void)
{
return NtCreateWaitCompletionPacket != NULL &&
NtAssociateWaitCompletionPacket != NULL &&
NtCancelWaitCompletionPacket != NULL;
}

QUIC_INLINE
BOOLEAN
CxPlatEventQInitialize(
Expand Down Expand Up @@ -838,6 +884,22 @@ CxPlatEventQEnqueueEx( // Windows specific extension
return PostQueuedCompletionStatus(*queue, num_bytes, 0, &sqe->Overlapped) != 0;
}

// Enqueue WCP-based SQE to the event queue.
// Falls back to standard PQCS if WCP is unavailable.
QUIC_INLINE
void
CxPlatEventQEnqueueWcp(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE_WCP* sqe
)
{
if (sqe->WcpEvent) {
CxPlatEventSet(sqe->WcpEvent);
} else {
CxPlatEventQEnqueue(queue, &sqe->BaseSqe);
}
}

QUIC_INLINE
uint32_t
CxPlatEventQDequeue(
Expand All @@ -848,7 +910,9 @@ CxPlatEventQDequeue(
)
{
ULONG out_count = 0;
if (!GetQueuedCompletionStatusEx(*queue, events, count, &out_count, wait_time, FALSE)) return 0;
if (!GetQueuedCompletionStatusEx(*queue, events, count, &out_count, wait_time, FALSE)) {
return 0;
}
CXPLAT_DBG_ASSERT(out_count != 0);
CXPLAT_DBG_ASSERT(events[0].lpOverlapped != NULL || out_count == 1);
#if DEBUG
Expand All @@ -872,6 +936,7 @@ CxPlatEventQReturn(
UNREFERENCED_PARAMETER(count);
}

// Only WakeSqe and UpdatePollSqe will be using this.
QUIC_INLINE
BOOLEAN
CxPlatSqeInitialize(
Expand All @@ -886,6 +951,69 @@ CxPlatSqeInitialize(
return TRUE;
}

// Currently only ShutdownSqe will be using this, but it can be used by any SQE that wants to take advantage of WCP for manual events.
QUIC_INLINE
BOOLEAN
CxPlatSqeInitializeWcp(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_EVENT_COMPLETION completion,
_Out_ CXPLAT_SQE_WCP* sqe
)
{
UNREFERENCED_PARAMETER(queue);
CxPlatZeroMemory(sqe, sizeof(*sqe));
sqe->BaseSqe.Completion = completion;

// Only attempt WCP initialization if APIs are available
if (!CxPlatWcpAvailable()) {
// WCP not available - this is acceptable on older Windows versions
// SQE will fall back to standard PQCS (with OOM risk)
return TRUE;
}

if (sqe->WcpEvent == NULL) {
CxPlatEventInitialize(&sqe->WcpEvent, TRUE, FALSE);
}
if (sqe->WcpEvent == NULL) {
return FALSE;
}

NTSTATUS status = NtCreateWaitCompletionPacket(&sqe->WaitCompletionPacket,
GENERIC_ALL,
NULL);
if (!NT_SUCCESS(status)) {
CloseHandle(sqe->WcpEvent);
return FALSE;
}

status = NtAssociateWaitCompletionPacket(
sqe->WaitCompletionPacket,
*queue,
sqe->WcpEvent,
NULL,
&sqe->BaseSqe,
0, // IoStatus STATUS_SUCCESS
0,
NULL);

if (!NT_SUCCESS(status)) {
NTSTATUS CancelStatus = NtCancelWaitCompletionPacket(
sqe->WaitCompletionPacket, // WaitCompletionPacketHandle
TRUE); // RemoveSignaledPacket
CXPLAT_DBG_ASSERT(NT_SUCCESS(CancelStatus));
// Close the Handle Only if the cancel succeeded.
// In failure case leak it to avoid a memory corruption.
if (NT_SUCCESS(CancelStatus)) {
CloseHandle(sqe->WaitCompletionPacket);
CloseHandle(sqe->WcpEvent);
}
return FALSE;
}

return TRUE;
}

// Most of the SQEs that are created through CxPlatStartDatapathIo will be using this.
QUIC_INLINE
void
CxPlatSqeInitializeEx(
Expand All @@ -909,6 +1037,39 @@ CxPlatSqeCleanup(
{
UNREFERENCED_PARAMETER(queue);
UNREFERENCED_PARAMETER(sqe);
// No-op for base SQE
}

QUIC_INLINE
void
CxPlatSqeCleanupWcp(
_In_ CXPLAT_EVENTQ* queue,
_In_ CXPLAT_SQE_WCP* sqe
)
{
if (sqe->WcpEvent)
{
if (sqe->WaitCompletionPacket) {
if (CxPlatWcpAvailable()) {
NTSTATUS CancelStatus = NtCancelWaitCompletionPacket(
sqe->WaitCompletionPacket, // WaitCompletionPacketHandle
TRUE); // RemoveSignaledPacket
CXPLAT_DBG_ASSERT(NT_SUCCESS(CancelStatus));
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests show that assertion triggering.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed assert as the failure is expected

// Close the Handle Only if the cancel succeeded.
// In failure case leak it to avoid a memory corruption.
if (NT_SUCCESS(CancelStatus)) {
CloseHandle(sqe->WaitCompletionPacket);
CloseHandle(sqe->WcpEvent);
}
}
} else {
CloseHandle(sqe->WcpEvent);
}
sqe->WcpEvent = NULL;
sqe->WaitCompletionPacket = NULL;
}
UNREFERENCED_PARAMETER(queue);
UNREFERENCED_PARAMETER(sqe);
}

QUIC_INLINE
Expand Down
14 changes: 14 additions & 0 deletions src/platform/platform_winuser.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ QUIC_TRACE_RUNDOWN_CALLBACK* QuicTraceRundownCallback;
//
typedef LONG (WINAPI *FuncRtlGetVersion)(RTL_OSVERSIONINFOW *);

// Global function pointers (initialized in CxPlatInitialize)
FuncNtCreateWaitCompletionPacket NtCreateWaitCompletionPacket = NULL;
FuncNtAssociateWaitCompletionPacket NtAssociateWaitCompletionPacket = NULL;
FuncNtCancelWaitCompletionPacket NtCancelWaitCompletionPacket = NULL;

_IRQL_requires_max_(PASSIVE_LEVEL)
void
CxPlatSystemLoad(
Expand Down Expand Up @@ -266,6 +271,15 @@ CxPlatInitialize(
SuccessfullySetVersion = TRUE;
}
}

// Load Wait Completion Packet functions (available on Windows 11+)
NtCreateWaitCompletionPacket =
(FuncNtCreateWaitCompletionPacket)GetProcAddress(NtDllHandle, "NtCreateWaitCompletionPacket");
NtAssociateWaitCompletionPacket =
(FuncNtAssociateWaitCompletionPacket)GetProcAddress(NtDllHandle, "NtAssociateWaitCompletionPacket");
NtCancelWaitCompletionPacket =
(FuncNtCancelWaitCompletionPacket)GetProcAddress(NtDllHandle, "NtCancelWaitCompletionPacket");

FreeLibrary(NtDllHandle);
}
CXPLAT_DBG_ASSERT(SuccessfullySetVersion); // TODO: Is the assert here enough or is there an appropriate QUIC_STATUS we return?
Expand Down
49 changes: 41 additions & 8 deletions src/platform/platform_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ typedef struct QUIC_CACHEALIGN CXPLAT_WORKER {

//
// Submission queue entry for shutting down the worker thread.
//
CXPLAT_SQE ShutdownSqe;
// On Windows, this uses the extended WCP-enabled SQE for better performance.
// On other platforms, this is a standard SQE.
CXPLAT_SQE_WCP ShutdownSqe;

//
// Submission queue entry for waking the thread to poll.
Expand Down Expand Up @@ -131,6 +132,7 @@ ShutdownCompletion(
{
CXPLAT_WORKER* Worker =
CXPLAT_CONTAINING_RECORD(CxPlatCqeGetSqe(Cqe), CXPLAT_WORKER, ShutdownSqe);

Worker->StoppedThread = TRUE;
}

Expand Down Expand Up @@ -188,13 +190,15 @@ CxPlatWorkerPoolInitWorker(
Worker->InitializedEventQ = TRUE;
}

if (!CxPlatSqeInitialize(&Worker->EventQ, ShutdownCompletion, &Worker->ShutdownSqe)) {
// On Windows, use WCP-enabled SQE for shutdown
if (!CxPlatSqeInitializeWcp(&Worker->EventQ, ShutdownCompletion, &Worker->ShutdownSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"CxPlatSqeInitialize(shutdown)");
"CxPlatSqeInitializeWcp(shutdown)");
return FALSE;
}

Worker->InitializedShutdownSqe = TRUE;

if (!CxPlatSqeInitialize(&Worker->EventQ, WakeCompletion, &Worker->WakeSqe)) {
Expand Down Expand Up @@ -235,7 +239,8 @@ CxPlatWorkerPoolDestroyWorker(
{
if (Worker->InitializedThread) {
Worker->StoppingThread = TRUE;
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->ShutdownSqe);

CxPlatEventQEnqueueWcp(&Worker->EventQ, &Worker->ShutdownSqe);
CxPlatThreadWait(&Worker->Thread);
CxPlatThreadDelete(&Worker->Thread);
#if DEBUG
Expand All @@ -253,7 +258,7 @@ CxPlatWorkerPoolDestroyWorker(
CxPlatSqeCleanup(&Worker->EventQ, &Worker->WakeSqe);
}
if (Worker->InitializedShutdownSqe) {
CxPlatSqeCleanup(&Worker->EventQ, &Worker->ShutdownSqe);
CxPlatSqeCleanupWcp(&Worker->EventQ, &Worker->ShutdownSqe);
}
if (Worker->InitializedEventQ) {
CxPlatEventQCleanup(&Worker->EventQ);
Expand Down Expand Up @@ -547,7 +552,19 @@ CxPlatWorkerPoolAddExecutionContext(
CxPlatLockRelease(&Worker->ECLock);

if (QueueEvent) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe);
//
// Notify the worker to process the newly added execution context.
// The EC is already safely queued in PendingECs above, so if this
// enqueue fails (e.g., due to OOM), the worker will still eventually
// process the EC on its next loop iteration via CxPlatUpdateExecutionContexts.
// This results in increased latency but not a critical failure.
//
if (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->UpdatePollSqe)) {
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"UpdatePollSqe enqueue failed (likely OOM) - EC will be processed on next worker iteration");
}
}
}

Expand All @@ -558,7 +575,23 @@ CxPlatWakeExecutionContext(
{
CXPLAT_WORKER* Worker = (CXPLAT_WORKER*)Context->CxPlatContext;
if (!InterlockedFetchAndSetBoolean(&Worker->Running)) {
CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe);
//
// Worker was idle (Running was FALSE). Set it to TRUE and wake the worker.
// If the enqueue fails (e.g., due to OOM), reset Running to FALSE to allow
// subsequent wake attempts to retry. This prevents the worker from being
// stuck in a long/infinite wait when Running=TRUE but no wake signal was sent.
//
if (!CxPlatEventQEnqueue(&Worker->EventQ, &Worker->WakeSqe)) {
//
// Failed to wake the worker. Reset Running to FALSE using an interlocked
// operation to maintain thread safety with the worker thread.
//
InterlockedFetchAndClearBoolean(&Worker->Running);
QuicTraceEvent(
LibraryError,
"[ lib] ERROR, %s.",
"WakeSqe enqueue failed (likely OOM) - wake attempt will be retried");
}
}
}

Expand Down
Loading