From 00796433cd5e965f5ce64451cb58d44283d5dc03 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Thu, 26 Mar 2026 12:29:40 -0700 Subject: [PATCH] Fix race condition in Espress input tests The combination of `wait_for_state_and_rewards` and `assert_reward_balances` used in many tests was problematic, because the former returned a specific Espresso block number without locking the Espresso state to that number. Now `wait_for_state_and_rewards` returns a lock on the Espresso state, to ensure that `assert_reward_balances` is called with the Espresso state at a known block number. --- src/input/espresso.rs | 57 +++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/src/input/espresso.rs b/src/input/espresso.rs index efdcfa5..8521c75 100644 --- a/src/input/espresso.rs +++ b/src/input/espresso.rs @@ -860,7 +860,7 @@ mod test { use vbs::version::StaticVersion; use alloy::transports::http::reqwest::StatusCode; - use async_lock::Semaphore; + use async_lock::{RwLockReadGuard, Semaphore}; use testing::MemoryStorage; use tide_disco::Error; use tokio::task::spawn; @@ -1315,12 +1315,16 @@ mod test { .await; } - async fn wait_for_state_and_rewards( - state_lock: &Arc>>, + /// Wait for the given Espresso state to reach `target_epoch`, and then for the Espresso reward + /// API to catch up to the same block height. + /// + /// Returns the Espresso state locked at the block height reached. + async fn wait_for_state_and_rewards<'a>( + state_lock: &'a RwLock>, http_client: &HttpClient>, target_epoch: u64, epoch_height: u64, - ) -> u64 { + ) -> RwLockReadGuard<'a, State> { let min_blocks_to_wait = target_epoch * epoch_height; tracing::info!( "Waiting for blocks up to epoch {target_epoch} (block {min_blocks_to_wait})" @@ -1347,6 +1351,7 @@ mod test { .await .expect("Failed to get active node set"); let latest_espresso_block = active_node_snapshot.espresso_block.block; + tracing::info!("Reached latest block {latest_espresso_block}",); loop { sleep(Duration::from_secs(1)).await; @@ -1358,7 +1363,7 @@ mod test { { Ok(block) if block >= latest_espresso_block => { tracing::info!("Reward-state is at block {block}"); - break latest_espresso_block; + break final_state; } Ok(_) => {} Err(e) => { @@ -1369,12 +1374,10 @@ mod test { } async fn assert_reward_balances( - state_lock: &Arc>>, + state: &State, http_client: &HttpClient>, - latest_espresso_block: u64, ) { - let state = state_lock.read().await; - + let latest_espresso_block = state.latest_espresso_block().unwrap(); let active_node_snapshot = state .active_node_set() .await @@ -1414,6 +1417,7 @@ mod test { ); } } + #[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_wallet_block_rewards_e2e_test() { let espresso_port = portpicker::pick_unused_port().expect("No free ports"); @@ -1476,14 +1480,11 @@ mod test { let state_lock = Arc::new(RwLock::new(espresso_state)); let update_task = spawn(State::update_task(state_lock.clone())); - let latest_block = + let state = wait_for_state_and_rewards(&state_lock, &http_client, current_epoch, epoch_height) .await; - - tracing::info!("Reached latest block {latest_block}"); update_task.abort(); - - assert_reward_balances(&state_lock, &http_client, latest_block).await; + assert_reward_balances(&state, &http_client).await; } // Test at current_epoch + 3 @@ -1506,18 +1507,15 @@ mod test { let state_lock = Arc::new(RwLock::new(espresso_state)); let update_task = spawn(State::update_task(state_lock.clone())); - let latest_block = wait_for_state_and_rewards( + let state = wait_for_state_and_rewards( &state_lock, &http_client, current_epoch + 3, epoch_height, ) .await; - - tracing::info!("Reached latest block {latest_block}"); update_task.abort(); - - assert_reward_balances(&state_lock, &http_client, latest_block).await; + assert_reward_balances(&state, &http_client).await; } // Test at current_epoch + 3 again @@ -1540,18 +1538,15 @@ mod test { let state_lock = Arc::new(RwLock::new(espresso_state)); let update_task = spawn(State::update_task(state_lock.clone())); - let latest_block = wait_for_state_and_rewards( + let state = wait_for_state_and_rewards( &state_lock, &http_client, current_epoch + 3, epoch_height, ) .await; - - tracing::info!("Reached latest block {latest_block}"); update_task.abort(); - - assert_reward_balances(&state_lock, &http_client, latest_block).await; + assert_reward_balances(&state, &http_client).await; } // Test at current_epoch + 4 @@ -1574,18 +1569,15 @@ mod test { let state_lock = Arc::new(RwLock::new(espresso_state)); let update_task = spawn(State::update_task(state_lock.clone())); - let latest_block = wait_for_state_and_rewards( + let state = wait_for_state_and_rewards( &state_lock, &http_client, current_epoch + 4, epoch_height, ) .await; - - tracing::info!("Reached latest block {latest_block}"); update_task.abort(); - - assert_reward_balances(&state_lock, &http_client, latest_block).await; + assert_reward_balances(&state, &http_client).await; } // Test at current_epoch + 4 with new storage @@ -1609,18 +1601,15 @@ mod test { let state_lock = Arc::new(RwLock::new(espresso_state)); let update_task = spawn(State::update_task(state_lock.clone())); - let latest_block = wait_for_state_and_rewards( + let state = wait_for_state_and_rewards( &state_lock, &http_client, current_epoch + 4, epoch_height, ) .await; - - tracing::info!("Reached latest block {latest_block}"); update_task.abort(); - - assert_reward_balances(&state_lock, &http_client, latest_block).await; + assert_reward_balances(&state, &http_client).await; } drop(network);