Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions rs/moq-mux/src/codec/aac/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ impl<E: CatalogExt> Import<E> {
Ok(())
}

/// Signal a timeline discontinuity: close the current group so the next
/// frame starts a fresh group.
pub fn discontinuity(&mut self) -> anyhow::Result<()> {
self.track.finish_group()?;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

? each frame should be its own group

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, for AAC/Opus each frame already calls finish_group() after write() so discontinuity(..) here just refinishes a closed group already (no-op), I'll drop discontinuuty from the audio importers and keep it only on video

Ok(())
}

pub fn decode<T: Buf>(&mut self, buf: &mut T, pts: Option<crate::container::Timestamp>) -> anyhow::Result<()> {
let pts = self.pts(pts)?;

Expand Down
119 changes: 118 additions & 1 deletion rs/moq-mux/src/codec/av1/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,36 @@ pub struct Import {
jitter: MinFrameDuration,
}

/// Cap on a single in-progress temporal unit. Without a frame OBU the importer
/// never flushes `chunks`, so a stream of non-frame OBUs (sequence headers,
/// metadata) would grow it unboundedly; bail instead.
const MAX_PENDING_FRAME_BYTES: usize = 16 * 1024 * 1024;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should export moq_net::MAX_FRAME_SIZE and use that instead?

@arielmol arielmol Jun 17, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should export moq_net::MAX_FRAME_SIZE and use that instead?

Agreed


#[derive(Default)]
struct Frame {
chunks: BytesMut,
contains_keyframe: bool,
contains_frame: bool,
}

impl Frame {
fn append_obu(&mut self, obu: &[u8]) -> anyhow::Result<()> {
let next = self.chunks.len() + obu.len();
anyhow::ensure!(
next <= MAX_PENDING_FRAME_BYTES,
"pending AV1 frame exceeds {MAX_PENDING_FRAME_BYTES} bytes without a frame OBU"
);
self.chunks.extend_from_slice(obu);
Ok(())
}

fn clear_partial(&mut self) {
self.chunks.clear();
self.contains_keyframe = false;
self.contains_frame = false;
}
}

impl Import {
pub fn new(broadcast: moq_net::BroadcastProducer, catalog: crate::catalog::Producer) -> Self {
Self {
Expand Down Expand Up @@ -366,7 +389,7 @@ impl Import {

tracing::trace!(?header.obu_type, "parsed OBU");

self.current.chunks.extend_from_slice(&obu_data);
self.current.append_obu(&obu_data)?;

Ok(())
}
Expand Down Expand Up @@ -418,6 +441,17 @@ impl Import {
Ok(())
}

/// Signal a timeline discontinuity: drop the in-progress temporal unit and
/// close the current group so the next frame starts a fresh group with a
/// keyframe. Tolerant before the track exists.
pub fn discontinuity(&mut self) -> anyhow::Result<()> {

@kixelated kixelated Jun 17, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IDK I really don't like this TS stuff leaking everywhere. Partially writing a frame is just gross.

I'm architecting moq-mux on dev now. There's a separate av1::Split module that finds frame boundaries, while the dumb av1::Import takes entire frames (not a stream). TS and stdin can use av1::Split, while everybody else uses av1::Import directly instead.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your call, just two questions:

  1. is it ok if I land just the pending AU cap here for now?
  2. once Split exists, where should a FLUSH hook in? SPlit or still surfaced via Framed? (moqsink will need this)

self.current.clear_partial();
if let Some(track) = self.track.as_mut() {
track.finish_group()?;
}
Ok(())
}

pub fn is_initialized(&self) -> bool {
self.track.is_some()
}
Expand Down Expand Up @@ -530,3 +564,86 @@ impl<'a, T: Buf + AsRef<[u8]> + 'a> Iterator for ObuIterator<'a, T> {
Some(Ok(obu))
}
}

#[cfg(test)]
mod tests {
use super::*;

fn importer() -> Import {
let broadcast = moq_net::Broadcast::new();
let mut producer = broadcast.produce();
let catalog = crate::catalog::Producer::new(&mut producer).unwrap();
Import::new(producer, catalog)
}

fn leb128(mut value: usize) -> Vec<u8> {
let mut out = Vec::new();
loop {
let mut byte = (value & 0x7f) as u8;
value >>= 7;
if value != 0 {
byte |= 0x80;
}
out.push(byte);
if value == 0 {
break;
}
}
out
}

/// A Metadata OBU (not a frame) so the importer accumulates without flushing.
fn metadata_obu(payload: usize) -> bytes::BytesMut {
// header 0x2A: obu_type 5 (Metadata), obu_has_size_field = 1.
let mut buf = bytes::BytesMut::from(&[0x2Au8][..]);
buf.extend_from_slice(&leb128(payload));
buf.extend(std::iter::repeat_n(0xFFu8, payload));
buf
}

/// Repeated non-frame OBUs (never a frame) must bail instead of growing forever.
#[tokio::test(start_paused = true)]
async fn caps_pending_non_frame_accumulation() {
let mut import = importer();
let chunk = 9 * 1024 * 1024;

import
.decode_frame(
&mut metadata_obu(chunk),
Some(crate::container::Timestamp::from_micros(0).unwrap()),
)
.expect("first non-frame buffer fits");

let err = import
.decode_frame(
&mut metadata_obu(chunk),
Some(crate::container::Timestamp::from_micros(1).unwrap()),
)
.expect_err("second buffer must exceed the cap");
assert!(err.to_string().contains("without a frame OBU"), "got: {err}");
}

/// discontinuity() drops the partial temporal unit so accumulation restarts
/// from zero, and is tolerant before any track exists.
#[tokio::test(start_paused = true)]
async fn discontinuity_resets_accumulation() {
let mut import = importer();
let chunk = 9 * 1024 * 1024;

import
.decode_frame(
&mut metadata_obu(chunk),
Some(crate::container::Timestamp::from_micros(0).unwrap()),
)
.expect("first buffer fits");
import
.discontinuity()
.expect("discontinuity clears the partial temporal unit");
import
.decode_frame(
&mut metadata_obu(chunk),
Some(crate::container::Timestamp::from_micros(1).unwrap()),
)
.expect("post-discontinuity buffer fits because the cap reset");
}
}
Loading
Loading