Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
livekit: patch
livekit-ffi: patch
---

Reject oversized data messages before they break the data channel.

112 changes: 111 additions & 1 deletion livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub const RELIABLE_RECEIVED_STATE_TTL: Duration = Duration::from_secs(30);
pub const PUBLISHER_NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150);
pub const INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD: u64 = 2 * 1024 * 1024;

/// Default data-channel max message size (bytes), used when the remote SDP
/// answer does not advertise an `a=max-message-size` attribute (RFC 8841).
pub const DEFAULT_MAX_MESSAGE_SIZE: u64 = 64000;

/// Buffered-amount low threshold for the `_data_track` DC.
///
/// Kept small (vs. the 2 MiB default for reliable/lossy) so we hand at most
Expand Down Expand Up @@ -367,6 +371,11 @@ struct SessionInner {
reliable_dc_buffered_amount_low_threshold: AtomicU64,
data_track_dc: DataChannel,

/// Negotiated SCTP max message size (bytes), parsed from the publisher
/// answer SDP (`a=max-message-size`). `0` means "no limit". Defaults to
/// [`DEFAULT_MAX_MESSAGE_SIZE`] until an answer is received.
max_message_size: AtomicU64,

/// Next sequence number for reliable packets.
next_packet_sequence: AtomicU32,

Expand Down Expand Up @@ -593,6 +602,7 @@ impl RtcSession {
INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
),
data_track_dc,
max_message_size: AtomicU64::new(DEFAULT_MAX_MESSAGE_SIZE),
next_packet_sequence: 1.into(),
packet_rx_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)),
participant_info,
Expand All @@ -612,6 +622,27 @@ impl RtcSession {
pc_state_notify: Notify::new(),
});

// Log when a publisher data channel closes without the engine or peer
// connection tearing it down
for (dc, label) in [
(&inner.reliable_dc, RELIABLE_DC_LABEL),
(&inner.lossy_dc, LOSSY_DC_LABEL),
(&inner.data_track_dc, DATA_TRACK_DC_LABEL),
] {
let weak_inner = Arc::downgrade(&inner);
dc.on_state_change(Some(Box::new(move |state| {
if state != DataChannelState::Closed {
return;
}
let Some(inner) = weak_inner.upgrade() else {
return;
};
if !inner.closed.load(Ordering::Acquire) && inner.publisher_pc.is_connected() {
log::error!("publisher data channel '{}' closed unexpectedly", label);
}
})));
}

// Start session tasks
let signal_task =
livekit_runtime::spawn(inner.clone().signal_task(signal_events, close_rx.clone()));
Expand Down Expand Up @@ -1025,10 +1056,29 @@ impl SessionInner {
if event.kind == DataPacketKind::Reliable {
request.packet.sequence = self.next_packet_sequence.fetch_add(1, Ordering::Relaxed);
}
let encoded_packet: EncodedPacket = request.packet.into();

let max_message_size = self.max_message_size.load(Ordering::Acquire);
if max_message_size != 0
&& encoded_packet.data.len() as u64 > max_message_size
{
let err = EngineError::Internal(
format!(
"data packet size ({} bytes) exceeds the negotiated maximum message size ({} bytes)",
encoded_packet.data.len(),
max_message_size
)
.into(),
);
log::warn!("{}", err);
_ = request.completion_tx.send(Err(err));
continue;
}

let ev = DataChannelEvent {
kind: event.kind,
detail: DataChannelEventDetail::PublishData(PublishDataRequest {
encoded_packet: request.packet.into(),
encoded_packet,
completion_tx: request.completion_tx.into(),
})
};
Expand Down Expand Up @@ -1188,6 +1238,14 @@ impl SessionInner {
}
}

let max_message_size = std::cmp::min(
parse_sdp_max_message_size(&answer.sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE),
DEFAULT_MAX_MESSAGE_SIZE,
);

self.max_message_size.store(max_message_size, Ordering::Release);
log::debug!("negotiated data channel max message size: {} bytes", max_message_size);

let answer =
SessionDescription::parse(&answer.sdp, answer.r#type.parse().unwrap()).unwrap(); // Unwrap is ok, the server shouldn't give us an invalid sdp
self.publisher_pc.set_remote_description(answer).await?;
Expand Down Expand Up @@ -2250,6 +2308,14 @@ impl SessionInner {
}
}

/// Parses the `a=max-message-size` attribute (RFC 8841) from an SDP, returning
/// the value in bytes. Returns `None` when the attribute is absent or invalid.
fn parse_sdp_max_message_size(sdp: &str) -> Option<u64> {
sdp.lines()
.find_map(|line| line.trim().strip_prefix("a=max-message-size:"))
.and_then(|value| value.trim().parse::<u64>().ok())
}

