diff --git a/.changeset/local-video-encoding-controls.md b/.changeset/local-video-encoding-controls.md new file mode 100644 index 000000000..9a957d8d6 --- /dev/null +++ b/.changeset/local-video-encoding-controls.md @@ -0,0 +1,6 @@ +--- +libwebrtc: patch +livekit: patch +--- + +Add runtime video encoding limit controls for local video tracks and wire them into the `local_video` publisher/subscriber example via RPC. diff --git a/Cargo.lock b/Cargo.lock index b85400203..173fcdf0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4189,6 +4189,8 @@ dependencies = [ "nokhwa", "objc2 0.6.4", "parking_lot", + "serde", + "serde_json", "tokio", "tokio-stream", "webrtc-sys", diff --git a/examples/local_video/Cargo.toml b/examples/local_video/Cargo.toml index dedbc76d6..0c12294b4 100644 --- a/examples/local_video/Cargo.toml +++ b/examples/local_video/Cargo.toml @@ -49,6 +49,8 @@ parking_lot = { workspace = true, features = ["deadlock_detection"] } anyhow = { workspace = true } chrono = "0.4" bytemuck = { version = "1.16", features = ["derive"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } nokhwa = { git = "https://github.com/l1npengtul/nokhwa", rev = "4923ecab7cf26f9dba83867a15a9d8662d021296", default-features = false, features = ["output-threaded"] } diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index 62b8030b0..52e1be38b 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -20,6 +20,7 @@ use nokhwa::utils::{ }; use nokhwa::Camera; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::env; use std::sync::{ @@ -214,6 +215,24 @@ fn unix_time_us_now() -> u64 { } const MAX_BACKEND_CAPTURE_TIMESTAMP_AGE_US: u64 = 5_000_000; +const SET_VIDEO_ENCODING_LIMITS_METHOD: &str = "set-video-encoding-limits"; + +#[derive(Debug, Deserialize, Serialize)] +struct SetEncodingLimitsRequest { + track_sid: String, + bitrate_bps: Option, + max_framerate: Option, + scale_resolution_down_by: Option, + reason: String, +} + +#[derive(Debug, Deserialize, Serialize)] +struct SetEncodingLimitsResponse { + applied_bitrate_bps: Option, + applied_max_framerate: Option, + applied_scale_resolution_down_by: Option, + track_sid: String, +} #[derive(Default)] struct CaptureTimestampLogState { @@ -329,21 +348,39 @@ struct PublisherTimingSummary { capture_to_webrtc_total_ms: RollingMs, } -fn find_video_outbound_encoder(stats: &[livekit::webrtc::stats::RtcStats]) -> Option<&str> { - let mut fallback = None; +#[derive(Default)] +struct PublisherVideoOutboundStats { + encoder_implementation: Option, + target_bitrate_bps: Option, +} + +fn find_video_outbound_stats( + stats: &[livekit::webrtc::stats::RtcStats], +) -> PublisherVideoOutboundStats { + let mut fallback = PublisherVideoOutboundStats::default(); for stat in stats { let livekit::webrtc::stats::RtcStats::OutboundRtp(outbound) = stat else { continue; }; - if outbound.stream.kind != "video" || outbound.outbound.encoder_implementation.is_empty() { + if outbound.stream.kind != "video" { continue; } - let implementation = outbound.outbound.encoder_implementation.as_str(); + let current = PublisherVideoOutboundStats { + encoder_implementation: (!outbound.outbound.encoder_implementation.is_empty()) + .then(|| outbound.outbound.encoder_implementation.clone()), + target_bitrate_bps: (outbound.outbound.target_bitrate > 0.0) + .then_some(outbound.outbound.target_bitrate), + }; if outbound.outbound.active { - return Some(implementation); + return current; + } + if fallback.encoder_implementation.is_none() { + fallback.encoder_implementation = current.encoder_implementation; + } + if fallback.target_bitrate_bps.is_none() { + fallback.target_bitrate_bps = current.target_bitrate_bps; } - fallback.get_or_insert(implementation); } fallback @@ -366,14 +403,20 @@ async fn update_publisher_encoder_overlay( match track.get_stats().await { Ok(stats) => { - if let Some(implementation) = find_video_outbound_encoder(&stats) { + let outbound = find_video_outbound_stats(&stats); + if let Some(implementation) = outbound.encoder_implementation { if implementation != last_implementation { info!("Publisher video encoder implementation: {implementation}"); - last_implementation = implementation.to_string(); + last_implementation = implementation.clone(); } let mut shared = shared.lock(); - shared.codec_implementation = implementation.to_string(); + shared.codec_implementation = implementation; + if let Some(target_bitrate_bps) = outbound.target_bitrate_bps { + shared.encode_bitrate_mbps = Some(target_bitrate_bps / 1_000_000.0); + } + } else if let Some(target_bitrate_bps) = outbound.target_bitrate_bps { + shared.lock().encode_bitrate_mbps = Some(target_bitrate_bps / 1_000_000.0); } logged_initial = true; } @@ -541,6 +584,66 @@ fn update_shared_timing_sample( } } +fn register_encoding_limits_rpc(room: &Arc, publication: LocalTrackPublication) { + room.local_participant().register_rpc_method( + SET_VIDEO_ENCODING_LIMITS_METHOD.to_string(), + move |data| { + let publication = publication.clone(); + Box::pin(async move { + let request: SetEncodingLimitsRequest = serde_json::from_str(&data.payload) + .map_err(|err| { + RpcError::new( + 400, + "invalid encoding limits request".to_string(), + Some(err.to_string()), + ) + })?; + + let publication_sid = publication.sid().to_string(); + if request.track_sid != publication_sid { + return Err(RpcError::new( + 404, + "track not found".to_string(), + Some(request.track_sid), + )); + } + + let limits = VideoEncodingLimits { + max_bitrate: request.bitrate_bps, + max_framerate: request.max_framerate, + scale_resolution_down_by: request.scale_resolution_down_by, + }; + publication.set_video_encoding_limits(limits).map_err(|err| { + RpcError::new(500, format!("set encoding limits failed: {err}"), None) + })?; + + info!( + "{} requested video encoding limits: {:?} bps, {:?} fps, {:?}x scale ({})", + data.caller_identity, + request.bitrate_bps, + request.max_framerate, + request.scale_resolution_down_by, + request.reason + ); + + serde_json::to_string(&SetEncodingLimitsResponse { + applied_bitrate_bps: request.bitrate_bps, + applied_max_framerate: request.max_framerate, + applied_scale_resolution_down_by: request.scale_resolution_down_by, + track_sid: publication_sid, + }) + .map_err(|err| { + RpcError::new( + 500, + "failed to serialize encoding limits response".to_string(), + Some(err.to_string()), + ) + }) + }) + }, + ); +} + #[cfg(test)] mod tests { use super::*; @@ -955,21 +1058,23 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { .publish_track(LocalTrack::Video(track.clone()), publish_opts(requested_codec)) .await; - let actual_codec = if let Err(e) = publish_result { - if matches!(requested_codec, VideoCodec::H265) { + let (publication, actual_codec) = match publish_result { + Ok(publication) => { + info!("Published camera track"); + (publication, requested_codec) + } + Err(e) if matches!(requested_codec, VideoCodec::H265) => { log::warn!("H.265 publish failed ({}). Falling back to H.264...", e); - room.local_participant() + let publication = room + .local_participant() .publish_track(LocalTrack::Video(track.clone()), publish_opts(VideoCodec::H264)) .await?; info!("Published camera track with H.264 fallback"); - VideoCodec::H264 - } else { - return Err(e.into()); + (publication, VideoCodec::H264) } - } else { - info!("Published camera track"); - requested_codec + Err(e) => return Err(e.into()), }; + register_encoding_limits_rpc(&room, publication); let capture_config = CaptureConfig { fps: args.fps, diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 1f3a4df47..7b072ab26 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -12,6 +12,7 @@ use livekit::webrtc::video_stream::native::NativeVideoStream; use livekit_api::access_token; use log::{debug, info}; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, env, @@ -31,6 +32,15 @@ use codec_display::{codec_from_mime, codec_with_implementation}; use subscriber_timing::SubscriberTimingHandle; use viewport_aspect::AspectConstrainedViewport; +const SET_VIDEO_ENCODING_LIMITS_METHOD: &str = "set-video-encoding-limits"; +const DEFAULT_CONTROL_BITRATE_BPS: u64 = 1_500_000; +const DEFAULT_CONTROL_FRAMERATE: f64 = 30.0; +const DEFAULT_CONTROL_RESOLUTION_SCALE: f64 = 1.0; +const BITRATE_KEY_STEP_BPS: u64 = 100_000; +const MIN_CONTROL_BITRATE_BPS: u64 = BITRATE_KEY_STEP_BPS; +const MIN_FRAMERATE: f64 = 0.1; +const MAX_FRAMERATE: f64 = 60.0; + #[cfg(target_os = "macos")] mod macos_native_video { use std::ffi::c_void; @@ -409,6 +419,181 @@ struct Args { e2ee_key: Option, } +#[derive(Debug, Deserialize, Serialize)] +struct SetEncodingLimitsRequest { + track_sid: String, + bitrate_bps: Option, + max_framerate: Option, + scale_resolution_down_by: Option, + reason: String, +} + +#[derive(Debug, Deserialize, Serialize)] +struct SetEncodingLimitsResponse { + applied_bitrate_bps: Option, + applied_max_framerate: Option, + applied_scale_resolution_down_by: Option, + track_sid: String, +} + +#[derive(Clone, Copy, Debug)] +struct EncodingControlState { + bitrate_bps: u64, + max_framerate: f64, + scale_resolution_down_by: f64, +} + +impl Default for EncodingControlState { + fn default() -> Self { + Self { + bitrate_bps: DEFAULT_CONTROL_BITRATE_BPS, + max_framerate: DEFAULT_CONTROL_FRAMERATE, + scale_resolution_down_by: DEFAULT_CONTROL_RESOLUTION_SCALE, + } + } +} + +impl EncodingControlState { + fn limits(self) -> VideoEncodingLimits { + VideoEncodingLimits { + max_bitrate: Some(self.bitrate_bps), + max_framerate: Some(self.max_framerate), + scale_resolution_down_by: Some(self.scale_resolution_down_by), + } + } +} + +#[derive(Clone)] +struct EncodingControl { + inner: Arc, +} + +struct EncodingControlInner { + room: Arc, + target: Mutex>, + state: Mutex, + handle: tokio::runtime::Handle, +} + +#[derive(Clone)] +struct EncodingControlTarget { + publisher_identity: String, + track_sid: TrackSid, +} + +impl EncodingControl { + fn new(room: Arc) -> Self { + Self { + inner: Arc::new(EncodingControlInner { + room, + target: Mutex::new(None), + state: Mutex::new(EncodingControlState::default()), + handle: tokio::runtime::Handle::current(), + }), + } + } + + fn set_active_track(&self, publisher_identity: String, track_sid: TrackSid) { + *self.inner.target.lock() = Some(EncodingControlTarget { publisher_identity, track_sid }); + } + + fn clear_active_track(&self, track_sid: &TrackSid) { + let mut target = self.inner.target.lock(); + if target.as_ref().is_some_and(|target| &target.track_sid == track_sid) { + *target = None; + } + } + + fn active_state(&self) -> Option { + if self.inner.target.lock().is_none() { + return None; + } + Some(*self.inner.state.lock()) + } + + fn adjust_bitrate(&self, increase: bool) { + let reason = + if increase { "keyboard bitrate increase" } else { "keyboard bitrate decrease" }; + self.update_limits(reason, |state| { + state.bitrate_bps = if increase { + state.bitrate_bps.saturating_add(BITRATE_KEY_STEP_BPS) + } else { + state.bitrate_bps.saturating_sub(BITRATE_KEY_STEP_BPS).max(MIN_CONTROL_BITRATE_BPS) + }; + }); + } + + fn adjust_framerate(&self, increase: bool) { + let reason = + if increase { "keyboard framerate increase" } else { "keyboard framerate decrease" }; + self.update_limits(reason, |state| { + state.max_framerate = if increase { + next_higher_framerate(state.max_framerate) + } else { + next_lower_framerate(state.max_framerate) + }; + }); + } + + fn set_resolution_scale(&self, scale_resolution_down_by: f64) { + self.update_limits("keyboard resolution scale", |state| { + state.scale_resolution_down_by = scale_resolution_down_by; + }); + } + + fn update_limits(&self, reason: &'static str, update: impl FnOnce(&mut EncodingControlState)) { + let (limits, target) = { + let mut state = self.inner.state.lock(); + update(&mut state); + (state.limits(), self.inner.target.lock().clone()) + }; + let Some(target) = target else { + debug!("No active publisher video track for encoding control request"); + return; + }; + + let room = self.inner.room.clone(); + self.inner.handle.spawn(async move { + if let Err(err) = request_encoding_limits(&room, target, limits, reason).await { + log::warn!("encoding limits RPC failed: {err}"); + } + }); + } +} + +fn next_lower_framerate(fps: f64) -> f64 { + if fps <= 0.2 { + MIN_FRAMERATE + } else if fps <= 0.5 { + 0.2 + } else if fps <= 1.0 { + 0.5 + } else if fps <= 5.0 { + 1.0 + } else { + let next = fps - 5.0; + if next >= 5.0 { + next + } else { + 1.0 + } + } +} + +fn next_higher_framerate(fps: f64) -> f64 { + if fps < 0.2 { + 0.2 + } else if fps < 0.5 { + 0.5 + } else if fps < 1.0 { + 1.0 + } else if fps < 5.0 { + 5.0 + } else { + (fps + 5.0).min(MAX_FRAMERATE) + } +} + struct SharedYuv { width: u32, height: u32, @@ -777,7 +962,8 @@ fn video_status_line( simulcast: bool, ) -> String { let codec = codec_with_implementation(codec, codec_implementation); - let bitrate = bitrate_mbps.map(|mbps| format!(" {:.1}mbps", mbps.max(0.0))).unwrap_or_default(); + let bitrate = + bitrate_mbps.map(|mbps| format!(" {:.1} mbps", mbps.max(0.0))).unwrap_or_default(); if simulcast { format!("{}x{} {:.1}fps {codec}{bitrate} Simulcast", width, height, fps.max(0.0)) } else { @@ -785,6 +971,60 @@ fn video_status_line( } } +fn encoding_control_status_line(state: EncodingControlState) -> String { + format!( + "Encoding limit {:.1}mbps {:.1}fps {:.1}x scale", + state.bitrate_bps as f64 / 1_000_000.0, + state.max_framerate, + state.scale_resolution_down_by, + ) +} + +async fn request_encoding_limits( + room: &Arc, + target: EncodingControlTarget, + limits: VideoEncodingLimits, + reason: &'static str, +) -> Result<()> { + info!( + "Requesting video encoding limits from {}: {:?} bps, {:?} fps, {:?}x scale ({})", + target.publisher_identity, + limits.max_bitrate, + limits.max_framerate, + limits.scale_resolution_down_by, + reason, + ); + + let payload = serde_json::to_string(&SetEncodingLimitsRequest { + track_sid: target.track_sid.to_string(), + bitrate_bps: limits.max_bitrate, + max_framerate: limits.max_framerate, + scale_resolution_down_by: limits.scale_resolution_down_by, + reason: reason.to_string(), + })?; + let response = room + .local_participant() + .perform_rpc( + PerformRpcData::new(target.publisher_identity, SET_VIDEO_ENCODING_LIMITS_METHOD) + .with_payload(payload) + .with_response_timeout(Duration::from_millis(500)) + .with_max_round_trip_latency(Duration::from_millis(500)), + ) + .await + .map_err(|err| anyhow::anyhow!("encoding limits RPC failed: {err}"))?; + + let response: SetEncodingLimitsResponse = serde_json::from_str(&response)?; + info!( + "Publisher applied video encoding limits on {}: {:?} bps, {:?} fps, {:?}x scale", + response.track_sid, + response.applied_bitrate_bps, + response.applied_max_framerate, + response.applied_scale_resolution_down_by, + ); + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -803,11 +1043,43 @@ mod tests { Arc::new(Mutex::new(SimulcastState { available: true, ..Default::default() })); let subscriber_timing = SubscriberTimingHandle::new(); - let lines = subscriber_overlay_lines(&shared, &simulcast, false, &subscriber_timing) + let lines = subscriber_overlay_lines(&shared, &simulcast, false, &subscriber_timing, None) .expect("overlay should render"); assert_eq!(lines, vec!["1280x720 29.6fps H264 NVDEC 1.2 mbps Simulcast"]); } + + #[test] + fn framerate_steps_drop_below_five_to_named_low_rates() { + assert_eq!(next_lower_framerate(30.0), 25.0); + assert_eq!(next_lower_framerate(10.0), 5.0); + assert_eq!(next_lower_framerate(5.0), 1.0); + assert_eq!(next_lower_framerate(1.0), 0.5); + assert_eq!(next_lower_framerate(0.5), 0.2); + assert_eq!(next_lower_framerate(0.2), 0.1); + assert_eq!(next_lower_framerate(0.1), 0.1); + } + + #[test] + fn framerate_steps_rise_through_named_low_rates() { + assert_eq!(next_higher_framerate(0.1), 0.2); + assert_eq!(next_higher_framerate(0.2), 0.5); + assert_eq!(next_higher_framerate(0.5), 1.0); + assert_eq!(next_higher_framerate(1.0), 5.0); + assert_eq!(next_higher_framerate(5.0), 10.0); + assert_eq!(next_higher_framerate(60.0), 60.0); + } + + #[test] + fn encoding_control_line_uses_compact_mbps() { + let line = encoding_control_status_line(EncodingControlState { + bitrate_bps: 1_250_000, + max_framerate: 0.5, + scale_resolution_down_by: 4.0, + }); + + assert_eq!(line, "Encoding limit 1.2mbps 0.5fps 4.0x scale"); + } } async fn handle_track_subscribed( @@ -822,6 +1094,7 @@ async fn handle_track_subscribed( ctrl_c_received: &Arc, simulcast: &Arc>, repaint_ctx: &Arc>, + encoding_control: &EncodingControl, subscriber_timing: SubscriberTimingHandle, ) { // If a participant filter is set, skip others @@ -872,6 +1145,7 @@ async fn handle_track_subscribed( let mut s = shared.lock(); s.codec = codec; } + encoding_control.set_active_track(participant.identity().to_string(), sid.clone()); let mut timing_events = video_track.subscribe_timing_events(); let subscriber_timing_events = subscriber_timing.clone(); @@ -890,6 +1164,7 @@ async fn handle_track_subscribed( let my_sid = sid.clone(); let ctrl_c_sink = ctrl_c_received.clone(); let repaint_ctx_sink = repaint_ctx.clone(); + let encoding_control_sink = encoding_control.clone(); let subscriber_timing_sink = subscriber_timing.clone(); // Initialize simulcast state for this publication { @@ -983,6 +1258,7 @@ async fn handle_track_subscribed( let mut active = active_sid2.lock(); if active.as_ref() == Some(&my_sid) { *active = None; + encoding_control_sink.clear_active_track(&my_sid); } }); @@ -1066,6 +1342,7 @@ fn subscriber_overlay_lines( simulcast: &Arc>, include_timing: bool, subscriber_timing: &SubscriberTimingHandle, + encoding_control_state: Option, ) -> Option> { let status_line = { let s = shared.lock(); @@ -1086,6 +1363,9 @@ fn subscriber_overlay_lines( }; let mut lines = vec![status_line]; + if let Some(state) = encoding_control_state { + lines.push(encoding_control_status_line(state)); + } if include_timing { if let Some(mut timing_lines) = subscriber_timing.display_overlay_lines(Instant::now()) { lines.append(&mut timing_lines); @@ -1128,6 +1408,7 @@ fn handle_track_unsubscribed( video_size: &Arc, active_sid: &Arc>>, simulcast: &Arc>, + encoding_control: &EncodingControl, subscriber_timing: &SubscriberTimingHandle, ) { let sid = publication.sid().clone(); @@ -1136,6 +1417,7 @@ fn handle_track_unsubscribed( info!("Video track unsubscribed ({}), clearing active sink", sid); *active = None; } + encoding_control.clear_active_track(&sid); clear_hud_and_simulcast(shared, frame_slot, video_size, simulcast, subscriber_timing); } @@ -1146,6 +1428,7 @@ fn handle_track_unpublished( video_size: &Arc, active_sid: &Arc>>, simulcast: &Arc>, + encoding_control: &EncodingControl, subscriber_timing: &SubscriberTimingHandle, ) { let sid = publication.sid().clone(); @@ -1154,6 +1437,7 @@ fn handle_track_unpublished( info!("Video track unpublished ({}), clearing active sink", sid); *active = None; } + encoding_control.clear_active_track(&sid); clear_hud_and_simulcast(shared, frame_slot, video_size, simulcast, subscriber_timing); } @@ -1163,6 +1447,7 @@ struct VideoApp { video_size: Arc, simulcast: Arc>, subscriber_timing: SubscriberTimingHandle, + encoding_control: EncodingControl, repaint_ctx: Arc>, ctrl_c_received: Arc, viewport: AspectConstrainedViewport, @@ -1181,6 +1466,28 @@ impl eframe::App for VideoApp { self.viewport.set_video_size(ctx, width, height); } + if ctx.input(|i| i.key_pressed(egui::Key::ArrowUp)) { + self.encoding_control.adjust_bitrate(true); + } + if ctx.input(|i| i.key_pressed(egui::Key::ArrowDown)) { + self.encoding_control.adjust_bitrate(false); + } + if ctx.input(|i| i.key_pressed(egui::Key::ArrowLeft)) { + self.encoding_control.adjust_framerate(false); + } + if ctx.input(|i| i.key_pressed(egui::Key::ArrowRight)) { + self.encoding_control.adjust_framerate(true); + } + if ctx.input(|i| i.key_pressed(egui::Key::Num1)) { + self.encoding_control.set_resolution_scale(1.0); + } + if ctx.input(|i| i.key_pressed(egui::Key::Num2)) { + self.encoding_control.set_resolution_scale(2.0); + } + if ctx.input(|i| i.key_pressed(egui::Key::Num3)) { + self.encoding_control.set_resolution_scale(4.0); + } + let render_frame = self.frame_slot.take(); if let Some(frame) = render_frame.as_ref() { if let Some(metadata) = frame.frame_metadata { @@ -1199,6 +1506,7 @@ impl eframe::App for VideoApp { &self.simulcast, self.display_timestamp, &self.subscriber_timing, + self.encoding_control.active_state(), ); egui::CentralPanel::default().frame(egui::Frame::NONE).show(ctx, |ui| { @@ -1326,6 +1634,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let (room, _) = Room::connect(&url, &token, room_options).await?; let room = Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); + let encoding_control = EncodingControl::new(room.clone()); // Enable E2EE after connection if args.e2ee_key.is_some() { @@ -1360,10 +1669,12 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { let repaint_ctx_events = repaint_ctx.clone(); let ctrl_c_events = ctrl_c_received.clone(); let subscriber_timing_events = subscriber_timing.clone(); + let encoding_control_events = encoding_control.clone(); + let room_events = room.clone(); tokio::spawn(async move { let active_sid = active_sid.clone(); let simulcast = simulcast_events; - let mut events = room.subscribe(); + let mut events = room_events.subscribe(); info!("Subscribed to room events"); while let Some(evt) = events.recv().await { debug!("Room event: {:?}", evt); @@ -1381,6 +1692,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { &ctrl_c_events, &simulcast, &repaint_ctx_events, + &encoding_control_events, subscriber_timing_events.clone(), ) .await; @@ -1393,6 +1705,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { &video_size_events, &active_sid, &simulcast, + &encoding_control_events, &subscriber_timing_events, ); } @@ -1404,6 +1717,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { &video_size_events, &active_sid, &simulcast, + &encoding_control_events, &subscriber_timing_events, ); } @@ -1420,6 +1734,7 @@ async fn run(args: Args, ctrl_c_received: Arc) -> Result<()> { video_size, simulcast, subscriber_timing, + encoding_control, repaint_ctx, ctrl_c_received: ctrl_c_received.clone(), viewport, diff --git a/examples/local_video/src/video_display.rs b/examples/local_video/src/video_display.rs index 9acb7fd1e..dcd69e289 100644 --- a/examples/local_video/src/video_display.rs +++ b/examples/local_video/src/video_display.rs @@ -24,6 +24,7 @@ pub(crate) struct SharedYuv { pub(crate) v: Vec, pub(crate) codec: String, pub(crate) codec_implementation: String, + pub(crate) encode_bitrate_mbps: Option, pub(crate) fps: f32, pub(crate) simulcast: bool, pub(crate) dirty: bool, @@ -373,13 +374,16 @@ fn video_status_line( fps: f32, codec: &str, codec_implementation: &str, + encode_bitrate_mbps: Option, simulcast: bool, ) -> String { let codec = codec_with_implementation(codec, codec_implementation); + let bitrate = + encode_bitrate_mbps.map(|mbps| format!(" {:.1}mbps", mbps.max(0.0))).unwrap_or_default(); if simulcast { - format!("{}x{} {:.1}fps {codec} Simulcast", width, height, fps.max(0.0)) + format!("{}x{} {:.1}fps {codec}{bitrate} Simulcast", width, height, fps.max(0.0)) } else { - format!("{}x{} {:.1}fps {codec}", width, height, fps.max(0.0)) + format!("{}x{} {:.1}fps {codec}{bitrate}", width, height, fps.max(0.0)) } } @@ -401,6 +405,7 @@ fn publisher_overlay_lines( s.fps, &s.codec, &s.codec_implementation, + s.encode_bitrate_mbps, s.simulcast, ), s.timing_sample, @@ -453,6 +458,25 @@ mod tests { assert_eq!(lines, vec!["1280x720 29.6fps H264 NVENC Simulcast"]); } + #[test] + fn publisher_overlay_shows_encode_bitrate() { + let shared = Arc::new(Mutex::new(SharedYuv::default())); + { + let mut s = shared.lock(); + s.width = 1280; + s.height = 720; + s.codec = "H264".to_string(); + s.encode_bitrate_mbps = Some(1.25); + s.fps = 29.6; + } + + let mut overlay_state = PublisherTimingOverlayState::default(); + let lines = publisher_overlay_lines(&shared, &mut overlay_state, Instant::now()) + .expect("status overlay should render"); + + assert_eq!(lines, vec!["1280x720 29.6fps H264 1.2mbps"]); + } + #[test] fn preview_handoff_skips_unconsumed_frame() { let shared = Arc::new(Mutex::new(SharedYuv::default())); diff --git a/libwebrtc/src/native/rtp_parameters.rs b/libwebrtc/src/native/rtp_parameters.rs index fc6fc28b0..fd7e2976e 100644 --- a/libwebrtc/src/native/rtp_parameters.rs +++ b/libwebrtc/src/native/rtp_parameters.rs @@ -39,6 +39,7 @@ impl From for RtpParameters { Self { codecs: value.codecs.into_iter().map(Into::into).collect(), header_extensions: value.header_extensions.into_iter().map(Into::into).collect(), + encodings: value.encodings.into_iter().map(Into::into).collect(), rtcp: value.rtcp.into(), } } diff --git a/libwebrtc/src/native/rtp_sender.rs b/libwebrtc/src/native/rtp_sender.rs index f58b08484..cf8ca4371 100644 --- a/libwebrtc/src/native/rtp_sender.rs +++ b/libwebrtc/src/native/rtp_sender.rs @@ -76,8 +76,26 @@ impl RtpSender { } pub fn set_parameters(&self, parameters: RtpParameters) -> Result<(), RtcError> { + let mut native_parameters = self.sys_handle.get_parameters(); + for (native_encoding, encoding) in + native_parameters.encodings.iter_mut().zip(parameters.encodings) + { + native_encoding.active = encoding.active; + native_encoding.has_max_bitrate_bps = encoding.max_bitrate.is_some(); + native_encoding.max_bitrate_bps = encoding.max_bitrate.unwrap_or_default() as i32; + native_encoding.has_max_framerate = encoding.max_framerate.is_some(); + native_encoding.max_framerate = encoding.max_framerate.unwrap_or_default(); + native_encoding.network_priority = encoding.priority.into(); + native_encoding.has_scale_resolution_down_by = + encoding.scale_resolution_down_by.is_some(); + native_encoding.scale_resolution_down_by = + encoding.scale_resolution_down_by.unwrap_or_default(); + native_encoding.has_scalability_mode = encoding.scalability_mode.is_some(); + native_encoding.scalability_mode = encoding.scalability_mode.unwrap_or_default(); + } + self.sys_handle - .set_parameters(parameters.into()) + .set_parameters(native_parameters) .map_err(|e| unsafe { sys_err::ffi::RtcError::from(e.what()).into() }) } diff --git a/libwebrtc/src/rtp_parameters.rs b/libwebrtc/src/rtp_parameters.rs index 75d8b1830..da9a26170 100644 --- a/libwebrtc/src/rtp_parameters.rs +++ b/libwebrtc/src/rtp_parameters.rs @@ -33,6 +33,7 @@ pub struct RtpHeaderExtensionParameters { pub struct RtpParameters { pub codecs: Vec, pub header_extensions: Vec, + pub encodings: Vec, pub rtcp: RtcpParameters, } diff --git a/livekit/src/prelude.rs b/livekit/src/prelude.rs index 8c57f3108..cfc2bc006 100644 --- a/livekit/src/prelude.rs +++ b/livekit/src/prelude.rs @@ -30,7 +30,8 @@ pub use crate::{ AudioTrack, LocalAudioTrack, LocalTrack, LocalVideoTrack, PublishTimingEvent, PublishTimingEventStream, PublishTimingStage, RemoteAudioTrack, RemoteTrack, RemoteVideoTrack, StreamState, SubscribeTimingEvent, SubscribeTimingEventStream, - SubscribeTimingStage, Track, TrackDimension, TrackKind, TrackSource, VideoTrack, + SubscribeTimingStage, Track, TrackDimension, TrackKind, TrackSource, VideoEncodingLimits, + VideoTrack, }, ConnectionState, DataPacket, DataPacketKind, Room, RoomError, RoomEvent, RoomOptions, RoomResult, RoomSdkOptions, SipDTMF, Transcription, TranscriptionSegment, diff --git a/livekit/src/room/publication/local.rs b/livekit/src/room/publication/local.rs index 0a12e4e27..7a96d2886 100644 --- a/livekit/src/room/publication/local.rs +++ b/livekit/src/room/publication/local.rs @@ -78,6 +78,20 @@ impl LocalTrackPublication { self.local.publish_options.lock().clone() } + /// Sets runtime encoding limits for this publication's local video track. + /// + /// Pass `None` for an individual field to clear that explicit cap and + /// return control of that field to libwebrtc. + pub fn set_video_encoding_limits(&self, limits: VideoEncodingLimits) -> RoomResult<()> { + let Some(LocalTrack::Video(track)) = self.track() else { + return Err(RoomError::Internal( + "publication does not contain a local video track".into(), + )); + }; + + track.set_encoding_limits(limits) + } + pub fn mute(&self) { if let Some(track) = self.track() { track.mute(); diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index 65fc9596d..1676b2390 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -46,6 +46,17 @@ pub struct LocalVideoTrack { publish_timing_tx: Arc>>>, } +/// Runtime encoding limits for a published local video track. +#[derive(Clone, Copy, Debug, Default, PartialEq)] +pub struct VideoEncodingLimits { + /// Maximum encoded bitrate in bits per second. + pub max_bitrate: Option, + /// Maximum encoded frame rate in frames per second. + pub max_framerate: Option, + /// Encoded resolution downscale factor. + pub scale_resolution_down_by: Option, +} + impl Debug for LocalVideoTrack { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LocalVideoTrack") @@ -166,6 +177,40 @@ impl LocalVideoTrack { self.source.clone() } + /// Sets runtime encoding limits for all RTP encodings on this published video track. + /// + /// Pass `None` for an individual field to clear that explicit cap and + /// return control of that field to libwebrtc. The track must be published + /// before this method can update sender parameters. + pub(crate) fn set_encoding_limits(&self, limits: VideoEncodingLimits) -> RoomResult<()> { + self.update_encoding_parameters(|encoding| { + encoding.max_bitrate = limits.max_bitrate; + encoding.max_framerate = limits.max_framerate; + encoding.scale_resolution_down_by = limits.scale_resolution_down_by; + }) + } + + fn update_encoding_parameters( + &self, + mut update: impl FnMut(&mut RtpEncodingParameters), + ) -> RoomResult<()> { + let Some(transceiver) = self.transceiver() else { + return Err(RoomError::Rtc(RtcError { + error_type: RtcErrorType::InvalidState, + message: "track is not published".into(), + })); + }; + + let sender = transceiver.sender(); + let mut parameters = sender.parameters(); + for encoding in &mut parameters.encodings { + update(encoding); + } + sender.set_parameters(parameters)?; + + Ok(()) + } + /// Returns a stream of native local video publish-pipeline timing events. /// /// Multiple concurrent subscriptions are supported; each call returns an