Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions doc/bin/relay/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ Or with the config path as the only argument:
moq-relay relay.toml
```

## Graceful Shutdown

The relay drains connections on the first shutdown signal (`SIGINT` from Ctrl+C, or
`SIGTERM` from `systemctl stop`):

1. **First signal** stops accepting new connections and sends a `GOAWAY` to every
active session, asking clients to migrate elsewhere. The relay keeps serving so
in-flight groups can finish, and waits for all sessions to close on their own. On
systemd it also reports `STOPPING=1`.
2. **Second signal** forces an immediate shutdown, closing every connection. (Under
systemd this is normally `SIGKILL` after `TimeoutStopSec`.)

A client that honors `GOAWAY` reconnects to another relay (or re-resolves DNS in a
load-balanced deployment), so draining lets you roll a node with no dropped media.
Clients that ignore `GOAWAY` stay connected until the second signal.

## HTTP Endpoints

The relay exposes HTTP/HTTPS endpoints for debugging, health checks, and late-join. See [HTTP](/bin/relay/http) for details.
Expand Down
20 changes: 19 additions & 1 deletion rs/moq-native/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ pub(crate) const DEFAULT_BIND: &str = "[::]:443";
pub struct Server {
moq: moq_net::Server,
versions: moq_net::Versions,
/// When true (default), [`accept`](Self::accept) returns `None` on Ctrl+C after
/// closing the endpoints. Callers wanting a graceful shutdown disable this via
/// [`with_signal_handler`](Self::with_signal_handler) and drive signals themselves.
handle_ctrl_c: bool,
accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<Request>>>,
#[cfg(feature = "iroh")]
iroh: Option<iroh::Endpoint>,
Expand Down Expand Up @@ -267,6 +271,7 @@ impl Server {
};

Ok(Server {
handle_ctrl_c: true,
accept: Default::default(),
moq: moq_net::Server::new().with_versions(versions.clone()),
versions,
Expand Down Expand Up @@ -300,6 +305,16 @@ impl Server {
self
}

/// Enable or disable the built-in Ctrl+C handler in [`accept`](Self::accept).
///
/// Enabled by default: Ctrl+C closes the endpoints and makes `accept` return `None`.
/// Disable it to handle shutdown yourself (e.g. a GOAWAY drain), then drive signals
/// from the caller.
pub fn with_signal_handler(mut self, enabled: bool) -> Self {
self.handle_ctrl_c = enabled;
self
}

pub fn with_publisher(mut self, publish: moq_net::OriginProducer) -> Self {
self.moq = self.moq.with_publisher(publish);
self
Expand Down Expand Up @@ -347,6 +362,9 @@ impl Server {
/// Call [Request::ok] or [Request::close] to complete the handshake.
#[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))]
pub async fn accept(&mut self) -> Option<Request> {
// Copied out so the select! arm's `if` guard doesn't borrow `self` (the arm body
// needs `&mut self` to close).
let handle_ctrl_c = self.handle_ctrl_c;
loop {
// tokio::select! does not support cfg directives on arms, so we need to create the futures here.
#[cfg(feature = "noq")]
Expand Down Expand Up @@ -476,7 +494,7 @@ impl Server {
Err(err) => tracing::debug!(%err, "failed to accept session"),
}
}
_ = tokio::signal::ctrl_c() => {
_ = tokio::signal::ctrl_c(), if handle_ctrl_c => {
Comment thread
kixelated marked this conversation as resolved.
Outdated
self.close().await;
return None;
}
Expand Down
49 changes: 49 additions & 0 deletions rs/moq-native/tests/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,55 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi
.expect("server task failed");
}

/// `Session::goaway` must actually deliver a GOAWAY to the peer. We use
/// moq-transport-14, where receiving a GOAWAY closes the session, so the drain is
/// observable end-to-end: the server fires GOAWAY and both sides see the session close.
#[tokio::test]
async fn goaway_drains_peer_moq_transport_14() {
let mut server_config = moq_native::ServerConfig::default();
server_config.bind = Some("[::]:0".to_string());
server_config.tls.generate = vec!["localhost".into()];
server_config.version = vec!["moq-transport-14".parse().unwrap()];

let mut server = server_config.init().expect("failed to init server");
let addr = server.local_addr().expect("failed to get local addr");

let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
client_config.version = vec!["moq-transport-14".parse().unwrap()];
let client = client_config.init().expect("failed to init client");
let url: url::Url = format!("https://localhost:{}", addr.port()).parse().unwrap();

let server_handle = tokio::spawn(async move {
let request = server.accept().await.expect("no incoming connection");
let session = request.ok().await?;

// Send GOAWAY, then confirm the peer actually leaves (the session closes).
session.goaway("");
tokio::time::timeout(TIMEOUT, session.closed())
.await
.expect("server session did not drain after goaway")
.ok();
Ok::<_, anyhow::Error>(())
});

let session = tokio::time::timeout(TIMEOUT, client.connect(url))
.await
.expect("client connect timed out")
.expect("client connect failed");

// The client should observe the session closing once the GOAWAY arrives.
tokio::time::timeout(TIMEOUT, session.closed())
.await
.expect("client session did not close after goaway")
.ok();

server_handle
.await
.expect("server task panicked")
.expect("server task failed");
}

/// Lite05 publisher↔subscriber round-trip exercising the per-frame timestamp
/// delta encoding, including negative deltas (B-frame ordering).
async fn lite05_timestamp_roundtrip(scheme: &str) {
Expand Down
51 changes: 46 additions & 5 deletions rs/moq-net/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ impl Client {
let consume = consumer.clone();
let consumer_view = consumer.consume();

// Per-session GOAWAY channel: `goaway` lives on the returned `Session`, `signal`
// drives the protocol task. Exactly one negotiated branch consumes each below.
let (goaway, signal) = crate::session::goaway_channel();

// If ALPN was used to negotiate the version, use the appropriate encoding.
// Default to IETF 14 if no ALPN was used and we'll negotiate the version later.
let (encoding, supported) = match session.protocol() {
Expand All @@ -100,10 +104,18 @@ impl Client {
consume,
self.stats.clone(),
ietf::Version::Draft18,
signal,
)?;

tracing::debug!(version = ?v, "connected");
return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone()));
return Ok(Session::new(
session,
v,
None,
publisher.clone(),
consumer_view.clone(),
goaway,
));
}
Some(ALPN_17) => {
let v = self
Expand All @@ -121,10 +133,18 @@ impl Client {
consume,
self.stats.clone(),
ietf::Version::Draft17,
signal,
)?;

tracing::debug!(version = ?v, "connected");
return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone()));
return Ok(Session::new(
session,
v,
None,
publisher.clone(),
consumer_view.clone(),
goaway,
));
}
Some(ALPN_16) => {
let v = self
Expand Down Expand Up @@ -159,6 +179,7 @@ impl Client {
consume,
self.stats.clone(),
lite::Version::Lite05Wip,
signal,
)?;

// Block until the initial announce set has landed (Lite05 reports it
Expand All @@ -171,6 +192,7 @@ impl Client {
recv_bw,
publisher.clone(),
consumer_view.clone(),
goaway,
));
}
Some(ALPN_LITE_04) => {
Expand All @@ -185,6 +207,7 @@ impl Client {
consume,
self.stats.clone(),
lite::Version::Lite04,
signal,
)?;

// Lite04 has no initial-set boundary, so this resolves immediately.
Expand All @@ -196,6 +219,7 @@ impl Client {
recv_bw,
publisher.clone(),
consumer_view.clone(),
goaway,
));
}
Some(ALPN_LITE_03) => {
Expand All @@ -211,6 +235,7 @@ impl Client {
consume,
self.stats.clone(),
lite::Version::Lite03,
signal,
)?;

// Lite03 has no initial-set boundary, so this resolves immediately.
Expand All @@ -222,6 +247,7 @@ impl Client {
recv_bw,
publisher.clone(),
consumer_view.clone(),
goaway,
));
}
Some(ALPN_LITE) | None => {
Expand Down Expand Up @@ -259,8 +285,15 @@ impl Client {
let recv_bw = match version {
Version::Lite(v) => {
let stream = stream.with_version(v);
let (recv_bw, connecting) =
lite::start(session.clone(), Some(stream), publish, consume, self.stats.clone(), v)?;
let (recv_bw, connecting) = lite::start(
session.clone(),
Some(stream),
publish,
consume,
self.stats.clone(),
v,
signal,
)?;

// Block until the initial announce set has landed (for versions that
// report one); resolves immediately otherwise.
Expand All @@ -285,12 +318,20 @@ impl Client {
consume,
self.stats.clone(),
v,
signal,
)?;
None
}
};

Ok(Session::new(session, version, recv_bw, publisher, consumer_view))
Ok(Session::new(
session,
version,
recv_bw,
publisher,
consumer_view,
goaway,
))
}
}

Expand Down
18 changes: 18 additions & 0 deletions rs/moq-net/src/ietf/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,24 @@ impl<S: web_transport_trait::Session> ControlStreamAdapter<S> {
}
}

/// Queue a GOAWAY on the shared control stream (draft-14-16; v17+ uses the SETUP stream).
pub fn send_goaway(&self, uri: &str) -> Result<(), crate::Error> {
let msg = ietf::GoAway {
new_session_uri: uri.into(),
timeout: 0,
};

// Control-stream framing is [type_id][size][body], matching `run_read`.
let mut raw = BytesMut::new();
ietf::GoAway::ID.encode(&mut raw, self.version)?;
msg.encode(&mut raw, self.version)?;

self.shared
.control_tx
.send(raw.freeze())
.map_err(|_| crate::Error::Closed)
}

/// Open a real (non-virtual) bidi stream, bypassing control stream multiplexing.
/// Used for v16 SubscribeNamespace which moved to its own bidi stream.
pub async fn open_native_bi(&self) -> Result<(AdapterSend<S>, AdapterRecv<S>), crate::Error> {
Expand Down
47 changes: 41 additions & 6 deletions rs/moq-net/src/ietf/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
Error, OriginConsumer, OriginProducer, StatsHandle,
coding::{Encode, Reader, Stream, Writer},
ietf::{self, FetchHeader, RequestId},
session::GoawaySignal,
setup,
};

Expand All @@ -20,6 +21,8 @@ pub fn start<S: web_transport_trait::Session>(
// Tier-scoped stats handle. Pass [`StatsHandle::default`] to opt out.
stats: StatsHandle,
version: Version,
// Fires when the session should send a GOAWAY and start draining.
goaway: GoawaySignal,
) -> Result<(), Error> {
web_async::spawn(async move {
let res = match version {
Expand All @@ -31,6 +34,22 @@ pub fn start<S: web_transport_trait::Session>(
let control = Control::new(request_id_max, client);
let adapter = ControlStreamAdapter::new(session.clone(), tx, control.clone(), version);

// GOAWAY rides the shared control stream for draft-14-16.
web_async::spawn({
let adapter = adapter.clone();
let session = session.clone();
async move {
tokio::select! {
_ = session.closed() => {}
uri = goaway.triggered() => {
if let Err(err) = adapter.send_goaway(&uri) {
tracing::debug!(%err, "failed to send goaway");
}
}
}
}
});

let publisher = Publisher::new(adapter.clone(), publish, control.clone(), stats.clone(), version);
let subscriber = Subscriber::new(adapter.clone(), subscribe, control, stats, version);

Expand Down Expand Up @@ -62,11 +81,11 @@ pub fn start<S: web_transport_trait::Session>(
}
}
_ => {
// Spawn SETUP sender (keeps stream alive for GOAWAY).
// Spawn SETUP sender, which also writes GOAWAY on that same uni stream.
web_async::spawn({
let session = session.clone();
async move {
if let Err(err) = run_setup(session, version).await {
if let Err(err) = run_setup(session, version, goaway).await {
tracing::warn!(%err, "setup send error");
}
}
Expand Down Expand Up @@ -113,8 +132,12 @@ pub fn start<S: web_transport_trait::Session>(
Ok(())
}

/// Send our SETUP on a uni stream and keep it alive for potential GOAWAY.
async fn run_setup<S: web_transport_trait::Session>(session: S, version: Version) -> Result<(), Error> {
/// Send our SETUP on a uni stream, then keep it alive to carry a GOAWAY.
async fn run_setup<S: web_transport_trait::Session>(
session: S,
version: Version,
goaway: GoawaySignal,
) -> Result<(), Error> {
let outer_version = crate::Version::Ietf(version);

let send = session.open_uni().await.map_err(Error::from_transport)?;
Expand All @@ -126,8 +149,20 @@ async fn run_setup<S: web_transport_trait::Session>(session: S, version: Version

writer.encode(&setup::Setup { parameters }).await?;

// Hold the writer alive until the session closes.
session.closed().await;
// GOAWAY frames use the inner IETF framing ([type][size][body]).
let mut writer = writer.with_version(version);

// Hold the stream open until the session closes, sending a GOAWAY first if asked.
tokio::select! {
_ = session.closed() => {}
uri = goaway.triggered() => {
let msg = ietf::GoAway { new_session_uri: uri.as_ref().into(), timeout: 0 };
if let Err(err) = writer.encode_message(&msg).await {
tracing::debug!(%err, "failed to send goaway");
}
session.closed().await;
}
}
writer.finish().ok();

Ok(())
Expand Down
Loading
Loading