diff --git a/Cargo.lock b/Cargo.lock index d672317338..d06cf679ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1008,17 +1008,6 @@ dependencies = [ "hybrid-array", ] -[[package]] -name = "ctrlc" -version = "3.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0b1fab2ae45819af2d0731d60f2afe17227ebb1a1538a236da84c93e9a60162" -dependencies = [ - "dispatch2", - "nix 0.31.3", - "windows-sys 0.61.2", -] - [[package]] name = "darling" version = "0.20.11" @@ -1116,13 +1105,13 @@ dependencies = [ "arrayvec", "axum", "axum-server", - "ctrlc", "dataplane-args", "dataplane-concurrency", "dataplane-config", "dataplane-flow-entry", "dataplane-flow-filter", "dataplane-id", + "dataplane-lifecycle", "dataplane-mgmt", "dataplane-nat", "dataplane-net", @@ -1138,7 +1127,6 @@ dependencies = [ "linkme", "metrics", "metrics-exporter-prometheus", - "mio", "n-vm", "netdev", "nix 0.31.3", @@ -1447,6 +1435,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "dataplane-lifecycle" +version = "0.21.0" +dependencies = [ + "dataplane-concurrency", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "dataplane-lpm" version = "0.21.0" @@ -1479,6 +1477,7 @@ dependencies = [ "dataplane-interface-manager", "dataplane-k8s-intf", "dataplane-k8s-less", + "dataplane-lifecycle", "dataplane-lpm", "dataplane-nat", "dataplane-net", @@ -1605,6 +1604,7 @@ dependencies = [ "dataplane-concurrency", "dataplane-config", "dataplane-left-right-tlcache", + "dataplane-lifecycle", "dataplane-lpm", "dataplane-net", "dataplane-tracectl", @@ -5562,6 +5562,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "slab", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 04a96edf26..9d9eb838f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "k8s-intf", "k8s-less", "left-right-tlcache", + "lifecycle", "lpm", "mgmt", "nat", @@ -78,6 +79,7 @@ interface-manager = { path = "./interface-manager", package = 
"dataplane-interfa k8s-intf = { path = "./k8s-intf", package = "dataplane-k8s-intf", default-features = false, features = [] } k8s-less = { path = "./k8s-less", package = "dataplane-k8s-less", features = [] } left-right-tlcache = { path = "./left-right-tlcache", package = "dataplane-left-right-tlcache", features = [] } +lifecycle = { path = "./lifecycle", package = "dataplane-lifecycle", features = [] } lpm = { path = "./lpm", package = "dataplane-lpm", features = [] } mgmt = { path = "./mgmt", package = "dataplane-mgmt", features = [] } nat = { path = "./nat", package = "dataplane-nat", features = [] } @@ -114,7 +116,6 @@ clap = { version = "4.6.1", default-features = true, features = [] } color-eyre = { version = "0.6.5", default-features = false, features = [] } colored = { version = "3.1.1", default-features = false, features = [] } crossbeam-utils = { version = "0.8.21", default-features = false, features = [] } -ctrlc = { version = "3.5.2", default-features = false, features = [] } dashmap = { version = "6.2.1", default-features = false, features = [] } derive_builder = { version = "0.20.2", default-features = false, features = [] } dotenvy = { version = "0.15.7", default-features = false, features = [] } diff --git a/dataplane/Cargo.toml b/dataplane/Cargo.toml index 94fefe5fc5..83d86fcba4 100644 --- a/dataplane/Cargo.toml +++ b/dataplane/Cargo.toml @@ -19,7 +19,6 @@ axum = { workspace = true, features = ["http1", "tokio"] } axum-server = { workspace = true } concurrency = { workspace = true } config = { workspace = true } -ctrlc = { workspace = true, features = ["termination"] } dyn-iter = { workspace = true } flow-entry = { workspace = true } flow-filter = { workspace = true } @@ -27,11 +26,11 @@ futures = { workspace = true } hyper = { workspace = true } hyper-util = { workspace = true } id = { workspace = true } +lifecycle = { workspace = true } linkme = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } 
mgmt = { workspace = true } -mio = { workspace = true, features = ["os-ext", "net"] } nat = { workspace = true } net = { workspace = true, features = ["test_buffer"] } nix = { workspace = true, features = ["socket", "hostname"] } @@ -46,7 +45,7 @@ rtnetlink = { workspace = true, features = ["default", "tokio"] } serde = { workspace = true, features = ["derive"] } stats = { workspace = true } thiserror = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tracectl = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, default-features = true } diff --git a/dataplane/src/drivers/kernel/mod.rs b/dataplane/src/drivers/kernel/mod.rs index 88a5594358..f5914de3ad 100644 --- a/dataplane/src/drivers/kernel/mod.rs +++ b/dataplane/src/drivers/kernel/mod.rs @@ -18,6 +18,9 @@ mod worker; use concurrency::sync::Arc; use concurrency::thread; +#[allow(unused_imports)] // used under loom/shuttle backends +use concurrency::thread::BuilderExt; +use lifecycle::Subsystem; use net::buffer::test_buffer::TestBuffer; use pipeline::DynPipeline; use tracectl::trace_target; @@ -25,88 +28,87 @@ use tracectl::trace_target; use tracing::{debug, error, info, trace, warn}; use super::DriverError; -use super::tokio_util::run_in_local_tokio_runtime; use kif::{Kif, bring_kifs_up}; use worker::Worker; trace_target!("kernel-driver", LevelFilter::INFO, &["driver"]); -/// Main structure representing the kernel driver. -/// This driver: -/// * receives raw frames via `AF_PACKET`, parses to `Packet` -/// * selects a worker by symmetric flow hash -/// * workers run independent pipelines and send processed packets back -/// * dispatcher serializes & transmits on the chosen outgoing interface +/// AF_PACKET-based kernel driver. Spawns N workers with symmetric-hash +/// fanout and per-worker pipelines. 
pub struct DriverKernel; #[allow(clippy::cast_possible_truncation)] impl DriverKernel { - /// Spawn `workers` processing threads, each with its own pipeline instance. - /// - /// Returns: - /// - `Arc>>>` one sender per worker (dispatcher -> worker) - /// - `Receiver>` a single queue for processed packets (worker -> dispatcher) - fn spawn_workers( + /// Spawn `num_workers` worker threads into `scope`, each with its own + /// pipeline. Bails on the first spawn failure; workers that did spawn + /// drain via the scope join. + fn spawn_workers_scoped<'scope>( + scope: &'scope thread::Scope<'scope, '_>, + workers_subsystem: &Subsystem, num_workers: usize, setup_pipeline: &Arc DynPipeline>, interfaces: &[Kif], - ) -> Vec>> { + ) -> Result>>, std::io::Error> + { info!("Spawning {num_workers} workers"); - let mut workers = Vec::new(); - for wid in 0..num_workers { - let builder = thread::Builder::new().name(format!("dp-worker-{wid}")); - let mut worker = Worker::new(wid, num_workers, setup_pipeline); - match worker.start(builder, interfaces) { - Ok(handle) => workers.push(handle), - Err(e) => { - error!("Failed to start worker {wid}: {e}"); - } - } - } - workers + (0..num_workers) + .map(|wid| { + let builder = thread::Builder::new().name(format!("dp-worker-{wid}")); + Worker::new(wid, num_workers, setup_pipeline, workers_subsystem.clone()) + .start(scope, builder, interfaces) + }) + .collect() } - /// Starts the kernel driver, spawns worker threads, and runs the dispatcher loop. - /// - /// - `args`: kernel driver CLI parameters (e.g., `--interface` list) - /// - `workers`: number of worker threads / pipelines - /// - `setup_pipeline`: factory returning a **fresh** `DynPipeline` per worker + /// Spawn worker threads + supervisor into `scope`. The scope joins + /// all driver threads on closure return. /// /// # Errors - /// Returns [`DriverError`] in case the driver fails to start successfully. 
- pub fn start( - stop_tx: std::sync::mpsc::Sender, + /// Returns [`DriverError`] on interface setup or thread spawn failure. + pub fn start<'scope>( + scope: &'scope thread::Scope<'scope, '_>, + workers_subsystem: &Subsystem, args: impl IntoIterator + Clone>, num_workers: usize, setup_pipeline: &Arc DynPipeline>, ) -> Result<(), DriverError> { + // A current_thread runtime built inside another tokio runtime + // panics; catch nesting in debug. + debug_assert!( + tokio::runtime::Handle::try_current().is_err(), + "DriverKernel::start must not be invoked from within a tokio runtime context" + ); + info!("Collecting interfaces from config"); let interfaces = kif::get_interfaces(args)?; - // ensure that the kernel interfaces for rx/tx are up - run_in_local_tokio_runtime(async || bring_kifs_up(interfaces.as_slice()).await)?; + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()? + .block_on(bring_kifs_up(interfaces.as_slice()))?; - // Spawn workers - let worker_handles = - Self::spawn_workers(num_workers, setup_pipeline, interfaces.as_slice()); + let worker_handles = Self::spawn_workers_scoped( + scope, + workers_subsystem, + num_workers, + setup_pipeline, + interfaces.as_slice(), + )?; - let control_builder = thread::Builder::new().name("kernel-driver-controller".to_string()); - control_builder.spawn(move || { + // The supervisor just joins-and-logs; worker fatal reporting is + // handled by the `ExitGuard` inside each worker thread. 
+ let supervisor_builder = + thread::Builder::new().name("kernel-driver-supervisor".to_string()); + supervisor_builder.spawn_scoped(scope, move || { for (id, handle) in worker_handles.into_iter().enumerate() { - info!("Waiting for workers to finish"); + info!("Waiting for worker {id} to finish"); match handle.join() { - Ok(result) => match result { - Ok(()) => info!("Worker {id} exited successfully"), - Err(e) => error!("Worker {id} exited with error: {e}"), - }, - Err(e) => error!("Unable to spawn worker {id} error: {e:?}"), + Ok(Ok(())) => info!("Worker {id} exited successfully"), + Ok(Err(e)) => error!("Worker {id} exited with error: {e}"), + Err(panic_payload) => error!("Worker {id} panicked: {panic_payload:?}"), } } - - // Exiting with error as it's not expected for all workers to finish - error!("All workers finished unexpectedly"); - #[allow(clippy::expect_used)] - stop_tx.send(1).expect("Failed to send stop signal"); + info!("All workers joined"); })?; Ok(()) diff --git a/dataplane/src/drivers/kernel/worker.rs b/dataplane/src/drivers/kernel/worker.rs index 48a80935eb..235f3b325d 100644 --- a/dataplane/src/drivers/kernel/worker.rs +++ b/dataplane/src/drivers/kernel/worker.rs @@ -15,6 +15,9 @@ use tokio::sync::Mutex; use concurrency::sync::Arc; use concurrency::thread; +#[allow(unused_imports)] // used under loom/shuttle backends +use concurrency::thread::BuilderExt; +use lifecycle::Subsystem; use net::buffer::test_buffer::TestBuffer; use net::interface::InterfaceIndex; use net::packet::{DoneReason, Packet}; @@ -22,7 +25,6 @@ use pipeline::{DynPipeline, NetworkFunction}; use crate::drivers::kernel::fanout::{PacketFanoutType, set_packet_fanout}; use crate::drivers::kernel::kif::Kif; -use crate::drivers::tokio_util::run_in_local_tokio_runtime; use tracing::{debug, error, info, trace, warn}; @@ -126,6 +128,7 @@ pub struct Worker { id: WorkerId, total_workers: usize, setup_pipeline: Arc DynPipeline>, + subsystem: Subsystem, } impl Worker { @@ -133,28 +136,69 @@ 
impl Worker { id: WorkerId, total_workers: usize, setup_pipeline: &Arc DynPipeline>, + subsystem: Subsystem, ) -> Self { Worker { id, total_workers, setup_pipeline: setup_pipeline.clone(), + subsystem, } } - pub fn start( + #[allow(clippy::too_many_lines)] + pub fn start<'scope>( &mut self, + scope: &'scope thread::Scope<'scope, '_>, thread_builder: thread::Builder, interfaces: &[Kif], - ) -> Result>, io::Error> { + ) -> Result>, io::Error> { let id = self.id; let total_workers = self.total_workers; let setup = self.setup_pipeline.clone(); - let interfaces = interfaces.iter().map(Kif::clone).collect::>(); + let subsystem = self.subsystem.clone(); + let cancel = subsystem.cancel_token(); + let interfaces = interfaces.to_vec(); + + let handle_res = thread_builder.spawn_scoped(scope, move || { + // Drop-guard so panic-unwind, early-`?`, and unexpected normal + // return all reach report_fatal. Disarmed on the graceful path. + struct ExitGuard { + subsystem: Subsystem, + id: WorkerId, + armed: bool, + } + impl ExitGuard { + fn disarm(&mut self) { + self.armed = false; + } + } + impl Drop for ExitGuard { + fn drop(&mut self) { + if !self.armed || self.subsystem.is_cancelled() { + return; + } + let reason = if std::thread::panicking() { + format!("worker {} panicked", self.id) + } else { + format!("worker {} exited unexpectedly", self.id) + }; + self.subsystem.report_fatal(&reason); + } + } - let handle_res = thread_builder.spawn(move || { info!(worker = id, "Worker started"); + let mut guard = ExitGuard { + subsystem: subsystem.clone(), + id, + armed: true, + }; + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build_local(tokio::runtime::LocalOptions::default())?; - run_in_local_tokio_runtime(async || { + let result = rt.block_on(async { let (readers, if_table) = match build_interface_table(id, total_workers, interfaces.as_slice()) { Ok(table) => table, @@ -166,27 +210,39 @@ impl Worker { let setup = setup.clone(); let if_table = 
if_table.clone(); + let cancel = cancel.clone(); let mut reader_handles = tokio::task::JoinSet::new(); for intf in readers { let setup = setup.clone(); let if_table = if_table.clone(); + let cancel = cancel.clone(); reader_handles.spawn_local(async move { let intf = intf; let mut pipeline = setup(); loop { - tracing::debug!(worker = id, "awaiting packets"); + debug!(worker = id, "awaiting packets"); - let packets_vec = match read_packets_from_interface(id, &intf).await { - Ok(packets) => packets, - Err(e) => { - error!( + let packets_vec = tokio::select! { + () = cancel.cancelled() => { + info!( worker = id, rx_intf_name = intf.if_name, - "Error reading packets from interface: {e}" + "cancellation observed; exiting reader" ); - vec![] + break; + } + result = read_packets_from_interface(id, &intf) => match result { + Ok(packets) => packets, + Err(e) => { + error!( + worker = id, + rx_intf_name = intf.if_name, + "Error reading packets from interface: {e}" + ); + vec![] + } } }; @@ -198,7 +254,6 @@ impl Worker { intf.if_name ); - // Try to receive everything else that is in the buffer let packets = packets_vec.into_iter(); let mut count = 0; @@ -238,8 +293,13 @@ impl Worker { } Ok::<(), io::Error>(()) - })?; - info!(worker = id, "Worker exited"); + }); + + if subsystem.is_cancelled() { + guard.disarm(); + } + info!(worker = id, "worker exited"); + result?; Ok::<(), io::Error>(()) })?; Ok(handle_res) diff --git a/dataplane/src/drivers/mod.rs b/dataplane/src/drivers/mod.rs index 80731da0e5..74990160a7 100644 --- a/dataplane/src/drivers/mod.rs +++ b/dataplane/src/drivers/mod.rs @@ -4,7 +4,6 @@ use thiserror::Error; pub mod kernel; -mod tokio_util; #[derive(Error, Debug)] pub enum DriverError { diff --git a/dataplane/src/drivers/tokio_util.rs b/dataplane/src/drivers/tokio_util.rs deleted file mode 100644 index 0ad91dbf51..0000000000 --- a/dataplane/src/drivers/tokio_util.rs +++ /dev/null @@ -1,50 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// Copyright Open Network 
Fabric Authors - -use tokio::runtime::{Builder, LocalOptions}; - -/// Executes a function inside a local (non-Send) tokio runtime. -/// The runtime will be torn down when the function returns. -/// -/// # Panics -/// If it fails to create a current thread runtime. -pub fn run_in_local_tokio_runtime(f: F) -> R -where - F: FnOnce() -> Fut, - Fut: std::future::Future, -{ - let current_runtime = tokio::runtime::Handle::try_current(); - assert!( - current_runtime.is_err(), - "Expected no active tokio runtime, but found: {:?}", - current_runtime.unwrap_err() - ); - - let rt = Builder::new_current_thread() - .enable_all() - .build_local(LocalOptions::default()) - .expect("Failed to create current thread runtime"); - - rt.block_on(f()) -} - -#[cfg(test)] -mod tests { - use super::*; - use tokio::time::{Duration, sleep}; - - #[test] - fn test_run_in_tokio_runtime_pure() { - let result = run_in_local_tokio_runtime(|| async { 42 }); - assert_eq!(result, 42); - } - - #[test] - fn test_run_in_tokio_runtime_async() { - let result = run_in_local_tokio_runtime(|| async { - sleep(Duration::from_millis(100)).await; - 42 - }); - assert_eq!(result, 42); - } -} diff --git a/dataplane/src/packet_processor/mod.rs b/dataplane/src/packet_processor/mod.rs index 3142d99d9b..233e0fcaa5 100644 --- a/dataplane/src/packet_processor/mod.rs +++ b/dataplane/src/packet_processor/mod.rs @@ -49,6 +49,9 @@ where /// Start a router and provide the associated pipeline pub(crate) fn start_router( + mgmt: &lifecycle::Subsystem, + mgmt_handle: &tokio::runtime::Handle, + router: &lifecycle::Subsystem, params: RouterParams, ) -> Result, RouterError> { let vpcmapw = VpcMapWriter::::new(); @@ -83,7 +86,7 @@ pub(crate) fn start_router( }; // create router - let router = Router::new(params, Some(cli_sources))?; + let router = Router::new(mgmt, mgmt_handle, router, params, Some(cli_sources))?; let iftr_factory = router.get_iftabler_factory(); let fibtr_factory = router.get_fibtr_factory(); let atabler_factory = 
router.get_atabler_factory(); diff --git a/dataplane/src/runtime.rs b/dataplane/src/runtime.rs index fccabf8c84..94b4814169 100644 --- a/dataplane/src/runtime.rs +++ b/dataplane/src/runtime.rs @@ -2,11 +2,12 @@ // Copyright Open Network Fabric Authors use crate::packet_processor::start_router; -use crate::statistics::MetricsServer; +use crate::statistics::spawn_metrics; use args::{CmdArgs, Parser}; use crate::drivers::kernel::DriverKernel; -use mgmt::{ConfigProcessorParams, MgmtParams, start_mgmt}; +use lifecycle::{Shutdown, default_deadlines, spawn_shutdown_watchdog}; +use mgmt::{ConfigProcessorParams, LaunchError, MgmtParams, run_mgmt}; use nix::unistd::gethostname; use pyroscope::backend::{BackendConfig, PprofConfig, pprof_backend}; @@ -178,14 +179,20 @@ pub fn main() { process_tracing_cmds(&args); - let (stop_tx, stop_rx) = std::sync::mpsc::channel(); - let ctrlc_stop_tx = stop_tx.clone(); - ctrlc::set_handler(move || { - ctrlc_stop_tx - .send(0) - .expect("Error sending shutdown signal"); - }) - .expect("failed to set SIGINT handler"); + let shutdown = Shutdown::new(); + + let mgmt_runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name("mgmt-rt") + .build() + .expect("Failed to build mgmt runtime"); + let mgmt_handle = mgmt_runtime.handle().clone(); + + lifecycle::spawn_signal_handler(&mgmt_handle, shutdown.root.clone()) + .expect("failed to install signal handler"); + + spawn_shutdown_watchdog(shutdown.root.clone(), default_deadlines::TOTAL, 124) + .expect("failed to spawn shutdown watchdog"); /* router parameters */ let mut binding = RouterParamsBuilder::default(); @@ -195,7 +202,6 @@ pub fn main() { .frr_agent_path(args.frr_agent_path()) .dp_status(dp_status.clone()); - // Only set BMP when it's enabled (strip_option setter expects the inner type) if let Some(server) = bmp_server_params { rp_builder = rp_builder.bmp(server); } @@ -205,65 +211,101 @@ pub fn main() { panic!("Bad router configuration"); }; - // start the router; 
returns control-plane handles and a pipeline factory - let setup = start_router(router_params).expect("failed to start router"); - - MetricsServer::new(args.metrics_address(), setup.stats); + let mut setup = start_router( + &shutdown.mgmt, + &mgmt_handle, + &shutdown.router, + router_params, + ) + .expect("failed to start router"); + + spawn_metrics( + &shutdown.metrics, + &mgmt_handle, + args.metrics_address(), + setup.stats, + ); - // pipeline builder let pipeline_factory = setup.pipeline; - /* start management: main thread will be blocked until ready or failure */ - if let Err(e) = start_mgmt(MgmtParams { - config_dir: args.config_dir().cloned(), - hostname: gwname.clone(), - processor_params: ConfigProcessorParams { - router_ctl: setup.router.get_ctl_tx(), - pipeline_data: pipeline_factory().get_data(), - flow_table: setup.flow_table, - vpcmapw: setup.vpcmapw, - nattablesw: setup.nattablesw, - natallocatorw: setup.natallocatorw, - flowfilterw: setup.flowfiltertablesw, - portfw_w: setup.portfw_w, - vpc_stats_store: setup.vpc_stats_store, - dp_status_r: dp_status.clone(), - bmp_options: bmp_client_opts, - }, - }) { - error!("Failed to start mgmt: {e}. Stopping dataplane..."); - std::process::exit(-1); - } - info!("Management is running now"); - - /* start driver with the provided pipeline builder */ - let e = match args.driver_name() { - "dpdk" => { - info!("Using driver DPDK..."); - todo!(); - } - "kernel" => { - info!("Using driver kernel..."); - DriverKernel::start( - stop_tx.clone(), - args.kernel_interfaces(), - args.kernel_num_workers(), - &pipeline_factory, - ) - } - other => { - error!("Unknown driver '{other}'. Aborting..."); - panic!("Packet processing pipeline failed to start. 
Aborting..."); + concurrency::thread::scope(|scope| { + let mgmt_result = run_mgmt( + &mgmt_handle, + &shutdown.mgmt, + MgmtParams { + config_dir: args.config_dir().cloned(), + hostname: gwname.clone(), + processor_params: ConfigProcessorParams { + router_ctl: setup.router.get_ctl_tx(), + pipeline_data: pipeline_factory().get_data(), + flow_table: setup.flow_table, + vpcmapw: setup.vpcmapw, + nattablesw: setup.nattablesw, + natallocatorw: setup.natallocatorw, + flowfilterw: setup.flowfiltertablesw, + portfw_w: setup.portfw_w, + vpc_stats_store: setup.vpc_stats_store, + dp_status_r: dp_status.clone(), + bmp_options: bmp_client_opts, + }, + }, + ); + + match mgmt_result { + Ok(()) => { + info!("Management is running now"); + + let driver_result = match args.driver_name() { + "dpdk" => { + info!("Using driver DPDK..."); + todo!(); + } + "kernel" => { + info!("Using driver kernel..."); + Some(DriverKernel::start( + scope, + &shutdown.workers, + args.kernel_interfaces(), + args.kernel_num_workers(), + &pipeline_factory, + )) + } + other => { + error!("Unknown driver '{other}'. Stopping dataplane..."); + shutdown.fail(); + None + } + }; + + if let Some(Err(e)) = driver_result { + error!("Failed to start driver: {e}"); + shutdown.fail(); + } + } + Err(LaunchError::Cancelled) => { + // Don't call shutdown.fail() — that flips the fatal flag + // and turns a graceful SIGINT into a non-zero exit, which + // systemd would restart-loop. + info!("Mgmt init cancelled; proceeding to shutdown"); + } + Err(e) => { + error!("Failed to start mgmt: {e}. 
Stopping dataplane..."); + shutdown.fail(); + } } - }; - if let Err(e) = e { - error!("Failed to start driver: {e}"); - std::process::exit(-1); - } + mgmt_handle.block_on(shutdown.root.cancelled()); + info!("Shutting down dataplane"); + mgmt_handle.block_on(shutdown.drain_in_order()); + }); + + let exit_code = i32::from(shutdown.is_fatal()); + + // Router::stop()'s BMP abort needs mgmt_runtime live, so stop router + // before shutting the runtime down. + setup.router.stop(); + mgmt_runtime.shutdown_timeout(Duration::from_secs(2)); - let exit_code = stop_rx.recv().expect("failed to receive stop signal"); - info!("Shutting down dataplane"); if let Some(running) = agent_running { match running.stop() { Ok(ready) => ready.shutdown(), @@ -272,28 +314,3 @@ pub fn main() { } std::process::exit(exit_code); } - -#[cfg(false)] // disabled until dpdk-sys refactor is complete -#[cfg(test)] -mod test { - use n_vm::in_vm; - - #[test] - #[in_vm] - fn root_filesystem_in_vm_is_read_only() { - let error = std::fs::File::create_new("/some.file").unwrap_err(); - assert_eq!(error.kind(), std::io::ErrorKind::ReadOnlyFilesystem); - } - - #[test] - #[in_vm] - fn run_filesystem_in_vm_is_read_write() { - std::fs::File::create_new("/run/some.file").unwrap(); - } - - #[test] - #[in_vm] - fn tmp_filesystem_in_vm_is_read_write() { - std::fs::File::create_new("/tmp/some.file").unwrap(); - } -} diff --git a/dataplane/src/statistics/mod.rs b/dataplane/src/statistics/mod.rs index 453367697d..0aad75a28a 100644 --- a/dataplane/src/statistics/mod.rs +++ b/dataplane/src/statistics/mod.rs @@ -2,9 +2,9 @@ // Copyright Open Network Fabric Authors use axum::{Router, response::Response, routing::get}; +use lifecycle::Subsystem; use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle}; use stats::StatsCollector; -use std::thread::JoinHandle; use std::time::Duration; use tracing::{error, info}; @@ -45,60 +45,68 @@ async fn metrics_handler( .unwrap() } -#[derive(Debug)] -pub struct 
MetricsServer { - #[allow(unused)] // temporary - handle: JoinHandle<()>, -} - -impl MetricsServer { - // TODO: convert to scoped thread - #[tracing::instrument(level = "info", skip(stats))] - pub fn new(addr: std::net::SocketAddr, stats: StatsCollector) -> Self { - MetricsServer { - handle: std::thread::Builder::new() - .name("metrics-server".to_string()) - .spawn(move || { - info!("Starting metrics server thread"); - - // create tokio runtime - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .expect("runtime creation failed for metrics server"); - - // block thread to run metrics HTTP server - rt.block_on(Self::run(addr, stats)); - }) - .unwrap(), - } - } - - #[tracing::instrument(level = "info", skip(stats))] - async fn run(addr: std::net::SocketAddr, stats: StatsCollector) { - let PrometheusHandler { handle } = PrometheusHandler::new(); +/// Spawn the `/metrics` endpoint on `addr`, a 30s upkeep ticker, and the +/// stats collector onto `handle`, tracked under `metrics`. Uses +/// [`Subsystem::spawn_on`] — a dead metrics endpoint should not take down +/// the dataplane. +pub fn spawn_metrics( + metrics: &Subsystem, + handle: &tokio::runtime::Handle, + addr: std::net::SocketAddr, + stats: StatsCollector, +) { + let PrometheusHandler { + handle: prom_handle, + } = PrometheusHandler::new(); - let upkeep_handle = handle.clone(); - tokio::spawn(async move { - // avgerage prometheus scraper is between 15 and 60 secs, - // so run upkeep every 30 secs is a reasonable default + let upkeep_handle = prom_handle.clone(); + let upkeep_cancel = metrics.cancel_token(); + metrics.spawn_on( + async move { let mut ticker = tokio::time::interval(Duration::from_secs(30)); loop { - ticker.tick().await; - // run_upkeep is synchronous; call it periodically. - upkeep_handle.run_upkeep(); + tokio::select! 
{ + () = upkeep_cancel.cancelled() => break, + _ = ticker.tick() => { + upkeep_handle.run_upkeep(); + } + } } - }); - tokio::spawn(stats.run()); - let app = Router::new() - .route("/metrics", get(metrics_handler)) - .with_state(handle); + }, + handle, + ); - info!("metrics server listening on {}", addr); + let stats_cancel = metrics.cancel_token(); + metrics.spawn_on( + async move { + tokio::select! { + () = stats_cancel.cancelled() => {} + () = stats.run() => {} + } + }, + handle, + ); - if let Err(e) = axum_server::bind(addr).serve(app.into_make_service()).await { - error!("metrics server error: {}", e); - } - } + let server_cancel = metrics.cancel_token(); + metrics.spawn_on( + async move { + let app = Router::new() + .route("/metrics", get(metrics_handler)) + .with_state(prom_handle); + + info!("metrics server listening on {}", addr); + + tokio::select! { + () = server_cancel.cancelled() => { + info!("metrics server shutdown requested"); + } + res = axum_server::bind(addr).serve(app.into_make_service()) => { + if let Err(e) = res { + error!("metrics server error: {}", e); + } + } + } + }, + handle, + ); } diff --git a/lifecycle/Cargo.toml b/lifecycle/Cargo.toml new file mode 100644 index 0000000000..36e5e9d272 --- /dev/null +++ b/lifecycle/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "dataplane-lifecycle" +version.workspace = true +edition.workspace = true +license.workspace = true +publish.workspace = true +repository.workspace = true + +[dependencies] +concurrency = { workspace = true } +# Base tokio features for cross-platform builds (incl. wasm32-wasip1, which +# rejects features outside the supported wasm set with a compile_error!). +# `rt` is required for the runtime/Handle/JoinHandle APIs we use directly +# (`spawn_signal_handler`, `Subsystem::spawn_on`/`spawn_fatal_on_exit`, the +# watchdog). Don't rely on transitive unification via tokio-util. 
+tokio = { workspace = true, features = ["macros", "rt", "time"] } +tokio-util = { workspace = true, features = ["rt"] } +tracing = { workspace = true } + +# spawn_signal_handler is cfg(unix); the "signal" feature is only enabled +# on unix targets to keep wasm builds of the lifecycle library clean. +[target.'cfg(unix)'.dependencies] +tokio = { workspace = true, features = ["signal"] } + +[dev-dependencies] +tokio = { workspace = true, features = ["rt", "macros", "time"] } diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs new file mode 100644 index 0000000000..3cd587e760 --- /dev/null +++ b/lifecycle/src/lib.rs @@ -0,0 +1,534 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Open Network Fabric Authors + +//! Process-lifecycle primitives for the dataplane binary. +//! +//! [`Shutdown`] bundles a root [`CancellationToken`] and one [`Subsystem`] +//! per long-lived component. Each subsystem owns a cancel token and a +//! [`TaskTracker`]; [`Shutdown::drain_in_order`] drains them in topological +//! order with per-subsystem deadlines. + +#![deny( + unsafe_code, + missing_docs, + clippy::all, + clippy::pedantic, + clippy::unwrap_used, + clippy::expect_used, + clippy::panic +)] + +use concurrency::sync::Arc; +use concurrency::sync::atomic::{AtomicBool, Ordering}; +use std::future::Future; +use std::time::Duration; + +use tokio::task::JoinHandle; +use tokio::time::error::Elapsed; +use tracing::{error, info, warn}; + +pub use tokio_util::sync::CancellationToken; +pub use tokio_util::task::TaskTracker; + +/// A named, drainable subsystem. Cheap to clone. +#[derive(Clone, Debug)] +pub struct Subsystem { + /// Stable name used in shutdown logs. + pub name: &'static str, + cancel: CancellationToken, + tasks: TaskTracker, + root: CancellationToken, + fatal: Arc, +} + +impl Subsystem { + /// Tests/ad-hoc only. Production code: use [`Shutdown::new`] so all + /// subsystems share one fatal flag. 
+ #[doc(hidden)] + #[must_use] + pub fn new(name: &'static str, root: CancellationToken) -> Self { + Self::with_fatal(name, root, Arc::new(AtomicBool::new(false))) + } + + /// Construct a subsystem with an explicit shared fatal flag. + #[must_use] + pub fn with_fatal(name: &'static str, root: CancellationToken, fatal: Arc) -> Self { + Self { + name, + cancel: CancellationToken::new(), + tasks: TaskTracker::new(), + root, + fatal, + } + } + + /// Clone of this subsystem's cancellation token. + #[must_use] + pub fn cancel_token(&self) -> CancellationToken { + self.cancel.clone() + } + + /// True if this subsystem's cancellation token has been tripped. + #[must_use] + pub fn is_cancelled(&self) -> bool { + self.cancel.is_cancelled() + } + + /// Clone of the process-wide root cancellation token. Use for startup + /// work — the per-subsystem cancel is tripped after startup returns. + #[must_use] + pub fn root_token(&self) -> CancellationToken { + self.root.clone() + } + + /// Spawn an async task on `handle`, tracked under this subsystem. + pub fn spawn_on(&self, future: F, handle: &tokio::runtime::Handle) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.tasks.spawn_on(future, handle) + } + + /// Spawn `future`; if it exits (normally or by panic) before any + /// shutdown is requested, call [`Self::report_fatal`]. Use for tasks + /// whose unexpected exit means the subsystem is broken; for tasks + /// where silent exit is fine, use [`Self::spawn_on`]. + pub fn spawn_fatal_on_exit(&self, reason: &str, future: F, handle: &tokio::runtime::Handle) + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let cancel = self.cancel.clone(); + let root = self.root.clone(); + let subsystem = self.clone(); + let reason = reason.to_owned(); + // Spawn `inner` detached on the runtime so panics surface via its + // JoinHandle; only the wrapper is tracked. 
+ let mut inner = handle.spawn(future); + self.tasks.spawn_on( + async move { + tokio::select! { + () = cancel.cancelled() => { + inner.abort(); + let _ = (&mut inner).await; + } + result = &mut inner => { + // Root counts as graceful: during SIGINT, root + // trips before this subsystem's cancel. + if root.is_cancelled() || cancel.is_cancelled() { + return; + } + match result { + Ok(_) => subsystem + .report_fatal(&format!("{reason} exited without cancellation")), + Err(e) if e.is_panic() => subsystem + .report_fatal(&format!("{reason} panicked: {e}")), + Err(_) => {} + } + } + } + }, + handle, + ); + } + + /// Set the fatal flag, trip this subsystem's cancel, trip the root. + /// Idempotent. Logs at error. + pub fn report_fatal(&self, reason: &str) { + error!(subsystem = self.name, reason, "fatal; tripping shutdown"); + self.fatal.store(true, Ordering::Relaxed); + self.cancel.cancel(); + self.root.cancel(); + } + + /// Cancel this subsystem and wait for tracked tokio tasks. Idempotent. + /// + /// Thread-based subsystems (workers, RIO) are not tracked here; their + /// joins happen at scope-close. The watchdog is their hard bound. + /// + /// # Errors + /// Returns [`Elapsed`] if any tracked task is still running after + /// `deadline`. Cancel is tripped and tracker closed either way. + pub async fn drain(&self, deadline: Duration) -> Result<(), Elapsed> { + self.cancel.cancel(); + self.tasks.close(); + tokio::time::timeout(deadline, self.tasks.wait()).await + } +} + +/// Default drain deadlines. Per-subsystem deadlines bound only the +/// tokio tasks tracked by each [`Subsystem`]; [`TOTAL`] is the absolute +/// process-level ceiling enforced by [`spawn_shutdown_watchdog`]. +pub mod default_deadlines { + use std::time::Duration; + /// Drain workers' tokio tasks. + pub const WORKERS: Duration = Duration::from_secs(5); + /// Drain RIO's tokio tasks. 
+ pub const ROUTER: Duration = Duration::from_secs(5); + /// Drain mgmt's tasks (config processor, status updater, watcher). + pub const MGMT: Duration = Duration::from_secs(5); + /// Drain metrics; short — a stuck scrape is fine to abandon. + pub const METRICS: Duration = Duration::from_secs(2); + /// Hard process-wide ceiling. Independent of the sum above. + pub const TOTAL: Duration = Duration::from_secs(15); +} + +/// Root lifecycle bundle owned by `main`. +#[derive(Debug)] +pub struct Shutdown { + /// Tripped by `SIGINT`/`SIGTERM` or any subsystem's + /// [`Subsystem::report_fatal`]. + pub root: CancellationToken, + fatal: Arc<AtomicBool>, + /// Data-plane workers. + pub workers: Subsystem, + /// Routing/control I/O. + pub router: Subsystem, + /// Management plane. + pub mgmt: Subsystem, + /// Prometheus endpoint and stats collection. + pub metrics: Subsystem, +} + +impl Shutdown { + /// Build a `Shutdown` with subsystems pre-wired to one root and one + /// fatal flag. + #[must_use] + pub fn new() -> Self { + let root = CancellationToken::new(); + let fatal = Arc::new(AtomicBool::new(false)); + Self { + workers: Subsystem::with_fatal("workers", root.clone(), fatal.clone()), + router: Subsystem::with_fatal("router", root.clone(), fatal.clone()), + mgmt: Subsystem::with_fatal("mgmt", root.clone(), fatal.clone()), + metrics: Subsystem::with_fatal("metrics", root.clone(), fatal.clone()), + root, + fatal, + } + } + + /// Set the fatal flag and trip the root. Idempotent. + pub fn fail(&self) { + self.fatal.store(true, Ordering::Relaxed); + self.root.cancel(); + } + + /// True if any subsystem reported fatal or `main` called + /// [`Shutdown::fail`]. Read after drain to choose the exit code. + #[must_use] + pub fn is_fatal(&self) -> bool { + self.fatal.load(Ordering::Relaxed) + } + + /// Drain in order: workers, router, metrics, mgmt. Workers stop + /// touching packets before the control plane goes away. Subsystems + /// that miss their deadline are logged and abandoned.
+ pub async fn drain_in_order(&self) { + Self::drain_one(&self.workers, default_deadlines::WORKERS).await; + Self::drain_one(&self.router, default_deadlines::ROUTER).await; + Self::drain_one(&self.metrics, default_deadlines::METRICS).await; + Self::drain_one(&self.mgmt, default_deadlines::MGMT).await; + } + + async fn drain_one(sub: &Subsystem, deadline: Duration) { + if sub.drain(deadline).await.is_ok() { + info!(subsystem = sub.name, "drained cleanly"); + } else { + warn!( + subsystem = sub.name, + deadline_ms = u64::try_from(deadline.as_millis()).unwrap_or(u64::MAX), + "drain timed out; abandoning" + ); + } + } +} + +impl Default for Shutdown { + fn default() -> Self { + Self::new() + } +} + +/// Spawn a task on `handle` that trips `root` on `SIGINT`/`SIGTERM`, and +/// also exits if `root` was tripped through another path. +/// +/// # Errors +/// Returns [`std::io::Error`] if either signal handler install fails. +#[cfg(unix)] +pub fn spawn_signal_handler( + handle: &tokio::runtime::Handle, + root: CancellationToken, +) -> std::io::Result<()> { + use tokio::signal::unix::{SignalKind, signal}; + + // Install inside the runtime context so the handlers register with + // its signal driver, not just the EnterGuard. + let (mut sigint, mut sigterm) = { + let _guard = handle.enter(); + ( + signal(SignalKind::interrupt())?, + signal(SignalKind::terminate())?, + ) + }; + + handle.spawn(async move { + tokio::select! { + _ = sigint.recv() => info!("SIGINT received; tripping shutdown"), + _ = sigterm.recv() => info!("SIGTERM received; tripping shutdown"), + () = root.cancelled() => {} + } + root.cancel(); + }); + + Ok(()) +} + +/// Spawn a detached OS thread that calls [`std::process::exit`] `deadline` +/// after `root` is cancelled. Independent of the mgmt runtime so it still +/// fires if the runtime wedges. This is the only bound on a worker thread +/// blocked inside an I/O call that doesn't observe cancellation. 
+/// +/// # Errors +/// Returns [`std::io::Error`] if spawning fails. A runtime-build failure +/// inside the thread is logged and disarms the watchdog (the process then +/// has no hard shutdown ceiling); treat disarm logs as a startup warning. +pub fn spawn_shutdown_watchdog( + root: CancellationToken, + deadline: Duration, + exit_code: i32, +) -> std::io::Result<()> { + use std::io::Write; + std::thread::Builder::new() + .name("shutdown-watchdog".to_string()) + .spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_time() + .build() + { + Ok(rt) => rt, + Err(e) => { + error!(error = %e, "shutdown watchdog runtime failed to start; disarmed"); + return; + } + }; + rt.block_on(root.cancelled()); + drop(rt); + std::thread::sleep(deadline); + error!( + deadline_ms = u64::try_from(deadline.as_millis()).unwrap_or(u64::MAX), + exit_code, "shutdown exceeded total deadline; aborting" + ); + // process::exit skips destructors, so flush stderr explicitly. + let _ = std::io::stderr().flush(); + std::process::exit(exit_code); + }) + .map(|_| ()) +} + +#[cfg(test)] +mod tests { + use super::*; + use concurrency::sync::Arc; + use concurrency::sync::atomic::{AtomicBool, Ordering}; + + #[tokio::test] + async fn drain_completes_when_tasks_observe_cancel() { + let shutdown = Shutdown::new(); + let mgmt = shutdown.mgmt.clone(); + let cancel = mgmt.cancel_token(); + let observed = Arc::new(AtomicBool::new(false)); + let observed_in_task = observed.clone(); + + let handle = tokio::runtime::Handle::current(); + mgmt.spawn_on( + async move { + cancel.cancelled().await; + observed_in_task.store(true, Ordering::SeqCst); + }, + &handle, + ); + + let result = mgmt.drain(Duration::from_millis(500)).await; + assert!(result.is_ok()); + assert!(observed.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn drain_times_out_when_task_ignores_cancel() { + let shutdown = Shutdown::new(); + let mgmt = shutdown.mgmt.clone(); + + let handle = 
tokio::runtime::Handle::current(); + mgmt.spawn_on( + async move { + tokio::time::sleep(Duration::from_mins(1)).await; + }, + &handle, + ); + + let result = mgmt.drain(Duration::from_millis(50)).await; + assert!(result.is_err()); + assert!(mgmt.is_cancelled()); + assert!(mgmt.tasks.is_closed()); + } + + #[tokio::test] + async fn report_fatal_trips_root_self_cancel_and_fatal_flag() { + let shutdown = Shutdown::new(); + assert!(!shutdown.is_fatal()); + shutdown.workers.report_fatal("synthetic test failure"); + + assert!(shutdown.root.is_cancelled()); + assert!(shutdown.is_fatal()); + assert!(shutdown.workers.is_cancelled()); + assert!(!shutdown.mgmt.is_cancelled()); + assert!(!shutdown.router.is_cancelled()); + assert!(!shutdown.metrics.is_cancelled()); + } + + #[tokio::test] + async fn shutdown_fail_sets_fatal_and_trips_root() { + let shutdown = Shutdown::new(); + assert!(!shutdown.is_fatal()); + assert!(!shutdown.root.is_cancelled()); + + shutdown.fail(); + + assert!(shutdown.is_fatal()); + assert!(shutdown.root.is_cancelled()); + } + + #[tokio::test] + async fn standalone_subsystem_has_its_own_fatal_flag() { + let root = CancellationToken::new(); + let a = Subsystem::new("a", root.clone()); + let b = Subsystem::new("b", root); + a.report_fatal("isolated"); + assert!(a.fatal.load(Ordering::Relaxed)); + assert!(!b.fatal.load(Ordering::Relaxed)); + } + + #[tokio::test] + async fn subsystem_cancels_are_independent_of_root() { + let shutdown = Shutdown::new(); + shutdown.root.cancel(); + + assert!(shutdown.root.is_cancelled()); + assert!(!shutdown.workers.is_cancelled()); + assert!(!shutdown.mgmt.is_cancelled()); + } + + #[tokio::test] + async fn subsystem_root_token_observes_signal_handler_cancel() { + let shutdown = Shutdown::new(); + let mgmt_root = shutdown.mgmt.root_token(); + assert!(!mgmt_root.is_cancelled()); + + shutdown.fail(); + assert!(mgmt_root.is_cancelled()); + } + + #[tokio::test] + async fn drain_is_idempotent() { + let shutdown = Shutdown::new(); + 
let mgmt = shutdown.mgmt.clone(); + + let first = mgmt.drain(Duration::from_millis(50)).await; + let second = mgmt.drain(Duration::from_millis(50)).await; + assert!(first.is_ok()); + assert!(second.is_ok()); + } + + #[tokio::test] + async fn spawn_fatal_on_exit_trips_root_on_normal_return() { + let shutdown = Shutdown::new(); + let handle = tokio::runtime::Handle::current(); + shutdown + .mgmt + .spawn_fatal_on_exit("test task", async {}, &handle); + + tokio::time::timeout(Duration::from_millis(500), shutdown.root.cancelled()) + .await + .expect("root should trip on task exit"); + assert!(shutdown.is_fatal()); + } + + #[tokio::test] + async fn spawn_fatal_on_exit_trips_root_on_panic() { + let shutdown = Shutdown::new(); + let handle = tokio::runtime::Handle::current(); + shutdown.mgmt.spawn_fatal_on_exit( + "test task", + async { + panic!("synthetic panic"); + }, + &handle, + ); + + tokio::time::timeout(Duration::from_millis(500), shutdown.root.cancelled()) + .await + .expect("root should trip on task panic"); + assert!(shutdown.is_fatal()); + } + + #[tokio::test] + async fn spawn_fatal_on_exit_does_not_trip_when_root_cancelled_first() { + // Simulates: SIGINT trips root before drain_in_order reaches mgmt. + // A supervised mgmt task exiting in that window must not flip fatal. 
+ let shutdown = Shutdown::new(); + let handle = tokio::runtime::Handle::current(); + shutdown.root.cancel(); + shutdown + .mgmt + .spawn_fatal_on_exit("test task", async {}, &handle); + + shutdown + .mgmt + .drain(Duration::from_millis(500)) + .await + .unwrap(); + assert!(!shutdown.is_fatal()); + } + + #[tokio::test] + async fn spawn_fatal_on_exit_does_not_trip_when_cancelled_first() { + let shutdown = Shutdown::new(); + let handle = tokio::runtime::Handle::current(); + let cancel = shutdown.mgmt.cancel_token(); + shutdown.mgmt.spawn_fatal_on_exit( + "test task", + async move { + cancel.cancelled().await; + }, + &handle, + ); + + shutdown + .mgmt + .drain(Duration::from_millis(500)) + .await + .unwrap(); + assert!(!shutdown.is_fatal()); + } + + #[tokio::test] + async fn drain_in_order_completes_when_all_subsystems_observe_cancel() { + let shutdown = Shutdown::new(); + let handle = tokio::runtime::Handle::current(); + for sub in [ + &shutdown.workers, + &shutdown.router, + &shutdown.mgmt, + &shutdown.metrics, + ] { + let cancel = sub.cancel_token(); + sub.spawn_on(async move { cancel.cancelled().await }, &handle); + } + shutdown.drain_in_order().await; + assert!(shutdown.workers.is_cancelled()); + assert!(shutdown.router.is_cancelled()); + assert!(shutdown.mgmt.is_cancelled()); + assert!(shutdown.metrics.is_cancelled()); + } +} diff --git a/mgmt/Cargo.toml b/mgmt/Cargo.toml index b474f55db1..b8e9d9fa41 100644 --- a/mgmt/Cargo.toml +++ b/mgmt/Cargo.toml @@ -26,6 +26,7 @@ id = { workspace = true } interface-manager = { workspace = true } k8s-intf = { workspace = true, features = ["client"] } k8s-less = { workspace = true } +lifecycle = { workspace = true } lpm = { workspace = true } nat = { workspace = true } net = { workspace = true } diff --git a/mgmt/src/lib.rs b/mgmt/src/lib.rs index 9847426ab7..245990a966 100644 --- a/mgmt/src/lib.rs +++ b/mgmt/src/lib.rs @@ -7,7 +7,7 @@ mod processor; mod tests; pub mod vpc_manager; -pub use processor::launch::{MgmtParams, 
start_mgmt}; +pub use processor::launch::{LaunchError, MgmtParams, run_mgmt}; pub use processor::proc::ConfigProcessorParams; use tracectl::trace_target; diff --git a/mgmt/src/processor/launch.rs b/mgmt/src/processor/launch.rs index 95fcf1623c..149c8d6b18 100644 --- a/mgmt/src/processor/launch.rs +++ b/mgmt/src/processor/launch.rs @@ -9,6 +9,7 @@ use crate::processor::proc::ConfigProcessor; use crate::processor::proc::ConfigProcessorParams; use concurrency::sync::Arc; +use lifecycle::{CancellationToken, Subsystem}; use tracing::{debug, error, info, warn}; #[derive(Debug, thiserror::Error)] @@ -16,21 +17,11 @@ pub enum LaunchError { #[error("IO error: {0}")] IoError(std::io::Error), #[error("Error in K8s client task: {0}")] - K8sClientError(K8sClientError), - #[error("Error starting/waiting for K8s client task: {0}")] - K8sClientJoinError(tokio::task::JoinError), - #[error("K8s client exited prematurely")] - PrematureK8sClientExit, - #[error("Config processor exited prematurely")] - PrematureProcessorExit, - - #[error("Error in Config Processor task: {0}")] - ProcessorError(std::io::Error), - #[error("Error starting/waiting for Config Processor task: {0}")] - ProcessorJoinError(tokio::task::JoinError), - + K8sClientError(#[from] K8sClientError), #[error("Error in k8s-less mode: {0}")] K8LessError(#[from] K8sLessError), + #[error("Mgmt init cancelled before completion")] + Cancelled, } pub struct MgmtParams { @@ -42,96 +33,262 @@ pub struct MgmtParams { use std::time::Duration; const K8S_STATUS_UPD: Duration = Duration::from_secs(15); const K8S_INIT_RETRY_TIME: Duration = Duration::from_secs(5); -const K8S_INIT_MAX_ATTEMPTS: u8 = 10; +const K8S_INIT_MAX_RETRIES: u8 = 10; -async fn k8s_mgmt_init(k8s_client: &K8sClient) -> Result<(), K8sClientError> { - let mut retries = K8S_INIT_MAX_ATTEMPTS; +/// Run `init` under `cancel`. Returns [`LaunchError::Cancelled`] on cancel. 
+async fn init_cancellable<F, E>(init: F, cancel: &CancellationToken) -> Result<(), LaunchError> +where + F: std::future::Future<Output = Result<(), E>>, + LaunchError: From<E>, +{ + tokio::select! { + r = init => r.map_err(LaunchError::from), + () = cancel.cancelled() => { + info!("Mgmt init cancelled"); + Err(LaunchError::Cancelled) + } + } +} +/// Retry k8s init up to `K8S_INIT_MAX_RETRIES` times with +/// `K8S_INIT_RETRY_TIME` backoff. Attempt and backoff both observe `cancel`. +async fn k8s_mgmt_init( + k8s_client: &K8sClient, + cancel: &CancellationToken, +) -> Result<(), LaunchError> { + let mut retries = K8S_INIT_MAX_RETRIES; debug!("Initializing k8s client..."); - while let Err(e) = k8s_client.init().await { - warn!("Could not initialize k8s state. Will retry {retries} more times"); - tokio::time::sleep(K8S_INIT_RETRY_TIME).await; - if retries == 0 { - error!("Maximum k8s initialization attempts reached. Giving up..."); - return Err(e); + loop { + match init_cancellable(k8s_client.init(), cancel).await { + Ok(()) => break, + Err(LaunchError::Cancelled) => return Err(LaunchError::Cancelled), + Err(e) if retries == 0 => { + error!("Maximum k8s initialization attempts reached. Giving up..."); + return Err(e); + } + Err(_) => { + warn!("Could not initialize k8s state. Will retry {retries} more times"); + retries -= 1; + tokio::select! { + () = tokio::time::sleep(K8S_INIT_RETRY_TIME) => {} + () = cancel.cancelled() => { + info!("K8s init cancelled during retry backoff"); + return Err(LaunchError::Cancelled); + } + } + } } - retries -= 1; } info!("K8s initialization succeeded"); Ok(()) } -/// Start the mgmt service. If the k8s interface is not ready, this may take up to -/// K8S_INIT_RETRY_TIME * K8S_INIT_MAX_ATTEMPTS seconds to complete.
-pub fn start_mgmt(params: MgmtParams) -> Result<std::thread::JoinHandle<()>, LaunchError> { - let (tx, rx) = tokio::sync::oneshot::channel(); - - let handle = std::thread::Builder::new() - .name("mgmt".to_string()) - .spawn(move || { - debug!("Starting dataplane management thread..."); - - /* create tokio runtime */ - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .expect("Tokio runtime creation failed"); - - if let Some(config_dir) = &params.config_dir { - warn!("Running in k8s-less mode...."); - rt.block_on(async { - let (processor, client) = ConfigProcessor::new(params.processor_params); - let k8sless = - Arc::new(K8sLess::new(params.hostname.as_str(), config_dir, client)); - let k8sless1 = k8sless.clone(); - - let init_result = k8sless.init().await.map_err(LaunchError::K8LessError); - let init_failed = init_result.is_err(); - tx.send(init_result).expect("Main thread gone"); - if init_failed { - return; - } +/// Init mgmt synchronously on `handle`, then spawn the long-lived tasks +/// (config processor, status updater, config watcher) tracked under +/// `mgmt`. Init observes `mgmt.root_token()` so SIGINT during init returns +/// [`LaunchError::Cancelled`] within cancel latency. +/// +/// # Errors +/// Returns [`LaunchError`] on init failure. [`LaunchError::Cancelled`] is +/// a clean-shutdown signal — callers must not flip the fatal flag for it.
+pub fn run_mgmt( + handle: &tokio::runtime::Handle, + mgmt: &Subsystem, + params: MgmtParams, +) -> Result<(), LaunchError> { + if let Some(config_dir) = &params.config_dir { + warn!("Running in k8s-less mode...."); + handle.block_on(run_k8s_less( + handle, + mgmt, + params.hostname.as_str(), + config_dir, + params.processor_params, + )) + } else { + debug!("Will start watching k8s for configuration changes"); + handle.block_on(run_k8s( + handle, + mgmt, + params.hostname.as_str(), + params.processor_params, + )) + } +} - tokio::spawn(async { processor.run().await }); - tokio::spawn(async move { k8sless.start_status_update(&K8S_STATUS_UPD).await }); - let _ = K8sLess::start_config_watch(k8sless1).await; - }) - } else { - debug!("Will start watching k8s for configuration changes"); - rt.block_on(async { - let (processor, client) = ConfigProcessor::new(params.processor_params); - let k8s_client = Arc::new(K8sClient::new(params.hostname.as_str(), client)); - let k8s_client1 = k8s_client.clone(); - - let init_result = k8s_mgmt_init(&k8s_client) - .await - .map_err(LaunchError::K8sClientError); - - let init_failed = init_result.is_err(); - tx.send(init_result).expect("Main thread gone"); - if init_failed { - return; - } +async fn run_k8s_less( + handle: &tokio::runtime::Handle, + mgmt: &Subsystem, + hostname: &str, + config_dir: &str, + processor_params: ConfigProcessorParams, +) -> Result<(), LaunchError> { + let (processor, client) = ConfigProcessor::new(processor_params); + let k8sless = Arc::new(K8sLess::new(hostname, config_dir, client)); + let k8sless_for_watch = k8sless.clone(); + + init_cancellable(k8sless.init(), &mgmt.root_token()).await?; - tokio::spawn(async { processor.run().await }); - tokio::spawn(async move { - k8s_client1.k8s_start_status_update(&K8S_STATUS_UPD).await - }); - let _ = - tokio::spawn(async { K8sClient::k8s_start_config_watch(k8s_client).await }) - .await; - }) + mgmt.spawn_fatal_on_exit("k8s-less config processor", processor.run(), handle);
+ let k8sless_for_status = k8sless.clone(); + mgmt.spawn_fatal_on_exit( + "k8s-less status updater", + async move { + k8sless_for_status + .start_status_update(&K8S_STATUS_UPD) + .await + }, + handle, + ); + mgmt.spawn_fatal_on_exit( + "k8s-less config watcher", + async move { + match K8sLess::start_config_watch(k8sless_for_watch).await { + Ok(()) => warn!("k8s-less config watcher returned Ok unexpectedly"), + Err(e) => error!("k8s-less config watcher failed: {e}"), } - unreachable!() - }) - .map_err(LaunchError::IoError)?; - - match rx - .blocking_recv() - .map_err(|_| LaunchError::PrematureProcessorExit)? - { - Ok(()) => Ok(handle), - Err(e) => Err(e), + }, + handle, + ); + + Ok(()) +} + +async fn run_k8s( + handle: &tokio::runtime::Handle, + mgmt: &Subsystem, + hostname: &str, + processor_params: ConfigProcessorParams, +) -> Result<(), LaunchError> { + let (processor, client) = ConfigProcessor::new(processor_params); + let k8s_client = Arc::new(K8sClient::new(hostname, client)); + let k8s_client_for_status = k8s_client.clone(); + + k8s_mgmt_init(&k8s_client, &mgmt.root_token()).await?; + + mgmt.spawn_fatal_on_exit("k8s config processor", processor.run(), handle); + mgmt.spawn_fatal_on_exit( + "k8s status updater", + async move { + k8s_client_for_status + .k8s_start_status_update(&K8S_STATUS_UPD) + .await + }, + handle, + ); + mgmt.spawn_fatal_on_exit( + "k8s config watcher", + async move { + K8sClient::k8s_start_config_watch(k8s_client).await; + }, + handle, + ); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::processor::k8s_less_client::K8sLessError; + use lifecycle::Shutdown; + use std::time::Duration; + + #[tokio::test] + async fn init_cancellable_returns_cancelled_on_pre_tripped_token() { + let cancel = CancellationToken::new(); + cancel.cancel(); + + let result: Result<(), LaunchError> = init_cancellable( + async { + // Long sleep so a missing cancel arm surfaces as a test timeout. 
+ tokio::time::sleep(Duration::from_secs(60)).await; + Ok::<(), K8sLessError>(()) + }, + &cancel, + ) + .await; + + assert!(matches!(result, Err(LaunchError::Cancelled))); + } + + #[tokio::test] + async fn init_cancellable_returns_cancelled_when_tripped_mid_init() { + let cancel = CancellationToken::new(); + let cancel_for_task = cancel.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(20)).await; + cancel_for_task.cancel(); + }); + + let result: Result<(), LaunchError> = init_cancellable( + async { + tokio::time::sleep(Duration::from_secs(60)).await; + Ok::<(), K8sLessError>(()) + }, + &cancel, + ) + .await; + + assert!(matches!(result, Err(LaunchError::Cancelled))); + } + + #[tokio::test] + async fn init_cancellable_returns_ok_when_init_completes_first() { + let cancel = CancellationToken::new(); + let result: Result<(), LaunchError> = + init_cancellable(async { Ok::<(), K8sLessError>(()) }, &cancel).await; + assert!(result.is_ok()); + assert!(!cancel.is_cancelled()); + } + + #[tokio::test] + async fn init_cancellable_propagates_init_error() { + let cancel = CancellationToken::new(); + let result: Result<(), LaunchError> = init_cancellable( + async { Err::<(), K8sLessError>(K8sLessError::Internal("synthetic".into())) }, + &cancel, + ) + .await; + assert!(matches!(result, Err(LaunchError::K8LessError(_)))); + } + + /// Locks in the main.rs contract: SIGTERM during k8s init must yield + /// exit 0 (else systemd restart-loops the unit). Mirrors the match + /// arms in runtime.rs. 
+ #[tokio::test] + async fn cancelled_launch_error_yields_zero_exit_code_at_call_site() { + let shutdown = Shutdown::new(); + shutdown.root.cancel(); + let mgmt_result: Result<(), LaunchError> = Err(LaunchError::Cancelled); + + match mgmt_result { + Ok(()) => {} + Err(LaunchError::Cancelled) => {} + Err(_) => { + shutdown.fail(); + } + } + + assert!(!shutdown.is_fatal()); + assert_eq!(i32::from(shutdown.is_fatal()), 0); + } + + #[tokio::test] + async fn non_cancelled_launch_error_yields_nonzero_exit_code_at_call_site() { + let shutdown = Shutdown::new(); + let mgmt_result: Result<(), LaunchError> = + Err(LaunchError::IoError(std::io::Error::other("synthetic"))); + + match mgmt_result { + Ok(()) => {} + Err(LaunchError::Cancelled) => {} + Err(_) => { + shutdown.fail(); + } + } + + assert!(shutdown.is_fatal()); + assert_eq!(i32::from(shutdown.is_fatal()), 1); } } diff --git a/mgmt/src/tests/mgmt.rs b/mgmt/src/tests/mgmt.rs index 508f6099cb..0eb6c9477a 100644 --- a/mgmt/src/tests/mgmt.rs +++ b/mgmt/src/tests/mgmt.rs @@ -435,7 +435,10 @@ pub mod test { .expect("Should succeed due to defaults"); /* start router */ - let router = Router::new(router_params, None); + let test_mgmt = lifecycle::Subsystem::new("mgmt", lifecycle::CancellationToken::new()); + let test_router = lifecycle::Subsystem::new("router", lifecycle::CancellationToken::new()); + let handle = tokio::runtime::Handle::current(); + let router = Router::new(&test_mgmt, &handle, &test_router, router_params, None); if let Err(e) = &router { error!("New router failed: {e}"); panic!(); diff --git a/routing/Cargo.toml b/routing/Cargo.toml index 74e4459638..d3c830eb40 100644 --- a/routing/Cargo.toml +++ b/routing/Cargo.toml @@ -20,6 +20,7 @@ config = { workspace = true } concurrency = { workspace = true } dplane-rpc = { workspace = true } left-right-tlcache = { workspace = true } +lifecycle = { workspace = true } lpm = { workspace = true } net = { workspace = true } tracectl = { workspace = true } diff --git 
a/routing/src/bmp/mod.rs b/routing/src/bmp/mod.rs index 44b5b99146..6e9b5ce9c4 100644 --- a/routing/src/bmp/mod.rs +++ b/routing/src/bmp/mod.rs @@ -9,6 +9,7 @@ pub use server::{BmpServer, BmpServerConfig}; use concurrency::sync::Arc; use config::internal::status::DataplaneStatus; +use lifecycle::Subsystem; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tracing::{error, info}; @@ -16,12 +17,16 @@ use tracing::{error, info}; use tracectl::trace_target; trace_target!("bmp", LevelFilter::INFO, &[]); -/// Spawn BMP server in background +/// Spawn the BMP server on `handle`, tracked under `mgmt` so it drains +/// with the rest of mgmt's tasks. +#[must_use] pub fn spawn_background( + mgmt: &Subsystem, + handle: &tokio::runtime::Handle, bind: std::net::SocketAddr, dp_status: Arc<RwLock<DataplaneStatus>>, ) -> JoinHandle<()> { - // The future we want to run + let cancel = mgmt.cancel_token(); let fut = async move { info!("starting BMP server on {}", bind); let cfg = BmpServerConfig { @@ -29,19 +34,16 @@ pub fn spawn_background( ..Default::default() }; let srv = BmpServer::new(cfg, handler::StatusHandler::new(dp_status)); - if let Err(e) = srv.run().await { - error!("bmp server terminated: {e:#}"); + tokio::select!
{ + () = cancel.cancelled() => { + info!("BMP server shutdown requested"); + } + res = srv.run() => { + if let Err(e) = res { + error!("bmp server terminated: {e:#}"); + } + } } }; - - if let Ok(handle) = tokio::runtime::Handle::try_current() { - handle.spawn(fut) - } else { - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .expect("failed to build Tokio runtime for BMP"); - let rt_static: &'static tokio::runtime::Runtime = Box::leak(Box::new(rt)); - rt_static.spawn(fut) - } + mgmt.spawn_on(fut, handle) } diff --git a/routing/src/frr/test.rs b/routing/src/frr/test.rs index d83322e6fb..b53ce9847c 100644 --- a/routing/src/frr/test.rs +++ b/routing/src/frr/test.rs @@ -149,7 +149,12 @@ pub mod tests { .expect("Should succeed due to defaults"); /* start router */ - let mut router = Router::new(router_params, None).unwrap(); + let mgmt = lifecycle::Subsystem::new("mgmt", lifecycle::CancellationToken::new()); + let router_subsystem = + lifecycle::Subsystem::new("router", lifecycle::CancellationToken::new()); + let handle = tokio::runtime::Handle::current(); + let mut router = + Router::new(&mgmt, &handle, &router_subsystem, router_params, None).unwrap(); let mut ctl = router.get_ctl_tx(); /* start fake frr agent */ diff --git a/routing/src/router/ctl.rs b/routing/src/router/ctl.rs index 3fda28e73e..e504a7f33d 100644 --- a/routing/src/router/ctl.rs +++ b/routing/src/router/ctl.rs @@ -45,7 +45,6 @@ impl Drop for LockGuard { } pub(crate) enum RouterCtlMsg { - Finish, Lock(RouterCtlReplyTx), Unlock(RouterCtlReplyTx), GuardedUnlock, @@ -241,10 +240,6 @@ fn handle_config_history(rio: &mut Rio, history: Arc>) { /// Handle a request from the control channel pub(crate) fn handle_ctl_msg(rio: &mut Rio, db: &mut RoutingDb) { match rio.ctl_rx.try_recv() { - Ok(RouterCtlMsg::Finish) => { - info!("Got request to shutdown. 
Au revoir ..."); - rio.run = false; - } Ok(RouterCtlMsg::Lock(reply_to)) => handle_lock(rio, true, Some(reply_to)), Ok(RouterCtlMsg::Unlock(reply_to)) => handle_lock(rio, false, Some(reply_to)), Ok(RouterCtlMsg::GuardedUnlock) => handle_lock(rio, false, None), diff --git a/routing/src/router/mod.rs b/routing/src/router/mod.rs index a4bffdcb16..fa300c4151 100644 --- a/routing/src/router/mod.rs +++ b/routing/src/router/mod.rs @@ -144,6 +144,9 @@ impl Router { /// Start a `Router` #[allow(clippy::new_without_default)] pub fn new( + mgmt: &lifecycle::Subsystem, + mgmt_handle: &tokio::runtime::Handle, + router: &lifecycle::Subsystem, params: RouterParams, cli_sources: Option, ) -> Result { @@ -163,15 +166,16 @@ impl Router { let rioconf = Self::build_rio_config(&params)?; debug!("{name}: Starting router IO..."); - let rio_handle = start_rio(&rioconf, fibtw, iftw, atabler, cli_sources)?; + let rio_handle = start_rio(router, &rioconf, fibtw, iftw, atabler, cli_sources)?; - // Start BMP server in background if configured, always with mandatory dp_status let bmp_handle = if let Some(bmp_params) = &params.bmp { debug!( "{name}: Starting BMP server on {} (interval={:?})", bmp_params.bind_addr, bmp_params.stats_interval ); Some(bmp::spawn_background( + mgmt, + mgmt_handle, bmp_params.bind_addr, params.dp_status.clone(), )) @@ -200,7 +204,9 @@ impl Router { } self.resolver.stop(); - // Abort BMP server task if running (Tokio handle). + // BMP is also tracked under the mgmt subsystem, which normally + // drains it cleanly via `Shutdown::drain_in_order`. This abort is + // a safety net for the case where the mgmt drain hit its deadline.
if let Some(handle) = self.bmp_handle.take() { handle.abort(); } diff --git a/routing/src/router/rio.rs b/routing/src/router/rio.rs index 6916adb8f1..be2e3f2587 100644 --- a/routing/src/router/rio.rs +++ b/routing/src/router/rio.rs @@ -22,17 +22,18 @@ use cli::IoCache; use cli::cliproto::{CLI_RX_BUFF_SIZE, CliRequest}; use config::{GwConfigMeta, ValidatedGwConfig}; use dplane_rpc::socks::RpcCachedSock; +use lifecycle::{CancellationToken, Subsystem}; use mio::unix::SourceFd; use mio::{Events, Interest, Poll, Token}; use concurrency::sync::Arc; +use concurrency::thread::{self, JoinHandle}; use nix::sys::socket::{getsockopt, setsockopt, sockopt::SndBuf}; use std::fs; use std::os::fd::AsRawFd; use std::os::unix::fs::PermissionsExt; use std::os::unix::net::UnixDatagram; -use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; use tokio::sync::mpsc::{Receiver, Sender, channel}; @@ -44,31 +45,28 @@ const CTL_CHANNEL_CAPACITY: usize = 100; /// An object to control a router IO, [`Rio`] pub(crate) struct RioHandle { + cancel: CancellationToken, ctl: Sender<RouterCtlMsg>, handle: Option<JoinHandle<()>>, } impl RioHandle { - /// Terminate the router IO loop / thread + /// Trip the router cancel and join the RIO thread. Idempotent — a + /// second call after the thread has been joined returns `Ok(())`. + /// Worst-case exit latency is one poll timeout (1 second). /// /// # Errors - /// Fails if the channel has been dropped or the thread cannot be joined + /// Fails if the thread panicked during join.
pub(crate) fn finish(&mut self) -> Result<(), RouterError> { debug!("Requesting router IO to stop.."); - self.ctl - .try_send(RouterCtlMsg::Finish) - .map_err(|_| RouterError::Internal("Error sending over ctl channel"))?; - - let handle = self.handle.take(); - if let Some(handle) = handle { - debug!("Waiting for the router IO to terminate.."); - handle - .join() - .map_err(|_| RouterError::Internal("Error joining thread"))?; - debug!("Router IO ended successfully"); - Ok(()) - } else { - Err(RouterError::Internal("No handle")) - } + self.cancel.cancel(); + + let Some(handle) = self.handle.take() else { + return Ok(()); + }; + handle + .join() + .map_err(|_| RouterError::Internal("Error joining thread"))?; + Ok(()) } #[must_use] pub(crate) fn get_ctl_tx(&self) -> RouterCtlSender { @@ -113,7 +111,6 @@ pub(crate) const FRRMISOCK: Token = Token(2); pub(crate) struct Rio { #[allow(unused)] pub(crate) name: String, - pub(crate) run: bool, pub(crate) frozen: bool, pub(crate) cp_sock_path: String, pub(crate) cli_sock_path: String, @@ -185,7 +182,6 @@ impl Rio { Ok(Rio { name: conf.name.clone(), - run: true, frozen: false, cp_sock_path, cli_sock_path, @@ -342,6 +338,7 @@ impl Rio { #[allow(clippy::missing_errors_doc, clippy::too_many_lines)] pub(crate) fn start_rio( + router: &Subsystem, conf: &RioConf, fibtw: FibTableWriter, iftw: IfTableWriter, @@ -351,9 +348,34 @@ pub(crate) fn start_rio( let mut rio = Rio::new(conf)?; let ctl_tx = rio.ctl_tx.clone(); let cli_sources = cli_sources.unwrap_or_default(); + let cancel = router.cancel_token(); + let loop_cancel = cancel.clone(); + let guard_subsystem = router.clone(); /* router IO loop */ let rio_loop = move || { + // Drop-guard so panic-unwind or unexpected loop exit trips + // report_fatal. 
+ struct ExitGuard { + subsystem: Subsystem, + } + impl Drop for ExitGuard { + fn drop(&mut self) { + if self.subsystem.is_cancelled() { + return; + } + let reason = if std::thread::panicking() { + "RIO thread panicked" + } else { + "RIO thread exited unexpectedly" + }; + self.subsystem.report_fatal(reason); + } + } + let _guard = ExitGuard { + subsystem: guard_subsystem, + }; + info!("CPI: Listening at {}.", &rio.cp_sock_path); info!("CLI: Listening at {}.", &rio.cli_sock_path); info!("FRRMI: will connect to {}.", &rio.frrmi.get_remote()); @@ -366,7 +388,9 @@ pub(crate) fn start_rio( revent!(RouterEvent::Started); info!("Entering router IO loop...."); - while rio.run { + // Observe the router subsystem cancellation between poll cycles. + // Worst-case exit latency is the poll timeout (1 second). + while !loop_cancel.is_cancelled() { if let Err(e) = rio.poller.poll(&mut events, Some(Duration::from_secs(1))) { error!("Poller error!: {e}"); continue; @@ -467,6 +491,7 @@ pub(crate) fn start_rio( .map_err(|_| RouterError::Internal("Failure spawning thread"))?; Ok(RioHandle { + cancel, ctl: ctl_tx, handle: Some(handle), }) @@ -479,9 +504,14 @@ mod tests { use crate::fib::fibtable::FibTableWriter; use crate::interfaces::iftablerw::IfTableWriter; use crate::router::rio::{RioConf, start_rio}; - use std::thread; + use concurrency::thread; + use lifecycle::{CancellationToken, Subsystem}; use std::time::Duration; + fn test_router_subsystem() -> Subsystem { + Subsystem::new("router", CancellationToken::new()) + } + #[test] #[cfg_attr(emulated, ignore = "binds Unix domain sockets at /tmp/hh_*.sock")] fn test_rio_ctl() { @@ -508,7 +538,9 @@ mod tests { let (_atablew, atabler) = AtableWriter::new(); /* start CPI */ - let mut cpi = start_rio(&conf, fibtw, iftw, atabler, None).expect("Should succeed"); + let router = test_router_subsystem(); + let mut cpi = + start_rio(&router, &conf, fibtw, iftw, atabler, None).expect("Should succeed"); thread::sleep(Duration::from_secs(3)); 
assert_eq!(cpi.finish(), Ok(())); } @@ -533,7 +565,8 @@ mod tests { let (_atablew, atabler) = AtableWriter::new(); /* start router IO */ - let rio = start_rio(&conf, fibtw, iftw, atabler, None); + let router = test_router_subsystem(); + let rio = start_rio(&router, &conf, fibtw, iftw, atabler, None); assert!(rio.is_err_and(|e| matches!(e, RouterError::InvalidPath(_)))); } }