Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions node/src/Consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,67 @@ export class ConsumerImpl<ConsumerAppData extends AppData = AppData>
);
}

async degrade({
durationMs = 0,
maxDelayMs = 0,
delayPercent = 0,
lossPercent = 0,
}: {
durationMs?: number;
maxDelayMs?: number;
delayPercent?: number;
lossPercent?: number;
} = {}): Promise<void> {
if (durationMs > Math.pow(2, 32) - 1) {
durationMs = Math.pow(2, 32) - 1;
} else if (durationMs < 0) {
durationMs = 0;
}

if (maxDelayMs > Math.pow(2, 16) - 1) {
maxDelayMs = Math.pow(2, 16) - 1;
} else if (maxDelayMs < 0) {
maxDelayMs = 0;
}

if (delayPercent > 100) {
delayPercent = 100;
} else if (delayPercent < 0) {
delayPercent = 0;
}

if (lossPercent > 100) {
lossPercent = 100;
} else if (lossPercent < 0) {
lossPercent = 0;
}

if (durationMs === 0) {
maxDelayMs = 0;
lossPercent = 0;
lossPercent = 0;
}

logger.debug(
`degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]`
);

/* Build Request. */
const requestOffset = new FbsConsumer.DegradeRequestT(
durationMs,
maxDelayMs,
delayPercent,
lossPercent
).pack(this.#channel.bufferBuilder);

await this.#channel.request(
FbsRequest.Method.CONSUMER_DEGRADE,
FbsRequest.Body.Consumer_DegradeRequest,
requestOffset,
this.#internal.consumerId
);
}

private handleWorkerNotifications(): void {
this.#channel.on(
this.#internal.consumerId,
Expand Down
38 changes: 38 additions & 0 deletions node/src/ConsumerTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,4 +389,42 @@ export interface Consumer<ConsumerAppData extends AppData = AppData>
* Enable 'trace' event.
*/
enableTraceEvent(types?: ConsumerTraceEventType[]): Promise<void>;

/**
* Degrade RTP transmission.
* - durationMs: Duration that the degradation will take.
* - maxDelayMs: Max delay (in ms) to be applied to each packet.
* - delayPercent: Only apply delay to this percent of the packets.
* - lossPercent: Generate packet loss by given percent value.
*
* @remarks
* - Only implemented in `SimpleConsumer`.
* - After `durationMs`, or if `consumer.degrade()` is called again with
* `durationMs: 0`, then degradation is immediately stopped and all delayed
* buffered packets are immediately sent (all together).
*
* @throws
* - If called on a non `SimpleConsumer` (due to method not implemented).
*
* @example
* ```ts
* consumer.degrade({
* durationMs: 10000
* maxDelayMs: 3000,
* delayPercent: 20,
* lossPercent: 0,
* });
* ```
*/
degrade({
durationMs,
maxDelayMs,
delayPercent,
lossPercent,
}: {
durationMs?: number;
maxDelayMs?: number;
delayPercent?: number;
lossPercent?: number;
}): Promise<void>;
}
61 changes: 61 additions & 0 deletions node/src/Producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,67 @@ export class ProducerImpl<ProducerAppData extends AppData = AppData>
);
}

async degrade({
durationMs = 0,
maxDelayMs = 0,
delayPercent = 0,
lossPercent = 0,
}: {
durationMs?: number;
maxDelayMs?: number;
delayPercent?: number;
lossPercent?: number;
} = {}): Promise<void> {
if (durationMs > Math.pow(2, 32) - 1) {
durationMs = Math.pow(2, 32) - 1;
} else if (durationMs < 0) {
durationMs = 0;
}

if (maxDelayMs > Math.pow(2, 16) - 1) {
maxDelayMs = Math.pow(2, 16) - 1;
} else if (maxDelayMs < 0) {
maxDelayMs = 0;
}

if (delayPercent > 100) {
delayPercent = 100;
} else if (delayPercent < 0) {
delayPercent = 0;
}

if (lossPercent > 100) {
lossPercent = 100;
} else if (lossPercent < 0) {
lossPercent = 0;
}

if (durationMs === 0) {
maxDelayMs = 0;
lossPercent = 0;
lossPercent = 0;
}

logger.debug(
`degrade() [durationMs:${durationMs}, maxDelayMs:${maxDelayMs}, delayPercent:${delayPercent}, lossPercent:${lossPercent}]`
);

/* Build Request. */
const requestOffset = new FbsProducer.DegradeRequestT(
durationMs,
maxDelayMs,
delayPercent,
lossPercent
).pack(this.#channel.bufferBuilder);

await this.#channel.request(
FbsRequest.Method.CONSUMER_DEGRADE,
FbsRequest.Body.Consumer_DegradeRequest,
requestOffset,
this.#internal.producerId
);
}

