diff --git a/doc/bin/relay/index.md b/doc/bin/relay/index.md index 11ab4a643..22c2d5260 100644 --- a/doc/bin/relay/index.md +++ b/doc/bin/relay/index.md @@ -87,6 +87,22 @@ Or with the config path as the only argument: moq-relay relay.toml ``` +## Graceful Shutdown + +The relay drains connections on the first shutdown signal (`SIGINT` from Ctrl+C, or +`SIGTERM` from `systemctl stop`): + +1. **First signal** stops accepting new connections and sends a `GOAWAY` to every + active session, asking clients to migrate elsewhere. The relay keeps serving so + in-flight groups can finish, and waits for all sessions to close on their own. On + systemd it also reports `STOPPING=1`. +2. **Second signal** forces an immediate shutdown, closing every connection. (Under + systemd this is normally `SIGKILL` after `TimeoutStopSec`.) + +A client that honors `GOAWAY` reconnects to another relay (or re-resolves DNS in a +load-balanced deployment), so draining lets you roll a node with no dropped media. +Clients that ignore `GOAWAY` stay connected until the second signal. + ## HTTP Endpoints The relay exposes HTTP/HTTPS endpoints for debugging, health checks, and late-join. See [HTTP](/bin/relay/http) for details. diff --git a/rs/moq-native/src/server.rs b/rs/moq-native/src/server.rs index 36e8795ab..fc6bb74b7 100644 --- a/rs/moq-native/src/server.rs +++ b/rs/moq-native/src/server.rs @@ -204,6 +204,10 @@ pub(crate) const DEFAULT_BIND: &str = "[::]:443"; pub struct Server { moq: moq_net::Server, versions: moq_net::Versions, + /// When true (default), [`accept`](Self::accept) returns `None` on Ctrl+C after + /// closing the endpoints. Callers wanting a graceful shutdown disable this via + /// [`with_ctrl_c_handler`](Self::with_ctrl_c_handler) and drive signals themselves. + handle_ctrl_c: bool, accept: FuturesUnordered>>, #[cfg(feature = "iroh")] iroh: Option, @@ -267,6 +271,7 @@ impl Server { }; Ok(Server { + handle_ctrl_c: true, accept: Default::default(), moq: moq_net::Server::new().with_versions(versions.clone()), versions, @@ -300,6 +305,16 @@ impl Server { self } + /// Enable or disable the built-in Ctrl+C handler in [`accept`](Self::accept). + /// + /// Enabled by default: Ctrl+C closes the endpoints and makes `accept` return `None`. + /// Disable it to handle shutdown yourself (e.g. a GOAWAY drain), then drive signals + /// from the caller. + pub fn with_ctrl_c_handler(mut self, enabled: bool) -> Self { + self.handle_ctrl_c = enabled; + self + } + pub fn with_publisher(mut self, publish: moq_net::OriginProducer) -> Self { self.moq = self.moq.with_publisher(publish); self @@ -347,7 +362,17 @@ impl Server { /// Call [Request::ok] or [Request::close] to complete the handshake. #[cfg(any(feature = "noq", feature = "quinn", feature = "quiche", feature = "iroh"))] pub async fn accept(&mut self) -> Option { + let handle_ctrl_c = self.handle_ctrl_c; loop { + // Either wait for Ctrl+C, or never resolve when the handler is disabled. Built + // as an explicit future (rather than a `select!` `if` guard) so the behavior + // doesn't hinge on guard-evaluation timing. + let ctrl_c = async { + match handle_ctrl_c { + true => drop(tokio::signal::ctrl_c().await), + false => std::future::pending().await, + } + }; // tokio::select! does not support cfg directives on arms, so we need to create the futures here. #[cfg(feature = "noq")] let noq_accept = async { @@ -476,7 +501,7 @@ impl Server { Err(err) => tracing::debug!(%err, "failed to accept session"), } } - _ = tokio::signal::ctrl_c() => { + _ = ctrl_c => { self.close().await; return None; } diff --git a/rs/moq-native/tests/broadcast.rs b/rs/moq-native/tests/broadcast.rs index bef304656..495f1b2d8 100644 --- a/rs/moq-native/tests/broadcast.rs +++ b/rs/moq-native/tests/broadcast.rs @@ -117,6 +117,55 @@ async fn broadcast_test(scheme: &str, client_version: Option<&str>, server_versi .expect("server task failed"); } +/// `Session::drain` must actually deliver a GOAWAY to the peer. We use +/// moq-transport-14, where receiving a GOAWAY closes the session, so the drain is +/// observable end-to-end: the server fires GOAWAY and both sides see the session close. +#[tokio::test] +async fn goaway_drains_peer_moq_transport_14() { + let mut server_config = moq_native::ServerConfig::default(); + server_config.bind = Some("[::]:0".to_string()); + server_config.tls.generate = vec!["localhost".into()]; + server_config.version = vec!["moq-transport-14".parse().unwrap()]; + + let mut server = server_config.init().expect("failed to init server"); + let addr = server.local_addr().expect("failed to get local addr"); + + let mut client_config = moq_native::ClientConfig::default(); + client_config.tls.disable_verify = Some(true); + client_config.version = vec!["moq-transport-14".parse().unwrap()]; + let client = client_config.init().expect("failed to init client"); + let url: url::Url = format!("https://localhost:{}", addr.port()).parse().unwrap(); + + let server_handle = tokio::spawn(async move { + let request = server.accept().await.expect("no incoming connection"); + let session = request.ok().await?; + + // Send GOAWAY, then confirm the peer actually leaves (the session closes). + let drain = session.drain(); + drain.start(None); + tokio::time::timeout(TIMEOUT, drain.complete()) + .await + .expect("server session did not drain after goaway"); + Ok::<_, anyhow::Error>(()) + }); + + let session = tokio::time::timeout(TIMEOUT, client.connect(url)) + .await + .expect("client connect timed out") + .expect("client connect failed"); + + // The client should observe the session closing once the GOAWAY arrives. + tokio::time::timeout(TIMEOUT, session.closed()) + .await + .expect("client session did not close after goaway") + .ok(); + + server_handle + .await + .expect("server task panicked") + .expect("server task failed"); +} + /// Lite05 publisher↔subscriber round-trip exercising the per-frame timestamp /// delta encoding, including negative deltas (B-frame ordering). async fn lite05_timestamp_roundtrip(scheme: &str) { diff --git a/rs/moq-net/src/client.rs b/rs/moq-net/src/client.rs index 3ef382128..0070fc4b1 100644 --- a/rs/moq-net/src/client.rs +++ b/rs/moq-net/src/client.rs @@ -81,6 +81,10 @@ impl Client { let consume = consumer.clone(); let consumer_view = consumer.consume(); + // Per-session GOAWAY channel: `goaway` lives on the returned `Session`, `signal` + // drives the protocol task. Exactly one negotiated branch consumes each below. + let (goaway, signal) = crate::session::goaway_channel(); + // If ALPN was used to negotiate the version, use the appropriate encoding. // Default to IETF 14 if no ALPN was used and we'll negotiate the version later. let (encoding, supported) = match session.protocol() { @@ -100,10 +104,18 @@ impl Client { consume, self.stats.clone(), ietf::Version::Draft18, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_17) => { let v = self @@ -121,10 +133,18 @@ impl Client { consume, self.stats.clone(), ietf::Version::Draft17, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_16) => { let v = self @@ -159,6 +179,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite05Wip, + signal, )?; // Block until the initial announce set has landed (Lite05 reports it @@ -171,6 +192,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_04) => { @@ -185,6 +207,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite04, + signal, )?; // Lite04 has no initial-set boundary, so this resolves immediately. @@ -196,6 +219,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_03) => { @@ -211,6 +235,7 @@ impl Client { consume, self.stats.clone(), lite::Version::Lite03, + signal, )?; // Lite03 has no initial-set boundary, so this resolves immediately. @@ -222,6 +247,7 @@ impl Client { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE) | None => { @@ -259,8 +285,15 @@ impl Client { let recv_bw = match version { Version::Lite(v) => { let stream = stream.with_version(v); - let (recv_bw, connecting) = - lite::start(session.clone(), Some(stream), publish, consume, self.stats.clone(), v)?; + let (recv_bw, connecting) = lite::start( + session.clone(), + Some(stream), + publish, + consume, + self.stats.clone(), + v, + signal, + )?; // Block until the initial announce set has landed (for versions that // report one); resolves immediately otherwise. @@ -285,12 +318,20 @@ impl Client { consume, self.stats.clone(), v, + signal, )?; None } }; - Ok(Session::new(session, version, recv_bw, publisher, consumer_view)) + Ok(Session::new( + session, + version, + recv_bw, + publisher, + consumer_view, + goaway, + )) } } diff --git a/rs/moq-net/src/ietf/adapter.rs b/rs/moq-net/src/ietf/adapter.rs index e43073ce8..424dc3c57 100644 --- a/rs/moq-net/src/ietf/adapter.rs +++ b/rs/moq-net/src/ietf/adapter.rs @@ -365,6 +365,24 @@ impl ControlStreamAdapter { } } + /// Queue a GOAWAY on the shared control stream (draft-14-16; v17+ uses the SETUP stream). + pub fn send_goaway(&self, uri: &str) -> Result<(), crate::Error> { + let msg = ietf::GoAway { + new_session_uri: uri.into(), + timeout: 0, + }; + + // Control-stream framing is [type_id][size][body], matching `run_read`. + let mut raw = BytesMut::new(); + ietf::GoAway::ID.encode(&mut raw, self.version)?; + msg.encode(&mut raw, self.version)?; + + self.shared + .control_tx + .send(raw.freeze()) + .map_err(|_| crate::Error::Closed) + } + /// Open a real (non-virtual) bidi stream, bypassing control stream multiplexing. /// Used for v16 SubscribeNamespace which moved to its own bidi stream. pub async fn open_native_bi(&self) -> Result<(AdapterSend, AdapterRecv), crate::Error> { diff --git a/rs/moq-net/src/ietf/session.rs b/rs/moq-net/src/ietf/session.rs index e51ea81df..c5b9e38cc 100644 --- a/rs/moq-net/src/ietf/session.rs +++ b/rs/moq-net/src/ietf/session.rs @@ -2,6 +2,7 @@ use crate::{ Error, OriginConsumer, OriginProducer, StatsHandle, coding::{Encode, Reader, Stream, Writer}, ietf::{self, FetchHeader, RequestId}, + session::{GoawaySignal, goaway_triggered}, setup, }; @@ -20,6 +21,8 @@ pub fn start( // Tier-scoped stats handle. Pass [`StatsHandle::default`] to opt out. stats: StatsHandle, version: Version, + // Fires when the session should send a GOAWAY and start draining. + goaway: GoawaySignal, ) -> Result<(), Error> { web_async::spawn(async move { let res = match version { @@ -31,6 +34,24 @@ pub fn start( let control = Control::new(request_id_max, client); let adapter = ControlStreamAdapter::new(session.clone(), tx, control.clone(), version); + // GOAWAY rides the shared control stream for draft-14-16. + web_async::spawn({ + let adapter = adapter.clone(); + let session = session.clone(); + async move { + tokio::select! { + _ = session.closed() => {} + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri + && let Err(err) = adapter.send_goaway(&uri) + { + tracing::debug!(%err, "failed to send goaway"); + } + } + } + } + }); + let publisher = Publisher::new(adapter.clone(), publish, control.clone(), stats.clone(), version); let subscriber = Subscriber::new(adapter.clone(), subscribe, control, stats, version); @@ -62,11 +83,11 @@ pub fn start( } } _ => { - // Spawn SETUP sender (keeps stream alive for GOAWAY). + // Spawn SETUP sender, which also writes GOAWAY on that same uni stream. web_async::spawn({ let session = session.clone(); async move { - if let Err(err) = run_setup(session, version).await { + if let Err(err) = run_setup(session, version, goaway).await { tracing::warn!(%err, "setup send error"); } } @@ -113,8 +134,12 @@ pub fn start( Ok(()) } -/// Send our SETUP on a uni stream and keep it alive for potential GOAWAY. -async fn run_setup(session: S, version: Version) -> Result<(), Error> { +/// Send our SETUP on a uni stream, then keep it alive to carry a GOAWAY. +async fn run_setup( + session: S, + version: Version, + goaway: GoawaySignal, +) -> Result<(), Error> { let outer_version = crate::Version::Ietf(version); let send = session.open_uni().await.map_err(Error::from_transport)?; @@ -126,8 +151,22 @@ async fn run_setup(session: S, version: Version writer.encode(&setup::Setup { parameters }).await?; - // Hold the writer alive until the session closes. - session.closed().await; + // GOAWAY frames use the inner IETF framing ([type][size][body]). + let mut writer = writer.with_version(version); + + // Hold the stream open until the session closes, sending a GOAWAY first if asked. + tokio::select! { + _ = session.closed() => {} + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri { + let msg = ietf::GoAway { new_session_uri: uri.as_ref().into(), timeout: 0 }; + if let Err(err) = writer.encode_message(&msg).await { + tracing::debug!(%err, "failed to send goaway"); + } + session.closed().await; + } + } + } writer.finish().ok(); Ok(()) diff --git a/rs/moq-net/src/lite/goaway.rs b/rs/moq-net/src/lite/goaway.rs index dd834f087..1a9d65a0a 100644 --- a/rs/moq-net/src/lite/goaway.rs +++ b/rs/moq-net/src/lite/goaway.rs @@ -7,7 +7,6 @@ use super::{Message, Version}; /// Sent to gracefully shut down a session and optionally redirect to a new URI. /// /// Lite04+ only. -#[allow(dead_code)] #[derive(Clone, Debug)] pub struct Goaway<'a> { pub uri: Cow<'a, str>, @@ -38,3 +37,40 @@ impl Message for Goaway<'_> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bytes::BytesMut; + + fn roundtrip(uri: &str) { + let msg = Goaway { uri: uri.into() }; + + let mut buf = BytesMut::new(); + msg.encode_msg(&mut buf, Version::Lite04).unwrap(); + + let mut bytes = bytes::Bytes::from(buf.to_vec()); + let decoded = Goaway::decode_msg(&mut bytes, Version::Lite04).unwrap(); + + assert_eq!(decoded.uri, uri); + } + + #[test] + fn roundtrip_with_uri() { + roundtrip("https://example.com/new"); + } + + #[test] + fn roundtrip_empty() { + roundtrip(""); + } + + #[test] + fn rejected_before_lite04() { + let msg = Goaway { + uri: "https://example.com/new".into(), + }; + let mut buf = BytesMut::new(); + assert!(msg.encode_msg(&mut buf, Version::Lite03).is_err()); + } +} diff --git a/rs/moq-net/src/lite/session.rs b/rs/moq-net/src/lite/session.rs index 73def6a49..61614ff5f 100644 --- a/rs/moq-net/src/lite/session.rs +++ b/rs/moq-net/src/lite/session.rs @@ -1,9 +1,11 @@ use crate::{ - BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, StatsHandle, coding::Stream, + BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, StatsHandle, + coding::Stream, lite::SessionInfo, + session::{GoawaySignal, goaway_triggered}, }; -use super::{Connecting, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; +use super::{Connecting, ControlType, Goaway, Publisher, PublisherConfig, Subscriber, SubscriberConfig, Version}; /// Start a lite session. /// @@ -24,6 +26,8 @@ pub fn start( stats: StatsHandle, // The version of the protocol to use. version: Version, + // Fires when the session should send a GOAWAY and start draining. + goaway: GoawaySignal, ) -> Result<(Option, Connecting), Error> { let recv_bw = BandwidthProducer::new(); @@ -68,6 +72,25 @@ pub fn start( version, }); + // GOAWAY is moq-lite-04+. On older drafts we simply never send it; the relay + // still drains by waiting for the peer to leave (or a forced shutdown). + if !matches!(version, Version::Lite01 | Version::Lite02 | Version::Lite03) { + let session = session.clone(); + web_async::spawn(async move { + tokio::select! { + // Don't outlive the session: stop waiting once it's gone. + _ = session.closed() => {} + uri = goaway_triggered(goaway) => { + if let Some(uri) = uri + && let Err(err) = send_goaway(&session, &uri, version).await + { + tracing::debug!(%err, "failed to send goaway"); + } + } + } + }); + } + web_async::spawn(async move { let res = tokio::select! { Err(res) = run_session(setup_stream) => Err(res), @@ -94,6 +117,15 @@ pub fn start( Ok((recv_bw_consumer, connecting)) } +/// Open a dedicated control stream and write a single GOAWAY message. +async fn send_goaway(session: &S, uri: &str, version: Version) -> Result<(), Error> { + let mut stream = Stream::open(session, version).await?; + stream.writer.encode(&ControlType::Goaway).await?; + stream.writer.encode(&Goaway { uri: uri.into() }).await?; + stream.writer.finish()?; + stream.writer.closed().await +} + // TODO do something useful with this async fn run_session(stream: Option>) -> Result<(), Error> { if let Some(mut stream) = stream { diff --git a/rs/moq-net/src/server.rs b/rs/moq-net/src/server.rs index c7b568bc2..58b18a7f4 100644 --- a/rs/moq-net/src/server.rs +++ b/rs/moq-net/src/server.rs @@ -74,6 +74,10 @@ impl Server { let consume = consumer.clone(); let consumer_view = consumer.consume(); + // Per-session GOAWAY channel: `goaway` lives on the returned `Session`, `signal` + // drives the protocol task. Exactly one negotiated branch consumes each below. + let (goaway, signal) = crate::session::goaway_channel(); + let (encoding, supported) = match session.protocol() { Some(ALPN_18) => { let v = self @@ -91,10 +95,18 @@ impl Server { consume.clone(), self.stats.clone(), ietf::Version::Draft18, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_17) => { let v = self @@ -112,10 +124,18 @@ impl Server { consume.clone(), self.stats.clone(), ietf::Version::Draft17, + signal, )?; tracing::debug!(version = ?v, "connected"); - return Ok(Session::new(session, v, None, publisher.clone(), consumer_view.clone())); + return Ok(Session::new( + session, + v, + None, + publisher.clone(), + consumer_view.clone(), + goaway, + )); } Some(ALPN_16) => { let v = self @@ -151,6 +171,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite05Wip, + signal, )?; return Ok(Session::new( @@ -159,6 +180,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_04) => { @@ -173,6 +195,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite04, + signal, )?; return Ok(Session::new( @@ -181,6 +204,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE_03) => { @@ -196,6 +220,7 @@ impl Server { consume.clone(), self.stats.clone(), lite::Version::Lite03, + signal, )?; return Ok(Session::new( @@ -204,6 +229,7 @@ impl Server { recv_bw, publisher.clone(), consumer_view.clone(), + goaway, )); } Some(ALPN_LITE) | None => { @@ -252,6 +278,7 @@ impl Server { consume.clone(), self.stats.clone(), v, + signal, )?; recv_bw } @@ -272,11 +299,19 @@ impl Server { consume.clone(), self.stats.clone(), v, + signal, )?; None } }; - Ok(Session::new(session, version, recv_bw, publisher, consumer_view)) + Ok(Session::new( + session, + version, + recv_bw, + publisher, + consumer_view, + goaway, + )) } } diff --git a/rs/moq-net/src/session.rs b/rs/moq-net/src/session.rs index 1f8594a34..1f59b4e18 100644 --- a/rs/moq-net/src/session.rs +++ b/rs/moq-net/src/session.rs @@ -1,5 +1,6 @@ -use std::{sync::Arc, time::Duration}; +use std::{sync::Arc, task::Poll, time::Duration}; +use kio::{Consumer, Producer}; use web_transport_trait::Stats; use crate::{BandwidthConsumer, BandwidthProducer, Error, OriginConsumer, OriginProducer, Version, util::MaybeSendBox}; @@ -27,6 +28,7 @@ pub struct Session { recv_bandwidth: Option, publisher: OriginProducer, consumer: OriginConsumer, + goaway: GoawayTrigger, closed: bool, } @@ -37,6 +39,7 @@ impl Session { recv_bandwidth: Option, publisher: OriginProducer, consumer: OriginConsumer, + goaway: GoawayTrigger, ) -> Self { // Send bandwidth is version-agnostic: it depends on QUIC backend support. let send_bandwidth = if session.stats().estimated_send_rate().is_some() { @@ -60,6 +63,7 @@ impl Session { recv_bandwidth, publisher, consumer, + goaway, closed: false, } } @@ -102,6 +106,18 @@ impl Session { &self.consumer } + /// Begin a graceful drain of this session. + /// + /// Returns a [`Drain`] handle: [`Drain::start`] sends a GOAWAY asking the peer to + /// migrate away (without closing the session), and [`Drain::complete`] awaits its + /// departure. + pub fn drain(&self) -> Drain { + Drain { + goaway: self.goaway.clone(), + session: self.session.clone(), + } + } + /// Close the underlying transport session. pub fn close(&mut self, err: Error) { if self.closed { @@ -126,6 +142,69 @@ impl Drop for Session { } } +/// A handle to gracefully drain a [`Session`], obtained via [`Session::drain`]. +/// +/// `start` asks the peer to migrate away (GOAWAY) without closing the session; +/// `complete` waits until it actually leaves. Cheaply clonable. +#[derive(Clone)] +pub struct Drain { + goaway: GoawayTrigger, + session: Arc, +} + +impl Drain { + /// Send a GOAWAY asking the peer to migrate away, optionally to `uri` (`None` + /// just asks them to leave). The session stays open so in-flight groups can + /// finish; call [`complete`](Self::complete) to await departure. Calling more + /// than once, or on a protocol version that predates GOAWAY, is harmless. + pub fn start<'a>(&self, uri: impl Into>) { + let uri: Option<&str> = uri.into(); + // A closed channel means the protocol task already exited (session gone), + // so there's nothing left to GOAWAY. + if let Ok(mut value) = self.goaway.write() { + *value = Some(Arc::from(uri.unwrap_or(""))); + } + } + + /// Wait until the session has fully closed: the peer left, or it was forced. + pub async fn complete(&self) { + self.session.closed().await; + } +} + +/// Trigger half of a session's GOAWAY signal, held by [`Session`] / [`Drain`]. +/// `None` means "not yet requested"; `Some(uri)` carries the (possibly empty) URI. +pub(crate) type GoawayTrigger = Producer>>; + +/// Signal half handed to the per-protocol session task spawned by `lite::start` / +/// `ietf::start`, which writes the actual GOAWAY frame when fired. +pub(crate) type GoawaySignal = Consumer>>; + +/// Create a linked [`GoawayTrigger`] / [`GoawaySignal`] pair for one session. +pub(crate) fn goaway_channel() -> (GoawayTrigger, GoawaySignal) { + let trigger = Producer::new(None); + let signal = trigger.consume(); + (trigger, signal) +} + +/// Resolve once a GOAWAY is requested, yielding the (possibly empty) redirect URI, +/// or `None` if the trigger was dropped without firing (the session is going away). +pub(crate) async fn goaway_triggered(signal: GoawaySignal) -> Option> { + // Map inside the closure so the `Ref` lock guard (not `Send`) never lands in the + // returned future, keeping it spawnable. + kio::wait(|waiter| { + match signal.poll(waiter, |uri| match &**uri { + Some(uri) => Poll::Ready(uri.clone()), + None => Poll::Pending, + }) { + Poll::Ready(Ok(uri)) => Poll::Ready(Some(uri)), + Poll::Ready(Err(_closed)) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + }) + .await +} + /// Polls the QUIC congestion controller for estimated send rate. /// /// Exits as soon as the session closes so we don't pin the underlying connection diff --git a/rs/moq-relay/src/connection.rs b/rs/moq-relay/src/connection.rs index 5a1a2afb4..8d5833e8a 100644 --- a/rs/moq-relay/src/connection.rs +++ b/rs/moq-relay/src/connection.rs @@ -3,6 +3,7 @@ use crate::{Auth, AuthError, AuthParams, AuthToken, Cluster}; use axum::http; use moq_native::Request; use moq_net::Path; +use tokio::sync::watch; /// An error carrying the HTTP status to send when closing the request. /// @@ -39,8 +40,11 @@ pub struct Connection { impl Connection { /// Authenticates and serves this connection until it closes. + /// + /// `shutdown` flips to `true` during a graceful shutdown: we drain the session + /// (GOAWAY) and keep serving until the peer actually leaves. #[tracing::instrument("conn", skip_all, fields(id = self.id))] - pub async fn run(self) -> anyhow::Result<()> { + pub async fn run(self, mut shutdown: watch::Receiver) -> anyhow::Result<()> { let token = match self.authenticate().await { Ok(token) => token, Err(err) => { @@ -104,8 +108,18 @@ impl Connection { tracing::info!(version = %session.version(), transport, "negotiated"); - // Wait until the session is closed. - session.closed().await?; + // Serve until the peer leaves. On a graceful shutdown, drain the session (send a + // GOAWAY) and keep serving so in-flight groups can finish before it migrates away. + tokio::select! { + res = session.closed() => res?, + // Drop the watch `Ref` guard (not `Send`) before the awaits in the body. + _ = async { let _ = shutdown.wait_for(|d| *d).await; } => { + tracing::info!("draining; sending goaway"); + let drain = session.drain(); + drain.start(None); + drain.complete().await; + } + } Ok(()) } diff --git a/rs/moq-relay/src/main.rs b/rs/moq-relay/src/main.rs index 7bc386a44..68652d9c1 100644 --- a/rs/moq-relay/src/main.rs +++ b/rs/moq-relay/src/main.rs @@ -21,8 +21,10 @@ async fn main() -> anyhow::Result<()> { let mtls_enabled = !config.server.tls.root.is_empty(); + // We drive shutdown ourselves (GOAWAY drain on the first Ctrl+C, force on the + // second), so opt out of moq-native's built-in Ctrl+C-closes-everything handler. #[allow(unused_mut)] - let mut server = config.server.init()?; + let mut server = config.server.init()?.with_ctrl_c_handler(false); let client = config.client.clone().init()?; let addr = server.local_addr()?; @@ -81,19 +83,97 @@ async fn main() -> anyhow::Result<()> { #[cfg(not(feature = "jemalloc"))] let jemalloc = std::future::pending::>(); + // Graceful-then-forceful shutdown. The first stop signal (SIGINT/Ctrl+C, or SIGTERM + // from `systemctl stop`) drains; a second one forces. There's no handler for a hard + // kill: SIGKILL is uncatchable, and that's the kernel-level backstop (systemd sends + // it after TimeoutStopSec). + let (drain_tx, drain_rx) = tokio::sync::watch::channel(false); + let shutdown = async move { + // Open the streams once so the second signal can't slip through a re-registration gap. + let mut signals = ShutdownSignals::listen(); + + signals.recv().await; + tracing::info!("shutting down: sending GOAWAY and draining connections (signal again to force)"); + #[cfg(unix)] + let _ = sd_notify::notify(&[sd_notify::NotifyState::Stopping]); + let _ = drain_tx.send(true); + + signals.recv().await; + tracing::warn!("forcing shutdown"); + }; + tokio::select! { - Err(err) = cluster.clone().run() => return Err(err).context("cluster failed"), - Err(err) = web.run() => return Err(err).context("web server failed"), - Err(err) = serve(server, cluster, auth) => return Err(err).context("server failed"), - Err(err) = jemalloc => return Err(err).context("jemalloc profiler failed"), - else => Ok(()), + Err(err) = cluster.clone().run() => Err(err).context("cluster failed"), + Err(err) = web.run() => Err(err).context("web server failed"), + res = serve(server, cluster, auth, drain_rx) => res.context("server failed"), + Err(err) = jemalloc => Err(err).context("jemalloc profiler failed"), + // Forced shutdown: dropping `server` (and the connection tasks) closes everything. + _ = shutdown => Ok(()), } } -async fn serve(mut server: moq_native::Server, cluster: Cluster, auth: Auth) -> anyhow::Result<()> { +/// Listens for OS stop signals. On Unix that's SIGINT (Ctrl+C) and SIGTERM (what +/// `systemctl stop` sends); elsewhere just Ctrl+C. Both streams are registered up +/// front so repeated signals are delivered reliably. +struct ShutdownSignals { + #[cfg(unix)] + sigint: tokio::signal::unix::Signal, + #[cfg(unix)] + sigterm: tokio::signal::unix::Signal, +} + +impl ShutdownSignals { + fn listen() -> Self { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + Self { + sigint: signal(SignalKind::interrupt()).expect("failed to listen for SIGINT"), + sigterm: signal(SignalKind::terminate()).expect("failed to listen for SIGTERM"), + } + } + #[cfg(not(unix))] + Self {} + } + + /// Resolve on the next stop signal. + async fn recv(&mut self) { + #[cfg(unix)] + tokio::select! { + _ = self.sigint.recv() => {} + _ = self.sigterm.recv() => {} + } + #[cfg(not(unix))] + { + // Windows: Ctrl+C only. A failed registration must not look like a signal. + if tokio::signal::ctrl_c().await.is_err() { + std::future::pending::<()>().await; + } + } + } +} + +async fn serve( + mut server: moq_native::Server, + cluster: Cluster, + auth: Auth, + mut drain: tokio::sync::watch::Receiver, +) -> anyhow::Result<()> { let mut conn_id = 0; - while let Some(request) = server.accept().await { + // Tracks in-flight connections: each task holds a clone of `active`, so + // `active_rx.recv()` resolves to `None` only once every task has finished. + let (active, mut active_rx) = tokio::sync::mpsc::channel::<()>(1); + + loop { + let request = tokio::select! { + request = server.accept() => request, + // Stop accepting once draining begins; existing connections keep running. + _ = drain.wait_for(|d| *d) => break, + }; + + let Some(request) = request else { break }; + let conn = Connection { id: conn_id, request, @@ -102,12 +182,20 @@ async fn serve(mut server: moq_native::Server, cluster: Cluster, auth: Auth) -> }; conn_id += 1; + let drain = drain.clone(); + let active = active.clone(); tokio::spawn(async move { - if let Err(err) = conn.run().await { + let _active = active; + if let Err(err) = conn.run(drain).await { tracing::warn!(%err, "connection closed"); } }); } - anyhow::bail!("stopped accepting connections") + // No longer accepting. Wait for every in-flight connection to drain. + drop(active); + tracing::info!("waiting for connections to drain"); + let _ = active_rx.recv().await; + tracing::info!("all connections drained"); + Ok(()) }