From 45b06b3d5b106fcb20bee5c79746c8dcb78c1bb7 Mon Sep 17 00:00:00 2001 From: wuxinfei Date: Thu, 23 Apr 2026 16:49:10 +0800 Subject: [PATCH 1/4] feat: Support different rtp header extentions and payload for each consumer This commit introduces the ability to specify custom RTP parameters for consumers, allowing for advanced scenarios where the wire-level payload types and header-extension IDs differ from the router's canonical values. The implementation includes validation against the producer's consumable RTP parameters and ensures compatibility. Additionally, tests have been added to verify the correct behavior of this feature, including rejection of overrides in pipe transports and validation of codec mappings. --- node/src/ConsumerTypes.ts | 19 ++ node/src/Transport.ts | 36 ++- node/src/ortc.ts | 374 +++++++++++++++++++----- node/src/test/test-Consumer.ts | 128 +++++++++ node/src/test/test-ortc.ts | 375 +++++++++++++++++++++++++ rust/src/messages.rs | 3 +- rust/src/ortc.rs | 294 ++++++++++++++++--- rust/src/ortc/tests.rs | 2 +- rust/src/router/consumer.rs | 19 ++ rust/src/router/transport.rs | 31 +- worker/fbs/rtpParameters.fbs | 15 + worker/fbs/transport.fbs | 1 + worker/include/RTC/Consumer.hpp | 19 ++ worker/include/RTC/RTP/Packet.hpp | 51 +++- worker/src/RTC/Consumer.cpp | 111 +++++++- worker/src/RTC/RTP/Packet.cpp | 107 +++++++ worker/src/RTC/SimpleConsumer.cpp | 52 +++- worker/src/RTC/SimulcastConsumer.cpp | 54 +++- worker/src/RTC/SvcConsumer.cpp | 52 +++- worker/test/src/RTC/RTP/TestPacket.cpp | 175 ++++++++++++ 20 files changed, 1786 insertions(+), 132 deletions(-) diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 0a7bac65b5..0bd4f5dc6e 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -70,6 +70,25 @@ export type ConsumerOptions = { */ pipe?: boolean; + /** + * When set, bypasses ortc.getConsumerRtpParameters and is used verbatim + * as the Consumer's rtpParameters. The payload types and header-extension + * ids declared here become the on-wire values for this Consumer. + * + * mediasoup validates compatibility against the Producer's + * consumableRtpParameters (1:1 codec mapping, compatible fmtp, matching + * layer/simulcast structure, header-extension URIs being a subset of the + * consumable ones) and asks the worker to rewrite outgoing RTP headers + * accordingly. + * + * Intended for advanced scenarios (e.g. WHEP) where the answer SDP + * dictates wire-level PT / ext-id values that differ from the Router's + * canonical ones. + * + * Not supported together with pipe=true. + */ + rtpParameters?: RtpParameters; + /** * Custom application data. */ diff --git a/node/src/Transport.ts b/node/src/Transport.ts index ae6580c4cb..d83a585d11 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -585,6 +585,7 @@ export abstract class TransportImpl< ignoreDtx = false, enableRtx, pipe = false, + rtpParameters: overrideRtpParameters, appData, }: ConsumerOptions): Promise> { logger.debug('consume()'); @@ -595,6 +596,10 @@ export abstract class TransportImpl< throw new TypeError('if given, appData must be an object'); } else if (mid && (typeof mid !== 'string' || mid.length === 0)) { throw new TypeError('if given, mid must be non empty string'); + } else if (overrideRtpParameters && pipe) { + throw new TypeError( + 'rtpParameters override is not supported together with pipe=true' + ); } // Clone given RTP capabilities to not modify input data. @@ -614,19 +619,28 @@ export abstract class TransportImpl< enableRtx = producer.kind === 'video'; } + let consumerRtpMapping: ortc.ConsumerRtpMapping | undefined; + // This may throw. - const rtpParameters = ortc.getConsumerRtpParameters({ + const rtpParameters: RtpParameters = ortc.getConsumerRtpParameters({ consumableRtpParameters: producer.consumableRtpParameters, - remoteRtpCapabilities: clonedRtpCapabilities, + remoteRtpCapabilities: overrideRtpParameters ?? clonedRtpCapabilities, pipe, enableRtx, }); + if (overrideRtpParameters) { + consumerRtpMapping = ortc.getConsumerRtpMapping( + producer.consumableRtpParameters, + rtpParameters + ); + } + // Set MID. if (!pipe) { if (mid) { rtpParameters.mid = mid; - } else { + } else if (!rtpParameters.mid) { rtpParameters.mid = `${this.#nextMidForConsumers++}`; // We use up to 8 bytes for MID (string). @@ -650,6 +664,7 @@ export abstract class TransportImpl< preferredLayers, ignoreDtx, pipe, + consumerRtpMapping, }); const response = await this.channel.request( @@ -1348,6 +1363,7 @@ function createConsumeRequest({ preferredLayers, ignoreDtx, pipe, + consumerRtpMapping, }: { builder: flatbuffers.Builder; producer: Producer; @@ -1357,6 +1373,7 @@ function createConsumeRequest({ preferredLayers?: ConsumerLayers; ignoreDtx?: boolean; pipe: boolean; + consumerRtpMapping?: ortc.ConsumerRtpMapping; }): number { const rtpParametersOffset = serializeRtpParameters(builder, rtpParameters); const consumerIdOffset = builder.createString(consumerId); @@ -1389,6 +1406,15 @@ function createConsumeRequest({ FbsConsumer.ConsumerLayers.endConsumerLayers(builder); } + let consumerRtpMappingOffset: number | undefined; + + if (consumerRtpMapping) { + consumerRtpMappingOffset = ortc.serializeConsumerRtpMapping( + builder, + consumerRtpMapping + ); + } + const ConsumeRequest = FbsTransport.ConsumeRequest; // Create Consume Request. @@ -1420,6 +1446,10 @@ function createConsumeRequest({ ConsumeRequest.addIgnoreDtx(builder, Boolean(ignoreDtx)); + if (consumerRtpMappingOffset !== undefined) { + ConsumeRequest.addConsumerRtpMapping(builder, consumerRtpMappingOffset); + } + return ConsumeRequest.endConsumeRequest(builder); } diff --git a/node/src/ortc.ts b/node/src/ortc.ts index a6db4675c6..f8a5fb296a 100644 --- a/node/src/ortc.ts +++ b/node/src/ortc.ts @@ -34,6 +34,25 @@ export type RtpCodecsEncodingsMapping = { }[]; }; +/** + * Per-Consumer egress mapping between the Router's canonical (consumable) RTP + * space and the wire-level RTP space declared by the caller via + * `ConsumerOptions.rtpParameters`. + * + * The worker uses this mapping to rewrite outgoing RTP packet payload types + * and header-extension ids in place. + */ +export type ConsumerRtpMapping = { + codecs: { + producerPayloadType: number; + consumerPayloadType: number; + }[]; + headerExtensions: { + producerExtId: number; + consumerExtId: number; + }[]; +}; + const DynamicPayloadTypes = [ 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 96, 97, 98, @@ -599,55 +618,135 @@ export function getConsumerRtpParameters({ enableRtx, }: { consumableRtpParameters: RtpParameters; - remoteRtpCapabilities: RtpCapabilities; + // Can be either RtpCapabilities (default path, derived from the remote + // endpoint's capabilities) or RtpParameters (override path, where the + // caller explicitly dictates the on-wire PT / header-extension ids). + remoteRtpCapabilities: RtpCapabilities | RtpParameters; pipe: boolean; enableRtx: boolean; }): RtpParameters { - const consumerParams: RtpParameters = { - codecs: [], - headerExtensions: [], - encodings: [], - rtcp: consumableRtpParameters.rtcp, - msid: consumableRtpParameters.msid, - }; - - for (const capCodec of remoteRtpCapabilities.codecs!) { - validateAndNormalizeRtpCodecCapability(capCodec); - } + const isOverride = isRtpParameters(remoteRtpCapabilities); + + let consumerParams: RtpParameters; + let consumableCodecs: RtpCodecParameters[]; + let remoteCodecs: RtpCodecParameters[]; + let remoteHeaderExtensions: RtpHeaderExtensionParameters[]; + // Decides whether a given header extension from `remoteHeaderExtensions` + // should make it into the final consumerParams.headerExtensions. Differs + // between the RtpCapabilities and RtpParameters branches. + let matchConsumerExt: (ext: RtpHeaderExtensionParameters) => boolean; + + if (!isOverride) { + const caps = remoteRtpCapabilities as RtpCapabilities; + + for (const capCodec of caps.codecs ?? []) { + validateAndNormalizeRtpCodecCapability(capCodec); + } - const consumableCodecs = - utils.clone( - consumableRtpParameters.codecs - ) ?? []; + consumerParams = { + codecs: [], + headerExtensions: [], + encodings: [], + rtcp: consumableRtpParameters.rtcp, + msid: consumableRtpParameters.msid, + }; + // Iterate producer-side consumable codecs; the caller's capabilities + // are used as the codec-match table so that its rtcpFeedback makes it + // into the final consumerParams. + remoteCodecs = + utils.clone( + consumableRtpParameters.codecs + ) ?? []; + consumableCodecs = (caps.codecs ?? []).map(c => ({ + mimeType: c.mimeType, + payloadType: c.preferredPayloadType!, + clockRate: c.clockRate, + channels: c.channels, + parameters: c.parameters ?? {}, + rtcpFeedback: c.rtcpFeedback ?? [], + })); + remoteHeaderExtensions = consumableRtpParameters.headerExtensions ?? []; + // Keep the producer-side extension only when the remote capability + // advertises the same URI AND `preferredId`. + matchConsumerExt = ext => + (caps.headerExtensions ?? []).some( + capExt => capExt.preferredId === ext.id && capExt.uri === ext.uri + ); + } else { + const override = remoteRtpCapabilities as RtpParameters; + + // validateAndNormalizeRtpParameters dereferences params.rtcp. Fall back + // to the producer-side consumable values for rtcp / msid the caller + // did not provide so that validation has a valid view. + const rtcp = override.rtcp ?? consumableRtpParameters.rtcp; + const msid = override.msid ?? consumableRtpParameters.msid; + const toValidate: RtpParameters = { ...override, rtcp }; + + try { + validateAndNormalizeRtpParameters(toValidate); + } catch (err) { + throw new TypeError( + `invalid consumer.rtpParameters: ${ + err instanceof Error ? err.message : String(err) + }` + ); + } - let rtxSupported = false; + consumerParams = { + codecs: [], + headerExtensions: [], + encodings: [], + rtcp, + msid, + }; + // Consumable is still the Router-canonical view used for structural + // matching; we iterate the caller's override (which dictates wire PTs + // and extension ids). + consumableCodecs = + utils.clone( + consumableRtpParameters.codecs + ) ?? []; + remoteCodecs = + utils.clone(override.codecs) ?? []; + remoteHeaderExtensions = override.headerExtensions ?? []; + // The caller explicitly declares wire-level ext ids that may differ + // from the Router's canonical ones, so we only check URI presence in + // the producer-side consumable set. The worker rewrites the producer + // ext-id to the caller-declared one using ConsumerRtpMapping. + matchConsumerExt = ext => + (consumableRtpParameters.headerExtensions ?? []).some( + cExt => cExt.uri === ext.uri + ); + } - for (const codec of consumableCodecs) { + for (const codec of remoteCodecs) { if (!enableRtx && isRtxCodec(codec)) { continue; } - const matchedCapCodec = remoteRtpCapabilities.codecs!.find(capCodec => - matchCodecs(capCodec, codec, { strict: true }) + const matchedCodec = consumableCodecs.find(cc => + matchCodecs(cc, codec, { strict: true }) ); - if (!matchedCapCodec) { + if (!matchedCodec) { continue; } - codec.rtcpFeedback = matchedCapCodec.rtcpFeedback!.filter( + codec.rtcpFeedback = (matchedCodec.rtcpFeedback ?? []).filter( fb => enableRtx || fb.type !== 'nack' || fb.parameter ); consumerParams.codecs.push(codec); } - // Must sanitize the list of matched codecs by removing useless RTX codecs. + let rtxSupported = false; + + // Sanitize the matched codec list by removing useless RTX codecs (whose + // `apt` does not point to any remaining media codec). for (let idx = consumerParams.codecs.length - 1; idx >= 0; --idx) { const codec = consumerParams.codecs[idx]!; if (isRtxCodec(codec)) { - // Search for the associated media codec. const associatedMediaCodec = consumerParams.codecs.find( mediaCodec => mediaCodec.payloadType === codec.parameters!['apt'] ); @@ -660,7 +759,6 @@ export function getConsumerRtpParameters({ } } - // Ensure there is at least one media codec. if ( consumerParams.codecs.length === 0 || isRtxCodec(consumerParams.codecs[0]!) @@ -668,16 +766,15 @@ export function getConsumerRtpParameters({ throw new UnsupportedError('no compatible media codecs'); } - consumerParams.headerExtensions = - consumableRtpParameters.headerExtensions!.filter(ext => - remoteRtpCapabilities.headerExtensions!.some( - capExt => capExt.preferredId === ext.id && capExt.uri === ext.uri - ) - ); + for (const ext of remoteHeaderExtensions) { + if (matchConsumerExt(ext)) { + consumerParams.headerExtensions!.push(ext); + } + } // Reduce codecs' RTCP feedback. Use Transport-CC if available, REMB otherwise. if ( - consumerParams.headerExtensions.some( + consumerParams.headerExtensions!.some( ext => ext.uri === 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01' @@ -689,7 +786,7 @@ export function getConsumerRtpParameters({ ); } } else if ( - consumerParams.headerExtensions.some( + consumerParams.headerExtensions!.some( ext => ext.uri === 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time' ) @@ -707,7 +804,28 @@ export function getConsumerRtpParameters({ } } - if (!pipe) { + if (pipe) { + const consumableEncodings = + utils.clone( + consumableRtpParameters.encodings + ) ?? []; + const baseSsrc = utils.generateRandomNumber(); + const baseRtxSsrc = utils.generateRandomNumber(); + + for (let i = 0; i < consumableEncodings.length; ++i) { + const encoding = consumableEncodings[i]!; + + encoding.ssrc = baseSsrc + i; + + if (rtxSupported) { + encoding.rtx = { ssrc: baseRtxSsrc + i }; + } else { + delete encoding.rtx; + } + + consumerParams.encodings!.push(encoding); + } + } else { const consumerEncoding: RtpEncodingParameters = { ssrc: utils.generateRandomNumber(), }; @@ -716,18 +834,15 @@ export function getConsumerRtpParameters({ consumerEncoding.rtx = { ssrc: consumerEncoding.ssrc! + 1 }; } - // If any of the consumableRtpParameters.encodings has scalabilityMode, - // process it (assume all encodings have the same value). - const encodingWithScalabilityMode = consumableRtpParameters.encodings!.find( - encoding => encoding.scalabilityMode - ); + const encodingWithScalabilityMode = ( + consumableRtpParameters.encodings ?? [] + ).find(encoding => encoding.scalabilityMode); let scalabilityMode = encodingWithScalabilityMode ? encodingWithScalabilityMode.scalabilityMode : undefined; - // If there is simulast, mangle spatial layers in scalabilityMode. - if (consumableRtpParameters.encodings!.length > 1) { + if ((consumableRtpParameters.encodings ?? []).length > 1) { const { temporalLayers } = parseScalabilityMode(scalabilityMode); scalabilityMode = `L${ @@ -739,9 +854,9 @@ export function getConsumerRtpParameters({ consumerEncoding.scalabilityMode = scalabilityMode; } - // Use the maximum maxBitrate in any encoding and honor it in the Consumer's - // encoding. - const maxEncodingMaxBitrate = consumableRtpParameters.encodings!.reduce( + const maxEncodingMaxBitrate = ( + consumableRtpParameters.encodings ?? [] + ).reduce( (maxBitrate, encoding) => encoding.maxBitrate && encoding.maxBitrate > maxBitrate ? encoding.maxBitrate @@ -753,34 +868,40 @@ export function getConsumerRtpParameters({ consumerEncoding.maxBitrate = maxEncodingMaxBitrate; } - // Set a single encoding for the Consumer. consumerParams.encodings!.push(consumerEncoding); - } else { - const consumableEncodings = - utils.clone( - consumableRtpParameters.encodings - ) ?? []; - const baseSsrc = utils.generateRandomNumber(); - const baseRtxSsrc = utils.generateRandomNumber(); - - for (let i = 0; i < consumableEncodings.length; ++i) { - const encoding = consumableEncodings[i]!; - - encoding.ssrc = baseSsrc + i; - - if (rtxSupported) { - encoding.rtx = { ssrc: baseRtxSsrc + i }; - } else { - delete encoding.rtx; - } - - consumerParams.encodings!.push(encoding); - } } return consumerParams; } +// Heuristic to discriminate the two input shapes of +// `getConsumerRtpParameters.remoteRtpCapabilities`: RtpCodecParameters has a +// required `payloadType`, RtpCodecCapability has `preferredPayloadType` +// instead. We fall back to checking for `encodings` / `rtcp` / `msid` / `mid` +// if the codec list is empty so the empty-override edge case still lands on +// the `RtpParameters` branch. +function isRtpParameters( + value: RtpCapabilities | RtpParameters +): value is RtpParameters { + const firstCodec = value.codecs?.[0] as + | RtpCodecCapability + | RtpCodecParameters + | undefined; + + if (firstCodec) { + return (firstCodec as RtpCodecParameters).payloadType !== undefined; + } + + const asParams = value as RtpParameters; + + return ( + Array.isArray(asParams.encodings) || + asParams.rtcp !== undefined || + asParams.msid !== undefined || + asParams.mid !== undefined + ); +} + /** * Generate RTP parameters for a pipe Consumer. * @@ -871,6 +992,127 @@ export function getPipeConsumerRtpParameters({ return consumerParams; } +/** + * Build a per-Consumer egress mapping between the Router's canonical + * (consumable) RTP space and the wire-level RTP space dictated by the final + * Consumer rtpParameters. The worker uses this mapping to rewrite outgoing + * RTP packet payload types and header-extension ids in place. + */ +export function getConsumerRtpMapping( + producerRtpParameters: RtpParameters, + consumerRtpParameters: RtpParameters +): ConsumerRtpMapping { + const mapping: ConsumerRtpMapping = { + codecs: [], + headerExtensions: [], + }; + + const consumerCodecPts = new Set(); + + for (const codec of consumerRtpParameters.codecs) { + consumerCodecPts.add(codec.payloadType); + } + + const usedProducerCodecPts = new Set(); + + for (const consumerCodec of consumerRtpParameters.codecs) { + for (const producerCodec of producerRtpParameters.codecs) { + if (usedProducerCodecPts.has(producerCodec.payloadType)) { + continue; + } + + if (isRtxCodec(producerCodec) !== isRtxCodec(consumerCodec)) { + continue; + } + + if (!matchCodecs(producerCodec, consumerCodec, { strict: true })) { + continue; + } + + if (isRtxCodec(producerCodec)) { + const apt = consumerCodec.parameters?.['apt']; + + if ( + typeof apt !== 'number' || + !consumerCodecPts.has(apt as number) + ) { + continue; + } + } + + usedProducerCodecPts.add(producerCodec.payloadType); + mapping.codecs.push({ + producerPayloadType: producerCodec.payloadType, + consumerPayloadType: consumerCodec.payloadType, + }); + break; + } + } + + const producerExtByUri = new Map(); + + for (const ext of producerRtpParameters.headerExtensions ?? []) { + producerExtByUri.set(ext.uri, ext.id); + } + + for (const consumerExt of consumerRtpParameters.headerExtensions ?? []) { + const producerExtId = producerExtByUri.get(consumerExt.uri); + + if (producerExtId !== undefined) { + mapping.headerExtensions.push({ + producerExtId, + consumerExtId: consumerExt.id, + }); + } + } + + return mapping; +} + +export function serializeConsumerRtpMapping( + builder: flatbuffers.Builder, + consumerRtpMapping: ConsumerRtpMapping +): number { + const codecs: number[] = []; + + for (const m of consumerRtpMapping.codecs) { + codecs.push( + FbsRtpParameters.ConsumerCodecMapping.createConsumerCodecMapping( + builder, + m.producerPayloadType, + m.consumerPayloadType + ) + ); + } + + const codecsOffset = + FbsRtpParameters.ConsumerRtpMapping.createCodecsVector(builder, codecs); + + const headerExtensions: number[] = []; + + for (const m of consumerRtpMapping.headerExtensions) { + headerExtensions.push( + FbsRtpParameters.ConsumerHeaderExtensionMapping.createConsumerHeaderExtensionMapping( + builder, + m.producerExtId, + m.consumerExtId + ) + ); + } + + const headerExtensionsOffset = + FbsRtpParameters.ConsumerRtpMapping.createHeaderExtensionsVector( + builder, + headerExtensions + ); + + return FbsRtpParameters.ConsumerRtpMapping.createConsumerRtpMapping( + builder, + codecsOffset, + headerExtensionsOffset + ); +} + export function serializeRtpMapping( builder: flatbuffers.Builder, rtpMapping: RtpCodecsEncodingsMapping diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index b486dc6309..c6407c9026 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -555,6 +555,134 @@ test('transport.consume() can be created with user provided mid', async () => { ); }, 2000); +test('transport.consume() with rtpParameters override rewrites egress PT / ext-id', async () => { + // Learn the producer's consumable PT / ext-id mappings. + const consumable = ctx.videoProducer!.consumableRtpParameters; + const consumableH264 = consumable.codecs.find(c => c.mimeType === 'video/H264')!; + const consumableRtx = consumable.codecs.find(c => c.mimeType === 'video/rtx')!; + const consumableMid = consumable.headerExtensions!.find( + e => e.uri === 'urn:ietf:params:rtp-hdrext:sdes:mid' + )!; + + // Pick non-clashing wire PTs / ids different from the consumable ones. + const wireH264Pt = 96; + const wireRtxPt = 97; + const wireMidId = (consumableMid.id % 14) + 1; + + const override: mediasoup.types.RtpParameters = { + mid: 'VIDEO-CONSUMER', + codecs: [ + { + mimeType: 'video/H264', + payloadType: wireH264Pt, + clockRate: 90000, + parameters: consumableH264.parameters, + rtcpFeedback: [{ type: 'nack', parameter: 'pli' }], + }, + { + mimeType: 'video/rtx', + payloadType: wireRtxPt, + clockRate: 90000, + parameters: { apt: wireH264Pt }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: wireMidId, + encrypt: false, + parameters: {}, + }, + ], + }; + + const videoConsumer = await ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + rtpParameters: override, + }); + + // The caller-visible rtpParameters must match the override. + expect(videoConsumer.rtpParameters.mid).toBe('VIDEO-CONSUMER'); + expect(videoConsumer.rtpParameters.codecs.length).toBe(2); + expect(videoConsumer.rtpParameters.codecs[0]!.mimeType).toBe('video/H264'); + expect(videoConsumer.rtpParameters.codecs[0]!.payloadType).toBe(wireH264Pt); + expect(videoConsumer.rtpParameters.codecs[1]!.mimeType).toBe('video/rtx'); + expect(videoConsumer.rtpParameters.codecs[1]!.payloadType).toBe(wireRtxPt); + expect(videoConsumer.rtpParameters.headerExtensions).toEqual([ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: wireMidId, + encrypt: false, + parameters: {}, + }, + ]); + + // Dump round-trips the wire PTs. + const dump = await videoConsumer.dump(); + + expect(dump.rtpParameters.codecs[0]!.payloadType).toBe(wireH264Pt); + expect(dump.rtpParameters.codecs[1]!.payloadType).toBe(wireRtxPt); + expect(dump.rtpParameters.headerExtensions?.[0]?.id).toBe(wireMidId); + + // supportedCodecPayloadTypes must be keyed by the producer-consumable PT + // (not the wire PT), otherwise incoming packets would all be dropped. + expect(dump.supportedCodecPayloadTypes).toEqual( + expect.arrayContaining([ + consumableH264.payloadType, + consumableRtx.payloadType, + ]) + ); + expect(dump.supportedCodecPayloadTypes).not.toContain(wireH264Pt); +}, 2000); + +test('transport.consume() with rtpParameters override rejects on pipe transport', async () => { + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 96, + clockRate: 90000, + parameters: { 'packetization-mode': 1, 'profile-level-id': '4d0032' }, + rtcpFeedback: [], + }, + ], + }; + + await expect( + ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + pipe: true, + rtpParameters: override, + }) + ).rejects.toThrow(TypeError); +}, 2000); + +test('transport.consume() with invalid rtpParameters override rejects', async () => { + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + // No H264 match (wrong profile-level-id). + mimeType: 'video/H264', + payloadType: 96, + clockRate: 90000, + parameters: { 'packetization-mode': 1, 'profile-level-id': '640032' }, + rtcpFeedback: [], + }, + ], + }; + + await expect( + ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + rtpParameters: override, + }) + ).rejects.toThrow(TypeError); +}, 2000); + test('transport.consume() with incompatible rtpCapabilities rejects with UnsupportedError', async () => { let invalidDeviceCapabilities: mediasoup.types.RtpCapabilities; diff --git a/node/src/test/test-ortc.ts b/node/src/test/test-ortc.ts index 9a7db532e8..59ddb873d4 100644 --- a/node/src/test/test-ortc.ts +++ b/node/src/test/test-ortc.ts @@ -552,3 +552,378 @@ test('getProducerRtpParametersMapping() with incompatible params throws Unsuppor ortc.getProducerRtpParametersMapping(rtpParameters, routerRtpCapabilities) ).toThrow(UnsupportedError); }); + +describe('getConsumerRtpParameters() with RtpParameters override', () => { + const makeConsumable = (): mediasoup.types.RtpParameters => ({ + codecs: [ + { + mimeType: 'video/H264', + payloadType: 101, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [ + { type: 'nack', parameter: '' }, + { type: 'nack', parameter: 'pli' }, + ], + }, + { + mimeType: 'video/rtx', + payloadType: 102, + clockRate: 90000, + parameters: { apt: 101 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 1, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 5, + encrypt: false, + parameters: {}, + }, + ], + encodings: [ + { + ssrc: 10000001, + maxBitrate: 500000, + scalabilityMode: 'L1T3', + }, + ], + rtcp: { cname: 'cname1234', reducedSize: true }, + }); + + test('succeeds with happy path and produces a mapping', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 7, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.codecs.length).toBe(2); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + expect(rtpParameters.codecs[1]!.payloadType).toBe(98); + + expect(rtpParameters.rtcp).toEqual({ + cname: 'cname1234', + reducedSize: true, + }); + + const mapping = ortc.getConsumerRtpMapping(consumable, rtpParameters); + + expect(mapping.codecs).toEqual( + expect.arrayContaining([ + { producerPayloadType: 101, consumerPayloadType: 97 }, + { producerPayloadType: 102, consumerPayloadType: 98 }, + ]) + ); + + expect(mapping.headerExtensions).toEqual([ + { producerExtId: 1, consumerExtId: 3 }, + { producerExtId: 5, consumerExtId: 7 }, + ]); + }); + + test('auto-generates SSRCs regardless of caller-provided encodings', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.encodings).toBeDefined(); + expect(rtpParameters.encodings!.length).toBe(1); + expect(typeof rtpParameters.encodings![0]!.ssrc).toBe('number'); + expect(typeof rtpParameters.encodings![0]!.rtx?.ssrc).toBe('number'); + }); + + test('rtcp.cname from caller is preserved when provided', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + rtcp: { cname: 'custom-cname' }, + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.rtcp?.cname).toBe('custom-cname'); + }); + + test('throws when no codec has a consumable counterpart', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/VP8', + payloadType: 97, + clockRate: 90000, + parameters: {}, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + expect(() => + ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }) + ).toThrow(UnsupportedError); + }); + + test('drops RTX codec when its apt points to no consumer-side codec', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 123 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.codecs.length).toBe(1); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + }); + + test('drops unknown header extension URIs from the final rtpParameters', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'urn:3gpp:video-orientation', + id: 2, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.headerExtensions!.length).toBe(1); + expect(rtpParameters.headerExtensions![0]!.uri).toBe( + 'urn:ietf:params:rtp-hdrext:sdes:mid' + ); + }); + + test('keeps all matching header extensions (no early break)', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 7, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.headerExtensions!.length).toBe(2); + }); + + test('enableRtx=false strips RTX from the caller-provided codec list', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.codecs.length).toBe(1); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + expect(rtpParameters.encodings?.[0]?.rtx).toBeUndefined(); + }); +}); diff --git a/rust/src/messages.rs b/rust/src/messages.rs index dbbf67131f..73f1e89b15 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -1775,6 +1775,7 @@ pub(crate) struct TransportConsumeRequest { pub(crate) paused: bool, pub(crate) preferred_layers: Option, pub(crate) ignore_dtx: bool, + pub(crate) consumer_rtp_mapping: Option, } #[derive(Debug)] @@ -1802,8 +1803,8 @@ impl Request for TransportConsumeRequest { ToFbs::to_fbs(&self.consumable_rtp_encodings), self.paused, ToFbs::to_fbs(&self.preferred_layers), - // self.preferred_layers.map(ConsumerLayers::to_fbs), self.ignore_dtx, + self.consumer_rtp_mapping.as_ref().map(ToFbs::to_fbs), ); let request_body = request::Body::create_transport_consume_request(&mut builder, data); let request = request::Request::create( diff --git a/rust/src/ortc.rs b/rust/src/ortc.rs index 397380718d..d31c4c83ed 100644 --- a/rust/src/ortc.rs +++ b/rust/src/ortc.rs @@ -135,6 +135,66 @@ impl ToFbs for RtpMapping { } } +/// Single producer-side -> consumer-side (wire-level) payload-type pair used +/// by the worker to rewrite outgoing RTP packet headers for a per-Consumer +/// egress remap. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerCodecMapping { + pub producer_payload_type: u8, + pub consumer_payload_type: u8, +} + +/// Single producer-side -> consumer-side (wire-level) RTP header-extension id +/// pair. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerHeaderExtensionMapping { + pub producer_ext_id: u8, + pub consumer_ext_id: u8, +} + +/// Per-Consumer egress mapping between the Router's canonical (consumable) +/// RTP space and the wire-level RTP space declared by the caller via +/// [`ConsumerOptions::rtp_parameters`](crate::consumer::ConsumerOptions::rtp_parameters). +/// +/// The worker uses this mapping to rewrite outgoing RTP packet payload types +/// and header-extension ids in place for this Consumer only. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerRtpMapping { + pub codecs: Vec, + pub header_extensions: Vec, +} + +impl ToFbs for ConsumerRtpMapping { + type FbsType = rtp_parameters::ConsumerRtpMapping; + + fn to_fbs(&self) -> Self::FbsType { + rtp_parameters::ConsumerRtpMapping { + codecs: self + .codecs + .iter() + .map(|m| rtp_parameters::ConsumerCodecMapping { + producer_payload_type: m.producer_payload_type, + consumer_payload_type: m.consumer_payload_type, + }) + .collect(), + header_extensions: self + .header_extensions + .iter() + .map(|m| rtp_parameters::ConsumerHeaderExtensionMapping { + producer_ext_id: m.producer_ext_id, + consumer_ext_id: m.consumer_ext_id, + }) + .collect(), + } + } +} + /// Error caused by invalid RTP parameters. #[derive(Debug, Error, Eq, PartialEq)] pub enum RtpParametersError { @@ -742,6 +802,20 @@ pub(crate) fn can_consume( .unwrap_or_default()) } +/// Input source for [`get_consumer_rtp_parameters`]. +/// +/// - [`RemoteRtpSource::Capabilities`] runs the default ORTC path: iterate the +/// producer-side consumable codecs and keep the ones matching any of the +/// caller's capabilities, inheriting rtcpFeedback from the matched +/// capability. +/// - [`RemoteRtpSource::Parameters`] runs the override path: iterate the +/// caller-provided rtpParameters codecs (which dictate wire PTs / ext ids) +/// and keep the ones that have a compatible producer-side counterpart. +pub(crate) enum RemoteRtpSource<'a> { + Capabilities(&'a RtpCapabilities), + Parameters(&'a RtpParameters), +} + /// Generate RTP parameters for a specific Consumer. /// /// It reduces encodings to just one and takes into account given RTP capabilities to reduce codecs, @@ -749,43 +823,113 @@ pub(crate) fn can_consume( #[allow(clippy::suspicious_operation_groupings)] pub(crate) fn get_consumer_rtp_parameters( consumable_rtp_parameters: &RtpParameters, - remote_rtp_capabilities: &RtpCapabilities, + remote: RemoteRtpSource<'_>, pipe: bool, enable_rtx: bool, ) -> Result { - let mut consumer_params = RtpParameters { - rtcp: consumable_rtp_parameters.rtcp.clone(), - msid: consumable_rtp_parameters.msid.clone(), - ..RtpParameters::default() - }; + let mut consumer_params: RtpParameters; - for cap_codec in &remote_rtp_capabilities.codecs { - validate_rtp_codec_capability(cap_codec) - .map_err(ConsumerRtpParametersError::InvalidCapabilities)?; - } + match remote { + RemoteRtpSource::Capabilities(caps) => { + for cap_codec in &caps.codecs { + validate_rtp_codec_capability(cap_codec) + .map_err(ConsumerRtpParametersError::InvalidCapabilities)?; + } - let mut rtx_supported = false; + consumer_params = RtpParameters { + rtcp: consumable_rtp_parameters.rtcp.clone(), + msid: consumable_rtp_parameters.msid.clone(), + ..RtpParameters::default() + }; + // Iterate producer-side consumable codecs; the caller's + // capabilities are used as the codec-match table so that its + // rtcpFeedback makes it into the final consumer_params. + for mut codec in consumable_rtp_parameters.codecs.clone() { + if !enable_rtx && codec.is_rtx() { + continue; + } - for mut codec in consumable_rtp_parameters.codecs.clone() { - if !enable_rtx && codec.is_rtx() { - continue; - } + if let Some(matched_cap_codec) = caps + .codecs + .iter() + .find(|c| match_codecs((*c).into(), (&codec).into(), true).is_ok()) + { + *codec.rtcp_feedback_mut() = matched_cap_codec + .rtcp_feedback() + .iter() + .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) + .copied() + .collect(); + consumer_params.codecs.push(codec); + } + } - if let Some(matched_cap_codec) = remote_rtp_capabilities - .codecs - .iter() - .find(|cap_codec| match_codecs((*cap_codec).into(), (&codec).into(), true).is_ok()) - { - *codec.rtcp_feedback_mut() = matched_cap_codec - .rtcp_feedback() + // Keep the producer-side extension only when the remote capability + // advertises the same URI AND `preferred_id`. + consumer_params.header_extensions = consumable_rtp_parameters + .header_extensions .iter() - .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) - .copied() + .filter(|ext| { + caps.header_extensions + .iter() + .any(|cap_ext| cap_ext.preferred_id == ext.id && cap_ext.uri == ext.uri) + }) + .cloned() .collect(); + } + RemoteRtpSource::Parameters(override_params) => { + consumer_params = RtpParameters { + rtcp: override_params.rtcp.clone(), + msid: override_params + .msid + .clone() + .or_else(|| consumable_rtp_parameters.msid.clone()), + ..RtpParameters::default() + }; + // Consumable is still the Router-canonical view used for + // structural matching; we iterate the caller's override (which + // dictates wire PTs and extension ids). + for mut codec in override_params.codecs.clone() { + if !enable_rtx && codec.is_rtx() { + continue; + } - consumer_params.codecs.push(codec); + if let Some(matched_codec) = consumable_rtp_parameters + .codecs + .iter() + .find(|cc| match_codecs((&codec).into(), (*cc).into(), true).is_ok()) + { + *codec.rtcp_feedback_mut() = matched_codec + .rtcp_feedback() + .iter() + .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) + .copied() + .collect(); + consumer_params.codecs.push(codec); + } + } + + // The caller explicitly declares wire-level ext ids that may + // differ from the Router's canonical ones, so we only check URI + // presence in the producer-side consumable set. The worker rewrites + // the producer ext-id to the caller-declared one using + // `ConsumerRtpMapping`. + consumer_params.header_extensions = override_params + .header_extensions + .iter() + .filter(|ext| { + consumable_rtp_parameters + .header_extensions + .iter() + .any(|c_ext| c_ext.uri == ext.uri) + }) + .cloned() + .collect(); } } + + let mut rtx_supported = false; + // Must sanitize the list of matched codecs by removing useless RTX codecs. let mut remove_codecs = Vec::new(); for (idx, codec) in consumer_params.codecs.iter().enumerate() { @@ -816,18 +960,6 @@ pub(crate) fn get_consumer_rtp_parameters( return Err(ConsumerRtpParametersError::NoCompatibleMediaCodecs); } - consumer_params.header_extensions = consumable_rtp_parameters - .header_extensions - .iter() - .filter(|ext| { - remote_rtp_capabilities - .header_extensions - .iter() - .any(|cap_ext| cap_ext.preferred_id == ext.id && cap_ext.uri == ext.uri) - }) - .cloned() - .collect(); - // Reduce codecs' RTCP feedback. Use Transport-CC if available, REMB otherwise. if consumer_params .header_extensions @@ -1006,6 +1138,94 @@ pub(crate) fn get_pipe_consumer_rtp_parameters( consumer_params } +/// Build a per-Consumer egress mapping between the Router's canonical +/// (consumable) RTP space and the wire-level RTP space dictated by the final +/// Consumer rtpParameters. The worker uses this mapping to rewrite outgoing +/// RTP packet payload types and header-extension ids in place for this +/// Consumer only. +pub(crate) fn get_consumer_rtp_mapping( + producer_rtp_parameters: &RtpParameters, + consumer_rtp_parameters: &RtpParameters, +) -> ConsumerRtpMapping { + let mut mapping = ConsumerRtpMapping::default(); + + let consumer_codec_pts: std::collections::HashSet = consumer_rtp_parameters + .codecs + .iter() + .map(|c| c.payload_type()) + .collect(); + + let mut used_producer_codec_pts: std::collections::HashSet = + std::collections::HashSet::new(); + + for consumer_codec in &consumer_rtp_parameters.codecs { + for producer_codec in &producer_rtp_parameters.codecs { + if used_producer_codec_pts.contains(&producer_codec.payload_type()) { + continue; + } + + if producer_codec.is_rtx() != consumer_codec.is_rtx() { + continue; + } + + if match_codecs(producer_codec.into(), consumer_codec.into(), true).is_err() { + continue; + } + + if producer_codec.is_rtx() { + let apt = match consumer_codec.parameters().get("apt") { + Some(RtpCodecParametersParametersValue::Number(apt)) => { + match u8::try_from(*apt) { + Ok(v) => v, + Err(_) => continue, + } + } + _ => continue, + }; + + if !consumer_codec_pts.contains(&apt) { + continue; + } + } + + used_producer_codec_pts.insert(producer_codec.payload_type()); + mapping.codecs.push(ConsumerCodecMapping { + producer_payload_type: producer_codec.payload_type(), + consumer_payload_type: consumer_codec.payload_type(), + }); + break; + } + } + + let mut producer_ext_by_uri = + std::collections::HashMap::::with_capacity( + producer_rtp_parameters.header_extensions.len(), + ); + + for ext in &producer_rtp_parameters.header_extensions { + // NOTE: Header-extension ids are validated upstream to be in 1..=14, + // so narrowing u16 -> u8 is safe here. + if let Ok(id) = u8::try_from(ext.id) { + producer_ext_by_uri.insert(ext.uri, id); + } + } + + for consumer_ext in &consumer_rtp_parameters.header_extensions { + if let Some(&producer_ext_id) = producer_ext_by_uri.get(&consumer_ext.uri) { + if let Ok(consumer_ext_id) = u8::try_from(consumer_ext.id) { + mapping + .header_extensions + .push(ConsumerHeaderExtensionMapping { + producer_ext_id, + consumer_ext_id, + }); + } + } + } + + mapping +} + struct CodecToMatch<'a> { channels: Option, clock_rate: NonZeroU32, diff --git a/rust/src/ortc/tests.rs b/rust/src/ortc/tests.rs index 589e8ddbd4..20a6428db6 100644 --- a/rust/src/ortc/tests.rs +++ b/rust/src/ortc/tests.rs @@ -455,7 +455,7 @@ fn get_producer_rtp_parameters_mapping_get_consumable_rtp_parameters_get_consume let consumer_rtp_parameters = get_consumer_rtp_parameters( &consumable_rtp_parameters, - &remote_rtp_capabilities, + RemoteRtpSource::Capabilities(&remote_rtp_capabilities), false, true, ) diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index 014ba50d18..4108f77c19 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -130,6 +130,24 @@ pub struct ConsumerOptions { pub ignore_dtx: bool, /// Whether this Consumer should consume all RTP streams generated by the Producer. pub pipe: bool, + /// When set, bypasses the default ORTC-driven `rtpParameters` derivation for + /// this Consumer and is used verbatim as the wire-level `rtpParameters`. + /// The payload types and header-extension ids declared here become the + /// on-wire values for this Consumer. + /// + /// mediasoup validates compatibility against the Producer's + /// `consumableRtpParameters` (1:1 codec mapping, compatible fmtp, matching + /// layer/simulcast structure, header-extension URIs being a subset of the + /// consumable ones) and asks the worker to rewrite outgoing RTP headers + /// accordingly. + /// + /// Intended for advanced scenarios (e.g. WHEP) where the answer SDP + /// dictates wire-level PT / ext-id values that differ from the Router's + /// canonical ones. + /// + /// Not supported together with [`ConsumerOptions::pipe`]=`true` or on a + /// [`PipeTransport`](crate::pipe_transport::PipeTransport). + pub rtp_parameters: Option, /// Custom application data. pub app_data: AppData, } @@ -147,6 +165,7 @@ impl ConsumerOptions { enable_rtx: None, pipe: false, mid: None, + rtp_parameters: None, app_data: AppData::default(), } } diff --git a/rust/src/router/transport.rs b/rust/src/router/transport.rs index 27898e9f19..fe1d4dbc7e 100644 --- a/rust/src/router/transport.rs +++ b/rust/src/router/transport.rs @@ -394,6 +394,10 @@ pub enum ConsumeError { /// Bad consumer RTP parameters. #[error("Bad consumer RTP parameters: {0}")] BadConsumerRtpParameters(ConsumerRtpParametersError), + /// [`ConsumerOptions::rtp_parameters`] override is not supported together with `pipe=true` + /// or on pipe transports. + #[error("ConsumerOptions.rtp_parameters override is not supported on pipe transports")] + OverrideNotSupportedOnPipe, /// Request to worker failed. #[error("Request to worker failed: {0}")] Request(RequestError), @@ -614,8 +618,18 @@ pub(super) trait TransportImpl: TransportGeneric { enable_rtx, ignore_dtx, pipe, + rtp_parameters: override_rtp_parameters, app_data, } = consumer_options; + + // Caller-provided wire-level rtpParameters are incompatible with pipe + // semantics (which already passes through all encodings verbatim). + if override_rtp_parameters.is_some() + && (pipe || transport_type == TransportType::Pipe) + { + return Err(ConsumeError::OverrideNotSupportedOnPipe); + } + ortc::validate_rtp_capabilities(&rtp_capabilities) .map_err(ConsumeError::FailedRtpCapabilitiesValidation)?; @@ -628,17 +642,31 @@ pub(super) trait TransportImpl: TransportGeneric { let enable_rtx = enable_rtx.unwrap_or(producer.kind() == MediaKind::Video); + let mut consumer_rtp_mapping: Option = None; + let rtp_parameters = if transport_type == TransportType::Pipe { ortc::get_pipe_consumer_rtp_parameters(producer.consumable_rtp_parameters(), rtx) } else { + let source = match &override_rtp_parameters { + Some(o) => ortc::RemoteRtpSource::Parameters(o), + None => ortc::RemoteRtpSource::Capabilities(&rtp_capabilities), + }; + let mut rtp_parameters = ortc::get_consumer_rtp_parameters( producer.consumable_rtp_parameters(), - &rtp_capabilities, + source, pipe, enable_rtx, ) .map_err(ConsumeError::BadConsumerRtpParameters)?; + if override_rtp_parameters.is_some() { + consumer_rtp_mapping = Some(ortc::get_consumer_rtp_mapping( + producer.consumable_rtp_parameters(), + &rtp_parameters, + )); + } + if !pipe { // Set MID. rtp_parameters.mid = mid.or_else(|| { @@ -681,6 +709,7 @@ pub(super) trait TransportImpl: TransportGeneric { paused, preferred_layers, ignore_dtx, + consumer_rtp_mapping, }, ) .await diff --git a/worker/fbs/rtpParameters.fbs b/worker/fbs/rtpParameters.fbs index 115f901c4b..c93771e9a7 100644 --- a/worker/fbs/rtpParameters.fbs +++ b/worker/fbs/rtpParameters.fbs @@ -127,3 +127,18 @@ table RtpMapping { codecs: [CodecMapping] (required); encodings: [EncodingMapping] (required); } + +table ConsumerCodecMapping { + producer_payload_type: uint8; + consumer_payload_type: uint8; +} + +table ConsumerHeaderExtensionMapping { + producer_ext_id: uint8; + consumer_ext_id: uint8; +} + +table ConsumerRtpMapping { + codecs: [ConsumerCodecMapping] (required); + header_extensions: [ConsumerHeaderExtensionMapping] (required); +} diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index 225cdd1171..cd974d95ac 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -65,6 +65,7 @@ table ConsumeRequest { paused: bool = false; preferred_layers: FBS.Consumer.ConsumerLayers; ignore_dtx: bool = false; + consumer_rtp_mapping: FBS.RtpParameters.ConsumerRtpMapping; } table ConsumeResponse { diff --git a/worker/include/RTC/Consumer.hpp b/worker/include/RTC/Consumer.hpp index 86f6333342..1c8e8d7174 100644 --- a/worker/include/RTC/Consumer.hpp +++ b/worker/include/RTC/Consumer.hpp @@ -18,6 +18,7 @@ #include "RTC/RtpDictionaries.hpp" #include "RTC/Shared.hpp" #include +#include #include #include @@ -195,6 +196,24 @@ namespace RTC uint8_t priority{ 1u }; struct TraceEventTypes traceEventTypes; + // Per-Consumer egress rewrite state. Populated when the ConsumeRequest + // carries a ConsumerRtpMapping; otherwise remains zeroed and the send + // path behaves exactly as before (no rewrite, no overhead beyond a + // single boolean check). + bool hasEgressRemap{ false }; + // egressPayloadTypeMap[producerPt] = wirePt; 0 means no mapping. + std::array egressPayloadTypeMap{}; + // egressHeaderExtensionIdMap[producerId] = wireId; 0 means no mapping. + // Index 0 is unused; indices 1..14 are meaningful for RFC 8285 + // one-byte extension ids. + std::array egressHeaderExtensionIdMap{}; + // Mirror of `rtpHeaderExtensionIds` with every non-zero id routed + // through `egressHeaderExtensionIdMap`. Used as the `newExtIds` + // argument to `Packet::ApplyEgressRewrite` so that the Transport's + // UpdateAbsSendTime / UpdateTransportWideCc01 (which consult the + // packet's own headerExtensionIds) observe the wire ids. + struct RTC::RTP::HeaderExtensionIds egressHeaderExtensionIds; + private: // Others. std::vector mediaSsrcs; diff --git a/worker/include/RTC/RTP/Packet.hpp b/worker/include/RTC/RTP/Packet.hpp index 195024b181..3d753587b2 100644 --- a/worker/include/RTC/RTP/Packet.hpp +++ b/worker/include/RTC/RTP/Packet.hpp @@ -512,6 +512,54 @@ namespace RTC */ void AssignExtensionIds(RTP::HeaderExtensionIds& headerExtensionIds); + /** + * Undo token for `ApplyEgressRewrite`. Keeps the minimum state needed + * to revert the in-place Packet mutations done by the egress rewrite + * so that the same underlying packet buffer can be served to the next + * Consumer in the Router fan-out. + */ + struct EgressRewriteUndo + { + uint8_t origPayloadType{ 0u }; + // Snapshotted producer-side extension layouts. `RevertEgressRewrite` + // walks these to rewrite id bytes in-place (length nibbles in the + // one-byte scheme are preserved across the rewrite) and then + // swaps them back into the live maps. + std::array origOneByteExtensions{ + { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 } + }; + std::map origTwoBytesExtensions; + RTP::HeaderExtensionIds origHeaderExtensionIds{}; + }; + + /** + * Per-Consumer egress rewrite. Applied in-place on the live packet + * buffer before `RtpStreamSend::ReceivePacket` so that retransmission + * stores observe the wire PT, then reverted after + * `OnConsumerSendRtpPacket` so the next Consumer in the Router + * fan-out sees the pristine packet. + * + * @param newPayloadType The on-wire payload type for this Consumer. + * @param extIdRemap `extIdRemap[oldId] = newId`; 0 means keep as is. + * @param newExtIds Pre-computed `HeaderExtensionIds` that mirrors the + * producer-side ids mapped through `extIdRemap`. Used so that + * `UpdateAbsSendTime` / `UpdateTransportWideCc01` called by the + * Transport observe the wire ids. + * @param undo Out: token for a subsequent `RevertEgressRewrite`. + */ + void ApplyEgressRewrite( + uint8_t newPayloadType, + const std::array& extIdRemap, + const RTP::HeaderExtensionIds& newExtIds, + EgressRewriteUndo& undo); + + /** + * Revert the in-place mutations from a previous + * `ApplyEgressRewrite` call. `undo` must have been initialised by + * a matching `ApplyEgressRewrite`. + */ + void RevertEgressRewrite(const EgressRewriteUndo& undo); + bool ReadMid(std::string& mid) const; bool UpdateMid(const std::string& mid); @@ -937,7 +985,8 @@ namespace RTC */ void SetExtensionLength(uint8_t id, uint8_t len); - /* Pure virtual methods inherited from Codecs::DependencyDescriptor::Listener. */ + /* Pure virtual methods inherited from Codecs::DependencyDescriptor::Listener. + */ public: void OnDependencyDescriptorUpdated(const uint8_t* data, size_t len) override; diff --git a/worker/src/RTC/Consumer.cpp b/worker/src/RTC/Consumer.cpp index 04a8fab12c..77474678de 100644 --- a/worker/src/RTC/Consumer.cpp +++ b/worker/src/RTC/Consumer.cpp @@ -145,12 +145,117 @@ namespace RTC // paused is set to false by default. this->paused = data->paused(); + // Parse the optional per-Consumer egress rewrite mapping. + // + // When this mapping is present the packet on the wire will use the + // "consumer" PTs and extension ids declared by the caller, while the + // internal worker path continues to operate on the Producer-side + // consumable PTs / canonical extension ids. SimpleConsumer / + // SimulcastConsumer / SvcConsumer / PipeConsumer apply + // `Packet::ApplyEgressRewrite` in their SendRtpPacket implementations. + if (data->consumerRtpMapping() != nullptr) + { + const auto* mapping = data->consumerRtpMapping(); + + this->hasEgressRemap = true; + + for (const auto* entry : *mapping->codecs()) + { + const uint8_t producerPt = entry->producerPayloadType(); + const uint8_t consumerPt = entry->consumerPayloadType(); + + if (producerPt == 0u || producerPt >= 128u) + { + MS_THROW_TYPE_ERROR("invalid consumerRtpMapping codec (producer payload type out of range)"); + } + + if (consumerPt == 0u || consumerPt >= 128u) + { + MS_THROW_TYPE_ERROR("invalid consumerRtpMapping codec (consumer payload type out of range)"); + } + + this->egressPayloadTypeMap[producerPt] = consumerPt; + } + + for (const auto* entry : *mapping->headerExtensions()) + { + const uint8_t producerExtId = entry->producerExtId(); + const uint8_t consumerExtId = entry->consumerExtId(); + + if (producerExtId == 0u || producerExtId >= 15u) + { + MS_THROW_TYPE_ERROR( + "invalid consumerRtpMapping header extension (producer ext id out of range)"); + } + + if (consumerExtId == 0u || consumerExtId >= 15u) + { + MS_THROW_TYPE_ERROR( + "invalid consumerRtpMapping header extension (consumer ext id out of range)"); + } + + this->egressHeaderExtensionIdMap[producerExtId] = consumerExtId; + } + + // Build egressHeaderExtensionIds by routing each non-zero id in + // rtpHeaderExtensionIds through the remap table. Fields without a + // remap entry keep their original id (so the view stays consistent + // for extensions the caller did not need to re-label). + this->egressHeaderExtensionIds = this->rtpHeaderExtensionIds; + + auto remap = [this](uint8_t& id) + { + if (id != 0u && this->egressHeaderExtensionIdMap[id] != 0u) + { + id = this->egressHeaderExtensionIdMap[id]; + } + }; + + remap(this->egressHeaderExtensionIds.mid); + remap(this->egressHeaderExtensionIds.rid); + remap(this->egressHeaderExtensionIds.rrid); + remap(this->egressHeaderExtensionIds.absSendTime); + remap(this->egressHeaderExtensionIds.transportWideCc01); + remap(this->egressHeaderExtensionIds.ssrcAudioLevel); + remap(this->egressHeaderExtensionIds.dependencyDescriptor); + remap(this->egressHeaderExtensionIds.videoOrientation); + remap(this->egressHeaderExtensionIds.timeOffset); + remap(this->egressHeaderExtensionIds.absCaptureTime); + remap(this->egressHeaderExtensionIds.playoutDelay); + remap(this->egressHeaderExtensionIds.mediasoupPacketId); + } + // Fill supported codec payload types. - for (auto& codec : this->rtpParameters.codecs) + // + // In the legacy (no-remap) path, `rtpParameters.codecs[i].payloadType` + // is the consumable PT (same space as the packet seen in SendRtpPacket + // after Producer::MangleRtpPacket), so indexing the bitset by it is + // correct. + // + // In the remap path, `rtpParameters.codecs[i].payloadType` holds the + // *wire* PT, and the packet in SendRtpPacket still carries the + // producer-side consumable PT. Iterate `egressPayloadTypeMap` + // directly: a non-zero entry at index `producerPt` means the caller + // declared an egress mapping for that consumable PT (media or RTX), + // so that is the PT we must accept. + if (this->hasEgressRemap) { - if (codec.mimeType.IsMediaCodec()) + for (size_t producerPt = 0u; producerPt < this->egressPayloadTypeMap.size(); ++producerPt) { - this->supportedCodecPayloadTypes[codec.payloadType] = true; + if (this->egressPayloadTypeMap[producerPt] != 0u) + { + this->supportedCodecPayloadTypes[producerPt] = true; + } + } + } + else + { + for (auto& codec : this->rtpParameters.codecs) + { + if (codec.mimeType.IsMediaCodec()) + { + this->supportedCodecPayloadTypes[codec.payloadType] = true; + } } } diff --git a/worker/src/RTC/RTP/Packet.cpp b/worker/src/RTC/RTP/Packet.cpp index 09dc3444ab..afa7ea9449 100644 --- a/worker/src/RTC/RTP/Packet.cpp +++ b/worker/src/RTC/RTP/Packet.cpp @@ -846,6 +846,113 @@ namespace RTC this->headerExtensionIds = headerExtensionIds; } + void Packet::ApplyEgressRewrite( + uint8_t newPayloadType, + const std::array& extIdRemap, + const RTP::HeaderExtensionIds& newExtIds, + EgressRewriteUndo& undo) + { + MS_TRACE(); + + const auto rewriteId = [&extIdRemap](uint8_t oldId) + { + const auto newId = extIdRemap[oldId]; + + return newId == 0u ? oldId : newId; + }; + + undo.origPayloadType = GetFixedHeaderPointer()->payloadType; + undo.origOneByteExtensions = this->oneByteExtensions; + undo.origTwoBytesExtensions = this->twoBytesExtensions; + undo.origHeaderExtensionIds = this->headerExtensionIds; + + GetFixedHeaderPointer()->payloadType = newPayloadType; + + auto* extensionsStart = GetHeaderExtensionValue(); + + if (HasOneByteExtensions()) + { + for (uint8_t oldId = 1u; oldId <= 14u; ++oldId) + { + const auto off = undo.origOneByteExtensions[oldId - 1]; + + if (off == -1) + { + continue; + } + + const auto newId = rewriteId(oldId); + + if (newId == oldId) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = static_cast((newId << 4) | (*idByte & 0x0F)); + + this->oneByteExtensions[newId - 1] = off; + this->oneByteExtensions[oldId - 1] = -1; + } + } + else + { + for (const auto& [oldId, off] : undo.origTwoBytesExtensions) + { + const auto newId = rewriteId(oldId); + + if (newId == oldId) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = newId; + + this->twoBytesExtensions.erase(oldId); + this->twoBytesExtensions[newId] = off; + } + } + + this->headerExtensionIds = newExtIds; + } + + void Packet::RevertEgressRewrite(const EgressRewriteUndo& undo) + { + MS_TRACE(); + + auto* extensionsStart = GetHeaderExtensionValue(); + + if (HasOneByteExtensions()) + { + for (uint8_t oldId = 1u; oldId <= 14u; ++oldId) + { + const auto off = undo.origOneByteExtensions[oldId - 1]; + + if (off == -1) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = static_cast((oldId << 4) | (*idByte & 0x0F)); + } + } + else + { + for (const auto& [oldId, off] : undo.origTwoBytesExtensions) + { + auto* idByte = extensionsStart + off; + *idByte = oldId; + } + } + + this->oneByteExtensions = undo.origOneByteExtensions; + this->twoBytesExtensions = undo.origTwoBytesExtensions; + this->headerExtensionIds = undo.origHeaderExtensionIds; + GetFixedHeaderPointer()->payloadType = undo.origPayloadType; + } + bool Packet::ReadMid(std::string& mid) const { MS_TRACE(); diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 3b3962e6ff..66065adf27 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -321,6 +321,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -346,7 +361,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -440,6 +455,21 @@ namespace RTC packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); packet->SetSequenceNumber(seq); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = packet->GetTimestamp(); packet->logger.sendSeqNumber = seq; @@ -458,7 +488,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -488,11 +518,21 @@ namespace RTC packet->SetSsrc(origSsrc); packet->SetSequenceNumber(origSeq); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/src/RTC/SimulcastConsumer.cpp b/worker/src/RTC/SimulcastConsumer.cpp index 88801252af..e3390d9e69 100644 --- a/worker/src/RTC/SimulcastConsumer.cpp +++ b/worker/src/RTC/SimulcastConsumer.cpp @@ -710,6 +710,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + auto spatialLayer = this->mapMappedSsrcSpatialLayer.at(packet->GetSsrc()); if (!IsActive()) @@ -786,7 +801,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -822,7 +837,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -1094,6 +1109,21 @@ namespace RTC packet->SetSequenceNumber(seq); packet->SetTimestamp(timestamp); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = timestamp; packet->logger.sendSeqNumber = seq; @@ -1114,7 +1144,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -1155,11 +1185,21 @@ namespace RTC // Restore the original payload if needed. packet->RestorePayload(); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/src/RTC/SvcConsumer.cpp b/worker/src/RTC/SvcConsumer.cpp index fbe433fc86..169cfd4e12 100644 --- a/worker/src/RTC/SvcConsumer.cpp +++ b/worker/src/RTC/SvcConsumer.cpp @@ -603,6 +603,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -638,7 +653,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -735,6 +750,21 @@ namespace RTC packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); packet->SetSequenceNumber(seq); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = packet->GetTimestamp(); packet->logger.sendSeqNumber = seq; @@ -755,7 +785,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -790,11 +820,21 @@ namespace RTC // Restore the original payload if needed. packet->RestorePayload(); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/test/src/RTC/RTP/TestPacket.cpp b/worker/test/src/RTC/RTP/TestPacket.cpp index f34377fa14..094a42d021 100644 --- a/worker/test/src/RTC/RTP/TestPacket.cpp +++ b/worker/test/src/RTC/RTP/TestPacket.cpp @@ -2219,4 +2219,179 @@ SCENARIO("RTP Packet", "[serializable][rtp][packet]") // been destroyed. packet->SetBufferReleasedListener(nullptr); } + + SECTION("Packet::ApplyEgressRewrite() and RevertEgressRewrite() on one-byte extensions") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0xbe, 0xde, 0x00, 0x03, // Header Extension + 0x10, 0xaa, 0x21, 0xbb, // - id: 1, len: 1 + 0xff, 0x00, 0x00, 0x33, // - id: 2, len: 2 + 0xff, 0xff, 0xff, 0xff, // - id: 3, len: 4 + 0x12, 0x23 + }; + // clang-format on + + alignas(4) uint8_t snapshotBefore[sizeof(buffer)]; + std::memcpy(snapshotBefore, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + REQUIRE(packet->HasOneByteExtensions()); + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(2)); + REQUIRE(packet->HasExtension(3)); + + // Remap producer ids (1,2,3) -> wire ids (5,7,9), and PT 1 -> 97. + std::array extIdRemap{}; + extIdRemap[1] = 5; + extIdRemap[2] = 7; + extIdRemap[3] = 9; + + RTC::RTP::HeaderExtensionIds newExtIds{}; + // For this unit test we do not care about which URI maps to which id; + // ApplyEgressRewrite copies the struct wholesale into packet-> + // headerExtensionIds. + newExtIds.mid = 5; + newExtIds.rid = 7; + newExtIds.transportWideCc01 = 9; + + RTC::RTP::Packet::EgressRewriteUndo undo; + + packet->ApplyEgressRewrite(97, extIdRemap, newExtIds, undo); + + // After remap the packet is in wire state. + REQUIRE(packet->GetPayloadType() == 97); + REQUIRE(packet->HasExtension(5)); + REQUIRE(packet->HasExtension(7)); + REQUIRE(packet->HasExtension(9)); + REQUIRE_FALSE(packet->HasExtension(1)); + REQUIRE_FALSE(packet->HasExtension(2)); + REQUIRE_FALSE(packet->HasExtension(3)); + + uint8_t len; + auto* val1 = packet->GetExtensionValue(5, len); + REQUIRE(val1 != nullptr); + REQUIRE(len == 1); + REQUIRE(val1[0] == 0xaa); + + auto* val2 = packet->GetExtensionValue(7, len); + REQUIRE(val2 != nullptr); + REQUIRE(len == 2); + + auto* val3 = packet->GetExtensionValue(9, len); + REQUIRE(val3 != nullptr); + REQUIRE(len == 4); + + // Revert and check bit-for-bit equality. + packet->RevertEgressRewrite(undo); + + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(2)); + REQUIRE(packet->HasExtension(3)); + REQUIRE_FALSE(packet->HasExtension(5)); + REQUIRE_FALSE(packet->HasExtension(7)); + REQUIRE_FALSE(packet->HasExtension(9)); + + REQUIRE(std::memcmp(buffer, snapshotBefore, sizeof(buffer)) == 0); + } + + SECTION("Packet::ApplyEgressRewrite() identity remap is a no-op") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0xbe, 0xde, 0x00, 0x03, + 0x10, 0xaa, 0x21, 0xbb, + 0xff, 0x00, 0x00, 0x33, + 0xff, 0xff, 0xff, 0xff, + 0x12, 0x23 + }; + // clang-format on + + alignas(4) uint8_t snapshot[sizeof(buffer)]; + std::memcpy(snapshot, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + + std::array extIdRemap{}; + // All zeros: identity remap (no-op). + + RTC::RTP::HeaderExtensionIds newExtIds{}; + + RTC::RTP::Packet::EgressRewriteUndo undo; + + // Remap to same PT value with identity ext id remap. + packet->ApplyEgressRewrite(1, extIdRemap, newExtIds, undo); + + // Extension bytes must not have been touched since all remap entries + // are 0. + REQUIRE(std::memcmp(buffer + 16, snapshot + 16, sizeof(buffer) - 16) == 0); + + packet->RevertEgressRewrite(undo); + REQUIRE(std::memcmp(buffer, snapshot, sizeof(buffer)) == 0); + } + + SECTION("Packet::ApplyEgressRewrite() and RevertEgressRewrite() on two-byte extensions") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0x10, 0x00, 0x00, 0x04, // Header Extension (Two-Bytes) + 0x00, 0x00, 0x01, 0x00, // - id: 1, len: 0 + 0x02, 0x01, 0x42, 0x00, // - id: 2, len: 1 + 0x03, 0x02, 0x11, 0x22, // - id: 3, len: 2 + 0x00, 0x00, 0x04, 0x00 // - id: 4, len: 0 + }; + // clang-format on + + alignas(4) uint8_t snapshot[sizeof(buffer)]; + std::memcpy(snapshot, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + REQUIRE(packet->HasTwoBytesExtensions()); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(3)); + + std::array extIdRemap{}; + extIdRemap[1] = 11; + extIdRemap[3] = 13; + + RTC::RTP::HeaderExtensionIds newExtIds{}; + RTC::RTP::Packet::EgressRewriteUndo undo; + + packet->ApplyEgressRewrite(42, extIdRemap, newExtIds, undo); + + REQUIRE(packet->GetPayloadType() == 42); + REQUIRE(packet->HasExtension(11)); + REQUIRE(packet->HasExtension(13)); + REQUIRE_FALSE(packet->HasExtension(1)); + REQUIRE_FALSE(packet->HasExtension(3)); + + // Revert and assert bit-for-bit equality. + packet->RevertEgressRewrite(undo); + + REQUIRE(std::memcmp(buffer, snapshot, sizeof(buffer)) == 0); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(3)); + REQUIRE_FALSE(packet->HasExtension(11)); + REQUIRE_FALSE(packet->HasExtension(13)); + } } From 3180671ba2f50170b5d3583e63fe8b1b55111a95 Mon Sep 17 00:00:00 2001 From: wuxinfei Date: Thu, 23 Apr 2026 19:57:09 +0800 Subject: [PATCH 2/4] feat: Enhance MID handling in RTP parameters --- node/src/Transport.ts | 10 +++++++--- node/src/ortc.ts | 12 ++++++------ rust/src/ortc.rs | 1 + rust/src/router/transport.rs | 10 +++++++--- worker/.vscode/settings.json | 36 ++++++++++++++++++++++++++++++++++++ 5 files changed, 57 insertions(+), 12 deletions(-) create mode 100644 worker/.vscode/settings.json diff --git a/node/src/Transport.ts b/node/src/Transport.ts index d83a585d11..079112a6c0 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -636,11 +636,15 @@ export abstract class TransportImpl< ); } - // Set MID. - if (!pipe) { + // Set MID. Priority: + // 1. mid already set on rtpParameters (override path, taken from + // the caller-provided rtpParameters.mid). + // 2. ConsumerOptions.mid. + // 3. Auto-generated monotonically increasing integer. + if (!pipe && !rtpParameters.mid) { if (mid) { rtpParameters.mid = mid; - } else if (!rtpParameters.mid) { + } else { rtpParameters.mid = `${this.#nextMidForConsumers++}`; // We use up to 8 bytes for MID (string). diff --git a/node/src/ortc.ts b/node/src/ortc.ts index f8a5fb296a..c452c65f20 100644 --- a/node/src/ortc.ts +++ b/node/src/ortc.ts @@ -693,6 +693,7 @@ export function getConsumerRtpParameters({ } consumerParams = { + mid: override.mid, codecs: [], headerExtensions: [], encodings: [], @@ -1032,10 +1033,7 @@ export function getConsumerRtpMapping( if (isRtxCodec(producerCodec)) { const apt = consumerCodec.parameters?.['apt']; - if ( - typeof apt !== 'number' || - !consumerCodecPts.has(apt as number) - ) { + if (typeof apt !== 'number' || !consumerCodecPts.has(apt as number)) { continue; } } @@ -1085,8 +1083,10 @@ export function serializeConsumerRtpMapping( ); } - const codecsOffset = - FbsRtpParameters.ConsumerRtpMapping.createCodecsVector(builder, codecs); + const codecsOffset = FbsRtpParameters.ConsumerRtpMapping.createCodecsVector( + builder, + codecs + ); const headerExtensions: number[] = []; diff --git a/rust/src/ortc.rs b/rust/src/ortc.rs index d31c4c83ed..4b29eb41da 100644 --- a/rust/src/ortc.rs +++ b/rust/src/ortc.rs @@ -879,6 +879,7 @@ pub(crate) fn get_consumer_rtp_parameters( } RemoteRtpSource::Parameters(override_params) => { consumer_params = RtpParameters { + mid: override_params.mid.clone(), rtcp: override_params.rtcp.clone(), msid: override_params .msid diff --git a/rust/src/router/transport.rs b/rust/src/router/transport.rs index fe1d4dbc7e..9a31ff6ada 100644 --- a/rust/src/router/transport.rs +++ b/rust/src/router/transport.rs @@ -667,10 +667,14 @@ pub(super) trait TransportImpl: TransportGeneric { )); } - if !pipe { - // Set MID. + if !pipe && rtp_parameters.mid.is_none() { + // Set MID. Priority: + // 1. mid already set on rtp_parameters (override path, + // taken from the caller-provided rtpParameters.mid). + // 2. ConsumerOptions.mid. + // 3. Auto-generated monotonically increasing integer + // (up to 8 bytes). rtp_parameters.mid = mid.or_else(|| { - // We use up to 8 bytes for MID (string). let next_mid_for_consumers = self .next_mid_for_consumers() .fetch_add(1, Ordering::Relaxed); diff --git a/worker/.vscode/settings.json b/worker/.vscode/settings.json new file mode 100644 index 0000000000..10ed08d728 --- /dev/null +++ b/worker/.vscode/settings.json @@ -0,0 +1,36 @@ +{ + "editor.formatOnSave": true, + "C_Cpp.default.configurationProvider": "mesonbuild.mesonbuild", + "mesonbuild.configureOnOpen": false, + + "clangd.path": "/opt/homebrew/opt/llvm/bin/clangd", + "clangd.arguments": [ + "-log=verbose", + "-pretty", + "--background-index", + "--compile-commands-dir=${workspaceFolder}/worker/out/Release/build", + "--clang-tidy", + // 启用全项目索引(跨文件引用查找更准) + "--all-scopes-completion", + // 补充头文件(当某些头文件找不到时自动插入) + "--header-insertion=iwyu", + // 开启代码补全时显示参数提示 + "--completion-style=detailed", + // 利用多核优化 + "-j=4", + // 兜底风格,虽然有 .clang-format 时会被忽略,但写上是个好习惯 + "--fallback-style=LLVM" + ], + + "[cpp]": { + "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd", + "editor.formatOnSave": true + }, + "[c]": { + "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd", + "editor.formatOnSave": true + }, + "C_Cpp.formatting": "disabled", + "C_Cpp.intelliSenseEngine": "disabled", + "C_Cpp.errorSquiggles": "disabled" +} From 9c2702b2195ecbcaa69c50d58b901bfac5158dc1 Mon Sep 17 00:00:00 2001 From: wuxinfei Date: Thu, 23 Apr 2026 20:50:05 +0800 Subject: [PATCH 3/4] feat: Ensure compliance with RFC 4585 for rtcpFeedback in RTP parameters MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This update modifies the handling of rtcpFeedback in both TypeScript and Rust implementations to ensure that the final Consumer RTP parameters adhere to RFC 4585 §4.2.2. The changes preserve the caller-advertised rtcpFeedback list, preventing the Router's consumable feedback from leaking into WHEP answers. The implementation includes filtering logic to maintain compatibility with the enableRtx setting. --- node/src/ortc.ts | 8 +++++++- rust/src/ortc.rs | 12 +++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/node/src/ortc.ts b/node/src/ortc.ts index c452c65f20..907c45bb41 100644 --- a/node/src/ortc.ts +++ b/node/src/ortc.ts @@ -733,7 +733,13 @@ export function getConsumerRtpParameters({ continue; } - codec.rtcpFeedback = (matchedCodec.rtcpFeedback ?? []).filter( + // Pick the caller-advertised rtcpFeedback list so that the final + // Consumer rtpParameters stay compliant with RFC 4585 §4.2.2 + // (answer rtcpFeedback must be a subset of the offer): + // - Capabilities path: matchedCodec came from caps (caller side). + // - Parameters path: codec came from override (caller side). + const feedbackSource = isOverride ? codec : matchedCodec; + codec.rtcpFeedback = (feedbackSource.rtcpFeedback ?? []).filter( fb => enableRtx || fb.type !== 'nack' || fb.parameter ); diff --git a/rust/src/ortc.rs b/rust/src/ortc.rs index 4b29eb41da..8ce08bc3de 100644 --- a/rust/src/ortc.rs +++ b/rust/src/ortc.rs @@ -895,17 +895,23 @@ pub(crate) fn get_consumer_rtp_parameters( continue; } - if let Some(matched_codec) = consumable_rtp_parameters + if consumable_rtp_parameters .codecs .iter() - .find(|cc| match_codecs((&codec).into(), (*cc).into(), true).is_ok()) + .any(|cc| match_codecs((&codec).into(), cc.into(), true).is_ok()) { - *codec.rtcp_feedback_mut() = matched_codec + // Preserve caller-declared rtcp_feedback (typically + // taken verbatim from the remote SDP answer) so that + // the final Consumer rtp_parameters stay compliant + // with RFC 4585 §4.2.2 (answer feedback must be a + // subset of the offer). + let feedback: Vec<_> = codec .rtcp_feedback() .iter() .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) .copied() .collect(); + *codec.rtcp_feedback_mut() = feedback; consumer_params.codecs.push(codec); } } From 4b3b7107525cdfcd440d631b269ec6f581c2f4d7 Mon Sep 17 00:00:00 2001 From: wuxinfei Date: Fri, 24 Apr 2026 10:58:19 +0800 Subject: [PATCH 4/4] refactor: Improve readability and error handling in RTP parameter tests This commit enhances the readability of the test code by formatting the codec lookups for H264 and RTX into multi-line statements. Additionally, it updates the error type thrown in the test for invalid RTP parameters from TypeError to UnsupportedError, ensuring more accurate error handling in the test cases. --- node/src/test/test-Consumer.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index c6407c9026..4a4b81437d 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -558,8 +558,12 @@ test('transport.consume() can be created with user provided mid', async () => { test('transport.consume() with rtpParameters override rewrites egress PT / ext-id', async () => { // Learn the producer's consumable PT / ext-id mappings. const consumable = ctx.videoProducer!.consumableRtpParameters; - const consumableH264 = consumable.codecs.find(c => c.mimeType === 'video/H264')!; - const consumableRtx = consumable.codecs.find(c => c.mimeType === 'video/rtx')!; + const consumableH264 = consumable.codecs.find( + c => c.mimeType === 'video/H264' + )!; + const consumableRtx = consumable.codecs.find( + c => c.mimeType === 'video/rtx' + )!; const consumableMid = consumable.headerExtensions!.find( e => e.uri === 'urn:ietf:params:rtp-hdrext:sdes:mid' )!; @@ -680,7 +684,7 @@ test('transport.consume() with invalid rtpParameters override rejects', async () rtpCapabilities: ctx.consumerDeviceCapabilities, rtpParameters: override, }) - ).rejects.toThrow(TypeError); + ).rejects.toThrow(UnsupportedError); }, 2000); test('transport.consume() with incompatible rtpCapabilities rejects with UnsupportedError', async () => {