Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion livekit/src/rtc_engine/rtc_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@
pub const PUBLISHER_NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150);
pub const INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD: u64 = 2 * 1024 * 1024;

/// Default data-channel max message size (bytes), used when the remote SDP
/// answer does not advertise an `a=max-message-size` attribute (RFC 8841).
pub const DEFAULT_MAX_MESSAGE_SIZE: u64 = 65535;

/// Buffered-amount low threshold for the `_data_track` DC.
///
/// Kept small (vs. the 2 MiB default for reliable/lossy) so we hand at most
Expand Down Expand Up @@ -367,6 +371,11 @@
reliable_dc_buffered_amount_low_threshold: AtomicU64,
data_track_dc: DataChannel,

/// Negotiated SCTP max message size (bytes), parsed from the publisher
/// answer SDP (`a=max-message-size`). `0` means "no limit". Defaults to
/// [`DEFAULT_MAX_MESSAGE_SIZE`] until an answer is received.
max_message_size: AtomicU64,

/// Next sequence number for reliable packets.
next_packet_sequence: AtomicU32,

Expand Down Expand Up @@ -593,6 +602,7 @@
INITIAL_BUFFERED_AMOUNT_LOW_THRESHOLD,
),
data_track_dc,
max_message_size: AtomicU64::new(DEFAULT_MAX_MESSAGE_SIZE),
next_packet_sequence: 1.into(),
packet_rx_state: Mutex::new(TtlMap::new(RELIABLE_RECEIVED_STATE_TTL)),
participant_info,
Expand All @@ -610,8 +620,30 @@
e2ee_manager,
subscriber_primary,
pc_state_notify: Notify::new(),
});

Check warning on line 623 in livekit/src/rtc_engine/rtc_session.rs

View workflow job for this annotation

GitHub Actions / Check Formatting

Diff in /home/runner/work/rust-sdks/rust-sdks/livekit/src/rtc_engine/rtc_session.rs

// Log when a publisher data channel closes without the engine or peer
// connection tearing it down
for (dc, label) in
[(&inner.reliable_dc, RELIABLE_DC_LABEL), (&inner.lossy_dc, LOSSY_DC_LABEL)]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: For completeness, this should probably also track the data track DC.

{
let weak_inner = Arc::downgrade(&inner);
dc.on_state_change(Some(Box::new(move |state| {
if state != DataChannelState::Closed {
return;
}
let Some(inner) = weak_inner.upgrade() else {
return;

Check warning on line 636 in livekit/src/rtc_engine/rtc_session.rs

View workflow job for this annotation

GitHub Actions / Check Formatting

Diff in /home/runner/work/rust-sdks/rust-sdks/livekit/src/rtc_engine/rtc_session.rs
};
if !inner.closed.load(Ordering::Acquire) && inner.publisher_pc.is_connected() {
log::info!(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: This should probably be logged at error level.

"publisher data channel '{}' closed unexpectedly",
label
);
}
})));
}

// Start session tasks
let signal_task =
livekit_runtime::spawn(inner.clone().signal_task(signal_events, close_rx.clone()));
Expand Down Expand Up @@ -1025,10 +1057,29 @@
if event.kind == DataPacketKind::Reliable {
request.packet.sequence = self.next_packet_sequence.fetch_add(1, Ordering::Relaxed);
}
let encoded_packet: EncodedPacket = request.packet.into();

let max_message_size = self.max_message_size.load(Ordering::Acquire);
if max_message_size != 0
&& encoded_packet.data.len() as u64 > max_message_size
{
let err = EngineError::Internal(
format!(
"data packet size ({} bytes) exceeds the negotiated maximum message size ({} bytes)",
encoded_packet.data.len(),
max_message_size
)
.into(),
);
log::warn!("{}", err);
_ = request.completion_tx.send(Err(err));
continue;
}

let ev = DataChannelEvent {
kind: event.kind,
detail: DataChannelEventDetail::PublishData(PublishDataRequest {
encoded_packet: request.packet.into(),
encoded_packet,
completion_tx: request.completion_tx.into(),
})
};
Expand Down Expand Up @@ -1188,6 +1239,14 @@
}
}

let max_message_size =

Check warning on line 1242 in livekit/src/rtc_engine/rtc_session.rs

View workflow job for this annotation

GitHub Actions / Check Formatting

Diff in /home/runner/work/rust-sdks/rust-sdks/livekit/src/rtc_engine/rtc_session.rs
parse_sdp_max_message_size(&answer.sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE);
self.max_message_size.store(max_message_size, Ordering::Release);
log::debug!(
"negotiated data channel max message size: {} bytes",
max_message_size
);

let answer =
SessionDescription::parse(&answer.sdp, answer.r#type.parse().unwrap()).unwrap(); // Unwrap is ok, the server shouldn't give us an invalid sdp
self.publisher_pc.set_remote_description(answer).await?;
Expand Down Expand Up @@ -2250,6 +2309,14 @@
}
}

