Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 101 additions & 70 deletions server/tests/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ async fn replicated_loglet() -> googletest::Result<()> {
#[test_log::test(restate_core::test)]
async fn cluster_chaos_test() -> googletest::Result<()> {
let num_nodes = 3;
let chaos_duration = Duration::from_secs(20);
let expected_recovery_interval = Duration::from_secs(10);
let chaos_duration = Duration::from_secs(60);
let expected_recovery_interval = Duration::from_secs(30);
let request_timeout = Duration::from_secs(5);
let mut base_config = Configuration::new_unix_sockets();
base_config.metadata_server.set_raft_options(RaftOptions {
raft_election_tick: NonZeroUsize::new(5).expect("5 to be non zero"),
Expand All @@ -110,6 +111,13 @@ async fn cluster_chaos_test() -> googletest::Result<()> {
base_config.common.default_num_partitions = 4;
base_config.bifrost.default_provider = ProviderKind::Replicated;
base_config.common.log_filter = "warn,restate=debug".to_owned();
// Shorten gossip Suspect -> Alive transition for the test only. The 5s
// production default makes recovery after a single node restart take
// ~10s even on a healthy machine because every other node has to wait
// 5s before promoting the restarted node back to `Alive`, which in
// turn delays cluster-controller leader election and partition
// placement.
base_config.common.gossip.gossip_suspect_interval = Duration::from_secs(1).into();

let nodes = NodeSpec::new_test_nodes(
base_config,
Expand Down Expand Up @@ -142,10 +150,22 @@ async fn cluster_chaos_test() -> googletest::Result<()> {
.into_test_result()?;

let ingress_url = "http://localhost/Counter/1/add";
let ingress_addresses: Vec<_> = cluster
let ingress_clients: Vec<(reqwest::Client, std::path::PathBuf)> = cluster
.nodes
.iter()
.flat_map(|node| node.ingress_address().clone())
.map(|addr| {
let uds = match addr.into_address().expect("ingress address must be set") {
PeerNetAddress::Uds(uds) => uds,
_ => panic!("ingress address must be a unix domain socket"),
};
let client = reqwest::Client::builder()
.unix_socket(uds.clone())
.timeout(request_timeout)
.build()
.expect("reqwest client should build");
(client, uds)
})
.collect();
let admin_address = cluster
.nodes
Expand Down Expand Up @@ -208,91 +228,102 @@ async fn cluster_chaos_test() -> googletest::Result<()> {

let (success_tx, success_rx) = watch::channel(0);

let chaos_handle = TaskCenter::spawn_unmanaged(TaskKind::TestRunner, "chaos", async move {
let cancellation = cancellation_token();

async fn restart_nodes(
cluster: &mut StartedCluster,
mut success_rx: watch::Receiver<i32>,
expected_recovery_interval: Duration,
) -> Result<Infallible, anyhow::Error> {
loop {
let node = cluster
.nodes
.choose_mut(&mut rand::rng())
.expect("at least one node being present");

node.restart(TerminationSignal::random()).await?;

success_rx.mark_unchanged();
cluster.wait_healthy(Duration::from_secs(10)).await?;
tokio::time::timeout(expected_recovery_interval, success_rx.changed())
.await
.map_err(|_| {
anyhow!("Cluster did not recover in time to accept new invocations")
})??;
let mut chaos_handle =
TaskCenter::spawn_unmanaged(TaskKind::TestRunner, "chaos", async move {
let cancellation = cancellation_token();

// Repeatedly restarts a randomly chosen node and checks that the cluster
// recovers after every restart.
//
// `cluster` is the running cluster whose nodes get restarted, `success_rx`
// observes the counter of successful requests published by the request
// loop, and `expected_recovery_interval` bounds how long we wait for the
// next successful request after a restart.
//
// The success type is `Infallible`: the loop below has no `break`, so this
// function only ever returns through one of the `?` operators, i.e. with an
// `Err`.
async fn restart_nodes(
cluster: &mut StartedCluster,
mut success_rx: watch::Receiver<i32>,
expected_recovery_interval: Duration,
) -> Result<Infallible, anyhow::Error> {
loop {
// Pick a random victim; the `expect` panics if the cluster has no nodes.
let node = cluster
.nodes
.choose_mut(&mut rand::rng())
.expect("at least one node being present");

// Restart it using a randomly selected termination signal.
node.restart(TerminationSignal::random()).await?;

// Mark the current value as seen so that `changed()` below only
// resolves for a success reported after this point.
success_rx.mark_unchanged();
// NOTE(review): presumably the 10s is an upper bound on how long we wait
// for the cluster to report healthy — confirm against `wait_healthy`.
cluster.wait_healthy(Duration::from_secs(10)).await?;
// Require a new successful request within `expected_recovery_interval`.
// The first `?` propagates the timeout (mapped to the message below), the
// second propagates the error returned by `changed()` itself.
tokio::time::timeout(expected_recovery_interval, success_rx.changed())
.await
.map_err(|_| {
anyhow!("Cluster did not recover in time to accept new invocations")
})??;
}
}
}

if let Some(result) = cancellation
.run_until_cancelled(restart_nodes(
&mut cluster,
success_rx,
expected_recovery_interval,
))
.await
{
result?;
}
if let Some(result) = cancellation
.run_until_cancelled(restart_nodes(
&mut cluster,
success_rx,
expected_recovery_interval,
))
.await
{
result?;
}

Ok::<_, anyhow::Error>(cluster)
})?;
Ok::<_, anyhow::Error>(cluster)
})?;

info!("Starting the cluster chaos test");

let mut successes = 0;
let mut failures = 0;
let mut rng = rand::rng();
while start_chaos.elapsed() < chaos_duration {
let ingress = ingress_addresses
let (ingress_client, ingress_uds) = ingress_clients
.choose(&mut rng)
.cloned()
.expect("at least one address to be present");

let ingress_uds = ingress.into_address().unwrap();
let PeerNetAddress::Uds(ingress_uds) = ingress_uds else {
panic!("ingress address must be a unix domain socket");
};
let ingress_client = reqwest::Client::builder()
.unix_socket(ingress_uds.clone())
.build()
.expect("reqwest client should build");
.expect("at least one ingress client to be present");

debug!("Send request {successes} to {}", ingress_uds.display());
match ingress_client
let request = ingress_client
.post(ingress_url)
.header(CONTENT_TYPE, "application/json")
// Idempotency-key is `successes` (the current count) on purpose:
// if a request fails ambiguously (e.g. the server processed it
// but the response was lost), the next iteration retries with
// the same key, the server returns the cached result, and
// `assert_eq!(successes, counter_value)` below stays in sync.
.header("idempotency-key", successes.to_string())
.body("1")
.send()
.await
{
Ok(response) => {
if response.status().is_success() {
successes += 1;
success_tx.send_replace(successes);
let counter_value: i32 =
serde_json::from_slice(response.bytes().await?.as_ref())?;
assert_eq!(successes, counter_value);
} else {
failures += 1;
}
.send();

tokio::select! {
biased;
// The chaos task can only finish before we cancel it by failing:
// `restart_nodes` loops forever and its success type is `Infallible`,
// so an early exit means it returned an `Err`. Surface that error
// immediately rather than spinning out the rest of `chaos_duration`
// against a torn-down cluster and then panicking on the much less
// informative `successful writes: 0`.
chaos_result = &mut chaos_handle => {
chaos_result?.into_test_result()?;
unreachable!("chaos task exited Ok before cancellation");
}
Err(err) => {
failures += 1;
// request failed, let's retry
debug!(%err, "failed sending request {successes} to {}", ingress_uds.display());
tokio::time::sleep(Duration::from_millis(200)).await;
response = request => {
match response {
Ok(response) => {
if response.status().is_success() {
successes += 1;
success_tx.send_replace(successes);
let counter_value: i32 =
serde_json::from_slice(response.bytes().await?.as_ref())?;
assert_eq!(successes, counter_value);
} else {
failures += 1;
}
}
Err(err) => {
failures += 1;
// request failed, let's retry
debug!(%err, "failed sending request {successes} to {}", ingress_uds.display());
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}
}
}
Expand Down
Loading