diff --git a/src/tools/spin/spinquic.cpp b/src/tools/spin/spinquic.cpp index bc33a678d9..3e3d9f65b5 100644 --- a/src/tools/spin/spinquic.cpp +++ b/src/tools/spin/spinquic.cpp @@ -134,13 +134,11 @@ T GetRandom(T UpperBound, uint16_t ThreadID = UINT16_MAX) { } return (T)out; } -#define GetRandom(UpperBound) GetRandom(UpperBound, ThreadID) template T& GetRandomFromVector(std::vector &vec, uint16_t ThreadID) { - return vec.at(GetRandom(vec.size())); + return vec.at(GetRandom(vec.size(), ThreadID)); } -#define GetRandomFromVector(Vec) GetRandomFromVector(Vec, ThreadID) template class LockableVector : public std::vector, public std::mutex { @@ -149,7 +147,7 @@ class LockableVector : public std::vector, public std::mutex { T TryGetRandom(bool Erase = false) { std::lock_guard Lock(*this); if (this->size() > 0) { - auto idx = GetRandom(this->size()); + auto idx = GetRandom(this->size(), ThreadID); auto obj = this->at(idx); if (Erase) { this->erase(this->begin() + idx); @@ -217,10 +215,9 @@ struct SpinQuicGlobals { QUIC_BUFFER* Alpns {nullptr}; uint32_t AlpnCount {0}; const size_t SendBufferSize { MaxBufferSizes[BufferCount - 1] + UINT8_MAX }; - uint8_t* SendBuffer; - SpinQuicGlobals() { - SendBuffer = new uint8_t[SendBufferSize]; - for (size_t i = 0; i < SendBufferSize; i++) { + std::vector SendBuffer; + SpinQuicGlobals() : SendBuffer(SendBufferSize) { + for (size_t i = 0; i < SendBuffer.size(); i++) { SendBuffer[i] = (uint8_t)i; } } @@ -237,7 +234,20 @@ struct SpinQuicGlobals { free(Alpns); } if (Registration) { - MsQuic->RegistrationClose(Registration); + if (GetRandom(2) == 0) { + CXPLAT_EVENT CloseComplete; + CxPlatEventInitialize(&CloseComplete, TRUE, FALSE); + MsQuic->RegistrationClose2( + Registration, + [](void* Context) -> void { + CxPlatEventSet(*(CXPLAT_EVENT*)Context); + }, + &CloseComplete); + CxPlatEventWaitForever(CloseComplete); + CxPlatEventUninitialize(CloseComplete); + } else { + MsQuic->RegistrationClose(Registration); + } } if (MsQuic) { #ifndef FUZZING @@ -245,7 +255,6 @@ struct SpinQuicGlobals { #endif MsQuicClose(MsQuic); } - delete [] SendBuffer; } }; @@ -270,6 +279,8 @@ typedef enum { SpinQuicAPICallCompleteCertificateValidation, SpinQuicAPICallStreamReceiveSetEnabled, SpinQuicAPICallStreamReceiveComplete, + SpinQuicAPICallConnectionPoolCreate, + SpinQuicAPICallStreamProvideReceiveBuffers, SpinQuicAPICallCount // Always the last element } SpinQuicAPICall; @@ -279,12 +290,28 @@ struct SpinQuicStream { uint8_t SendOffset {0}; bool Deleting {false}; uint64_t PendingRecvLength {UINT64_MAX}; // UINT64_MAX means no pending receive + std::vector RecvBuffer; SpinQuicStream(SpinQuicConnection& Connection, HQUIC Handle = nullptr) : Connection(Connection), Handle(Handle) {} ~SpinQuicStream() { Deleting = true; MsQuic.StreamClose(Handle); } static SpinQuicStream* Get(HQUIC Stream) { return (SpinQuicStream*)MsQuic.GetContext(Stream); } + std::vector AllocateReceiveBuffers(const std::vector& Sizes) { + size_t TotalSize = 0; + for (auto Size : Sizes) { + TotalSize += Size; + } + size_t Offset = RecvBuffer.size(); + RecvBuffer.resize(Offset + TotalSize); + std::vector Buffers(Sizes.size()); + for (size_t i = 0; i < Sizes.size(); i++) { + Buffers[i].Length = Sizes[i]; + Buffers[i].Buffer = RecvBuffer.data() + Offset; + Offset += Sizes[i]; + } + return Buffers; + } }; struct SpinQuicConnection { @@ -345,7 +372,7 @@ struct SpinQuicConnection { // Requires Lock to be held HQUIC TryGetStream(bool Remove = false) { if (Streams.size() != 0) { - auto idx = GetRandom(Streams.size()); + auto idx = GetRandom(Streams.size(), ThreadID); HQUIC Stream = Streams[idx]; if (Remove) { Streams.erase(Streams.begin() + idx); @@ -379,22 +406,22 @@ QUIC_STATUS QUIC_API SpinQuicHandleStreamEvent(HQUIC Stream, void* , QUIC_STREAM auto ctx = SpinQuicStream::Get(Stream); auto ThreadID = ctx->Connection.ThreadID; - if (GetRandom(5) == 0) { + if (GetRandom(5, ThreadID) == 0) { SpinQuicGetRandomParam(Stream, ThreadID); } - if (GetRandom(10) == 0) { + if (GetRandom(10, ThreadID) == 0) { SpinQuicSetRandomStreamParam(Stream, ThreadID); } - if (!ctx->Deleting && GetRandom(20) == 0) { - MsQuic.StreamShutdown(Stream, (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16), 0); + if (!ctx->Deleting && GetRandom(20, ThreadID) == 0) { + MsQuic.StreamShutdown(Stream, (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16, ThreadID), 0); goto Exit; } switch (Event->Type) { case QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN: - MsQuic.StreamShutdown(Stream, (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16), 0); + MsQuic.StreamShutdown(Stream, (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16, ThreadID), 0); break; case QUIC_STREAM_EVENT_PEER_SEND_ABORTED: { std::lock_guard Lock(ctx->Connection.Lock); @@ -415,15 +442,15 @@ QUIC_STATUS QUIC_API SpinQuicHandleStreamEvent(HQUIC Stream, void* , QUIC_STREAM } Offset += Event->RECEIVE.Buffers[i].Length; } - int Random = GetRandom(5); + int Random = GetRandom(5, ThreadID); std::lock_guard Lock(ctx->Connection.Lock); CXPLAT_DBG_ASSERT(ctx->PendingRecvLength == UINT64_MAX); if (Random == 0) { ctx->PendingRecvLength = Event->RECEIVE.TotalBufferLength; return QUIC_STATUS_PENDING; // Pend the receive, to be completed later. } else if (Random == 1 && Event->RECEIVE.TotalBufferLength > 0) { - Event->RECEIVE.TotalBufferLength = GetRandom(Event->RECEIVE.TotalBufferLength + 1); // Partially (or fully) consume the data. - if (GetRandom(10) == 0) { + Event->RECEIVE.TotalBufferLength = GetRandom(Event->RECEIVE.TotalBufferLength + 1, ThreadID); // Partially (or fully) consume the data. + if (GetRandom(10, ThreadID) == 0) { return QUIC_STATUS_CONTINUE; // Don't pause receive callbacks. } } @@ -448,14 +475,14 @@ QUIC_STATUS QUIC_API SpinQuicHandleConnectionEvent(HQUIC Connection, void* , QUI switch (Event->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: { - int Selector = GetRandom(3); + int Selector = GetRandom(3, ThreadID); uint16_t DataLength = 0; uint8_t* Data = nullptr; if (Selector == 1) { // // Send ticket with some data // - DataLength = (uint16_t)(GetRandom(999) + 1); + DataLength = (uint16_t)(GetRandom(999, ThreadID) + 1); } else if (Selector == 2) { // // Send ticket with too much data @@ -472,7 +499,7 @@ QUIC_STATUS QUIC_API SpinQuicHandleConnectionEvent(HQUIC Connection, void* , QUI DataLength = 0; } } - QUIC_SEND_RESUMPTION_FLAGS Flags = (GetRandom(2) == 0) ? QUIC_SEND_RESUMPTION_FLAG_NONE : QUIC_SEND_RESUMPTION_FLAG_FINAL; + QUIC_SEND_RESUMPTION_FLAGS Flags = (GetRandom(2, ThreadID) == 0) ? QUIC_SEND_RESUMPTION_FLAG_NONE : QUIC_SEND_RESUMPTION_FLAG_FINAL; MsQuic.ConnectionSendResumptionTicket(Connection, Flags, DataLength, Data); free(Data); break; @@ -481,18 +508,30 @@ QUIC_STATUS QUIC_API SpinQuicHandleConnectionEvent(HQUIC Connection, void* , QUI SpinQuicConnection::Get(Connection)->OnShutdownComplete(); break; case QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED: { - if (GetRandom(10) == 0) { + if (GetRandom(10, ThreadID) == 0) { return QUIC_STATUS_NOT_SUPPORTED; } - if (GetRandom(10) == 0) { + if (GetRandom(10, ThreadID) == 0) { MsQuic.StreamClose(Event->PEER_STREAM_STARTED.Stream); return QUIC_STATUS_SUCCESS; } - if (GetRandom(2) == 0) { + if (GetRandom(2, ThreadID) == 0) { Event->PEER_STREAM_STARTED.Flags |= QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES; } + if (GetRandom(5, ThreadID) == 0) { + Event->PEER_STREAM_STARTED.Flags |= QUIC_STREAM_OPEN_FLAG_APP_OWNED_BUFFERS; + } auto StreamCtx = new SpinQuicStream(*ctx, Event->PEER_STREAM_STARTED.Stream); MsQuic.SetCallbackHandler(Event->PEER_STREAM_STARTED.Stream, (void *)SpinQuicHandleStreamEvent, StreamCtx); + if (Event->PEER_STREAM_STARTED.Flags & QUIC_STREAM_OPEN_FLAG_APP_OWNED_BUFFERS) { + uint32_t BufCount = GetRandom(3, ThreadID) + 1; // 1-3 buffers + std::vector Sizes(BufCount); + for (uint32_t i = 0; i < BufCount; i++) { + Sizes[i] = MaxBufferSizes[GetRandom(BufferCount, ThreadID)]; + } + auto Buffers = StreamCtx->AllocateReceiveBuffers(Sizes); + MsQuic.StreamProvideReceiveBuffers(Event->PEER_STREAM_STARTED.Stream, BufCount, Buffers.data()); + } ctx->AddStream(Event->PEER_STREAM_STARTED.Stream); break; } @@ -522,7 +561,7 @@ QUIC_STATUS QUIC_API SpinQuicServerHandleListenerEvent(HQUIC /* Listener */, voi switch (Event->Type) { case QUIC_LISTENER_EVENT_NEW_CONNECTION: { - if (!GetRandom(20)) { + if (!GetRandom(20, ThreadID)) { return QUIC_STATUS_CONNECTION_REFUSED; } MsQuic.SetCallbackHandler(Event->NEW_CONNECTION.Connection, (void*)SpinQuicHandleConnectionEvent, &((ListenerContext*)Context)->ThreadID); @@ -591,7 +630,7 @@ struct SetParamHelper { void SpinQuicRandomizeSettings(QUIC_SETTINGS& Settings, uint16_t ThreadID) { - switch (GetRandom(38)) { + switch (GetRandom(38, ThreadID)) { case 0: //Settings.MaxBytesPerKey = GetRandom(UINT64_MAX); //Settings.IsSet.MaxBytesPerKey = TRUE; @@ -661,7 +700,7 @@ void SpinQuicRandomizeSettings(QUIC_SETTINGS& Settings, uint16_t ThreadID) //Settings.IsSet.KeepAliveIntervalMs = TRUE; break; case 17: - Settings.CongestionControlAlgorithm = GetRandom((uint16_t)QUIC_CONGESTION_CONTROL_ALGORITHM_MAX); + Settings.CongestionControlAlgorithm = GetRandom((uint16_t)QUIC_CONGESTION_CONTROL_ALGORITHM_MAX, ThreadID); Settings.IsSet.CongestionControlAlgorithm = TRUE; break; case 18: @@ -693,27 +732,27 @@ void SpinQuicRandomizeSettings(QUIC_SETTINGS& Settings, uint16_t ThreadID) //Settings.IsSet.SendBufferingEnabled = TRUE; break; case 25: - Settings.PacingEnabled = GetRandom((uint8_t)1); + Settings.PacingEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.PacingEnabled = TRUE; break; case 26: - Settings.MigrationEnabled = GetRandom((uint8_t)1); + Settings.MigrationEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.MigrationEnabled = TRUE; break; case 27: - Settings.DatagramReceiveEnabled = GetRandom((uint8_t)1); + Settings.DatagramReceiveEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.DatagramReceiveEnabled = TRUE; break; case 28: - Settings.ServerResumptionLevel = GetRandom((uint8_t)3); + Settings.ServerResumptionLevel = GetRandom((uint8_t)3, ThreadID); Settings.IsSet.ServerResumptionLevel = TRUE; break; case 29: - Settings.GreaseQuicBitEnabled = GetRandom((uint8_t)1); + Settings.GreaseQuicBitEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.GreaseQuicBitEnabled = TRUE; break; case 30: - Settings.EcnEnabled = GetRandom((uint8_t)1); + Settings.EcnEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.EcnEnabled = TRUE; break; case 31: @@ -729,19 +768,19 @@ void SpinQuicRandomizeSettings(QUIC_SETTINGS& Settings, uint16_t ThreadID) //Settings.IsSet.DestCidUpdateIdleTimeoutMs = TRUE; break; case 34: - Settings.HyStartEnabled = GetRandom((uint8_t)1); + Settings.HyStartEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.HyStartEnabled = TRUE; break; case 35: - Settings.EncryptionOffloadAllowed = GetRandom((uint8_t)1); + Settings.EncryptionOffloadAllowed = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.EncryptionOffloadAllowed = TRUE; break; case 36: - Settings.ReliableResetEnabled = GetRandom((uint8_t)1); + Settings.ReliableResetEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.ReliableResetEnabled = TRUE; break; case 37: - Settings.OneWayDelayEnabled = GetRandom((uint8_t)1); + Settings.OneWayDelayEnabled = GetRandom((uint8_t)1, ThreadID); Settings.IsSet.OneWayDelayEnabled = TRUE; break; default: @@ -755,7 +794,7 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) QUIC_SETTINGS Settings = {0}; SetParamHelper Helper; - switch (0x05000000 | (GetRandom(24))) { + switch (0x05000000 | (GetRandom(24, ThreadID))) { case QUIC_PARAM_CONN_QUIC_VERSION: // uint32_t // QUIC_VERSION is get-only break; @@ -774,7 +813,7 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) case QUIC_PARAM_CONN_STATISTICS_PLAT: // QUIC_STATISTICS break; // Get Only case QUIC_PARAM_CONN_SHARE_UDP_BINDING: // uint8_t (BOOLEAN) - Helper.SetUint8(QUIC_PARAM_CONN_SHARE_UDP_BINDING, (uint8_t)GetRandom(2)); + Helper.SetUint8(QUIC_PARAM_CONN_SHARE_UDP_BINDING, (uint8_t)GetRandom(2, ThreadID)); break; case QUIC_PARAM_CONN_LOCAL_BIDI_STREAM_COUNT: // uint16_t break; // Get Only @@ -786,10 +825,10 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) Helper.SetPtr(QUIC_PARAM_CONN_CLOSE_REASON_PHRASE, "ABCDEFGHI\x00\x00\x00\x00\x00", 10); break; case QUIC_PARAM_CONN_STREAM_SCHEDULING_SCHEME: // QUIC_STREAM_SCHEDULING_SCHEME - Helper.SetUint32(QUIC_PARAM_CONN_STREAM_SCHEDULING_SCHEME, GetRandom(QUIC_STREAM_SCHEDULING_SCHEME_COUNT)); + Helper.SetUint32(QUIC_PARAM_CONN_STREAM_SCHEDULING_SCHEME, GetRandom(QUIC_STREAM_SCHEDULING_SCHEME_COUNT, ThreadID)); break; case QUIC_PARAM_CONN_DATAGRAM_RECEIVE_ENABLED: // uint8_t (BOOLEAN) - Helper.SetUint8(QUIC_PARAM_CONN_DATAGRAM_RECEIVE_ENABLED, (uint8_t)GetRandom(2)); + Helper.SetUint8(QUIC_PARAM_CONN_DATAGRAM_RECEIVE_ENABLED, (uint8_t)GetRandom(2, ThreadID)); break; case QUIC_PARAM_CONN_DATAGRAM_SEND_ENABLED: // uint8_t (BOOLEAN) break; // Get Only @@ -800,7 +839,7 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) // TODO break; case QUIC_PARAM_CONN_PEER_CERTIFICATE_VALID: // uint8_t (BOOLEAN) - Helper.SetUint8(QUIC_PARAM_CONN_PEER_CERTIFICATE_VALID, (uint8_t)GetRandom(2)); + Helper.SetUint8(QUIC_PARAM_CONN_PEER_CERTIFICATE_VALID, (uint8_t)GetRandom(2, ThreadID)); break; case QUIC_PARAM_CONN_LOCAL_INTERFACE: // uint32_t // TODO @@ -813,12 +852,12 @@ void SpinQuicSetRandomConnectionParam(HQUIC Connection, uint16_t ThreadID) case QUIC_PARAM_CONN_CIBIR_ID: // bytes[] if (FuzzData) { // assume 8 byte buffer for now - uint64_t Buffer = GetRandom(UINT64_MAX); + uint64_t Buffer = GetRandom(UINT64_MAX, ThreadID); memcpy(RandomBuffer, &Buffer, sizeof(RandomBuffer)); } else { CxPlatRandom(sizeof(RandomBuffer), RandomBuffer); } - Helper.SetPtr(QUIC_PARAM_CONN_CIBIR_ID, RandomBuffer, 1 + (uint8_t)GetRandom(sizeof(RandomBuffer))); + Helper.SetPtr(QUIC_PARAM_CONN_CIBIR_ID, RandomBuffer, 1 + (uint8_t)GetRandom(sizeof(RandomBuffer), ThreadID)); break; case QUIC_PARAM_CONN_STATISTICS_V2: // QUIC_STATISTICS_V2 break; // Get Only @@ -838,7 +877,7 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) { SetParamHelper Helper; - switch (0x08000000 | (GetRandom(6))) { + switch (0x08000000 | (GetRandom(6, ThreadID))) { case QUIC_PARAM_STREAM_ID: // QUIC_UINT62 break; // Get Only case QUIC_PARAM_STREAM_0RTT_LENGTH: // QUIC_ADDR @@ -846,12 +885,12 @@ void SpinQuicSetRandomStreamParam(HQUIC Stream, uint16_t ThreadID) case QUIC_PARAM_STREAM_IDEAL_SEND_BUFFER_SIZE: // QUIC_ADDR break; // Get Only case QUIC_PARAM_STREAM_PRIORITY: // uint16_t - Helper.SetUint16(QUIC_PARAM_STREAM_PRIORITY, (uint16_t)GetRandom(UINT16_MAX)); + Helper.SetUint16(QUIC_PARAM_STREAM_PRIORITY, (uint16_t)GetRandom(UINT16_MAX, ThreadID)); break; case QUIC_PARAM_STREAM_STATISTICS: break; // Get Only case QUIC_PARAM_STREAM_RELIABLE_OFFSET: - Helper.SetUint64(QUIC_PARAM_STREAM_RELIABLE_OFFSET, (uint64_t)GetRandom(UINT64_MAX)); + Helper.SetUint64(QUIC_PARAM_STREAM_RELIABLE_OFFSET, (uint64_t)GetRandom(UINT64_MAX, ThreadID)); default: break; } @@ -879,19 +918,19 @@ const uint32_t ParamCounts[] = { void SpinQuicGetRandomParam(HQUIC Handle, uint16_t ThreadID) { for (uint32_t i = 0; i < GET_PARAM_LOOP_COUNT; ++i) { - uint32_t Level = (uint32_t)GetRandom(ARRAYSIZE(ParamCounts)); - uint32_t Param = (uint32_t)GetRandom(((ParamCounts[Level] & 0xFFFFFFF)) + 1); + uint32_t Level = (uint32_t)GetRandom(ARRAYSIZE(ParamCounts), ThreadID); + uint32_t Param = (uint32_t)GetRandom(((ParamCounts[Level] & 0xFFFFFFF)) + 1, ThreadID); uint32_t Combined = ((Level+1) << 28) + Param; Combined &= ~QUIC_PARAM_HIGH_PRIORITY; // TODO: enable high priority GetParam uint8_t OutBuffer[200]; - uint32_t OutBufferLength = (uint32_t)GetRandom(sizeof(OutBuffer) + 1); + uint32_t OutBufferLength = (uint32_t)GetRandom(sizeof(OutBuffer) + 1, ThreadID); MsQuic.GetParam( - (GetRandom(10) == 0) ? nullptr : Handle, + (GetRandom(10, ThreadID) == 0) ? nullptr : Handle, Combined, &OutBufferLength, - (GetRandom(10) == 0) ? nullptr : OutBuffer); + (GetRandom(10, ThreadID) == 0) ? nullptr : OutBuffer); } } @@ -908,7 +947,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste CxPlatTimeDiff64(Gb.StartTimeMs, CxPlatTimeMs64()) < SpinSettings.RunTimeMs) { if (Listeners) { - auto Value = GetRandom(100); + auto Value = GetRandom(100, ThreadID); if (Value >= 90) { for (auto &Listener : *Listeners) { MsQuic.ListenerStop(Listener); @@ -916,9 +955,9 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste } else if (Value >= 40) { for (auto &Listener : *Listeners) { QUIC_ADDR sockAddr = { 0 }; - QuicAddrSetFamily(&sockAddr, GetRandom(2) ? QUIC_ADDRESS_FAMILY_INET : QUIC_ADDRESS_FAMILY_UNSPEC); - QuicAddrSetPort(&sockAddr, GetRandomFromVector(SpinSettings.Ports)); - MsQuic.ListenerStart(Listener, &Gb.Alpns[GetRandom(Gb.AlpnCount)], 1, &sockAddr); + QuicAddrSetFamily(&sockAddr, GetRandom(2, ThreadID) ? QUIC_ADDRESS_FAMILY_INET : QUIC_ADDRESS_FAMILY_UNSPEC); + QuicAddrSetPort(&sockAddr, GetRandomFromVector(SpinSettings.Ports, ThreadID)); + MsQuic.ListenerStart(Listener, &Gb.Alpns[GetRandom(Gb.AlpnCount, ThreadID)], 1, &sockAddr); } } else { for (auto &Listener : *Listeners) { @@ -935,7 +974,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste continue; \ } - switch (GetRandom(SpinQuicAPICallCount)) { + switch (GetRandom(SpinQuicAPICallCount, ThreadID)) { case SpinQuicAPICallConnectionOpen: if (!IsServer) { auto ctx = new SpinQuicConnection(ThreadID); @@ -954,14 +993,14 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste case SpinQuicAPICallConnectionStart: { auto Connection = Connections.TryGetRandom(); BAIL_ON_NULL_CONNECTION(Connection); - HQUIC Configuration = GetRandomFromVector(Gb.ClientConfigurations); - MsQuic.ConnectionStart(Connection, Configuration, QUIC_ADDRESS_FAMILY_INET, SpinSettings.ServerName, GetRandomFromVector(SpinSettings.Ports)); + HQUIC Configuration = GetRandomFromVector(Gb.ClientConfigurations, ThreadID); + MsQuic.ConnectionStart(Connection, Configuration, QUIC_ADDRESS_FAMILY_INET, SpinSettings.ServerName, GetRandomFromVector(SpinSettings.Ports, ThreadID)); break; } case SpinQuicAPICallConnectionShutdown: { auto Connection = Connections.TryGetRandom(); BAIL_ON_NULL_CONNECTION(Connection); - MsQuic.ConnectionShutdown(Connection, (QUIC_CONNECTION_SHUTDOWN_FLAGS)GetRandom(2), 0); + MsQuic.ConnectionShutdown(Connection, (QUIC_CONNECTION_SHUTDOWN_FLAGS)GetRandom(2, ThreadID), 0); break; } case SpinQuicAPICallConnectionClose: { @@ -975,9 +1014,19 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste BAIL_ON_NULL_CONNECTION(Connection); HQUIC Stream; auto ctx = new SpinQuicStream(*SpinQuicConnection::Get(Connection)); - QUIC_STATUS Status = MsQuic.StreamOpen(Connection, (QUIC_STREAM_OPEN_FLAGS)GetRandom(8), SpinQuicHandleStreamEvent, ctx, &Stream); + QUIC_STREAM_OPEN_FLAGS OpenFlags = (QUIC_STREAM_OPEN_FLAGS)GetRandom(16, ThreadID); + QUIC_STATUS Status = MsQuic.StreamOpen(Connection, OpenFlags, SpinQuicHandleStreamEvent, ctx, &Stream); if (QUIC_SUCCEEDED(Status)) { ctx->Handle = Stream; + if ((OpenFlags & QUIC_STREAM_OPEN_FLAG_APP_OWNED_BUFFERS) && GetRandom(2, ThreadID) == 0) { + uint32_t BufCount = GetRandom(5, ThreadID) + 1; // 1-5 buffers + std::vector Sizes(BufCount); + for (uint32_t i = 0; i < BufCount; i++) { + Sizes[i] = MaxBufferSizes[GetRandom(BufferCount, ThreadID)]; + } + auto Buffers = ctx->AllocateReceiveBuffers(Sizes); + MsQuic.StreamProvideReceiveBuffers(Stream, BufCount, Buffers.data()); + } SpinQuicGetRandomParam(Stream, ThreadID); SpinQuicSetRandomStreamParam(Stream, ThreadID); SpinQuicConnection::Get(Connection)->AddStream(Stream); @@ -994,7 +1043,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste std::lock_guard Lock(ctx->Lock); auto Stream = ctx->TryGetStream(); if (Stream == nullptr) continue; - MsQuic.StreamStart(Stream, (QUIC_STREAM_START_FLAGS)GetRandom(16)); + MsQuic.StreamStart(Stream, (QUIC_STREAM_START_FLAGS)GetRandom(16, ThreadID)); } break; } @@ -1009,11 +1058,11 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste auto StreamCtx = SpinQuicStream::Get(Stream); auto Buffer = new(std::nothrow) QUIC_BUFFER; if (Buffer) { - const uint32_t Length = MaxBufferSizes[GetRandom(BufferCount)]; - Buffer->Buffer = Gb.SendBuffer + StreamCtx->SendOffset; + const uint32_t Length = MaxBufferSizes[GetRandom(BufferCount, ThreadID)]; + Buffer->Buffer = Gb.SendBuffer.data() + StreamCtx->SendOffset; Buffer->Length = Length; if (QUIC_SUCCEEDED( - MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(16), Buffer))) { + MsQuic.StreamSend(Stream, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(16, ThreadID), Buffer))) { StreamCtx->SendOffset = (uint8_t)(StreamCtx->SendOffset + Length); } else { delete Buffer; @@ -1030,7 +1079,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste std::lock_guard Lock(ctx->Lock); auto Stream = ctx->TryGetStream(); if (Stream == nullptr) continue; - MsQuic.StreamReceiveSetEnabled(Stream, GetRandom(2) == 0); + MsQuic.StreamReceiveSetEnabled(Stream, GetRandom(2, ThreadID) == 0); } break; } @@ -1046,8 +1095,8 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste if (StreamCtx->PendingRecvLength == UINT64_MAX) continue; // Nothing to complete (yet auto BytesRemaining = StreamCtx->PendingRecvLength; StreamCtx->PendingRecvLength = UINT64_MAX; - if (BytesRemaining != 0 && GetRandom(10) == 0) { - auto BytesConsumed = GetRandom(BytesRemaining); + if (BytesRemaining != 0 && GetRandom(10, ThreadID) == 0) { + auto BytesConsumed = GetRandom(BytesRemaining, ThreadID); MsQuic.StreamReceiveComplete(Stream, BytesConsumed); } else { MsQuic.StreamReceiveComplete(Stream, BytesRemaining); @@ -1063,7 +1112,7 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste std::lock_guard Lock(ctx->Lock); auto Stream = ctx->TryGetStream(); if (Stream == nullptr) continue; - auto Flags = (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16); + auto Flags = (QUIC_STREAM_SHUTDOWN_FLAGS)GetRandom(16, ThreadID); if (Flags & QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE) { auto StreamCtx = SpinQuicStream::Get(Stream); StreamCtx->PendingRecvLength = UINT64_MAX; @@ -1146,9 +1195,9 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste BAIL_ON_NULL_CONNECTION(Connection); auto Buffer = new(std::nothrow) QUIC_BUFFER; if (Buffer) { - Buffer->Buffer = Gb.SendBuffer; - Buffer->Length = MaxBufferSizes[GetRandom(BufferCount)]; - if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(8), Buffer))) { + Buffer->Buffer = Gb.SendBuffer.data(); + Buffer->Length = MaxBufferSizes[GetRandom(BufferCount, ThreadID)]; + if (QUIC_FAILED(MsQuic.DatagramSend(Connection, Buffer, 1, (QUIC_SEND_FLAGS)GetRandom(8, ThreadID), Buffer))) { delete Buffer; } } @@ -1157,13 +1206,73 @@ void Spin(Gbs& Gb, LockableVector& Connections, std::vector* Liste case SpinQuicAPICallCompleteTicketValidation: { auto Connection = Connections.TryGetRandom(); BAIL_ON_NULL_CONNECTION(Connection); - MsQuic.ConnectionResumptionTicketValidationComplete(Connection, GetRandom(2) == 0); + MsQuic.ConnectionResumptionTicketValidationComplete(Connection, GetRandom(2, ThreadID) == 0); break; } case SpinQuicAPICallCompleteCertificateValidation: { auto Connection = Connections.TryGetRandom(); BAIL_ON_NULL_CONNECTION(Connection); - MsQuic.ConnectionCertificateValidationComplete(Connection, GetRandom(2) == 0, QUIC_TLS_ALERT_CODE_BAD_CERTIFICATE); + MsQuic.ConnectionCertificateValidationComplete(Connection, GetRandom(2, ThreadID) == 0, QUIC_TLS_ALERT_CODE_BAD_CERTIFICATE); + break; + } + case SpinQuicAPICallConnectionPoolCreate: { + if (!IsServer) { + uint16_t PoolSize = (uint16_t)(GetRandom(4, ThreadID) + 1); // 1-4 connections + std::vector PoolConnections(PoolSize, nullptr); + std::vector Contexts(PoolSize, &ThreadID); + QUIC_CONNECTION_POOL_CONFIG PoolConfig = {0}; + PoolConfig.Registration = Gb.Registration; + PoolConfig.Configuration = GetRandomFromVector(Gb.ClientConfigurations, ThreadID); + PoolConfig.Handler = SpinQuicHandleConnectionEvent; + PoolConfig.Context = Contexts.data(); + PoolConfig.ServerName = SpinSettings.ServerName; + PoolConfig.ServerAddress = nullptr; + PoolConfig.Family = QUIC_ADDRESS_FAMILY_INET; + PoolConfig.ServerPort = GetRandomFromVector(SpinSettings.Ports, ThreadID); + PoolConfig.NumberOfConnections = PoolSize; + PoolConfig.CibirIds = nullptr; + PoolConfig.CibirIdLength = 0; + PoolConfig.Flags = (QUIC_CONNECTION_POOL_FLAGS)GetRandom(2, ThreadID); + QUIC_STATUS Status = MsQuic.ConnectionPoolCreate(&PoolConfig, PoolConnections.data()); + if (QUIC_SUCCEEDED(Status)) { + for (uint16_t i = 0; i < PoolSize; i++) { + auto ctx = new(std::nothrow) SpinQuicConnection(PoolConnections[i], ThreadID); + if (ctx != nullptr) { + std::lock_guard Lock(Connections); + Connections.push_back(PoolConnections[i]); + } else { + MsQuic.ConnectionClose(PoolConnections[i]); + } + } + } else if (!(PoolConfig.Flags & QUIC_CONNECTION_POOL_FLAG_CLOSE_ON_FAILURE)) { + for (uint16_t i = 0; i < PoolSize; i++) { + if (PoolConnections[i] != nullptr) { + MsQuic.ConnectionClose(PoolConnections[i]); + } + } + } + } + break; + } + case SpinQuicAPICallStreamProvideReceiveBuffers: { + auto Connection = Connections.TryGetRandom(); + BAIL_ON_NULL_CONNECTION(Connection); + auto ctx = SpinQuicConnection::Get(Connection); + { + std::lock_guard Lock(ctx->Lock); + auto Stream = ctx->TryGetStream(); + if (Stream == nullptr) { + continue; + } + auto StreamCtx = SpinQuicStream::Get(Stream); + uint32_t BufCount = GetRandom(3, ThreadID) + 1; // 1-3 buffers + std::vector Sizes(BufCount); + for (uint32_t i = 0; i < BufCount; i++) { + Sizes[i] = MaxBufferSizes[GetRandom(BufferCount, ThreadID)]; + } + auto Buffers = StreamCtx->AllocateReceiveBuffers(Sizes); + MsQuic.StreamProvideReceiveBuffers(Stream, BufCount, Buffers.data()); + } break; } default: @@ -1191,9 +1300,9 @@ CXPLAT_THREAD_CALLBACK(ServerSpin, Context) // QUIC_SETTINGS QuicSettings{0}; - QuicSettings.PeerBidiStreamCount = GetRandom((uint16_t)10); + QuicSettings.PeerBidiStreamCount = GetRandom((uint16_t)10, ThreadID); QuicSettings.IsSet.PeerBidiStreamCount = TRUE; - QuicSettings.PeerUnidiStreamCount = GetRandom((uint16_t)10); + QuicSettings.PeerUnidiStreamCount = GetRandom((uint16_t)10, ThreadID); QuicSettings.IsSet.PeerUnidiStreamCount = TRUE; // TODO - Randomize more of the settings. @@ -1233,7 +1342,7 @@ CXPLAT_THREAD_CALLBACK(ServerSpin, Context) } QUIC_ADDR sockAddr = { 0 }; - QuicAddrSetFamily(&sockAddr, GetRandom(2) ? QUIC_ADDRESS_FAMILY_INET : QUIC_ADDRESS_FAMILY_UNSPEC); + QuicAddrSetFamily(&sockAddr, GetRandom(2, ThreadID) ? QUIC_ADDRESS_FAMILY_INET : QUIC_ADDRESS_FAMILY_UNSPEC); QuicAddrSetPort(&sockAddr, pt); if (!QUIC_SUCCEEDED(MsQuic.ListenerStart(Listener, &Gb.Alpns[i], 1, &sockAddr))) { @@ -1385,21 +1494,21 @@ CXPLAT_THREAD_CALLBACK(RunThread, Context) QUIC_SETTINGS QuicSettings{0}; CXPLAT_THREAD_CONFIG Config = { 0 }; - if (0 == GetRandom(4)) { + if (0 == GetRandom(4, ThreadID)) { uint16_t RetryMemoryPercent = 0; if (!QUIC_SUCCEEDED(MsQuic.SetParam(nullptr, QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT, sizeof(RetryMemoryPercent), &RetryMemoryPercent))) { break; } } - if (0 == GetRandom(4)) { + if (0 == GetRandom(4, ThreadID)) { uint16_t LoadBalancingMode = QUIC_LOAD_BALANCING_SERVER_ID_IP; if (!QUIC_SUCCEEDED(MsQuic.SetParam(nullptr, QUIC_PARAM_GLOBAL_LOAD_BALACING_MODE, sizeof(LoadBalancingMode), &LoadBalancingMode))) { break; } } - if (0 == GetRandom(4)) { + if (0 == GetRandom(4, ThreadID)) { uint8_t StatelessResetKey[QUIC_STATELESS_RESET_KEY_LENGTH]; CxPlatRandom(sizeof(StatelessResetKey), StatelessResetKey); if (!QUIC_SUCCEEDED(MsQuic.SetParam( @@ -1413,7 +1522,7 @@ CXPLAT_THREAD_CALLBACK(RunThread, Context) QUIC_REGISTRATION_CONFIG RegConfig; RegConfig.AppName = "spinquic"; - RegConfig.ExecutionProfile = FuzzData ? QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER : (QUIC_EXECUTION_PROFILE)GetRandom(4); + RegConfig.ExecutionProfile = FuzzData ? QUIC_EXECUTION_PROFILE_TYPE_SCAVENGER : (QUIC_EXECUTION_PROFILE)GetRandom(4, ThreadID); if (!QUIC_SUCCEEDED(MsQuic.RegistrationOpen(&RegConfig, &Gb.Registration))) { break; @@ -1436,9 +1545,9 @@ CXPLAT_THREAD_CALLBACK(RunThread, Context) } } - QuicSettings.PeerBidiStreamCount = GetRandom((uint16_t)10); + QuicSettings.PeerBidiStreamCount = GetRandom((uint16_t)10, ThreadID); QuicSettings.IsSet.PeerBidiStreamCount = TRUE; - QuicSettings.PeerUnidiStreamCount = GetRandom((uint16_t)10); + QuicSettings.PeerUnidiStreamCount = GetRandom((uint16_t)10, ThreadID); QuicSettings.IsSet.PeerUnidiStreamCount = TRUE; // TODO - Randomize more of the settings. @@ -1554,11 +1663,11 @@ void start() { ExecConfigSize = 0; } - if (GetRandom(2) == 0) { + if (GetRandom(2, ThreadID) == 0) { const uint32_t ProcCount = CxPlatProcCount() == 1 ? 1 : - 1 + GetRandom(CxPlatProcCount() - 1); + 1 + GetRandom(CxPlatProcCount() - 1, ThreadID); printf("Using %u partitions...\n", ProcCount); ExecConfigSize = QUIC_GLOBAL_EXECUTION_CONFIG_MIN_SIZE + sizeof(uint16_t)*ProcCount; ExecConfig = (QUIC_GLOBAL_EXECUTION_CONFIG*)malloc(ExecConfigSize);