From fad5a433fe4565e2114fab04e0edfaa2cb5dc515 Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sat, 9 May 2026 17:20:37 +0200 Subject: [PATCH 1/7] fix: update latest_heartbeat_sent when sending a heartbeat --- src/nodes/node_manager.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/nodes/node_manager.rs b/src/nodes/node_manager.rs index 9b95b9c..fefaf67 100644 --- a/src/nodes/node_manager.rs +++ b/src/nodes/node_manager.rs @@ -332,14 +332,17 @@ impl<'a> NodeManager<'a> { pub fn dispatch_heartbeat_requests(&self) -> Result<()> { for node_entry in self.can_nodes.iter() { let node_id = *node_entry.key(); - let next_heartbeat = node_entry + let mut latest_heartbeat_sent = node_entry .latest_heartbeat_sent - .read() - .map_err(|error| anyhow!("RwLock was poisoned: {}", error))? - .as_ref() - .map(|(_, counter)| *counter + 1) + .write() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; + + let next_heartbeat = latest_heartbeat_sent + .map(|(_, counter)| counter + 1) .unwrap_or(0); + *latest_heartbeat_sent = Some((Utc::now(), next_heartbeat)); + self.event_dispatcher .dispatch(events::Event::SendCanMessage { receiver_node_id: node_id, From 3079980951e03f456aec6edf5714a5c0de16f5be Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sat, 9 May 2026 19:38:12 +0200 Subject: [PATCH 2/7] detect nodes that are not responsive to heartbeats If nodes don't respond, we retry sending heartbeat requests with exponential backoff. If the still do not respond after a set amount of attempts, we remove them from the list of connected nodes, and they must re-register. --- README.md | 16 ++ src/config/mod.rs | 29 +++- src/lib.rs | 7 +- src/nodes/heartbeat.rs | 338 ++++++++++++++++++++++++++++++++++++++ src/nodes/mod.rs | 46 +----- src/nodes/node_manager.rs | 61 +------ tests/emulator.rs | 9 +- 7 files changed, 399 insertions(+), 107 deletions(-) create mode 100644 src/nodes/heartbeat.rs diff --git a/README.md b/README.md index 144bc8d..600bb4f 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,22 @@ ferroflow-vcan down vcan0 ## Development +### Runtime Configuration + +Ferroflow reads `config.yml` on startup. Heartbeat retries use `heartbeat_period` as the base interval and back off per node when heartbeat responses are missing. + +```yaml +can_bus_interfaces: + - can0 +heartbeat_period: 1 +heartbeat_backoff_multiplier: 2 +heartbeat_max_period: 60 +heartbeat_max_unanswered: 5 +database_url: postgres://postgres:yourpassword@localhost:5432/ferroflow +``` + +With these values, healthy nodes get a 1s heartbeat cadence. After a missed response, retries wait 2s, 4s, 8s, and so on up to 60s. After 5 unanswered heartbeat requests, the node is removed and Ferroflow stops sending heartbeats to it. + ### Running CI Checks The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions diff --git a/src/config/mod.rs b/src/config/mod.rs index 3c04e19..ed69aad 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -4,10 +4,25 @@ use anyhow::{Context, Result}; use config as config_builder; use serde::{Deserialize, Serialize}; +const DEFAULT_HEARTBEAT_BACKOFF_MULTIPLIER: u32 = 2; +const DEFAULT_HEARTBEAT_MAX_PERIOD: u32 = 60; +const DEFAULT_HEARTBEAT_MAX_UNANSWERED: u32 = 5; + +#[derive(Deserialize, Serialize, Debug)] +pub struct HeartbeatConfig { + pub period: u32, + #[serde(default = "default_heartbeat_backoff_multiplier")] + pub backoff_multiplier: u32, + #[serde(default = "default_heartbeat_max_period")] + pub max_period: u32, + #[serde(default = "default_heartbeat_max_unanswered")] + pub max_unanswered: u32, +} + #[derive(Deserialize, Serialize, Debug)] pub struct Config { pub can_bus_interfaces: Vec, - pub heartbeat_period: u64, + pub heartbeat: HeartbeatConfig, pub database_url: String, } @@ -20,3 +35,15 @@ pub fn load_config(path: &str) -> Result { .try_deserialize() .with_context(|| format!("Failed to deserialize config from {}", path)) } + +fn default_heartbeat_backoff_multiplier() -> u32 { + DEFAULT_HEARTBEAT_BACKOFF_MULTIPLIER +} + +fn default_heartbeat_max_period() -> u32 { + DEFAULT_HEARTBEAT_MAX_PERIOD +} + +fn default_heartbeat_max_unanswered() -> u32 { + DEFAULT_HEARTBEAT_MAX_UNANSWERED +} diff --git a/src/lib.rs b/src/lib.rs index f75ccd3..55f53c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,12 +38,7 @@ pub fn run_with_dependencies( println!("Starting node registration"); nodes::spawn_can_msg_handler_thread(node_manager, event_dispatcher, scope); - nodes::spawn_heartbeat_thread( - node_manager, - std::time::Duration::from_secs(config.heartbeat_period), - event_dispatcher, - scope, - ); + nodes::spawn_heartbeat_thread(node_manager, &config.heartbeat, event_dispatcher, scope); node_manager.start_node_registration(); diff --git a/src/nodes/heartbeat.rs b/src/nodes/heartbeat.rs new file mode 100644 index 0000000..96e24a8 --- /dev/null +++ b/src/nodes/heartbeat.rs @@ -0,0 +1,338 @@ +use std::{ + sync::mpsc::{self, RecvTimeoutError}, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result, anyhow}; +use chrono::{DateTime, Utc}; +use liquidcan::{CanMessage, CanMessageId, payloads::HeartbeatPayload}; + +use crate::{ + config::HeartbeatConfig, + events::{self, EventKind}, +}; + +use super::node_manager::NodeManager; + +pub fn spawn_heartbeat_thread<'a>( + node_manager: &'a NodeManager<'a>, + heartbeat_config: &'a HeartbeatConfig, + event_dispatcher: &'a events::EventDispatcher, + scope: &'a std::thread::Scope<'a, '_>, +) { + let (tx, rx) = mpsc::channel::(); + let events = vec![EventKind::Shutdown]; + + event_dispatcher.subscribe(tx, events, "Heartbeat thread"); + + scope.spawn(move || { + if let Err(error) = + dispatch_heartbeat_requests(node_manager, event_dispatcher, heartbeat_config) + { + eprintln!("Error dispatching heartbeat requests: {error:#}"); + } + + let period_duration = Duration::from_secs(heartbeat_config.period as u64); + + let mut next_heartbeat_at = Instant::now() + period_duration; + + loop { + match rx.recv_timeout(next_heartbeat_at - Instant::now()) { + Ok(events::Event::Shutdown) => break, + Err(RecvTimeoutError::Timeout) => { + if let Err(error) = dispatch_heartbeat_requests( + node_manager, + event_dispatcher, + heartbeat_config, + ) { + eprintln!("Error dispatching heartbeat requests: {error:#}"); + } + + next_heartbeat_at += period_duration; + + // edge case: if next_heartbeat_at is already in the past, skip to now. + if next_heartbeat_at < Instant::now() { + next_heartbeat_at = Instant::now(); + } + } + Err(RecvTimeoutError::Disconnected) => break, + Ok(_) => {} + } + } + }); +} + +pub fn handle_heartbeat_res( + node_manager: &NodeManager, + can_msg_id: CanMessageId, + payload: HeartbeatPayload, +) -> Result<()> { + let timestamp = Utc::now(); + let node_id = can_msg_id.sender_id(); + + let node = node_manager.get_nodes().get(&node_id).with_context(|| { + format!( + "received heartbeat response for node {} but it is not registered", + node_id + ) + })?; + + let mut latest_heartbeat = node + .latest_heartbeat_received + .write() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; + + *latest_heartbeat = Some((timestamp, payload.counter)); + + Ok(()) +} + +fn dispatch_heartbeat_requests( + node_manager: &NodeManager, + event_dispatcher: &events::EventDispatcher, + heartbeat_config: &HeartbeatConfig, +) -> Result<()> { + let now = Utc::now(); + let mut expired_nodes = Vec::new(); + + for node_entry in node_manager.get_nodes().iter() { + let node_id = *node_entry.key(); + let latest_heartbeat_received = node_entry + .latest_heartbeat_received + .read() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))? + .to_owned(); + + let mut latest_heartbeat_sent = node_entry + .latest_heartbeat_sent + .write() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; + let latest_sent = latest_heartbeat_sent.to_owned(); + + let unanswered_heartbeats = + unanswered_heartbeat_count(latest_sent, latest_heartbeat_received); + + if unanswered_heartbeats >= heartbeat_config.max_unanswered { + expired_nodes.push((node_id, unanswered_heartbeats)); + continue; + } + + if !heartbeat_is_due(now, latest_sent, unanswered_heartbeats, heartbeat_config) { + continue; + } + + let next_heartbeat = latest_heartbeat_sent + .map(|(_, counter)| counter + 1) + .unwrap_or(0); + + *latest_heartbeat_sent = Some((now, next_heartbeat)); + + event_dispatcher.dispatch(events::Event::SendCanMessage { + receiver_node_id: node_id, + message: CanMessage::HeartbeatReq { + payload: HeartbeatPayload { + counter: next_heartbeat, + }, + }, + }); + } + + for (node_id, unanswered_heartbeats) in expired_nodes { + if node_manager.get_nodes().remove(&node_id).is_some() { + eprintln!( + "Removing CAN node {node_id}: {unanswered_heartbeats} unanswered heartbeat requests" + ); + } + } + + Ok(()) +} + +fn heartbeat_is_due( + now: DateTime, + latest_sent: Option<(DateTime, u32)>, + unanswered_heartbeats: u32, + heartbeat_config: &HeartbeatConfig, +) -> bool { + let Some((latest_sent_at, _)) = latest_sent else { + return true; + }; + + let elapsed = now + .signed_duration_since(latest_sent_at) + .to_std() + .unwrap_or_default(); + + elapsed >= interval_after_unanswered(heartbeat_config, unanswered_heartbeats) +} + +fn unanswered_heartbeat_count( + latest_sent: Option<(DateTime, u32)>, + latest_received: Option<(DateTime, u32)>, +) -> u32 { + let Some((_, sent_counter)) = latest_sent else { + return 0; + }; + + match latest_received { + // Nodes respond to heartbeat requests with the counter incremented by 1. + Some((_, received_counter)) => sent_counter - (received_counter - 1), + None => sent_counter + 1, + } +} + +pub fn interval_after_unanswered( + heartbeat_config: &HeartbeatConfig, + unanswered_heartbeats: u32, +) -> Duration { + let multiplier = heartbeat_config + .backoff_multiplier + .saturating_pow(unanswered_heartbeats); + let interval = heartbeat_config.period.saturating_mul(multiplier); + + Duration::from_secs(interval.min(heartbeat_config.max_period) as u64) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration as ChronoDuration; + use liquidcan::payloads::NodeInfoResPayload; + + fn test_heartbeat_config() -> HeartbeatConfig { + HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 60, + max_unanswered: 3, + } + } + + fn register_test_node(manager: &NodeManager, node_id: u8) { + let message_id = CanMessageId::new().with_sender_id(node_id); + manager + .handle_node_info_announcement( + message_id, + NodeInfoResPayload { + tel_count: 0, + par_count: 0, + firmware_hash: 0, + liquid_hash: 0, + device_name: "test-node".try_into().unwrap(), + }, + ) + .expect("test node should register"); + } + + fn drain_heartbeat_counter(rx: &mpsc::Receiver) -> u32 { + let event = rx + .recv_timeout(Duration::from_millis(50)) + .expect("heartbeat request should be dispatched"); + + let events::Event::SendCanMessage { message, .. } = event else { + panic!("expected SendCanMessage event"); + }; + + let CanMessage::HeartbeatReq { payload } = message else { + panic!("expected heartbeat request"); + }; + + payload.counter + } + + fn make_latest_heartbeat_sent_old(manager: &NodeManager, node_id: u8, seconds: i64) { + let node = manager + .get_nodes() + .get(&node_id) + .expect("test node should exist"); + let mut latest_sent = node + .latest_heartbeat_sent + .write() + .expect("heartbeat sent lock should not be poisoned"); + let Some((sent_at, _)) = latest_sent.as_mut() else { + panic!("test node should have heartbeat sent state"); + }; + + *sent_at = Utc::now() - ChronoDuration::seconds(seconds); + } + + #[test] + fn heartbeat_config_caps_backoff_interval() { + let config = HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 5, + max_unanswered: 3, + }; + + assert_eq!( + interval_after_unanswered(&config, 0), + Duration::from_secs(1) + ); + assert_eq!( + interval_after_unanswered(&config, 1), + Duration::from_secs(2) + ); + assert_eq!( + interval_after_unanswered(&config, 2), + Duration::from_secs(4) + ); + assert_eq!( + interval_after_unanswered(&config, 3), + Duration::from_secs(5) + ); + } + + #[test] + fn unanswered_heartbeats_backoff_and_eventually_evict_node() { + let dispatcher = events::EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![events::EventKind::SendCanMessage], "test-listener"); + + let manager = NodeManager::new(&dispatcher); + register_test_node(&manager, 5); + + let config = test_heartbeat_config(); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 0); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert!(rx.recv_timeout(Duration::from_millis(20)).is_err()); + + make_latest_heartbeat_sent_old(&manager, 5, 2); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 1); + + make_latest_heartbeat_sent_old(&manager, 5, 4); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 2); + + make_latest_heartbeat_sent_old(&manager, 5, 8); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert!(rx.recv_timeout(Duration::from_millis(20)).is_err()); + assert!(manager.get_nodes().get(&5).is_none()); + } + + #[test] + fn heartbeat_response_resets_unanswered_backoff() { + let dispatcher = events::EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![events::EventKind::SendCanMessage], "test-listener"); + + let manager = NodeManager::new(&dispatcher); + register_test_node(&manager, 5); + + let config = test_heartbeat_config(); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 0); + + let message_id = CanMessageId::new().with_sender_id(5); + handle_heartbeat_res(&manager, message_id, HeartbeatPayload { counter: 1 }).unwrap(); + + make_latest_heartbeat_sent_old(&manager, 5, 1); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 1); + } +} diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs index b4d9876..65205e3 100644 --- a/src/nodes/mod.rs +++ b/src/nodes/mod.rs @@ -1,13 +1,12 @@ //! Contains code for managing the CAN nodes that are connected to FerroFlow, their fields and data types. mod can_node; +mod heartbeat; mod node_manager; +pub use heartbeat::spawn_heartbeat_thread; pub use node_manager::NodeManager; -use std::{ - sync::mpsc::{self, RecvTimeoutError}, - time::{Duration, Instant}, -}; +use std::sync::mpsc; use crate::events; use crate::events::EventKind; @@ -34,42 +33,3 @@ pub fn spawn_can_msg_handler_thread<'a>( } }); } - -pub fn spawn_heartbeat_thread<'a>( - node_manager: &'a NodeManager<'a>, - interval: Duration, - event_dispatcher: &'a events::EventDispatcher, - scope: &'a std::thread::Scope<'a, '_>, -) { - let (tx, rx) = mpsc::channel::(); - let events = vec![EventKind::Shutdown]; - - event_dispatcher.subscribe(tx, events, "Heartbeat thread"); - - scope.spawn(move || { - if let Err(error) = node_manager.dispatch_heartbeat_requests() { - eprintln!("Error dispatching heartbeat requests: {error:#}"); - } - let mut next_heartbeat_at = Instant::now() + interval; - - loop { - match rx.recv_timeout(next_heartbeat_at - Instant::now()) { - Ok(events::Event::Shutdown) => break, - Err(RecvTimeoutError::Timeout) => { - if let Err(error) = node_manager.dispatch_heartbeat_requests() { - eprintln!("Error dispatching heartbeat requests: {error:#}"); - } - - next_heartbeat_at += interval; - - // edge case: if next_heartbeat_at is already in the past, skip to now. - if next_heartbeat_at < Instant::now() { - next_heartbeat_at = Instant::now(); - } - } - Err(RecvTimeoutError::Disconnected) => break, - Ok(_) => {} - } - } - }); -} diff --git a/src/nodes/node_manager.rs b/src/nodes/node_manager.rs index fefaf67..c1b394c 100644 --- a/src/nodes/node_manager.rs +++ b/src/nodes/node_manager.rs @@ -7,11 +7,12 @@ use dashmap::DashMap; use liquidcan::{ CanMessage, CanMessageId, payloads::{ - CanDataValue, FieldGetResPayload, FieldRegistrationPayload, HeartbeatPayload, - NodeInfoResPayload, TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, + CanDataValue, FieldGetResPayload, FieldRegistrationPayload, NodeInfoResPayload, + TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, }, }; +use crate::nodes::heartbeat; use crate::{db::FieldLog, events}; use super::can_node::{CanNode, FieldInfo, RegistrationInfo, TelemetryGroupDefinition}; @@ -63,7 +64,9 @@ impl<'a> NodeManager<'a> { self.handle_telemetry_group_update(message_id, payload) } CanMessage::FieldGetRes { payload } => self.handle_field_get_res(message_id, payload), - CanMessage::HeartbeatRes { payload } => self.handle_heartbeat_res(message_id, payload), + CanMessage::HeartbeatRes { payload } => { + heartbeat::handle_heartbeat_res(self, message_id, payload) + } _ => bail!( "received unsupported CAN message from node {}: {:?}", message_id.sender_id(), @@ -304,58 +307,6 @@ impl<'a> NodeManager<'a> { Ok(()) } - pub fn handle_heartbeat_res( - &self, - can_msg_id: CanMessageId, - payload: HeartbeatPayload, - ) -> Result<()> { - let timestamp = Utc::now(); - let node_id = can_msg_id.sender_id(); - - let node = self.can_nodes.get(&node_id).with_context(|| { - format!( - "received heartbeat response for node {} but it is not registered", - node_id - ) - })?; - - let mut latest_heartbeat = node - .latest_heartbeat_received - .write() - .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; - - *latest_heartbeat = Some((timestamp, payload.counter)); - - Ok(()) - } - - pub fn dispatch_heartbeat_requests(&self) -> Result<()> { - for node_entry in self.can_nodes.iter() { - let node_id = *node_entry.key(); - let mut latest_heartbeat_sent = node_entry - .latest_heartbeat_sent - .write() - .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; - - let next_heartbeat = latest_heartbeat_sent - .map(|(_, counter)| counter + 1) - .unwrap_or(0); - - *latest_heartbeat_sent = Some((Utc::now(), next_heartbeat)); - - self.event_dispatcher - .dispatch(events::Event::SendCanMessage { - receiver_node_id: node_id, - message: CanMessage::HeartbeatReq { - payload: HeartbeatPayload { - counter: next_heartbeat, - }, - }, - }); - } - - Ok(()) - } pub fn get_nodes(&self) -> &DashMap { &self.can_nodes } diff --git a/tests/emulator.rs b/tests/emulator.rs index 334829b..790fdab 100644 --- a/tests/emulator.rs +++ b/tests/emulator.rs @@ -2,7 +2,7 @@ mod common; use crate::common::ShutdownGuard; use chrono::{DateTime, Utc}; -use ferro_flow::config::Config; +use ferro_flow::config::{Config, HeartbeatConfig}; use ferro_flow::{events, nodes, run_with_dependencies}; use liquidcan::payloads::CanDataType; use std::{io::Write, time::Instant}; @@ -183,7 +183,12 @@ fn test_telemetry_group_updates() { fn build_test_config(can_iface: &str) -> Config { Config { can_bus_interfaces: vec![can_iface.to_string()], - heartbeat_period: 1, + heartbeat: HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 10, + max_unanswered: 3, + }, database_url: "".to_string(), } } From 5a98e11a6be809f506e95a6ae70d34e4d902128d Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Sat, 9 May 2026 19:58:17 +0200 Subject: [PATCH 3/7] fix config example --- README.md | 32 +++++++++++++++++++++++--------- config.example.yml | 11 +++++++++++ 2 files changed, 34 insertions(+), 9 deletions(-) create mode 100644 config.example.yml diff --git a/README.md b/README.md index 600bb4f..4aef27a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Ferroflow -Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. + +Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. It interfaces with our custom Engine Control Units ECUs, through our custom [LiquidCAN protocol](https://github.com/SpaceTeam/LiquidCAN/). On the other end, it provides a high-level API for our [ECUI](https://github.com/SpaceTeam/web_ecui_houbolt), which is the user interface for our ECUs. @@ -10,40 +11,47 @@ On the other end, it provides a high-level API for our [ECUI](https://github.com Some integration tests talk to the ECUemulator over SocketCAN. For that you use a virtual CAN interface. ### Test helper: `ferroflow-vcan` + For test environments, this repo provides a small helper binary that can be granted `CAP_NET_ADMIN` once via `setcap`. Integration tests will automatically use it (if it’s available on `PATH`) to create/delete `vcan` interfaces without sudo. Build the helper (feature-gated; not part of normal builds): + ```bash cargo build --release --features test-vcan --bin ferroflow-vcan ``` + Put it on PATH (recommended for tests): + ```bash install -m 0755 ./target/release/ferroflow-vcan ~/.local/bin/ferroflow-vcan sudo setcap cap_net_admin+ep ~/.local/bin/ferroflow-vcan ``` Manual usage: + ```bash ferroflow-vcan up vcan0 ferroflow-vcan down vcan0 ``` - ## Development ### Runtime Configuration -Ferroflow reads `config.yml` on startup. Heartbeat retries use `heartbeat_period` as the base interval and back off per node when heartbeat responses are missing. +Ferroflow reads `config.yml` on startup. Use `config.example.yml` as a starting point. Heartbeat retries use `heartbeat.period` as the base interval and back off per node when heartbeat responses are missing. ```yaml can_bus_interfaces: - can0 -heartbeat_period: 1 -heartbeat_backoff_multiplier: 2 -heartbeat_max_period: 60 -heartbeat_max_unanswered: 5 -database_url: postgres://postgres:yourpassword@localhost:5432/ferroflow + +heartbeat: + period: 1 + backoff_multiplier: 2 + max_period: 60 + max_unanswered: 5 + +database_url: "" ``` With these values, healthy nodes get a 1s heartbeat cadence. After a missed response, retries wait 2s, 4s, 8s, and so on up to 60s. After 5 unanswered heartbeat requests, the node is removed and Ferroflow stops sending heartbeats to it. @@ -53,6 +61,7 @@ With these values, healthy nodes get a 1s heartbeat cadence. After a missed resp The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions **Run all checks:** + ```bash ./ci-rust.sh # or explicitly @@ -60,17 +69,20 @@ The repository includes a CI script (`ci-rust.sh`) that runs all quality checks ``` **Run individual checks:** + ```bash ./ci-rust.sh build # Build the project ./ci-rust.sh test # Run tests ./ci-rust.sh fmt # Check code formatting ./ci-rust.sh clippy # Run clippy linter ``` + You can fix formatting or linter issues by adding the -fix suffix to the command. e.g: `./ci-rust.sh clippy-fix` ### Running `fmt` and `clippy` as a pre-commit hook A pre-commit hook script is available in `.githooks`, which executes the CI script with `fmt` and `clippy` only and without the `fix` option. To setup the hook, configure git to use the `.githooks` directory and make the `pre-commit` file executable. + ```bash git config core.hooksPath .githooks chmod u+x .githooks/pre-commit @@ -82,6 +94,7 @@ chmod u+x .githooks/pre-commit We use TimescaleDB, which is an extension of PostgreSQL optimized for time-series data. You can install it by following the instructions on the [TimescaleDB installation page](https://docs.timescale.com/install/latest/). Using docker is recommended for local development (if you already have another instance of postgres running, use e.g. `-p 5433:5432` instead of `-p 5432:5432`): + ```bash docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=yourpassword timescale/timescaledb:latest-pg18 ``` @@ -92,6 +105,7 @@ The project uses Diesel for database interactions. Diesel CLI is recommended for **Running Diesel CLI** Here's some common commands: + ```bash export DATABASE_URL=postgres://postgres:yourpassword@localhost:5432/ferroflow # Set the database URL diesel setup # Set up the database @@ -113,4 +127,4 @@ Database tests use `testcontainers` to start a temporary TimescaleDB/PostgreSQL There are two examples in the repository: - a unit test in `src/db/mod.rs` -- an integration test in `tests/db_logging.rs` \ No newline at end of file +- an integration test in `tests/db_logging.rs` diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..49aa3a4 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,11 @@ +can_bus_interfaces: + - can0 + +heartbeat: + period: 1 + backoff_multiplier: 2 + max_period: 60 + max_unanswered: 5 + +# Leave empty to disable database logging. +database_url: "" From 5e647e8f1e0652812dd04891184bbd96043a9b92 Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Mon, 25 May 2026 15:46:30 +0200 Subject: [PATCH 4/7] ensure there is at least a full period between heartbeat requests Previously, heartbeats were scheduled an amount of periods from a fixed reference point. Therefore, if one heartbeat request was delayed, but the next was not, there could be a smaller time delta between those two requests. This made the calculation of the backoff wrong. --- src/nodes/heartbeat.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/nodes/heartbeat.rs b/src/nodes/heartbeat.rs index 96e24a8..900e2ae 100644 --- a/src/nodes/heartbeat.rs +++ b/src/nodes/heartbeat.rs @@ -47,13 +47,7 @@ pub fn spawn_heartbeat_thread<'a>( ) { eprintln!("Error dispatching heartbeat requests: {error:#}"); } - - next_heartbeat_at += period_duration; - - // edge case: if next_heartbeat_at is already in the past, skip to now. - if next_heartbeat_at < Instant::now() { - next_heartbeat_at = Instant::now(); - } + next_heartbeat_at = Instant::now() + period_duration; } Err(RecvTimeoutError::Disconnected) => break, Ok(_) => {} From e4f3a3de05c960d9ab8dd19cab8e1b5f000ed28d Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Mon, 25 May 2026 16:18:35 +0200 Subject: [PATCH 5/7] add test for node disconnect using the ECUEmulator --- tests/emulator.rs | 166 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 161 insertions(+), 5 deletions(-) diff --git a/tests/emulator.rs b/tests/emulator.rs index 790fdab..f879da0 100644 --- a/tests/emulator.rs +++ b/tests/emulator.rs @@ -4,11 +4,13 @@ use crate::common::ShutdownGuard; use chrono::{DateTime, Utc}; use ferro_flow::config::{Config, HeartbeatConfig}; use ferro_flow::{events, nodes, run_with_dependencies}; -use liquidcan::payloads::CanDataType; -use std::{io::Write, time::Instant}; +use liquidcan::{CanMessage, payloads::CanDataType}; +use std::{io::Write, sync::mpsc, time::Duration, time::Instant}; use testcontainers::core::logs::LogFrame; use testcontainers::{GenericImage, ImageExt, runners::SyncRunner}; +const TEST_NODE_ID: u8 = 5; + #[test] fn test_node_registration() { let vcan_iface = common::unique_vcan_iface(); @@ -180,19 +182,173 @@ fn test_telemetry_group_updates() { }); } +#[test] +fn test_node_is_removed_when_heartbeat_responses_stop() { + let vcan_iface = common::unique_vcan_iface(); + let _vcan = common::ensure_vcan(&vcan_iface); + + let emulator_config = ecuemulator_test_config_toml(&vcan_iface); + + let event_dispatcher = events::EventDispatcher::new(); + let (tx, heartbeat_rx) = mpsc::channel(); + event_dispatcher.subscribe( + tx, + vec![events::EventKind::SendCanMessage], + "heartbeat-test-listener", + ); + + let node_manager = nodes::NodeManager::new(&event_dispatcher); + let config = build_test_config_with_heartbeat( + &vcan_iface, + HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 4, + max_unanswered: 3, + }, + ); + + std::thread::scope(|s| { + let _shutdown = ShutdownGuard { + event_dispatcher: &event_dispatcher, + }; + s.spawn(|| { + run_with_dependencies(&event_dispatcher, &node_manager, config) + .expect("application should start with test config"); + }); + let ecuemulator_container = start_ecuemulator_container_with_config(&emulator_config); + + wait_for_registered_node(&node_manager, Duration::from_secs(10)); + drain_send_can_events(&heartbeat_rx); + + ecuemulator_container + .stop_with_timeout(Some(0)) + .expect("ecuemulator container should stop"); + drain_send_can_events(&heartbeat_rx); + + // first heartbeat at ~1s, then backoff to 2s, then 4s, then node removal after 3 unanswered heartbeats + let (first_heartbeat_at, first_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(1100)); + let (second_heartbeat_at, second_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(2100)); + let (third_heartbeat_at, third_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(4100)); + + assert_eq!( + second_counter, + first_counter + 1, + "second unanswered heartbeat should increment the request counter" + ); + assert_eq!( + third_counter, + first_counter + 2, + "third unanswered heartbeat should increment the request counter" + ); + assert!( + second_heartbeat_at.duration_since(first_heartbeat_at) < Duration::from_millis(2100) + && second_heartbeat_at.duration_since(first_heartbeat_at) + > Duration::from_millis(1900), + "second unanswered heartbeat should be sent after ~2s" + ); + + assert!( + third_heartbeat_at.duration_since(second_heartbeat_at) < Duration::from_millis(4100) + && third_heartbeat_at.duration_since(second_heartbeat_at) + > Duration::from_millis(3900), + "third unanswered heartbeat should be sent after backoff multiplier is applied to the heartbeat period" + ); + + // Node is removed after one regular period, as the number of unanswered heartbeats has reached the max_unanswered threshold + wait_for_node_removal(&node_manager, TEST_NODE_ID, Duration::from_secs(1)); + }); +} + fn build_test_config(can_iface: &str) -> Config { - Config { - can_bus_interfaces: vec![can_iface.to_string()], - heartbeat: HeartbeatConfig { + build_test_config_with_heartbeat( + can_iface, + HeartbeatConfig { period: 1, backoff_multiplier: 2, max_period: 10, max_unanswered: 3, }, + ) +} + +fn build_test_config_with_heartbeat(can_iface: &str, heartbeat: HeartbeatConfig) -> Config { + Config { + can_bus_interfaces: vec![can_iface.to_string()], + heartbeat, database_url: "".to_string(), } } +fn wait_for_registered_node(node_manager: &nodes::NodeManager<'_>, timeout: Duration) { + let start_time = Instant::now(); + + loop { + if node_manager.get_nodes().len() == 1 { + return; + } + if start_time.elapsed() > timeout { + panic!("ECUEmulator did not register within timeout"); + } + std::thread::sleep(Duration::from_millis(20)); + } +} + +fn wait_for_node_removal(node_manager: &nodes::NodeManager<'_>, node_id: u8, expected: Duration) { + let start_time = Instant::now(); + let max_diff = Duration::from_millis(100); + + loop { + if node_manager.get_nodes().get(&node_id).is_none() { + if start_time.elapsed() < expected - max_diff { + panic!("node {node_id} was removed too early after unanswered heartbeats"); + } + return; + } + if start_time.elapsed() > expected + max_diff { + panic!( + "node {node_id} was not removed within expected time after unanswered heartbeats" + ); + } + std::thread::sleep(Duration::from_millis(20)); + } +} + +fn drain_send_can_events(rx: &mpsc::Receiver) { + while rx.try_recv().is_ok() {} +} + +fn wait_for_heartbeat_request( + rx: &mpsc::Receiver, + timeout: Duration, +) -> (Instant, u32) { + let start_time = Instant::now(); + + loop { + let elapsed = start_time.elapsed(); + if elapsed >= timeout { + panic!("heartbeat request was not dispatched within timeout"); + } + + match rx.recv_timeout(timeout - elapsed) { + Ok(events::Event::SendCanMessage { + receiver_node_id: TEST_NODE_ID, + message: CanMessage::HeartbeatReq { payload }, + }) => return (Instant::now(), payload.counter), + Ok(_) => {} + Err(mpsc::RecvTimeoutError::Timeout) => { + panic!("heartbeat request was not dispatched within timeout"); + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + panic!("heartbeat test listener disconnected"); + } + } + } +} + fn ecuemulator_test_config_toml(can_iface: &str) -> String { format!( r#"node_id = 5 From 20001dbc9b6a72b4c7e87a8c4abd972e00820f5f Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Mon, 25 May 2026 16:25:47 +0200 Subject: [PATCH 6/7] restore README changes --- README.md | 36 +++--------------------------------- 1 file changed, 3 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 4aef27a..144bc8d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ # Ferroflow - -Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. +Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. It interfaces with our custom Engine Control Units ECUs, through our custom [LiquidCAN protocol](https://github.com/SpaceTeam/LiquidCAN/). On the other end, it provides a high-level API for our [ECUI](https://github.com/SpaceTeam/web_ecui_houbolt), which is the user interface for our ECUs. @@ -11,57 +10,33 @@ On the other end, it provides a high-level API for our [ECUI](https://github.com Some integration tests talk to the ECUemulator over SocketCAN. For that you use a virtual CAN interface. ### Test helper: `ferroflow-vcan` - For test environments, this repo provides a small helper binary that can be granted `CAP_NET_ADMIN` once via `setcap`. Integration tests will automatically use it (if it’s available on `PATH`) to create/delete `vcan` interfaces without sudo. Build the helper (feature-gated; not part of normal builds): - ```bash cargo build --release --features test-vcan --bin ferroflow-vcan ``` - Put it on PATH (recommended for tests): - ```bash install -m 0755 ./target/release/ferroflow-vcan ~/.local/bin/ferroflow-vcan sudo setcap cap_net_admin+ep ~/.local/bin/ferroflow-vcan ``` Manual usage: - ```bash ferroflow-vcan up vcan0 ferroflow-vcan down vcan0 ``` -## Development - -### Runtime Configuration - -Ferroflow reads `config.yml` on startup. Use `config.example.yml` as a starting point. Heartbeat retries use `heartbeat.period` as the base interval and back off per node when heartbeat responses are missing. - -```yaml -can_bus_interfaces: - - can0 -heartbeat: - period: 1 - backoff_multiplier: 2 - max_period: 60 - max_unanswered: 5 - -database_url: "" -``` - -With these values, healthy nodes get a 1s heartbeat cadence. After a missed response, retries wait 2s, 4s, 8s, and so on up to 60s. After 5 unanswered heartbeat requests, the node is removed and Ferroflow stops sending heartbeats to it. +## Development ### Running CI Checks The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions **Run all checks:** - ```bash ./ci-rust.sh # or explicitly @@ -69,20 +44,17 @@ The repository includes a CI script (`ci-rust.sh`) that runs all quality checks ``` **Run individual checks:** - ```bash ./ci-rust.sh build # Build the project ./ci-rust.sh test # Run tests ./ci-rust.sh fmt # Check code formatting ./ci-rust.sh clippy # Run clippy linter ``` - You can fix formatting or linter issues by adding the -fix suffix to the command. e.g: `./ci-rust.sh clippy-fix` ### Running `fmt` and `clippy` as a pre-commit hook A pre-commit hook script is available in `.githooks`, which executes the CI script with `fmt` and `clippy` only and without the `fix` option. To setup the hook, configure git to use the `.githooks` directory and make the `pre-commit` file executable. - ```bash git config core.hooksPath .githooks chmod u+x .githooks/pre-commit @@ -94,7 +66,6 @@ chmod u+x .githooks/pre-commit We use TimescaleDB, which is an extension of PostgreSQL optimized for time-series data. You can install it by following the instructions on the [TimescaleDB installation page](https://docs.timescale.com/install/latest/). Using docker is recommended for local development (if you already have another instance of postgres running, use e.g. `-p 5433:5432` instead of `-p 5432:5432`): - ```bash docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=yourpassword timescale/timescaledb:latest-pg18 ``` @@ -105,7 +76,6 @@ The project uses Diesel for database interactions. Diesel CLI is recommended for **Running Diesel CLI** Here's some common commands: - ```bash export DATABASE_URL=postgres://postgres:yourpassword@localhost:5432/ferroflow # Set the database URL diesel setup # Set up the database @@ -127,4 +97,4 @@ Database tests use `testcontainers` to start a temporary TimescaleDB/PostgreSQL There are two examples in the repository: - a unit test in `src/db/mod.rs` -- an integration test in `tests/db_logging.rs` +- an integration test in `tests/db_logging.rs` \ No newline at end of file From 86e998da1e70b76dbc27b331b50f9aa505a5cda1 Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Mon, 25 May 2026 16:29:08 +0200 Subject: [PATCH 7/7] call out config example in readme --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 144bc8d..8b6c767 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,10 @@ ferroflow-vcan down vcan0 ## Development +### Runtime Configuration + +Ferroflow reads `config.yml` on startup. An example can be found at `config.example.yml`. + ### Running CI Checks The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions