Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions doc/lib/c/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,28 @@ A server can reject the connection on auth grounds: unauthorized (HTTP 401) or f

Failed calls are reported only through the return code and `moq_error()`, not logged. To surface libmoq's internal logs (moq-net / QUIC activity), call `moq_log_level("debug")` (or `"trace"`, `"info"`, etc.) to install a tracing subscriber.

## Connection statistics

`moq_session_stats(session, &dst)` fills a `moq_connection_stats` struct with a point-in-time view of the underlying QUIC/WebTransport connection: RTT, send/receive bandwidth estimates, and byte/packet counters. Unlike the callback-based APIs, this is a plain synchronous query you can poll on a timer.

Each metric carries a `*_valid` flag because availability depends on the transport backend (native QUIC reports every metric; the browser WebTransport reports few or none). A `false` flag is not the same as a zero value, so always check it before reading the field:

```c
moq_connection_stats stats = {0};
int rc = moq_session_stats(session, &stats);
if (rc < 0) {
// < 0 also covers "currently reconnecting, no live connection".
fprintf(stderr, "stats failed: %s\n", moq_error());
} else {
if (stats.rtt_valid) {
printf("rtt: %llu us\n", (unsigned long long)stats.rtt_us);
}
if (stats.send_rate_valid) {
printf("send: %llu bps\n", (unsigned long long)stats.send_rate_bps);
}
}
```

## Use cases

- **C/C++ applications** integrating MoQ without a Rust toolchain
Expand Down
4 changes: 4 additions & 0 deletions doc/lib/kt/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ client.setConsume(origin)

val session = client.connect("https://relay.example.com")

// Poll connection stats (RTT, bandwidth estimates, byte/packet counters).
// Each field is null when the transport backend doesn't report that metric.
session.stats().rttUs?.let { println("rtt: $it us") }

origin.use {
val consumer = origin.consume()
val announced = consumer.announced("demos/")
Expand Down
6 changes: 6 additions & 0 deletions doc/lib/swift/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ client.setConsume(origin: origin)

let session = try await client.connect(url: "https://relay.example.com")

// Poll connection stats (RTT, bandwidth estimates, byte/packet counters).
// Each field is nil when the transport backend doesn't report that metric.
if let rtt = session.stats().rttUs {
print("rtt: \(rtt) us")
}

let consumer = origin.consume()
let announced = try consumer.announced(prefix: "demos/")
for try await announcement in announced.announcements {
Expand Down
109 changes: 109 additions & 0 deletions rs/libmoq/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,92 @@ pub struct moq_announced {
pub active: bool,
}

/// A snapshot of connection statistics, filled in by [moq_session_stats].
///
/// Each metric has a `*_valid` flag: when `false`, the matching value is meaningless because
/// the transport backend doesn't report it (a `false` flag is NOT the same as a zero value).
/// Native QUIC reports every metric; the browser WebTransport reports few or none. Initialize
/// the struct to zero before the call; [moq_session_stats] overwrites every field.
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct moq_connection_stats {
/// Smoothed round-trip time, in microseconds.
pub rtt_us: u64,
pub rtt_valid: bool,

/// Estimated send bandwidth from the congestion controller, in bits per second.
pub send_rate_bps: u64,
pub send_rate_valid: bool,

/// Estimated receive bandwidth from MoQ PROBE, in bits per second.
pub recv_rate_bps: u64,
pub recv_rate_valid: bool,

/// Total bytes sent, including retransmissions and overhead.
pub bytes_sent: u64,
pub bytes_sent_valid: bool,

/// Total bytes received, including duplicates and overhead.
pub bytes_received: u64,
pub bytes_received_valid: bool,

/// Total bytes lost (detected via retransmission or acknowledgement).
pub bytes_lost: u64,
pub bytes_lost_valid: bool,

/// Total datagrams sent.
pub packets_sent: u64,
pub packets_sent_valid: bool,

/// Total datagrams received.
pub packets_received: u64,
pub packets_received_valid: bool,

/// Total datagrams detected as lost.
pub packets_lost: u64,
pub packets_lost_valid: bool,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

impl From<&moq_net::ConnectionStats> for moq_connection_stats {
fn from(stats: &moq_net::ConnectionStats) -> Self {
// An Option<u64> becomes a (value, valid) pair; absent metrics report 0/false.
fn split(value: Option<u64>) -> (u64, bool) {
(value.unwrap_or(0), value.is_some())
}

let (rtt_us, rtt_valid) = split(stats.rtt.map(|d| d.as_micros() as u64));
let (send_rate_bps, send_rate_valid) = split(stats.estimated_send_rate);
let (recv_rate_bps, recv_rate_valid) = split(stats.estimated_recv_rate);
let (bytes_sent, bytes_sent_valid) = split(stats.bytes_sent);
let (bytes_received, bytes_received_valid) = split(stats.bytes_received);
let (bytes_lost, bytes_lost_valid) = split(stats.bytes_lost);
let (packets_sent, packets_sent_valid) = split(stats.packets_sent);
let (packets_received, packets_received_valid) = split(stats.packets_received);
let (packets_lost, packets_lost_valid) = split(stats.packets_lost);

Self {
rtt_us,
rtt_valid,
send_rate_bps,
send_rate_valid,
recv_rate_bps,
recv_rate_valid,
bytes_sent,
bytes_sent_valid,
bytes_received,
bytes_received_valid,
bytes_lost,
bytes_lost_valid,
packets_sent,
packets_sent_valid,
packets_received,
packets_received_valid,
packets_lost,
packets_lost_valid,
}
}
}

/// Initialize the library with a log level.
///
/// This should be called before any other functions.
Expand Down Expand Up @@ -195,6 +281,29 @@ pub extern "C" fn moq_session_close(session: u32) -> i32 {
})
}

/// Snapshot the current connection statistics for a session.
///
/// Fills `dst` with a point-in-time view of the underlying QUIC/WebTransport connection
/// (RTT, bandwidth estimates, byte/packet counters). Each metric carries a `*_valid` flag
/// since availability depends on the transport backend; see [moq_connection_stats].
///
/// Returns zero on success, or a negative code on failure: the session handle is unknown, or
/// the session is currently reconnecting and has no live connection (in which case `dst` is
/// left untouched). Safe to call repeatedly to poll stats over the life of the session.
///
/// # Safety
/// - The caller must ensure that `dst` is a valid pointer to a [moq_connection_stats] struct.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn moq_session_stats(session: u32, dst: *mut moq_connection_stats) -> i32 {
ffi::enter(move || {
let session = ffi::parse_id(session)?;
let dst = unsafe { dst.as_mut() }.ok_or(Error::InvalidPointer)?;
let stats = State::lock().session.stats(session)?;
*dst = moq_connection_stats::from(&stats);
Ok(())
})
}