send(rtpPacket: Buffer): void {
if (!Buffer.isBuffer(rtpPacket)) {
throw new TypeError('rtpPacket must be a Buffer');
Expand Down
34 changes: 34 additions & 0 deletions node/src/ProducerTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,4 +249,38 @@ export interface Producer<ProducerAppData extends AppData = AppData>
* Send RTP packet (just valid for Producers created on a DirectTransport).
*/
send(rtpPacket: Buffer): void;

/**
* Degrade RTP transmission.
* - durationMs: Duration that the degradation will take.
* - maxDelayMs: Max delay (in ms) to be applied to each packet.
* - delayPercent: Only apply delay to this percent of the packets.
* - lossPercent: Generate packet loss by given percent value.
*
* @remarks
* - After `durationMs`, or if `producer.degrade()` is called again with
* `durationMs: 0`, then degradation is immediately stopped and all delayed
* buffered packets are immediately sent (all together).
*
* @example
* ```ts
* producer.degrade({
* durationMs: 10000
* maxDelayMs: 3000,
* delayPercent: 20,
* lossPercent: 0,
* });
* ```
*/
degrade({
durationMs,
maxDelayMs,
delayPercent,
lossPercent,
}: {
durationMs?: number;
maxDelayMs?: number;
delayPercent?: number;
lossPercent?: number;
}): Promise<void>;
}
7 changes: 7 additions & 0 deletions worker/fbs/consumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ table EnableTraceEventRequest {
events: [TraceEventType] (required);
}

table DegradeRequest {
duration_ms: uint32;
max_delay_ms: uint16;
delay_percent: uint8;
loss_percent: uint8;
}

table DumpResponse {
data: ConsumerDump (required);
}
Expand Down
7 changes: 7 additions & 0 deletions worker/fbs/producer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ table EnableTraceEventRequest {
events: [TraceEventType] (required);
}

table DegradeRequest {
duration_ms: uint32;
max_delay_ms: uint16;
delay_percent: uint8;
loss_percent: uint8;
}

table DumpResponse {
id: string (required);
kind: FBS.RtpParameters.MediaKind;
Expand Down
4 changes: 4 additions & 0 deletions worker/fbs/request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ enum Method: uint8 {
PRODUCER_PAUSE,
PRODUCER_RESUME,
PRODUCER_ENABLE_TRACE_EVENT,
PRODUCER_DEGRADE,
CONSUMER_DUMP,
CONSUMER_GET_STATS,
CONSUMER_PAUSE,
Expand All @@ -60,6 +61,7 @@ enum Method: uint8 {
CONSUMER_SET_PRIORITY,
CONSUMER_REQUEST_KEY_FRAME,
CONSUMER_ENABLE_TRACE_EVENT,
CONSUMER_DEGRADE,
DATAPRODUCER_DUMP,
DATAPRODUCER_GET_STATS,
DATAPRODUCER_PAUSE,
Expand Down Expand Up @@ -110,9 +112,11 @@ union Body {
PipeTransport_ConnectRequest: FBS.PipeTransport.ConnectRequest,
WebRtcTransport_ConnectRequest: FBS.WebRtcTransport.ConnectRequest,
Producer_EnableTraceEventRequest: FBS.Producer.EnableTraceEventRequest,
Producer_DegradeRequest: FBS.Producer.DegradeRequest,
Consumer_SetPreferredLayersRequest: FBS.Consumer.SetPreferredLayersRequest,
Consumer_SetPriorityRequest: FBS.Consumer.SetPriorityRequest,
Consumer_EnableTraceEventRequest: FBS.Consumer.EnableTraceEventRequest,
Consumer_DegradeRequest: FBS.Consumer.DegradeRequest,
DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest,
DataConsumer_SendRequest: FBS.DataConsumer.SendRequest,
DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest,
Expand Down
30 changes: 29 additions & 1 deletion worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@
#include "RTC/Consumer.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include "handles/TimerHandle.hpp"
#include <list>
#include <map>

namespace RTC
{
class SimpleConsumer : public RTC::Consumer, public RTC::RtpStreamSend::Listener
class SimpleConsumer : public RTC::Consumer,
public RTC::RtpStreamSend::Listener,
public TimerHandle::Listener
{
private:
struct DelayedPacketItem
{
// Original packet.
RTC::SharedRtpPacket sharedPacket{ nullptr };
// Arrival time of the original packet.
uint64_t arrivalTimeMs{ 0 };
// Delay applied to the packet.
uint16_t delayMs{ 0 };
};

public:
SimpleConsumer(
RTC::Shared* shared,
Expand Down Expand Up @@ -76,12 +91,19 @@ namespace RTC
void StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket);
void EmitScore() const;
void ClearDegradation(bool sendDelayedPackets);
bool ShouldDelayPacket(const RTC::RtpPacket* packet) const;
bool ShouldDropPacket(const RTC::RtpPacket* packet) const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
public:
void OnRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void OnRtpStreamRetransmitRtpPacket(RTC::RtpStreamSend* rtpStream, RTC::RtpPacket* packet) override;

/* Pure virtual methods inherited from TimerHandle::Listener. */
public:
void OnTimer(TimerHandle* timer) override;

private:
// Allocated by this.
RTC::RtpStreamSend* rtpStream{ nullptr };
Expand All @@ -97,6 +119,12 @@ namespace RTC
// video key frame.
std::map<uint16_t, RTC::SharedRtpPacket, RTC::SeqManager<uint16_t>::SeqLowerThan>
targetLayerRetransmissionBuffer;
uint32_t maxDelayMs{ 0 };
uint8_t delayPercent{ 0 };
uint8_t lossPercent{ 0 };
TimerHandle* degradationTimer{ nullptr };
TimerHandle* delayTimer{ nullptr };
std::list<DelayedPacketItem> delayedPacketItems;
};
} // namespace RTC

Expand Down
1 change: 1 addition & 0 deletions worker/src/Channel/ChannelRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace Channel
{ FBS::Request::Method::CONSUMER_SET_PRIORITY, "consumer.setPriority" },
{ FBS::Request::Method::CONSUMER_REQUEST_KEY_FRAME, "consumer.requestKeyFrame" },
{ FBS::Request::Method::CONSUMER_ENABLE_TRACE_EVENT, "consumer.enableTraceEvent" },
{ FBS::Request::Method::CONSUMER_DEGRADE, "consumer.degrade" },
{ FBS::Request::Method::DATAPRODUCER_DUMP, "dataProducer.dump" },
{ FBS::Request::Method::DATAPRODUCER_GET_STATS, "dataProducer.getStats" },
{ FBS::Request::Method::DATAPRODUCER_PAUSE, "dataProducer.pause" },
Expand Down
Loading