diff --git a/README.md b/README.md index 144bc8d..8b6c767 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,10 @@ ferroflow-vcan down vcan0 ## Development +### Runtime Configuration + +Ferroflow reads `config.yml` on startup. An example can be found at `config.example.yml`. + ### Running CI Checks The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions diff --git a/config.example.yml b/config.example.yml new file mode 100644 index 0000000..49aa3a4 --- /dev/null +++ b/config.example.yml @@ -0,0 +1,11 @@ +can_bus_interfaces: + - can0 + +heartbeat: + period: 1 + backoff_multiplier: 2 + max_period: 60 + max_unanswered: 5 + +# Leave empty to disable database logging. +database_url: "" diff --git a/src/config/mod.rs b/src/config/mod.rs index 3c04e19..ed69aad 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -4,10 +4,25 @@ use anyhow::{Context, Result}; use config as config_builder; use serde::{Deserialize, Serialize}; +const DEFAULT_HEARTBEAT_BACKOFF_MULTIPLIER: u32 = 2; +const DEFAULT_HEARTBEAT_MAX_PERIOD: u32 = 60; +const DEFAULT_HEARTBEAT_MAX_UNANSWERED: u32 = 5; + +#[derive(Deserialize, Serialize, Debug)] +pub struct HeartbeatConfig { + pub period: u32, + #[serde(default = "default_heartbeat_backoff_multiplier")] + pub backoff_multiplier: u32, + #[serde(default = "default_heartbeat_max_period")] + pub max_period: u32, + #[serde(default = "default_heartbeat_max_unanswered")] + pub max_unanswered: u32, +} + #[derive(Deserialize, Serialize, Debug)] pub struct Config { pub can_bus_interfaces: Vec, - pub heartbeat_period: u64, + pub heartbeat: HeartbeatConfig, pub database_url: String, } @@ -20,3 +35,15 @@ pub fn load_config(path: &str) -> Result { .try_deserialize() .with_context(|| format!("Failed to deserialize config from {}", path)) } + +fn default_heartbeat_backoff_multiplier() -> u32 { + DEFAULT_HEARTBEAT_BACKOFF_MULTIPLIER +} + +fn default_heartbeat_max_period() -> u32 { + DEFAULT_HEARTBEAT_MAX_PERIOD +} + +fn default_heartbeat_max_unanswered() -> u32 { + DEFAULT_HEARTBEAT_MAX_UNANSWERED +} diff --git a/src/lib.rs b/src/lib.rs index f75ccd3..55f53c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,12 +38,7 @@ pub fn run_with_dependencies( println!("Starting node registration"); nodes::spawn_can_msg_handler_thread(node_manager, event_dispatcher, scope); - nodes::spawn_heartbeat_thread( - node_manager, - std::time::Duration::from_secs(config.heartbeat_period), - event_dispatcher, - scope, - ); + nodes::spawn_heartbeat_thread(node_manager, &config.heartbeat, event_dispatcher, scope); node_manager.start_node_registration(); diff --git a/src/nodes/heartbeat.rs b/src/nodes/heartbeat.rs new file mode 100644 index 0000000..900e2ae --- /dev/null +++ b/src/nodes/heartbeat.rs @@ -0,0 +1,332 @@ +use std::{ + sync::mpsc::{self, RecvTimeoutError}, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result, anyhow}; +use chrono::{DateTime, Utc}; +use liquidcan::{CanMessage, CanMessageId, payloads::HeartbeatPayload}; + +use crate::{ + config::HeartbeatConfig, + events::{self, EventKind}, +}; + +use super::node_manager::NodeManager; + +pub fn spawn_heartbeat_thread<'a>( + node_manager: &'a NodeManager<'a>, + heartbeat_config: &'a HeartbeatConfig, + event_dispatcher: &'a events::EventDispatcher, + scope: &'a std::thread::Scope<'a, '_>, +) { + let (tx, rx) = mpsc::channel::(); + let events = vec![EventKind::Shutdown]; + + event_dispatcher.subscribe(tx, events, "Heartbeat thread"); + + scope.spawn(move || { + if let Err(error) = + dispatch_heartbeat_requests(node_manager, event_dispatcher, heartbeat_config) + { + eprintln!("Error dispatching heartbeat requests: {error:#}"); + } + + let period_duration = Duration::from_secs(heartbeat_config.period as u64); + + let mut next_heartbeat_at = Instant::now() + period_duration; + + loop { + match rx.recv_timeout(next_heartbeat_at - Instant::now()) { + Ok(events::Event::Shutdown) => break, + Err(RecvTimeoutError::Timeout) => { + if let Err(error) = dispatch_heartbeat_requests( + node_manager, + event_dispatcher, + heartbeat_config, + ) { + eprintln!("Error dispatching heartbeat requests: {error:#}"); + } + next_heartbeat_at = Instant::now() + period_duration; + } + Err(RecvTimeoutError::Disconnected) => break, + Ok(_) => {} + } + } + }); +} + +pub fn handle_heartbeat_res( + node_manager: &NodeManager, + can_msg_id: CanMessageId, + payload: HeartbeatPayload, +) -> Result<()> { + let timestamp = Utc::now(); + let node_id = can_msg_id.sender_id(); + + let node = node_manager.get_nodes().get(&node_id).with_context(|| { + format!( + "received heartbeat response for node {} but it is not registered", + node_id + ) + })?; + + let mut latest_heartbeat = node + .latest_heartbeat_received + .write() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; + + *latest_heartbeat = Some((timestamp, payload.counter)); + + Ok(()) +} + +fn dispatch_heartbeat_requests( + node_manager: &NodeManager, + event_dispatcher: &events::EventDispatcher, + heartbeat_config: &HeartbeatConfig, +) -> Result<()> { + let now = Utc::now(); + let mut expired_nodes = Vec::new(); + + for node_entry in node_manager.get_nodes().iter() { + let node_id = *node_entry.key(); + let latest_heartbeat_received = node_entry + .latest_heartbeat_received + .read() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))? + .to_owned(); + + let mut latest_heartbeat_sent = node_entry + .latest_heartbeat_sent + .write() + .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; + let latest_sent = latest_heartbeat_sent.to_owned(); + + let unanswered_heartbeats = + unanswered_heartbeat_count(latest_sent, latest_heartbeat_received); + + if unanswered_heartbeats >= heartbeat_config.max_unanswered { + expired_nodes.push((node_id, unanswered_heartbeats)); + continue; + } + + if !heartbeat_is_due(now, latest_sent, unanswered_heartbeats, heartbeat_config) { + continue; + } + + let next_heartbeat = latest_heartbeat_sent + .map(|(_, counter)| counter + 1) + .unwrap_or(0); + + *latest_heartbeat_sent = Some((now, next_heartbeat)); + + event_dispatcher.dispatch(events::Event::SendCanMessage { + receiver_node_id: node_id, + message: CanMessage::HeartbeatReq { + payload: HeartbeatPayload { + counter: next_heartbeat, + }, + }, + }); + } + + for (node_id, unanswered_heartbeats) in expired_nodes { + if node_manager.get_nodes().remove(&node_id).is_some() { + eprintln!( + "Removing CAN node {node_id}: {unanswered_heartbeats} unanswered heartbeat requests" + ); + } + } + + Ok(()) +} + +fn heartbeat_is_due( + now: DateTime, + latest_sent: Option<(DateTime, u32)>, + unanswered_heartbeats: u32, + heartbeat_config: &HeartbeatConfig, +) -> bool { + let Some((latest_sent_at, _)) = latest_sent else { + return true; + }; + + let elapsed = now + .signed_duration_since(latest_sent_at) + .to_std() + .unwrap_or_default(); + + elapsed >= interval_after_unanswered(heartbeat_config, unanswered_heartbeats) +} + +fn unanswered_heartbeat_count( + latest_sent: Option<(DateTime, u32)>, + latest_received: Option<(DateTime, u32)>, +) -> u32 { + let Some((_, sent_counter)) = latest_sent else { + return 0; + }; + + match latest_received { + // Nodes respond to heartbeat requests with the counter incremented by 1. + Some((_, received_counter)) => sent_counter - (received_counter - 1), + None => sent_counter + 1, + } +} + +pub fn interval_after_unanswered( + heartbeat_config: &HeartbeatConfig, + unanswered_heartbeats: u32, +) -> Duration { + let multiplier = heartbeat_config + .backoff_multiplier + .saturating_pow(unanswered_heartbeats); + let interval = heartbeat_config.period.saturating_mul(multiplier); + + Duration::from_secs(interval.min(heartbeat_config.max_period) as u64) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration as ChronoDuration; + use liquidcan::payloads::NodeInfoResPayload; + + fn test_heartbeat_config() -> HeartbeatConfig { + HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 60, + max_unanswered: 3, + } + } + + fn register_test_node(manager: &NodeManager, node_id: u8) { + let message_id = CanMessageId::new().with_sender_id(node_id); + manager + .handle_node_info_announcement( + message_id, + NodeInfoResPayload { + tel_count: 0, + par_count: 0, + firmware_hash: 0, + liquid_hash: 0, + device_name: "test-node".try_into().unwrap(), + }, + ) + .expect("test node should register"); + } + + fn drain_heartbeat_counter(rx: &mpsc::Receiver) -> u32 { + let event = rx + .recv_timeout(Duration::from_millis(50)) + .expect("heartbeat request should be dispatched"); + + let events::Event::SendCanMessage { message, .. } = event else { + panic!("expected SendCanMessage event"); + }; + + let CanMessage::HeartbeatReq { payload } = message else { + panic!("expected heartbeat request"); + }; + + payload.counter + } + + fn make_latest_heartbeat_sent_old(manager: &NodeManager, node_id: u8, seconds: i64) { + let node = manager + .get_nodes() + .get(&node_id) + .expect("test node should exist"); + let mut latest_sent = node + .latest_heartbeat_sent + .write() + .expect("heartbeat sent lock should not be poisoned"); + let Some((sent_at, _)) = latest_sent.as_mut() else { + panic!("test node should have heartbeat sent state"); + }; + + *sent_at = Utc::now() - ChronoDuration::seconds(seconds); + } + + #[test] + fn heartbeat_config_caps_backoff_interval() { + let config = HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 5, + max_unanswered: 3, + }; + + assert_eq!( + interval_after_unanswered(&config, 0), + Duration::from_secs(1) + ); + assert_eq!( + interval_after_unanswered(&config, 1), + Duration::from_secs(2) + ); + assert_eq!( + interval_after_unanswered(&config, 2), + Duration::from_secs(4) + ); + assert_eq!( + interval_after_unanswered(&config, 3), + Duration::from_secs(5) + ); + } + + #[test] + fn unanswered_heartbeats_backoff_and_eventually_evict_node() { + let dispatcher = events::EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![events::EventKind::SendCanMessage], "test-listener"); + + let manager = NodeManager::new(&dispatcher); + register_test_node(&manager, 5); + + let config = test_heartbeat_config(); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 0); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert!(rx.recv_timeout(Duration::from_millis(20)).is_err()); + + make_latest_heartbeat_sent_old(&manager, 5, 2); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 1); + + make_latest_heartbeat_sent_old(&manager, 5, 4); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 2); + + make_latest_heartbeat_sent_old(&manager, 5, 8); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert!(rx.recv_timeout(Duration::from_millis(20)).is_err()); + assert!(manager.get_nodes().get(&5).is_none()); + } + + #[test] + fn heartbeat_response_resets_unanswered_backoff() { + let dispatcher = events::EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![events::EventKind::SendCanMessage], "test-listener"); + + let manager = NodeManager::new(&dispatcher); + register_test_node(&manager, 5); + + let config = test_heartbeat_config(); + + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 0); + + let message_id = CanMessageId::new().with_sender_id(5); + handle_heartbeat_res(&manager, message_id, HeartbeatPayload { counter: 1 }).unwrap(); + + make_latest_heartbeat_sent_old(&manager, 5, 1); + dispatch_heartbeat_requests(&manager, &dispatcher, &config).unwrap(); + assert_eq!(drain_heartbeat_counter(&rx), 1); + } +} diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs index b4d9876..65205e3 100644 --- a/src/nodes/mod.rs +++ b/src/nodes/mod.rs @@ -1,13 +1,12 @@ //! Contains code for managing the CAN nodes that are connected to FerroFlow, their fields and data types. mod can_node; +mod heartbeat; mod node_manager; +pub use heartbeat::spawn_heartbeat_thread; pub use node_manager::NodeManager; -use std::{ - sync::mpsc::{self, RecvTimeoutError}, - time::{Duration, Instant}, -}; +use std::sync::mpsc; use crate::events; use crate::events::EventKind; @@ -34,42 +33,3 @@ pub fn spawn_can_msg_handler_thread<'a>( } }); } - -pub fn spawn_heartbeat_thread<'a>( - node_manager: &'a NodeManager<'a>, - interval: Duration, - event_dispatcher: &'a events::EventDispatcher, - scope: &'a std::thread::Scope<'a, '_>, -) { - let (tx, rx) = mpsc::channel::(); - let events = vec![EventKind::Shutdown]; - - event_dispatcher.subscribe(tx, events, "Heartbeat thread"); - - scope.spawn(move || { - if let Err(error) = node_manager.dispatch_heartbeat_requests() { - eprintln!("Error dispatching heartbeat requests: {error:#}"); - } - let mut next_heartbeat_at = Instant::now() + interval; - - loop { - match rx.recv_timeout(next_heartbeat_at - Instant::now()) { - Ok(events::Event::Shutdown) => break, - Err(RecvTimeoutError::Timeout) => { - if let Err(error) = node_manager.dispatch_heartbeat_requests() { - eprintln!("Error dispatching heartbeat requests: {error:#}"); - } - - next_heartbeat_at += interval; - - // edge case: if next_heartbeat_at is already in the past, skip to now. - if next_heartbeat_at < Instant::now() { - next_heartbeat_at = Instant::now(); - } - } - Err(RecvTimeoutError::Disconnected) => break, - Ok(_) => {} - } - } - }); -} diff --git a/src/nodes/node_manager.rs b/src/nodes/node_manager.rs index fefaf67..c1b394c 100644 --- a/src/nodes/node_manager.rs +++ b/src/nodes/node_manager.rs @@ -7,11 +7,12 @@ use dashmap::DashMap; use liquidcan::{ CanMessage, CanMessageId, payloads::{ - CanDataValue, FieldGetResPayload, FieldRegistrationPayload, HeartbeatPayload, - NodeInfoResPayload, TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, + CanDataValue, FieldGetResPayload, FieldRegistrationPayload, NodeInfoResPayload, + TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, }, }; +use crate::nodes::heartbeat; use crate::{db::FieldLog, events}; use super::can_node::{CanNode, FieldInfo, RegistrationInfo, TelemetryGroupDefinition}; @@ -63,7 +64,9 @@ impl<'a> NodeManager<'a> { self.handle_telemetry_group_update(message_id, payload) } CanMessage::FieldGetRes { payload } => self.handle_field_get_res(message_id, payload), - CanMessage::HeartbeatRes { payload } => self.handle_heartbeat_res(message_id, payload), + CanMessage::HeartbeatRes { payload } => { + heartbeat::handle_heartbeat_res(self, message_id, payload) + } _ => bail!( "received unsupported CAN message from node {}: {:?}", message_id.sender_id(), @@ -304,58 +307,6 @@ impl<'a> NodeManager<'a> { Ok(()) } - pub fn handle_heartbeat_res( - &self, - can_msg_id: CanMessageId, - payload: HeartbeatPayload, - ) -> Result<()> { - let timestamp = Utc::now(); - let node_id = can_msg_id.sender_id(); - - let node = self.can_nodes.get(&node_id).with_context(|| { - format!( - "received heartbeat response for node {} but it is not registered", - node_id - ) - })?; - - let mut latest_heartbeat = node - .latest_heartbeat_received - .write() - .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; - - *latest_heartbeat = Some((timestamp, payload.counter)); - - Ok(()) - } - - pub fn dispatch_heartbeat_requests(&self) -> Result<()> { - for node_entry in self.can_nodes.iter() { - let node_id = *node_entry.key(); - let mut latest_heartbeat_sent = node_entry - .latest_heartbeat_sent - .write() - .map_err(|error| anyhow!("RwLock was poisoned: {}", error))?; - - let next_heartbeat = latest_heartbeat_sent - .map(|(_, counter)| counter + 1) - .unwrap_or(0); - - *latest_heartbeat_sent = Some((Utc::now(), next_heartbeat)); - - self.event_dispatcher - .dispatch(events::Event::SendCanMessage { - receiver_node_id: node_id, - message: CanMessage::HeartbeatReq { - payload: HeartbeatPayload { - counter: next_heartbeat, - }, - }, - }); - } - - Ok(()) - } pub fn get_nodes(&self) -> &DashMap { &self.can_nodes } diff --git a/tests/emulator.rs b/tests/emulator.rs index 334829b..f879da0 100644 --- a/tests/emulator.rs +++ b/tests/emulator.rs @@ -2,13 +2,15 @@ mod common; use crate::common::ShutdownGuard; use chrono::{DateTime, Utc}; -use ferro_flow::config::Config; +use ferro_flow::config::{Config, HeartbeatConfig}; use ferro_flow::{events, nodes, run_with_dependencies}; -use liquidcan::payloads::CanDataType; -use std::{io::Write, time::Instant}; +use liquidcan::{CanMessage, payloads::CanDataType}; +use std::{io::Write, sync::mpsc, time::Duration, time::Instant}; use testcontainers::core::logs::LogFrame; use testcontainers::{GenericImage, ImageExt, runners::SyncRunner}; +const TEST_NODE_ID: u8 = 5; + #[test] fn test_node_registration() { let vcan_iface = common::unique_vcan_iface(); @@ -180,14 +182,173 @@ fn test_telemetry_group_updates() { }); } +#[test] +fn test_node_is_removed_when_heartbeat_responses_stop() { + let vcan_iface = common::unique_vcan_iface(); + let _vcan = common::ensure_vcan(&vcan_iface); + + let emulator_config = ecuemulator_test_config_toml(&vcan_iface); + + let event_dispatcher = events::EventDispatcher::new(); + let (tx, heartbeat_rx) = mpsc::channel(); + event_dispatcher.subscribe( + tx, + vec![events::EventKind::SendCanMessage], + "heartbeat-test-listener", + ); + + let node_manager = nodes::NodeManager::new(&event_dispatcher); + let config = build_test_config_with_heartbeat( + &vcan_iface, + HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 4, + max_unanswered: 3, + }, + ); + + std::thread::scope(|s| { + let _shutdown = ShutdownGuard { + event_dispatcher: &event_dispatcher, + }; + s.spawn(|| { + run_with_dependencies(&event_dispatcher, &node_manager, config) + .expect("application should start with test config"); + }); + let ecuemulator_container = start_ecuemulator_container_with_config(&emulator_config); + + wait_for_registered_node(&node_manager, Duration::from_secs(10)); + drain_send_can_events(&heartbeat_rx); + + ecuemulator_container + .stop_with_timeout(Some(0)) + .expect("ecuemulator container should stop"); + drain_send_can_events(&heartbeat_rx); + + // first heartbeat at ~1s, then backoff to 2s, then 4s, then node removal after 3 unanswered heartbeats + let (first_heartbeat_at, first_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(1100)); + let (second_heartbeat_at, second_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(2100)); + let (third_heartbeat_at, third_counter) = + wait_for_heartbeat_request(&heartbeat_rx, Duration::from_secs(4100)); + + assert_eq!( + second_counter, + first_counter + 1, + "second unanswered heartbeat should increment the request counter" + ); + assert_eq!( + third_counter, + first_counter + 2, + "third unanswered heartbeat should increment the request counter" + ); + assert!( + second_heartbeat_at.duration_since(first_heartbeat_at) < Duration::from_millis(2100) + && second_heartbeat_at.duration_since(first_heartbeat_at) + > Duration::from_millis(1900), + "second unanswered heartbeat should be sent after ~2s" + ); + + assert!( + third_heartbeat_at.duration_since(second_heartbeat_at) < Duration::from_millis(4100) + && third_heartbeat_at.duration_since(second_heartbeat_at) + > Duration::from_millis(3900), + "third unanswered heartbeat should be sent after backoff multiplier is applied to the heartbeat period" + ); + + // Node is removed after one regular period, as the number of unanswered heartbeats has reached the max_unanswered threshold + wait_for_node_removal(&node_manager, TEST_NODE_ID, Duration::from_secs(1)); + }); +} + fn build_test_config(can_iface: &str) -> Config { + build_test_config_with_heartbeat( + can_iface, + HeartbeatConfig { + period: 1, + backoff_multiplier: 2, + max_period: 10, + max_unanswered: 3, + }, + ) +} + +fn build_test_config_with_heartbeat(can_iface: &str, heartbeat: HeartbeatConfig) -> Config { Config { can_bus_interfaces: vec![can_iface.to_string()], - heartbeat_period: 1, + heartbeat, database_url: "".to_string(), } } +fn wait_for_registered_node(node_manager: &nodes::NodeManager<'_>, timeout: Duration) { + let start_time = Instant::now(); + + loop { + if node_manager.get_nodes().len() == 1 { + return; + } + if start_time.elapsed() > timeout { + panic!("ECUEmulator did not register within timeout"); + } + std::thread::sleep(Duration::from_millis(20)); + } +} + +fn wait_for_node_removal(node_manager: &nodes::NodeManager<'_>, node_id: u8, expected: Duration) { + let start_time = Instant::now(); + let max_diff = Duration::from_millis(100); + + loop { + if node_manager.get_nodes().get(&node_id).is_none() { + if start_time.elapsed() < expected - max_diff { + panic!("node {node_id} was removed too early after unanswered heartbeats"); + } + return; + } + if start_time.elapsed() > expected + max_diff { + panic!( + "node {node_id} was not removed within expected time after unanswered heartbeats" + ); + } + std::thread::sleep(Duration::from_millis(20)); + } +} + +fn drain_send_can_events(rx: &mpsc::Receiver) { + while rx.try_recv().is_ok() {} +} + +fn wait_for_heartbeat_request( + rx: &mpsc::Receiver, + timeout: Duration, +) -> (Instant, u32) { + let start_time = Instant::now(); + + loop { + let elapsed = start_time.elapsed(); + if elapsed >= timeout { + panic!("heartbeat request was not dispatched within timeout"); + } + + match rx.recv_timeout(timeout - elapsed) { + Ok(events::Event::SendCanMessage { + receiver_node_id: TEST_NODE_ID, + message: CanMessage::HeartbeatReq { payload }, + }) => return (Instant::now(), payload.counter), + Ok(_) => {} + Err(mpsc::RecvTimeoutError::Timeout) => { + panic!("heartbeat request was not dispatched within timeout"); + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + panic!("heartbeat test listener disconnected"); + } + } + } +} + fn ecuemulator_test_config_toml(can_iface: &str) -> String { format!( r#"node_id = 5