/// Emit incoming data track packets as session events.
pub fn handle_remote_dt_packets(dc: &DataChannel, emitter: WeakUnboundedSender<SessionEvent>) {
let on_message: libwebrtc::data_channel::OnMessage = Box::new(move |buffer: DataBuffer| {
Expand Down Expand Up @@ -2290,3 +2356,47 @@ macro_rules! make_rtc_config {

make_rtc_config!(make_rtc_config_join, proto::JoinResponse);
make_rtc_config!(make_rtc_config_reconnect, proto::ReconnectResponse);

#[cfg(test)]
mod tests {
use super::{parse_sdp_max_message_size, DEFAULT_MAX_MESSAGE_SIZE};

#[test]
fn parses_max_message_size_from_application_section() {
let sdp = "v=0\r\n\
m=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\n\
a=sctp-port:5000\r\n\
a=max-message-size:262144\r\n";
assert_eq!(parse_sdp_max_message_size(sdp), Some(262144));
}

#[test]
fn parses_with_lf_only_and_surrounding_whitespace() {
let sdp = "m=application 9 UDP/DTLS/SCTP webrtc-datachannel\n a=max-message-size: 65536 \n";
assert_eq!(parse_sdp_max_message_size(sdp), Some(65536));
}

#[test]
fn missing_attribute_returns_none_so_default_is_used() {
let sdp = "v=0\r\nm=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\na=sctp-port:5000\r\n";
assert_eq!(parse_sdp_max_message_size(sdp), None);
// Callers fall back to the default when the attribute is absent.
assert_eq!(
parse_sdp_max_message_size(sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE),
DEFAULT_MAX_MESSAGE_SIZE
);
}

#[test]
fn zero_is_parsed_and_means_no_limit() {
// RFC 8841: a value of 0 indicates the peer can receive any message size.
// The send guard treats 0 as "no limit" and skips the check.
assert_eq!(parse_sdp_max_message_size("a=max-message-size:0\r\n"), Some(0));
}

#[test]
fn invalid_or_empty_value_returns_none() {
assert_eq!(parse_sdp_max_message_size("a=max-message-size:abc\r\n"), None);
assert_eq!(parse_sdp_max_message_size("a=max-message-size:\r\n"), None);
}
}
65 changes: 65 additions & 0 deletions livekit/tests/data_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,68 @@ async fn test_reliable_retry() -> Result<()> {
.context("Not all packets received before timeout")?;
Ok(())
}

/// A data message larger than the negotiated SCTP max message size must be
/// rejected with an error by `publish_data` (rather than silently aborting the
/// data channel), and the channel must keep working afterwards.
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_oversized_data_message_rejected() -> Result<()> {
use anyhow::{ensure, Context};

// Comfortably larger than any plausible negotiated max-message-size
// (LiveKit/pion advertises ~64 KiB; libwebrtc's default is 256 KiB).
const OVERSIZED: usize = 300_000;
const SMALL: usize = 256;

let mut rooms = test_rooms(2).await?;
let (receiving_room, mut receiving_event_rx) = rooms.pop().unwrap();
let (sending_room, _) = rooms.pop().unwrap();
let receiving_identity = receiving_room.local_participant().identity();

let small_packet = || DataPacket {
reliable: true,
payload: vec![0xAB; SMALL],
destination_identities: vec![receiving_identity.clone()],
..Default::default()
};

// Baseline: a small reliable packet publishes successfully.
sending_room.local_participant().publish_data(small_packet()).await?;

// The oversized packet must be rejected with an error and must NOT be sent
// to libwebrtc (which would abruptly close the data channel).
let result = sending_room
.local_participant()
.publish_data(DataPacket {
reliable: true,
payload: vec![0xCD; OVERSIZED],
destination_identities: vec![receiving_identity.clone()],
..Default::default()
})
.await;
ensure!(result.is_err(), "oversized publish_data should return an error, got {:?}", result);

// The channel must still be usable: this follow-up small packet proves the
// oversized send did not break publishing (no 15s publisher timeout).
sending_room.local_participant().publish_data(small_packet()).await?;

// Exactly the two small packets should arrive; the oversized one never does.
let receive = async {
let mut received = 0;
while let Some(event) = receiving_event_rx.recv().await {
if let RoomEvent::DataReceived { payload, .. } = event {
ensure!(payload.len() == SMALL, "unexpected packet size: {}", payload.len());
received += 1;
if received == 2 {
break;
}
}
}
Ok(())
};
timeout(Duration::from_secs(15), receive)
.await
.context("did not receive both small packets before timeout")??;
Ok(())
}
Loading