From 8461707cfb4321b99feaa5da5b2cd57815dd5d52 Mon Sep 17 00:00:00 2001 From: Ariel Molina Date: Tue, 16 Jun 2026 16:40:56 -0600 Subject: [PATCH] feat(moq-mux): cap pending AU growth and add discontinuity() Two additive, internal hardening changes to the codec importers (no wire or breaking API change). Bound the in-progress access unit: H.264/H.265/AV1 append every NAL/OBU to a pending `chunks` buffer and only flush it on a slice/frame, so a stream that never sends one (repeated AUD/SEI/SPS/PPS, or non-frame OBUs) grows it without limit. Every append now goes through a checked helper that bails past MAX_PENDING_FRAME_BYTES (16 MiB), including the cached SPS/PPS/VPS re-inserted before a keyframe. Add `discontinuity()` to signal a timeline boundary (e.g. a GStreamer FLUSH): it closes the open group via `Producer::finish_group()` so the next frame starts a fresh group, and for H.264/H.265/AV1 also drops the partial access unit. Exposed on `Framed` and on every framed codec importer; the moq-gst caller lands with the moqsink promotion (it lives on another branch). CONTEXT Discarded paths: - `seek(0)` as the discontinuity primitive: it rewrites the group sequence explicitly and fabricates false semantics. `finish_group()` closes the group without inventing a sequence. - No-op discontinuity for codecs without a partial AU (VP8/VP9/AAC/Opus): they still `finish_group()` so a FLUSH closes a clean group on every track, not only the video ones. - Toggling lenient_start or dropping deltas until a keyframe inside discontinuity(): that revives `with_lenient_start` under another name. discontinuity() defers to the Producer keyframe contract, forward compatible with the stricter contract in PR #1766. - Leaving the SPS/PPS/VPS re-insertion on a raw `extend_from_slice`: a large cached parameter set would allocate past the cap before the final append checked it. Routing it through the helper validates before mutating. - Including the moq-gst `Pad::flush()` caller here: moqsinkspike lives on the gstsink branch, not main, so the caller ships with the promotion PR. - A `Stream` dispatcher discontinuity(): moqsinkspike only uses `Framed`. Key decisions: - discontinuity() means "timeline boundary", not "clear partial AU if any": all codecs close the group, only H.264/H.265/AV1 also clear the partial AU. - The cap is hardening against malformed input (a normal stream flushes every frame). moq-gst's per-buffer 16 MiB limit does not bound accumulation across buffers. - A discontinuity test followed by a keyframe does not prove finish_group() (the keyframe closes the group anyway), so the Framed-level test feeds a delta after discontinuity() and asserts it does not extend the closed group. Verified faithful by deleting finish_group() and watching it fail. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-mux/src/codec/aac/import.rs | 7 + rs/moq-mux/src/codec/av1/import.rs | 119 +++++++++++++++- rs/moq-mux/src/codec/h264/import.rs | 205 +++++++++++++++++++++++++++- rs/moq-mux/src/codec/h265/import.rs | 114 ++++++++++++++-- rs/moq-mux/src/codec/opus/import.rs | 7 + rs/moq-mux/src/codec/vp8/import.rs | 10 ++ rs/moq-mux/src/codec/vp9/import.rs | 10 ++ rs/moq-mux/src/import.rs | 57 ++++++++ 8 files changed, 514 insertions(+), 15 deletions(-) diff --git a/rs/moq-mux/src/codec/aac/import.rs b/rs/moq-mux/src/codec/aac/import.rs index 366529a97..3ff849b68 100644 --- a/rs/moq-mux/src/codec/aac/import.rs +++ b/rs/moq-mux/src/codec/aac/import.rs @@ -79,6 +79,13 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: close the current group so the next + /// frame starts a fresh group. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + self.track.finish_group()?; + Ok(()) + } + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { let pts = self.pts(pts)?; diff --git a/rs/moq-mux/src/codec/av1/import.rs b/rs/moq-mux/src/codec/av1/import.rs index 0df8ef832..00ebfe64f 100644 --- a/rs/moq-mux/src/codec/av1/import.rs +++ b/rs/moq-mux/src/codec/av1/import.rs @@ -29,6 +29,11 @@ pub struct Import { jitter: MinFrameDuration, } +/// Cap on a single in-progress temporal unit. Without a frame OBU the importer +/// never flushes `chunks`, so a stream of non-frame OBUs (sequence headers, +/// metadata) would grow it unboundedly; bail instead. +const MAX_PENDING_FRAME_BYTES: usize = 16 * 1024 * 1024; + #[derive(Default)] struct Frame { chunks: BytesMut, @@ -36,6 +41,24 @@ struct Frame { contains_frame: bool, } +impl Frame { + fn append_obu(&mut self, obu: &[u8]) -> anyhow::Result<()> { + let next = self.chunks.len() + obu.len(); + anyhow::ensure!( + next <= MAX_PENDING_FRAME_BYTES, + "pending AV1 frame exceeds {MAX_PENDING_FRAME_BYTES} bytes without a frame OBU" + ); + self.chunks.extend_from_slice(obu); + Ok(()) + } + + fn clear_partial(&mut self) { + self.chunks.clear(); + self.contains_keyframe = false; + self.contains_frame = false; + } +} + impl Import { pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer) -> Self { Self { @@ -366,7 +389,7 @@ impl Import { tracing::trace!(?header.obu_type, "parsed OBU"); - self.current.chunks.extend_from_slice(&obu_data); + self.current.append_obu(&obu_data)?; Ok(()) } @@ -418,6 +441,17 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: drop the in-progress temporal unit and + /// close the current group so the next frame starts a fresh group with a + /// keyframe. Tolerant before the track exists. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + self.current.clear_partial(); + if let Some(track) = self.track.as_mut() { + track.finish_group()?; + } + Ok(()) + } + pub fn is_initialized(&self) -> bool { self.track.is_some() } @@ -530,3 +564,86 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for ObuIterator<'a, T> { Some(Ok(obu)) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn importer() -> Import { + let broadcast = moq_net::Broadcast::new(); + let mut producer = broadcast.produce(); + let catalog = crate::catalog::Producer::new(&mut producer).unwrap(); + Import::new(producer, catalog) + } + + fn leb128(mut value: usize) -> Vec { + let mut out = Vec::new(); + loop { + let mut byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + out.push(byte); + if value == 0 { + break; + } + } + out + } + + /// A Metadata OBU (not a frame) so the importer accumulates without flushing. + fn metadata_obu(payload: usize) -> bytes::BytesMut { + // header 0x2A: obu_type 5 (Metadata), obu_has_size_field = 1. + let mut buf = bytes::BytesMut::from(&[0x2Au8][..]); + buf.extend_from_slice(&leb128(payload)); + buf.extend(std::iter::repeat_n(0xFFu8, payload)); + buf + } + + /// Repeated non-frame OBUs (never a frame) must bail instead of growing forever. + #[tokio::test(start_paused = true)] + async fn caps_pending_non_frame_accumulation() { + let mut import = importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut metadata_obu(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first non-frame buffer fits"); + + let err = import + .decode_frame( + &mut metadata_obu(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect_err("second buffer must exceed the cap"); + assert!(err.to_string().contains("without a frame OBU"), "got: {err}"); + } + + /// discontinuity() drops the partial temporal unit so accumulation restarts + /// from zero, and is tolerant before any track exists. + #[tokio::test(start_paused = true)] + async fn discontinuity_resets_accumulation() { + let mut import = importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut metadata_obu(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first buffer fits"); + import + .discontinuity() + .expect("discontinuity clears the partial temporal unit"); + import + .decode_frame( + &mut metadata_obu(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect("post-discontinuity buffer fits because the cap reset"); + } +} diff --git a/rs/moq-mux/src/codec/h264/import.rs b/rs/moq-mux/src/codec/h264/import.rs index 806edb44a..57aaa60f9 100644 --- a/rs/moq-mux/src/codec/h264/import.rs +++ b/rs/moq-mux/src/codec/h264/import.rs @@ -53,6 +53,11 @@ enum State { }, } +/// Cap on a single in-progress access unit. Without a slice the importer never +/// flushes `chunks`, so a stream of non-VCL NALs (AUD/SEI/SPS/PPS) would grow it +/// unboundedly; bail instead. +const MAX_PENDING_FRAME_BYTES: usize = 16 * 1024 * 1024; + #[derive(Default)] struct Avc3Frame { chunks: BytesMut, @@ -62,6 +67,27 @@ struct Avc3Frame { contains_pps: bool, } +impl Avc3Frame { + fn append_nal(&mut self, nal: &[u8]) -> anyhow::Result<()> { + let next = self.chunks.len() + START_CODE.len() + nal.len(); + anyhow::ensure!( + next <= MAX_PENDING_FRAME_BYTES, + "pending H.264 frame exceeds {MAX_PENDING_FRAME_BYTES} bytes without a slice" + ); + self.chunks.extend_from_slice(&START_CODE); + self.chunks.extend_from_slice(nal); + Ok(()) + } + + fn clear_partial(&mut self) { + self.chunks.clear(); + self.contains_idr = false; + self.contains_slice = false; + self.contains_sps = false; + self.contains_pps = false; + } +} + impl Import { pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer) -> Self { Self { @@ -347,15 +373,13 @@ impl Import { if !current.contains_sps && let Some(sps) = sps.as_ref() { - current.chunks.extend_from_slice(&START_CODE); - current.chunks.extend_from_slice(sps); + current.append_nal(sps)?; current.contains_sps = true; } if !current.contains_pps && let Some(pps) = pps.as_ref() { - current.chunks.extend_from_slice(&START_CODE); - current.chunks.extend_from_slice(pps); + current.append_nal(pps)?; current.contains_pps = true; } current.contains_idr = true; @@ -381,8 +405,7 @@ impl Import { let State::Avc3 { current, .. } = &mut self.state else { unreachable!() }; - current.chunks.extend_from_slice(&START_CODE); - current.chunks.extend_from_slice(&nal); + current.append_nal(&nal)?; Ok(()) } @@ -484,6 +507,19 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: drop the in-progress access unit and + /// close the current group so the next frame starts a fresh group with a + /// keyframe. Tolerant before the track exists. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + if let State::Avc3 { current, .. } = &mut self.state { + current.clear_partial(); + } + if let Some(track) = self.track.as_mut() { + track.finish_group()?; + } + Ok(()) + } + fn pts(&mut self, hint: Option) -> anyhow::Result { if let Some(pts) = hint { return Ok(pts); @@ -632,6 +668,163 @@ mod tests { assert_eq!(h264.profile, sps[1]); assert_eq!(h264.level, sps[3]); } + + fn avc3_importer() -> (Import, crate::catalog::Producer) { + let broadcast = moq_net::Broadcast::new(); + let mut producer = broadcast.produce(); + let catalog = crate::catalog::Producer::new(&mut producer).unwrap(); + let import = Import::new(producer, catalog.clone()).with_mode(Mode::Avc3).unwrap(); + (import, catalog) + } + + /// A SEI NAL (no slice) so the importer accumulates without ever flushing. + fn sei_nal(payload: usize) -> bytes::BytesMut { + let mut buf = bytes::BytesMut::from(&[0x00, 0x00, 0x00, 0x01, 0x06][..]); + buf.extend(std::iter::repeat_n(0xFFu8, payload)); + buf + } + + /// Repeated non-VCL NALs (never a slice) must bail instead of growing forever. + #[tokio::test(start_paused = true)] + async fn caps_pending_non_vcl_accumulation() { + let (mut import, _catalog) = avc3_importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut sei_nal(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first non-VCL buffer fits"); + + let err = import + .decode_frame( + &mut sei_nal(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect_err("second buffer must exceed the cap"); + assert!(err.to_string().contains("without a slice"), "got: {err}"); + } + + /// discontinuity() drops the partial AU so accumulation restarts from zero. + #[tokio::test(start_paused = true)] + async fn discontinuity_resets_accumulation() { + let (mut import, _catalog) = avc3_importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut sei_nal(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first buffer fits"); + import.discontinuity().expect("discontinuity clears the partial AU"); + import + .decode_frame( + &mut sei_nal(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect("post-discontinuity buffer fits because the cap reset"); + } + + /// After a discontinuity the next access unit must not carry leftover bytes + /// from the dropped partial AU. + #[tokio::test(start_paused = true)] + async fn discontinuity_drops_partial_au_without_leaking() { + let sps: &[u8] = &[ + 0x67, 0x42, 0xc0, 0x1f, 0xda, 0x01, 0x40, 0x16, 0xe9, 0xb8, 0x08, 0x08, 0x0a, 0x00, 0x00, 0x07, 0xd0, 0x00, + 0x01, 0xd4, 0xc0, 0x80, + ]; + let pps: &[u8] = &[0x68, 0xce, 0x3c, 0x80]; + let idr: &[u8] = &[0x65, 0x88, 0x84, 0x00]; + + let access_unit = || { + let mut buf = bytes::BytesMut::new(); + for nal in [sps, pps, idr] { + buf.extend_from_slice(&[0, 0, 0, 1]); + buf.extend_from_slice(nal); + } + buf + }; + let partial = || { + let mut buf = bytes::BytesMut::new(); + for nal in [sps, pps] { + buf.extend_from_slice(&[0, 0, 0, 1]); + buf.extend_from_slice(nal); + } + buf + }; + + let (mut import, _catalog) = avc3_importer(); + let consumer = import.track().unwrap().consume(); + let mut media = crate::container::Consumer::new(consumer, crate::catalog::hang::Container::Legacy); + + import + .decode_frame( + &mut access_unit(), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .unwrap(); + import + .decode_frame( + &mut partial(), + Some(crate::container::Timestamp::from_micros(33_000).unwrap()), + ) + .unwrap(); + import.discontinuity().unwrap(); + import + .decode_frame( + &mut access_unit(), + Some(crate::container::Timestamp::from_micros(66_000).unwrap()), + ) + .unwrap(); + + let first = media.read().await.unwrap().unwrap(); + let second = media.read().await.unwrap().unwrap(); + assert_eq!( + first.payload, second.payload, + "the post-discontinuity AU must match a clean AU, not carry the dropped partial" + ); + } + + /// Cached SPS/PPS re-inserted before a keyframe also go through the capped + /// append, so a large parameter set can't grow the partial AU past the limit + /// before the final append checks it. + #[tokio::test(start_paused = true)] + async fn caps_reinserted_parameter_sets() { + let sps: &[u8] = &[ + 0x67, 0x42, 0xc0, 0x1f, 0xda, 0x01, 0x40, 0x16, 0xe9, 0xb8, 0x08, 0x08, 0x0a, 0x00, 0x00, 0x07, 0xd0, 0x00, + 0x01, 0xd4, 0xc0, 0x80, + ]; + let mut pps = vec![0x68u8]; // PPS NAL header; payload is opaque (not parsed, just cached). + pps.extend(std::iter::repeat_n(0xFFu8, 9 * 1024 * 1024)); + let idr: &[u8] = &[0x65, 0x88, 0x84, 0x00]; + + let (mut import, _catalog) = avc3_importer(); + + // AU1 fits on its own and caches the SPS + large PPS. + let mut au1 = bytes::BytesMut::new(); + for nal in [sps, pps.as_slice(), idr] { + au1.extend_from_slice(&[0, 0, 0, 1]); + au1.extend_from_slice(nal); + } + import + .decode_frame(&mut au1, Some(crate::container::Timestamp::from_micros(0).unwrap())) + .expect("AU1 fits under the cap"); + + // AU2: 8 MiB of SEI, then a keyframe with no inline PPS. Re-inserting the + // cached 9 MiB PPS must trip the cap instead of allocating past it. + let mut au2 = sei_nal(8 * 1024 * 1024); + au2.extend_from_slice(&[0, 0, 0, 1]); + au2.extend_from_slice(idr); + let err = import + .decode_frame( + &mut au2, + Some(crate::container::Timestamp::from_micros(33_000).unwrap()), + ) + .expect_err("re-inserted PPS must exceed the cap"); + assert!(err.to_string().contains("without a slice"), "got: {err}"); + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, num_enum::TryFromPrimitive)] diff --git a/rs/moq-mux/src/codec/h265/import.rs b/rs/moq-mux/src/codec/h265/import.rs index c329fdfdb..06121e504 100644 --- a/rs/moq-mux/src/codec/h265/import.rs +++ b/rs/moq-mux/src/codec/h265/import.rs @@ -254,22 +254,19 @@ impl Import { if !self.current.contains_vps && let Some(vps) = &self.vps { - self.current.chunks.extend_from_slice(&START_CODE); - self.current.chunks.extend_from_slice(vps); + self.current.append_nal(vps)?; self.current.contains_vps = true; } if !self.current.contains_sps && let Some(sps) = &self.sps { - self.current.chunks.extend_from_slice(&START_CODE); - self.current.chunks.extend_from_slice(sps); + self.current.append_nal(sps)?; self.current.contains_sps = true; } if !self.current.contains_pps && let Some(pps) = &self.pps { - self.current.chunks.extend_from_slice(&START_CODE); - self.current.chunks.extend_from_slice(pps); + self.current.append_nal(pps)?; self.current.contains_pps = true; } @@ -298,8 +295,7 @@ impl Import { // Replace the original start code with a canonical 4-byte start code (marginally easier // for downstream players, e.g. MSE). - self.current.chunks.extend_from_slice(&START_CODE); - self.current.chunks.extend_from_slice(&nal); + self.current.append_nal(&nal)?; Ok(()) } @@ -352,6 +348,17 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: drop the in-progress access unit and + /// close the current group so the next frame starts a fresh group with a + /// keyframe. Tolerant before the track exists. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + self.current.clear_partial(); + if let Some(track) = self.track.as_mut() { + track.finish_group()?; + } + Ok(()) + } + pub fn is_initialized(&self) -> bool { self.track.is_some() } @@ -377,6 +384,11 @@ impl Drop for Import { } } +/// Cap on a single in-progress access unit. Without a slice the importer never +/// flushes `chunks`, so a stream of non-VCL NALs (AUD/SEI/VPS/SPS/PPS) would grow +/// it unboundedly; bail instead. +const MAX_PENDING_FRAME_BYTES: usize = 16 * 1024 * 1024; + #[derive(Default)] struct Frame { chunks: BytesMut, @@ -387,6 +399,28 @@ struct Frame { contains_pps: bool, } +impl Frame { + fn append_nal(&mut self, nal: &[u8]) -> anyhow::Result<()> { + let next = self.chunks.len() + START_CODE.len() + nal.len(); + anyhow::ensure!( + next <= MAX_PENDING_FRAME_BYTES, + "pending H.265 frame exceeds {MAX_PENDING_FRAME_BYTES} bytes without a slice" + ); + self.chunks.extend_from_slice(&START_CODE); + self.chunks.extend_from_slice(nal); + Ok(()) + } + + fn clear_partial(&mut self) { + self.chunks.clear(); + self.contains_idr = false; + self.contains_slice = false; + self.contains_vps = false; + self.contains_sps = false; + self.contains_pps = false; + } +} + #[derive(Default)] struct VuiData { framerate: Option, @@ -444,3 +478,67 @@ fn aspect_ratio_from_idc(idc: scuffle_h265::AspectRatioIdc) -> Option<(u32, u32) _ => None, // Reserved } } + +#[cfg(test)] +mod tests { + use super::*; + + fn importer() -> Import { + let broadcast = moq_net::Broadcast::new(); + let mut producer = broadcast.produce(); + let catalog = crate::catalog::Producer::new(&mut producer).unwrap(); + Import::new(producer, catalog) + } + + /// An AUD NAL (no slice) so the importer accumulates without ever flushing. + fn aud_nal(payload: usize) -> bytes::BytesMut { + // 2-byte header: nal_unit_type 35 (AUD_NUT) => first byte (35 << 1) = 0x46. + let mut buf = bytes::BytesMut::from(&[0x00, 0x00, 0x00, 0x01, 0x46, 0x01][..]); + buf.extend(std::iter::repeat_n(0xFFu8, payload)); + buf + } + + /// Repeated non-VCL NALs (never a slice) must bail instead of growing forever. + #[tokio::test(start_paused = true)] + async fn caps_pending_non_vcl_accumulation() { + let mut import = importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut aud_nal(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first non-VCL buffer fits"); + + let err = import + .decode_frame( + &mut aud_nal(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect_err("second buffer must exceed the cap"); + assert!(err.to_string().contains("without a slice"), "got: {err}"); + } + + /// discontinuity() drops the partial AU so accumulation restarts from zero, + /// and is tolerant before any track exists. + #[tokio::test(start_paused = true)] + async fn discontinuity_resets_accumulation() { + let mut import = importer(); + let chunk = 9 * 1024 * 1024; + + import + .decode_frame( + &mut aud_nal(chunk), + Some(crate::container::Timestamp::from_micros(0).unwrap()), + ) + .expect("first buffer fits"); + import.discontinuity().expect("discontinuity clears the partial AU"); + import + .decode_frame( + &mut aud_nal(chunk), + Some(crate::container::Timestamp::from_micros(1).unwrap()), + ) + .expect("post-discontinuity buffer fits because the cap reset"); + } +} diff --git a/rs/moq-mux/src/codec/opus/import.rs b/rs/moq-mux/src/codec/opus/import.rs index a1357401d..fefde03d5 100644 --- a/rs/moq-mux/src/codec/opus/import.rs +++ b/rs/moq-mux/src/codec/opus/import.rs @@ -77,6 +77,13 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: close the current group so the next + /// frame starts a fresh group. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + self.track.finish_group()?; + Ok(()) + } + pub fn decode(&mut self, buf: &mut T, pts: Option) -> anyhow::Result<()> { let pts = self.pts(pts)?; diff --git a/rs/moq-mux/src/codec/vp8/import.rs b/rs/moq-mux/src/codec/vp8/import.rs index e1a1bd48f..49e9f0591 100644 --- a/rs/moq-mux/src/codec/vp8/import.rs +++ b/rs/moq-mux/src/codec/vp8/import.rs @@ -159,6 +159,16 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: close the current group so the next + /// frame starts a fresh group with a keyframe. Tolerant before the track + /// exists. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + if let Some(track) = self.track.as_mut() { + track.finish_group()?; + } + Ok(()) + } + pub fn is_initialized(&self) -> bool { self.track.is_some() } diff --git a/rs/moq-mux/src/codec/vp9/import.rs b/rs/moq-mux/src/codec/vp9/import.rs index 9ba89ca66..c30ef4db3 100644 --- a/rs/moq-mux/src/codec/vp9/import.rs +++ b/rs/moq-mux/src/codec/vp9/import.rs @@ -159,6 +159,16 @@ impl Import { Ok(()) } + /// Signal a timeline discontinuity: close the current group so the next + /// frame starts a fresh group with a keyframe. Tolerant before the track + /// exists. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + if let Some(track) = self.track.as_mut() { + track.finish_group()?; + } + Ok(()) + } + pub fn is_initialized(&self) -> bool { self.track.is_some() } diff --git a/rs/moq-mux/src/import.rs b/rs/moq-mux/src/import.rs index 58f42c777..7fccf5399 100644 --- a/rs/moq-mux/src/import.rs +++ b/rs/moq-mux/src/import.rs @@ -301,6 +301,23 @@ impl Framed { } } + /// Signal a timeline discontinuity (e.g. a GStreamer FLUSH): drop any + /// codec partial state and close the current group so the next frame starts + /// a fresh group with a keyframe. No-op for container formats that manage + /// their own framing. + pub fn discontinuity(&mut self) -> anyhow::Result<()> { + match self.decoder { + FramedKind::H264(ref mut decoder) => decoder.discontinuity(), + FramedKind::Hev1(ref mut decoder) => decoder.discontinuity(), + FramedKind::Av01(ref mut decoder) => decoder.discontinuity(), + FramedKind::Vp8(ref mut decoder) => decoder.discontinuity(), + FramedKind::Vp9(ref mut decoder) => decoder.discontinuity(), + FramedKind::Aac(ref mut decoder) => decoder.discontinuity(), + FramedKind::Opus(ref mut decoder) => decoder.discontinuity(), + FramedKind::Fmp4(_) | FramedKind::Mkv(_) | FramedKind::Ts(_) | FramedKind::Flv(_) => Ok(()), + } + } + /// Return the single track produced by this importer. pub fn track(&self) -> anyhow::Result<&moq_net::TrackProducer> { match self.decoder { @@ -494,6 +511,46 @@ mod tests { .unwrap_err(); assert!(err.to_string().contains("fixed track cannot be reconfigured")); } + + /// discontinuity() must close the open group, not just clear the partial AU: + /// a non-keyframe arriving after it cannot anchor the closed group and is + /// dropped by lenient start. Without `finish_group()` the delta would extend + /// the keyframe's group as a second frame, so this fails if it regresses. + #[tokio::test(start_paused = true)] + async fn discontinuity_closes_group_so_following_delta_is_dropped() { + let (broadcast, catalog) = new_broadcast(); + let init = h264_init(); + let mut init = init.as_slice(); + let mut framed = Framed::new(broadcast, catalog, FramedFormat::Avc3, &mut init).unwrap(); + + let consumer = framed.track().unwrap().consume(); + let mut media = crate::container::Consumer::new(consumer, crate::catalog::hang::Container::Legacy); + + // Keyframe AU: just an IDR slice; the importer re-inserts the cached SPS/PPS. + let mut keyframe = Bytes::from_static(&[0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x84, 0x00]); + framed + .decode_frame(&mut keyframe, Some(Timestamp::from_micros(0).unwrap())) + .unwrap(); + + framed.discontinuity().unwrap(); + + // Non-IDR slice (first_mb flag set) after the discontinuity. + let mut delta = Bytes::from_static(&[0x00, 0x00, 0x00, 0x01, 0x41, 0x80, 0x00, 0x00]); + framed + .decode_frame(&mut delta, Some(Timestamp::from_micros(33_000).unwrap())) + .unwrap(); + + framed.finish().unwrap(); + + let mut frames = 0; + while media.read().await.unwrap().is_some() { + frames += 1; + } + assert_eq!( + frames, 1, + "the delta after discontinuity() must not extend the closed group" + ); + } } // -- stream dispatcher --