From 36851a518b3bf307f7ef4d9f312aa14f842e0ab0 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 4 Jun 2026 16:11:07 -0700 Subject: [PATCH 1/3] feat(moq-relay): graceful shutdown via GOAWAY drain The first shutdown signal (Ctrl+C / SIGINT, or SIGTERM from `systemctl stop`) now stops accepting new connections, sends a GOAWAY to every active session, and waits for them all to drain. A second signal forces an immediate shutdown. This lets an operator roll a relay node without dropping in-flight media. moq-net gains the plumbing to actually send GOAWAY (previously the message types existed but were only received): - `Session::goaway(uri)` flips a per-session watch signal without closing the session, so in-flight groups can finish before the peer migrates away. - moq-lite (04+) opens a dedicated control stream and writes GOAWAY. - IETF moq-transport sends GOAWAY on the control stream for draft-14-16 (via the control-stream adapter) and on the SETUP uni stream for draft-17+. moq-native's `Server::accept` Ctrl+C handler (which hard-closes the endpoints) is now opt-out via `with_signal_handler(false)`; the relay disables it and drives signals itself. Other consumers keep the previous behavior by default. Tested end-to-end over WebTransport with moq-transport-14, where receiving a GOAWAY closes the peer session, so the drain is observable on both sides. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- doc/bin/relay/index.md | 16 ++++++ rs/moq-native/src/server.rs | 20 ++++++- rs/moq-native/tests/broadcast.rs | 49 +++++++++++++++++ rs/moq-net/src/client.rs | 51 ++++++++++++++++-- rs/moq-net/src/ietf/adapter.rs | 18 +++++++ rs/moq-net/src/ietf/session.rs | 47 +++++++++++++--- rs/moq-net/src/lite/goaway.rs | 38 ++++++++++++- rs/moq-net/src/lite/session.rs | 32 ++++++++++- rs/moq-net/src/server.rs | 41 ++++++++++++-- rs/moq-net/src/session.rs | 65 ++++++++++++++++++++++ rs/moq-relay/src/connection.rs | 19 +++++-- rs/moq-relay/src/main.rs | 93 ++++++++++++++++++++++++++++---- 12 files changed, 458 insertions(+), 31 deletions(-) diff --git a/doc/bin/relay/index.md b/doc/bin/relay/index.md index 11ab4a643..22c2d5260 100644 --- a/doc/bin/relay/index.md +++ b/doc/bin/relay/index.md @@ -87,6 +87,22 @@ Or with the config path as the only argument: moq-relay relay.toml ``` +## Graceful Shutdown + +The relay drains connections on the first shutdown signal (`SIGINT` from Ctrl+C, or +`SIGTERM` from `systemctl stop`): + +1. **First signal** stops accepting new connections and sends a `GOAWAY` to every + active session, asking clients to migrate elsewhere. The relay keeps serving so + in-flight groups can finish, and waits for all sessions to close on their own. On + systemd it also reports `STOPPING=1`. +2. **Second signal** forces an immediate shutdown, closing every connection. (Under + systemd this is normally `SIGKILL` after `TimeoutStopSec`.) + +A client that honors `GOAWAY` reconnects to another relay (or re-resolves DNS in a +load-balanced deployment), so draining lets you roll a node with no dropped media. +Clients that ignore `GOAWAY` stay connected until the second signal. + ## HTTP Endpoints The relay exposes HTTP/HTTPS endpoints for debugging, health checks, and late-join. See [HTTP](/bin/relay/http) for details. 
diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 36e8795ab..244e83169 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -204,6 +204,10 @@ pub(crate) const DEFAULT_BIND: &str = "[::]:443"; pub struct Server { moq: moq_net::Server, versions: moq_net::Versions, + /// When true (default), [`accept`](Self::accept) returns `None` on Ctrl+C after + /// closing the endpoints. Callers wanting a graceful shutdown disable this via + /// [`with_signal_handler`](Self::with_signal_handler) and drive signals themselves. + handle_ctrl_c: bool, accept: FuturesUnordered>>, #[cfg(feature = "iroh")] iroh: Option, @@ -267,6 +271,7 @@ impl Server { }; Ok(Server { + handle_ctrl_c: true, accept: Default::default(), moq: moq_net::Server::new().with_versions(versions.clone()), versions, @@ -300,6 +305,16 @@ impl Server { self } + /// Enable or disable the built-in Ctrl+C handler in [`accept`](Self::accept). + /// + /// Enabled by default: Ctrl+C closes the endpoints and makes `accept` return `None`. + /// Disable it to handle shutdown yourself (e.g. a GOAWAY drain), then drive signals + /// from the caller. + pub fn with_signal_handler(mut self, enabled: bool) -> Self { + self.handle_ctrl_c = enabled; + self + } + pub fn with_publisher(mut self, publish: moq_net::OriginProducer) -> Self { self.moq = self.moq.with_publisher(publish); self @@ -347,6 +362,9 @@ impl Server { /// Call [Request::ok] or [Request::close] to complete the handshake. #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))] pub async fn accept(&mut self) -> Option { + // Copied out so the select! arm's `if` guard doesn't borrow `self` (the arm body + // needs `&mut self` to close). + let handle_ctrl_c = self.handle_ctrl_c; loop { // tokio::select! does not support cfg directives on arms, so we need to create the futures here. 
#[cfg(feature = "noq")] @@ -476,7 +494,7 @@ impl Server { Err(err) => tracing::debug!(%err, "failed to accept session"), } } - _ = tokio::signal::ctrl_c() => { + _ = tokio::signal::ctrl_c(), if handle_ctrl_c => { self.close().await; return None; } diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index e4a001fd7..75817528b 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -117,6 +117,55 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi .expect("server task failed"); } +/// `Session::goaway` must actually deliver a GOAWAY to the peer. We use +/// moq-transport-14, where receiving a GOAWAY closes the session, so the drain is +/// observable end-to-end: the server fires GOAWAY and both sides see the session close. +#[tokio::test] +async fn goaway_drains_peer_moq_transport_14() { + let mut server_config = moq_native::ServerConfig::default(); + server_config.bind = Some("[::]:0".to_string()); + server_config.tls.generate = vec!["localhost".into()]; + server_config.version = vec!["moq-transport-14".parse().unwrap()]; + + let mut server = server_config.init().expect("failed to init server"); + let addr = server.local_addr().expect("failed to get local addr"); + + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(true); + client_config.version = vec!["moq-transport-14".parse().unwrap()]; + let client = client_config.init().expect("failed to init client"); + let url: url::Url = format!("https://localhost:{}", addr.port()).parse().unwrap(); + + let server_handle = tokio::spawn(async move { + let request = server.accept().await.expect("no incoming connection"); + let session = request.ok().await?; + + // Send GOAWAY, then confirm the peer actually leaves (the session closes). 
+ session.goaway(""); + tokio::time::timeout(TIMEOUT, session.closed()) + .await + .expect("server session did not drain after goaway") + .ok(); + Ok::<_, anyhow::Error>(()) + }); + + let session = tokio::time::timeout(TIMEOUT, client.connect(url)) + .await + .expect("client connect timed out") + .expect("client connect failed"); + + // The client should observe the session closing once the GOAWAY arrives. + tokio::time::timeout(TIMEOUT, session.closed()) + .await + .expect("client session did not close after goaway") + .ok(); + + server_handle + .await + .expect("server task panicked") + .expect("server task failed"); +} + /// Lite05 publisher↔subscriber round-trip exercising the per-frame timestamp /// delta encoding, including negative deltas (B-frame ordering). async fn lite05_timestamp_roundtrip(scheme: &str) { diff --git a/rs/moq-net/src/client.rs b/rs/moq-net/src/client.rs index 3ef382128..0070fc4b1 100644 --- a/rs/moq-net/src/client.rs +++ b/rs/moq-net/src/client.rs @@ -81,6 +81,10 @@ impl Client { let consume = consumer.clone(); let consumer_view = consumer.consume(); + // Per-session GOAWAY channel: `goaway` lives on the returned `Session`, `signal` + // drives the protocol task. Exactly one negotiated branch consumes each below. + let (goaway, signal) = crate::session::goaway_channel(); + // If ALPN was used to negotiate the version, use the appropriate encoding. // Default to IETF 14 if no ALPN was used and we'll negotiate the version later. 
let (encoding, supported) = match session.protocol() { @@ -100,10 +104,18 @@ impl Client { consume, self.stats.clone(), ietf::Version::Draft18, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_17) => { let v = self @@ -121,10 +133,18 @@ impl Client { consume, self.stats.clone(), ietf::Version::Draft17, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_16) => { let v = self @@ -159,6 +179,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite05Wip, + signal, )?; // Block until the initial announce set has landed (Lite05 reports it @@ -171,6 +192,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_04) => { @@ -185,6 +207,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite04, + signal, )?; // Lite04 has no initial-set boundary, so this resolves immediately. @@ -196,6 +219,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_03) => { @@ -211,6 +235,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite03, + signal, )?; // Lite03 has no initial-set boundary, so this resolves immediately. 
@@ -222,6 +247,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE) | None => { @@ -259,8 +285,15 @@ impl Client { let recv_bw = match version { Version::Lite(v) => { let stream = stream.with_version(v); - let (recv_bw, connecting) = - lite::start(session.clone(), Some(stream), publish, consume, self.stats.clone(), v)?; + let (recv_bw, connecting) = lite::start( + session.clone(), + Some(stream), + publish, + consume, + self.stats.clone(), + v, + signal, + )?; // Block until the initial announce set has landed (for versions that // report one); resolves immediately otherwise. @@ -285,12 +318,20 @@ impl Client { consume, self.stats.clone(), v, + signal, )?; None } }; - Ok(Session::new(session, version, recv_bw, publisher, consumer_view)) + Ok(Session::new( + session, + version, + recv_bw, + publisher, + consumer_view, + goaway, + )) } } diff --git a/rs/moq-net/src/ietf/adapter.rs b/rs/moq-net/src/ietf/adapter.rs index e43073ce8..424dc3c57 100644 --- a/rs/moq-net/src/ietf/adapter.rs +++ b/rs/moq-net/src/ietf/adapter.rs @@ -365,6 +365,24 @@ impl ControlStreamAdapter { } } + /// Queue a GOAWAY on the shared control stream (draft-14-16; v17+ uses the SETUP stream). + pub fn send_goaway(&self, uri: &str) -> Result<(), crate::Error> { + let msg = ietf::GoAway { + new_session_uri: uri.into(), + timeout: 0, + }; + + // Control-stream framing is [type_id][size][body], matching `run_read`. + let mut raw = BytesMut::new(); + ietf::GoAway::ID.encode(&mut raw, self.version)?; + msg.encode(&mut raw, self.version)?; + + self.shared + .control_tx + .send(raw.freeze()) + .map_err(|_| crate::Error::Closed) + } + /// Open a real (non-virtual) bidi stream, bypassing control stream multiplexing. /// Used for v16 SubscribeNamespace which moved to its own bidi stream. 
pub async fn open_native_bi(&self) -> Result<(AdapterSend, AdapterRecv), crate::Error> { diff --git a/rs/moq-net/src/ietf/session.rs b/rs/moq-net/src/ietf/session.rs index e51ea81df..465656128 100644 --- a/rs/moq-net/src/ietf/session.rs +++ b/rs/moq-net/src/ietf/session.rs @@ -2,6 +2,7 @@ use crate::{ Error, OriginConsumer, OriginProducer, StatsHandle, coding::{Encode, Reader, Stream, Writer}, ietf::{self, FetchHeader, RequestId}, + session::GoawaySignal, setup, }; @@ -20,6 +21,8 @@ pub fn start( // Tier-scoped stats handle. Pass [`StatsHandle::default`] to opt out. stats: StatsHandle, version: Version, + // Fires when the session should send a GOAWAY and start draining. + goaway: GoawaySignal, ) -> Result<(), Error> { web_async::spawn(async move { let res = match version { @@ -31,6 +34,22 @@ pub fn start( let control = Control::new(request_id_max, client); let adapter = ControlStreamAdapter::new(session.clone(), tx, control.clone(), version); + // GOAWAY rides the shared control stream for draft-14-16. + web_async::spawn({ + let adapter = adapter.clone(); + let session = session.clone(); + async move { + tokio::select! { + _ = session.closed() => {} + uri = goaway.triggered() => { + if let Err(err) = adapter.send_goaway(&uri) { + tracing::debug!(%err, "failed to send goaway"); + } + } + } + } + }); + let publisher = Publisher::new(adapter.clone(), publish, control.clone(), stats.clone(), version); let subscriber = Subscriber::new(adapter.clone(), subscribe, control, stats, version); @@ -62,11 +81,11 @@ pub fn start( } } _ => { - // Spawn SETUP sender (keeps stream alive for GOAWAY). + // Spawn SETUP sender, which also writes GOAWAY on that same uni stream. 
web_async::spawn({ let session = session.clone(); async move { - if let Err(err) = run_setup(session, version).await { + if let Err(err) = run_setup(session, version, goaway).await { tracing::warn!(%err, "setup send error"); } } @@ -113,8 +132,12 @@ pub fn start( Ok(()) } -/// Send our SETUP on a uni stream and keep it alive for potential GOAWAY. -async fn run_setup(session: S, version: Version) -> Result<(), Error> { +/// Send our SETUP on a uni stream, then keep it alive to carry a GOAWAY. +async fn run_setup( + session: S, + version: Version, + goaway: GoawaySignal, +) -> Result<(), Error> { let outer_version = crate::Version::Ietf(version); let send = session.open_uni().await.map_err(Error::from_transport)?; @@ -126,8 +149,20 @@ async fn run_setup(session: S, version: Version writer.encode(&setup::Setup { parameters }).await?; - // Hold the writer alive until the session closes. - session.closed().await; + // GOAWAY frames use the inner IETF framing ([type][size][body]). + let mut writer = writer.with_version(version); + + // Hold the stream open until the session closes, sending a GOAWAY first if asked. + tokio::select! { + _ = session.closed() => {} + uri = goaway.triggered() => { + let msg = ietf::GoAway { new_session_uri: uri.as_ref().into(), timeout: 0 }; + if let Err(err) = writer.encode_message(&msg).await { + tracing::debug!(%err, "failed to send goaway"); + } + session.closed().await; + } + } writer.finish().ok(); Ok(()) diff --git a/rs/moq-net/src/lite/goaway.rs b/rs/moq-net/src/lite/goaway.rs index dd834f087..1a9d65a0a 100644 --- a/rs/moq-net/src/lite/goaway.rs +++ b/rs/moq-net/src/lite/goaway.rs @@ -7,7 +7,6 @@ use super::{Message, Version}; /// Sent to gracefully shut down a session and optionally redirect to a new URI. /// /// Lite04+ only. 
-#[allow(dead_code)] #[derive(Clone, Debug)] pub struct Goaway<'a> { pub uri: Cow<'a, str>, @@ -38,3 +37,40 @@ impl Message for Goaway<'_> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + fn roundtrip(uri: &str) { + let msg = Goaway { uri: uri.into() }; + + let mut buf = BytesMut::new(); + msg.encode_msg(&mut buf, Version::Lite04).unwrap(); + + let mut bytes = bytes::Bytes::from(buf.to_vec()); + let decoded = Goaway::decode_msg(&mut bytes, Version::Lite04).unwrap(); + + assert_eq!(decoded.uri, uri); + } + + #[test] + fn roundtrip_with_uri() { + roundtrip("https://example.com/new"); + } + + #[test] + fn roundtrip_empty() { + roundtrip(""); + } + + #[test] + fn rejected_before_lite04() { + let msg = Goaway { + uri: "https://example.com/new".into(), + }; + let mut buf = BytesMut::new(); + assert!(msg.encode_msg(&mut buf, Version::Lite03).is_err()); + } +} diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index 73def6a49..bd53632db 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -1,9 +1,9 @@ use crate::{ BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, StatsHandle, coding::Stream, - lite::SessionInfo, + lite::SessionInfo, session::GoawaySignal, }; -use super::{Connecting, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; +use super::{Connecting, ControlType, Goaway, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; /// Start a lite session. /// @@ -24,6 +24,8 @@ pub fn start( stats: StatsHandle, // The version of the protocol to use. version: Version, + // Fires when the session should send a GOAWAY and start draining. + goaway: GoawaySignal, ) -> Result<(Option, Connecting), Error> { let recv_bw = BandwidthProducer::new(); @@ -68,6 +70,23 @@ pub fn start( version, }); + // GOAWAY is moq-lite-04+. 
On older drafts we simply never send it; the relay + // still drains by waiting for the peer to leave (or a forced shutdown). + if !matches!(version, Version::Lite01 | Version::Lite02 | Version::Lite03) { + let session = session.clone(); + web_async::spawn(async move { + tokio::select! { + // Don't outlive the session: stop waiting once it's gone. + _ = session.closed() => {} + uri = goaway.triggered() => { + if let Err(err) = send_goaway(&session, &uri, version).await { + tracing::debug!(%err, "failed to send goaway"); + } + } + } + }); + } + web_async::spawn(async move { let res = tokio::select! { Err(res) = run_session(setup_stream) => Err(res), @@ -94,6 +113,15 @@ pub fn start( Ok((recv_bw_consumer, connecting)) } +/// Open a dedicated control stream and write a single GOAWAY message. +async fn send_goaway(session: &S, uri: &str, version: Version) -> Result<(), Error> { + let mut stream = Stream::open(session, version).await?; + stream.writer.encode(&ControlType::Goaway).await?; + stream.writer.encode(&Goaway { uri: uri.into() }).await?; + stream.writer.finish()?; + stream.writer.closed().await +} + // TODO do something useful with this async fn run_session(stream: Option>) -> Result<(), Error> { if let Some(mut stream) = stream { diff --git a/rs/moq-net/src/server.rs b/rs/moq-net/src/server.rs index c7b568bc2..58b18a7f4 100644 --- a/rs/moq-net/src/server.rs +++ b/rs/moq-net/src/server.rs @@ -74,6 +74,10 @@ impl Server { let consume = consumer.clone(); let consumer_view = consumer.consume(); + // Per-session GOAWAY channel: `goaway` lives on the returned `Session`, `signal` + // drives the protocol task. Exactly one negotiated branch consumes each below. 
+ let (goaway, signal) = crate::session::goaway_channel(); + let (encoding, supported) = match session.protocol() { Some(ALPN_18) => { let v = self @@ -91,10 +95,18 @@ impl Server { consume.clone(), self.stats.clone(), ietf::Version::Draft18, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_17) => { let v = self @@ -112,10 +124,18 @@ impl Server { consume.clone(), self.stats.clone(), ietf::Version::Draft17, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_16) => { let v = self @@ -151,6 +171,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite05Wip, + signal, )?; return Ok(Session::new( @@ -159,6 +180,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_04) => { @@ -173,6 +195,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite04, + signal, )?; return Ok(Session::new( @@ -181,6 +204,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_03) => { @@ -196,6 +220,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite03, + signal, )?; return Ok(Session::new( @@ -204,6 +229,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE) | None => { @@ -252,6 +278,7 @@ impl Server { consume.clone(), self.stats.clone(), v, + signal, )?; recv_bw } @@ -272,11 +299,19 @@ impl Server { consume.clone(), self.stats.clone(), v, + signal, )?; None } }; - Ok(Session::new(session, version, recv_bw, publisher, consumer_view)) + Ok(Session::new( + session, 
+ version, + recv_bw, + publisher, + consumer_view, + goaway, + )) } } diff --git a/rs/moq-net/src/session.rs b/rs/moq-net/src/session.rs index 8c008ecf2..c5e9ecb12 100644 --- a/rs/moq-net/src/session.rs +++ b/rs/moq-net/src/session.rs @@ -1,5 +1,6 @@ use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; +use tokio::sync::watch; use web_transport_trait::Stats; use crate::{BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, Version}; @@ -27,6 +28,7 @@ pub struct Session { recv_bandwidth: Option, publisher: OriginProducer, consumer: OriginConsumer, + goaway: GoawayTrigger, closed: bool, } @@ -37,6 +39,7 @@ impl Session { recv_bandwidth: Option, publisher: OriginProducer, consumer: OriginConsumer, + goaway: GoawayTrigger, ) -> Self { // Send bandwidth is version-agnostic: it depends on QUIC backend support. let send_bandwidth = if session.stats().estimated_send_rate().is_some() { @@ -60,6 +63,7 @@ impl Session { recv_bandwidth, publisher, consumer, + goaway, closed: false, } } @@ -102,6 +106,16 @@ impl Session { &self.consumer } + /// Ask the peer to migrate away by sending a GOAWAY, without closing the session. + /// + /// The session stays open so in-flight groups can finish; await + /// [`closed`](Self::closed) to learn when the peer actually leaves. `uri` is an + /// optional redirect target (pass `""` to just drain). Calling more than once, + /// or on a protocol version that predates GOAWAY, is harmless. + pub fn goaway(&self, uri: &str) { + self.goaway.send(uri); + } + /// Close the underlying transport session. pub fn close(&mut self, err: Error) { if self.closed { @@ -126,6 +140,57 @@ impl Drop for Session { } } +/// Create a linked [`GoawayTrigger`] / [`GoawaySignal`] pair for one session. +/// +/// The trigger lives on the [`Session`]; the signal is handed to the per-protocol +/// task spawned by `lite::start` / `ietf::start`, which writes the actual GOAWAY +/// frame when fired. 
+pub(crate) fn goaway_channel() -> (GoawayTrigger, GoawaySignal) { + let (tx, rx) = watch::channel(None); + (GoawayTrigger { tx }, GoawaySignal { rx }) +} + +/// Sender half of a session's GOAWAY signal, held by [`Session`]. +#[derive(Clone)] +pub(crate) struct GoawayTrigger { + tx: watch::Sender>>, +} + +impl GoawayTrigger { + fn send(&self, uri: &str) { + // Ignore send errors: the receiver task may have already exited (session + // closed), in which case there's no one left to GOAWAY. + let _ = self.tx.send(Some(Arc::from(uri))); + } +} + +/// Receiver half handed to the per-protocol session task. +#[derive(Clone)] +pub(crate) struct GoawaySignal { + rx: watch::Receiver>>, +} + +impl GoawaySignal { + /// Resolve once a GOAWAY is requested, yielding the (possibly empty) redirect URI. + /// + /// Never resolves if the trigger is dropped without firing, so it's safe to + /// `select!` this against the session closing. + pub(crate) async fn triggered(mut self) -> Arc { + // Clone the value out so the watch `Ref` guard is dropped before any further + // await (the guard is not `Send`, which would taint the spawned task). + let uri = self + .rx + .wait_for(|uri| uri.is_some()) + .await + .ok() + .and_then(|uri| uri.clone()); + match uri { + Some(uri) => uri, + None => std::future::pending().await, + } + } +} + /// Polls the QUIC congestion controller for estimated send rate. /// /// Exits as soon as the session closes so we don't pin the underlying connection diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 5a1a2afb4..39c2d91d8 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -3,6 +3,7 @@ use crate::{Auth, AuthError, AuthParams, AuthToken, Cluster}; use axum::http; use moq_native::Request; use moq_net::Path; +use tokio::sync::watch; /// An error carrying the HTTP status to send when closing the request. 
/// @@ -39,8 +40,11 @@ pub struct Connection { impl Connection { /// Authenticates and serves this connection until it closes. + /// + /// `drain` flips to `true` during a graceful shutdown: we send a GOAWAY and then + /// keep serving until the peer actually leaves. #[tracing::instrument("conn", skip_all, fields(id = self.id))] - pub async fn run(self) -> anyhow::Result<()> { + pub async fn run(self, mut drain: watch::Receiver) -> anyhow::Result<()> { let token = match self.authenticate().await { Ok(token) => token, Err(err) => { @@ -104,8 +108,17 @@ impl Connection { tracing::info!(version = %session.version(), transport, "negotiated"); - // Wait until the session is closed. - session.closed().await?; + // Serve until the peer leaves. On a graceful shutdown, send a GOAWAY first and + // keep serving so in-flight groups can finish before the peer migrates away. + tokio::select! { + res = session.closed() => res?, + // Drop the watch `Ref` guard (not `Send`) before the awaits in the body. + _ = async { let _ = drain.wait_for(|d| *d).await; } => { + tracing::info!("draining; sending goaway"); + session.goaway(""); + session.closed().await?; + } + } Ok(()) } diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index d1e3f6a4d..a8e5b3f26 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -21,8 +21,10 @@ async fn main() -> anyhow::Result<()> { let mtls_enabled = !config.server.tls.root.is_empty(); + // We drive shutdown ourselves (GOAWAY drain on the first Ctrl+C, force on the + // second), so opt out of moq-native's built-in Ctrl+C-closes-everything handler. 
#[allow(unused_mut)] - let mut server = config.server.init()?; + let mut server = config.server.init()?.with_signal_handler(false); let client = config.client.clone().init()?; let addr = server.local_addr()?; @@ -77,19 +79,82 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "jemalloc"))] let jemalloc = std::future::pending::>(); + // First signal (Ctrl+C / SIGTERM): GOAWAY all connections and stop accepting, then + // wait for them to drain. Second signal: force shutdown immediately. + let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); + let shutdown = async move { + shutdown_signal().await; + tracing::info!("shutting down: sending GOAWAY and draining connections (signal again to force)"); + #[cfg(unix)] + let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); + let _ = drain_tx.send(true); + shutdown_signal().await; + tracing::warn!("forcing shutdown"); + }; + tokio::select! { - Err(err) = cluster.clone().run() => return Err(err).context("cluster failed"), - Err(err) = web.run() => return Err(err).context("web server failed"), - Err(err) = serve(server, cluster, auth) => return Err(err).context("server failed"), - Err(err) = jemalloc => return Err(err).context("jemalloc profiler failed"), - else => Ok(()), + Err(err) = cluster.clone().run() => Err(err).context("cluster failed"), + Err(err) = web.run() => Err(err).context("web server failed"), + res = serve(server, cluster, auth, drain_rx) => res.context("server failed"), + Err(err) = jemalloc => Err(err).context("jemalloc profiler failed"), + // Forced shutdown: dropping `server` (and the connection tasks) closes everything. + _ = shutdown => Ok(()), + } +} + +/// Resolve on the next shutdown signal: Ctrl+C (SIGINT) or, on Unix, SIGTERM +/// (what `systemctl stop` sends). Never resolves if signal registration fails, +/// so a broken handler can't masquerade as a shutdown request. 
+async fn shutdown_signal() { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let mut term = match signal(SignalKind::terminate()) { + Ok(term) => term, + Err(err) => { + tracing::warn!(%err, "failed to listen for SIGTERM"); + // Diverges: a broken handler must never count as a shutdown request. + std::future::pending().await + } + }; + tokio::select! { + res = tokio::signal::ctrl_c() => { + if res.is_err() { + std::future::pending::<()>().await; + } + } + _ = term.recv() => {} + } + } + #[cfg(not(unix))] + { + if tokio::signal::ctrl_c().await.is_err() { + std::future::pending::<()>().await; + } } } -async fn serve(mut server: moq_native::Server, cluster: Cluster, auth: Auth) -> anyhow::Result<()> { +async fn serve( + mut server: moq_native::Server, + cluster: Cluster, + auth: Auth, + mut drain: tokio::sync::watch::Receiver, +) -> anyhow::Result<()> { let mut conn_id = 0; - while let Some(request) = server.accept().await { + // Tracks in-flight connections: each task holds a clone of `active`, so + // `active_rx.recv()` resolves to `None` only once every task has finished. + let (active, mut active_rx) = tokio::sync::mpsc::channel::<()>(1); + + loop { + let request = tokio::select! { + request = server.accept() => request, + // Stop accepting once draining begins; existing connections keep running. + _ = drain.wait_for(|d| *d) => break, + }; + + let Some(request) = request else { break }; + let conn = Connection { id: conn_id, request, @@ -98,12 +163,20 @@ async fn serve(mut server: moq_native::Server, cluster: Cluster, auth: Auth) -> }; conn_id += 1; + let drain = drain.clone(); + let active = active.clone(); tokio::spawn(async move { - if let Err(err) = conn.run().await { + let _active = active; + if let Err(err) = conn.run(drain).await { tracing::warn!(%err, "connection closed"); } }); } - anyhow::bail!("stopped accepting connections") + // No longer accepting. Wait for every in-flight connection to drain. 
+ drop(active); + tracing::info!("waiting for connections to drain"); + let _ = active_rx.recv().await; + tracing::info!("all connections drained"); + Ok(()) } From e2c505070d76cc61955ee72fe5e08e162bb3efd6 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 4 Jun 2026 16:15:36 -0700 Subject: [PATCH 2/3] refactor(moq-relay): register stop-signal streams once Open the SIGINT/SIGTERM streams up front in a `ShutdownSignals` helper and recv() twice, instead of re-registering a fresh listener for each wait. This closes the small window where a second signal could arrive between the first firing and the new listener being registered, and makes the soft (drain) vs hard (force) mapping explicit: SIGINT/SIGTERM drains, a second forces, and SIGKILL stays the kernel's uncatchable backstop. Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-relay/src/main.rs | 71 ++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index a8e5b3f26..a8bd40889 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -79,16 +79,22 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "jemalloc"))] let jemalloc = std::future::pending::>(); - // First signal (Ctrl+C / SIGTERM): GOAWAY all connections and stop accepting, then - // wait for them to drain. Second signal: force shutdown immediately. + // Graceful-then-forceful shutdown. The first stop signal (SIGINT/Ctrl+C, or SIGTERM + // from `systemctl stop`) drains; a second one forces. There's no handler for a hard + // kill: SIGKILL is uncatchable, and that's the kernel-level backstop (systemd sends + // it after TimeoutStopSec). let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); let shutdown = async move { - shutdown_signal().await; + // Open the streams once so the second signal can't slip through a re-registration gap. 
+ let mut signals = ShutdownSignals::listen(); + + signals.recv().await; tracing::info!("shutting down: sending GOAWAY and draining connections (signal again to force)"); #[cfg(unix)] let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); let _ = drain_tx.send(true); - shutdown_signal().await; + + signals.recv().await; tracing::warn!("forcing shutdown"); }; @@ -102,34 +108,43 @@ async fn main() -> anyhow::Result<()> { } } -/// Resolve on the next shutdown signal: Ctrl+C (SIGINT) or, on Unix, SIGTERM -/// (what `systemctl stop` sends). Never resolves if signal registration fails, -/// so a broken handler can't masquerade as a shutdown request. -async fn shutdown_signal() { +/// Listens for OS stop signals. On Unix that's SIGINT (Ctrl+C) and SIGTERM (what +/// `systemctl stop` sends); elsewhere just Ctrl+C. Both streams are registered up +/// front so repeated signals are delivered reliably. +struct ShutdownSignals { #[cfg(unix)] - { - use tokio::signal::unix::{SignalKind, signal}; - let mut term = match signal(SignalKind::terminate()) { - Ok(term) => term, - Err(err) => { - tracing::warn!(%err, "failed to listen for SIGTERM"); - // Diverges: a broken handler must never count as a shutdown request. - std::future::pending().await - } - }; - tokio::select! { - res = tokio::signal::ctrl_c() => { - if res.is_err() { - std::future::pending::<()>().await; - } + sigint: tokio::signal::unix::Signal, + #[cfg(unix)] + sigterm: tokio::signal::unix::Signal, +} + +impl ShutdownSignals { + fn listen() -> Self { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + Self { + sigint: signal(SignalKind::interrupt()).expect("failed to listen for SIGINT"), + sigterm: signal(SignalKind::terminate()).expect("failed to listen for SIGTERM"), } - _ = term.recv() => {} } + #[cfg(not(unix))] + Self {} } - #[cfg(not(unix))] - { - if tokio::signal::ctrl_c().await.is_err() { - std::future::pending::<()>().await; + + /// Resolve on the next stop signal. 
+ async fn recv(&mut self) { + #[cfg(unix)] + tokio::select! { + _ = self.sigint.recv() => {} + _ = self.sigterm.recv() => {} + } + #[cfg(not(unix))] + { + // Windows: Ctrl+C only. A failed registration must not look like a signal. + if tokio::signal::ctrl_c().await.is_err() { + std::future::pending::<()>().await; + } } } } From e7dda5fe908457e50542c633822f95dd151405df Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 9 Jun 2026 21:09:05 -0700 Subject: [PATCH 3/3] =?UTF-8?q?refactor(moq-net):=20address=20review=20?= =?UTF-8?q?=E2=80=94=20public=20Drain=20API=20on=20kio?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reworks the session GOAWAY surface per review feedback: - Replace `Session::goaway(&str)` with a public `Drain` handle: `session.drain()` returns a `Drain` whose `start(uri)` sends the GOAWAY and `complete().await` waits for the peer to leave. This combines "send GOAWAY" and "await drain" into one type instead of two loose methods. - `Drain::start` takes `impl Into<Option<&str>>`, matching the crate convention for optional args (`None` to just drain, `Some(uri)` to redirect). - Back the trigger with `kio::Producer`/`Consumer` instead of `tokio::sync::watch`, consistent with the rest of moq-net's async state. moq-native: rename `with_signal_handler` to `with_ctrl_c_handler`, and replace the `select!` `if`-guard on the built-in Ctrl+C arm with an explicit ctrl_c-or-pending future so the behavior doesn't depend on guard-evaluation timing. moq-relay/test: use `session.drain().start(None)` / `drain.complete().await`. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- rs/moq-native/src/server.rs | 17 +++-- rs/moq-native/tests/broadcast.rs | 8 +-- rs/moq-net/src/ietf/session.rs | 20 +++--- rs/moq-net/src/lite/session.rs | 12 ++-- rs/moq-net/src/session.rs | 110 +++++++++++++++++-------------- rs/moq-relay/src/connection.rs | 17 ++--- rs/moq-relay/src/main.rs | 2 +- 7 files changed, 108 insertions(+), 78 deletions(-) diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 244e83169..fc6bb74b7 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -206,7 +206,7 @@ pub struct Server { versions: moq_net::Versions, /// When true (default), [`accept`](Self::accept) returns `None` on Ctrl+C after /// closing the endpoints. Callers wanting a graceful shutdown disable this via - /// [`with_signal_handler`](Self::with_signal_handler) and drive signals themselves. + /// [`with_ctrl_c_handler`](Self::with_ctrl_c_handler) and drive signals themselves. handle_ctrl_c: bool, accept: FuturesUnordered>>, #[cfg(feature = "iroh")] @@ -310,7 +310,7 @@ impl Server { /// Enabled by default: Ctrl+C closes the endpoints and makes `accept` return `None`. /// Disable it to handle shutdown yourself (e.g. a GOAWAY drain), then drive signals /// from the caller. - pub fn with_signal_handler(mut self, enabled: bool) -> Self { + pub fn with_ctrl_c_handler(mut self, enabled: bool) -> Self { self.handle_ctrl_c = enabled; self } @@ -362,10 +362,17 @@ impl Server { /// Call [Request::ok] or [Request::close] to complete the handshake. #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))] pub async fn accept(&mut self) -> Option { - // Copied out so the select! arm's `if` guard doesn't borrow `self` (the arm body - // needs `&mut self` to close). let handle_ctrl_c = self.handle_ctrl_c; loop { + // Either wait for Ctrl+C, or never resolve when the handler is disabled. 
Built + // as an explicit future (rather than a `select!` `if` guard) so the behavior + // doesn't hinge on guard-evaluation timing. + let ctrl_c = async { + match handle_ctrl_c { + true => drop(tokio::signal::ctrl_c().await), + false => std::future::pending().await, + } + }; // tokio::select! does not support cfg directives on arms, so we need to create the futures here. #[cfg(feature = "noq")] let noq_accept = async { @@ -494,7 +501,7 @@ impl Server { Err(err) => tracing::debug!(%err, "failed to accept session"), } } - _ = tokio::signal::ctrl_c(), if handle_ctrl_c => { + _ = ctrl_c => { self.close().await; return None; } diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index 75817528b..fb9902570 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -141,11 +141,11 @@ async fn goaway_drains_peer_moq_transport_14() { let session = request.ok().await?; // Send GOAWAY, then confirm the peer actually leaves (the session closes). - session.goaway(""); - tokio::time::timeout(TIMEOUT, session.closed()) + let drain = session.drain(); + drain.start(None); + tokio::time::timeout(TIMEOUT, drain.complete()) .await - .expect("server session did not drain after goaway") - .ok(); + .expect("server session did not drain after goaway"); Ok::<_, anyhow::Error>(()) }); diff --git a/rs/moq-net/src/ietf/session.rs b/rs/moq-net/src/ietf/session.rs index 465656128..c5b9e38cc 100644 --- a/rs/moq-net/src/ietf/session.rs +++ b/rs/moq-net/src/ietf/session.rs @@ -2,7 +2,7 @@ use crate::{ Error, OriginConsumer, OriginProducer, StatsHandle, coding::{Encode, Reader, Stream, Writer}, ietf::{self, FetchHeader, RequestId}, - session::GoawaySignal, + session::{GoawaySignal, goaway_triggered}, setup, }; @@ -41,8 +41,10 @@ pub fn start( async move { tokio::select! 
{ _ = session.closed() => {} - uri = goaway.triggered() => { - if let Err(err) = adapter.send_goaway(&uri) { + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri + && let Err(err) = adapter.send_goaway(&uri) + { tracing::debug!(%err, "failed to send goaway"); } } @@ -155,12 +157,14 @@ async fn run_setup( // Hold the stream open until the session closes, sending a GOAWAY first if asked. tokio::select! { _ = session.closed() => {} - uri = goaway.triggered() => { - let msg = ietf::GoAway { new_session_uri: uri.as_ref().into(), timeout: 0 }; - if let Err(err) = writer.encode_message(&msg).await { - tracing::debug!(%err, "failed to send goaway"); + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri { + let msg = ietf::GoAway { new_session_uri: uri.as_ref().into(), timeout: 0 }; + if let Err(err) = writer.encode_message(&msg).await { + tracing::debug!(%err, "failed to send goaway"); + } + session.closed().await; } - session.closed().await; } } writer.finish().ok(); diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index bd53632db..61614ff5f 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -1,6 +1,8 @@ use crate::{ - BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, StatsHandle, coding::Stream, - lite::SessionInfo, session::GoawaySignal, + BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, StatsHandle, + coding::Stream, + lite::SessionInfo, + session::{GoawaySignal, goaway_triggered}, }; use super::{Connecting, ControlType, Goaway, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; @@ -78,8 +80,10 @@ pub fn start( tokio::select! { // Don't outlive the session: stop waiting once it's gone. 
_ = session.closed() => {} - uri = goaway.triggered() => { - if let Err(err) = send_goaway(&session, &uri, version).await { + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri + && let Err(err) = send_goaway(&session, &uri, version).await + { tracing::debug!(%err, "failed to send goaway"); } } diff --git a/rs/moq-net/src/session.rs b/rs/moq-net/src/session.rs index c5e9ecb12..c7a419e90 100644 --- a/rs/moq-net/src/session.rs +++ b/rs/moq-net/src/session.rs @@ -1,6 +1,6 @@ -use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; +use std::{future::Future, pin::Pin, sync::Arc, task::Poll, time::Duration}; -use tokio::sync::watch; +use kio::{Consumer, Producer}; use web_transport_trait::Stats; use crate::{BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, Version}; @@ -106,14 +106,16 @@ impl Session { &self.consumer } - /// Ask the peer to migrate away by sending a GOAWAY, without closing the session. + /// Begin a graceful drain of this session. /// - /// The session stays open so in-flight groups can finish; await - /// [`closed`](Self::closed) to learn when the peer actually leaves. `uri` is an - /// optional redirect target (pass `""` to just drain). Calling more than once, - /// or on a protocol version that predates GOAWAY, is harmless. - pub fn goaway(&self, uri: &str) { - self.goaway.send(uri); + /// Returns a [`Drain`] handle: [`Drain::start`] sends a GOAWAY asking the peer to + /// migrate away (without closing the session), and [`Drain::complete`] awaits its + /// departure. + pub fn drain(&self) -> Drain { + Drain { + goaway: self.goaway.clone(), + session: self.session.clone(), + } } /// Close the underlying transport session. @@ -140,55 +142,67 @@ impl Drop for Session { } } -/// Create a linked [`GoawayTrigger`] / [`GoawaySignal`] pair for one session. +/// A handle to gracefully drain a [`Session`], obtained via [`Session::drain`]. 
/// -/// The trigger lives on the [`Session`]; the signal is handed to the per-protocol -/// task spawned by `lite::start` / `ietf::start`, which writes the actual GOAWAY -/// frame when fired. -pub(crate) fn goaway_channel() -> (GoawayTrigger, GoawaySignal) { - let (tx, rx) = watch::channel(None); - (GoawayTrigger { tx }, GoawaySignal { rx }) -} - -/// Sender half of a session's GOAWAY signal, held by [`Session`]. +/// `start` asks the peer to migrate away (GOAWAY) without closing the session; +/// `complete` waits until it actually leaves. Cheaply clonable. #[derive(Clone)] -pub(crate) struct GoawayTrigger { - tx: watch::Sender>>, +pub struct Drain { + goaway: GoawayTrigger, + session: Arc, } -impl GoawayTrigger { - fn send(&self, uri: &str) { - // Ignore send errors: the receiver task may have already exited (session - // closed), in which case there's no one left to GOAWAY. - let _ = self.tx.send(Some(Arc::from(uri))); +impl Drain { + /// Send a GOAWAY asking the peer to migrate away, optionally to `uri` (`None` + /// just asks them to leave). The session stays open so in-flight groups can + /// finish; call [`complete`](Self::complete) to await departure. Calling more + /// than once, or on a protocol version that predates GOAWAY, is harmless. + pub fn start<'a>(&self, uri: impl Into>) { + let uri: Option<&str> = uri.into(); + // A closed channel means the protocol task already exited (session gone), + // so there's nothing left to GOAWAY. + if let Ok(mut value) = self.goaway.write() { + *value = Some(Arc::from(uri.unwrap_or(""))); + } + } + + /// Wait until the session has fully closed: the peer left, or it was forced. + pub async fn complete(&self) { + self.session.closed().await; } } -/// Receiver half handed to the per-protocol session task. -#[derive(Clone)] -pub(crate) struct GoawaySignal { - rx: watch::Receiver>>, +/// Trigger half of a session's GOAWAY signal, held by [`Session`] / [`Drain`]. 
+/// `None` means "not yet requested"; `Some(uri)` carries the (possibly empty) URI. +pub(crate) type GoawayTrigger = Producer>>; + +/// Signal half handed to the per-protocol session task spawned by `lite::start` / +/// `ietf::start`, which writes the actual GOAWAY frame when fired. +pub(crate) type GoawaySignal = Consumer>>; + +/// Create a linked [`GoawayTrigger`] / [`GoawaySignal`] pair for one session. +pub(crate) fn goaway_channel() -> (GoawayTrigger, GoawaySignal) { + let trigger = Producer::new(None); + let signal = trigger.consume(); + (trigger, signal) } -impl GoawaySignal { - /// Resolve once a GOAWAY is requested, yielding the (possibly empty) redirect URI. - /// - /// Never resolves if the trigger is dropped without firing, so it's safe to - /// `select!` this against the session closing. - pub(crate) async fn triggered(mut self) -> Arc { - // Clone the value out so the watch `Ref` guard is dropped before any further - // await (the guard is not `Send`, which would taint the spawned task). - let uri = self - .rx - .wait_for(|uri| uri.is_some()) - .await - .ok() - .and_then(|uri| uri.clone()); - match uri { - Some(uri) => uri, - None => std::future::pending().await, +/// Resolve once a GOAWAY is requested, yielding the (possibly empty) redirect URI, +/// or `None` if the trigger was dropped without firing (the session is going away). +pub(crate) async fn goaway_triggered(signal: GoawaySignal) -> Option> { + // Map inside the closure so the `Ref` lock guard (not `Send`) never lands in the + // returned future, keeping it spawnable. + kio::wait(|waiter| { + match signal.poll(waiter, |uri| match &**uri { + Some(uri) => Poll::Ready(uri.clone()), + None => Poll::Pending, + }) { + Poll::Ready(Ok(uri)) => Poll::Ready(Some(uri)), + Poll::Ready(Err(_closed)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, } - } + }) + .await } /// Polls the QUIC congestion controller for estimated send rate. 
diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 39c2d91d8..8d5833e8a 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -41,10 +41,10 @@ pub struct Connection { impl Connection { /// Authenticates and serves this connection until it closes. /// - /// `drain` flips to `true` during a graceful shutdown: we send a GOAWAY and then - /// keep serving until the peer actually leaves. + /// `shutdown` flips to `true` during a graceful shutdown: we drain the session + /// (GOAWAY) and keep serving until the peer actually leaves. #[tracing::instrument("conn", skip_all, fields(id = self.id))] - pub async fn run(self, mut drain: watch::Receiver) -> anyhow::Result<()> { + pub async fn run(self, mut shutdown: watch::Receiver) -> anyhow::Result<()> { let token = match self.authenticate().await { Ok(token) => token, Err(err) => { @@ -108,15 +108,16 @@ impl Connection { tracing::info!(version = %session.version(), transport, "negotiated"); - // Serve until the peer leaves. On a graceful shutdown, send a GOAWAY first and - // keep serving so in-flight groups can finish before the peer migrates away. + // Serve until the peer leaves. On a graceful shutdown, drain the session (send a + // GOAWAY) and keep serving so in-flight groups can finish before it migrates away. tokio::select! { res = session.closed() => res?, // Drop the watch `Ref` guard (not `Send`) before the awaits in the body. 
- _ = async { let _ = drain.wait_for(|d| *d).await; } => { + _ = async { let _ = shutdown.wait_for(|d| *d).await; } => { tracing::info!("draining; sending goaway"); - session.goaway(""); - session.closed().await?; + let drain = session.drain(); + drain.start(None); + drain.complete().await; } } Ok(()) diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index a8bd40889..cd8bbff0a 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -24,7 +24,7 @@ async fn main() -> anyhow::Result<()> { // We drive shutdown ourselves (GOAWAY drain on the first Ctrl+C, force on the // second), so opt out of moq-native's built-in Ctrl+C-closes-everything handler. #[allow(unused_mut)] - let mut server = config.server.init()?.with_signal_handler(false); + let mut server = config.server.init()?.with_ctrl_c_handler(false); let client = config.client.clone().init()?; let addr = server.local_addr()?;