diff --git a/node/src/Consumer.ts b/node/src/Consumer.ts index ac092a5d2f..5462ded006 100644 --- a/node/src/Consumer.ts +++ b/node/src/Consumer.ts @@ -463,6 +463,67 @@ export class ConsumerImpl ); } + async degrade({ + durationMs = 0, + maxDelayMs = 0, + delayPercent = 0, + lossPercent = 0, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + } = {}): Promise { + if (durationMs > Math.pow(2, 32) - 1) { + durationMs = Math.pow(2, 32) - 1; + } else if (durationMs < 0) { + durationMs = 0; + } + + if (maxDelayMs > Math.pow(2, 16) - 1) { + maxDelayMs = Math.pow(2, 16) - 1; + } else if (maxDelayMs < 0) { + maxDelayMs = 0; + } + + if (delayPercent > 100) { + delayPercent = 100; + } else if (delayPercent < 0) { + delayPercent = 0; + } + + if (lossPercent > 100) { + lossPercent = 100; + } else if (lossPercent < 0) { + lossPercent = 0; + } + + if (durationMs === 0) { + maxDelayMs = 0; + lossPercent = 0; + lossPercent = 0; + } + + logger.debug( + `degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]` + ); + + /* Build Request. */ + const requestOffset = new FbsConsumer.DegradeRequestT( + durationMs, + maxDelayMs, + delayPercent, + lossPercent + ).pack(this.#channel.bufferBuilder); + + await this.#channel.request( + FbsRequest.Method.CONSUMER_DEGRADE, + FbsRequest.Body.Consumer_DegradeRequest, + requestOffset, + this.#internal.consumerId + ); + } + private handleWorkerNotifications(): void { this.#channel.on( this.#internal.consumerId, diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 74ca27615d..2105bd62c3 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -389,4 +389,42 @@ export interface Consumer * Enable 'trace' event. */ enableTraceEvent(types?: ConsumerTraceEventType[]): Promise; + + /** + * Degrade RTP transmission. + * - durationMs: Duration that the degradation will take. + * - maxDelayMs: Max delay (in ms) to be applied to each packet. + * - delayPercent: Only apply delay to this percent of the packets. + * - lossPercent: Generate packet loss by given percent value. + * + * @remarks + * - Only implemented in `SimpleConsumer`. + * - After `durationMs`, or if `consumer.degrade()` is called again with + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). + * + * @throws + * - If called on a non `SimpleConsumer` (due to method not implemented). + * + * @example + * ```ts + * consumer.degrade({ + * durationMs: 10000 + * maxDelayMs: 3000, + * delayPercent: 20, + * lossPercent: 0, + * }); + * ``` + */ + degrade({ + durationMs, + maxDelayMs, + delayPercent, + lossPercent, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + }): Promise; } diff --git a/node/src/Producer.ts b/node/src/Producer.ts index 9420b8f8eb..0e8062503f 100644 --- a/node/src/Producer.ts +++ b/node/src/Producer.ts @@ -311,6 +311,67 @@ export class ProducerImpl ); } + async degrade({ + durationMs = 0, + maxDelayMs = 0, + delayPercent = 0, + lossPercent = 0, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + } = {}): Promise { + if (durationMs > Math.pow(2, 32) - 1) { + durationMs = Math.pow(2, 32) - 1; + } else if (durationMs < 0) { + durationMs = 0; + } + + if (maxDelayMs > Math.pow(2, 16) - 1) { + maxDelayMs = Math.pow(2, 16) - 1; + } else if (maxDelayMs < 0) { + maxDelayMs = 0; + } + + if (delayPercent > 100) { + delayPercent = 100; + } else if (delayPercent < 0) { + delayPercent = 0; + } + + if (lossPercent > 100) { + lossPercent = 100; + } else if (lossPercent < 0) { + lossPercent = 0; + } + + if (durationMs === 0) { + maxDelayMs = 0; + lossPercent = 0; + lossPercent = 0; + } + + logger.debug( + `degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]` + ); + + /* Build Request. */ + const requestOffset = new FbsProducer.DegradeRequestT( + durationMs, + maxDelayMs, + delayPercent, + lossPercent + ).pack(this.#channel.bufferBuilder); + + await this.#channel.request( + FbsRequest.Method.CONSUMER_DEGRADE, + FbsRequest.Body.Consumer_DegradeRequest, + requestOffset, + this.#internal.producerId + ); + } + send(rtpPacket: Buffer): void { if (!Buffer.isBuffer(rtpPacket)) { throw new TypeError('rtpPacket must be a Buffer'); diff --git a/node/src/ProducerTypes.ts b/node/src/ProducerTypes.ts index 79c9df3bf6..98e420fe57 100644 --- a/node/src/ProducerTypes.ts +++ b/node/src/ProducerTypes.ts @@ -249,4 +249,38 @@ export interface Producer * Send RTP packet (just valid for Producers created on a DirectTransport). */ send(rtpPacket: Buffer): void; + + /** + * Degrade RTP transmission. + * - durationMs: Duration that the degradation will take. + * - maxDelayMs: Max delay (in ms) to be applied to each packet. + * - delayPercent: Only apply delay to this percent of the packets. + * - lossPercent: Generate packet loss by given percent value. + * + * @remarks + * - After `durationMs`, or if `producer.degrade()` is called again with + * `durationMs: 0`, then degradation is immediately stopped and all delayed + * buffered packets are immediately sent (all together). + * + * @example + * ```ts + * producer.degrade({ + * durationMs: 10000 + * maxDelayMs: 3000, + * delayPercent: 20, + * lossPercent: 0, + * }); + * ``` + */ + degrade({ + durationMs, + maxDelayMs, + delayPercent, + lossPercent, + }: { + durationMs?: number; + maxDelayMs?: number; + delayPercent?: number; + lossPercent?: number; + }): Promise; } diff --git a/worker/fbs/consumer.fbs b/worker/fbs/consumer.fbs index 715ce2bd53..e145511ff2 100644 --- a/worker/fbs/consumer.fbs +++ b/worker/fbs/consumer.fbs @@ -44,6 +44,13 @@ table EnableTraceEventRequest { events: [TraceEventType] (required); } +table DegradeRequest { + duration_ms: uint32; + max_delay_ms: uint16; + delay_percent: uint8; + loss_percent: uint8; +} + table DumpResponse { data: ConsumerDump (required); } diff --git a/worker/fbs/producer.fbs b/worker/fbs/producer.fbs index 8913cb9222..970d6e5d8b 100644 --- a/worker/fbs/producer.fbs +++ b/worker/fbs/producer.fbs @@ -18,6 +18,13 @@ table EnableTraceEventRequest { events: [TraceEventType] (required); } +table DegradeRequest { + duration_ms: uint32; + max_delay_ms: uint16; + delay_percent: uint8; + loss_percent: uint8; +} + table DumpResponse { id: string (required); kind: FBS.RtpParameters.MediaKind; diff --git a/worker/fbs/request.fbs b/worker/fbs/request.fbs index d9f869e878..939693caeb 100644 --- a/worker/fbs/request.fbs +++ b/worker/fbs/request.fbs @@ -52,6 +52,7 @@ enum Method: uint8 { PRODUCER_PAUSE, PRODUCER_RESUME, PRODUCER_ENABLE_TRACE_EVENT, + PRODUCER_DEGRADE, CONSUMER_DUMP, CONSUMER_GET_STATS, CONSUMER_PAUSE, @@ -60,6 +61,7 @@ enum Method: uint8 { CONSUMER_SET_PRIORITY, CONSUMER_REQUEST_KEY_FRAME, CONSUMER_ENABLE_TRACE_EVENT, + CONSUMER_DEGRADE, DATAPRODUCER_DUMP, DATAPRODUCER_GET_STATS, DATAPRODUCER_PAUSE, @@ -110,9 +112,11 @@ union Body { PipeTransport_ConnectRequest: FBS.PipeTransport.ConnectRequest, WebRtcTransport_ConnectRequest: FBS.WebRtcTransport.ConnectRequest, Producer_EnableTraceEventRequest: FBS.Producer.EnableTraceEventRequest, + Producer_DegradeRequest: FBS.Producer.DegradeRequest, Consumer_SetPreferredLayersRequest: FBS.Consumer.SetPreferredLayersRequest, Consumer_SetPriorityRequest: FBS.Consumer.SetPriorityRequest, Consumer_EnableTraceEventRequest: FBS.Consumer.EnableTraceEventRequest, + Consumer_DegradeRequest: FBS.Consumer.DegradeRequest, DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest, DataConsumer_SendRequest: FBS.DataConsumer.SendRequest, DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest, diff --git a/worker/include/RTC/SimpleConsumer.hpp b/worker/include/RTC/SimpleConsumer.hpp index 61345f8bb0..6627002c6e 100644 --- a/worker/include/RTC/SimpleConsumer.hpp +++ b/worker/include/RTC/SimpleConsumer.hpp @@ -5,12 +5,27 @@ #include "RTC/Consumer.hpp" #include "RTC/SeqManager.hpp" #include "RTC/Shared.hpp" +#include "handles/TimerHandle.hpp" +#include #include namespace RTC { - class SimpleConsumer : public RTC::Consumer, public RTC::RtpStreamSend::Listener + class SimpleConsumer : public RTC::Consumer, + public RTC::RtpStreamSend::Listener, + public TimerHandle::Listener { + private: + struct DelayedPacketItem + { + // Original packet. + RTC::SharedRtpPacket sharedPacket{ nullptr }; + // Arrival time of the original packet. + uint64_t arrivalTimeMs{ 0 }; + // Delay applied to the packet. + uint16_t delayMs{ 0 }; + }; + public: SimpleConsumer( RTC::Shared* shared, @@ -76,12 +91,19 @@ namespace RTC void StorePacketInTargetLayerRetransmissionBuffer( RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket); void EmitScore() const; + void ClearDegradation(bool sendDelayedPackets); + bool ShouldDelayPacket(const RTC::RtpPacket* packet) const; + bool ShouldDropPacket(const RTC::RtpPacket* packet) const; /* Pure virtual methods inherited from RtpStreamSend::Listener. */ public: void OnRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override; void OnRtpStreamRetransmitRtpPacket(RTC::RtpStreamSend* rtpStream, RTC::RtpPacket* packet) override; + /* Pure virtual methods inherited from TimerHandle::Listener. */ + public: + void OnTimer(TimerHandle* timer) override; + private: // Allocated by this. RTC::RtpStreamSend* rtpStream{ nullptr }; @@ -97,6 +119,12 @@ namespace RTC // video key frame. std::map::SeqLowerThan> targetLayerRetransmissionBuffer; + uint32_t maxDelayMs{ 0 }; + uint8_t delayPercent{ 0 }; + uint8_t lossPercent{ 0 }; + TimerHandle* degradationTimer{ nullptr }; + TimerHandle* delayTimer{ nullptr }; + std::list delayedPacketItems; }; } // namespace RTC diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index 82830bef7c..b449a68fe8 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -67,6 +67,7 @@ namespace Channel { FBS::Request::Method::CONSUMER_SET_PRIORITY, "consumer.setPriority" }, { FBS::Request::Method::CONSUMER_REQUEST_KEY_FRAME, "consumer.requestKeyFrame" }, { FBS::Request::Method::CONSUMER_ENABLE_TRACE_EVENT, "consumer.enableTraceEvent" }, + { FBS::Request::Method::CONSUMER_DEGRADE, "consumer.degrade" }, { FBS::Request::Method::DATAPRODUCER_DUMP, "dataProducer.dump" }, { FBS::Request::Method::DATAPRODUCER_GET_STATS, "dataProducer.getStats" }, { FBS::Request::Method::DATAPRODUCER_PAUSE, "dataProducer.pause" }, diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 60da29fa14..ad6abcd792 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -11,6 +11,9 @@ #ifdef MS_RTC_LOGGER_RTP #include "RTC/RtcLogger.hpp" #endif +#include + +const uint64_t DEGRADATION_DELAY_CHECK_INTERVAL_MS = 15; namespace RTC { @@ -82,6 +85,8 @@ namespace RTC delete this->rtpStream; this->targetLayerRetransmissionBuffer.clear(); + + ClearDegradation(/*sendDelayedPackets*/ false); } flatbuffers::Offset SimpleConsumer::FillBuffer( @@ -176,6 +181,55 @@ namespace RTC break; } + case Channel::ChannelRequest::Method::CONSUMER_DEGRADE: + { + const auto* body = request->data->body_as(); + uint32_t durationMs = body->durationMs(); + uint16_t maxDelayMs = body->maxDelayMs(); + uint8_t delayPercent = body->delayPercent(); + uint8_t lossPercent = body->lossPercent(); + + MS_DUMP( + "[DEGRADATION] applying consumer degradation [durationMs:%" PRIu32 ", maxDelayMs:%" PRIu16 + ", delayPercent:%" PRIu8 ", lossPercent:%" PRIu8 "]", + durationMs, + maxDelayMs, + delayPercent, + lossPercent); + + // First clear everything and send already delayed packets. + ClearDegradation(/*sendDelayedPackets*/ true); + + if (durationMs == 0) + { + MS_DUMP("[DEGRADATION] consumer degradation disabled"); + + request->Accept(); + + break; + } + + this->maxDelayMs = maxDelayMs; + this->delayPercent = delayPercent; + this->lossPercent = lossPercent; + this->degradationTimer = new TimerHandle(this); + + this->degradationTimer->Start(durationMs); + + if (this->maxDelayMs > 0 && this->delayPercent > 0) + { + this->delayTimer = new TimerHandle(this); + + // Check delayed packets every N ms. + this->delayTimer->Start( + DEGRADATION_DELAY_CHECK_INTERVAL_MS, DEGRADATION_DELAY_CHECK_INTERVAL_MS); + } + + request->Accept(); + + break; + } + default: { // Pass it to the parent class. @@ -322,6 +376,41 @@ namespace RTC packet->logger.consumerId = this->id; #endif + if (ShouldDropPacket(packet)) + { + MS_DUMP("[DEGRADATION] dropping packet [seq:%" PRIu16 "]", packet->GetSequenceNumber()); + + return; + } + + if (ShouldDelayPacket(packet)) + { + auto nowMs = DepLibUV::GetTimeMs(); + auto delayMs = static_cast(Utils::Crypto::GetRandomUInt(0, this->maxDelayMs)); + + MS_DUMP( + "[DEGRADATION] delaying packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + delayMs); + + // Only clone once and only if necessary. + if (!sharedPacket.HasPacket()) + { + sharedPacket.Assign(packet); + } + // Assert that, if sharedPacket was already filled, both packet and + // sharedPacket are the very same RTP packet. + else + { + sharedPacket.AssertSamePacket(packet); + } + + // Store original packet into the delay buffer. + this->delayedPacketItems.push_back({ sharedPacket, nowMs, delayMs }); + + return; + } + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -864,6 +953,91 @@ namespace RTC notificationOffset); } + void SimpleConsumer::ClearDegradation(bool sendDelayedPackets) + { + MS_TRACE(); + + this->maxDelayMs = 0; + this->delayPercent = 0; + this->lossPercent = 0; + + delete this->degradationTimer; + this->degradationTimer = nullptr; + + delete this->delayTimer; + this->delayTimer = nullptr; + + for (auto& item : this->delayedPacketItems) + { + if (sendDelayedPackets) + { + auto& sharedPacket = item.sharedPacket; + auto* packet = sharedPacket.GetPacket(); + + MS_DUMP( + "[DEGRADATION] terminated, sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + item.delayMs); + + SendRtpPacket(packet, sharedPacket); + } + } + + this->delayedPacketItems.clear(); + } + + bool SimpleConsumer::ShouldDelayPacket(const RTC::RtpPacket* packet) const + { + MS_TRACE(); + + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution dist(0, 99); + + if (!this->degradationTimer) + { + return false; + } + + if (!this->delayTimer) + { + return false; + } + + // Check if the packet is in the list of delayed packets, meaning that it's + // been sent from the onTimer() callback and hence must not be delayed + // again. + auto it = std::find_if( + this->delayedPacketItems.begin(), + this->delayedPacketItems.end(), + [&](const auto& item) { return item.sharedPacket.GetPacket() == packet; }); + + if (it != this->delayedPacketItems.end()) + { + return false; + } + + // Take into account use given delay percent. + return dist(gen) < this->delayPercent; + } + + bool SimpleConsumer::ShouldDropPacket(const RTC::RtpPacket* packet) const + { + MS_TRACE(); + + static std::random_device rd; + static std::mt19937 gen(rd()); + static std::uniform_int_distribution dist(0, 99); + + if (!this->degradationTimer) + { + return false; + } + + // Take into account user given loss percent. + return dist(gen) < this->lossPercent; + } + void SimpleConsumer::OnRtpStreamScore( RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/) { @@ -878,9 +1052,60 @@ namespace RTC { MS_TRACE(); + // Ignore the transmitted packet if it was delayed on purpose. + for (const auto& item : this->delayedPacketItems) + { + if (item.sharedPacket.GetPacket() == packet) + { + return; + } + } + this->listener->OnConsumerRetransmitRtpPacket(this, packet); // May emit 'trace' event. EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx()); } + + void SimpleConsumer::OnTimer(TimerHandle* timer) + { + MS_TRACE(); + + if (timer == this->degradationTimer) + { + // Clear everything and send already delayed packets. + ClearDegradation(/*sendDelayedPackets*/ true); + } + else if (timer == this->delayTimer) + { + auto nowMs = DepLibUV::GetTimeMs(); + + for (auto it = this->delayedPacketItems.begin(); it != this->delayedPacketItems.end();) + { + auto& item = *it; + + // Only send delayed packets whose arrival time + applied delay is less + // or equal than current time. Deleted the stored packet and remove the + // item from the list once the packet is sent. + if (item.arrivalTimeMs + item.delayMs <= nowMs) + { + auto& sharedPacket = item.sharedPacket; + auto* packet = sharedPacket.GetPacket(); + + MS_DUMP( + "[DEGRADATION] sending delayed packet [seq:%" PRIu16 ", delayMs:%" PRIu16 "]", + packet->GetSequenceNumber(), + item.delayMs); + + SendRtpPacket(packet, sharedPacket); + + it = this->delayedPacketItems.erase(it); + } + else + { + ++it; + } + } + } + } } // namespace RTC