/// Parses the `a=max-message-size` attribute (RFC 8841) from an SDP, returning
/// the value in bytes. Returns `None` when the attribute is absent or invalid.
fn parse_sdp_max_message_size(sdp: &str) -> Option<u64> {
sdp.lines()
.find_map(|line| line.trim().strip_prefix("a=max-message-size:"))
.and_then(|value| value.trim().parse::<u64>().ok())
}

/// Emit incoming data track packets as session events.
pub fn handle_remote_dt_packets(dc: &DataChannel, emitter: WeakUnboundedSender<SessionEvent>) {
let on_message: libwebrtc::data_channel::OnMessage = Box::new(move |buffer: DataBuffer| {
Expand Down Expand Up @@ -2290,3 +2357,47 @@

make_rtc_config!(make_rtc_config_join, proto::JoinResponse);
make_rtc_config!(make_rtc_config_reconnect, proto::ReconnectResponse);

#[cfg(test)]
mod tests {
use super::{parse_sdp_max_message_size, DEFAULT_MAX_MESSAGE_SIZE};

#[test]
fn parses_max_message_size_from_application_section() {
let sdp = "v=0\r\n\
m=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\n\
a=sctp-port:5000\r\n\
a=max-message-size:262144\r\n";
assert_eq!(parse_sdp_max_message_size(sdp), Some(262144));
}

#[test]
fn parses_with_lf_only_and_surrounding_whitespace() {
let sdp = "m=application 9 UDP/DTLS/SCTP webrtc-datachannel\n a=max-message-size: 65536 \n";
assert_eq!(parse_sdp_max_message_size(sdp), Some(65536));
}

#[test]
fn missing_attribute_returns_none_so_default_is_used() {
let sdp = "v=0\r\nm=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\na=sctp-port:5000\r\n";
assert_eq!(parse_sdp_max_message_size(sdp), None);
// Callers fall back to the default when the attribute is absent.
assert_eq!(
parse_sdp_max_message_size(sdp).unwrap_or(DEFAULT_MAX_MESSAGE_SIZE),
DEFAULT_MAX_MESSAGE_SIZE
);
}

#[test]
fn zero_is_parsed_and_means_no_limit() {
// RFC 8841: a value of 0 indicates the peer can receive any message size.
// The send guard treats 0 as "no limit" and skips the check.
assert_eq!(parse_sdp_max_message_size("a=max-message-size:0\r\n"), Some(0));
}

#[test]
fn invalid_or_empty_value_returns_none() {
assert_eq!(parse_sdp_max_message_size("a=max-message-size:abc\r\n"), None);
assert_eq!(parse_sdp_max_message_size("a=max-message-size:\r\n"), None);
}
}
69 changes: 69 additions & 0 deletions livekit/tests/data_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,72 @@
.context("Not all packets received before timeout")?;
Ok(())
}

/// A data message larger than the negotiated SCTP max message size must be
/// rejected with an error by `publish_data` (rather than silently aborting the
/// data channel), and the channel must keep working afterwards.
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_oversized_data_message_rejected() -> Result<()> {
use anyhow::{ensure, Context};

// Comfortably larger than any plausible negotiated max-message-size
// (LiveKit/pion advertises ~64 KiB; libwebrtc's default is 256 KiB).
const OVERSIZED: usize = 300_000;
const SMALL: usize = 256;

let mut rooms = test_rooms(2).await?;
let (receiving_room, mut receiving_event_rx) = rooms.pop().unwrap();
let (sending_room, _) = rooms.pop().unwrap();
let receiving_identity = receiving_room.local_participant().identity();

let small_packet = || DataPacket {
reliable: true,
payload: vec![0xAB; SMALL],
destination_identities: vec![receiving_identity.clone()],
..Default::default()
};

// Baseline: a small reliable packet publishes successfully.
sending_room.local_participant().publish_data(small_packet()).await?;

// The oversized packet must be rejected with an error and must NOT be sent
// to libwebrtc (which would abruptly close the data channel).
let result = sending_room
.local_participant()
.publish_data(DataPacket {
reliable: true,
payload: vec![0xCD; OVERSIZED],
destination_identities: vec![receiving_identity.clone()],
..Default::default()

Check warning on line 126 in livekit/tests/data_channel_test.rs

View workflow job for this annotation

GitHub Actions / Check Formatting

Diff in /home/runner/work/rust-sdks/rust-sdks/livekit/tests/data_channel_test.rs
})
.await;
ensure!(
result.is_err(),
"oversized publish_data should return an error, got {:?}",
result
);

// The channel must still be usable: this follow-up small packet proves the
// oversized send did not break publishing (no 15s publisher timeout).
sending_room.local_participant().publish_data(small_packet()).await?;

// Exactly the two small packets should arrive; the oversized one never does.
let receive = async {
let mut received = 0;
while let Some(event) = receiving_event_rx.recv().await {
if let RoomEvent::DataReceived { payload, .. } = event {
ensure!(payload.len() == SMALL, "unexpected packet size: {}", payload.len());
received += 1;
if received == 2 {
break;
}
}
}
Ok(())
};
timeout(Duration::from_secs(15), receive)
.await
.context("did not receive both small packets before timeout")??;
Ok(())
}
Loading