Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 23 additions & 34 deletions src/input/espresso.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ mod test {
use vbs::version::StaticVersion;

use alloy::transports::http::reqwest::StatusCode;
use async_lock::Semaphore;
use async_lock::{RwLockReadGuard, Semaphore};
use testing::MemoryStorage;
use tide_disco::Error;
use tokio::task::spawn;
Expand Down Expand Up @@ -1315,12 +1315,16 @@ mod test {
.await;
}

async fn wait_for_state_and_rewards(
state_lock: &Arc<RwLock<State<Persistence, QueryServiceClient>>>,
/// Wait for the given Espresso state to reach `target_epoch`, and then for the Espresso reward
/// API to catch up to the same block height.
///
/// Returns the Espresso state locked at the block height reached.
async fn wait_for_state_and_rewards<'a>(
state_lock: &'a RwLock<State<Persistence, QueryServiceClient>>,
http_client: &HttpClient<crate::Error, StaticVersion<0, 1>>,
target_epoch: u64,
epoch_height: u64,
) -> u64 {
) -> RwLockReadGuard<'a, State<Persistence, QueryServiceClient>> {
let min_blocks_to_wait = target_epoch * epoch_height;
tracing::info!(
"Waiting for blocks up to epoch {target_epoch} (block {min_blocks_to_wait})"
Expand All @@ -1347,6 +1351,7 @@ mod test {
.await
.expect("Failed to get active node set");
let latest_espresso_block = active_node_snapshot.espresso_block.block;
tracing::info!("Reached latest block {latest_espresso_block}",);

loop {
sleep(Duration::from_secs(1)).await;
Expand All @@ -1358,7 +1363,7 @@ mod test {
{
Ok(block) if block >= latest_espresso_block => {
tracing::info!("Reward-state is at block {block}");
break latest_espresso_block;
break final_state;
}
Ok(_) => {}
Err(e) => {
Expand All @@ -1369,12 +1374,10 @@ mod test {
}

async fn assert_reward_balances(
state_lock: &Arc<RwLock<State<Persistence, QueryServiceClient>>>,
state: &State<Persistence, QueryServiceClient>,
http_client: &HttpClient<crate::Error, StaticVersion<0, 1>>,
latest_espresso_block: u64,
) {
let state = state_lock.read().await;

let latest_espresso_block = state.latest_espresso_block().unwrap();
let active_node_snapshot = state
.active_node_set()
.await
Expand Down Expand Up @@ -1414,6 +1417,7 @@ mod test {
);
}
}

#[test_log::test(tokio::test(flavor = "multi_thread"))]
async fn test_wallet_block_rewards_e2e_test() {
let espresso_port = portpicker::pick_unused_port().expect("No free ports");
Expand Down Expand Up @@ -1476,14 +1480,11 @@ mod test {
let state_lock = Arc::new(RwLock::new(espresso_state));
let update_task = spawn(State::update_task(state_lock.clone()));

let latest_block =
let state =
wait_for_state_and_rewards(&state_lock, &http_client, current_epoch, epoch_height)
.await;

tracing::info!("Reached latest block {latest_block}");
update_task.abort();

assert_reward_balances(&state_lock, &http_client, latest_block).await;
assert_reward_balances(&state, &http_client).await;
}

// Test at current_epoch + 3
Expand All @@ -1506,18 +1507,15 @@ mod test {
let state_lock = Arc::new(RwLock::new(espresso_state));
let update_task = spawn(State::update_task(state_lock.clone()));

let latest_block = wait_for_state_and_rewards(
let state = wait_for_state_and_rewards(
&state_lock,
&http_client,
current_epoch + 3,
epoch_height,
)
.await;

tracing::info!("Reached latest block {latest_block}");
update_task.abort();

assert_reward_balances(&state_lock, &http_client, latest_block).await;
assert_reward_balances(&state, &http_client).await;
}

// Test at current_epoch + 3 again
Expand All @@ -1540,18 +1538,15 @@ mod test {
let state_lock = Arc::new(RwLock::new(espresso_state));
let update_task = spawn(State::update_task(state_lock.clone()));

let latest_block = wait_for_state_and_rewards(
let state = wait_for_state_and_rewards(
&state_lock,
&http_client,
current_epoch + 3,
epoch_height,
)
.await;

tracing::info!("Reached latest block {latest_block}");
update_task.abort();

assert_reward_balances(&state_lock, &http_client, latest_block).await;
assert_reward_balances(&state, &http_client).await;
}

// Test at current_epoch + 4
Expand All @@ -1574,18 +1569,15 @@ mod test {
let state_lock = Arc::new(RwLock::new(espresso_state));
let update_task = spawn(State::update_task(state_lock.clone()));

let latest_block = wait_for_state_and_rewards(
let state = wait_for_state_and_rewards(
&state_lock,
&http_client,
current_epoch + 4,
epoch_height,
)
.await;

tracing::info!("Reached latest block {latest_block}");
update_task.abort();

assert_reward_balances(&state_lock, &http_client, latest_block).await;
assert_reward_balances(&state, &http_client).await;
}

// Test at current_epoch + 4 with new storage
Expand All @@ -1609,18 +1601,15 @@ mod test {
let state_lock = Arc::new(RwLock::new(espresso_state));
let update_task = spawn(State::update_task(state_lock.clone()));

let latest_block = wait_for_state_and_rewards(
let state = wait_for_state_and_rewards(
&state_lock,
&http_client,
current_epoch + 4,
epoch_height,
)
.await;

tracing::info!("Reached latest block {latest_block}");
update_task.abort();

assert_reward_balances(&state_lock, &http_client, latest_block).await;
assert_reward_balances(&state, &http_client).await;
}

drop(network);
Expand Down
Loading