diff --git a/.changeset/av1-packet-trailers.md b/.changeset/av1-packet-trailers.md new file mode 100644 index 000000000..220c648f5 --- /dev/null +++ b/.changeset/av1-packet-trailers.md @@ -0,0 +1,8 @@ +--- +webrtc-sys: patch +libwebrtc: patch +livekit: patch +livekit-ffi: patch +--- + +Fix AV1 subscriber decode when packet trailers are enabled. diff --git a/.changeset/local-video-pipeline-timing.md b/.changeset/local-video-pipeline-timing.md new file mode 100644 index 000000000..3e02ceb76 --- /dev/null +++ b/.changeset/local-video-pipeline-timing.md @@ -0,0 +1,7 @@ +--- +webrtc-sys: patch +libwebrtc: patch +livekit: patch +--- + +Add native video pipeline timing instrumentation for local video publisher measurements. diff --git a/examples/local_video/README.md b/examples/local_video/README.md index 86feee7c7..dda041861 100644 --- a/examples/local_video/README.md +++ b/examples/local_video/README.md @@ -26,7 +26,7 @@ Publisher usage: --room-name demo \ --identity cam-1 \ --simulcast \ - --h265 \ + --codec h265 \ --max-bitrate 1500000 \ --url https://your.livekit.server \ --api-key YOUR_KEY \ @@ -89,7 +89,7 @@ Publisher flags (in addition to the common connection flags above): - `--width `: Desired capture width (default: `1280`). - `--height `: Desired capture height (default: `720`). - `--fps `: Desired capture framerate (default: `30`). -- `--h265`: Use H.265/HEVC encoding if supported (falls back to H.264 on failure). +- `--codec <codec>`: Video codec to use for publishing: `h264`, `h265`, `vp8`, `vp9`, or `av1` (default: `h264`). H.265 falls back to H.264 on failure. - `--simulcast`: Publish simulcast video (multiple layers when the resolution is large enough). - `--max-bitrate `: Max video bitrate for the main (highest) layer in bits per second (e.g. `1500000`). - `--attach-timestamp`: Attach the current wall-clock time (microseconds since UNIX epoch) as the user timestamp on each published frame. The subscriber can display this to measure end-to-end latency. 
diff --git a/examples/local_video/src/codec_display.rs b/examples/local_video/src/codec_display.rs new file mode 100644 index 000000000..fff05f1d4 --- /dev/null +++ b/examples/local_video/src/codec_display.rs @@ -0,0 +1,67 @@ +#[allow(dead_code)] +pub(crate) fn codec_from_mime(mime: &str) -> String { + let base = mime.split(';').next().unwrap_or(mime).trim(); + let last = base.rsplit('/').next().unwrap_or(base).trim(); + last.to_ascii_uppercase() +} + +pub(crate) fn codec_with_implementation(codec: &str, implementation: &str) -> String { + let codec = if codec.is_empty() { "Unknown" } else { codec }; + let Some(implementation) = implementation_label(implementation) else { + return codec.to_string(); + }; + + format!("{codec} {implementation}") +} + +fn implementation_label(implementation: &str) -> Option { + let implementation = implementation.trim(); + if implementation.is_empty() { + return None; + } + + let lower = implementation.to_ascii_lowercase(); + if lower.contains("nvidia") { + return Some(if lower.contains("decoder") { "NVDEC" } else { "NVENC" }.to_string()); + } + if lower.contains("vaapi") { + return Some("VAAPI".to_string()); + } + if lower.contains("videotoolbox") { + return Some("VideoToolbox".to_string()); + } + if lower.contains("openh264") { + return Some("OpenH264".to_string()); + } + if lower.contains("libvpx") { + return Some("libvpx".to_string()); + } + if lower.contains("libaom") { + return Some("libaom".to_string()); + } + + Some( + implementation + .strip_suffix(" Encoder") + .or_else(|| implementation.strip_suffix(" Decoder")) + .unwrap_or(implementation) + .to_string(), + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn extracts_codec_from_mime_type() { + assert_eq!(codec_from_mime("video/H264;profile-level-id=42e01f"), "H264"); + } + + #[test] + fn shortens_common_hardware_implementations() { + assert_eq!(codec_with_implementation("H264", "NVIDIA H264 Encoder"), "H264 NVENC"); + 
assert_eq!(codec_with_implementation("H265", "VAAPI H264 Encoder"), "H265 VAAPI"); + assert_eq!(codec_with_implementation("H264", "NVIDIA H264 Decoder"), "H264 NVDEC"); + } +} diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index 62af1cd8b..4737ff7a9 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use clap::Parser; +use clap::{Parser, ValueEnum}; use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::options::{ self, video as video_presets, PacketTrailerFeatures, TrackPublishOptions, VideoCodec, @@ -20,6 +20,7 @@ use nokhwa::utils::{ }; use nokhwa::Camera; use parking_lot::Mutex; +use std::collections::{HashMap, VecDeque}; use std::env; use std::sync::{ atomic::{AtomicBool, Ordering}, @@ -28,15 +29,37 @@ use std::sync::{ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use yuv_sys; +mod codec_display; mod test_pattern; mod timestamp_burn; mod video_display; mod viewport_aspect; use test_pattern::TestPattern; -use timestamp_burn::{LatencyDisplay, TimestampOverlay}; +use timestamp_burn::TimestampOverlay; use video_display::{align_up, PublisherTimingSample, SharedYuv}; +#[derive(Copy, Clone, Debug, ValueEnum)] +enum PublisherCodec { + H264, + H265, + VP8, + VP9, + AV1, +} + +impl From for VideoCodec { + fn from(codec: PublisherCodec) -> Self { + match codec { + PublisherCodec::H264 => VideoCodec::H264, + PublisherCodec::H265 => VideoCodec::H265, + PublisherCodec::VP8 => VideoCodec::VP8, + PublisherCodec::VP9 => VideoCodec::VP9, + PublisherCodec::AV1 => VideoCodec::AV1, + } + } +} + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -100,9 +123,9 @@ struct Args { #[arg(long)] api_secret: Option, - /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) - #[arg(long, default_value_t = false)] - h265: bool, + /// Video codec to use for publishing + 
#[arg(long, value_enum, default_value_t = PublisherCodec::H264)] + codec: PublisherCodec, /// Attach the current system time (microseconds since UNIX epoch) as the user timestamp on each frame #[arg(long, default_value_t = false)] @@ -151,12 +174,6 @@ fn normalize_twirp_host(url: &str) -> String { url.trim_end_matches("/rtc").to_string() } -/// Format the us delta as a millisecond string like `"12.3ms"`. -fn format_us_delta_ms(later_us: u64, earlier_us: u64) -> String { - let delta_us = later_us.saturating_sub(earlier_us); - format!("{:.1}ms", delta_us as f64 / 1_000.0) -} - #[derive(Default)] struct RollingMs { total_ms: f64, @@ -189,6 +206,59 @@ struct PublisherTimingSummary { capture_to_webrtc_total_ms: RollingMs, } +fn find_video_outbound_encoder(stats: &[livekit::webrtc::stats::RtcStats]) -> Option<&str> { + let mut fallback = None; + for stat in stats { + let livekit::webrtc::stats::RtcStats::OutboundRtp(outbound) = stat else { + continue; + }; + if outbound.stream.kind != "video" || outbound.outbound.encoder_implementation.is_empty() { + continue; + } + + let implementation = outbound.outbound.encoder_implementation.as_str(); + if outbound.outbound.active { + return Some(implementation); + } + fallback.get_or_insert(implementation); + } + + fallback +} + +async fn update_publisher_encoder_overlay( + track: LocalVideoTrack, + shared: Arc>, + ctrl_c_received: Arc, +) { + let mut logged_initial = false; + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + if ctrl_c_received.load(Ordering::Acquire) { + break; + } + + match track.get_stats().await { + Ok(stats) => { + if let Some(implementation) = find_video_outbound_encoder(&stats) { + let mut shared = shared.lock(); + shared.codec_implementation = implementation.to_string(); + } + logged_initial = true; + } + Err(e) if !logged_initial => { + debug!("Failed to get publisher stats for video track: {:?}", e); + 
logged_initial = true; + } + Err(_) => {} + } + + interval.tick().await; + } +} + impl PublisherTimingSummary { fn reset(&mut self) { self.paced_wait_ms.reset(); @@ -234,6 +304,185 @@ fn format_timing_line(timings: &PublisherTimingSummary) -> String { format!("Timing ms: {}\nTiming ms: {}", line_one.join(" | "), line_two.join(" | ")) } +const MAX_PUBLISH_TIMING_SAMPLES: usize = 300; + +#[derive(Default)] +struct PublisherTimingState { + samples: HashMap, + order: VecDeque, + latest_complete_sample: Option, +} + +impl PublisherTimingState { + fn record_frame_buffer( + &mut self, + sensor_exposure_timestamp_us: u64, + got_frame_buffer_timestamp_us: u64, + frame_id: Option, + ) -> PublisherTimingSample { + let sample = self.get_or_insert_sample(sensor_exposure_timestamp_us, frame_id); + sample.got_frame_buffer_timestamp_us = Some(got_frame_buffer_timestamp_us); + *sample + } + + fn record_sdk_event(&mut self, event: PublishTimingEvent) -> Option { + if event.capture_timestamp_us == 0 { + return None; + } + + let updated_sample = { + let sample = self.get_or_insert_sample(event.capture_timestamp_us, event.frame_id); + match event.stage { + PublishTimingStage::EncoderUpload => { + sample.encoder_upload_timestamp_us = Some(event.timestamp_us); + } + PublishTimingStage::EncoderOutput => { + sample.encoder_output_timestamp_us = Some(event.timestamp_us); + } + PublishTimingStage::WebrtcPacketize => { + sample.webrtc_packetize_timestamp_us = Some(event.timestamp_us); + } + } + *sample + }; + + if updated_sample.is_complete() { + self.latest_complete_sample = Some(updated_sample); + Some(updated_sample) + } else { + None + } + } + + fn display_sample(&self) -> Option { + self.latest_complete_sample + } + + fn get_or_insert_sample( + &mut self, + sensor_exposure_timestamp_us: u64, + frame_id: Option, + ) -> &mut PublisherTimingSample { + if !self.samples.contains_key(&sensor_exposure_timestamp_us) { + self.samples.insert( + sensor_exposure_timestamp_us, + 
PublisherTimingSample::new(sensor_exposure_timestamp_us, frame_id), + ); + self.order.push_back(sensor_exposure_timestamp_us); + self.prune(); + } + + let sample = self + .samples + .get_mut(&sensor_exposure_timestamp_us) + .expect("timing sample should exist after insertion"); + if frame_id.is_some() { + sample.frame_id = frame_id; + } + sample + } + + fn prune(&mut self) { + while self.order.len() > MAX_PUBLISH_TIMING_SAMPLES { + if let Some(oldest) = self.order.pop_front() { + self.samples.remove(&oldest); + if self + .latest_complete_sample + .is_some_and(|sample| sample.sensor_exposure_timestamp_us == oldest) + { + self.latest_complete_sample = None; + } + } + } + } +} + +fn update_shared_timing_sample( + shared: Option<&Arc>>, + sample: PublisherTimingSample, +) { + if let Some(shared) = shared { + let mut shared = shared.lock(); + let should_update = shared.timing_sample.map_or(true, |current| { + sample.sensor_exposure_timestamp_us >= current.sensor_exposure_timestamp_us + }); + if should_update { + shared.timing_sample = Some(sample); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn timing_event( + stage: PublishTimingStage, + capture_timestamp_us: u64, + timestamp_us: u64, + ) -> PublishTimingEvent { + PublishTimingEvent { stage, timestamp_us, capture_timestamp_us, frame_id: Some(7) } + } + + #[test] + fn publisher_timing_state_waits_for_complete_sample() { + let mut state = PublisherTimingState::default(); + state.record_frame_buffer(1_000, 1_100, Some(7)); + + assert!(state.display_sample().is_none()); + assert!(state + .record_sdk_event(timing_event(PublishTimingStage::EncoderUpload, 1_000, 1_200)) + .is_none()); + assert!(state + .record_sdk_event(timing_event(PublishTimingStage::EncoderOutput, 1_000, 1_300)) + .is_none()); + assert!(state.display_sample().is_none()); + } + + #[test] + fn publisher_timing_state_displays_packetized_sample() { + let mut state = PublisherTimingState::default(); + state.record_frame_buffer(1_000, 1_100, 
Some(7)); + state.record_sdk_event(timing_event(PublishTimingStage::EncoderUpload, 1_000, 1_200)); + state.record_sdk_event(timing_event(PublishTimingStage::EncoderOutput, 1_000, 1_300)); + + let sample = state + .record_sdk_event(timing_event(PublishTimingStage::WebrtcPacketize, 1_000, 1_400)) + .expect("packetized sample should be displayable"); + + assert!(sample.is_complete()); + assert_eq!(state.display_sample().unwrap().webrtc_packetize_timestamp_us, Some(1_400)); + } + + #[test] + fn publisher_timing_shared_update_accepts_current_frame() { + let shared = Arc::new(Mutex::new(SharedYuv::default())); + let mut current = PublisherTimingSample::new(1_000, Some(1)); + shared.lock().timing_sample = Some(current); + + current.encoder_upload_timestamp_us = Some(1_500); + update_shared_timing_sample(Some(&shared), current); + + assert_eq!(shared.lock().timing_sample.unwrap().encoder_upload_timestamp_us, Some(1_500)); + } + + #[test] + fn publisher_timing_shared_update_ignores_other_frames() { + let shared = Arc::new(Mutex::new(SharedYuv::default())); + let current = PublisherTimingSample::new(2_000, Some(2)); + let mut stale = PublisherTimingSample::new(1_000, Some(1)); + stale.encoder_upload_timestamp_us = Some(1_500); + shared.lock().timing_sample = Some(current); + + update_shared_timing_sample(Some(&shared), stale); + + assert_eq!( + shared.lock().timing_sample.unwrap().sensor_exposure_timestamp_us, + current.sensor_exposure_timestamp_us + ); + } +} + fn list_cameras() -> Result<()> { let cams = nokhwa::query(ApiBackend::Auto)?; println!("Available cameras:"); @@ -432,16 +681,30 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let rtc_source = NativeVideoSource::new(VideoResolution { width, height }, false); let track = LocalVideoTrack::create_video_track("camera", RtcVideoSource::Native(rtc_source.clone())); + let display_shared = args.display_video.then(|| Arc::new(Mutex::new(SharedYuv::default()))); + let publish_timing_state = + 
args.display_timing.then(|| Arc::new(Mutex::new(PublisherTimingState::default()))); + + if let Some(timing_state) = publish_timing_state.as_ref() { + let timing_state = timing_state.clone(); + let display_shared_for_timing = display_shared.clone(); + track.set_publish_timing_observer(Some(Box::new(move |event| { + let sample = timing_state.lock().record_sdk_event(event); + if let Some(sample) = sample { + update_shared_timing_sample(display_shared_for_timing.as_ref(), sample); + } + }))); + } // Choose requested codec and attempt to publish; if H.265 fails, retry with H.264 - let requested_codec = if args.h265 { VideoCodec::H265 } else { VideoCodec::H264 }; + let requested_codec = VideoCodec::from(args.codec); info!("Attempting publish with codec: {}", requested_codec.as_str()); // Compute an explicit video encoding so all simulcast layers use 30 fps. // The SDK defaults reduce lower layers to 15/20 fps; we override that here. let target_fps = args.fps as f64; let main_encoding = { - let base = options::compute_appropriate_encoding(false, width, height, VideoCodec::H264); + let base = options::compute_appropriate_encoding(false, width, height, requested_codec); VideoEncoding { max_bitrate: args.max_bitrate.unwrap_or(base.max_bitrate), max_framerate: target_fps, @@ -508,10 +771,17 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { }; if args.display_video { - let shared = Arc::new(Mutex::new(SharedYuv { - codec: actual_codec.as_str().to_ascii_uppercase(), - ..Default::default() - })); + let shared = display_shared.expect("display video should create shared preview state"); + { + let mut shared = shared.lock(); + shared.codec = actual_codec.as_str().to_ascii_uppercase(); + shared.simulcast = args.simulcast; + } + let stats_task = tokio::spawn(update_publisher_encoder_overlay( + track.clone(), + shared.clone(), + ctrl_c_received.clone(), + )); let capture_task = tokio::spawn(run_capture_loop( capture_config, ctrl_c_received.clone(), @@ -520,6 +790,7 @@ 
async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { width, height, Some(shared.clone()), + publish_timing_state.clone(), )); let display_result = video_display::run_display( @@ -530,6 +801,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { ); let capture_result = capture_task.await?; + let _ = stats_task.await; display_result?; capture_result?; } else { @@ -541,6 +813,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { width, height, None, + publish_timing_state.clone(), ) .await?; } @@ -556,6 +829,7 @@ async fn run_capture_loop( width: u32, height: u32, display_shared: Option>>, + publish_timing_state: Option>>, ) -> Result<()> { // Pace publishing at the requested FPS (not the camera-reported FPS) to hit desired cadence let pace_fps = config.fps as f64; @@ -583,7 +857,6 @@ async fn run_capture_loop( let mut frame_counter: u32 = 1; let mut timestamp_overlay = (config.attach_timestamp && config.burn_timestamp) .then(|| TimestampOverlay::new(width, height)); - let mut latency_display = LatencyDisplay::default(); let align_buffers_for_display = display_shared.is_some(); // Reuse a single I420 buffer @@ -799,6 +1072,9 @@ async fn run_capture_loop( } else { None }; + if let Some(timing_state) = publish_timing_state.as_ref() { + timing_state.lock().record_frame_buffer(capture_wall_time_us, read_wall_time_us, fid); + } let mut buffer_ready_at = convert_finished_at; let mut frame_draw_ms = None; let mut burned_timestamp_us = None; @@ -811,10 +1087,14 @@ async fn run_capture_loop( buffer_ready_at = overlay_finished_at; } - // Build frame metadata from enabled packet trailer features - let user_ts = if config.attach_timestamp { Some(capture_wall_time_us) } else { None }; + // Build frame metadata from enabled packet trailer features and local timing correlation. 
+ let user_ts = if config.attach_timestamp || config.display_timing { + Some(capture_wall_time_us) + } else { + None + }; if burned_timestamp_us.is_some() { - debug_assert_eq!(burned_timestamp_us, user_ts); + debug_assert_eq!(burned_timestamp_us, Some(capture_wall_time_us)); } frame.frame_metadata = if user_ts.is_some() || fid.is_some() { Some(FrameMetadata { user_timestamp: user_ts, frame_id: fid }) @@ -824,25 +1104,16 @@ async fn run_capture_loop( // Monotonic, microseconds since start. frame.timestamp_us = start_ts.elapsed().as_micros() as i64; rtc_source.capture_frame(&frame); - let sent_timestamp_us = unix_time_us_now(); let webrtc_capture_finished_at = Instant::now(); if let Some(shared) = display_shared.as_ref() { let (stride_y, stride_u, stride_v) = frame.buffer.strides(); let (data_y, data_u, data_v) = frame.buffer.data(); - let (timing_sample, publish_latency_display) = if config.display_timing { - let timing_sample = PublisherTimingSample { - frame_id: fid, - capture_timestamp_us: capture_wall_time_us, - read_timestamp_us: read_wall_time_us, - sent_timestamp_us, - }; - let publish_latency_display = latency_display.value( - Instant::now(), - Some(format_us_delta_ms(sent_timestamp_us, capture_wall_time_us)), - ); - (Some(timing_sample), Some(publish_latency_display)) + let timing_sample = if config.display_timing { + publish_timing_state + .as_ref() + .and_then(|timing_state| timing_state.lock().display_sample()) } else { - (None, None) + None }; video_display::pack_i420_into_shared( shared, @@ -855,7 +1126,6 @@ async fn run_capture_loop( data_v, stride_v as u32, timing_sample, - publish_latency_display, ); } diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index f7ebe6d99..05d8d70fc 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -7,13 +7,16 @@ use egui_wgpu_backend::CallbackTrait; use futures::StreamExt; use livekit::e2ee::{key_provider::*, E2eeOptions, 
EncryptionType}; use livekit::prelude::*; +use livekit::webrtc::native::packet_trailer::{ + SubscribeTimingEvent, SubscribeTimingObserver, SubscribeTimingStage, +}; use livekit::webrtc::video_frame::BoxVideoFrame; use livekit::webrtc::video_stream::native::NativeVideoStream; use livekit_api::access_token; use log::{debug, info}; use parking_lot::Mutex; use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, env, sync::{ atomic::{AtomicBool, Ordering}, @@ -22,10 +25,10 @@ use std::{ time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -mod timestamp_burn; +mod codec_display; mod viewport_aspect; -use timestamp_burn::{format_timestamp_us, LatencyDisplay}; +use codec_display::{codec_from_mime, codec_with_implementation}; use viewport_aspect::AspectConstrainedViewport; #[cfg(target_os = "macos")] @@ -417,30 +420,234 @@ struct SharedYuv { height: u32, frame: Option, codec: String, + codec_implementation: String, fps: f32, dirty: bool, /// Time when the latest frame became available to the subscriber code. received_at_us: Option, /// Packet-trailer metadata from the most recent frame, if any. frame_metadata: Option, - /// Latest frame whose GPU submit has completed; lags CPU receive by ~1 display frame. 
- gpu_done: Option, } #[derive(Clone, Copy, Debug)] -struct GpuDoneSample { +struct SubscriberTimingSample { frame_id: Option, - capture_timestamp_us: Option, - cpu_received_us: u64, - gpu_done_us: u64, + sensor_exposure_timestamp_us: u64, + webrtc_receive_timestamp_us: Option, + decoder_upload_timestamp_us: Option, + decoder_output_timestamp_us: Option, + frame_rendered_timestamp_us: Option, +} + +impl SubscriberTimingSample { + fn new(sensor_exposure_timestamp_us: u64, frame_id: Option) -> Self { + Self { + frame_id, + sensor_exposure_timestamp_us, + webrtc_receive_timestamp_us: None, + decoder_upload_timestamp_us: None, + decoder_output_timestamp_us: None, + frame_rendered_timestamp_us: None, + } + } } -/// Carried from upload into the wgpu submit callback to stamp `gpu_done_us`. +/// Carried from upload into the wgpu submit callback to stamp render completion. #[derive(Clone, Copy, Debug)] struct PendingGpuSample { frame_id: Option, capture_timestamp_us: Option, - cpu_received_us: u64, +} + +const MAX_SUBSCRIBER_TIMING_SAMPLES: usize = 300; +const SUBSCRIBER_TIMING_DISPLAY_UPDATE_INTERVAL: Duration = Duration::from_millis(100); + +#[derive(Default)] +struct SubscriberTimingState { + samples: HashMap, + order: VecDeque, + latest_rendered_sample: Option, + displayed_timing_deltas: Option, + displayed_exp2recv_latency: Option, + displayed_e2e_latency: Option, + last_latency_update: Option, +} + +#[derive(Clone, Debug)] +struct SubscriberTimingDeltaValues { + sensor_exposure: String, + webrtc_receive: String, + decoder_upload: String, + decoder_output: String, + frame_rendered: String, +} + +impl SubscriberTimingDeltaValues { + fn from_sample(sample: SubscriberTimingSample) -> Self { + let base = sample.sensor_exposure_timestamp_us; + Self { + sensor_exposure: format_timing_delta_ms(base, base), + webrtc_receive: format_optional_timing_delta_ms( + sample.webrtc_receive_timestamp_us, + Some(base), + ), + decoder_upload: format_optional_timing_delta_ms( + 
sample.decoder_upload_timestamp_us, + sample.webrtc_receive_timestamp_us, + ), + decoder_output: format_optional_timing_delta_ms( + sample.decoder_output_timestamp_us, + sample.decoder_upload_timestamp_us, + ), + frame_rendered: format_optional_timing_delta_ms( + sample.frame_rendered_timestamp_us, + sample.decoder_output_timestamp_us, + ), + } + } +} + +struct SubscriberTimingOverlayValues { + deltas: SubscriberTimingDeltaValues, + exp2recv_latency: String, + e2e_latency: String, +} + +impl SubscriberTimingState { + fn record_subscribe_event(&mut self, event: SubscribeTimingEvent) { + if event.capture_timestamp_us == 0 { + return; + } + + let sample = self.get_or_insert_sample(event.capture_timestamp_us, event.frame_id); + match event.stage { + SubscribeTimingStage::WebrtcReceive => { + sample.webrtc_receive_timestamp_us = Some(event.timestamp_us); + } + SubscribeTimingStage::DecoderUpload => { + sample.decoder_upload_timestamp_us = Some(event.timestamp_us); + } + SubscribeTimingStage::DecoderOutput => { + sample.decoder_output_timestamp_us = Some(event.timestamp_us); + } + } + } + + fn record_decoder_output_fallback( + &mut self, + sensor_exposure_timestamp_us: u64, + frame_id: Option, + decoder_output_timestamp_us: u64, + ) { + let sample = self.get_or_insert_sample(sensor_exposure_timestamp_us, frame_id); + sample.decoder_output_timestamp_us.get_or_insert(decoder_output_timestamp_us); + } + + fn record_frame_rendered( + &mut self, + sensor_exposure_timestamp_us: u64, + frame_id: Option, + frame_rendered_timestamp_us: u64, + ) -> SubscriberTimingSample { + let sample = self.get_or_insert_sample(sensor_exposure_timestamp_us, frame_id); + sample.frame_rendered_timestamp_us = Some(frame_rendered_timestamp_us); + let sample = *sample; + self.latest_rendered_sample = Some(sample); + sample + } + + fn display_sample(&self) -> Option { + self.latest_rendered_sample + } + + fn display_overlay_lines(&mut self, now: Instant) -> Option> { + let sample = 
self.display_sample()?; + let overlay_values = self.overlay_values(sample, now); + Some(build_timing_overlay_lines(sample, &overlay_values)) + } + + fn reset(&mut self) { + *self = Self::default(); + } + + fn overlay_values( + &mut self, + sample: SubscriberTimingSample, + now: Instant, + ) -> SubscriberTimingOverlayValues { + let should_update = self.last_latency_update.map_or(true, |last_update| { + now.duration_since(last_update) >= SUBSCRIBER_TIMING_DISPLAY_UPDATE_INTERVAL + }); + + if should_update { + self.displayed_timing_deltas = Some(SubscriberTimingDeltaValues::from_sample(sample)); + self.displayed_exp2recv_latency = + sample.webrtc_receive_timestamp_us.map(|webrtc_receive_timestamp_us| { + format_latency_ms( + webrtc_receive_timestamp_us, + sample.sensor_exposure_timestamp_us, + ) + }); + self.displayed_e2e_latency = + sample.frame_rendered_timestamp_us.map(|frame_rendered_timestamp_us| { + format_latency_ms( + frame_rendered_timestamp_us, + sample.sensor_exposure_timestamp_us, + ) + }); + self.last_latency_update = Some(now); + } + + SubscriberTimingOverlayValues { + deltas: self + .displayed_timing_deltas + .clone() + .unwrap_or_else(|| SubscriberTimingDeltaValues::from_sample(sample)), + exp2recv_latency: self + .displayed_exp2recv_latency + .clone() + .unwrap_or_else(|| "NA".to_string()), + e2e_latency: self.displayed_e2e_latency.clone().unwrap_or_else(|| "NA".to_string()), + } + } + + fn get_or_insert_sample( + &mut self, + sensor_exposure_timestamp_us: u64, + frame_id: Option, + ) -> &mut SubscriberTimingSample { + if !self.samples.contains_key(&sensor_exposure_timestamp_us) { + self.samples.insert( + sensor_exposure_timestamp_us, + SubscriberTimingSample::new(sensor_exposure_timestamp_us, frame_id), + ); + self.order.push_back(sensor_exposure_timestamp_us); + self.prune(); + } + + let sample = self + .samples + .get_mut(&sensor_exposure_timestamp_us) + .expect("timing sample should exist after insertion"); + if frame_id.is_some() { + 
sample.frame_id = frame_id; + } + sample + } + + fn prune(&mut self) { + while self.order.len() > MAX_SUBSCRIBER_TIMING_SAMPLES { + if let Some(oldest) = self.order.pop_front() { + self.samples.remove(&oldest); + if self + .latest_rendered_sample + .is_some_and(|sample| sample.sensor_exposure_timestamp_us == oldest) + { + self.latest_rendered_sample = None; + } + } + } + } } #[derive(Clone)] @@ -464,12 +671,6 @@ impl Default for SimulcastState { } } -fn codec_label(mime: &str) -> String { - let base = mime.split(';').next().unwrap_or(mime).trim(); - let last = base.rsplit('/').next().unwrap_or(base).trim(); - last.to_ascii_uppercase() -} - fn infer_quality_from_dims( full_w: u32, _full_h: u32, @@ -622,19 +823,59 @@ fn update_simulcast_quality_from_stats( sc.active_quality = Some(q); } +fn update_decoder_implementation_from_stats( + stats: &[livekit::webrtc::stats::RtcStats], + shared: &Arc>, +) { + let Some(inbound) = find_video_inbound_stats(stats) else { + return; + }; + if inbound.inbound.decoder_implementation.is_empty() { + return; + } + + let mut shared = shared.lock(); + shared.codec_implementation = inbound.inbound.decoder_implementation; +} + /// Returns the current wall-clock time as microseconds since Unix epoch. 
fn current_timestamp_us() -> u64 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_micros() as u64 } -fn format_optional_timestamp_us(ts_us: Option) -> String { - ts_us.map(format_timestamp_us).unwrap_or_else(|| "NA".to_string()) +fn format_time_of_day_us(timestamp_us: u64) -> String { + let total_millis = timestamp_us / 1_000; + let millis = total_millis % 1_000; + let total_seconds = total_millis / 1_000; + let seconds = total_seconds % 60; + let minutes = (total_seconds / 60) % 60; + let hours = (total_seconds / 3_600) % 24; + format!("{hours:02}:{minutes:02}:{seconds:02}:{millis:03}") +} + +fn format_timing_delta_ms(timestamp_us: u64, base_timestamp_us: u64) -> String { + let delta_us = i128::from(timestamp_us) - i128::from(base_timestamp_us); + if delta_us == 0 { + return "0.0ms".to_string(); + } + format!("{:+.1}ms", delta_us as f64 / 1_000.0) +} + +fn format_optional_timing_delta_ms( + timestamp_us: Option, + base_timestamp_us: Option, +) -> String { + match (timestamp_us, base_timestamp_us) { + (Some(timestamp_us), Some(base_timestamp_us)) => { + format_timing_delta_ms(timestamp_us, base_timestamp_us) + } + _ => "+--.-ms".to_string(), + } } -/// Format the us delta as a millisecond string like `"12.3ms"`. 
-fn format_us_delta_ms(later_us: u64, earlier_us: u64) -> String { - let delta_us = later_us.saturating_sub(earlier_us); - format!("{:.1}MS", delta_us as f64 / 1_000.0) +fn format_latency_ms(end_timestamp_us: u64, start_timestamp_us: u64) -> String { + let delta_us = i128::from(end_timestamp_us) - i128::from(start_timestamp_us); + format!("{:.1}ms", delta_us as f64 / 1_000.0) } fn simulcast_state_full_dims(state: &Arc>) -> Option<(u32, u32)> { @@ -642,53 +883,303 @@ fn simulcast_state_full_dims(state: &Arc>) -> Option<(u32, sc.full_dims } -fn video_quality_label(q: livekit::track::VideoQuality) -> &'static str { - match q { - livekit::track::VideoQuality::Low => "LOW", - livekit::track::VideoQuality::Medium => "MED", - livekit::track::VideoQuality::High => "HIGH", +fn video_status_line( + width: u32, + height: u32, + fps: f32, + codec: &str, + codec_implementation: &str, + simulcast: bool, +) -> String { + let codec = codec_with_implementation(codec, codec_implementation); + if simulcast { + format!("{}x{} {:.1}fps {codec} Simulcast", width, height, fps.max(0.0)) + } else { + format!("{}x{} {:.1}fps {codec}", width, height, fps.max(0.0)) } } -fn frame_id_label(frame_id: Option) -> String { - frame_id.map(|id| id.to_string()).unwrap_or_else(|| "NA".to_string()) +const SUBSCRIBER_TIMING_LABEL_WIDTH: usize = 20; +const SUBSCRIBER_TIMING_TIMESTAMP_WIDTH: usize = 12; +const SUBSCRIBER_TIMING_DELTA_WIDTH: usize = 10; +const SUBSCRIBER_TIMING_VALUE_WIDTH: usize = + SUBSCRIBER_TIMING_TIMESTAMP_WIDTH + 1 + SUBSCRIBER_TIMING_DELTA_WIDTH; +const SUBSCRIBER_TIMING_LINE_WIDTH: usize = + SUBSCRIBER_TIMING_LABEL_WIDTH + 1 + SUBSCRIBER_TIMING_VALUE_WIDTH; + +fn subscriber_timing_label(label: &str) -> String { + format!("{label}:") } -fn overlay_stats_line(label: &str, value: impl std::fmt::Display) -> String { - format!("{label:<17}{value}") +fn subscriber_timing_value_line(label: &str, value: &str) -> String { + let label = subscriber_timing_label(label); + format!( + 
"{label:value_width$}", + label_width = SUBSCRIBER_TIMING_LABEL_WIDTH, + value_width = SUBSCRIBER_TIMING_VALUE_WIDTH + ) +} + +fn subscriber_timing_line(label: &str, timestamp_us: Option, delta: &str) -> String { + let label = subscriber_timing_label(label); + match timestamp_us { + Some(timestamp_us) => format!( + "{label:timestamp_width$} {delta:>delta_width$}", + timestamp = format_time_of_day_us(timestamp_us), + delta = delta, + label_width = SUBSCRIBER_TIMING_LABEL_WIDTH, + timestamp_width = SUBSCRIBER_TIMING_TIMESTAMP_WIDTH, + delta_width = SUBSCRIBER_TIMING_DELTA_WIDTH + ), + None => format!( + "{label:timestamp_width$} {delta:>delta_width$}", + timestamp = "--:--:--:---", + delta = "+--.-ms", + label_width = SUBSCRIBER_TIMING_LABEL_WIDTH, + timestamp_width = SUBSCRIBER_TIMING_TIMESTAMP_WIDTH, + delta_width = SUBSCRIBER_TIMING_DELTA_WIDTH + ), + } } fn build_timing_overlay_lines( - frame_id: Option, - publish_us: Option, - receive_us: u64, - prev_render: Option, - prev_latency_display: &str, + sample: SubscriberTimingSample, + overlay_values: &SubscriberTimingOverlayValues, ) -> Vec { - let mut lines = vec![ - overlay_stats_line("FRAME ID:", frame_id_label(frame_id)), - overlay_stats_line("CAPT TIMESTAMP:", format_optional_timestamp_us(publish_us)), - overlay_stats_line("RECV TIMESTAMP:", format_timestamp_us(receive_us)), - ]; - - if let Some(sample) = prev_render { - lines.push(overlay_stats_line("PREV FRAME ID:", frame_id_label(sample.frame_id))); - lines.push(overlay_stats_line( - "PREV CAPT:", - format_optional_timestamp_us(sample.capture_timestamp_us), + let base = sample.sensor_exposure_timestamp_us; + let webrtc_receive = sample.webrtc_receive_timestamp_us; + let decoder_upload = sample.decoder_upload_timestamp_us; + let decoder_output = sample.decoder_output_timestamp_us; + let frame_rendered = sample.frame_rendered_timestamp_us; + let frame_id = sample.frame_id.map(|id| id.to_string()).unwrap_or_else(|| "NA".to_string()); + vec![ + 
subscriber_timing_value_line("Frame ID", &frame_id), + subscriber_timing_line( + "sensor exposure", + Some(base), + &overlay_values.deltas.sensor_exposure, + ), + subscriber_timing_line( + "webrtc receive", + webrtc_receive, + &overlay_values.deltas.webrtc_receive, + ), + subscriber_timing_line( + "decoder upload", + decoder_upload, + &overlay_values.deltas.decoder_upload, + ), + subscriber_timing_line( + "decoder output", + decoder_output, + &overlay_values.deltas.decoder_output, + ), + subscriber_timing_line( + "frame rendered", + frame_rendered, + &overlay_values.deltas.frame_rendered, + ), + subscriber_timing_value_line("Exposure to Receive", &overlay_values.exp2recv_latency), + subscriber_timing_value_line("e2e latency", &overlay_values.e2e_latency), + ] +} + +#[cfg(test)] +fn assert_subscriber_timing_lines_are_stable(lines: &[String]) { + assert!(lines.iter().all(|line| line.len() == SUBSCRIBER_TIMING_LINE_WIDTH)); +} + +#[cfg(test)] +mod tests { + use super::*; + + fn timestamp_us(hour: u64, minute: u64, second: u64, millisecond: u64) -> u64 { + (((hour * 3_600 + minute * 60 + second) * 1_000) + millisecond) * 1_000 + } + + fn subscribe_event( + stage: SubscribeTimingStage, + capture_timestamp_us: u64, + timestamp_us: u64, + ) -> SubscribeTimingEvent { + SubscribeTimingEvent { stage, timestamp_us, capture_timestamp_us, frame_id: Some(123) } + } + + fn overlay_values( + sample: SubscriberTimingSample, + exp2recv_latency: &str, + e2e_latency: &str, + ) -> SubscriberTimingOverlayValues { + SubscriberTimingOverlayValues { + deltas: SubscriberTimingDeltaValues::from_sample(sample), + exp2recv_latency: exp2recv_latency.to_string(), + e2e_latency: e2e_latency.to_string(), + } + } + + #[test] + fn subscriber_overlay_shows_status_without_timing() { + let shared = Arc::new(Mutex::new(SharedYuv { + width: 1280, + height: 720, + frame: None, + codec: "H264".to_string(), + codec_implementation: "NVIDIA H264 Decoder".to_string(), + fps: 29.6, + dirty: false, + 
received_at_us: None, + frame_metadata: None, + })); + let simulcast = + Arc::new(Mutex::new(SimulcastState { available: true, ..Default::default() })); + + let lines = subscriber_overlay_lines(&shared, &simulcast, false, None) + .expect("overlay should render"); + + assert_eq!(lines, vec!["1280x720 29.6fps H264 NVDEC Simulcast"]); + } + + #[test] + fn subscriber_timing_lines_match_requested_format() { + let base = timestamp_us(1, 2, 3, 456); + let sample = SubscriberTimingSample { + frame_id: Some(123), + sensor_exposure_timestamp_us: base, + webrtc_receive_timestamp_us: Some(base + 32_400), + decoder_upload_timestamp_us: Some(base + 35_500), + decoder_output_timestamp_us: Some(base + 55_300), + frame_rendered_timestamp_us: Some(base + 56_900), + }; + + let overlay_values = overlay_values(sample, "32.4ms", "56.9ms"); + let lines = build_timing_overlay_lines(sample, &overlay_values); + assert_subscriber_timing_lines_are_stable(&lines); + assert_eq!( + lines, + vec![ + "Frame ID: 123", + "sensor exposure: 01:02:03:456 0.0ms", + "webrtc receive: 01:02:03:488 +32.4ms", + "decoder upload: 01:02:03:491 +3.1ms", + "decoder output: 01:02:03:511 +19.8ms", + "frame rendered: 01:02:03:512 +1.6ms", + "Exposure to Receive: 32.4ms", + "e2e latency: 56.9ms", + ] + ); + } + + #[test] + fn subscriber_timing_lines_use_placeholders_for_missing_stages() { + let base = timestamp_us(1, 2, 3, 456); + let sample = SubscriberTimingSample::new(base, None); + + let overlay_values = overlay_values(sample, "NA", "NA"); + let lines = build_timing_overlay_lines(sample, &overlay_values); + assert_subscriber_timing_lines_are_stable(&lines); + assert_eq!( + lines, + vec![ + "Frame ID: NA", + "sensor exposure: 01:02:03:456 0.0ms", + "webrtc receive: --:--:--:--- +--.-ms", + "decoder upload: --:--:--:--- +--.-ms", + "decoder output: --:--:--:--- +--.-ms", + "frame rendered: --:--:--:--- +--.-ms", + "Exposure to Receive: NA", + "e2e latency: NA", + ] + ); + } + + #[test] + fn 
subscriber_timing_state_displays_rendered_sample() { + let mut state = SubscriberTimingState::default(); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::WebrtcReceive, + 1_000, + 1_200, )); - lines.push(overlay_stats_line("PREV RECV:", format_timestamp_us(sample.cpu_received_us))); - lines.push(overlay_stats_line("PREV RENDER:", format_timestamp_us(sample.gpu_done_us))); - lines.push(overlay_stats_line("PREV LATENCY:", prev_latency_display)); - } else { - lines.push(overlay_stats_line("PREV FRAME ID:", "NA")); - lines.push(overlay_stats_line("PREV CAPT:", "NA")); - lines.push(overlay_stats_line("PREV RECV:", "NA")); - lines.push(overlay_stats_line("PREV RENDER:", "NA")); - lines.push(overlay_stats_line("PREV LATENCY:", prev_latency_display)); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderUpload, + 1_000, + 1_300, + )); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderOutput, + 1_000, + 1_400, + )); + assert!(state.display_sample().is_none()); + + let sample = state.record_frame_rendered(1_000, Some(123), 1_500); + + assert_eq!(sample.frame_rendered_timestamp_us, Some(1_500)); + assert_eq!(state.display_sample().unwrap().decoder_output_timestamp_us, Some(1_400)); } - lines + #[test] + fn subscriber_timing_summary_latencies_refresh_at_ten_hz() { + let mut state = SubscriberTimingState::default(); + let now = Instant::now(); + + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::WebrtcReceive, + 1_000, + 33_400, + )); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderUpload, + 1_000, + 36_500, + )); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderOutput, + 1_000, + 56_300, + )); + state.record_frame_rendered(1_000, Some(1), 57_900); + let lines = state.display_overlay_lines(now).expect("overlay should render"); + assert_eq!(lines[3], "decoder upload: 00:00:00:036 +3.1ms"); + assert_eq!(lines[4], "decoder 
output: 00:00:00:056 +19.8ms"); + assert_eq!(lines[5], "frame rendered: 00:00:00:057 +1.6ms"); + assert_eq!(lines[6], "Exposure to Receive: 32.4ms"); + assert_eq!(lines[7], "e2e latency: 56.9ms"); + + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::WebrtcReceive, + 1_000_000, + 1_050_000, + )); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderUpload, + 1_000_000, + 1_060_000, + )); + state.record_subscribe_event(subscribe_event( + SubscribeTimingStage::DecoderOutput, + 1_000_000, + 1_080_000, + )); + state.record_frame_rendered(1_000_000, Some(2), 1_100_000); + let lines = state + .display_overlay_lines(now + Duration::from_millis(99)) + .expect("overlay should render"); + assert_eq!(lines[3], "decoder upload: 00:00:01:060 +3.1ms"); + assert_eq!(lines[4], "decoder output: 00:00:01:080 +19.8ms"); + assert_eq!(lines[5], "frame rendered: 00:00:01:100 +1.6ms"); + assert_eq!(lines[6], "Exposure to Receive: 32.4ms"); + assert_eq!(lines[7], "e2e latency: 56.9ms"); + + let lines = state + .display_overlay_lines(now + Duration::from_millis(100)) + .expect("overlay should render"); + assert_eq!(lines[3], "decoder upload: 00:00:01:060 +10.0ms"); + assert_eq!(lines[4], "decoder output: 00:00:01:080 +20.0ms"); + assert_eq!(lines[5], "frame rendered: 00:00:01:100 +20.0ms"); + assert_eq!(lines[6], "Exposure to Receive: 50.0ms"); + assert_eq!(lines[7], "e2e latency: 100.0ms"); + } } fn video_size(shared: &Arc>) -> Option<(u32, u32)> { @@ -710,6 +1201,7 @@ async fn handle_track_subscribed( ctrl_c_received: &Arc, simulcast: &Arc>, repaint_ctx: &Arc>>, + subscriber_timing_state: Option>>, ) { // If a participant filter is set, skip others if let Some(ref allow) = allowed_identity { @@ -724,7 +1216,7 @@ async fn handle_track_subscribed( }; let sid = publication.sid().clone(); - let codec = codec_label(&publication.mime_type()); + let codec = codec_from_mime(&publication.mime_type()); // Only handle if we don't already have an active 
video track { let mut active = active_sid.lock(); @@ -743,25 +1235,35 @@ async fn handle_track_subscribed( *active = Some(sid.clone()); } - // Update HUD codec label and feature flags early (before first frame arrives) - { - let mut s = shared.lock(); - s.codec = codec; - } - info!( "Subscribed to video track: {} (sid {}) from {} - codec: {}, simulcast: {}, dimension: {}x{}, packet_trailer_features: {:?}", publication.name(), publication.sid(), participant.identity(), - publication.mime_type(), + codec, publication.simulcasted(), publication.dimension().0, publication.dimension().1, publication.packet_trailer_features(), ); + { + let mut s = shared.lock(); + s.codec = codec; + } + let rtc_track = video_track.rtc_track(); + if let Some(timing_state) = subscriber_timing_state.as_ref() { + if let Some(handler) = video_track.packet_trailer_handler() { + let timing_state = timing_state.clone(); + let observer: SubscribeTimingObserver = Arc::new(move |event| { + timing_state.lock().record_subscribe_event(event); + }); + handler.set_subscribe_timing_observer(Some(observer)); + } else { + debug!("No packet trailer handler available for subscriber timing overlay"); + } + } // Start background sink task immediately so stats lookup cannot delay first-frame handling. 
let shared2 = shared.clone(); @@ -769,6 +1271,7 @@ async fn handle_track_subscribed( let my_sid = sid.clone(); let ctrl_c_sink = ctrl_c_received.clone(); let repaint_ctx_sink = repaint_ctx.clone(); + let subscriber_timing_state_sink = subscriber_timing_state.clone(); // Initialize simulcast state for this publication { let mut sc = simulcast.lock(); @@ -828,6 +1331,17 @@ async fn handle_track_subscribed( s.dirty = true; s.received_at_us = Some(received_at_us); s.frame_metadata = frame.frame_metadata; + if let (Some(timing_state), Some(metadata)) = + (subscriber_timing_state_sink.as_ref(), frame.frame_metadata) + { + if let Some(sensor_exposure_timestamp_us) = metadata.user_timestamp { + timing_state.lock().record_decoder_output_fallback( + sensor_exposure_timestamp_us, + metadata.frame_id, + received_at_us, + ); + } + } s.frame = Some(frame); drop(s); @@ -856,6 +1370,7 @@ async fn handle_track_subscribed( let active_sid_stats = active_sid.clone(); let my_sid_stats = sid.clone(); let simulcast_stats = simulcast.clone(); + let shared_stats = shared.clone(); tokio::spawn(async move { let mut logged_initial = false; let mut jitter_buffer_snapshot = None; @@ -882,6 +1397,7 @@ async fn handle_track_subscribed( log_video_jitter_buffer_stats(&stats, &mut jitter_buffer_snapshot); last_jitter_buffer_log = Instant::now(); } + update_decoder_implementation_from_stats(&stats, &shared_stats); update_simulcast_quality_from_stats(&stats, &simulcast_stats); } Err(e) if !logged_initial => { @@ -896,54 +1412,69 @@ async fn handle_track_subscribed( }); } -fn clear_hud_and_simulcast(shared: &Arc>, simulcast: &Arc>) { +fn clear_hud_and_simulcast( + shared: &Arc>, + simulcast: &Arc>, + subscriber_timing_state: Option<&Arc>>, +) { { let mut s = shared.lock(); + s.width = 0; + s.height = 0; s.codec.clear(); + s.codec_implementation.clear(); s.fps = 0.0; s.frame = None; s.dirty = false; s.received_at_us = None; s.frame_metadata = None; - s.gpu_done = None; + } + if let Some(timing_state) = 
subscriber_timing_state { + timing_state.lock().reset(); } let mut sc = simulcast.lock(); *sc = SimulcastState::default(); } -fn timing_overlay_lines( +fn subscriber_overlay_lines( shared: &Arc>, - latency_display: &mut LatencyDisplay, + simulcast: &Arc>, + include_timing: bool, + subscriber_timing_state: Option<&Arc>>, ) -> Option> { - let s = shared.lock(); - let receive_us = s.received_at_us?; - let frame_id = s.frame_metadata.and_then(|m| m.frame_id); - let publish_us = s.frame_metadata.and_then(|m| m.user_timestamp); - let previous_gpu_done = s.gpu_done; - drop(s); - - let prev_latency_display = latency_display - .value( - Instant::now(), - previous_gpu_done.and_then(|sample| { - sample - .capture_timestamp_us - .map(|capture_us| format_us_delta_ms(sample.gpu_done_us, capture_us)) - }), + let status_line = { + let s = shared.lock(); + if s.width == 0 || s.height == 0 { + return None; + } + + let simulcast_enabled = simulcast.lock().available; + video_status_line( + s.width, + s.height, + s.fps, + &s.codec, + &s.codec_implementation, + simulcast_enabled, ) - .to_string(); - - Some(build_timing_overlay_lines( - frame_id, - publish_us, - receive_us, - previous_gpu_done, - &prev_latency_display, - )) + }; + + let mut lines = vec![status_line]; + if include_timing { + if let Some(timing_state) = subscriber_timing_state { + if let Some(mut timing_lines) = + timing_state.lock().display_overlay_lines(Instant::now()) + { + lines.append(&mut timing_lines); + } + } + } + + Some(lines) } -fn paint_timing_overlay(ctx: &egui::Context, video_rect: egui::Rect, lines: &[String]) { - egui::Area::new("timing_overlay".into()) +fn paint_subscriber_overlay(ctx: &egui::Context, video_rect: egui::Rect, lines: &[String]) { + egui::Area::new("subscriber_overlay".into()) .fixed_pos(video_rect.left_top() + egui::vec2(10.0, 10.0)) .interactable(false) .show(ctx, |ui| { @@ -952,6 +1483,9 @@ fn paint_timing_overlay(ctx: &egui::Context, video_rect: egui::Rect, lines: &[St 
.corner_radius(egui::CornerRadius::same(4)) .inner_margin(egui::Margin::same(6)) .show(ui, |ui| { + if lines.len() > 1 { + ui.set_min_width(SUBSCRIBER_TIMING_LINE_WIDTH as f32 * 8.0); + } ui.add( egui::Label::new( egui::RichText::new(lines.join("\n")) @@ -970,6 +1504,7 @@ fn handle_track_unsubscribed( shared: &Arc>, active_sid: &Arc>>, simulcast: &Arc>, + subscriber_timing_state: Option<&Arc>>, ) { let sid = publication.sid().clone(); let mut active = active_sid.lock(); @@ -977,7 +1512,7 @@ fn handle_track_unsubscribed( info!("Video track unsubscribed ({}), clearing active sink", sid); *active = None; } - clear_hud_and_simulcast(shared, simulcast); + clear_hud_and_simulcast(shared, simulcast, subscriber_timing_state); } fn handle_track_unpublished( @@ -985,6 +1520,7 @@ fn handle_track_unpublished( shared: &Arc>, active_sid: &Arc>>, simulcast: &Arc>, + subscriber_timing_state: Option<&Arc>>, ) { let sid = publication.sid().clone(); let mut active = active_sid.lock(); @@ -992,17 +1528,17 @@ fn handle_track_unpublished( info!("Video track unpublished ({}), clearing active sink", sid); *active = None; } - clear_hud_and_simulcast(shared, simulcast); + clear_hud_and_simulcast(shared, simulcast, subscriber_timing_state); } struct VideoApp { shared: Arc>, simulcast: Arc>, + subscriber_timing_state: Option>>, repaint_ctx: Arc>>, ctrl_c_received: Arc, viewport: AspectConstrainedViewport, display_timestamp: bool, - latency_display: LatencyDisplay, } impl eframe::App for VideoApp { @@ -1019,10 +1555,12 @@ impl eframe::App for VideoApp { } self.viewport.constrain(ctx, aspect_just_changed); - let timing_lines = self - .display_timestamp - .then(|| timing_overlay_lines(&self.shared, &mut self.latency_display)) - .flatten(); + let overlay_lines = subscriber_overlay_lines( + &self.shared, + &self.simulcast, + self.display_timestamp, + self.subscriber_timing_state.as_ref(), + ); egui::CentralPanel::default().frame(egui::Frame::NONE).show(ctx, |ui| { // Ensure we keep repainting for 
smooth playback. @@ -1049,49 +1587,19 @@ impl eframe::App for VideoApp { let (rect, _) = ui.allocate_exact_size(size, egui::Sense::hover()); let cb = egui_wgpu_backend::Callback::new_paint_callback( rect, - YuvPaintCallback { shared: self.shared.clone() }, + YuvPaintCallback { + shared: self.shared.clone(), + subscriber_timing_state: self.subscriber_timing_state.clone(), + }, ); ui.painter().add(cb); - if let Some(lines) = timing_lines.as_ref() { - paint_timing_overlay(ui.ctx(), rect, lines); + if let Some(lines) = overlay_lines.as_ref() { + paint_subscriber_overlay(ui.ctx(), rect, lines); } }, ); }); - // Non-timing video stats stay in egui so they don't become part of the frame timing record. - egui::Area::new("video_hud".into()) - .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) - .interactable(false) - .show(ctx, |ui| { - let s = self.shared.lock(); - if s.width == 0 || s.height == 0 || s.fps <= 0.0 || s.codec.is_empty() { - return; - } - let mut text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); - drop(s); - - let sc = self.simulcast.lock(); - if sc.available { - let layer = sc.active_quality.map(video_quality_label).unwrap_or("NA"); - text.push_str(&format!("\nSimulcast: {}", layer)); - } else { - text.push_str("\nSimulcast: off"); - } - drop(sc); - - egui::Frame::NONE - .fill(egui::Color32::from_black_alpha(140)) - .corner_radius(egui::CornerRadius::same(4)) - .inner_margin(egui::Margin::same(6)) - .show(ui, |ui| { - ui.add( - egui::Label::new(egui::RichText::new(text).color(egui::Color32::WHITE)) - .extend(), - ); - }); - }); - // Simulcast layer controls: bottom-left overlay egui::Area::new("simulcast_controls".into()) .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) @@ -1201,12 +1709,14 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { height: 0, frame: None, codec: String::new(), + codec_implementation: String::new(), fps: 0.0, dirty: false, received_at_us: None, frame_metadata: None, - gpu_done: 
None, })); + let subscriber_timing_state = + args.display_timestamp.then(|| Arc::new(Mutex::new(SubscriberTimingState::default()))); // Subscribe to room events: on first video track, start sink task let allowed_identity = args.participant.clone(); @@ -1219,9 +1729,11 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let simulcast_events = simulcast.clone(); let repaint_ctx_events = repaint_ctx.clone(); let ctrl_c_events = ctrl_c_received.clone(); + let subscriber_timing_state_events = subscriber_timing_state.clone(); tokio::spawn(async move { let active_sid = active_sid.clone(); let simulcast = simulcast_events; + let subscriber_timing_state = subscriber_timing_state_events; let mut events = room.subscribe(); info!("Subscribed to room events"); while let Some(evt) = events.recv().await { @@ -1238,14 +1750,27 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { &ctrl_c_events, &simulcast, &repaint_ctx_events, + subscriber_timing_state.clone(), ) .await; } RoomEvent::TrackUnsubscribed { publication, .. } => { - handle_track_unsubscribed(publication, &shared_clone, &active_sid, &simulcast); + handle_track_unsubscribed( + publication, + &shared_clone, + &active_sid, + &simulcast, + subscriber_timing_state.as_ref(), + ); } RoomEvent::TrackUnpublished { publication, .. 
} => { - handle_track_unpublished(publication, &shared_clone, &active_sid, &simulcast); + handle_track_unpublished( + publication, + &shared_clone, + &active_sid, + &simulcast, + subscriber_timing_state.as_ref(), + ); } _ => {} } @@ -1256,11 +1781,11 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let app = VideoApp { shared, simulcast, + subscriber_timing_state, repaint_ctx, ctrl_c_received: ctrl_c_received.clone(), viewport: AspectConstrainedViewport::new(None), display_timestamp: args.display_timestamp, - latency_display: LatencyDisplay::default(), }; let native_options = viewport_aspect::native_options(None); eframe::run_native( @@ -1279,6 +1804,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { struct YuvPaintCallback { shared: Arc>, + subscriber_timing_state: Option>>, } struct YuvGpuState { @@ -1601,8 +2127,7 @@ impl CallbackTrait for YuvPaintCallback { shared.frame.take().map(|frame| { let frame_id = shared.frame_metadata.and_then(|m| m.frame_id); let capture_timestamp_us = shared.frame_metadata.and_then(|m| m.user_timestamp); - let cpu_received_us = shared.received_at_us.unwrap_or_default(); - let sample = PendingGpuSample { frame_id, capture_timestamp_us, cpu_received_us }; + let sample = PendingGpuSample { frame_id, capture_timestamp_us }; (frame, sample) }) } else { @@ -1812,20 +2337,21 @@ impl CallbackTrait for YuvPaintCallback { // Ride an empty command buffer with egui's submit so we can stamp GPU-done. 
if let Some(sample) = gpu_sample_in_flight { - let shared_for_cb = self.shared.clone(); + let subscriber_timing_state = self.subscriber_timing_state.clone(); let encoder = device.create_command_encoder(&wgpu::CommandEncoderDescriptor { label: Some("yuv_gpu_done_probe"), }); let cb = encoder.finish(); cb.on_submitted_work_done(move || { - let gpu_done_us = current_timestamp_us(); - let mut s = shared_for_cb.lock(); - s.gpu_done = Some(GpuDoneSample { - frame_id: sample.frame_id, - capture_timestamp_us: sample.capture_timestamp_us, - cpu_received_us: sample.cpu_received_us, - gpu_done_us, - }); + if let (Some(timing_state), Some(capture_timestamp_us)) = + (subscriber_timing_state.as_ref(), sample.capture_timestamp_us) + { + timing_state.lock().record_frame_rendered( + capture_timestamp_us, + sample.frame_id, + current_timestamp_us(), + ); + } }); vec![cb] } else { diff --git a/examples/local_video/src/timestamp_burn.rs b/examples/local_video/src/timestamp_burn.rs index b2dd2da56..1fcb92680 100644 --- a/examples/local_video/src/timestamp_burn.rs +++ b/examples/local_video/src/timestamp_burn.rs @@ -10,22 +10,26 @@ const PADDING_Y: usize = 4; const MARGIN: usize = 8; const BG_LUMA: u8 = 16; const FG_LUMA: u8 = 235; -const LATENCY_DISPLAY_UPDATE_INTERVAL: Duration = Duration::from_millis(500); +#[allow(dead_code)] +const LATENCY_DISPLAY_UPDATE_INTERVAL: Duration = Duration::from_millis(100); +#[allow(dead_code)] const LATENCY_DISPLAY_STALE_AFTER: Duration = Duration::from_secs(2); /// Text scale used for burned-in timing metrics overlays. #[allow(dead_code)] pub(crate) const METRICS_OVERLAY_SCALE: usize = 3; -/// Holds a latency string that refreshes at a readable 2 Hz cadence. +/// Holds a latency string that refreshes at a readable 10 Hz cadence. 
+#[allow(dead_code)] #[derive(Default)] pub(crate) struct LatencyDisplay { value: String, last_update: Option, } +#[allow(dead_code)] impl LatencyDisplay { - /// Return the latency string to display, refreshing it when the 2 Hz interval has elapsed. + /// Return the latency string to display, refreshing it when the 10 Hz interval has elapsed. pub(crate) fn value(&mut self, now: Instant, latest_value: Option) -> &str { let should_update = match self.last_update { Some(last_update) => now.duration_since(last_update) >= LATENCY_DISPLAY_UPDATE_INTERVAL, diff --git a/examples/local_video/src/video_display.rs b/examples/local_video/src/video_display.rs index fe1b8acbd..ed85a02d4 100644 --- a/examples/local_video/src/video_display.rs +++ b/examples/local_video/src/video_display.rs @@ -8,9 +8,9 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use std::time::Duration; +use std::time::{Duration, Instant}; -use crate::timestamp_burn::format_timestamp_us; +use crate::codec_display::codec_with_implementation; use crate::viewport_aspect::{self, AspectConstrainedViewport}; #[derive(Default)] @@ -23,18 +23,41 @@ pub(crate) struct SharedYuv { pub(crate) u: Vec, pub(crate) v: Vec, pub(crate) codec: String, + pub(crate) codec_implementation: String, pub(crate) fps: f32, + pub(crate) simulcast: bool, pub(crate) dirty: bool, pub(crate) timing_sample: Option, - pub(crate) publish_latency_display: String, } #[derive(Clone, Copy, Debug)] pub(crate) struct PublisherTimingSample { pub(crate) frame_id: Option, - pub(crate) capture_timestamp_us: u64, - pub(crate) read_timestamp_us: u64, - pub(crate) sent_timestamp_us: u64, + pub(crate) sensor_exposure_timestamp_us: u64, + pub(crate) got_frame_buffer_timestamp_us: Option, + pub(crate) encoder_upload_timestamp_us: Option, + pub(crate) encoder_output_timestamp_us: Option, + pub(crate) webrtc_packetize_timestamp_us: Option, +} + +impl PublisherTimingSample { + pub(crate) fn new(sensor_exposure_timestamp_us: u64, frame_id: Option) -> 
Self { + Self { + frame_id, + sensor_exposure_timestamp_us, + got_frame_buffer_timestamp_us: None, + encoder_upload_timestamp_us: None, + encoder_output_timestamp_us: None, + webrtc_packetize_timestamp_us: None, + } + } + + pub(crate) fn is_complete(&self) -> bool { + self.got_frame_buffer_timestamp_us.is_some() + && self.encoder_upload_timestamp_us.is_some() + && self.encoder_output_timestamp_us.is_some() + && self.webrtc_packetize_timestamp_us.is_some() + } } pub(crate) fn align_up(value: u32, alignment: u32) -> u32 { @@ -88,7 +111,6 @@ pub(crate) fn pack_i420_into_shared( v: &[u8], v_stride: u32, timing_sample: Option, - publish_latency_display: Option<&str>, ) -> bool { let uv_w = (width + 1) / 2; let uv_h = (height + 1) / 2; @@ -104,50 +126,142 @@ pub(crate) fn pack_i420_into_shared( pack_plane(u, u_stride, uv_w, uv_h, uv_bytes_per_row, &mut s.u); pack_plane(v, v_stride, uv_w, uv_h, uv_bytes_per_row, &mut s.v); - let publish_latency_display = timing_sample - .map(|sample| { - publish_latency_display.map(str::to_string).unwrap_or_else(|| { - format_us_delta_ms(sample.sent_timestamp_us, sample.capture_timestamp_us) - }) - }) - .unwrap_or_default(); - s.width = width; s.height = height; s.y_bytes_per_row = y_bytes_per_row; s.uv_bytes_per_row = uv_bytes_per_row; - s.timing_sample = timing_sample; - s.publish_latency_display = publish_latency_display; + if let Some(timing_sample) = timing_sample { + s.timing_sample = Some(timing_sample); + } s.dirty = true; true } -fn format_us_delta_ms(later_us: u64, earlier_us: u64) -> String { - let delta_us = later_us.saturating_sub(earlier_us); +fn format_time_of_day_us(timestamp_us: u64) -> String { + let total_millis = timestamp_us / 1_000; + let millis = total_millis % 1_000; + let total_seconds = total_millis / 1_000; + let seconds = total_seconds % 60; + let minutes = (total_seconds / 60) % 60; + let hours = (total_seconds / 3_600) % 24; + format!("{hours:02}:{minutes:02}:{seconds:02}:{millis:03}") +} + +fn 
format_timing_delta_ms(timestamp_us: u64, base_timestamp_us: u64) -> String { + let delta_us = i128::from(timestamp_us) - i128::from(base_timestamp_us); + if delta_us == 0 { + return "0.0ms".to_string(); + } + format!("{:+.1}ms", delta_us as f64 / 1_000.0) +} + +fn format_optional_timing_delta_ms( + timestamp_us: Option, + base_timestamp_us: Option, +) -> String { + match (timestamp_us, base_timestamp_us) { + (Some(timestamp_us), Some(base_timestamp_us)) => { + format_timing_delta_ms(timestamp_us, base_timestamp_us) + } + _ => "+--.-ms".to_string(), + } +} + +fn format_latency_ms(end_timestamp_us: u64, start_timestamp_us: u64) -> String { + let delta_us = i128::from(end_timestamp_us) - i128::from(start_timestamp_us); format!("{:.1}ms", delta_us as f64 / 1_000.0) } -fn frame_id_label(frame_id: Option) -> String { - frame_id.map(|id| id.to_string()).unwrap_or_else(|| "NA".to_string()) +const PUBLISHER_TIMING_LABEL_WIDTH: usize = 17; +const PUBLISHER_TIMING_TIMESTAMP_WIDTH: usize = 12; +const PUBLISHER_TIMING_DELTA_WIDTH: usize = 10; +const PUBLISHER_TIMING_VALUE_WIDTH: usize = + PUBLISHER_TIMING_TIMESTAMP_WIDTH + 1 + PUBLISHER_TIMING_DELTA_WIDTH; +const PUBLISHER_TIMING_LINE_WIDTH: usize = + PUBLISHER_TIMING_LABEL_WIDTH + 1 + PUBLISHER_TIMING_VALUE_WIDTH; +const PUBLISHER_TIMING_DISPLAY_UPDATE_INTERVAL: Duration = Duration::from_millis(100); + +fn publisher_timing_label(label: &str) -> String { + format!("{label}:") +} + +fn publisher_timing_value_line(label: &str, value: &str) -> String { + let label = publisher_timing_label(label); + format!( + "{label:value_width$}", + label_width = PUBLISHER_TIMING_LABEL_WIDTH, + value_width = PUBLISHER_TIMING_VALUE_WIDTH + ) +} + +fn publisher_timing_line(label: &str, timestamp_us: Option, delta: &str) -> String { + let label = publisher_timing_label(label); + match timestamp_us { + Some(timestamp_us) => format!( + "{label:timestamp_width$} {delta:>delta_width$}", + timestamp = format_time_of_day_us(timestamp_us), + delta = 
delta, + label_width = PUBLISHER_TIMING_LABEL_WIDTH, + timestamp_width = PUBLISHER_TIMING_TIMESTAMP_WIDTH, + delta_width = PUBLISHER_TIMING_DELTA_WIDTH + ), + None => format!( + "{label:timestamp_width$} {delta:>delta_width$}", + timestamp = "--:--:--:---", + delta = "+--.-ms", + label_width = PUBLISHER_TIMING_LABEL_WIDTH, + timestamp_width = PUBLISHER_TIMING_TIMESTAMP_WIDTH, + delta_width = PUBLISHER_TIMING_DELTA_WIDTH + ), + } } -fn burned_stats_line(label: &str, value: impl std::fmt::Display) -> String { - format!("{label:<17}{value}") +fn publisher_timing_frame_id_line(frame_id: Option) -> String { + let frame_id = frame_id.map(|id| id.to_string()).unwrap_or_else(|| "NA".to_string()); + publisher_timing_value_line("frame ID", &frame_id) } fn build_publisher_timing_lines( sample: PublisherTimingSample, - publish_latency_display: &str, + overlay_values: &PublisherTimingOverlayValues, ) -> Vec { + let base = sample.sensor_exposure_timestamp_us; vec![ - burned_stats_line("FRAME ID:", frame_id_label(sample.frame_id)), - burned_stats_line("CAPT TIMESTAMP:", format_timestamp_us(sample.capture_timestamp_us)), - burned_stats_line("READ TIMESTAMP:", format_timestamp_us(sample.read_timestamp_us)), - burned_stats_line("SENT TIMESTAMP:", format_timestamp_us(sample.sent_timestamp_us)), - burned_stats_line("PUBLISH LATENCY:", publish_latency_display), + publisher_timing_frame_id_line(sample.frame_id), + publisher_timing_line( + "sensor exposure", + Some(base), + &overlay_values.deltas.sensor_exposure, + ), + publisher_timing_line( + "got frame buffer", + sample.got_frame_buffer_timestamp_us, + &overlay_values.deltas.got_frame_buffer, + ), + publisher_timing_line( + "encoder upload", + sample.encoder_upload_timestamp_us, + &overlay_values.deltas.encoder_upload, + ), + publisher_timing_line( + "encoder output", + sample.encoder_output_timestamp_us, + &overlay_values.deltas.encoder_output, + ), + publisher_timing_line( + "webrtc packetize", + 
sample.webrtc_packetize_timestamp_us, + &overlay_values.deltas.webrtc_packetize, + ), + publisher_timing_value_line("Exposure to Send", &overlay_values.exp2send_latency), ] } +#[cfg(test)] +fn assert_publisher_timing_lines_are_stable(lines: &[String]) { + assert!(lines.iter().all(|line| line.len() == PUBLISHER_TIMING_LINE_WIDTH)); +} + fn video_size(shared: &Arc>) -> Option<(u32, u32)> { let s = shared.lock(); if s.width > 0 && s.height > 0 { @@ -157,21 +271,290 @@ fn video_size(shared: &Arc>) -> Option<(u32, u32)> { } } -fn publisher_timing_text(shared: &Arc>) -> Option { - let s = shared.lock(); - let sample = s.timing_sample?; - let latency_display = if s.publish_latency_display.is_empty() { - format_us_delta_ms(sample.sent_timestamp_us, sample.capture_timestamp_us) +#[derive(Default)] +struct PublisherTimingOverlayState { + displayed_timing_deltas: Option, + displayed_exp2send_latency: Option, + last_latency_update: Option, +} + +#[derive(Clone, Debug)] +struct PublisherTimingDeltaValues { + sensor_exposure: String, + got_frame_buffer: String, + encoder_upload: String, + encoder_output: String, + webrtc_packetize: String, +} + +impl PublisherTimingDeltaValues { + fn from_sample(sample: PublisherTimingSample) -> Self { + let base = sample.sensor_exposure_timestamp_us; + Self { + sensor_exposure: format_timing_delta_ms(base, base), + got_frame_buffer: format_optional_timing_delta_ms( + sample.got_frame_buffer_timestamp_us, + Some(base), + ), + encoder_upload: format_optional_timing_delta_ms( + sample.encoder_upload_timestamp_us, + sample.got_frame_buffer_timestamp_us, + ), + encoder_output: format_optional_timing_delta_ms( + sample.encoder_output_timestamp_us, + sample.encoder_upload_timestamp_us, + ), + webrtc_packetize: format_optional_timing_delta_ms( + sample.webrtc_packetize_timestamp_us, + sample.encoder_output_timestamp_us, + ), + } + } +} + +struct PublisherTimingOverlayValues { + deltas: PublisherTimingDeltaValues, + exp2send_latency: String, +} + +impl 
PublisherTimingOverlayState { + fn overlay_values( + &mut self, + sample: PublisherTimingSample, + now: Instant, + ) -> PublisherTimingOverlayValues { + let should_update = self.last_latency_update.map_or(true, |last_update| { + now.duration_since(last_update) >= PUBLISHER_TIMING_DISPLAY_UPDATE_INTERVAL + }); + + if should_update { + self.displayed_timing_deltas = Some(PublisherTimingDeltaValues::from_sample(sample)); + self.displayed_exp2send_latency = + sample.webrtc_packetize_timestamp_us.map(|webrtc_packetize_timestamp_us| { + format_latency_ms( + webrtc_packetize_timestamp_us, + sample.sensor_exposure_timestamp_us, + ) + }); + self.last_latency_update = Some(now); + } + + PublisherTimingOverlayValues { + deltas: self + .displayed_timing_deltas + .clone() + .unwrap_or_else(|| PublisherTimingDeltaValues::from_sample(sample)), + exp2send_latency: self + .displayed_exp2send_latency + .clone() + .unwrap_or_else(|| "NA".to_string()), + } + } +} + +fn video_status_line( + width: u32, + height: u32, + fps: f32, + codec: &str, + codec_implementation: &str, + simulcast: bool, +) -> String { + let codec = codec_with_implementation(codec, codec_implementation); + if simulcast { + format!("{}x{} {:.1}fps {codec} Simulcast", width, height, fps.max(0.0)) } else { - s.publish_latency_display.clone() + format!("{}x{} {:.1}fps {codec}", width, height, fps.max(0.0)) + } +} + +fn publisher_overlay_lines( + shared: &Arc>, + overlay_state: &mut PublisherTimingOverlayState, + now: Instant, +) -> Option> { + let (status_line, sample) = { + let s = shared.lock(); + if s.width == 0 || s.height == 0 { + return None; + } + + ( + video_status_line( + s.width, + s.height, + s.fps, + &s.codec, + &s.codec_implementation, + s.simulcast, + ), + s.timing_sample, + ) }; - Some(build_publisher_timing_lines(sample, &latency_display).join("\n")) + + let mut lines = vec![status_line]; + if let Some(sample) = sample { + let overlay_values = overlay_state.overlay_values(sample, now); + 
lines.extend(build_publisher_timing_lines(sample, &overlay_values)); + } + Some(lines) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn timestamp_us(hour: u64, minute: u64, second: u64, millisecond: u64) -> u64 { + (((hour * 3_600 + minute * 60 + second) * 1_000) + millisecond) * 1_000 + } + + fn overlay_values( + sample: PublisherTimingSample, + exp2send_latency: &str, + ) -> PublisherTimingOverlayValues { + PublisherTimingOverlayValues { + deltas: PublisherTimingDeltaValues::from_sample(sample), + exp2send_latency: exp2send_latency.to_string(), + } + } + + #[test] + fn publisher_overlay_shows_status_without_timing() { + let shared = Arc::new(Mutex::new(SharedYuv::default())); + { + let mut s = shared.lock(); + s.width = 1280; + s.height = 720; + s.codec = "H264".to_string(); + s.codec_implementation = "NVIDIA H264 Encoder".to_string(); + s.fps = 29.6; + s.simulcast = true; + } + + let mut overlay_state = PublisherTimingOverlayState::default(); + let lines = publisher_overlay_lines(&shared, &mut overlay_state, Instant::now()) + .expect("status overlay should render"); + + assert_eq!(lines, vec!["1280x720 29.6fps H264 NVENC Simulcast"]); + } + + #[test] + fn publisher_timing_lines_match_requested_format() { + let base = timestamp_us(1, 2, 3, 456); + let sample = PublisherTimingSample { + frame_id: Some(7), + sensor_exposure_timestamp_us: base, + got_frame_buffer_timestamp_us: Some(base + 32_400), + encoder_upload_timestamp_us: Some(base + 35_500), + encoder_output_timestamp_us: Some(base + 55_300), + webrtc_packetize_timestamp_us: Some(base + 56_900), + }; + + let overlay_values = overlay_values(sample, "56.9ms"); + let lines = build_publisher_timing_lines(sample, &overlay_values); + assert_publisher_timing_lines_are_stable(&lines); + assert_eq!( + lines, + vec![ + "frame ID: 7", + "sensor exposure: 01:02:03:456 0.0ms", + "got frame buffer: 01:02:03:488 +32.4ms", + "encoder upload: 01:02:03:491 +3.1ms", + "encoder output: 01:02:03:511 +19.8ms", + "webrtc 
packetize: 01:02:03:512 +1.6ms", + "Exposure to Send: 56.9ms", + ] + ); + } + + #[test] + fn publisher_timing_lines_use_placeholder_for_missing_async_stages() { + let base = timestamp_us(1, 2, 3, 456); + let mut sample = PublisherTimingSample::new(base, None); + sample.got_frame_buffer_timestamp_us = Some(base + 32_400); + + let overlay_values = overlay_values(sample, "NA"); + let lines = build_publisher_timing_lines(sample, &overlay_values); + assert_publisher_timing_lines_are_stable(&lines); + assert_eq!( + lines, + vec![ + "frame ID: NA", + "sensor exposure: 01:02:03:456 0.0ms", + "got frame buffer: 01:02:03:488 +32.4ms", + "encoder upload: --:--:--:--- +--.-ms", + "encoder output: --:--:--:--- +--.-ms", + "webrtc packetize: --:--:--:--- +--.-ms", + "Exposure to Send: NA", + ] + ); + } + + #[test] + fn publisher_timing_deltas_are_relative_to_previous_stage() { + let base = timestamp_us(0, 0, 1, 0); + let sample = PublisherTimingSample { + frame_id: None, + sensor_exposure_timestamp_us: base, + got_frame_buffer_timestamp_us: Some(base + 1_500_000), + encoder_upload_timestamp_us: Some(base + 1_600_000), + encoder_output_timestamp_us: None, + webrtc_packetize_timestamp_us: None, + }; + + let overlay_values = overlay_values(sample, "NA"); + let lines = build_publisher_timing_lines(sample, &overlay_values); + assert_publisher_timing_lines_are_stable(&lines); + assert_eq!(lines[2], "got frame buffer: 00:00:02:500 +1500.0ms"); + assert_eq!(lines[3], "encoder upload: 00:00:02:600 +100.0ms"); + } + + #[test] + fn publisher_timing_exp2send_latency_refreshes_at_ten_hz() { + let mut overlay_state = PublisherTimingOverlayState::default(); + let now = Instant::now(); + + let sample = PublisherTimingSample { + frame_id: Some(1), + sensor_exposure_timestamp_us: 1_000, + got_frame_buffer_timestamp_us: Some(2_000), + encoder_upload_timestamp_us: Some(5_100), + encoder_output_timestamp_us: Some(24_900), + webrtc_packetize_timestamp_us: Some(57_900), + }; + let overlay_values = 
overlay_state.overlay_values(sample, now); + assert_eq!(overlay_values.deltas.encoder_upload, "+3.1ms"); + assert_eq!(overlay_values.deltas.encoder_output, "+19.8ms"); + assert_eq!(overlay_values.deltas.webrtc_packetize, "+33.0ms"); + assert_eq!(overlay_values.exp2send_latency, "56.9ms"); + + let sample = PublisherTimingSample { + frame_id: Some(2), + sensor_exposure_timestamp_us: 1_000_000, + got_frame_buffer_timestamp_us: Some(1_001_000), + encoder_upload_timestamp_us: Some(1_011_000), + encoder_output_timestamp_us: Some(1_031_000), + webrtc_packetize_timestamp_us: Some(1_100_000), + }; + let overlay_values = overlay_state.overlay_values(sample, now + Duration::from_millis(99)); + assert_eq!(overlay_values.deltas.encoder_upload, "+3.1ms"); + assert_eq!(overlay_values.deltas.encoder_output, "+19.8ms"); + assert_eq!(overlay_values.deltas.webrtc_packetize, "+33.0ms"); + assert_eq!(overlay_values.exp2send_latency, "56.9ms"); + + let overlay_values = overlay_state.overlay_values(sample, now + Duration::from_millis(100)); + assert_eq!(overlay_values.deltas.encoder_upload, "+10.0ms"); + assert_eq!(overlay_values.deltas.encoder_output, "+20.0ms"); + assert_eq!(overlay_values.deltas.webrtc_packetize, "+69.0ms"); + assert_eq!(overlay_values.exp2send_latency, "100.0ms"); + } } struct VideoApp { shared: Arc>, ctrl_c_received: Arc, viewport: AspectConstrainedViewport, + timing_overlay_state: PublisherTimingOverlayState, } impl eframe::App for VideoApp { @@ -216,39 +599,27 @@ impl eframe::App for VideoApp { ); }); - egui::Area::new("video_hud".into()) - .anchor(egui::Align2::RIGHT_TOP, egui::vec2(-10.0, 10.0)) - .interactable(false) - .show(ctx, |ui| { - let s = self.shared.lock(); - if s.width == 0 || s.height == 0 || s.fps <= 0.0 || s.codec.is_empty() { - return; - } - let text = format!("{} {}x{} {:.1}fps", s.codec, s.width, s.height, s.fps); - egui::Frame::NONE - .fill(egui::Color32::from_black_alpha(140)) - .corner_radius(egui::CornerRadius::same(4)) - 
.inner_margin(egui::Margin::same(6)) - .show(ui, |ui| { - ui.add( - egui::Label::new(egui::RichText::new(text).color(egui::Color32::WHITE)) - .extend(), - ); - }); - }); - - egui::Area::new("publisher_timing".into()) + egui::Area::new("publisher_overlay".into()) .anchor(egui::Align2::LEFT_TOP, egui::vec2(10.0, 10.0)) .interactable(false) .show(ctx, |ui| { - let Some(text) = publisher_timing_text(&self.shared) else { + let Some(lines) = publisher_overlay_lines( + &self.shared, + &mut self.timing_overlay_state, + Instant::now(), + ) else { return; }; + let has_timing = lines.len() > 1; + let text = lines.join("\n"); egui::Frame::NONE .fill(egui::Color32::from_black_alpha(160)) .corner_radius(egui::CornerRadius::same(4)) .inner_margin(egui::Margin::same(6)) .show(ui, |ui| { + if has_timing { + ui.set_min_width(PUBLISHER_TIMING_LINE_WIDTH as f32 * 8.0); + } ui.add( egui::Label::new( egui::RichText::new(text).monospace().color(egui::Color32::WHITE), @@ -272,6 +643,7 @@ pub(crate) fn run_display( shared, ctrl_c_received: ctrl_c_received.clone(), viewport: AspectConstrainedViewport::new(initial_aspect), + timing_overlay_state: PublisherTimingOverlayState::default(), }; let native_options = viewport_aspect::native_options(initial_aspect); let result = eframe::run_native(title, native_options, Box::new(|_| Ok(Box::new(app)))); diff --git a/libwebrtc/src/native/packet_trailer.rs b/libwebrtc/src/native/packet_trailer.rs index 4fd2f4c26..6ba68a909 100644 --- a/libwebrtc/src/native/packet_trailer.rs +++ b/libwebrtc/src/native/packet_trailer.rs @@ -26,6 +26,8 @@ //! internal map keyed by RTP timestamp. Decoded frames look up their //! metadata via lookup_frame_metadata(rtp_timestamp). +use std::sync::Arc; + use cxx::SharedPtr; use webrtc_sys::packet_trailer::ffi as sys_pt; @@ -34,6 +36,103 @@ use crate::{ rtp_sender::RtpSender, }; +/// Stage reached by a native local video frame in the publish pipeline. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PublishTimingStage { + /// The adapted raw frame was handed to WebRTC's encoder path. + EncoderUpload, + /// WebRTC produced an encoded frame for packetization. + EncoderOutput, + /// The encoded frame was handed back to WebRTC's packetizer. + WebrtcPacketize, +} + +/// Stage reached by a native remote video frame in the subscribe pipeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SubscribeTimingStage { + /// WebRTC produced an encoded frame after RTP depacketization. + WebrtcReceive, + /// The encoded frame was handed to WebRTC's decoder. + DecoderUpload, + /// WebRTC produced a decoded frame for the native video sink. + DecoderOutput, +} + +/// Timestamped native local video publish pipeline event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PublishTimingEvent { + /// Publish pipeline stage reached by the frame. + pub stage: PublishTimingStage, + /// Wall-clock time when this stage was observed, in microseconds since the Unix epoch. + pub timestamp_us: u64, + /// User capture timestamp associated with this frame, in microseconds since the Unix epoch. + pub capture_timestamp_us: u64, + /// Optional application frame ID associated with this frame. + pub frame_id: Option, +} + +/// Timestamped native remote video subscribe pipeline event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SubscribeTimingEvent { + /// Subscribe pipeline stage reached by the frame. + pub stage: SubscribeTimingStage, + /// Wall-clock time when this stage was observed, in microseconds since the Unix epoch. + pub timestamp_us: u64, + /// User capture timestamp associated with this frame, in microseconds since the Unix epoch. + pub capture_timestamp_us: u64, + /// Optional application frame ID associated with this frame. + pub frame_id: Option, +} + +/// Callback invoked for native local video publish timing events. 
+pub type PublishTimingObserver = Arc; +/// Callback invoked for native remote video subscribe timing events. +pub type SubscribeTimingObserver = Arc; + +impl From for PublishTimingStage { + fn from(stage: sys_pt::VideoPublishTimingStage) -> Self { + match stage { + sys_pt::VideoPublishTimingStage::EncoderUpload => Self::EncoderUpload, + sys_pt::VideoPublishTimingStage::EncoderOutput => Self::EncoderOutput, + sys_pt::VideoPublishTimingStage::WebrtcPacketize => Self::WebrtcPacketize, + _ => Self::WebrtcPacketize, + } + } +} + +impl From for PublishTimingEvent { + fn from(event: sys_pt::VideoPublishTimingEvent) -> Self { + Self { + stage: event.stage.into(), + timestamp_us: event.timestamp_us, + capture_timestamp_us: event.capture_timestamp_us, + frame_id: (event.frame_id != 0).then_some(event.frame_id), + } + } +} + +impl From for SubscribeTimingStage { + fn from(stage: sys_pt::VideoSubscribeTimingStage) -> Self { + match stage { + sys_pt::VideoSubscribeTimingStage::WebrtcReceive => Self::WebrtcReceive, + sys_pt::VideoSubscribeTimingStage::DecoderUpload => Self::DecoderUpload, + sys_pt::VideoSubscribeTimingStage::DecoderOutput => Self::DecoderOutput, + _ => Self::DecoderOutput, + } + } +} + +impl From for SubscribeTimingEvent { + fn from(event: sys_pt::VideoSubscribeTimingEvent) -> Self { + Self { + stage: event.stage.into(), + timestamp_us: event.timestamp_us, + capture_timestamp_us: event.capture_timestamp_us, + frame_id: (event.frame_id != 0).then_some(event.frame_id), + } + } +} + /// Handler for packet trailer embedding/extraction on RTP streams. /// /// For sender side: Stores frame metadata keyed by capture timestamp @@ -96,6 +195,46 @@ impl PacketTrailerHandler { pub(crate) fn sys_handle(&self) -> SharedPtr { self.sys_handle.clone() } + + /// Set the callback receiving sender-side publish timing events. 
+ pub fn set_publish_timing_observer(&self, observer: Option) { + if let Some(observer) = observer { + self.sys_handle.set_publish_timing_observer(Box::new( + webrtc_sys::packet_trailer::VideoPublishTimingObserverWrapper::new(Box::new( + move |event| observer(event.into()), + )), + )); + } else { + self.sys_handle.clear_publish_timing_observer(); + } + } + + /// Set the callback receiving receiver-side subscribe timing events. + pub fn set_subscribe_timing_observer(&self, observer: Option) { + if let Some(observer) = observer { + self.sys_handle.set_subscribe_timing_observer(Box::new( + webrtc_sys::packet_trailer::VideoSubscribeTimingObserverWrapper::new(Box::new( + move |event| observer(event.into()), + )), + )); + } else { + self.sys_handle.clear_subscribe_timing_observer(); + } + } + + pub(crate) fn emit_subscribe_timing( + &self, + stage: SubscribeTimingStage, + capture_timestamp_us: u64, + frame_id: u32, + ) { + let stage = match stage { + SubscribeTimingStage::WebrtcReceive => sys_pt::VideoSubscribeTimingStage::WebrtcReceive, + SubscribeTimingStage::DecoderUpload => sys_pt::VideoSubscribeTimingStage::DecoderUpload, + SubscribeTimingStage::DecoderOutput => sys_pt::VideoSubscribeTimingStage::DecoderOutput, + }; + self.sys_handle.emit_subscribe_timing(stage, capture_timestamp_us, frame_id); + } } /// Create a sender-side packet trailer handler. 
diff --git a/libwebrtc/src/native/video_stream.rs b/libwebrtc/src/native/video_stream.rs index 3dcab976e..cebc11b3a 100644 --- a/libwebrtc/src/native/video_stream.rs +++ b/libwebrtc/src/native/video_stream.rs @@ -28,7 +28,7 @@ use parking_lot::Mutex; use rtrb::{Consumer, Producer, PushError, RingBuffer}; use webrtc_sys::video_track as sys_vt; -use super::video_frame::new_video_frame_buffer; +use super::{packet_trailer::SubscribeTimingStage, video_frame::new_video_frame_buffer}; use crate::{ native::packet_trailer::PacketTrailerHandler, video_frame::{BoxVideoFrame, FrameMetadata, VideoFrame}, @@ -108,11 +108,15 @@ struct VideoTrackObserver { impl sys_vt::VideoSink for VideoTrackObserver { fn on_frame(&self, frame: UniquePtr) { let rtp_timestamp = frame.timestamp(); - let frame_metadata = self - .packet_trailer_handler - .lock() + let packet_trailer_handler = self.packet_trailer_handler.lock().clone(); + let frame_metadata = packet_trailer_handler .as_ref() - .and_then(|h| h.lookup_frame_metadata(rtp_timestamp)) + .and_then(|handler| { + handler.lookup_frame_metadata(rtp_timestamp).map(|(ts, fid)| { + handler.emit_subscribe_timing(SubscribeTimingStage::DecoderOutput, ts, fid); + (ts, fid) + }) + }) .map(|(ts, fid)| FrameMetadata { user_timestamp: Some(ts), frame_id: if fid != 0 { Some(fid) } else { None }, diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 909e1c7f9..ff88d6995 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -27,8 +27,9 @@ pub use crate::{ }, publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication}, track::{ - AudioTrack, LocalAudioTrack, LocalTrack, LocalVideoTrack, RemoteAudioTrack, RemoteTrack, - RemoteVideoTrack, StreamState, Track, TrackDimension, TrackKind, TrackSource, VideoTrack, + AudioTrack, LocalAudioTrack, LocalTrack, LocalVideoTrack, PublishTimingEvent, + PublishTimingObserver, PublishTimingStage, RemoteAudioTrack, RemoteTrack, RemoteVideoTrack, + StreamState, Track, 
TrackDimension, TrackKind, TrackSource, VideoTrack, }, ConnectionState, DataPacket, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions, RoomResult, RoomSdkOptions, SipDTMF, Transcription, TranscriptionSegment, diff --git a/livekit/src/room/participant/local_participant.rs b/livekit/src/room/participant/local_participant.rs index c5c26cb50..bf4353f11 100644 --- a/livekit/src/room/participant/local_participant.rs +++ b/livekit/src/room/participant/local_participant.rs @@ -57,6 +57,52 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); type LocalTrackPublishedHandler = Box; type LocalTrackUnpublishedHandler = Box; +fn needs_video_sender_transformer( + options: &TrackPublishOptions, + has_publish_timing_observer: bool, +) -> bool { + !options.packet_trailer_features.is_empty() || has_publish_timing_observer +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::options::PacketTrailerFeatures; + + #[test] + fn timing_observer_requests_video_sender_transformer_without_packet_trailers() { + let options = TrackPublishOptions { + packet_trailer_features: PacketTrailerFeatures::default(), + ..Default::default() + }; + + assert!(needs_video_sender_transformer(&options, true)); + } + + #[test] + fn packet_trailer_features_request_video_sender_transformer_without_timing_observer() { + let options = TrackPublishOptions { + packet_trailer_features: PacketTrailerFeatures { + user_timestamp: true, + frame_id: false, + }, + ..Default::default() + }; + + assert!(needs_video_sender_transformer(&options, false)); + } + + #[test] + fn video_sender_transformer_is_skipped_without_timing_or_packet_trailers() { + let options = TrackPublishOptions { + packet_trailer_features: PacketTrailerFeatures::default(), + ..Default::default() + }; + + assert!(!needs_video_sender_transformer(&options, false)); + } +} + #[derive(Default)] struct LocalEvents { local_track_published: Mutex>, @@ -355,14 +401,22 @@ impl LocalParticipant { track.set_transceiver(Some(transceiver)); - if 
!options.packet_trailer_features.is_empty() { - if let LocalTrack::Video(video_track) = &track { - log::info!("packet_trailer enabled for local video track {}", publication.sid(),); + if let LocalTrack::Video(video_track) = &track { + let has_timing_observer = video_track.has_publish_timing_observer(); + if needs_video_sender_transformer(&options, has_timing_observer) { + let trailers_enabled = !options.packet_trailer_features.is_empty(); + log::info!( + "sender frame transformer enabled for local video track {} (packet_trailer={}, publish_timing={})", + publication.sid(), + trailers_enabled, + has_timing_observer, + ); let sender = track.transceiver().unwrap().sender(); let handler = packet_trailer::create_sender_handler( LkRuntime::instance().pc_factory(), &sender, ); + handler.set_enabled(trailers_enabled); video_track.set_packet_trailer_handler(handler.clone()); #[cfg(not(target_arch = "wasm32"))] diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index 0da8f683c..e7212dc22 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -14,7 +14,15 @@ use std::{fmt::Debug, sync::Arc}; -use libwebrtc::{native::packet_trailer::PacketTrailerHandler, prelude::*, stats::RtcStats}; +use libwebrtc::{ + native::packet_trailer::{ + PacketTrailerHandler, PublishTimingEvent as RtcPublishTimingEvent, + PublishTimingObserver as RtcPublishTimingObserver, + PublishTimingStage as RtcPublishTimingStage, + }, + prelude::*, + stats::RtcStats, +}; use livekit_protocol as proto; use parking_lot::Mutex; @@ -26,6 +34,57 @@ pub struct LocalVideoTrack { inner: Arc, source: RtcVideoSource, packet_trailer_handler: Arc>>, + publish_timing_observer: Arc>>>, +} + +type PublishTimingObserverFn = dyn Fn(PublishTimingEvent) + Send + Sync + 'static; + +/// Callback invoked for native local video publish timing events. 
+pub type PublishTimingObserver = Box; + +/// Stage reached by a native local video frame in the publish pipeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PublishTimingStage { + /// The adapted raw frame was handed to WebRTC's encoder path. + EncoderUpload, + /// WebRTC produced an encoded frame for packetization. + EncoderOutput, + /// The encoded frame was handed back to WebRTC's packetizer. + WebrtcPacketize, +} + +/// Timestamped native local video publish pipeline event. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PublishTimingEvent { + /// Publish pipeline stage reached by the frame. + pub stage: PublishTimingStage, + /// Wall-clock time when this stage was observed, in microseconds since the Unix epoch. + pub timestamp_us: u64, + /// User capture timestamp associated with this frame, in microseconds since the Unix epoch. + pub capture_timestamp_us: u64, + /// Optional application frame ID associated with this frame. + pub frame_id: Option, +} + +impl From for PublishTimingStage { + fn from(stage: RtcPublishTimingStage) -> Self { + match stage { + RtcPublishTimingStage::EncoderUpload => Self::EncoderUpload, + RtcPublishTimingStage::EncoderOutput => Self::EncoderOutput, + RtcPublishTimingStage::WebrtcPacketize => Self::WebrtcPacketize, + } + } +} + +impl From for PublishTimingEvent { + fn from(event: RtcPublishTimingEvent) -> Self { + Self { + stage: event.stage.into(), + timestamp_us: event.timestamp_us, + capture_timestamp_us: event.capture_timestamp_us, + frame_id: event.frame_id, + } + } } impl Debug for LocalVideoTrack { @@ -49,6 +108,7 @@ impl LocalVideoTrack { )), source, packet_trailer_handler: Arc::new(Mutex::new(None)), + publish_timing_observer: Arc::new(Mutex::new(None)), } } @@ -126,6 +186,19 @@ impl LocalVideoTrack { self.source.clone() } + /// Set a callback for native local video publish pipeline timing events. + /// + /// The observer is invoked from WebRTC worker threads and should avoid + /// blocking. 
Pass `None` to clear a previously registered observer. + pub fn set_publish_timing_observer(&self, observer: Option) { + *self.publish_timing_observer.lock() = observer.map(Arc::from); + + let handler = self.packet_trailer_handler.lock().clone(); + if let Some(handler) = handler { + self.apply_publish_timing_observer(&handler); + } + } + /// Returns the packet trailer handler associated with this track, if any. /// When present on the sender side, callers can store per-frame user /// timestamps which will be embedded into encoded frames. @@ -133,11 +206,26 @@ impl LocalVideoTrack { self.packet_trailer_handler.lock().clone() } + pub(crate) fn has_publish_timing_observer(&self) -> bool { + self.publish_timing_observer.lock().is_some() + } + /// Internal: set the packet trailer handler used for this track. pub(crate) fn set_packet_trailer_handler(&self, handler: PacketTrailerHandler) { + self.apply_publish_timing_observer(&handler); *self.packet_trailer_handler.lock() = Some(handler); } + fn apply_publish_timing_observer(&self, handler: &PacketTrailerHandler) { + let observer = self.publish_timing_observer.lock().clone(); + let observer = observer.map(|observer| { + Arc::new(move |event: RtcPublishTimingEvent| { + observer(event.into()); + }) as RtcPublishTimingObserver + }); + handler.set_publish_timing_observer(observer); + } + pub async fn get_stats(&self) -> RoomResult> { super::local_track::get_stats(&self.inner).await } diff --git a/livekit/tests/packet_trailer_test.rs b/livekit/tests/packet_trailer_test.rs index b89108d73..c0aae32a1 100644 --- a/livekit/tests/packet_trailer_test.rs +++ b/livekit/tests/packet_trailer_test.rs @@ -104,6 +104,17 @@ async fn test_timestamp_and_frame_id_vp8_e2ee() -> Result<()> { .await } +#[test_log::test(tokio::test)] +async fn test_timestamp_and_frame_id_av1() -> Result<()> { + run_packet_trailer_test(PacketTrailerTestParams { + attach_timestamp: true, + attach_frame_id: true, + e2ee: false, + codec: VideoCodec::AV1, + }) + .await 
+} + // ==================== Implementation ==================== /// Publishes solid-color video frames with packet trailer metadata (user_timestamp diff --git a/webrtc-sys/include/livekit/packet_trailer.h b/webrtc-sys/include/livekit/packet_trailer.h index 200840385..f7f38c53f 100644 --- a/webrtc-sys/include/livekit/packet_trailer.h +++ b/webrtc-sys/include/livekit/packet_trailer.h @@ -41,6 +41,10 @@ namespace livekit_ffi { class PeerConnectionFactory; class RtpSender; class RtpReceiver; +enum class VideoPublishTimingStage : int32_t; +enum class VideoSubscribeTimingStage : int32_t; +struct VideoPublishTimingObserverWrapper; +struct VideoSubscribeTimingObserverWrapper; } // namespace livekit_ffi namespace livekit_ffi { @@ -119,22 +123,57 @@ class PacketTrailerTransformer : public webrtc::FrameTransformerInterface { uint64_t user_timestamp, uint32_t frame_id); + /// Set the observer receiving sender-side publish timing events. + void set_publish_timing_observer( + rust::Box observer); + + /// Clear the observer receiving sender-side publish timing events. + void clear_publish_timing_observer(); + + /// Emit a sender-side publish timing event. + void emit_publish_timing(VideoPublishTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const; + + /// Set the observer receiving receiver-side subscribe timing events. + void set_subscribe_timing_observer( + rust::Box observer); + + /// Clear the observer receiving receiver-side subscribe timing events. + void clear_subscribe_timing_observer(); + + /// Emit a receiver-side subscribe timing event. 
+ void emit_subscribe_timing(VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const; + private: void TransformSend( std::unique_ptr frame); void TransformReceive( std::unique_ptr frame); + void emit_subscribe_timing(VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id, + uint64_t timestamp_us) const; + + PacketTrailerMetadata LookupSendMetadata( + const webrtc::TransformableFrameInterface& frame, + uint32_t ssrc, + uint32_t rtp_timestamp) const; /// Append frame metadata trailer to frame data std::vector AppendTrailer( webrtc::ArrayView data, uint64_t user_timestamp, - uint32_t frame_id); + uint32_t frame_id, + bool is_av1); /// Extract and remove frame metadata trailer from frame data std::optional ExtractTrailer( webrtc::ArrayView data, - std::vector& out_data); + std::vector& out_data, + bool is_av1); const Direction direction_; std::atomic enabled_{true}; @@ -161,6 +200,13 @@ class PacketTrailerTransformer : public webrtc::FrameTransformerInterface { // Simulcast tracking: detect layer switches and flush stale entries. mutable uint32_t recv_active_ssrc_{0}; + + mutable webrtc::Mutex publish_timing_observer_mutex_; + mutable std::shared_ptr> + publish_timing_observer_; + mutable webrtc::Mutex subscribe_timing_observer_mutex_; + mutable std::shared_ptr> + subscribe_timing_observer_; }; /// Wrapper class for Rust FFI that manages packet trailer transformers. @@ -194,6 +240,30 @@ class PacketTrailerHandler { uint64_t user_timestamp, uint32_t frame_id) const; + /// Set the observer receiving sender-side publish timing events. + void set_publish_timing_observer( + rust::Box observer) const; + + /// Clear the observer receiving sender-side publish timing events. + void clear_publish_timing_observer() const; + + /// Emit a sender-side publish timing event. 
+ void emit_publish_timing(VideoPublishTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const; + + /// Set the observer receiving receiver-side subscribe timing events. + void set_subscribe_timing_observer( + rust::Box observer) const; + + /// Clear the observer receiving receiver-side subscribe timing events. + void clear_subscribe_timing_observer() const; + + /// Emit a receiver-side subscribe timing event. + void emit_subscribe_timing(VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const; + /// Access the underlying transformer for chaining. webrtc::scoped_refptr transformer() const; diff --git a/webrtc-sys/src/packet_trailer.cpp b/webrtc-sys/src/packet_trailer.cpp index 4aa1cc1d2..78e27d05b 100644 --- a/webrtc-sys/src/packet_trailer.cpp +++ b/webrtc-sys/src/packet_trailer.cpp @@ -16,8 +16,13 @@ #include "livekit/packet_trailer.h" +#include +#include +#include #include #include +#include +#include #include "api/make_ref_counted.h" #include "livekit/peer_connection_factory.h" @@ -28,6 +33,280 @@ namespace livekit_ffi { +namespace { + +uint64_t CurrentUnixTimeMicros() { + auto now = std::chrono::system_clock::now().time_since_epoch(); + return static_cast( + std::chrono::duration_cast(now).count()); +} + +constexpr uint8_t kAv1ObuSizePresentBit = 0b0000'0010; +constexpr uint8_t kAv1ObuExtensionFlag = 0b0000'0100; +constexpr uint8_t kAv1ObuTypeMask = 0b0111'1000; +constexpr uint8_t kAv1ObuTypeSequenceHeader = 1; +constexpr uint8_t kAv1ObuTypeTemporalDelimiter = 2; +constexpr uint8_t kAv1ObuTypeMetadata = 5; +constexpr uint64_t kAv1MetadataTypeLiveKitPacketTrailer = 31; + +bool IsAv1Frame(const webrtc::TransformableFrameInterface& frame) { + std::string mime_type = frame.GetMimeType(); + std::transform(mime_type.begin(), mime_type.end(), mime_type.begin(), + [](unsigned char c) { + return static_cast(std::tolower(c)); + }); + return mime_type.find("av1") != std::string::npos; +} + +void WriteLeb128(uint64_t value, 
std::vector& out) { + while (value >= 0x80) { + out.push_back(static_cast((value & 0x7F) | 0x80)); + value >>= 7; + } + out.push_back(static_cast(value)); +} + +bool ReadLeb128(webrtc::ArrayView data, + size_t& pos, + uint64_t& value) { + value = 0; + int shift = 0; + for (int bytes = 0; bytes < 8; ++bytes) { + if (pos >= data.size()) { + return false; + } + uint8_t byte = data[pos++]; + value |= static_cast(byte & 0x7F) << shift; + if ((byte & 0x80) == 0) { + return true; + } + shift += 7; + } + return false; +} + +std::vector BuildTrailerPayload(uint64_t user_timestamp, + uint32_t frame_id) { + const bool has_frame_id = frame_id != 0; + const size_t trailer_len = kTimestampTlvSize + + (has_frame_id ? kFrameIdTlvSize : 0) + + kTrailerEnvelopeSize; + std::vector trailer; + trailer.reserve(trailer_len); + + // All TLV bytes are XORed with 0xFF to prevent H.264 NAL start code + // sequences (0x000001 / 0x00000001) from appearing inside the trailer. + trailer.push_back(kTagTimestampUs ^ 0xFF); + trailer.push_back(8 ^ 0xFF); + for (int i = 7; i >= 0; --i) { + trailer.push_back( + static_cast(((user_timestamp >> (i * 8)) & 0xFF) ^ 0xFF)); + } + + if (has_frame_id) { + trailer.push_back(kTagFrameId ^ 0xFF); + trailer.push_back(4 ^ 0xFF); + for (int i = 3; i >= 0; --i) { + trailer.push_back( + static_cast(((frame_id >> (i * 8)) & 0xFF) ^ 0xFF)); + } + } + + trailer.push_back(static_cast(trailer_len ^ 0xFF)); + trailer.insert(trailer.end(), std::begin(kPacketTrailerMagic), + std::end(kPacketTrailerMagic)); + return trailer; +} + +std::optional ParseTrailerPayload( + webrtc::ArrayView trailer) { + if (trailer.size() < kTrailerEnvelopeSize) { + return std::nullopt; + } + + const uint8_t* magic_start = trailer.data() + trailer.size() - 4; + if (std::memcmp(magic_start, kPacketTrailerMagic, 4) != 0) { + return std::nullopt; + } + + uint8_t trailer_len = trailer[trailer.size() - 5] ^ 0xFF; + if (trailer_len != trailer.size() || trailer_len < kTrailerEnvelopeSize) { + return 
std::nullopt; + } + + size_t tlv_region_len = trailer_len - kTrailerEnvelopeSize; + PacketTrailerMetadata meta{0, 0, 0}; + bool found_any = false; + size_t pos = 0; + + while (pos + 2 <= tlv_region_len) { + uint8_t tag = trailer[pos] ^ 0xFF; + uint8_t len = trailer[pos + 1] ^ 0xFF; + pos += 2; + + if (pos + len > tlv_region_len) { + break; + } + + const uint8_t* val = trailer.data() + pos; + if (tag == kTagTimestampUs && len == 8) { + uint64_t ts = 0; + for (int i = 0; i < 8; ++i) { + ts = (ts << 8) | (val[i] ^ 0xFF); + } + meta.user_timestamp = ts; + found_any = true; + } else if (tag == kTagFrameId && len == 4) { + uint32_t fid = 0; + for (int i = 0; i < 4; ++i) { + fid = (fid << 8) | (val[i] ^ 0xFF); + } + meta.frame_id = fid; + found_any = true; + } + + pos += len; + } + + if (!found_any) { + return std::nullopt; + } + return meta; +} + +std::vector BuildAv1MetadataObu( + webrtc::ArrayView trailer) { + std::vector metadata_payload; + WriteLeb128(kAv1MetadataTypeLiveKitPacketTrailer, metadata_payload); + metadata_payload.insert(metadata_payload.end(), trailer.begin(), trailer.end()); + + std::vector obu; + obu.reserve(1 + 8 + metadata_payload.size()); + obu.push_back(static_cast((kAv1ObuTypeMetadata << 3) | + kAv1ObuSizePresentBit)); + WriteLeb128(metadata_payload.size(), obu); + obu.insert(obu.end(), metadata_payload.begin(), metadata_payload.end()); + return obu; +} + +size_t FindAv1MetadataInsertOffset(webrtc::ArrayView data) { + size_t pos = 0; + size_t insert_offset = 0; + + while (pos < data.size()) { + const size_t obu_start = pos; + uint8_t obu_header = data[pos++]; + if ((obu_header & 0x80) != 0) { + return 0; + } + + const uint8_t obu_type = (obu_header & kAv1ObuTypeMask) >> 3; + if ((obu_header & kAv1ObuExtensionFlag) != 0) { + if (pos >= data.size()) { + return 0; + } + ++pos; + } + + size_t payload_size = data.size() - pos; + if ((obu_header & kAv1ObuSizePresentBit) != 0) { + uint64_t leb_payload_size = 0; + if (!ReadLeb128(data, pos, 
leb_payload_size) || + leb_payload_size > data.size() - pos) { + return 0; + } + payload_size = static_cast(leb_payload_size); + } + + const size_t obu_end = pos + payload_size; + if (obu_type == kAv1ObuTypeTemporalDelimiter) { + pos = obu_end; + continue; + } + + if (obu_type != kAv1ObuTypeSequenceHeader) { + break; + } + + insert_offset = obu_end; + pos = obu_end; + + if ((data[obu_start] & kAv1ObuSizePresentBit) == 0) { + break; + } + } + + return insert_offset; +} + +std::optional ExtractAv1Trailer( + webrtc::ArrayView data, + std::vector& out_data) { + std::vector stripped_data; + stripped_data.reserve(data.size()); + size_t pos = 0; + + while (pos < data.size()) { + const size_t obu_start = pos; + uint8_t obu_header = data[pos++]; + if ((obu_header & 0x80) != 0) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + const uint8_t obu_type = (obu_header & kAv1ObuTypeMask) >> 3; + if ((obu_header & kAv1ObuExtensionFlag) != 0) { + if (pos >= data.size()) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + ++pos; + } + + size_t payload_size = data.size() - pos; + if ((obu_header & kAv1ObuSizePresentBit) != 0) { + uint64_t leb_payload_size = 0; + if (!ReadLeb128(data, pos, leb_payload_size) || + leb_payload_size > data.size() - pos) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + payload_size = static_cast(leb_payload_size); + } + + const size_t payload_start = pos; + const size_t obu_end = payload_start + payload_size; + + if (obu_type == kAv1ObuTypeMetadata) { + auto metadata_payload = data.subview(payload_start, obu_end - payload_start); + size_t metadata_pos = 0; + uint64_t metadata_type = 0; + if (ReadLeb128(metadata_payload, metadata_pos, metadata_type) && + metadata_type == kAv1MetadataTypeLiveKitPacketTrailer && + metadata_pos <= metadata_payload.size()) { + auto trailer_payload = metadata_payload.subview( + metadata_pos, metadata_payload.size() - metadata_pos); + if (auto meta = 
ParseTrailerPayload(trailer_payload)) { + stripped_data.insert(stripped_data.end(), data.begin() + obu_end, + data.end()); + out_data = std::move(stripped_data); + return meta; + } + } + } + + stripped_data.insert(stripped_data.end(), data.begin() + obu_start, + data.begin() + obu_end); + + pos = obu_end; + } + + out_data.assign(data.begin(), data.end()); + return std::nullopt; +} + +} // namespace + // PacketTrailerTransformer implementation PacketTrailerTransformer::PacketTrailerTransformer(Direction direction) @@ -38,6 +317,11 @@ void PacketTrailerTransformer::Transform( uint32_t ssrc = frame->GetSsrc(); uint32_t rtp_timestamp = frame->GetTimestamp(); + if (direction_ == Direction::kSend) { + TransformSend(std::move(frame)); + return; + } + if (!enabled_.load()) { webrtc::scoped_refptr cb; { @@ -62,11 +346,7 @@ void PacketTrailerTransformer::Transform( return; } - if (direction_ == Direction::kSend) { - TransformSend(std::move(frame)); - } else { - TransformReceive(std::move(frame)); - } + TransformReceive(std::move(frame)); } void PacketTrailerTransformer::TransformSend( @@ -75,36 +355,18 @@ void PacketTrailerTransformer::TransformSend( uint32_t ssrc = frame->GetSsrc(); auto data = frame->GetData(); - - // Look up the frame metadata by the frame's capture time. - // CaptureTime() returns Timestamp::Millis(capture_time_ms_) where - // capture_time_ms_ = timestamp_us / 1000. So capture_time->us() - // has millisecond precision (bottom 3 digits always zero). - // store_frame_metadata() truncates its key the same way. - PacketTrailerMetadata meta_to_embed{0, 0, 0}; - auto capture_time = frame->CaptureTime(); - if (capture_time.has_value()) { - int64_t capture_us = capture_time->us(); - - webrtc::MutexLock lock(&send_map_mutex_); - auto it = send_map_.find(capture_us); - if (it != send_map_.end()) { - meta_to_embed = it->second; - // Don't erase — simulcast layers share the same capture time. - // Entries are pruned by capacity in store_frame_metadata(). 
- } - } else { - RTC_LOG(LS_WARNING) - << "PacketTrailerTransformer::TransformSend CaptureTime() not available" - << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; - } + const bool is_av1 = IsAv1Frame(*frame); + PacketTrailerMetadata meta_to_embed = + LookupSendMetadata(*frame, ssrc, rtp_timestamp); + emit_publish_timing(VideoPublishTimingStage::EncoderOutput, + meta_to_embed.user_timestamp, meta_to_embed.frame_id); // Always append trailer when enabled (even if timestamp is 0, // which indicates no metadata was set for this frame) std::vector new_data; if (enabled_.load()) { new_data = AppendTrailer(data, meta_to_embed.user_timestamp, - meta_to_embed.frame_id); + meta_to_embed.frame_id, is_av1); frame->SetData(webrtc::ArrayView(new_data)); } @@ -121,6 +383,8 @@ void PacketTrailerTransformer::TransformSend( } if (cb) { + emit_publish_timing(VideoPublishTimingStage::WebrtcPacketize, + meta_to_embed.user_timestamp, meta_to_embed.frame_id); cb->OnTransformedFrame(std::move(frame)); } else { RTC_LOG(LS_WARNING) @@ -129,17 +393,50 @@ void PacketTrailerTransformer::TransformSend( } } +PacketTrailerMetadata PacketTrailerTransformer::LookupSendMetadata( + const webrtc::TransformableFrameInterface& frame, + uint32_t ssrc, + uint32_t rtp_timestamp) const { + // Look up the frame metadata by the frame's capture time. + // CaptureTime() returns Timestamp::Millis(capture_time_ms_) where + // capture_time_ms_ = timestamp_us / 1000. So capture_time->us() + // has millisecond precision (bottom 3 digits always zero). + // store_frame_metadata() truncates its key the same way. + PacketTrailerMetadata meta_to_embed{0, 0, 0}; + auto capture_time = frame.CaptureTime(); + if (capture_time.has_value()) { + int64_t capture_us = capture_time->us(); + + webrtc::MutexLock lock(&send_map_mutex_); + auto it = send_map_.find(capture_us); + if (it != send_map_.end()) { + meta_to_embed = it->second; + // Don't erase — simulcast layers share the same capture time. 
+ // Entries are pruned by capacity in store_frame_metadata(). + } + } else { + RTC_LOG(LS_WARNING) + << "PacketTrailerTransformer::TransformSend CaptureTime() not available" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + } + return meta_to_embed; +} + void PacketTrailerTransformer::TransformReceive( std::unique_ptr frame) { + uint64_t receive_timestamp_us = CurrentUnixTimeMicros(); uint32_t ssrc = frame->GetSsrc(); uint32_t rtp_timestamp = frame->GetTimestamp(); auto data = frame->GetData(); + const bool is_av1 = IsAv1Frame(*frame); std::vector stripped_data; - auto meta = ExtractTrailer(data, stripped_data); + PacketTrailerMetadata timing_meta{0, 0, ssrc}; + auto meta = ExtractTrailer(data, stripped_data, is_av1); if (meta.has_value()) { meta->ssrc = ssrc; + timing_meta = meta.value(); { webrtc::MutexLock lock(&recv_map_mutex_); @@ -180,6 +477,9 @@ void PacketTrailerTransformer::TransformReceive( // Update frame with stripped data frame->SetData(webrtc::ArrayView(stripped_data)); } + emit_subscribe_timing(VideoSubscribeTimingStage::WebrtcReceive, + timing_meta.user_timestamp, timing_meta.frame_id, + receive_timestamp_us); // Forward to the appropriate callback (either global or per-SSRC sink). webrtc::scoped_refptr cb; @@ -194,6 +494,8 @@ void PacketTrailerTransformer::TransformReceive( } if (cb) { + emit_subscribe_timing(VideoSubscribeTimingStage::DecoderUpload, + timing_meta.user_timestamp, timing_meta.frame_id); cb->OnTransformedFrame(std::move(frame)); } else { RTC_LOG(LS_WARNING) @@ -205,49 +507,39 @@ void PacketTrailerTransformer::TransformReceive( std::vector PacketTrailerTransformer::AppendTrailer( webrtc::ArrayView data, uint64_t user_timestamp, - uint32_t frame_id) { - const bool has_frame_id = frame_id != 0; - const size_t trailer_len = kTimestampTlvSize + - (has_frame_id ? 
kFrameIdTlvSize : 0) + - kTrailerEnvelopeSize; - std::vector result; - result.reserve(data.size() + trailer_len); - - // Copy original data - result.insert(result.end(), data.begin(), data.end()); - - // All TLV bytes are XORed with 0xFF to prevent H.264 NAL start code - // sequences (0x000001 / 0x00000001) from appearing inside the trailer. - - // TLV: timestamp_us (tag=0x01, len=8, 8 bytes big-endian) - result.push_back(kTagTimestampUs ^ 0xFF); - result.push_back(8 ^ 0xFF); - for (int i = 7; i >= 0; --i) { - result.push_back( - static_cast(((user_timestamp >> (i * 8)) & 0xFF) ^ 0xFF)); - } - - if (has_frame_id) { - // TLV: frame_id (tag=0x02, len=4, 4 bytes big-endian) - result.push_back(kTagFrameId ^ 0xFF); - result.push_back(4 ^ 0xFF); - for (int i = 3; i >= 0; --i) { - result.push_back( - static_cast(((frame_id >> (i * 8)) & 0xFF) ^ 0xFF)); + uint32_t frame_id, + bool is_av1) { + std::vector trailer = BuildTrailerPayload(user_timestamp, frame_id); + + if (is_av1) { + std::vector obu = BuildAv1MetadataObu(trailer); + if (data.empty()) { + return obu; } + const size_t insert_offset = FindAv1MetadataInsertOffset(data); + std::vector result; + result.reserve(data.size() + obu.size()); + result.insert(result.end(), data.begin(), data.begin() + insert_offset); + result.insert(result.end(), obu.begin(), obu.end()); + result.insert(result.end(), data.begin() + insert_offset, data.end()); + return result; } - // Envelope: trailer_len (1B, XORed) + magic (4B, NOT XORed) - result.push_back(static_cast(trailer_len ^ 0xFF)); - result.insert(result.end(), std::begin(kPacketTrailerMagic), - std::end(kPacketTrailerMagic)); - + std::vector result; + result.reserve(data.size() + trailer.size()); + result.insert(result.end(), data.begin(), data.end()); + result.insert(result.end(), trailer.begin(), trailer.end()); return result; } std::optional PacketTrailerTransformer::ExtractTrailer( webrtc::ArrayView data, - std::vector& out_data) { + std::vector& out_data, + bool is_av1) { + 
if (is_av1) { + return ExtractAv1Trailer(data, out_data); + } + if (data.size() < kTrailerEnvelopeSize) { out_data.assign(data.begin(), data.end()); return std::nullopt; @@ -269,48 +561,14 @@ std::optional PacketTrailerTransformer::ExtractTrailer( // Walk the TLV region: everything from trailer_start up to the envelope. const uint8_t* trailer_start = data.data() + data.size() - trailer_len; - size_t tlv_region_len = trailer_len - kTrailerEnvelopeSize; - - PacketTrailerMetadata meta{0, 0, 0}; - bool found_any = false; - size_t pos = 0; - - while (pos + 2 <= tlv_region_len) { - uint8_t tag = trailer_start[pos] ^ 0xFF; - uint8_t len = trailer_start[pos + 1] ^ 0xFF; - pos += 2; - - if (pos + len > tlv_region_len) { - break; - } - - const uint8_t* val = trailer_start + pos; - - if (tag == kTagTimestampUs && len == 8) { - uint64_t ts = 0; - for (int i = 0; i < 8; ++i) { - ts = (ts << 8) | (val[i] ^ 0xFF); - } - meta.user_timestamp = ts; - found_any = true; - } else if (tag == kTagFrameId && len == 4) { - uint32_t fid = 0; - for (int i = 0; i < 4; ++i) { - fid = (fid << 8) | (val[i] ^ 0xFF); - } - meta.frame_id = fid; - found_any = true; - } - // Unknown tags are silently skipped. 
- - pos += len; + auto meta = ParseTrailerPayload( + webrtc::ArrayView(trailer_start, trailer_len)); + if (!meta.has_value()) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; } out_data.assign(data.begin(), data.end() - trailer_len); - - if (!found_any) { - return std::nullopt; - } return meta; } @@ -396,6 +654,75 @@ void PacketTrailerTransformer::store_frame_metadata( send_map_[key] = PacketTrailerMetadata{user_timestamp, frame_id, 0}; } +void PacketTrailerTransformer::set_publish_timing_observer( + rust::Box observer) { + webrtc::MutexLock lock(&publish_timing_observer_mutex_); + publish_timing_observer_ = + std::make_shared>( + std::move(observer)); +} + +void PacketTrailerTransformer::clear_publish_timing_observer() { + webrtc::MutexLock lock(&publish_timing_observer_mutex_); + publish_timing_observer_.reset(); +} + +void PacketTrailerTransformer::emit_publish_timing( + VideoPublishTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const { + std::shared_ptr> observer; + { + webrtc::MutexLock lock(&publish_timing_observer_mutex_); + observer = publish_timing_observer_; + } + if (!observer) { + return; + } + + (*observer)->on_publish_timing(VideoPublishTimingEvent{ + stage, CurrentUnixTimeMicros(), user_timestamp, frame_id}); +} + +void PacketTrailerTransformer::set_subscribe_timing_observer( + rust::Box observer) { + webrtc::MutexLock lock(&subscribe_timing_observer_mutex_); + subscribe_timing_observer_ = + std::make_shared>( + std::move(observer)); +} + +void PacketTrailerTransformer::clear_subscribe_timing_observer() { + webrtc::MutexLock lock(&subscribe_timing_observer_mutex_); + subscribe_timing_observer_.reset(); +} + +void PacketTrailerTransformer::emit_subscribe_timing( + VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const { + emit_subscribe_timing(stage, user_timestamp, frame_id, + CurrentUnixTimeMicros()); +} + +void PacketTrailerTransformer::emit_subscribe_timing( + 
VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id, + uint64_t timestamp_us) const { + std::shared_ptr> observer; + { + webrtc::MutexLock lock(&subscribe_timing_observer_mutex_); + observer = subscribe_timing_observer_; + } + if (!observer) { + return; + } + + (*observer)->on_subscribe_timing(VideoSubscribeTimingEvent{ + stage, timestamp_us, user_timestamp, frame_id}); +} + // PacketTrailerHandler implementation PacketTrailerHandler::PacketTrailerHandler( @@ -444,6 +771,38 @@ void PacketTrailerHandler::store_frame_metadata( transformer_->store_frame_metadata(capture_timestamp_us, user_timestamp, frame_id); } +void PacketTrailerHandler::set_publish_timing_observer( + rust::Box observer) const { + transformer_->set_publish_timing_observer(std::move(observer)); +} + +void PacketTrailerHandler::clear_publish_timing_observer() const { + transformer_->clear_publish_timing_observer(); +} + +void PacketTrailerHandler::emit_publish_timing( + VideoPublishTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const { + transformer_->emit_publish_timing(stage, user_timestamp, frame_id); +} + +void PacketTrailerHandler::set_subscribe_timing_observer( + rust::Box observer) const { + transformer_->set_subscribe_timing_observer(std::move(observer)); +} + +void PacketTrailerHandler::clear_subscribe_timing_observer() const { + transformer_->clear_subscribe_timing_observer(); +} + +void PacketTrailerHandler::emit_subscribe_timing( + VideoSubscribeTimingStage stage, + uint64_t user_timestamp, + uint32_t frame_id) const { + transformer_->emit_subscribe_timing(stage, user_timestamp, frame_id); +} + webrtc::scoped_refptr PacketTrailerHandler::transformer() const { return transformer_; } diff --git a/webrtc-sys/src/packet_trailer.rs b/webrtc-sys/src/packet_trailer.rs index 3c928b564..3a3394415 100644 --- a/webrtc-sys/src/packet_trailer.rs +++ b/webrtc-sys/src/packet_trailer.rs @@ -14,8 +14,46 @@ use crate::impl_thread_safety; +/// Callback invoked 
for native video publish pipeline timing events. +pub type OnVideoPublishTiming = Box; +/// Callback invoked for native video subscribe pipeline timing events. +pub type OnVideoSubscribeTiming = + Box; + #[cxx::bridge(namespace = "livekit_ffi")] pub mod ffi { + #[repr(i32)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum VideoPublishTimingStage { + EncoderUpload, + EncoderOutput, + WebrtcPacketize, + } + + #[repr(i32)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum VideoSubscribeTimingStage { + WebrtcReceive, + DecoderUpload, + DecoderOutput, + } + + #[derive(Debug, Clone, Copy)] + pub struct VideoPublishTimingEvent { + pub stage: VideoPublishTimingStage, + pub timestamp_us: u64, + pub capture_timestamp_us: u64, + pub frame_id: u32, + } + + #[derive(Debug, Clone, Copy)] + pub struct VideoSubscribeTimingEvent { + pub stage: VideoSubscribeTimingStage, + pub timestamp_us: u64, + pub capture_timestamp_us: u64, + pub frame_id: u32, + } + unsafe extern "C++" { include!("livekit/packet_trailer.h"); include!("livekit/rtp_sender.h"); @@ -52,6 +90,32 @@ pub mod ffi { frame_id: u32, ); + /// Set a callback for sender-side publish timing events. + fn set_publish_timing_observer( + self: &PacketTrailerHandler, + observer: Box, + ); + + /// Clear the sender-side publish timing callback. + fn clear_publish_timing_observer(self: &PacketTrailerHandler); + + /// Set a callback for receiver-side subscribe timing events. + fn set_subscribe_timing_observer( + self: &PacketTrailerHandler, + observer: Box, + ); + + /// Clear the receiver-side subscribe timing callback. + fn clear_subscribe_timing_observer(self: &PacketTrailerHandler); + + /// Emit a receiver-side subscribe timing event. + fn emit_subscribe_timing( + self: &PacketTrailerHandler, + stage: VideoSubscribeTimingStage, + user_timestamp: u64, + frame_id: u32, + ); + /// Create a new packet trailer handler for a sender. 
fn new_packet_trailer_sender( peer_factory: SharedPtr, @@ -64,6 +128,49 @@ pub mod ffi { receiver: SharedPtr, ) -> SharedPtr; } + + extern "Rust" { + type VideoPublishTimingObserverWrapper; + type VideoSubscribeTimingObserverWrapper; + + fn on_publish_timing( + self: &VideoPublishTimingObserverWrapper, + event: VideoPublishTimingEvent, + ); + + fn on_subscribe_timing( + self: &VideoSubscribeTimingObserverWrapper, + event: VideoSubscribeTimingEvent, + ); + } } impl_thread_safety!(ffi::PacketTrailerHandler, Send + Sync); + +pub struct VideoPublishTimingObserverWrapper { + observer: OnVideoPublishTiming, +} + +impl VideoPublishTimingObserverWrapper { + pub fn new(observer: OnVideoPublishTiming) -> Self { + Self { observer } + } + + fn on_publish_timing(&self, event: ffi::VideoPublishTimingEvent) { + (self.observer)(event); + } +} + +pub struct VideoSubscribeTimingObserverWrapper { + observer: OnVideoSubscribeTiming, +} + +impl VideoSubscribeTimingObserverWrapper { + pub fn new(observer: OnVideoSubscribeTiming) -> Self { + Self { observer } + } + + fn on_subscribe_timing(&self, event: ffi::VideoSubscribeTimingEvent) { + (self.observer)(event); + } +} diff --git a/webrtc-sys/src/video_track.cpp b/webrtc-sys/src/video_track.cpp index 8dc35e167..3ae36f2db 100644 --- a/webrtc-sys/src/video_track.cpp +++ b/webrtc-sys/src/video_track.cpp @@ -32,6 +32,7 @@ #include "rtc_base/ref_counted_object.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/time_utils.h" +#include "webrtc-sys/src/packet_trailer.rs.h" #include "webrtc-sys/src/video_track.rs.h" namespace livekit_ffi { @@ -178,6 +179,13 @@ bool VideoTrackSource::InternalSource::on_captured_frame( buffer = buffer->ToI420(); } + if (packet_trailer_handler_) { + packet_trailer_handler_->emit_publish_timing( + VideoPublishTimingStage::EncoderUpload, + frame_metadata.has_packet_trailer ? frame_metadata.user_timestamp : 0, + frame_metadata.has_packet_trailer ? 
frame_metadata.frame_id : 0); + } + OnFrame(webrtc::VideoFrame::Builder() .set_video_frame_buffer(buffer) .set_rotation(rotation)