/// Create an origin for publishing broadcasts.
///
/// Origins contain any number of broadcasts addressed by path.
Expand Down
43 changes: 24 additions & 19 deletions rs/libmoq/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::{Error, Id, NonZeroSlab, State, ffi};
struct TaskEntry {
close: Option<oneshot::Sender<()>>,
callback: ffi::OnStatus,
/// Reads live connection stats, reporting `None` while reconnecting.
stats: moq_native::ConnectionStatsReader,
}

#[derive(Default)]
Expand All @@ -30,19 +32,28 @@ impl Session {
consume: Option<moq_net::OriginProducer>,
callback: ffi::OnStatus,
) -> Result<Id, Error> {
let closed = oneshot::channel();
// Build the reconnect loop up front so we can grab a stats reader for it
// before moving it into the spawned task.
let reconnect = moq_native::ClientConfig::default()
.init()?
.with_publish(publish)
.with_consume(consume)
.reconnect(url);
let stats = reconnect.stats();

let closed = oneshot::channel();
let entry = TaskEntry {
close: Some(closed.0),
callback,
stats,
};
let id = self.task.insert(Some(entry))?;

tokio::spawn(async move {
let res = tokio::select! {
// close() requested: a clean shutdown delivers a terminal 0.
_ = closed.1 => Ok(()),
res = Self::connect_run(callback, url, publish, consume) => res,
res = Self::report(callback, reconnect) => res,
};

// Deliver one final terminal callback (0 = closed, < 0 = error), then
Expand All @@ -57,24 +68,18 @@ impl Session {
Ok(id)
}

/// Connect and stay connected, reconnecting with exponential backoff if the session drops.
/// Snapshot the current connection's stats.
///
/// Reports a positive connection epoch through the status callback on every (re)connect, and a
/// negative code only when reconnection permanently gives up (the backoff timeout is exceeded),
/// which is terminal.
async fn connect_run(
callback: ffi::OnStatus,
url: Url,
publish: Option<moq_net::OriginConsumer>,
consume: Option<moq_net::OriginProducer>,
) -> Result<(), Error> {
let reconnect = moq_native::ClientConfig::default()
.init()?
.with_publish(publish)
.with_consume(consume)
.reconnect(url);

Self::report(callback, reconnect).await
/// Errors with [`Error::SessionNotFound`] if the handle is unknown, or [`Error::Offline`]
/// if the session is currently between connections (reconnecting).
pub fn stats(&self, id: Id) -> Result<moq_net::ConnectionStats, Error> {
self.task
.get(id)
.and_then(|entry| entry.as_ref())
.ok_or(Error::SessionNotFound)?
.stats
.stats()
.ok_or(Error::Offline)
}

/// Forward connection epochs to the status callback until the reconnect loop stops.
Expand Down
58 changes: 58 additions & 0 deletions rs/moq-ffi/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,50 @@ impl MoqClient {
}
}

/// A snapshot of connection statistics for a [`MoqSession`].
///
/// Each field is `None` when the transport backend doesn't report that metric (native QUIC
/// reports all of them; the browser WebTransport reports few or none), or when it isn't yet
/// available (e.g. `send_rate_bps` before the congestion controller has a window). A `None` is
/// not the same as a zero value.
#[derive(uniffi::Record)]
pub struct MoqConnectionStats {
/// Smoothed round-trip time, in microseconds.
pub rtt_us: Option<u64>,
/// Estimated send bandwidth from the congestion controller, in bits per second.
pub send_rate_bps: Option<u64>,
/// Estimated receive bandwidth from MoQ PROBE, in bits per second.
pub recv_rate_bps: Option<u64>,
/// Total bytes sent, including retransmissions and overhead.
pub bytes_sent: Option<u64>,
/// Total bytes received, including duplicates and overhead.
pub bytes_received: Option<u64>,
/// Total bytes lost (detected via retransmission or acknowledgement).
pub bytes_lost: Option<u64>,
/// Total datagrams sent.
pub packets_sent: Option<u64>,
/// Total datagrams received.
pub packets_received: Option<u64>,
/// Total datagrams detected as lost.
pub packets_lost: Option<u64>,
}

impl From<moq_net::ConnectionStats> for MoqConnectionStats {
fn from(stats: moq_net::ConnectionStats) -> Self {
Self {
rtt_us: stats.rtt.map(|d| d.as_micros() as u64),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
send_rate_bps: stats.estimated_send_rate,
recv_rate_bps: stats.estimated_recv_rate,
bytes_sent: stats.bytes_sent,
bytes_received: stats.bytes_received,
bytes_lost: stats.bytes_lost,
packets_sent: stats.packets_sent,
packets_received: stats.packets_received,
packets_lost: stats.packets_lost,
}
}
}

#[derive(uniffi::Object)]
pub struct MoqSession {
inner: Option<moq_net::Session>,
Expand Down Expand Up @@ -198,4 +242,18 @@ impl MoqSession {
pub fn shutdown(&self) {
self.cancel(0);
}

/// Snapshot the current connection statistics (RTT, bandwidth estimates,
/// byte/packet counters). Cheap to call; intended for periodic polling.
///
/// Individual fields are `None` when the transport backend doesn't report
/// them; see [`MoqConnectionStats`].
pub fn stats(&self) -> MoqConnectionStats {
let _guard = crate::ffi::RUNTIME.enter();
self.inner
.as_ref()
.map(moq_net::Session::stats)
.unwrap_or_default()
.into()
}
}
30 changes: 30 additions & 0 deletions rs/moq-native/src/reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ struct State {
status: Option<Status>,
/// Set when the reconnect loop permanently gives up (reconnect timeout exceeded).
error: Option<Error>,
/// The currently-connected session, or `None` while reconnecting. Read by
/// [`ConnectionStatsReader`] to snapshot live connection stats.
session: Option<moq_net::Session>,
}

/// A cloneable read handle for the live connection stats of a [`Reconnect`] loop.
///
/// Obtained via [`Reconnect::stats`]. [`stats`](Self::stats) returns `None` while the loop is
/// between connections (reconnecting), and `Some` snapshot while a session is established.
#[derive(Clone)]
pub struct ConnectionStatsReader {
state: kio::Consumer<State>,
}

impl ConnectionStatsReader {
/// Snapshot the current connection's stats, or `None` if not currently connected.
pub fn stats(&self) -> Option<moq_net::ConnectionStats> {
self.state.read().session.as_ref().map(moq_net::Session::stats)
}
}

/// Handle to a background reconnect loop.
Expand Down Expand Up @@ -137,11 +156,13 @@ impl Reconnect {
last_error = None;
if let Ok(mut state) = state.write() {
state.status = Some(Status::Connected);
state.session = Some(session.clone());
}
let _ = session.closed().await;
tracing::warn!(%url, "session closed, reconnecting");
if let Ok(mut state) = state.write() {
state.status = Some(Status::Disconnected);
state.session = None;
}
retry_start = tokio::time::Instant::now();
}
Expand Down Expand Up @@ -201,6 +222,15 @@ impl Reconnect {
pub async fn closed(&self) -> crate::Result<()> {
kio::wait(|waiter| self.poll_closed(waiter)).await
}

/// A cloneable handle for reading the current connection's stats.
///
/// The handle keeps working across reconnects, reporting `None` between connections.
pub fn stats(&self) -> ConnectionStatsReader {
ConnectionStatsReader {
state: self.state.clone(),
}
}
}

impl Drop for Reconnect {
Expand Down
Loading
Loading