diff --git a/.changeset/reject_oversized_data_messages_before_they_break_the_data_channel.md b/.changeset/reject_oversized_data_messages_before_they_break_the_data_channel.md new file mode 100644 index 000000000..06dce2927 --- /dev/null +++ b/.changeset/reject_oversized_data_messages_before_they_break_the_data_channel.md @@ -0,0 +1,7 @@ +--- +livekit: patch +livekit-ffi: patch +--- + +Reject oversized data messages before they break the data channel. + diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index eb8d12a8f..7c6bf1b12 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -72,6 +72,10 @@ pub const RELIABLE_RECEIVED_STATE_TTL: Duration = Duration::from_secs(30); pub const PUBLISHER_NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150); pub const INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD: u64 = 2 * 1024 * 1024; +/// Default data-channel max message size (bytes), used when the remote SDP +/// answer does not advertise an `a=max-message-size` attribute (RFC 8841). +pub const DEFAULT_MAX_MESSAGE_SIZE: u64 = 64000; + /// Buffered-amount low threshold for the `_data_track` DC. /// /// Kept small (vs. the 2 MiB default for reliable/lossy) so we hand at most @@ -367,6 +371,11 @@ struct SessionInner { reliable_dc_buffered_amount_low_threshold: AtomicU64, data_track_dc: DataChannel, + /// Negotiated SCTP max message size (bytes), parsed from the publisher + /// answer SDP (`a=max-message-size`). `0` means "no limit". Defaults to + /// [`DEFAULT_MAX_MESSAGE_SIZE`] until an answer is received. + max_message_size: AtomicU64, + /// Next sequence number for reliable packets. next_packet_sequence: AtomicU32, @@ -593,6 +602,7 @@ impl RtcSession { INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD, ), data_track_dc, + max_message_size: AtomicU64::new(DEFAULT_MAX_MESSAGE_SIZE), next_packet_sequence: 1.into(), packet_rx_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)), participant_info, @@ -612,6 +622,27 @@ impl RtcSession { pc_state_notify: Notify::new(), }); + // Log when a publisher data channel closes without the engine or peer + // connection tearing it down + for (dc, label) in [ + (&inner.reliable_dc, RELIABLE_DC_LABEL), + (&inner.lossy_dc, LOSSY_DC_LABEL), + (&inner.data_track_dc, DATA_TRACK_DC_LABEL), + ] { + let weak_inner = Arc::downgrade(&inner); + dc.on_state_change(Some(Box::new(move |state| { + if state != DataChannelState::Closed { + return; + } + let Some(inner) = weak_inner.upgrade() else { + return; + }; + if !inner.closed.load(Ordering::Acquire) && inner.publisher_pc.is_connected() { + log::error!("publisher data channel '{}' closed unexpectedly", label); + } + }))); + } + // Start session tasks let signal_task = livekit_runtime::spawn(inner.clone().signal_task(signal_events, close_rx.clone())); @@ -1025,10 +1056,29 @@ impl SessionInner { if event.kind == DataPacketKind::Reliable { request.packet.sequence = self.next_packet_sequence.fetch_add(1, Ordering::Relaxed); } + let encoded_packet: EncodedPacket = request.packet.into(); + + let max_message_size = self.max_message_size.load(Ordering::Acquire); + if max_message_size != 0 + && encoded_packet.data.len() as u64 > max_message_size + { + let err = EngineError::Internal( + format!( + "data packet size ({} bytes) exceeds the negotiated maximum message size ({} bytes)", + encoded_packet.data.len(), + max_message_size + ) + .into(), + ); + log::warn!("{}", err); + _ = request.completion_tx.send(Err(err)); + continue; + } + let ev = DataChannelEvent { kind: event.kind, detail: DataChannelEventDetail::PublishData(PublishDataRequest { - encoded_packet: request.packet.into(), + encoded_packet, completion_tx: request.completion_tx.into(), }) }; @@ -1188,6 +1238,14 @@ impl SessionInner { } } + let max_message_size = std::cmp::min( + parse_sdp_max_message_size(&answer.sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE), + DEFAULT_MAX_MESSAGE_SIZE, + ); + + self.max_message_size.store(max_message_size, Ordering::Release); + log::debug!("negotiated data channel max message size: {} bytes", max_message_size); + let answer = SessionDescription::parse(&answer.sdp, answer.r#type.parse().unwrap()).unwrap(); // Unwrap is ok, the server shouldn't give us an invalid sdp self.publisher_pc.set_remote_description(answer).await?; @@ -2250,6 +2308,14 @@ impl SessionInner { } } +/// Parses the `a=max-message-size` attribute (RFC 8841) from an SDP, returning +/// the value in bytes. Returns `None` when the attribute is absent or invalid. +fn parse_sdp_max_message_size(sdp: &str) -> Option { + sdp.lines() + .find_map(|line| line.trim().strip_prefix("a=max-message-size:")) + .and_then(|value| value.trim().parse::().ok()) +} + /// Emit incoming data track packets as session events. pub fn handle_remote_dt_packets(dc: &DataChannel, emitter: WeakUnboundedSender) { let on_message: libwebrtc::data_channel::OnMessage = Box::new(move |buffer: DataBuffer| { @@ -2290,3 +2356,47 @@ macro_rules! make_rtc_config { make_rtc_config!(make_rtc_config_join, proto::JoinResponse); make_rtc_config!(make_rtc_config_reconnect, proto::ReconnectResponse); + +#[cfg(test)] +mod tests { + use super::{parse_sdp_max_message_size, DEFAULT_MAX_MESSAGE_SIZE}; + + #[test] + fn parses_max_message_size_from_application_section() { + let sdp = "v=0\r\n\ + m=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\n\ + a=sctp-port:5000\r\n\ + a=max-message-size:262144\r\n"; + assert_eq!(parse_sdp_max_message_size(sdp), Some(262144)); + } + + #[test] + fn parses_with_lf_only_and_surrounding_whitespace() { + let sdp = "m=application 9 UDP/DTLS/SCTP webrtc-datachannel\n a=max-message-size: 65536 \n"; + assert_eq!(parse_sdp_max_message_size(sdp), Some(65536)); + } + + #[test] + fn missing_attribute_returns_none_so_default_is_used() { + let sdp = "v=0\r\nm=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\na=sctp-port:5000\r\n"; + assert_eq!(parse_sdp_max_message_size(sdp), None); + // Callers fall back to the default when the attribute is absent. + assert_eq!( + parse_sdp_max_message_size(sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE), + DEFAULT_MAX_MESSAGE_SIZE + ); + } + + #[test] + fn zero_is_parsed_and_means_no_limit() { + // RFC 8841: a value of 0 indicates the peer can receive any message size. + // The send guard treats 0 as "no limit" and skips the check. + assert_eq!(parse_sdp_max_message_size("a=max-message-size:0\r\n"), Some(0)); + } + + #[test] + fn invalid_or_empty_value_returns_none() { + assert_eq!(parse_sdp_max_message_size("a=max-message-size:abc\r\n"), None); + assert_eq!(parse_sdp_max_message_size("a=max-message-size:\r\n"), None); + } +} diff --git a/livekit/tests/data_channel_test.rs b/livekit/tests/data_channel_test.rs index af38a551d..e8b7b21ad 100644 --- a/livekit/tests/data_channel_test.rs +++ b/livekit/tests/data_channel_test.rs @@ -86,3 +86,68 @@ async fn test_reliable_retry() -> Result<()> { .context("Not all packets received before timeout")?; Ok(()) } + +/// A data message larger than the negotiated SCTP max message size must be +/// rejected with an error by `publish_data` (rather than silently aborting the +/// data channel), and the channel must keep working afterwards. +#[cfg(feature = "__lk-e2e-test")] +#[test_log::test(tokio::test)] +async fn test_oversized_data_message_rejected() -> Result<()> { + use anyhow::{ensure, Context}; + + // Comfortably larger than any plausible negotiated max-message-size + // (LiveKit/pion advertises ~64 KiB; libwebrtc's default is 256 KiB). + const OVERSIZED: usize = 300_000; + const SMALL: usize = 256; + + let mut rooms = test_rooms(2).await?; + let (receiving_room, mut receiving_event_rx) = rooms.pop().unwrap(); + let (sending_room, _) = rooms.pop().unwrap(); + let receiving_identity = receiving_room.local_participant().identity(); + + let small_packet = || DataPacket { + reliable: true, + payload: vec![0xAB; SMALL], + destination_identities: vec![receiving_identity.clone()], + ..Default::default() + }; + + // Baseline: a small reliable packet publishes successfully. + sending_room.local_participant().publish_data(small_packet()).await?; + + // The oversized packet must be rejected with an error and must NOT be sent + // to libwebrtc (which would abruptly close the data channel). + let result = sending_room + .local_participant() + .publish_data(DataPacket { + reliable: true, + payload: vec![0xCD; OVERSIZED], + destination_identities: vec![receiving_identity.clone()], + ..Default::default() + }) + .await; + ensure!(result.is_err(), "oversized publish_data should return an error, got {:?}", result); + + // The channel must still be usable: this follow-up small packet proves the + // oversized send did not break publishing (no 15s publisher timeout). + sending_room.local_participant().publish_data(small_packet()).await?; + + // Exactly the two small packets should arrive; the oversized one never does. + let receive = async { + let mut received = 0; + while let Some(event) = receiving_event_rx.recv().await { + if let RoomEvent::DataReceived { payload, .. } = event { + ensure!(payload.len() == SMALL, "unexpected packet size: {}", payload.len()); + received += 1; + if received == 2 { + break; + } + } + } + Ok(()) + }; + timeout(Duration::from_secs(15), receive) + .await + .context("did not receive both small packets before timeout")??; + Ok(()) +}