From d52a76aadf927faa9370299f85e6d5ddd3d1b7c5 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 3 Jun 2026 18:29:33 +0200 Subject: [PATCH] Reduce flakiness of cluster_chaos_test (#4878) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CI failure is most plausibly explained by recovery after a single node restart taking longer than the previous 10s `expected_recovery_interval` — gossip's `Suspect -> Alive` transitions (5s default) plus cluster-controller leader thrashing can push it past that bound. This is a hypothesis based on the logs; we have not reproduced it locally. The changes here aim to make the test resilient to this and a couple of other timing footguns: 1. Override `gossip_suspect_interval` to 1s in the test config and raise the budgets to 60s / 30s so a normal full-stack recovery has comfortable headroom. 2. Pre-build one reqwest client per ingress with a 5s timeout. The client was previously rebuilt every iteration with no timeout, so a request hung against a node mid-shutdown could stall the loop. 3. Race the request against `&mut chaos_handle` in a biased `tokio::select!`. Previously, if the chaos task errored out, the main loop kept spinning for the rest of `chaos_duration` and panicked on `successful writes: 0`, hiding the actual error message. Now the chaos error surfaces immediately. No production behavior changes. This fixes #4878. Co-Authored-By: Claude Opus 4.7 (1M context) --- server/tests/cluster.rs | 171 ++++++++++++++++++++++++---------------- 1 file changed, 101 insertions(+), 70 deletions(-) diff --git a/server/tests/cluster.rs b/server/tests/cluster.rs index 50524a7ced..c784bf1a63 100644 --- a/server/tests/cluster.rs +++ b/server/tests/cluster.rs @@ -99,8 +99,9 @@ async fn replicated_loglet() -> googletest::Result<()> { #[test_log::test(restate_core::test)] async fn cluster_chaos_test() -> googletest::Result<()> { let num_nodes = 3; - let chaos_duration = Duration::from_secs(20); - let expected_recovery_interval = Duration::from_secs(10); + let chaos_duration = Duration::from_secs(60); + let expected_recovery_interval = Duration::from_secs(30); + let request_timeout = Duration::from_secs(5); let mut base_config = Configuration::new_unix_sockets(); base_config.metadata_server.set_raft_options(RaftOptions { raft_election_tick: NonZeroUsize::new(5).expect("5 to be non zero"), @@ -110,6 +111,13 @@ async fn cluster_chaos_test() -> googletest::Result<()> { base_config.common.default_num_partitions = 4; base_config.bifrost.default_provider = ProviderKind::Replicated; base_config.common.log_filter = "warn,restate=debug".to_owned(); + // Shorten gossip Suspect -> Alive transition for the test only. The 5s + // production default makes recovery after a single node restart take + // ~10s even on a healthy machine because every other node has to wait + // 5s before promoting the restarted node back to `Alive`, which in + // turn delays cluster-controller leader election and partition + // placement. + base_config.common.gossip.gossip_suspect_interval = Duration::from_secs(1).into(); let nodes = NodeSpec::new_test_nodes( base_config, @@ -142,10 +150,22 @@ async fn cluster_chaos_test() -> googletest::Result<()> { .into_test_result()?; let ingress_url = "http://localhost/Counter/1/add"; - let ingress_addresses: Vec<_> = cluster + let ingress_clients: Vec<(reqwest::Client, std::path::PathBuf)> = cluster .nodes .iter() .flat_map(|node| node.ingress_address().clone()) + .map(|addr| { + let uds = match addr.into_address().expect("ingress address must be set") { + PeerNetAddress::Uds(uds) => uds, + _ => panic!("ingress address must be a unix domain socket"), + }; + let client = reqwest::Client::builder() + .unix_socket(uds.clone()) + .timeout(request_timeout) + .build() + .expect("reqwest client should build"); + (client, uds) + }) .collect(); let admin_address = cluster .nodes @@ -208,45 +228,46 @@ async fn cluster_chaos_test() -> googletest::Result<()> { let (success_tx, success_rx) = watch::channel(0); - let chaos_handle = TaskCenter::spawn_unmanaged(TaskKind::TestRunner, "chaos", async move { - let cancellation = cancellation_token(); - - async fn restart_nodes( - cluster: &mut StartedCluster, - mut success_rx: watch::Receiver, - expected_recovery_interval: Duration, - ) -> Result { - loop { - let node = cluster - .nodes - .choose_mut(&mut rand::rng()) - .expect("at least one node being present"); - - node.restart(TerminationSignal::random()).await?; - - success_rx.mark_unchanged(); - cluster.wait_healthy(Duration::from_secs(10)).await?; - tokio::time::timeout(expected_recovery_interval, success_rx.changed()) - .await - .map_err(|_| { - anyhow!("Cluster did not recover in time to accept new invocations") - })??; + let mut chaos_handle = + TaskCenter::spawn_unmanaged(TaskKind::TestRunner, "chaos", async move { + let cancellation = cancellation_token(); + + async fn restart_nodes( + cluster: &mut StartedCluster, + mut success_rx: watch::Receiver, + expected_recovery_interval: Duration, + ) -> Result { + loop { + let node = cluster + .nodes + .choose_mut(&mut rand::rng()) + .expect("at least one node being present"); + + node.restart(TerminationSignal::random()).await?; + + success_rx.mark_unchanged(); + cluster.wait_healthy(Duration::from_secs(10)).await?; + tokio::time::timeout(expected_recovery_interval, success_rx.changed()) + .await + .map_err(|_| { + anyhow!("Cluster did not recover in time to accept new invocations") + })??; + } } - } - if let Some(result) = cancellation - .run_until_cancelled(restart_nodes( - &mut cluster, - success_rx, - expected_recovery_interval, - )) - .await - { - result?; - } + if let Some(result) = cancellation + .run_until_cancelled(restart_nodes( + &mut cluster, + success_rx, + expected_recovery_interval, + )) + .await + { + result?; + } - Ok::<_, anyhow::Error>(cluster) - })?; + Ok::<_, anyhow::Error>(cluster) + })?; info!("Starting the cluster chaos test"); @@ -254,45 +275,55 @@ async fn cluster_chaos_test() -> googletest::Result<()> { let mut failures = 0; let mut rng = rand::rng(); while start_chaos.elapsed() < chaos_duration { - let ingress = ingress_addresses + let (ingress_client, ingress_uds) = ingress_clients .choose(&mut rng) - .cloned() - .expect("at least one address to be present"); - - let ingress_uds = ingress.into_address().unwrap(); - let PeerNetAddress::Uds(ingress_uds) = ingress_uds else { - panic!("ingress address must be a unix domain socket"); - }; - let ingress_client = reqwest::Client::builder() - .unix_socket(ingress_uds.clone()) - .build() - .expect("reqwest client should build"); + .expect("at least one ingress client to be present"); debug!("Send request {successes} to {}", ingress_uds.display()); - match ingress_client + let request = ingress_client .post(ingress_url) .header(CONTENT_TYPE, "application/json") + // Idempotency-key is `successes` (the current count) on purpose: + // if a request fails ambiguously (e.g. the server processed it + // but the response was lost), the next iteration retries with + // the same key, the server returns the cached result, and + // `assert_eq!(successes, counter_value)` below stays in sync. .header("idempotency-key", successes.to_string()) .body("1") - .send() - .await - { - Ok(response) => { - if response.status().is_success() { - successes += 1; - success_tx.send_replace(successes); - let counter_value: i32 = - serde_json::from_slice(response.bytes().await?.as_ref())?; - assert_eq!(successes, counter_value); - } else { - failures += 1; - } + .send(); + + tokio::select! { + biased; + // If the chaos task exits before we cancel it, it must be an + // `Err` (the restart loop is `loop { ... }` returning + // `Infallible` on the success branch). Surface that error + // immediately rather than spinning out the rest of + // `chaos_duration` against a torn-down cluster and then + // panicking on the much less informative `successful writes: 0`. + chaos_result = &mut chaos_handle => { + chaos_result?.into_test_result()?; + unreachable!("chaos task exited Ok before cancellation"); } - Err(err) => { - failures += 1; - // request failed, let's retry - debug!(%err, "failed sending request {successes} to {}", ingress_uds.display()); - tokio::time::sleep(Duration::from_millis(200)).await; + response = request => { + match response { + Ok(response) => { + if response.status().is_success() { + successes += 1; + success_tx.send_replace(successes); + let counter_value: i32 = + serde_json::from_slice(response.bytes().await?.as_ref())?; + assert_eq!(successes, counter_value); + } else { + failures += 1; + } + } + Err(err) => { + failures += 1; + // request failed, let's retry + debug!(%err, "failed sending request {successes} to {}", ingress_uds.display()); + tokio::time::sleep(Duration::from_millis(200)).await; + } + } } } }