diff --git a/node/src/ConsumerTypes.ts b/node/src/ConsumerTypes.ts index 0a7bac65b5..0bd4f5dc6e 100644 --- a/node/src/ConsumerTypes.ts +++ b/node/src/ConsumerTypes.ts @@ -70,6 +70,25 @@ export type ConsumerOptions = { */ pipe?: boolean; + /** + * When set, bypasses ortc.getConsumerRtpParameters and is used verbatim + * as the Consumer's rtpParameters. The payload types and header-extension + * ids declared here become the on-wire values for this Consumer. + * + * mediasoup validates compatibility against the Producer's + * consumableRtpParameters (1:1 codec mapping, compatible fmtp, matching + * layer/simulcast structure, header-extension URIs being a subset of the + * consumable ones) and asks the worker to rewrite outgoing RTP headers + * accordingly. + * + * Intended for advanced scenarios (e.g. WHEP) where the answer SDP + * dictates wire-level PT / ext-id values that differ from the Router's + * canonical ones. + * + * Not supported together with pipe=true. + */ + rtpParameters?: RtpParameters; + /** * Custom application data. */ diff --git a/node/src/Transport.ts b/node/src/Transport.ts index ae6580c4cb..079112a6c0 100644 --- a/node/src/Transport.ts +++ b/node/src/Transport.ts @@ -585,6 +585,7 @@ export abstract class TransportImpl< ignoreDtx = false, enableRtx, pipe = false, + rtpParameters: overrideRtpParameters, appData, }: ConsumerOptions): Promise> { logger.debug('consume()'); @@ -595,6 +596,10 @@ export abstract class TransportImpl< throw new TypeError('if given, appData must be an object'); } else if (mid && (typeof mid !== 'string' || mid.length === 0)) { throw new TypeError('if given, mid must be non empty string'); + } else if (overrideRtpParameters && pipe) { + throw new TypeError( + 'rtpParameters override is not supported together with pipe=true' + ); } // Clone given RTP capabilities to not modify input data. @@ -614,16 +619,29 @@ export abstract class TransportImpl< enableRtx = producer.kind === 'video'; } + let consumerRtpMapping: ortc.ConsumerRtpMapping | undefined; + // This may throw. - const rtpParameters = ortc.getConsumerRtpParameters({ + const rtpParameters: RtpParameters = ortc.getConsumerRtpParameters({ consumableRtpParameters: producer.consumableRtpParameters, - remoteRtpCapabilities: clonedRtpCapabilities, + remoteRtpCapabilities: overrideRtpParameters ?? clonedRtpCapabilities, pipe, enableRtx, }); - // Set MID. - if (!pipe) { + if (overrideRtpParameters) { + consumerRtpMapping = ortc.getConsumerRtpMapping( + producer.consumableRtpParameters, + rtpParameters + ); + } + + // Set MID. Priority: + // 1. mid already set on rtpParameters (override path, taken from + // the caller-provided rtpParameters.mid). + // 2. ConsumerOptions.mid. + // 3. Auto-generated monotonically increasing integer. + if (!pipe && !rtpParameters.mid) { if (mid) { rtpParameters.mid = mid; } else { @@ -650,6 +668,7 @@ export abstract class TransportImpl< preferredLayers, ignoreDtx, pipe, + consumerRtpMapping, }); const response = await this.channel.request( @@ -1348,6 +1367,7 @@ function createConsumeRequest({ preferredLayers, ignoreDtx, pipe, + consumerRtpMapping, }: { builder: flatbuffers.Builder; producer: Producer; @@ -1357,6 +1377,7 @@ function createConsumeRequest({ preferredLayers?: ConsumerLayers; ignoreDtx?: boolean; pipe: boolean; + consumerRtpMapping?: ortc.ConsumerRtpMapping; }): number { const rtpParametersOffset = serializeRtpParameters(builder, rtpParameters); const consumerIdOffset = builder.createString(consumerId); @@ -1389,6 +1410,15 @@ function createConsumeRequest({ FbsConsumer.ConsumerLayers.endConsumerLayers(builder); } + let consumerRtpMappingOffset: number | undefined; + + if (consumerRtpMapping) { + consumerRtpMappingOffset = ortc.serializeConsumerRtpMapping( + builder, + consumerRtpMapping + ); + } + const ConsumeRequest = FbsTransport.ConsumeRequest; // Create Consume Request. @@ -1420,6 +1450,10 @@ function createConsumeRequest({ ConsumeRequest.addIgnoreDtx(builder, Boolean(ignoreDtx)); + if (consumerRtpMappingOffset !== undefined) { + ConsumeRequest.addConsumerRtpMapping(builder, consumerRtpMappingOffset); + } + return ConsumeRequest.endConsumeRequest(builder); } diff --git a/node/src/ortc.ts b/node/src/ortc.ts index a6db4675c6..907c45bb41 100644 --- a/node/src/ortc.ts +++ b/node/src/ortc.ts @@ -34,6 +34,25 @@ export type RtpCodecsEncodingsMapping = { }[]; }; +/** + * Per-Consumer egress mapping between the Router's canonical (consumable) RTP + * space and the wire-level RTP space declared by the caller via + * `ConsumerOptions.rtpParameters`. + * + * The worker uses this mapping to rewrite outgoing RTP packet payload types + * and header-extension ids in place. + */ +export type ConsumerRtpMapping = { + codecs: { + producerPayloadType: number; + consumerPayloadType: number; + }[]; + headerExtensions: { + producerExtId: number; + consumerExtId: number; + }[]; +}; + const DynamicPayloadTypes = [ 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 96, 97, 98, @@ -599,55 +618,142 @@ export function getConsumerRtpParameters({ enableRtx, }: { consumableRtpParameters: RtpParameters; - remoteRtpCapabilities: RtpCapabilities; + // Can be either RtpCapabilities (default path, derived from the remote + // endpoint's capabilities) or RtpParameters (override path, where the + // caller explicitly dictates the on-wire PT / header-extension ids). + remoteRtpCapabilities: RtpCapabilities | RtpParameters; pipe: boolean; enableRtx: boolean; }): RtpParameters { - const consumerParams: RtpParameters = { - codecs: [], - headerExtensions: [], - encodings: [], - rtcp: consumableRtpParameters.rtcp, - msid: consumableRtpParameters.msid, - }; - - for (const capCodec of remoteRtpCapabilities.codecs!) { - validateAndNormalizeRtpCodecCapability(capCodec); - } + const isOverride = isRtpParameters(remoteRtpCapabilities); + + let consumerParams: RtpParameters; + let consumableCodecs: RtpCodecParameters[]; + let remoteCodecs: RtpCodecParameters[]; + let remoteHeaderExtensions: RtpHeaderExtensionParameters[]; + // Decides whether a given header extension from `remoteHeaderExtensions` + // should make it into the final consumerParams.headerExtensions. Differs + // between the RtpCapabilities and RtpParameters branches. + let matchConsumerExt: (ext: RtpHeaderExtensionParameters) => boolean; + + if (!isOverride) { + const caps = remoteRtpCapabilities as RtpCapabilities; + + for (const capCodec of caps.codecs ?? []) { + validateAndNormalizeRtpCodecCapability(capCodec); + } - const consumableCodecs = - utils.clone( - consumableRtpParameters.codecs - ) ?? []; + consumerParams = { + codecs: [], + headerExtensions: [], + encodings: [], + rtcp: consumableRtpParameters.rtcp, + msid: consumableRtpParameters.msid, + }; + // Iterate producer-side consumable codecs; the caller's capabilities + // are used as the codec-match table so that its rtcpFeedback makes it + // into the final consumerParams. + remoteCodecs = + utils.clone( + consumableRtpParameters.codecs + ) ?? []; + consumableCodecs = (caps.codecs ?? []).map(c => ({ + mimeType: c.mimeType, + payloadType: c.preferredPayloadType!, + clockRate: c.clockRate, + channels: c.channels, + parameters: c.parameters ?? {}, + rtcpFeedback: c.rtcpFeedback ?? [], + })); + remoteHeaderExtensions = consumableRtpParameters.headerExtensions ?? []; + // Keep the producer-side extension only when the remote capability + // advertises the same URI AND `preferredId`. + matchConsumerExt = ext => + (caps.headerExtensions ?? []).some( + capExt => capExt.preferredId === ext.id && capExt.uri === ext.uri + ); + } else { + const override = remoteRtpCapabilities as RtpParameters; + + // validateAndNormalizeRtpParameters dereferences params.rtcp. Fall back + // to the producer-side consumable values for rtcp / msid the caller + // did not provide so that validation has a valid view. + const rtcp = override.rtcp ?? consumableRtpParameters.rtcp; + const msid = override.msid ?? consumableRtpParameters.msid; + const toValidate: RtpParameters = { ...override, rtcp }; + + try { + validateAndNormalizeRtpParameters(toValidate); + } catch (err) { + throw new TypeError( + `invalid consumer.rtpParameters: ${ + err instanceof Error ? err.message : String(err) + }` + ); + } - let rtxSupported = false; + consumerParams = { + mid: override.mid, + codecs: [], + headerExtensions: [], + encodings: [], + rtcp, + msid, + }; + // Consumable is still the Router-canonical view used for structural + // matching; we iterate the caller's override (which dictates wire PTs + // and extension ids). + consumableCodecs = + utils.clone( + consumableRtpParameters.codecs + ) ?? []; + remoteCodecs = + utils.clone(override.codecs) ?? []; + remoteHeaderExtensions = override.headerExtensions ?? []; + // The caller explicitly declares wire-level ext ids that may differ + // from the Router's canonical ones, so we only check URI presence in + // the producer-side consumable set. The worker rewrites the producer + // ext-id to the caller-declared one using ConsumerRtpMapping. + matchConsumerExt = ext => + (consumableRtpParameters.headerExtensions ?? []).some( + cExt => cExt.uri === ext.uri + ); + } - for (const codec of consumableCodecs) { + for (const codec of remoteCodecs) { if (!enableRtx && isRtxCodec(codec)) { continue; } - const matchedCapCodec = remoteRtpCapabilities.codecs!.find(capCodec => - matchCodecs(capCodec, codec, { strict: true }) + const matchedCodec = consumableCodecs.find(cc => + matchCodecs(cc, codec, { strict: true }) ); - if (!matchedCapCodec) { + if (!matchedCodec) { continue; } - codec.rtcpFeedback = matchedCapCodec.rtcpFeedback!.filter( + // Pick the caller-advertised rtcpFeedback list so that the final + // Consumer rtpParameters stay compliant with RFC 4585 §4.2.2 + // (answer rtcpFeedback must be a subset of the offer): + // - Capabilities path: matchedCodec came from caps (caller side). + // - Parameters path: codec came from override (caller side). + const feedbackSource = isOverride ? codec : matchedCodec; + codec.rtcpFeedback = (feedbackSource.rtcpFeedback ?? []).filter( fb => enableRtx || fb.type !== 'nack' || fb.parameter ); consumerParams.codecs.push(codec); } - // Must sanitize the list of matched codecs by removing useless RTX codecs. + let rtxSupported = false; + + // Sanitize the matched codec list by removing useless RTX codecs (whose + // `apt` does not point to any remaining media codec). for (let idx = consumerParams.codecs.length - 1; idx >= 0; --idx) { const codec = consumerParams.codecs[idx]!; if (isRtxCodec(codec)) { - // Search for the associated media codec. const associatedMediaCodec = consumerParams.codecs.find( mediaCodec => mediaCodec.payloadType === codec.parameters!['apt'] ); @@ -660,7 +766,6 @@ export function getConsumerRtpParameters({ } } - // Ensure there is at least one media codec. if ( consumerParams.codecs.length === 0 || isRtxCodec(consumerParams.codecs[0]!) @@ -668,16 +773,15 @@ export function getConsumerRtpParameters({ throw new UnsupportedError('no compatible media codecs'); } - consumerParams.headerExtensions = - consumableRtpParameters.headerExtensions!.filter(ext => - remoteRtpCapabilities.headerExtensions!.some( - capExt => capExt.preferredId === ext.id && capExt.uri === ext.uri - ) - ); + for (const ext of remoteHeaderExtensions) { + if (matchConsumerExt(ext)) { + consumerParams.headerExtensions!.push(ext); + } + } // Reduce codecs' RTCP feedback. Use Transport-CC if available, REMB otherwise. if ( - consumerParams.headerExtensions.some( + consumerParams.headerExtensions!.some( ext => ext.uri === 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01' @@ -689,7 +793,7 @@ export function getConsumerRtpParameters({ ); } } else if ( - consumerParams.headerExtensions.some( + consumerParams.headerExtensions!.some( ext => ext.uri === 'http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time' ) @@ -707,7 +811,28 @@ export function getConsumerRtpParameters({ } } - if (!pipe) { + if (pipe) { + const consumableEncodings = + utils.clone( + consumableRtpParameters.encodings + ) ?? []; + const baseSsrc = utils.generateRandomNumber(); + const baseRtxSsrc = utils.generateRandomNumber(); + + for (let i = 0; i < consumableEncodings.length; ++i) { + const encoding = consumableEncodings[i]!; + + encoding.ssrc = baseSsrc + i; + + if (rtxSupported) { + encoding.rtx = { ssrc: baseRtxSsrc + i }; + } else { + delete encoding.rtx; + } + + consumerParams.encodings!.push(encoding); + } + } else { const consumerEncoding: RtpEncodingParameters = { ssrc: utils.generateRandomNumber(), }; @@ -716,18 +841,15 @@ export function getConsumerRtpParameters({ consumerEncoding.rtx = { ssrc: consumerEncoding.ssrc! + 1 }; } - // If any of the consumableRtpParameters.encodings has scalabilityMode, - // process it (assume all encodings have the same value). - const encodingWithScalabilityMode = consumableRtpParameters.encodings!.find( - encoding => encoding.scalabilityMode - ); + const encodingWithScalabilityMode = ( + consumableRtpParameters.encodings ?? [] + ).find(encoding => encoding.scalabilityMode); let scalabilityMode = encodingWithScalabilityMode ? encodingWithScalabilityMode.scalabilityMode : undefined; - // If there is simulast, mangle spatial layers in scalabilityMode. - if (consumableRtpParameters.encodings!.length > 1) { + if ((consumableRtpParameters.encodings ?? []).length > 1) { const { temporalLayers } = parseScalabilityMode(scalabilityMode); scalabilityMode = `L${ @@ -739,9 +861,9 @@ export function getConsumerRtpParameters({ consumerEncoding.scalabilityMode = scalabilityMode; } - // Use the maximum maxBitrate in any encoding and honor it in the Consumer's - // encoding. - const maxEncodingMaxBitrate = consumableRtpParameters.encodings!.reduce( + const maxEncodingMaxBitrate = ( + consumableRtpParameters.encodings ?? [] + ).reduce( (maxBitrate, encoding) => encoding.maxBitrate && encoding.maxBitrate > maxBitrate ? encoding.maxBitrate @@ -753,34 +875,40 @@ export function getConsumerRtpParameters({ consumerEncoding.maxBitrate = maxEncodingMaxBitrate; } - // Set a single encoding for the Consumer. consumerParams.encodings!.push(consumerEncoding); - } else { - const consumableEncodings = - utils.clone( - consumableRtpParameters.encodings - ) ?? []; - const baseSsrc = utils.generateRandomNumber(); - const baseRtxSsrc = utils.generateRandomNumber(); - - for (let i = 0; i < consumableEncodings.length; ++i) { - const encoding = consumableEncodings[i]!; - - encoding.ssrc = baseSsrc + i; - - if (rtxSupported) { - encoding.rtx = { ssrc: baseRtxSsrc + i }; - } else { - delete encoding.rtx; - } - - consumerParams.encodings!.push(encoding); - } } return consumerParams; } +// Heuristic to discriminate the two input shapes of +// `getConsumerRtpParameters.remoteRtpCapabilities`: RtpCodecParameters has a +// required `payloadType`, RtpCodecCapability has `preferredPayloadType` +// instead. We fall back to checking for `encodings` / `rtcp` / `msid` / `mid` +// if the codec list is empty so the empty-override edge case still lands on +// the `RtpParameters` branch. +function isRtpParameters( + value: RtpCapabilities | RtpParameters +): value is RtpParameters { + const firstCodec = value.codecs?.[0] as + | RtpCodecCapability + | RtpCodecParameters + | undefined; + + if (firstCodec) { + return (firstCodec as RtpCodecParameters).payloadType !== undefined; + } + + const asParams = value as RtpParameters; + + return ( + Array.isArray(asParams.encodings) || + asParams.rtcp !== undefined || + asParams.msid !== undefined || + asParams.mid !== undefined + ); +} + /** * Generate RTP parameters for a pipe Consumer. * @@ -871,6 +999,126 @@ export function getPipeConsumerRtpParameters({ return consumerParams; } +/** + * Build a per-Consumer egress mapping between the Router's canonical + * (consumable) RTP space and the wire-level RTP space dictated by the final + * Consumer rtpParameters. The worker uses this mapping to rewrite outgoing + * RTP packet payload types and header-extension ids in place. + */ +export function getConsumerRtpMapping( + producerRtpParameters: RtpParameters, + consumerRtpParameters: RtpParameters +): ConsumerRtpMapping { + const mapping: ConsumerRtpMapping = { + codecs: [], + headerExtensions: [], + }; + + const consumerCodecPts = new Set(); + + for (const codec of consumerRtpParameters.codecs) { + consumerCodecPts.add(codec.payloadType); + } + + const usedProducerCodecPts = new Set(); + + for (const consumerCodec of consumerRtpParameters.codecs) { + for (const producerCodec of producerRtpParameters.codecs) { + if (usedProducerCodecPts.has(producerCodec.payloadType)) { + continue; + } + + if (isRtxCodec(producerCodec) !== isRtxCodec(consumerCodec)) { + continue; + } + + if (!matchCodecs(producerCodec, consumerCodec, { strict: true })) { + continue; + } + + if (isRtxCodec(producerCodec)) { + const apt = consumerCodec.parameters?.['apt']; + + if (typeof apt !== 'number' || !consumerCodecPts.has(apt as number)) { + continue; + } + } + + usedProducerCodecPts.add(producerCodec.payloadType); + mapping.codecs.push({ + producerPayloadType: producerCodec.payloadType, + consumerPayloadType: consumerCodec.payloadType, + }); + break; + } + } + + const producerExtByUri = new Map(); + + for (const ext of producerRtpParameters.headerExtensions ?? []) { + producerExtByUri.set(ext.uri, ext.id); + } + + for (const consumerExt of consumerRtpParameters.headerExtensions ?? []) { + const producerExtId = producerExtByUri.get(consumerExt.uri); + + if (producerExtId !== undefined) { + mapping.headerExtensions.push({ + producerExtId, + consumerExtId: consumerExt.id, + }); + } + } + + return mapping; +} + +export function serializeConsumerRtpMapping( + builder: flatbuffers.Builder, + consumerRtpMapping: ConsumerRtpMapping +): number { + const codecs: number[] = []; + + for (const m of consumerRtpMapping.codecs) { + codecs.push( + FbsRtpParameters.ConsumerCodecMapping.createConsumerCodecMapping( + builder, + m.producerPayloadType, + m.consumerPayloadType + ) + ); + } + + const codecsOffset = FbsRtpParameters.ConsumerRtpMapping.createCodecsVector( + builder, + codecs + ); + + const headerExtensions: number[] = []; + + for (const m of consumerRtpMapping.headerExtensions) { + headerExtensions.push( + FbsRtpParameters.ConsumerHeaderExtensionMapping.createConsumerHeaderExtensionMapping( + builder, + m.producerExtId, + m.consumerExtId + ) + ); + } + + const headerExtensionsOffset = + FbsRtpParameters.ConsumerRtpMapping.createHeaderExtensionsVector( + builder, + headerExtensions + ); + + return FbsRtpParameters.ConsumerRtpMapping.createConsumerRtpMapping( + builder, + codecsOffset, + headerExtensionsOffset + ); +} + export function serializeRtpMapping( builder: flatbuffers.Builder, rtpMapping: RtpCodecsEncodingsMapping diff --git a/node/src/test/test-Consumer.ts b/node/src/test/test-Consumer.ts index b486dc6309..4a4b81437d 100644 --- a/node/src/test/test-Consumer.ts +++ b/node/src/test/test-Consumer.ts @@ -555,6 +555,138 @@ test('transport.consume() can be created with user provided mid', async () => { ); }, 2000); +test('transport.consume() with rtpParameters override rewrites egress PT / ext-id', async () => { + // Learn the producer's consumable PT / ext-id mappings. + const consumable = ctx.videoProducer!.consumableRtpParameters; + const consumableH264 = consumable.codecs.find( + c => c.mimeType === 'video/H264' + )!; + const consumableRtx = consumable.codecs.find( + c => c.mimeType === 'video/rtx' + )!; + const consumableMid = consumable.headerExtensions!.find( + e => e.uri === 'urn:ietf:params:rtp-hdrext:sdes:mid' + )!; + + // Pick non-clashing wire PTs / ids different from the consumable ones. + const wireH264Pt = 96; + const wireRtxPt = 97; + const wireMidId = (consumableMid.id % 14) + 1; + + const override: mediasoup.types.RtpParameters = { + mid: 'VIDEO-CONSUMER', + codecs: [ + { + mimeType: 'video/H264', + payloadType: wireH264Pt, + clockRate: 90000, + parameters: consumableH264.parameters, + rtcpFeedback: [{ type: 'nack', parameter: 'pli' }], + }, + { + mimeType: 'video/rtx', + payloadType: wireRtxPt, + clockRate: 90000, + parameters: { apt: wireH264Pt }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: wireMidId, + encrypt: false, + parameters: {}, + }, + ], + }; + + const videoConsumer = await ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + rtpParameters: override, + }); + + // The caller-visible rtpParameters must match the override. + expect(videoConsumer.rtpParameters.mid).toBe('VIDEO-CONSUMER'); + expect(videoConsumer.rtpParameters.codecs.length).toBe(2); + expect(videoConsumer.rtpParameters.codecs[0]!.mimeType).toBe('video/H264'); + expect(videoConsumer.rtpParameters.codecs[0]!.payloadType).toBe(wireH264Pt); + expect(videoConsumer.rtpParameters.codecs[1]!.mimeType).toBe('video/rtx'); + expect(videoConsumer.rtpParameters.codecs[1]!.payloadType).toBe(wireRtxPt); + expect(videoConsumer.rtpParameters.headerExtensions).toEqual([ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: wireMidId, + encrypt: false, + parameters: {}, + }, + ]); + + // Dump round-trips the wire PTs. + const dump = await videoConsumer.dump(); + + expect(dump.rtpParameters.codecs[0]!.payloadType).toBe(wireH264Pt); + expect(dump.rtpParameters.codecs[1]!.payloadType).toBe(wireRtxPt); + expect(dump.rtpParameters.headerExtensions?.[0]?.id).toBe(wireMidId); + + // supportedCodecPayloadTypes must be keyed by the producer-consumable PT + // (not the wire PT), otherwise incoming packets would all be dropped. + expect(dump.supportedCodecPayloadTypes).toEqual( + expect.arrayContaining([ + consumableH264.payloadType, + consumableRtx.payloadType, + ]) + ); + expect(dump.supportedCodecPayloadTypes).not.toContain(wireH264Pt); +}, 2000); + +test('transport.consume() with rtpParameters override rejects on pipe transport', async () => { + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 96, + clockRate: 90000, + parameters: { 'packetization-mode': 1, 'profile-level-id': '4d0032' }, + rtcpFeedback: [], + }, + ], + }; + + await expect( + ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + pipe: true, + rtpParameters: override, + }) + ).rejects.toThrow(TypeError); +}, 2000); + +test('transport.consume() with invalid rtpParameters override rejects', async () => { + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + // No H264 match (wrong profile-level-id). + mimeType: 'video/H264', + payloadType: 96, + clockRate: 90000, + parameters: { 'packetization-mode': 1, 'profile-level-id': '640032' }, + rtcpFeedback: [], + }, + ], + }; + + await expect( + ctx.webRtcTransport2!.consume({ + producerId: ctx.videoProducer!.id, + rtpCapabilities: ctx.consumerDeviceCapabilities, + rtpParameters: override, + }) + ).rejects.toThrow(UnsupportedError); +}, 2000); + test('transport.consume() with incompatible rtpCapabilities rejects with UnsupportedError', async () => { let invalidDeviceCapabilities: mediasoup.types.RtpCapabilities; diff --git a/node/src/test/test-ortc.ts b/node/src/test/test-ortc.ts index 9a7db532e8..59ddb873d4 100644 --- a/node/src/test/test-ortc.ts +++ b/node/src/test/test-ortc.ts @@ -552,3 +552,378 @@ test('getProducerRtpParametersMapping() with incompatible params throws Unsuppor ortc.getProducerRtpParametersMapping(rtpParameters, routerRtpCapabilities) ).toThrow(UnsupportedError); }); + +describe('getConsumerRtpParameters() with RtpParameters override', () => { + const makeConsumable = (): mediasoup.types.RtpParameters => ({ + codecs: [ + { + mimeType: 'video/H264', + payloadType: 101, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [ + { type: 'nack', parameter: '' }, + { type: 'nack', parameter: 'pli' }, + ], + }, + { + mimeType: 'video/rtx', + payloadType: 102, + clockRate: 90000, + parameters: { apt: 101 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 1, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 5, + encrypt: false, + parameters: {}, + }, + ], + encodings: [ + { + ssrc: 10000001, + maxBitrate: 500000, + scalabilityMode: 'L1T3', + }, + ], + rtcp: { cname: 'cname1234', reducedSize: true }, + }); + + test('succeeds with happy path and produces a mapping', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 7, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.codecs.length).toBe(2); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + expect(rtpParameters.codecs[1]!.payloadType).toBe(98); + + expect(rtpParameters.rtcp).toEqual({ + cname: 'cname1234', + reducedSize: true, + }); + + const mapping = ortc.getConsumerRtpMapping(consumable, rtpParameters); + + expect(mapping.codecs).toEqual( + expect.arrayContaining([ + { producerPayloadType: 101, consumerPayloadType: 97 }, + { producerPayloadType: 102, consumerPayloadType: 98 }, + ]) + ); + + expect(mapping.headerExtensions).toEqual([ + { producerExtId: 1, consumerExtId: 3 }, + { producerExtId: 5, consumerExtId: 7 }, + ]); + }); + + test('auto-generates SSRCs regardless of caller-provided encodings', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.encodings).toBeDefined(); + expect(rtpParameters.encodings!.length).toBe(1); + expect(typeof rtpParameters.encodings![0]!.ssrc).toBe('number'); + expect(typeof rtpParameters.encodings![0]!.rtx?.ssrc).toBe('number'); + }); + + test('rtcp.cname from caller is preserved when provided', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + rtcp: { cname: 'custom-cname' }, + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.rtcp?.cname).toBe('custom-cname'); + }); + + test('throws when no codec has a consumable counterpart', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/VP8', + payloadType: 97, + clockRate: 90000, + parameters: {}, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + expect(() => + ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }) + ).toThrow(UnsupportedError); + }); + + test('drops RTX codec when its apt points to no consumer-side codec', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 123 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: true, + }); + + expect(rtpParameters.codecs.length).toBe(1); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + }); + + test('drops unknown header extension URIs from the final rtpParameters', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'urn:3gpp:video-orientation', + id: 2, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.headerExtensions!.length).toBe(1); + expect(rtpParameters.headerExtensions![0]!.uri).toBe( + 'urn:ietf:params:rtp-hdrext:sdes:mid' + ); + }); + + test('keeps all matching header extensions (no early break)', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + ], + headerExtensions: [ + { + uri: 'urn:ietf:params:rtp-hdrext:sdes:mid', + id: 3, + encrypt: false, + parameters: {}, + }, + { + uri: 'http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01', + id: 7, + encrypt: false, + parameters: {}, + }, + ], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.headerExtensions!.length).toBe(2); + }); + + test('enableRtx=false strips RTX from the caller-provided codec list', () => { + const consumable = makeConsumable(); + const override: mediasoup.types.RtpParameters = { + codecs: [ + { + mimeType: 'video/H264', + payloadType: 97, + clockRate: 90000, + parameters: { + 'packetization-mode': 1, + 'profile-level-id': '4d0032', + }, + rtcpFeedback: [], + }, + { + mimeType: 'video/rtx', + payloadType: 98, + clockRate: 90000, + parameters: { apt: 97 }, + rtcpFeedback: [], + }, + ], + headerExtensions: [], + }; + + const rtpParameters = ortc.getConsumerRtpParameters({ + consumableRtpParameters: consumable, + remoteRtpCapabilities: override, + pipe: false, + enableRtx: false, + }); + + expect(rtpParameters.codecs.length).toBe(1); + expect(rtpParameters.codecs[0]!.payloadType).toBe(97); + expect(rtpParameters.encodings?.[0]?.rtx).toBeUndefined(); + }); +}); diff --git a/rust/src/messages.rs b/rust/src/messages.rs index dbbf67131f..73f1e89b15 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -1775,6 +1775,7 @@ pub(crate) struct TransportConsumeRequest { pub(crate) paused: bool, pub(crate) preferred_layers: Option, pub(crate) ignore_dtx: bool, + pub(crate) consumer_rtp_mapping: Option, } #[derive(Debug)] @@ -1802,8 +1803,8 @@ impl Request for TransportConsumeRequest { ToFbs::to_fbs(&self.consumable_rtp_encodings), self.paused, ToFbs::to_fbs(&self.preferred_layers), - // self.preferred_layers.map(ConsumerLayers::to_fbs), self.ignore_dtx, + self.consumer_rtp_mapping.as_ref().map(ToFbs::to_fbs), ); let request_body = request::Body::create_transport_consume_request(&mut builder, data); let request = request::Request::create( diff --git a/rust/src/ortc.rs b/rust/src/ortc.rs index 397380718d..8ce08bc3de 100644 --- a/rust/src/ortc.rs +++ b/rust/src/ortc.rs @@ -135,6 +135,66 @@ impl ToFbs for RtpMapping { } } +/// Single producer-side -> consumer-side (wire-level) payload-type pair used +/// by the worker to rewrite outgoing RTP packet headers for a per-Consumer +/// egress remap. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerCodecMapping { + pub producer_payload_type: u8, + pub consumer_payload_type: u8, +} + +/// Single producer-side -> consumer-side (wire-level) RTP header-extension id +/// pair. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerHeaderExtensionMapping { + pub producer_ext_id: u8, + pub consumer_ext_id: u8, +} + +/// Per-Consumer egress mapping between the Router's canonical (consumable) +/// RTP space and the wire-level RTP space declared by the caller via +/// [`ConsumerOptions::rtp_parameters`](crate::consumer::ConsumerOptions::rtp_parameters). +/// +/// The worker uses this mapping to rewrite outgoing RTP packet payload types +/// and header-extension ids in place for this Consumer only. +#[doc(hidden)] +#[derive(Debug, Default, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConsumerRtpMapping { + pub codecs: Vec, + pub header_extensions: Vec, +} + +impl ToFbs for ConsumerRtpMapping { + type FbsType = rtp_parameters::ConsumerRtpMapping; + + fn to_fbs(&self) -> Self::FbsType { + rtp_parameters::ConsumerRtpMapping { + codecs: self + .codecs + .iter() + .map(|m| rtp_parameters::ConsumerCodecMapping { + producer_payload_type: m.producer_payload_type, + consumer_payload_type: m.consumer_payload_type, + }) + .collect(), + header_extensions: self + .header_extensions + .iter() + .map(|m| rtp_parameters::ConsumerHeaderExtensionMapping { + producer_ext_id: m.producer_ext_id, + consumer_ext_id: m.consumer_ext_id, + }) + .collect(), + } + } +} + /// Error caused by invalid RTP parameters. #[derive(Debug, Error, Eq, PartialEq)] pub enum RtpParametersError { @@ -742,6 +802,20 @@ pub(crate) fn can_consume( .unwrap_or_default()) } +/// Input source for [`get_consumer_rtp_parameters`]. +/// +/// - [`RemoteRtpSource::Capabilities`] runs the default ORTC path: iterate the +/// producer-side consumable codecs and keep the ones matching any of the +/// caller's capabilities, inheriting rtcpFeedback from the matched +/// capability. +/// - [`RemoteRtpSource::Parameters`] runs the override path: iterate the +/// caller-provided rtpParameters codecs (which dictate wire PTs / ext ids) +/// and keep the ones that have a compatible producer-side counterpart. +pub(crate) enum RemoteRtpSource<'a> { + Capabilities(&'a RtpCapabilities), + Parameters(&'a RtpParameters), +} + /// Generate RTP parameters for a specific Consumer. /// /// It reduces encodings to just one and takes into account given RTP capabilities to reduce codecs, @@ -749,43 +823,120 @@ pub(crate) fn can_consume( #[allow(clippy::suspicious_operation_groupings)] pub(crate) fn get_consumer_rtp_parameters( consumable_rtp_parameters: &RtpParameters, - remote_rtp_capabilities: &RtpCapabilities, + remote: RemoteRtpSource<'_>, pipe: bool, enable_rtx: bool, ) -> Result { - let mut consumer_params = RtpParameters { - rtcp: consumable_rtp_parameters.rtcp.clone(), - msid: consumable_rtp_parameters.msid.clone(), - ..RtpParameters::default() - }; + let mut consumer_params: RtpParameters; - for cap_codec in &remote_rtp_capabilities.codecs { - validate_rtp_codec_capability(cap_codec) - .map_err(ConsumerRtpParametersError::InvalidCapabilities)?; - } + match remote { + RemoteRtpSource::Capabilities(caps) => { + for cap_codec in &caps.codecs { + validate_rtp_codec_capability(cap_codec) + .map_err(ConsumerRtpParametersError::InvalidCapabilities)?; + } - let mut rtx_supported = false; + consumer_params = RtpParameters { + rtcp: consumable_rtp_parameters.rtcp.clone(), + msid: consumable_rtp_parameters.msid.clone(), + ..RtpParameters::default() + }; + // Iterate producer-side consumable codecs; the caller's + // capabilities are used as the codec-match table so that its + // rtcpFeedback makes it into the final consumer_params. + for mut codec in consumable_rtp_parameters.codecs.clone() { + if !enable_rtx && codec.is_rtx() { + continue; + } - for mut codec in consumable_rtp_parameters.codecs.clone() { - if !enable_rtx && codec.is_rtx() { - continue; - } + if let Some(matched_cap_codec) = caps + .codecs + .iter() + .find(|c| match_codecs((*c).into(), (&codec).into(), true).is_ok()) + { + *codec.rtcp_feedback_mut() = matched_cap_codec + .rtcp_feedback() + .iter() + .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) + .copied() + .collect(); + consumer_params.codecs.push(codec); + } + } - if let Some(matched_cap_codec) = remote_rtp_capabilities - .codecs - .iter() - .find(|cap_codec| match_codecs((*cap_codec).into(), (&codec).into(), true).is_ok()) - { - *codec.rtcp_feedback_mut() = matched_cap_codec - .rtcp_feedback() + // Keep the producer-side extension only when the remote capability + // advertises the same URI AND `preferred_id`. + consumer_params.header_extensions = consumable_rtp_parameters + .header_extensions .iter() - .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) - .copied() + .filter(|ext| { + caps.header_extensions + .iter() + .any(|cap_ext| cap_ext.preferred_id == ext.id && cap_ext.uri == ext.uri) + }) + .cloned() .collect(); + } + RemoteRtpSource::Parameters(override_params) => { + consumer_params = RtpParameters { + mid: override_params.mid.clone(), + rtcp: override_params.rtcp.clone(), + msid: override_params + .msid + .clone() + .or_else(|| consumable_rtp_parameters.msid.clone()), + ..RtpParameters::default() + }; + // Consumable is still the Router-canonical view used for + // structural matching; we iterate the caller's override (which + // dictates wire PTs and extension ids). + for mut codec in override_params.codecs.clone() { + if !enable_rtx && codec.is_rtx() { + continue; + } - consumer_params.codecs.push(codec); + if consumable_rtp_parameters + .codecs + .iter() + .any(|cc| match_codecs((&codec).into(), cc.into(), true).is_ok()) + { + // Preserve caller-declared rtcp_feedback (typically + // taken verbatim from the remote SDP answer) so that + // the final Consumer rtp_parameters stay compliant + // with RFC 4585 §4.2.2 (answer feedback must be a + // subset of the offer). + let feedback: Vec<_> = codec + .rtcp_feedback() + .iter() + .filter(|&&fb| enable_rtx || fb != RtcpFeedback::Nack) + .copied() + .collect(); + *codec.rtcp_feedback_mut() = feedback; + consumer_params.codecs.push(codec); + } + } + + // The caller explicitly declares wire-level ext ids that may + // differ from the Router's canonical ones, so we only check URI + // presence in the producer-side consumable set. The worker rewrites + // the producer ext-id to the caller-declared one using + // `ConsumerRtpMapping`. + consumer_params.header_extensions = override_params + .header_extensions + .iter() + .filter(|ext| { + consumable_rtp_parameters + .header_extensions + .iter() + .any(|c_ext| c_ext.uri == ext.uri) + }) + .cloned() + .collect(); } } + + let mut rtx_supported = false; + // Must sanitize the list of matched codecs by removing useless RTX codecs. let mut remove_codecs = Vec::new(); for (idx, codec) in consumer_params.codecs.iter().enumerate() { @@ -816,18 +967,6 @@ pub(crate) fn get_consumer_rtp_parameters( return Err(ConsumerRtpParametersError::NoCompatibleMediaCodecs); } - consumer_params.header_extensions = consumable_rtp_parameters - .header_extensions - .iter() - .filter(|ext| { - remote_rtp_capabilities - .header_extensions - .iter() - .any(|cap_ext| cap_ext.preferred_id == ext.id && cap_ext.uri == ext.uri) - }) - .cloned() - .collect(); - // Reduce codecs' RTCP feedback. Use Transport-CC if available, REMB otherwise. if consumer_params .header_extensions @@ -1006,6 +1145,94 @@ pub(crate) fn get_pipe_consumer_rtp_parameters( consumer_params } +/// Build a per-Consumer egress mapping between the Router's canonical +/// (consumable) RTP space and the wire-level RTP space dictated by the final +/// Consumer rtpParameters. The worker uses this mapping to rewrite outgoing +/// RTP packet payload types and header-extension ids in place for this +/// Consumer only. +pub(crate) fn get_consumer_rtp_mapping( + producer_rtp_parameters: &RtpParameters, + consumer_rtp_parameters: &RtpParameters, +) -> ConsumerRtpMapping { + let mut mapping = ConsumerRtpMapping::default(); + + let consumer_codec_pts: std::collections::HashSet = consumer_rtp_parameters + .codecs + .iter() + .map(|c| c.payload_type()) + .collect(); + + let mut used_producer_codec_pts: std::collections::HashSet = + std::collections::HashSet::new(); + + for consumer_codec in &consumer_rtp_parameters.codecs { + for producer_codec in &producer_rtp_parameters.codecs { + if used_producer_codec_pts.contains(&producer_codec.payload_type()) { + continue; + } + + if producer_codec.is_rtx() != consumer_codec.is_rtx() { + continue; + } + + if match_codecs(producer_codec.into(), consumer_codec.into(), true).is_err() { + continue; + } + + if producer_codec.is_rtx() { + let apt = match consumer_codec.parameters().get("apt") { + Some(RtpCodecParametersParametersValue::Number(apt)) => { + match u8::try_from(*apt) { + Ok(v) => v, + Err(_) => continue, + } + } + _ => continue, + }; + + if !consumer_codec_pts.contains(&apt) { + continue; + } + } + + used_producer_codec_pts.insert(producer_codec.payload_type()); + mapping.codecs.push(ConsumerCodecMapping { + producer_payload_type: producer_codec.payload_type(), + consumer_payload_type: consumer_codec.payload_type(), + }); + break; + } + } + + let mut producer_ext_by_uri = + std::collections::HashMap::::with_capacity( + producer_rtp_parameters.header_extensions.len(), + ); + + for ext in &producer_rtp_parameters.header_extensions { + // NOTE: Header-extension ids are validated upstream to be in 1..=14, + // so narrowing u16 -> u8 is safe here. + if let Ok(id) = u8::try_from(ext.id) { + producer_ext_by_uri.insert(ext.uri, id); + } + } + + for consumer_ext in &consumer_rtp_parameters.header_extensions { + if let Some(&producer_ext_id) = producer_ext_by_uri.get(&consumer_ext.uri) { + if let Ok(consumer_ext_id) = u8::try_from(consumer_ext.id) { + mapping + .header_extensions + .push(ConsumerHeaderExtensionMapping { + producer_ext_id, + consumer_ext_id, + }); + } + } + } + + mapping +} + struct CodecToMatch<'a> { channels: Option, clock_rate: NonZeroU32, diff --git a/rust/src/ortc/tests.rs b/rust/src/ortc/tests.rs index 589e8ddbd4..20a6428db6 100644 --- a/rust/src/ortc/tests.rs +++ b/rust/src/ortc/tests.rs @@ -455,7 +455,7 @@ fn get_producer_rtp_parameters_mapping_get_consumable_rtp_parameters_get_consume let consumer_rtp_parameters = get_consumer_rtp_parameters( &consumable_rtp_parameters, - &remote_rtp_capabilities, + RemoteRtpSource::Capabilities(&remote_rtp_capabilities), false, true, ) diff --git a/rust/src/router/consumer.rs b/rust/src/router/consumer.rs index 014ba50d18..4108f77c19 100644 --- a/rust/src/router/consumer.rs +++ b/rust/src/router/consumer.rs @@ -130,6 +130,24 @@ pub struct ConsumerOptions { pub ignore_dtx: bool, /// Whether this Consumer should consume all RTP streams generated by the Producer. pub pipe: bool, + /// When set, bypasses the default ORTC-driven `rtpParameters` derivation for + /// this Consumer and is used verbatim as the wire-level `rtpParameters`. + /// The payload types and header-extension ids declared here become the + /// on-wire values for this Consumer. + /// + /// mediasoup validates compatibility against the Producer's + /// `consumableRtpParameters` (1:1 codec mapping, compatible fmtp, matching + /// layer/simulcast structure, header-extension URIs being a subset of the + /// consumable ones) and asks the worker to rewrite outgoing RTP headers + /// accordingly. + /// + /// Intended for advanced scenarios (e.g. WHEP) where the answer SDP + /// dictates wire-level PT / ext-id values that differ from the Router's + /// canonical ones. + /// + /// Not supported together with [`ConsumerOptions::pipe`]=`true` or on a + /// [`PipeTransport`](crate::pipe_transport::PipeTransport). + pub rtp_parameters: Option, /// Custom application data. pub app_data: AppData, } @@ -147,6 +165,7 @@ impl ConsumerOptions { enable_rtx: None, pipe: false, mid: None, + rtp_parameters: None, app_data: AppData::default(), } } diff --git a/rust/src/router/transport.rs b/rust/src/router/transport.rs index 27898e9f19..9a31ff6ada 100644 --- a/rust/src/router/transport.rs +++ b/rust/src/router/transport.rs @@ -394,6 +394,10 @@ pub enum ConsumeError { /// Bad consumer RTP parameters. #[error("Bad consumer RTP parameters: {0}")] BadConsumerRtpParameters(ConsumerRtpParametersError), + /// [`ConsumerOptions::rtp_parameters`] override is not supported together with `pipe=true` + /// or on pipe transports. + #[error("ConsumerOptions.rtp_parameters override is not supported on pipe transports")] + OverrideNotSupportedOnPipe, /// Request to worker failed. #[error("Request to worker failed: {0}")] Request(RequestError), @@ -614,8 +618,18 @@ pub(super) trait TransportImpl: TransportGeneric { enable_rtx, ignore_dtx, pipe, + rtp_parameters: override_rtp_parameters, app_data, } = consumer_options; + + // Caller-provided wire-level rtpParameters are incompatible with pipe + // semantics (which already passes through all encodings verbatim). + if override_rtp_parameters.is_some() + && (pipe || transport_type == TransportType::Pipe) + { + return Err(ConsumeError::OverrideNotSupportedOnPipe); + } + ortc::validate_rtp_capabilities(&rtp_capabilities) .map_err(ConsumeError::FailedRtpCapabilitiesValidation)?; @@ -628,21 +642,39 @@ pub(super) trait TransportImpl: TransportGeneric { let enable_rtx = enable_rtx.unwrap_or(producer.kind() == MediaKind::Video); + let mut consumer_rtp_mapping: Option = None; + let rtp_parameters = if transport_type == TransportType::Pipe { ortc::get_pipe_consumer_rtp_parameters(producer.consumable_rtp_parameters(), rtx) } else { + let source = match &override_rtp_parameters { + Some(o) => ortc::RemoteRtpSource::Parameters(o), + None => ortc::RemoteRtpSource::Capabilities(&rtp_capabilities), + }; + let mut rtp_parameters = ortc::get_consumer_rtp_parameters( producer.consumable_rtp_parameters(), - &rtp_capabilities, + source, pipe, enable_rtx, ) .map_err(ConsumeError::BadConsumerRtpParameters)?; - if !pipe { - // Set MID. + if override_rtp_parameters.is_some() { + consumer_rtp_mapping = Some(ortc::get_consumer_rtp_mapping( + producer.consumable_rtp_parameters(), + &rtp_parameters, + )); + } + + if !pipe && rtp_parameters.mid.is_none() { + // Set MID. Priority: + // 1. mid already set on rtp_parameters (override path, + // taken from the caller-provided rtpParameters.mid). + // 2. ConsumerOptions.mid. + // 3. Auto-generated monotonically increasing integer + // (up to 8 bytes). rtp_parameters.mid = mid.or_else(|| { - // We use up to 8 bytes for MID (string). let next_mid_for_consumers = self .next_mid_for_consumers() .fetch_add(1, Ordering::Relaxed); @@ -681,6 +713,7 @@ pub(super) trait TransportImpl: TransportGeneric { paused, preferred_layers, ignore_dtx, + consumer_rtp_mapping, }, ) .await diff --git a/worker/.vscode/settings.json b/worker/.vscode/settings.json new file mode 100644 index 0000000000..10ed08d728 --- /dev/null +++ b/worker/.vscode/settings.json @@ -0,0 +1,36 @@ +{ + "editor.formatOnSave": true, + "C_Cpp.default.configurationProvider": "mesonbuild.mesonbuild", + "mesonbuild.configureOnOpen": false, + + "clangd.path": "/opt/homebrew/opt/llvm/bin/clangd", + "clangd.arguments": [ + "-log=verbose", + "-pretty", + "--background-index", + "--compile-commands-dir=${workspaceFolder}/worker/out/Release/build", + "--clang-tidy", + // 启用全项目索引(跨文件引用查找更准) + "--all-scopes-completion", + // 补充头文件(当某些头文件找不到时自动插入) + "--header-insertion=iwyu", + // 开启代码补全时显示参数提示 + "--completion-style=detailed", + // 利用多核优化 + "-j=4", + // 兜底风格,虽然有 .clang-format 时会被忽略,但写上是个好习惯 + "--fallback-style=LLVM" + ], + + "[cpp]": { + "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd", + "editor.formatOnSave": true + }, + "[c]": { + "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd", + "editor.formatOnSave": true + }, + "C_Cpp.formatting": "disabled", + "C_Cpp.intelliSenseEngine": "disabled", + "C_Cpp.errorSquiggles": "disabled" +} diff --git a/worker/fbs/rtpParameters.fbs b/worker/fbs/rtpParameters.fbs index 115f901c4b..c93771e9a7 100644 --- a/worker/fbs/rtpParameters.fbs +++ b/worker/fbs/rtpParameters.fbs @@ -127,3 +127,18 @@ table RtpMapping { codecs: [CodecMapping] (required); encodings: [EncodingMapping] (required); } + +table ConsumerCodecMapping { + producer_payload_type: uint8; + consumer_payload_type: uint8; +} + +table ConsumerHeaderExtensionMapping { + producer_ext_id: uint8; + consumer_ext_id: uint8; +} + +table ConsumerRtpMapping { + codecs: [ConsumerCodecMapping] (required); + header_extensions: [ConsumerHeaderExtensionMapping] (required); +} diff --git a/worker/fbs/transport.fbs b/worker/fbs/transport.fbs index 225cdd1171..cd974d95ac 100644 --- a/worker/fbs/transport.fbs +++ b/worker/fbs/transport.fbs @@ -65,6 +65,7 @@ table ConsumeRequest { paused: bool = false; preferred_layers: FBS.Consumer.ConsumerLayers; ignore_dtx: bool = false; + consumer_rtp_mapping: FBS.RtpParameters.ConsumerRtpMapping; } table ConsumeResponse { diff --git a/worker/include/RTC/Consumer.hpp b/worker/include/RTC/Consumer.hpp index 86f6333342..1c8e8d7174 100644 --- a/worker/include/RTC/Consumer.hpp +++ b/worker/include/RTC/Consumer.hpp @@ -18,6 +18,7 @@ #include "RTC/RtpDictionaries.hpp" #include "RTC/Shared.hpp" #include +#include #include #include @@ -195,6 +196,24 @@ namespace RTC uint8_t priority{ 1u }; struct TraceEventTypes traceEventTypes; + // Per-Consumer egress rewrite state. Populated when the ConsumeRequest + // carries a ConsumerRtpMapping; otherwise remains zeroed and the send + // path behaves exactly as before (no rewrite, no overhead beyond a + // single boolean check). + bool hasEgressRemap{ false }; + // egressPayloadTypeMap[producerPt] = wirePt; 0 means no mapping. + std::array egressPayloadTypeMap{}; + // egressHeaderExtensionIdMap[producerId] = wireId; 0 means no mapping. + // Index 0 is unused; indices 1..14 are meaningful for RFC 8285 + // one-byte extension ids. + std::array egressHeaderExtensionIdMap{}; + // Mirror of `rtpHeaderExtensionIds` with every non-zero id routed + // through `egressHeaderExtensionIdMap`. Used as the `newExtIds` + // argument to `Packet::ApplyEgressRewrite` so that the Transport's + // UpdateAbsSendTime / UpdateTransportWideCc01 (which consult the + // packet's own headerExtensionIds) observe the wire ids. + struct RTC::RTP::HeaderExtensionIds egressHeaderExtensionIds; + private: // Others. std::vector mediaSsrcs; diff --git a/worker/include/RTC/RTP/Packet.hpp b/worker/include/RTC/RTP/Packet.hpp index 195024b181..3d753587b2 100644 --- a/worker/include/RTC/RTP/Packet.hpp +++ b/worker/include/RTC/RTP/Packet.hpp @@ -512,6 +512,54 @@ namespace RTC */ void AssignExtensionIds(RTP::HeaderExtensionIds& headerExtensionIds); + /** + * Undo token for `ApplyEgressRewrite`. Keeps the minimum state needed + * to revert the in-place Packet mutations done by the egress rewrite + * so that the same underlying packet buffer can be served to the next + * Consumer in the Router fan-out. + */ + struct EgressRewriteUndo + { + uint8_t origPayloadType{ 0u }; + // Snapshotted producer-side extension layouts. `RevertEgressRewrite` + // walks these to rewrite id bytes in-place (length nibbles in the + // one-byte scheme are preserved across the rewrite) and then + // swaps them back into the live maps. + std::array origOneByteExtensions{ + { -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 } + }; + std::map origTwoBytesExtensions; + RTP::HeaderExtensionIds origHeaderExtensionIds{}; + }; + + /** + * Per-Consumer egress rewrite. Applied in-place on the live packet + * buffer before `RtpStreamSend::ReceivePacket` so that retransmission + * stores observe the wire PT, then reverted after + * `OnConsumerSendRtpPacket` so the next Consumer in the Router + * fan-out sees the pristine packet. + * + * @param newPayloadType The on-wire payload type for this Consumer. + * @param extIdRemap `extIdRemap[oldId] = newId`; 0 means keep as is. + * @param newExtIds Pre-computed `HeaderExtensionIds` that mirrors the + * producer-side ids mapped through `extIdRemap`. Used so that + * `UpdateAbsSendTime` / `UpdateTransportWideCc01` called by the + * Transport observe the wire ids. + * @param undo Out: token for a subsequent `RevertEgressRewrite`. + */ + void ApplyEgressRewrite( + uint8_t newPayloadType, + const std::array& extIdRemap, + const RTP::HeaderExtensionIds& newExtIds, + EgressRewriteUndo& undo); + + /** + * Revert the in-place mutations from a previous + * `ApplyEgressRewrite` call. `undo` must have been initialised by + * a matching `ApplyEgressRewrite`. + */ + void RevertEgressRewrite(const EgressRewriteUndo& undo); + bool ReadMid(std::string& mid) const; bool UpdateMid(const std::string& mid); @@ -937,7 +985,8 @@ namespace RTC */ void SetExtensionLength(uint8_t id, uint8_t len); - /* Pure virtual methods inherited from Codecs::DependencyDescriptor::Listener. */ + /* Pure virtual methods inherited from Codecs::DependencyDescriptor::Listener. + */ public: void OnDependencyDescriptorUpdated(const uint8_t* data, size_t len) override; diff --git a/worker/src/RTC/Consumer.cpp b/worker/src/RTC/Consumer.cpp index 04a8fab12c..77474678de 100644 --- a/worker/src/RTC/Consumer.cpp +++ b/worker/src/RTC/Consumer.cpp @@ -145,12 +145,117 @@ namespace RTC // paused is set to false by default. this->paused = data->paused(); + // Parse the optional per-Consumer egress rewrite mapping. + // + // When this mapping is present the packet on the wire will use the + // "consumer" PTs and extension ids declared by the caller, while the + // internal worker path continues to operate on the Producer-side + // consumable PTs / canonical extension ids. SimpleConsumer / + // SimulcastConsumer / SvcConsumer / PipeConsumer apply + // `Packet::ApplyEgressRewrite` in their SendRtpPacket implementations. + if (data->consumerRtpMapping() != nullptr) + { + const auto* mapping = data->consumerRtpMapping(); + + this->hasEgressRemap = true; + + for (const auto* entry : *mapping->codecs()) + { + const uint8_t producerPt = entry->producerPayloadType(); + const uint8_t consumerPt = entry->consumerPayloadType(); + + if (producerPt == 0u || producerPt >= 128u) + { + MS_THROW_TYPE_ERROR("invalid consumerRtpMapping codec (producer payload type out of range)"); + } + + if (consumerPt == 0u || consumerPt >= 128u) + { + MS_THROW_TYPE_ERROR("invalid consumerRtpMapping codec (consumer payload type out of range)"); + } + + this->egressPayloadTypeMap[producerPt] = consumerPt; + } + + for (const auto* entry : *mapping->headerExtensions()) + { + const uint8_t producerExtId = entry->producerExtId(); + const uint8_t consumerExtId = entry->consumerExtId(); + + if (producerExtId == 0u || producerExtId >= 15u) + { + MS_THROW_TYPE_ERROR( + "invalid consumerRtpMapping header extension (producer ext id out of range)"); + } + + if (consumerExtId == 0u || consumerExtId >= 15u) + { + MS_THROW_TYPE_ERROR( + "invalid consumerRtpMapping header extension (consumer ext id out of range)"); + } + + this->egressHeaderExtensionIdMap[producerExtId] = consumerExtId; + } + + // Build egressHeaderExtensionIds by routing each non-zero id in + // rtpHeaderExtensionIds through the remap table. Fields without a + // remap entry keep their original id (so the view stays consistent + // for extensions the caller did not need to re-label). + this->egressHeaderExtensionIds = this->rtpHeaderExtensionIds; + + auto remap = [this](uint8_t& id) + { + if (id != 0u && this->egressHeaderExtensionIdMap[id] != 0u) + { + id = this->egressHeaderExtensionIdMap[id]; + } + }; + + remap(this->egressHeaderExtensionIds.mid); + remap(this->egressHeaderExtensionIds.rid); + remap(this->egressHeaderExtensionIds.rrid); + remap(this->egressHeaderExtensionIds.absSendTime); + remap(this->egressHeaderExtensionIds.transportWideCc01); + remap(this->egressHeaderExtensionIds.ssrcAudioLevel); + remap(this->egressHeaderExtensionIds.dependencyDescriptor); + remap(this->egressHeaderExtensionIds.videoOrientation); + remap(this->egressHeaderExtensionIds.timeOffset); + remap(this->egressHeaderExtensionIds.absCaptureTime); + remap(this->egressHeaderExtensionIds.playoutDelay); + remap(this->egressHeaderExtensionIds.mediasoupPacketId); + } + // Fill supported codec payload types. - for (auto& codec : this->rtpParameters.codecs) + // + // In the legacy (no-remap) path, `rtpParameters.codecs[i].payloadType` + // is the consumable PT (same space as the packet seen in SendRtpPacket + // after Producer::MangleRtpPacket), so indexing the bitset by it is + // correct. + // + // In the remap path, `rtpParameters.codecs[i].payloadType` holds the + // *wire* PT, and the packet in SendRtpPacket still carries the + // producer-side consumable PT. Iterate `egressPayloadTypeMap` + // directly: a non-zero entry at index `producerPt` means the caller + // declared an egress mapping for that consumable PT (media or RTX), + // so that is the PT we must accept. + if (this->hasEgressRemap) { - if (codec.mimeType.IsMediaCodec()) + for (size_t producerPt = 0u; producerPt < this->egressPayloadTypeMap.size(); ++producerPt) { - this->supportedCodecPayloadTypes[codec.payloadType] = true; + if (this->egressPayloadTypeMap[producerPt] != 0u) + { + this->supportedCodecPayloadTypes[producerPt] = true; + } + } + } + else + { + for (auto& codec : this->rtpParameters.codecs) + { + if (codec.mimeType.IsMediaCodec()) + { + this->supportedCodecPayloadTypes[codec.payloadType] = true; + } } } diff --git a/worker/src/RTC/RTP/Packet.cpp b/worker/src/RTC/RTP/Packet.cpp index 09dc3444ab..afa7ea9449 100644 --- a/worker/src/RTC/RTP/Packet.cpp +++ b/worker/src/RTC/RTP/Packet.cpp @@ -846,6 +846,113 @@ namespace RTC this->headerExtensionIds = headerExtensionIds; } + void Packet::ApplyEgressRewrite( + uint8_t newPayloadType, + const std::array& extIdRemap, + const RTP::HeaderExtensionIds& newExtIds, + EgressRewriteUndo& undo) + { + MS_TRACE(); + + const auto rewriteId = [&extIdRemap](uint8_t oldId) + { + const auto newId = extIdRemap[oldId]; + + return newId == 0u ? oldId : newId; + }; + + undo.origPayloadType = GetFixedHeaderPointer()->payloadType; + undo.origOneByteExtensions = this->oneByteExtensions; + undo.origTwoBytesExtensions = this->twoBytesExtensions; + undo.origHeaderExtensionIds = this->headerExtensionIds; + + GetFixedHeaderPointer()->payloadType = newPayloadType; + + auto* extensionsStart = GetHeaderExtensionValue(); + + if (HasOneByteExtensions()) + { + for (uint8_t oldId = 1u; oldId <= 14u; ++oldId) + { + const auto off = undo.origOneByteExtensions[oldId - 1]; + + if (off == -1) + { + continue; + } + + const auto newId = rewriteId(oldId); + + if (newId == oldId) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = static_cast((newId << 4) | (*idByte & 0x0F)); + + this->oneByteExtensions[newId - 1] = off; + this->oneByteExtensions[oldId - 1] = -1; + } + } + else + { + for (const auto& [oldId, off] : undo.origTwoBytesExtensions) + { + const auto newId = rewriteId(oldId); + + if (newId == oldId) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = newId; + + this->twoBytesExtensions.erase(oldId); + this->twoBytesExtensions[newId] = off; + } + } + + this->headerExtensionIds = newExtIds; + } + + void Packet::RevertEgressRewrite(const EgressRewriteUndo& undo) + { + MS_TRACE(); + + auto* extensionsStart = GetHeaderExtensionValue(); + + if (HasOneByteExtensions()) + { + for (uint8_t oldId = 1u; oldId <= 14u; ++oldId) + { + const auto off = undo.origOneByteExtensions[oldId - 1]; + + if (off == -1) + { + continue; + } + + auto* idByte = extensionsStart + off; + *idByte = static_cast((oldId << 4) | (*idByte & 0x0F)); + } + } + else + { + for (const auto& [oldId, off] : undo.origTwoBytesExtensions) + { + auto* idByte = extensionsStart + off; + *idByte = oldId; + } + } + + this->oneByteExtensions = undo.origOneByteExtensions; + this->twoBytesExtensions = undo.origTwoBytesExtensions; + this->headerExtensionIds = undo.origHeaderExtensionIds; + GetFixedHeaderPointer()->payloadType = undo.origPayloadType; + } + bool Packet::ReadMid(std::string& mid) const { MS_TRACE(); diff --git a/worker/src/RTC/SimpleConsumer.cpp b/worker/src/RTC/SimpleConsumer.cpp index 3b3962e6ff..66065adf27 100644 --- a/worker/src/RTC/SimpleConsumer.cpp +++ b/worker/src/RTC/SimpleConsumer.cpp @@ -321,6 +321,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -346,7 +361,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -440,6 +455,21 @@ namespace RTC packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); packet->SetSequenceNumber(seq); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = packet->GetTimestamp(); packet->logger.sendSeqNumber = seq; @@ -458,7 +488,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -488,11 +518,21 @@ namespace RTC packet->SetSsrc(origSsrc); packet->SetSequenceNumber(origSeq); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/src/RTC/SimulcastConsumer.cpp b/worker/src/RTC/SimulcastConsumer.cpp index 88801252af..e3390d9e69 100644 --- a/worker/src/RTC/SimulcastConsumer.cpp +++ b/worker/src/RTC/SimulcastConsumer.cpp @@ -710,6 +710,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + auto spatialLayer = this->mapMappedSsrcSpatialLayer.at(packet->GetSsrc()); if (!IsActive()) @@ -786,7 +801,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -822,7 +837,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -1094,6 +1109,21 @@ namespace RTC packet->SetSequenceNumber(seq); packet->SetTimestamp(timestamp); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = timestamp; packet->logger.sendSeqNumber = seq; @@ -1114,7 +1144,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -1155,11 +1185,21 @@ namespace RTC // Restore the original payload if needed. packet->RestorePayload(); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/src/RTC/SvcConsumer.cpp b/worker/src/RTC/SvcConsumer.cpp index fbe433fc86..169cfd4e12 100644 --- a/worker/src/RTC/SvcConsumer.cpp +++ b/worker/src/RTC/SvcConsumer.cpp @@ -603,6 +603,21 @@ namespace RTC packet->logger.consumerId = this->id; #endif + // When this Consumer carries a per-Consumer egress PT / ext-id remap, + // do NOT write to the Router's fan-out sharedPacket (which is shared + // across all Consumers of the same Producer). A remap would otherwise + // leak our wire PT / ext-id into every other Consumer's retransmission + // buffer via the same underlying shared slot. + // + // Instead, use a per-call local SharedPacket: our RtpStreamSend + // retransmission buffer will hold the wire-state clone exclusively, + // while the Router's sharedPacket stays untouched (and the next + // non-remap Consumer will keep benefiting from fan-out sharing). + // Cost: one extra clone per packet per remap Consumer. + RTC::RTP::SharedPacket localSharedPacket; + RTC::RTP::SharedPacket& activeSharedPacket = + this->hasEgressRemap ? localSharedPacket : sharedPacket; + if (!IsActive()) { #ifdef MS_RTC_LOGGER_RTP @@ -638,7 +653,7 @@ namespace RTC // Store the packet for the scenario in which this packet is part of the // key frame and it arrived before the first packet of the key frame. - StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket); + StorePacketInTargetLayerRetransmissionBuffer(packet, activeSharedPacket); return; } @@ -735,6 +750,21 @@ namespace RTC packet->SetSsrc(this->rtpParameters.encodings[0].ssrc); packet->SetSequenceNumber(seq); + // Per-Consumer egress PT / ext-id rewrite (no-op unless + // ConsumeRequest carried a ConsumerRtpMapping). Must happen before + // rtpStream->ReceivePacket so the RTX store observes the wire PT. + RTC::RTP::Packet::EgressRewriteUndo egressUndo; + const bool doEgressRemap = this->hasEgressRemap && this->egressPayloadTypeMap[payloadType] != 0u; + + if (doEgressRemap) + { + packet->ApplyEgressRewrite( + this->egressPayloadTypeMap[payloadType], + this->egressHeaderExtensionIdMap, + this->egressHeaderExtensionIds, + egressUndo); + } + #ifdef MS_RTC_LOGGER_RTP packet->logger.sendRtpTimestamp = packet->GetTimestamp(); packet->logger.sendSeqNumber = seq; @@ -755,7 +785,7 @@ namespace RTC } const RTC::RTP::RtpStreamSend::ReceivePacketResult result = - this->rtpStream->ReceivePacket(packet, sharedPacket); + this->rtpStream->ReceivePacket(packet, activeSharedPacket); if (result != RTC::RTP::RtpStreamSend::ReceivePacketResult::DISCARDED) { @@ -790,11 +820,21 @@ namespace RTC // Restore the original payload if needed. packet->RestorePayload(); - // If sharedPacket doesn't have a packet inside and it has been stored we - // need to clone the packet into it. - if (!sharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) + // If the active sharedPacket doesn't have a packet inside and it has + // been stored we need to clone the packet into it. For remap + // Consumers this writes into the per-call localSharedPacket, so the + // clone captures wire PT / ext-ids (RevertEgressRewrite runs below). + if (!activeSharedPacket.HasPacket() && result == RTC::RTP::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED) { - sharedPacket.Assign(packet); + activeSharedPacket.Assign(packet); + } + + // Revert the egress remap on the packet so the next Consumer in the + // Router fan-out (and any further logic in this Consumer that reads + // the packet) sees the pristine Router-canonical PT / ext-id view. + if (doEgressRemap) + { + packet->RevertEgressRewrite(egressUndo); } // If sent packet was the first packet of a key frame, let's send buffered diff --git a/worker/test/src/RTC/RTP/TestPacket.cpp b/worker/test/src/RTC/RTP/TestPacket.cpp index f34377fa14..094a42d021 100644 --- a/worker/test/src/RTC/RTP/TestPacket.cpp +++ b/worker/test/src/RTC/RTP/TestPacket.cpp @@ -2219,4 +2219,179 @@ SCENARIO("RTP Packet", "[serializable][rtp][packet]") // been destroyed. packet->SetBufferReleasedListener(nullptr); } + + SECTION("Packet::ApplyEgressRewrite() and RevertEgressRewrite() on one-byte extensions") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0xbe, 0xde, 0x00, 0x03, // Header Extension + 0x10, 0xaa, 0x21, 0xbb, // - id: 1, len: 1 + 0xff, 0x00, 0x00, 0x33, // - id: 2, len: 2 + 0xff, 0xff, 0xff, 0xff, // - id: 3, len: 4 + 0x12, 0x23 + }; + // clang-format on + + alignas(4) uint8_t snapshotBefore[sizeof(buffer)]; + std::memcpy(snapshotBefore, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + REQUIRE(packet->HasOneByteExtensions()); + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(2)); + REQUIRE(packet->HasExtension(3)); + + // Remap producer ids (1,2,3) -> wire ids (5,7,9), and PT 1 -> 97. + std::array extIdRemap{}; + extIdRemap[1] = 5; + extIdRemap[2] = 7; + extIdRemap[3] = 9; + + RTC::RTP::HeaderExtensionIds newExtIds{}; + // For this unit test we do not care about which URI maps to which id; + // ApplyEgressRewrite copies the struct wholesale into packet-> + // headerExtensionIds. + newExtIds.mid = 5; + newExtIds.rid = 7; + newExtIds.transportWideCc01 = 9; + + RTC::RTP::Packet::EgressRewriteUndo undo; + + packet->ApplyEgressRewrite(97, extIdRemap, newExtIds, undo); + + // After remap the packet is in wire state. + REQUIRE(packet->GetPayloadType() == 97); + REQUIRE(packet->HasExtension(5)); + REQUIRE(packet->HasExtension(7)); + REQUIRE(packet->HasExtension(9)); + REQUIRE_FALSE(packet->HasExtension(1)); + REQUIRE_FALSE(packet->HasExtension(2)); + REQUIRE_FALSE(packet->HasExtension(3)); + + uint8_t len; + auto* val1 = packet->GetExtensionValue(5, len); + REQUIRE(val1 != nullptr); + REQUIRE(len == 1); + REQUIRE(val1[0] == 0xaa); + + auto* val2 = packet->GetExtensionValue(7, len); + REQUIRE(val2 != nullptr); + REQUIRE(len == 2); + + auto* val3 = packet->GetExtensionValue(9, len); + REQUIRE(val3 != nullptr); + REQUIRE(len == 4); + + // Revert and check bit-for-bit equality. + packet->RevertEgressRewrite(undo); + + REQUIRE(packet->GetPayloadType() == 1); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(2)); + REQUIRE(packet->HasExtension(3)); + REQUIRE_FALSE(packet->HasExtension(5)); + REQUIRE_FALSE(packet->HasExtension(7)); + REQUIRE_FALSE(packet->HasExtension(9)); + + REQUIRE(std::memcmp(buffer, snapshotBefore, sizeof(buffer)) == 0); + } + + SECTION("Packet::ApplyEgressRewrite() identity remap is a no-op") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0xbe, 0xde, 0x00, 0x03, + 0x10, 0xaa, 0x21, 0xbb, + 0xff, 0x00, 0x00, 0x33, + 0xff, 0xff, 0xff, 0xff, + 0x12, 0x23 + }; + // clang-format on + + alignas(4) uint8_t snapshot[sizeof(buffer)]; + std::memcpy(snapshot, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + + std::array extIdRemap{}; + // All zeros: identity remap (no-op). + + RTC::RTP::HeaderExtensionIds newExtIds{}; + + RTC::RTP::Packet::EgressRewriteUndo undo; + + // Remap to same PT value with identity ext id remap. + packet->ApplyEgressRewrite(1, extIdRemap, newExtIds, undo); + + // Extension bytes must not have been touched since all remap entries + // are 0. + REQUIRE(std::memcmp(buffer + 16, snapshot + 16, sizeof(buffer) - 16) == 0); + + packet->RevertEgressRewrite(undo); + REQUIRE(std::memcmp(buffer, snapshot, sizeof(buffer)) == 0); + } + + SECTION("Packet::ApplyEgressRewrite() and RevertEgressRewrite() on two-byte extensions") + { + // clang-format off + alignas(4) uint8_t buffer[] = + { + 0x90, 0x01, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x04, + 0x00, 0x00, 0x00, 0x05, + 0x10, 0x00, 0x00, 0x04, // Header Extension (Two-Bytes) + 0x00, 0x00, 0x01, 0x00, // - id: 1, len: 0 + 0x02, 0x01, 0x42, 0x00, // - id: 2, len: 1 + 0x03, 0x02, 0x11, 0x22, // - id: 3, len: 2 + 0x00, 0x00, 0x04, 0x00 // - id: 4, len: 0 + }; + // clang-format on + + alignas(4) uint8_t snapshot[sizeof(buffer)]; + std::memcpy(snapshot, buffer, sizeof(buffer)); + + std::unique_ptr packet{ RTC::RTP::Packet::Parse(buffer, sizeof(buffer)) }; + + REQUIRE(packet); + REQUIRE(packet->HasTwoBytesExtensions()); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(3)); + + std::array extIdRemap{}; + extIdRemap[1] = 11; + extIdRemap[3] = 13; + + RTC::RTP::HeaderExtensionIds newExtIds{}; + RTC::RTP::Packet::EgressRewriteUndo undo; + + packet->ApplyEgressRewrite(42, extIdRemap, newExtIds, undo); + + REQUIRE(packet->GetPayloadType() == 42); + REQUIRE(packet->HasExtension(11)); + REQUIRE(packet->HasExtension(13)); + REQUIRE_FALSE(packet->HasExtension(1)); + REQUIRE_FALSE(packet->HasExtension(3)); + + // Revert and assert bit-for-bit equality. + packet->RevertEgressRewrite(undo); + + REQUIRE(std::memcmp(buffer, snapshot, sizeof(buffer)) == 0); + REQUIRE(packet->HasExtension(1)); + REQUIRE(packet->HasExtension(3)); + REQUIRE_FALSE(packet->HasExtension(11)); + REQUIRE_FALSE(packet->HasExtension(13)); + } }