diff --git a/Cargo.lock b/Cargo.lock index dbab1d454..9233a3a82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4543,7 +4543,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 2.0.117", + "syn 1.0.109", ] [[package]] @@ -16574,6 +16574,7 @@ dependencies = [ "anyhow", "async-trait", "axum", + "backon", "base64 0.22.1", "basic_system 0.1.0 (git+https://github.com/matter-labs/zksync-os?tag=v0.2.10-interface-v0.1.3)", "basic_system 0.1.0 (git+https://github.com/matter-labs/zksync-os?tag=v0.3.1-interface-v0.1.3)", diff --git a/integration-tests/src/lib.rs b/integration-tests/src/lib.rs index 8f799b08c..bb85c14db 100644 --- a/integration-tests/src/lib.rs +++ b/integration-tests/src/lib.rs @@ -50,7 +50,6 @@ pub mod assert_traits; pub mod config; pub mod contracts; pub mod l1_helpers; -pub mod multi_node; mod node_log; mod prover_tester; pub mod provider; @@ -595,41 +594,6 @@ impl Tester { .await } - pub(crate) async fn launch_node_with_ports( - l1: AnvilL1, - enable_prover: bool, - config_overrides: Option, - chain_layout: ChainLayout<'static>, - ports: Ports, - wait_for_initial_deposit: bool, - ) -> anyhow::Result { - let tempdir = Arc::new(tempfile::tempdir()?); - let mut config = build_node_config(&l1, chain_layout, false).await?; - if enable_prover { - config.prover_api_config.fake_fri_provers.enabled = false; - config.prover_api_config.fake_snark_provers.enabled = false; - } - if !prover_input_generation_enabled() { - disable_prover_input_generation(&mut config); - } - Self::bind_runtime_config(&l1, tempdir.as_ref(), &mut config, &ports); - if let Some(config_overrides) = config_overrides { - config_overrides(&mut config); - } - let bitcoin_da_mock = maybe_start_bitcoin_da_mock(&mut config).await; - Self::launch_node_inner( - l1, - config, - tempdir, - chain_layout, - None, - wait_for_initial_deposit, - Some(ports), - bitcoin_da_mock, - ) - .await - } - fn bind_runtime_config(l1: &AnvilL1, tempdir: &TempDir, config: &mut Config, ports: &Ports) { config.general_config.rocks_db_path = tempdir.path().join("rocksdb"); config.l1_provider_config.rpc_url = l1.address.clone(); diff --git a/integration-tests/src/multi_node.rs b/integration-tests/src/multi_node.rs deleted file mode 100644 index 3abbf6ab0..000000000 --- a/integration-tests/src/multi_node.rs +++ /dev/null @@ -1,658 +0,0 @@ -use alloy::primitives::U128; -use futures::future::try_join_all; -use std::net::{IpAddr, Ipv4Addr, SocketAddr}; -use std::time::Duration; -use tokio::time::Instant; -use zksync_os_status_server::StatusResponse; - -/// Each respawn during a wait-helper poll buys this much extra time, since the freshly -/// respawned node needs to finish booting and the cluster needs another election cycle. -const RESPAWN_GRACE: Duration = Duration::from_secs(10); - -use crate::{ - AnvilL1, ChainLayout, Config, NodeRole, PROTOCOL_VERSION, Ports, StoppedTester, Tester, - provider::ZksyncTestingProvider, -}; - -const TEST_HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100); -const TEST_ELECTION_TIMEOUT_MIN: Duration = Duration::from_secs(2); -const TEST_ELECTION_TIMEOUT_MAX: Duration = Duration::from_secs(4); - -#[derive(Debug)] -enum NodeSlot { - Running(Box), - Suspended(Box), -} - -impl NodeSlot { - fn running(&self) -> Option<&Tester> { - match self { - Self::Running(tester) => Some(tester), - Self::Suspended(_) => None, - } - } -} - -/// Represents the consensus state of a Raft cluster based on node status responses -#[derive(Debug)] -pub struct ClusterState { - nodes: Vec<(usize, Result)>, -} - -impl ClusterState { - /// Collects status from the selected node indices in parallel. - async fn collect_indices( - nodes: &[NodeSlot], - node_indices: impl IntoIterator, - ) -> Self { - let node_states = - futures::future::join_all(node_indices.into_iter().map(|idx| async move { - let status = match nodes.get(idx) { - Some(NodeSlot::Running(node)) => node.status().await.map_err(|e| e.to_string()), - Some(NodeSlot::Suspended(_)) => Err("node is suspended".to_string()), - None => Err("node index out of range".to_string()), - }; - (idx, status) - })) - .await; - Self { nodes: node_states } - } - - /// Returns true if all nodes are healthy and returned successful status - pub fn all_healthy(&self) -> bool { - self.nodes - .iter() - .all(|(_, result)| matches!(result, Ok(status) if status.healthy)) - } - - /// Returns indices of nodes that report themselves as leaders - pub fn leader_indices(&self) -> Vec { - self.nodes - .iter() - .filter_map(|(idx, result)| { - result.as_ref().ok().and_then(|status| { - status - .consensus - .raft - .as_ref() - .filter(|r| r.is_leader) - .map(|_| *idx) - }) - }) - .collect() - } - - /// Returns true if all healthy nodes report having a current leader - pub fn all_have_leader(&self) -> bool { - self.nodes - .iter() - .filter_map(|(_, result)| result.as_ref().ok()) - .all(|status| { - status - .consensus - .raft - .as_ref() - .and_then(|r| r.current_leader.as_ref()) - .is_some() - }) - } - - /// Returns the agreed-upon leader ID if all nodes agree, None otherwise - pub fn agreed_leader(&self) -> Option<&str> { - let leaders: Vec<_> = self - .nodes - .iter() - .filter_map(|(_, result)| result.as_ref().ok()) - .filter_map(|status| status.consensus.raft.as_ref()?.current_leader.as_deref()) - .collect(); - - leaders - .first() - .copied() - .filter(|first| leaders.iter().all(|leader| leader == first)) - } - - /// Returns true if the cluster has successfully formed: - /// - All nodes healthy - /// - Exactly one leader - /// - All nodes have a leader - /// - All nodes agree on the same leader - /// - The leader's node_id matches what others believe - pub fn is_formed(&self) -> bool { - let leader_indices = self.leader_indices(); - if leader_indices.len() != 1 { - return false; - } - - let agreed = self.agreed_leader(); - let leader_node_id = self - .status_for_index(leader_indices[0]) - .and_then(|s| s.consensus.raft.as_ref()) - .map(|r| r.node_id.as_str()); - - self.all_healthy() && self.all_have_leader() && agreed.is_some() && agreed == leader_node_id - } - - /// Returns a summary string for logging cluster formation progress - pub fn summary(&self) -> String { - let leader_indices = self.leader_indices(); - let agreed = self.agreed_leader(); - let leader_node_id = leader_indices - .first() - .and_then(|&idx| self.status_for_index(idx)) - .and_then(|s| s.consensus.raft.as_ref()) - .map(|r| r.node_id.as_str()); - - format!( - "healthy={} leaders={} all_have_leader={} agreed_leader={:?} leader_node_id={:?}", - self.all_healthy(), - leader_indices.len(), - self.all_have_leader(), - agreed, - leader_node_id - ) - } - - /// Returns a detailed explanation of why cluster formation failed - pub fn failure_reason(&self) -> String { - let mut reasons = Vec::new(); - - if !self.all_healthy() { - let unhealthy: Vec<_> = self - .nodes - .iter() - .filter_map(|(idx, result)| match result { - Ok(status) if !status.healthy => Some(format!("node_{}: healthy=false", idx)), - Err(err) => Some(format!("node_{}: error={:?}", idx, err)), - _ => None, - }) - .collect(); - reasons.push(format!("Unhealthy nodes: [{}]", unhealthy.join(", "))); - } - - let leader_indices = self.leader_indices(); - if leader_indices.len() != 1 { - let leader_info: Vec<_> = leader_indices - .iter() - .filter_map(|&idx| { - self.status_for_index(idx) - .and_then(|status| status.consensus.raft.as_ref()) - .map(|r| format!("node_{} (id={})", idx, r.node_id)) - }) - .collect(); - reasons.push(format!( - "Expected 1 leader, found {}: [{}]", - leader_indices.len(), - leader_info.join(", ") - )); - } - - if !self.all_have_leader() { - let without_leader: Vec<_> = self - .nodes - .iter() - .filter_map(|(idx, result)| { - result.as_ref().ok().and_then(|status| { - if status.consensus.raft.as_ref()?.current_leader.is_none() { - Some(format!("node_{}", idx)) - } else { - None - } - }) - }) - .collect(); - reasons.push(format!( - "Nodes without leader: [{}]", - without_leader.join(", ") - )); - } - - if let Some(agreed) = self.agreed_leader() { - let leader_node_id = leader_indices - .first() - .and_then(|&idx| self.status_for_index(idx)) - .and_then(|s| s.consensus.raft.as_ref()) - .map(|r| r.node_id.as_str()); - - if leader_node_id != Some(agreed) { - reasons.push(format!( - "Leader mismatch: cluster agrees on '{}' but leader reports '{:?}'", - agreed, leader_node_id - )); - } - } else { - let leader_views: Vec<_> = self - .nodes - .iter() - .filter_map(|(idx, result)| { - result - .as_ref() - .ok() - .and_then(|s| s.consensus.raft.as_ref()?.current_leader.as_ref()) - .map(|leader| format!("node_{}: {}", idx, leader)) - }) - .collect(); - if !leader_views.is_empty() { - reasons.push(format!( - "Nodes disagree on leader: [{}]", - leader_views.join(", ") - )); - } - } - - if reasons.is_empty() { - "Unknown reason".to_string() - } else { - reasons.join("; ") - } - } - - fn status_for_index(&self, index: usize) -> Option<&StatusResponse> { - self.nodes - .iter() - .find(|(idx, _)| *idx == index) - .and_then(|(_, result)| result.as_ref().ok()) - } -} - -/// Test harness for multi-node consensus testing -pub struct MultiNodeTester { - nodes: Vec, - batcher_node_index: usize, -} - -impl MultiNodeTester { - pub fn builder() -> MultiNodeTesterBuilder { - MultiNodeTesterBuilder::default() - } - - pub fn node(&self, index: usize) -> &Tester { - self.nodes[index] - .running() - .unwrap_or_else(|| panic!("node {index} is suspended")) - } - - pub fn is_node_suspended(&self, index: usize) -> bool { - matches!(self.nodes[index], NodeSlot::Suspended(_)) - } - - pub fn batcher_node_index(&self) -> usize { - self.batcher_node_index - } - - pub fn len(&self) -> usize { - self.nodes.len() - } - - pub fn is_empty(&self) -> bool { - self.nodes.is_empty() - } - - fn all_node_indices(&self) -> Vec { - (0..self.nodes.len()).collect() - } - - fn active_node_indices(&self) -> Vec { - self.nodes - .iter() - .enumerate() - .filter_map(|(idx, node)| node.running().is_some().then_some(idx)) - .collect() - } - - /// Shuts down all active nodes and drops suspended ones. - pub async fn shutdown_all(self) -> anyhow::Result<()> { - for node in self.nodes { - match node { - NodeSlot::Running(node) => node.shutdown().await?, - NodeSlot::Suspended(node) => node.shutdown().await?, - } - } - Ok(()) - } - - /// Permanently shut down a node and remove it from the cluster. - pub async fn shutdown_node(&mut self, index: usize) -> anyhow::Result<()> { - tracing::info!("shutting down node {index}..."); - match self.nodes.remove(index) { - NodeSlot::Running(node) => node.shutdown().await, - NodeSlot::Suspended(node) => node.shutdown().await, - } - } - - /// Suspend a node (shut down its process, retain its state). The slot remains in `nodes` - /// as a suspended [`StoppedTester`] that can be restarted later with [`Self::start_node`]. - pub async fn suspend_node(&mut self, index: usize) -> anyhow::Result<()> { - tracing::info!("suspending node {index}..."); - let tester = self.nodes.remove(index); - let stopped = match tester { - NodeSlot::Running(tester) => tester.stop().await?, - NodeSlot::Suspended(_) => panic!("node {index} is already suspended"), - }; - self.nodes - .insert(index, NodeSlot::Suspended(Box::new(stopped))); - Ok(()) - } - - /// Restart a previously suspended node. - pub async fn start_node(&mut self, index: usize) -> anyhow::Result<()> { - tracing::info!("starting suspended node {index}..."); - let suspended = self.nodes.remove(index); - let started = match suspended { - NodeSlot::Suspended(tester) => tester.start().await?, - NodeSlot::Running(_) => panic!("node {index} is not suspended"), - }; - self.nodes - .insert(index, NodeSlot::Running(Box::new(started))); - Ok(()) - } - - /// Respawn any running node whose runtime reported a critical-task panic, reusing its - /// on-disk state and ports. Mirrors what a production orchestrator does on a `reth_tasks` - /// critical-task panic (notably the deliberate panic in - /// `lib/raft/src/leadership_monitor.rs` when a leader is demoted) so cluster-wait helpers - /// can recover from a transient leader flicker without leaving a dead status endpoint. - /// Returns the number of nodes respawned in this sweep. - async fn respawn_crashed_running_nodes(&mut self) -> anyhow::Result { - let crashed: Vec = self - .nodes - .iter() - .enumerate() - .filter_map(|(idx, slot)| match slot { - NodeSlot::Running(t) if t.has_crashed() => Some(idx), - _ => None, - }) - .collect(); - let count = crashed.len(); - for idx in crashed { - tracing::warn!("node {idx} crashed (critical task panicked); respawning..."); - let running = self.nodes.remove(idx); - let stopped = match running { - NodeSlot::Running(tester) => tester.stop().await?, - NodeSlot::Suspended(_) => unreachable!("filtered to running above"), - }; - let restarted = stopped.start().await?; - self.nodes - .insert(idx, NodeSlot::Running(Box::new(restarted))); - } - Ok(count) - } - - pub async fn start_node_with_overrides( - &mut self, - index: usize, - config_overrides: impl FnOnce(&mut Config), - ) -> anyhow::Result<()> { - tracing::info!("starting suspended node {index} with config overrides..."); - let suspended = self.nodes.remove(index); - let started = match suspended { - NodeSlot::Suspended(tester) => tester.start_with_overrides(config_overrides).await?, - NodeSlot::Running(_) => panic!("node {index} is not suspended"), - }; - self.nodes - .insert(index, NodeSlot::Running(Box::new(started))); - Ok(()) - } - - /// Waits for the Raft cluster to form with a single elected leader - /// Returns the index of the leader node - pub async fn wait_for_raft_cluster_formation( - &mut self, - timeout: Duration, - ) -> anyhow::Result { - let node_indices = self.all_node_indices(); - self.wait_for_raft_cluster_formation_among(&node_indices, timeout) - .await - } - - /// Same as `wait_for_raft_cluster_formation`, but ignores suspended nodes. - pub async fn wait_for_active_raft_cluster_formation( - &mut self, - timeout: Duration, - ) -> anyhow::Result { - let node_indices = self.active_node_indices(); - self.wait_for_raft_cluster_formation_among(&node_indices, timeout) - .await - } - - /// Waits for all currently-running nodes to expose `block_number` via their L2 RPC. - pub async fn wait_for_active_l2_block( - &self, - block_number: u64, - timeout: Duration, - ) -> anyhow::Result<()> { - let waits = self - .nodes - .iter() - .filter_map(NodeSlot::running) - .map(|node| node.l2_zk_provider.wait_for_block(block_number)); - tokio::time::timeout(timeout, futures::future::try_join_all(waits)) - .await - .map_err(|_| { - anyhow::anyhow!( - "timed out waiting for all active nodes to reach L2 block {block_number}" - ) - })? - .map(|_| ()) - } - - /// Same as `wait_for_raft_cluster_formation`, but only considers selected nodes. - pub async fn wait_for_raft_cluster_formation_among( - &mut self, - node_indices: &[usize], - timeout: Duration, - ) -> anyhow::Result { - anyhow::ensure!( - !node_indices.is_empty(), - "cannot wait for raft cluster formation among an empty node set" - ); - for &index in node_indices { - anyhow::ensure!( - index < self.nodes.len(), - "node index {index} is out of range for cluster with {} nodes", - self.nodes.len() - ); - } - - let mut deadline = Instant::now() + timeout; - let mut last_summary = String::new(); - - while Instant::now() < deadline { - let respawned = self.respawn_crashed_running_nodes().await?; - if respawned > 0 { - deadline = deadline.max(Instant::now() + RESPAWN_GRACE); - } - let cluster_state = - ClusterState::collect_indices(&self.nodes, node_indices.iter().copied()).await; - let summary = cluster_state.summary(); - - if summary != last_summary { - tracing::info!( - "raft cluster formation check (node_indices={node_indices:?}): {summary}" - ); - last_summary = summary; - } - - if cluster_state.is_formed() { - let leader_index = cluster_state.leader_indices()[0]; - tracing::info!( - "raft cluster formed (node_indices={node_indices:?}): leader_index={leader_index}" - ); - for &index in node_indices { - if let Some(node) = self.nodes.get(index).and_then(NodeSlot::running) { - node.wait_for_initial_deposit().await?; - } - } - return Ok(leader_index); - } - - tokio::time::sleep(Duration::from_millis(200)).await; - } - - let final_state = - ClusterState::collect_indices(&self.nodes, node_indices.iter().copied()).await; - - tracing::error!( - "failed to form raft cluster (node_indices={node_indices:?}): reason={}, statuses={:?}", - final_state.failure_reason(), - final_state.nodes - ); - - anyhow::bail!( - "timed out waiting for raft cluster formation among {node_indices:?}: {}", - final_state.failure_reason() - ) - } -} - -#[derive(Default)] -pub struct MultiNodeTesterBuilder { - consensus_secret_keys: Vec, - consensus_nodes_to_spawn: Option, - batcher_node_index: Option, -} - -impl MultiNodeTesterBuilder { - pub fn with_consensus_secret_keys(mut self, keys: Vec) -> Self { - self.consensus_secret_keys = keys; - self - } - - pub fn spawn_consensus_nodes(mut self, count: usize) -> Self { - self.consensus_nodes_to_spawn = Some(count); - self - } - - /// Choose which launched consensus node runs the batcher. Exactly one node has - /// `batcher_config.enabled = true`; the rest keep it disabled. - pub fn with_batcher_node_index(mut self, index: usize) -> Self { - self.batcher_node_index = Some(index); - self - } - - pub async fn build(self) -> anyhow::Result { - let membership_nodes = self.consensus_secret_keys.len(); - assert!( - membership_nodes > 0, - "MultiNodeTester requires at least 1 node" - ); - let num_nodes = self.consensus_nodes_to_spawn.unwrap_or(membership_nodes); - assert!( - num_nodes > 0 && num_nodes <= membership_nodes, - "spawn_consensus_nodes must be in 1..={membership_nodes}" - ); - let batcher_node_index = self.batcher_node_index.unwrap_or(0); - assert!( - batcher_node_index < num_nodes, - "batcher_node_index must be in 0..{num_nodes}" - ); - - let mut node_ports = Vec::with_capacity(membership_nodes); - for _ in 0..membership_nodes { - node_ports.push(Ports::acquire_unused().await?); - } - - let node_records = self - .consensus_secret_keys - .iter() - .zip(node_ports.iter()) - .map(|(secret, port)| { - zksync_os_network::NodeRecord::from_secret_key( - SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port.network.port), - secret, - ) - }) - .collect::>(); - let peer_ids = node_records - .iter() - .map(|record| record.id) - .collect::>(); - let tx_forwarding_rpc_urls = node_records - .iter() - .zip(node_ports.iter()) - .map(|(record, ports)| format!("{}@127.0.0.1:{}", record.id, ports.l2_rpc.port)) - .collect::>(); - - let l1 = AnvilL1::start(ChainLayout::Default { - protocol_version: PROTOCOL_VERSION, - }) - .await?; - - let launches = self - .consensus_secret_keys - .into_iter() - .take(num_nodes) - .zip(node_ports.into_iter()) - .enumerate() - .map(|(i, (secret, ports))| { - let peers = peer_ids.clone(); - let tx_forwarding_rpc_urls = tx_forwarding_rpc_urls.clone(); - let boot_nodes: Vec = - node_records.iter().copied().map(Into::into).collect(); - let l1 = l1.clone(); - async move { - let network_port = ports.network.port; - // Production configs set this on every consensus node. The first node to - // initialize the cluster wins; the rest safely observe that it is initialized. - let bootstrap = true; - let batcher_enabled = i == batcher_node_index; - let expected_node_id = zksync_os_network::NodeRecord::from_secret_key( - SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), network_port), - &secret, - ) - .id; - tracing::info!("starting node... (node_index={i}, node_id={expected_node_id}, network_port={network_port}, bootstrap={bootstrap}, batcher_enabled={batcher_enabled})"); - - let node = Tester::launch_node_with_ports( - l1, - false, - Some(move |config: &mut Config| { - config.general_config.node_role = NodeRole::MainNode; - config.general_config.main_node_rpc_url = None; - config.batcher_config.enabled = batcher_enabled; - if !batcher_enabled { - // SYSCOIN: non-batcher consensus nodes do not run GasAdjuster, but - // may still become Raft leaders. Opt them into block production only - // with a static pubdata price so leader failover cannot stall on fee - // inputs that are intentionally absent from this node. - config.sequencer_config.allow_non_batcher_block_production = true; - config.fee_config.pubdata_price_override = - Some(U128::from(1_000_000u64)); - } - config.network_config.enabled = true; - config.network_config.secret_key = Some(secret); - config.network_config.address = Ipv4Addr::LOCALHOST; - config.network_config.port = network_port; - config.network_config.boot_nodes = boot_nodes.clone(); - config.consensus_config.enabled = true; - config.consensus_config.bootstrap = bootstrap; - config.consensus_config.peer_ids = peers.clone(); - config.consensus_config.tx_forwarding_rpc_urls = - tx_forwarding_rpc_urls.clone(); - // Keep elections reasonably fast while leaving enough room for - // batcher-enabled nodes to finish in-flight block work before a - // transient election can displace the current leader. - config.consensus_config.election_timeout_min = - TEST_ELECTION_TIMEOUT_MIN; - config.consensus_config.election_timeout_max = - TEST_ELECTION_TIMEOUT_MAX; - config.consensus_config.heartbeat_interval = TEST_HEARTBEAT_INTERVAL; - }), - ChainLayout::Default { - protocol_version: PROTOCOL_VERSION, - }, - ports, - false, - ) - .await?; - tracing::info!("node started with tempfile: {} (node_index={i}, node_id={expected_node_id})", node.tempdir.path().display()); - anyhow::Ok(NodeSlot::Running(Box::new(node))) - } - }); - - Ok(MultiNodeTester { - nodes: try_join_all(launches).await?, - batcher_node_index, - }) - } -} diff --git a/integration-tests/tests/consensus_node/mod.rs b/integration-tests/tests/consensus_node/mod.rs deleted file mode 100644 index 81775d292..000000000 --- a/integration-tests/tests/consensus_node/mod.rs +++ /dev/null @@ -1,681 +0,0 @@ -use alloy::eips::{BlockId, Encodable2718}; -use alloy::network::{NetworkTransactionBuilder, ReceiptResponse, TransactionBuilder}; -use alloy::primitives::{Address, TxHash, U256}; -use alloy::providers::Provider; -use alloy::rpc::types::TransactionRequest; -use anyhow::Context as _; -use std::time::Duration; -use tokio::time::sleep; -use zksync_os_integration_tests::Tester; -use zksync_os_integration_tests::assert_traits::ReceiptAssert; -use zksync_os_integration_tests::multi_node::MultiNodeTester; -use zksync_os_integration_tests::provider::ZksyncTestingProvider; - -const CLUSTER_FORMATION_TIMEOUT: Duration = Duration::from_secs(20); -const REPLICATION_TIMEOUT: Duration = Duration::from_secs(20); -const L1_FINALIZATION_TIMEOUT: Duration = Duration::from_secs(60); - -mod restarted_node_catchup; - -fn consensus_test_keys(n: usize) -> Vec { - (0..n) - .map(|_| zksync_os_network::rng_secret_key()) - .collect() -} - -async fn raft_node_id(cluster: &MultiNodeTester, index: usize) -> anyhow::Result { - cluster - .node(index) - .status() - .await? - .consensus - .raft - .map(|raft| raft.node_id) - .ok_or_else(|| anyhow::anyhow!("node {index} did not expose raft status")) -} - -async fn latest_l2_block(node: &Tester) -> anyhow::Result { - node.l2_zk_provider - .get_block_number_by_id(BlockId::latest()) - .await? - .context("latest block number is missing") -} - -pub(crate) async fn wait_for_l2_block( - node: &Tester, - block_number: u64, - timeout: Duration, -) -> anyhow::Result<()> { - tokio::time::timeout(timeout, node.l2_zk_provider.wait_for_block(block_number)) - .await - .with_context(|| format!("timed out waiting for L2 block {block_number}"))??; - Ok(()) -} - -#[derive(Clone, Copy)] -enum TransferSubmission { - SendTransaction, - SendRawTransactionSync, -} - -async fn send_transfer( - cluster: &MultiNodeTester, - index: usize, - submission: TransferSubmission, -) -> anyhow::Result { - let node = cluster.node(index); - let recipient = Address::random(); - - match submission { - TransferSubmission::SendTransaction => { - let gas_price = node.l2_provider.get_gas_price().await?; - let tx = TransactionRequest::default() - .with_to(recipient) - .with_value(U256::from(1)) - .with_gas_price(gas_price); - let receipt = node - .l2_provider - .send_transaction(tx) - .await? - .expect_successful_receipt() - .await?; - transfer_receipt_block_number(&receipt, recipient, None) - } - TransferSubmission::SendRawTransactionSync => { - let sender = node.l2_wallet.default_signer().address(); - let fees = node.l2_provider.estimate_eip1559_fees().await?; - let nonce = node.l2_provider.get_transaction_count(sender).await?; - let tx = TransactionRequest::default() - .with_to(recipient) - .with_value(U256::from(1)) - .with_nonce(nonce) - .with_gas_price(fees.max_fee_per_gas) - .with_gas_limit(50_000); - let tx_envelope = tx.build(&node.l2_wallet).await?; - let expected_hash = *tx_envelope.tx_hash(); - let encoded = tx_envelope.encoded_2718(); - - let receipt = node.l2_provider.send_raw_transaction_sync(&encoded).await?; - transfer_receipt_block_number(&receipt, recipient, Some(expected_hash)) - } - } -} - -fn transfer_receipt_block_number( - receipt: &impl ReceiptResponse, - recipient: Address, - expected_hash: Option, -) -> anyhow::Result { - assert!(receipt.status()); - assert_eq!(receipt.to(), Some(recipient)); - let tx_hash = receipt.transaction_hash(); - if let Some(expected_hash) = expected_hash { - assert_eq!(tx_hash, expected_hash); - } else { - assert_ne!(tx_hash, TxHash::ZERO); - } - receipt - .block_number() - .context("transfer receipt did not include a block number") -} - -/// Sends a transfer to `submit_index`, waits for all running nodes to expose the resulting -/// L2 block, then waits for L1 finalization if the producing leader runs the batcher. -/// Returns the L2 block number that included the transfer. -async fn send_transfer_and_wait_for_active_replication( - cluster: &mut MultiNodeTester, - submit_index: usize, - producer_index: usize, -) -> anyhow::Result { - let block_number = - send_transfer(cluster, submit_index, TransferSubmission::SendTransaction).await?; - cluster - .wait_for_active_l2_block(block_number, REPLICATION_TIMEOUT) - .await?; - wait_for_l1_finalization_if_leader_batches(cluster, producer_index, block_number).await?; - Ok(block_number) -} - -async fn send_transfer_with_submission_and_wait_for_active_replication( - cluster: &mut MultiNodeTester, - submit_index: usize, - producer_index: usize, - submission: TransferSubmission, -) -> anyhow::Result { - let block_number = send_transfer(cluster, submit_index, submission).await?; - cluster - .wait_for_active_l2_block(block_number, REPLICATION_TIMEOUT) - .await?; - wait_for_l1_finalization_if_leader_batches(cluster, producer_index, block_number).await?; - Ok(block_number) -} - -async fn wait_for_l1_finalization_if_leader_batches( - cluster: &MultiNodeTester, - leader_index: usize, - block_number: u64, -) -> anyhow::Result { - let batcher_idx = cluster.batcher_node_index(); - if leader_index != batcher_idx { - tracing::info!( - block_number, - leader_index, - batcher_idx, - "skipping L1 finalization check because the producing leader is not the batcher node" - ); - return Ok(block_number); - } - - if cluster.is_node_suspended(batcher_idx) { - tracing::info!( - block_number, - batcher_idx, - "skipping L1 finalization check because the batcher node is suspended" - ); - return Ok(block_number); - } - - cluster - .node(batcher_idx) - .l2_zk_provider - .wait_finalized_with_timeout(block_number, L1_FINALIZATION_TIMEOUT) - .await - .with_context(|| { - format!( - "block {block_number} was not finalized while batcher node {batcher_idx} was active" - ) - })?; - Ok(block_number) -} - -#[test_log::test(tokio::test)] -async fn consensus_cluster_includes_simple_transaction_with_wait() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(1)) - .build() - .await?; - let result = async { - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - let block_number = - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - wait_for_l1_finalization_if_leader_batches(&cluster, leader_index, block_number).await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_can_be_reenabled_after_clearing_raft_history() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(1)) - .build() - .await?; - let result = async { - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - - cluster.suspend_node(leader_index).await?; - - // This creates a WAL/raft gap: the restarted node clears raft history, then - // produces a block through loopback consensus while raft is disabled. - cluster - .start_node_with_overrides(leader_index, |config| { - config.consensus_config.enabled = false; - config.consensus_config.force_clear_raft_history = true; - }) - .await?; - - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - - cluster.suspend_node(leader_index).await?; - - // Re-enable consensus after the gap. The old WAL blocks are replayed locally; - // new blocks should be raft-canonized from this point onward. - cluster - .start_node_with_overrides(leader_index, |config| { - config.consensus_config.enabled = true; - config.consensus_config.force_clear_raft_history = false; - }) - .await?; - - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - - // Restart once more with consensus enabled to verify the sparse raft history - // written after re-enable is loadable. - cluster.suspend_node(leader_index).await?; - cluster.start_node(leader_index).await?; - - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer(&cluster, leader_index, TransferSubmission::SendTransaction).await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_cluster_forms_with_three_nodes_and_replicates_blocks() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - send_transfer_and_wait_for_active_replication(&mut cluster, leader_index, leader_index) - .await?; - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_cluster_accepts_transactions_from_any_node() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - for node_index in 0..cluster.len() { - send_transfer_and_wait_for_active_replication(&mut cluster, node_index, leader_index) - .await - .with_context(|| format!("transaction submitted to node {node_index} failed"))?; - } - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_cluster_send_raw_transaction_sync_accepts_leader_and_replica() --> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_index = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let replica_index = (0..cluster.len()) - .find(|idx| *idx != leader_index) - .context("3-node cluster must have a replica")?; - - send_transfer_with_submission_and_wait_for_active_replication( - &mut cluster, - leader_index, - leader_index, - TransferSubmission::SendRawTransactionSync, - ) - .await - .with_context(|| { - format!("eth_sendRawTransactionSync submitted to leader node {leader_index} failed") - })?; - send_transfer_with_submission_and_wait_for_active_replication( - &mut cluster, - replica_index, - leader_index, - TransferSubmission::SendRawTransactionSync, - ) - .await - .with_context(|| { - format!("eth_sendRawTransactionSync submitted to replica node {replica_index} failed") - })?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_cluster_rotates_leader_after_failure() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let initial_leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let initial_leader_node_id = raft_node_id(&cluster, initial_leader_idx).await?; - - // Warm up follower replication before taking the leader down so the surviving - // nodes have already exchanged append entries with the elected leader. - send_transfer_and_wait_for_active_replication( - &mut cluster, - initial_leader_idx, - initial_leader_idx, - ) - .await?; - - cluster.suspend_node(initial_leader_idx).await?; - - let new_leader_idx = cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let new_leader_id = raft_node_id(&cluster, new_leader_idx).await?; - - assert_ne!(initial_leader_node_id, new_leader_id); - - send_transfer_and_wait_for_active_replication(&mut cluster, new_leader_idx, new_leader_idx) - .await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_cluster_stops_making_progress_without_quorum() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - send_transfer_and_wait_for_active_replication(&mut cluster, leader_idx, leader_idx).await?; - let follower_indices: Vec<_> = (0..cluster.len()) - .filter(|idx| *idx != leader_idx) - .collect(); - let survivor_idx = follower_indices[1]; - - cluster.suspend_node(leader_idx).await?; - cluster.suspend_node(follower_indices[0]).await?; - - let survivor_block = latest_l2_block(cluster.node(survivor_idx)).await?; - sleep(Duration::from_secs(2)).await; - let survivor_block_after_wait = latest_l2_block(cluster.node(survivor_idx)).await?; - assert_eq!( - survivor_block_after_wait, survivor_block, - "L2 head advanced after quorum loss: before={survivor_block} after={survivor_block_after_wait}" - ); - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_original_leader_rejoins_and_cluster_remains_stable() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let initial_leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer_and_wait_for_active_replication( - &mut cluster, - initial_leader_idx, - initial_leader_idx, - ) - .await?; - - cluster.suspend_node(initial_leader_idx).await?; - - let new_leader_idx = cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - // Advance the cluster while the original leader is absent so it has entries to catch up. - let target_block = send_transfer_and_wait_for_active_replication( - &mut cluster, - new_leader_idx, - new_leader_idx, - ) - .await?; - - // Restart the original leader. It must rejoin without disrupting the running cluster: - // exactly one leader must remain, all three nodes must agree, and state must converge. - cluster.start_node(initial_leader_idx).await?; - let final_leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - cluster - .wait_for_active_l2_block(target_block, REPLICATION_TIMEOUT) - .await?; - - // Verify the cluster continues to make progress after the rejoin. - send_transfer_and_wait_for_active_replication( - &mut cluster, - final_leader_idx, - final_leader_idx, - ) - .await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_cluster_recovers_after_quorum_loss() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let committed_block = - send_transfer_and_wait_for_active_replication(&mut cluster, leader_idx, leader_idx) - .await?; - - let follower_indices: Vec<_> = (0..cluster.len()) - .filter(|&idx| idx != leader_idx) - .collect(); - let survivor_idx = follower_indices[1]; - - cluster.suspend_node(leader_idx).await?; - cluster.suspend_node(follower_indices[0]).await?; - - // Verify that quorum loss stops all progress. - let survivor_block = latest_l2_block(cluster.node(survivor_idx)).await?; - sleep(Duration::from_secs(2)).await; - let survivor_block_after = latest_l2_block(cluster.node(survivor_idx)).await?; - assert_eq!( - survivor_block_after, survivor_block, - "L2 head must not advance without quorum: before={survivor_block} after={survivor_block_after}", - ); - - // Restore quorum and verify the cluster recovers and makes progress. - cluster.start_node(leader_idx).await?; - cluster.start_node(follower_indices[0]).await?; - let new_leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let recovery_block = send_transfer_and_wait_for_active_replication( - &mut cluster, - new_leader_idx, - new_leader_idx, - ) - .await?; - assert!( - recovery_block > committed_block, - "cluster must make progress after quorum is restored: committed={committed_block} recovery={recovery_block}", - ); - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_cluster_fully_restarts_and_recovers() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let last_block = - send_transfer_and_wait_for_active_replication(&mut cluster, leader_idx, leader_idx) - .await?; - - // Suspend all nodes: state is durably on disk before any restarts. - for idx in 0..cluster.len() { - cluster.suspend_node(idx).await?; - } - // Restart all nodes: they recover from disk, re-elect a leader, and resume. - for idx in 0..cluster.len() { - cluster.start_node(idx).await?; - } - - let new_leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - cluster - .wait_for_active_l2_block(last_block, REPLICATION_TIMEOUT) - .await?; - - // Verify the cluster continues to make progress after the full restart. - send_transfer_and_wait_for_active_replication(&mut cluster, new_leader_idx, new_leader_idx) - .await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_late_node_joins_and_catches_up() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - // Suspend the third node before cluster formation so it hasn't participated in any - // block production — simulating a node that joins an already-established cluster. - let late_node_idx = 2; - cluster.suspend_node(late_node_idx).await?; - - // Two of three nodes form a quorum; the cluster must elect a leader and make progress. - let leader_idx = cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer_and_wait_for_active_replication(&mut cluster, leader_idx, leader_idx).await?; - let target_block = - send_transfer_and_wait_for_active_replication(&mut cluster, leader_idx, leader_idx) - .await?; - - // Start the late node. It must receive all missed entries via Raft log replication. - cluster.start_node(late_node_idx).await?; - wait_for_l2_block( - cluster.node(late_node_idx), - target_block, - REPLICATION_TIMEOUT, - ) - .await?; - - // The full 3-node cluster must be stable after the late joiner has caught up. - cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -async fn consensus_follower_restarts_and_catches_up() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - let leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let follower_idx = (0..cluster.len()) - .find(|idx| *idx != leader_idx) - .expect("3-node cluster must have a follower"); - - cluster.suspend_node(follower_idx).await?; - let active_leader_idx = cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - send_transfer_and_wait_for_active_replication( - &mut cluster, - active_leader_idx, - active_leader_idx, - ) - .await?; - let target_block = send_transfer_and_wait_for_active_replication( - &mut cluster, - active_leader_idx, - active_leader_idx, - ) - .await?; - - cluster.start_node(follower_idx).await?; - wait_for_l2_block( - cluster.node(follower_idx), - target_block, - REPLICATION_TIMEOUT, - ) - .await?; - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} diff --git a/integration-tests/tests/consensus_node/restarted_node_catchup.rs b/integration-tests/tests/consensus_node/restarted_node_catchup.rs deleted file mode 100644 index ee828f737..000000000 --- a/integration-tests/tests/consensus_node/restarted_node_catchup.rs +++ /dev/null @@ -1,709 +0,0 @@ -use super::*; - -use std::time::Duration; -use tokio::time::{Instant, sleep}; -use zksync_os_integration_tests::rpc_recorder::{HttpRpcRecorder, HttpRpcReport, RpcRecordConfig}; - -const CONSENSUS_LONG_GAP_LOAD_DURATION: Duration = Duration::from_secs(60); -const CONSENSUS_CONTINUED_LOAD_AFTER_RESTART_DURATION: Duration = Duration::from_secs(45); -const CONSENSUS_LONG_GAP_CATCH_UP_TIMEOUT: Duration = Duration::from_secs(180); -const MIN_LONG_GAP_LOAD_BLOCKS: usize = 10; -const MIN_CONTINUED_LOAD_BLOCKS: usize = 5; - -struct ConsensusLoadStats { - attempts: usize, - blocks: Vec, - elapsed: Duration, -} - -struct ConsensusRejoinLoadStats { - attempts: usize, - blocks: Vec, - blocks_before_restart: usize, - blocks_after_restart: usize, - elapsed: Duration, - restart_started_at: Duration, - restart_completed_at: Duration, - target_block_at_restart: u64, - l2_caught_up_at: Duration, - rpc_caught_up_at: Duration, -} - -async fn send_transfer_and_wait_for_l2_blocks( - cluster: &MultiNodeTester, - submit_index: usize, - node_indices: &[usize], -) -> anyhow::Result { - let block_number = - send_transfer(cluster, submit_index, TransferSubmission::SendTransaction).await?; - for &index in node_indices { - wait_for_l2_block(cluster.node(index), block_number, REPLICATION_TIMEOUT) - .await - .with_context(|| format!("node {index} did not reach L2 block {block_number}"))?; - } - Ok(block_number) -} - -fn remaining_storm_send_timeout(deadline: Instant) -> Option { - let now = Instant::now(); - if now >= deadline { - return None; - } - - Some((REPLICATION_TIMEOUT + Duration::from_secs(10)).min(deadline.duration_since(now))) -} - -async fn generate_consensus_transaction_storm( - cluster: &mut MultiNodeTester, - node_indices: &[usize], - duration: Duration, -) -> anyhow::Result { - let started_at = Instant::now(); - let deadline = started_at + duration; - let mut attempts = 0; - let mut blocks = Vec::new(); - let mut last_error = None; - let mut next_submit_index = 0; - - while Instant::now() < deadline { - let _leader_index = cluster - .wait_for_raft_cluster_formation_among(node_indices, CLUSTER_FORMATION_TIMEOUT) - .await?; - let submit_index = node_indices[next_submit_index % node_indices.len()]; - next_submit_index += 1; - let Some(send_timeout) = remaining_storm_send_timeout(deadline) else { - break; - }; - attempts += 1; - - match tokio::time::timeout( - send_timeout, - send_transfer_and_wait_for_l2_blocks(cluster, submit_index, node_indices), - ) - .await - { - Ok(Ok(block_number)) => { - tracing::info!( - attempts, - produced_blocks = blocks.len() + 1, - block_number, - submit_index, - elapsed_ms = started_at.elapsed().as_millis(), - "consensus transaction storm produced a block" - ); - blocks.push(block_number); - } - Ok(Err(error)) => { - tracing::warn!( - attempts, - error = %error, - "consensus transaction storm send failed; retrying" - ); - last_error = Some(error.to_string()); - sleep(Duration::from_millis(200)).await; - } - Err(_) => { - tracing::warn!( - attempts, - "consensus transaction storm send timed out; retrying" - ); - last_error = Some("timed out sending transfer".to_owned()); - sleep(Duration::from_millis(200)).await; - } - } - } - - anyhow::ensure!( - blocks.len() >= MIN_LONG_GAP_LOAD_BLOCKS, - "transaction storm produced too few blocks: produced={}, attempts={}, last_error={:?}", - blocks.len(), - attempts, - last_error - ); - - Ok(ConsensusLoadStats { - attempts, - blocks, - elapsed: started_at.elapsed(), - }) -} - -async fn observe_restarted_node_catch_up_to_target( - cluster: &MultiNodeTester, - restarted_node_idx: usize, - restarted_rpc_monitor: &HttpRpcRecorder, - target_block: u64, - started_at: Instant, - l2_caught_up: &mut Option, - rpc_caught_up: &mut Option, -) { - if l2_caught_up.is_none() - && let Ok(latest_block) = latest_l2_block(cluster.node(restarted_node_idx)).await - && latest_block >= target_block - { - *l2_caught_up = Some(started_at.elapsed()); - } - - if rpc_caught_up.is_none() - && restarted_rpc_monitor - .first_observed_block_at(target_block) - .await - .is_some() - { - *rpc_caught_up = Some(started_at.elapsed()); - } -} - -async fn generate_consensus_transaction_storm_across_restart( - cluster: &mut MultiNodeTester, - active_node_indices: &[usize], - restarted_node_idx: usize, - restart_after: Duration, - continue_after_restart: Duration, - restarted_rpc_monitor: &HttpRpcRecorder, -) -> anyhow::Result { - let started_at = Instant::now(); - let restart_deadline = started_at + restart_after; - let stop_deadline = restart_deadline + continue_after_restart; - let mut attempts = 0; - let mut blocks = Vec::new(); - let mut blocks_before_restart = 0; - let mut blocks_after_restart = 0; - let mut restarted = false; - let mut restart_started_at = None; - let mut restart_completed_at = None; - let mut target_block_at_restart = None; - let mut l2_caught_up = None; - let mut rpc_caught_up = None; - let mut last_error = None; - let mut next_submit_index = 0; - - while Instant::now() < stop_deadline { - if !restarted && Instant::now() >= restart_deadline { - let target_block = latest_l2_block(cluster.node(active_node_indices[0])).await?; - let started = started_at.elapsed(); - tracing::info!( - restarted_node_idx, - target_block, - elapsed_ms = started.as_millis(), - "restarting consensus node while transaction storm continues" - ); - cluster.start_node(restarted_node_idx).await?; - let completed = started_at.elapsed(); - - restarted = true; - restart_started_at = Some(started); - restart_completed_at = Some(completed); - target_block_at_restart = Some(target_block); - } - - if let Some(target) = target_block_at_restart { - observe_restarted_node_catch_up_to_target( - cluster, - restarted_node_idx, - restarted_rpc_monitor, - target, - started_at, - &mut l2_caught_up, - &mut rpc_caught_up, - ) - .await; - } - - let _leader_index = cluster - .wait_for_raft_cluster_formation_among(active_node_indices, CLUSTER_FORMATION_TIMEOUT) - .await?; - let submit_index = active_node_indices[next_submit_index % active_node_indices.len()]; - next_submit_index += 1; - let Some(send_timeout) = remaining_storm_send_timeout(stop_deadline) else { - break; - }; - attempts += 1; - - match tokio::time::timeout( - send_timeout, - send_transfer_and_wait_for_l2_blocks(cluster, submit_index, active_node_indices), - ) - .await - { - Ok(Ok(block_number)) => { - if restarted { - blocks_after_restart += 1; - } else { - blocks_before_restart += 1; - } - tracing::info!( - attempts, - produced_blocks = blocks.len() + 1, - blocks_before_restart, - blocks_after_restart, - block_number, - submit_index, - elapsed_ms = started_at.elapsed().as_millis(), - "continuous consensus transaction storm produced a block" - ); - blocks.push(block_number); - } - Ok(Err(error)) => { - tracing::warn!( - attempts, - error = %error, - "continuous consensus transaction storm send failed; retrying" - ); - last_error = Some(error.to_string()); - sleep(Duration::from_millis(200)).await; - } - Err(_) => { - tracing::warn!( - attempts, - "continuous consensus transaction storm send timed out; retrying" - ); - last_error = Some("timed out sending transfer".to_owned()); - sleep(Duration::from_millis(200)).await; - } - } - } - - if let Some(target) = target_block_at_restart { - observe_restarted_node_catch_up_to_target( - cluster, - restarted_node_idx, - restarted_rpc_monitor, - target, - started_at, - &mut l2_caught_up, - &mut rpc_caught_up, - ) - .await; - } - - anyhow::ensure!( - restarted, - "transaction storm ended before restarting node {restarted_node_idx}" - ); - anyhow::ensure!( - blocks_before_restart >= MIN_LONG_GAP_LOAD_BLOCKS, - "transaction storm produced too few blocks before restart: produced={}, attempts={}, last_error={:?}", - blocks_before_restart, - attempts, - last_error - ); - anyhow::ensure!( - blocks_after_restart >= MIN_CONTINUED_LOAD_BLOCKS, - "transaction storm produced too few blocks after restart: produced={}, attempts={}, last_error={:?}", - blocks_after_restart, - attempts, - last_error - ); - - let l2_caught_up_at = l2_caught_up.context( - "restarted node did not expose the restart target L2 block while load continued", - )?; - let rpc_caught_up_at = rpc_caught_up - .context("RPC monitor did not observe the restarted node reaching the restart target")?; - - Ok(ConsensusRejoinLoadStats { - attempts, - blocks, - blocks_before_restart, - blocks_after_restart, - elapsed: started_at.elapsed(), - restart_started_at: restart_started_at.expect("restart started"), - restart_completed_at: restart_completed_at.expect("restart completed"), - target_block_at_restart: target_block_at_restart.expect("target block set"), - l2_caught_up_at, - rpc_caught_up_at, - }) -} - -fn assert_rpc_monitor_stayed_ready(report: &HttpRpcReport) -> anyhow::Result<()> { - // A deliberate leader-crash-and-respawn cycle (triggered when the Raft leader is demoted) - // causes a single-poll-interval blip. Allow that while still catching sustained outages - // that would indicate a genuine availability problem. - const MAX_SUSTAINED_OUTAGE: Duration = Duration::from_secs(10); - report.assert_eventually_ready()?; - if let Some(outage) = report.longest_error_streak() { - anyhow::ensure!( - outage.duration < MAX_SUSTAINED_OUTAGE, - "{} observed a sustained RPC outage ({}ms >= {}ms) while it should have stayed ready: {report}\n{}", - report.name, - outage.duration.as_millis(), - MAX_SUSTAINED_OUTAGE.as_millis(), - report.format_detailed_timeline() - ); - } - Ok(()) -} - -fn assert_rpc_monitor_recovered_after_outage(report: &HttpRpcReport) -> anyhow::Result<()> { - let first_error_at = report.first_error_at().with_context(|| { - format!( - "{} did not observe the expected RPC outage while the node was stopped: {report}", - report.name - ) - })?; - let first_ready_at = report.first_ready_at().with_context(|| { - format!( - "{} never recovered after the expected RPC outage: {report}", - report.name - ) - })?; - - anyhow::ensure!( - first_error_at < first_ready_at, - "{} observed readiness before the expected outage: {report}\n{}", - report.name, - report.format_detailed_timeline() - ); - Ok(()) -} - -async fn wait_for_rpc_monitor_block( - recorder: &HttpRpcRecorder, - target_block: u64, - timeout: Duration, -) -> anyhow::Result { - let deadline = Instant::now() + timeout; - let mut last_observed = None; - - while Instant::now() < deadline { - if let Some(observed_at) = recorder.first_observed_block_at(target_block).await { - return Ok(observed_at); - } - - last_observed = recorder.latest_block_number().await; - sleep(Duration::from_millis(50)).await; - } - - anyhow::bail!( - "timed out waiting for RPC monitor to observe block >= {target_block}: last_observed={last_observed:?}" - ) -} - -async fn l2_block_snapshot(cluster: &MultiNodeTester, node_indices: &[usize]) -> Vec { - let mut snapshot = Vec::with_capacity(node_indices.len()); - for &index in node_indices { - match latest_l2_block(cluster.node(index)).await { - Ok(block) => snapshot.push(format!("node_{index}: block={block}")), - Err(error) => snapshot.push(format!("node_{index}: block_error={error:#}")), - } - } - snapshot -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_restarted_node_catches_up_after_long_transaction_storm() -> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - let restarted_node_idx = 2; - let active_node_indices = (0..cluster.len()) - .filter(|idx| *idx != restarted_node_idx) - .collect::>(); - let restarted_node_rpc_url = cluster.node(restarted_node_idx).l2_rpc_url().to_owned(); - let restarted_node_initial_block = latest_l2_block(cluster.node(restarted_node_idx)).await?; - - cluster.suspend_node(restarted_node_idx).await?; - cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - let load_stats = generate_consensus_transaction_storm( - &mut cluster, - &active_node_indices, - CONSENSUS_LONG_GAP_LOAD_DURATION, - ) - .await?; - let target_block = latest_l2_block(cluster.node(active_node_indices[0])).await?; - assert!( - target_block > restarted_node_initial_block, - "active cluster head did not advance while node was down: initial={restarted_node_initial_block}, target={target_block}" - ); - tracing::info!( - attempts = load_stats.attempts, - tx_blocks = load_stats.blocks.len(), - first_tx_block = load_stats.blocks.first().copied(), - last_tx_block = load_stats.blocks.last().copied(), - elapsed_ms = load_stats.elapsed.as_millis(), - restarted_node_initial_block, - target_block, - "transaction storm finished while consensus node was stopped" - ); - - let rpc_monitor = HttpRpcRecorder::start_http( - "restarted-consensus-node", - restarted_node_rpc_url, - RpcRecordConfig { - poll_interval: Duration::from_millis(200), - request_timeout: Duration::from_secs(1), - max_samples: 20_000, - }, - ); - let restart_started_at = Instant::now(); - cluster.start_node(restarted_node_idx).await?; - - let catch_up_result = async { - wait_for_l2_block( - cluster.node(restarted_node_idx), - target_block, - CONSENSUS_LONG_GAP_CATCH_UP_TIMEOUT, - ) - .await - .context("restarted node did not expose the target L2 block")?; - let l2_caught_up_at = restart_started_at.elapsed(); - - wait_for_rpc_monitor_block(&rpc_monitor, target_block, REPLICATION_TIMEOUT) - .await - .context("RPC monitor did not observe the restarted node's caught-up L2 head")?; - - anyhow::Ok(l2_caught_up_at) - } - .await; - - let rpc_report = rpc_monitor.stop().await; - tracing::info!("restarted consensus node RPC monitor summary: {rpc_report}"); - tracing::info!( - "restarted consensus node RPC monitor timeline:\n{}", - rpc_report.format_detailed_timeline() - ); - - let all_node_indices = (0..cluster.len()).collect::>(); - let final_l2_blocks = l2_block_snapshot(&cluster, &all_node_indices).await; - - let l2_caught_up_at = catch_up_result.with_context(|| { - format!( - "restarted consensus node failed to catch up after long downtime: \ - target_block={target_block}, \ - initial_block={restarted_node_initial_block}, active_nodes={active_node_indices:?}, \ - l2_blocks=[{}], rpc_report={rpc_report}", - final_l2_blocks.join(", "), - ) - })?; - - rpc_report.assert_eventually_ready()?; - let rpc_observed_target_at = rpc_report - .first_observed_block_at(target_block) - .with_context(|| { - format!( - "RPC monitor never observed restarted node reaching target block {target_block}: {rpc_report}" - ) - })?; - assert!( - rpc_report - .latest_block_number() - .is_some_and(|block| block >= target_block), - "RPC monitor latest block did not reach target {target_block}: {rpc_report}" - ); - tracing::info!( - l2_caught_up_ms = l2_caught_up_at.as_millis(), - rpc_observed_target_ms = rpc_observed_target_at.as_millis(), - target_block, - "restarted consensus node caught up after long downtime" - ); - - let leader_idx = cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let post_rejoin_block = - send_transfer_and_wait_for_l2_blocks(&cluster, leader_idx, &all_node_indices).await?; - assert!( - post_rejoin_block > target_block, - "cluster did not keep producing after restarted node caught up: post_rejoin_block={post_rejoin_block}, target_block={target_block}" - ); - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} - -#[test_log::test(tokio::test)] -#[ignore = "flaky; @romanbrodetski is working on it"] -async fn consensus_restarted_node_catches_up_while_transaction_storm_continues() --> anyhow::Result<()> { - let mut cluster = MultiNodeTester::builder() - .with_consensus_secret_keys(consensus_test_keys(3)) - .build() - .await?; - let result = async { - cluster - .wait_for_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - - let restarted_node_idx = 2; - let active_node_indices = (0..cluster.len()) - .filter(|idx| *idx != restarted_node_idx) - .collect::>(); - let all_node_indices = (0..cluster.len()).collect::>(); - let restarted_node_rpc_url = cluster.node(restarted_node_idx).l2_rpc_url().to_owned(); - let restarted_node_initial_block = latest_l2_block(cluster.node(restarted_node_idx)).await?; - - cluster.suspend_node(restarted_node_idx).await?; - let active_leader_idx = cluster - .wait_for_active_raft_cluster_formation(CLUSTER_FORMATION_TIMEOUT) - .await?; - let active_follower_idx = active_node_indices - .iter() - .copied() - .find(|idx| *idx != active_leader_idx) - .expect("two active nodes should include one follower"); - - let rpc_record_config = RpcRecordConfig { - poll_interval: Duration::from_millis(200), - request_timeout: Duration::from_secs(1), - max_samples: 30_000, - }; - let active_leader_rpc_monitor = HttpRpcRecorder::start_http( - format!("active-leader-node-{active_leader_idx}"), - cluster.node(active_leader_idx).l2_rpc_url().to_owned(), - rpc_record_config.clone(), - ); - let active_follower_rpc_monitor = HttpRpcRecorder::start_http( - format!("active-follower-node-{active_follower_idx}"), - cluster.node(active_follower_idx).l2_rpc_url().to_owned(), - rpc_record_config.clone(), - ); - let restarted_rpc_monitor = HttpRpcRecorder::start_http( - format!("restarted-node-{restarted_node_idx}"), - restarted_node_rpc_url, - rpc_record_config, - ); - - let observation_result = async { - let load_stats = generate_consensus_transaction_storm_across_restart( - &mut cluster, - &active_node_indices, - restarted_node_idx, - CONSENSUS_LONG_GAP_LOAD_DURATION, - CONSENSUS_CONTINUED_LOAD_AFTER_RESTART_DURATION, - &restarted_rpc_monitor, - ) - .await?; - let final_active_block = latest_l2_block(cluster.node(active_node_indices[0])).await?; - assert!( - load_stats.target_block_at_restart > restarted_node_initial_block, - "active cluster head did not advance while node was down: initial={}, target_at_restart={}", - restarted_node_initial_block, - load_stats.target_block_at_restart - ); - assert!( - final_active_block > load_stats.target_block_at_restart, - "active cluster did not keep producing after restart: final_active_block={}, target_at_restart={}", - final_active_block, - load_stats.target_block_at_restart - ); - - wait_for_l2_block( - cluster.node(restarted_node_idx), - final_active_block, - CONSENSUS_LONG_GAP_CATCH_UP_TIMEOUT, - ) - .await - .context("restarted node did not catch up to the final active head after continued load")?; - - wait_for_rpc_monitor_block( - &active_leader_rpc_monitor, - final_active_block, - REPLICATION_TIMEOUT, - ) - .await - .context("active leader RPC monitor did not observe the final active head")?; - wait_for_rpc_monitor_block( - &active_follower_rpc_monitor, - final_active_block, - REPLICATION_TIMEOUT, - ) - .await - .context("active follower RPC monitor did not observe the final active head")?; - wait_for_rpc_monitor_block( - &restarted_rpc_monitor, - final_active_block, - REPLICATION_TIMEOUT, - ) - .await - .context("restarted node RPC monitor did not observe the final active head")?; - - anyhow::Ok((load_stats, final_active_block)) - } - .await; - - let active_leader_rpc_report = active_leader_rpc_monitor.stop().await; - let active_follower_rpc_report = active_follower_rpc_monitor.stop().await; - let restarted_rpc_report = restarted_rpc_monitor.stop().await; - - tracing::info!("active leader RPC monitor summary: {active_leader_rpc_report}"); - tracing::info!( - "active leader RPC monitor timeline:\n{}", - active_leader_rpc_report.format_detailed_timeline() - ); - tracing::info!("active follower RPC monitor summary: {active_follower_rpc_report}"); - tracing::info!( - "active follower RPC monitor timeline:\n{}", - active_follower_rpc_report.format_detailed_timeline() - ); - tracing::info!("restarted node RPC monitor summary: {restarted_rpc_report}"); - tracing::info!( - "restarted node RPC monitor timeline:\n{}", - restarted_rpc_report.format_detailed_timeline() - ); - - let final_l2_blocks = l2_block_snapshot(&cluster, &all_node_indices).await; - let (load_stats, final_active_block) = observation_result.with_context(|| { - format!( - "restarted consensus node failed to catch up while load continued: \ - initial_block={restarted_node_initial_block}, active_nodes={active_node_indices:?}, \ - l2_blocks=[{}], \ - active_leader_rpc_report={active_leader_rpc_report}, \ - active_follower_rpc_report={active_follower_rpc_report}, \ - restarted_rpc_report={restarted_rpc_report}", - final_l2_blocks.join(", "), - ) - })?; - - assert_rpc_monitor_stayed_ready(&active_leader_rpc_report)?; - assert_rpc_monitor_stayed_ready(&active_follower_rpc_report)?; - assert_rpc_monitor_recovered_after_outage(&restarted_rpc_report)?; - let active_leader_final_at = active_leader_rpc_report - .first_observed_block_at(final_active_block) - .context("active leader RPC monitor never observed final active block")?; - let active_follower_final_at = active_follower_rpc_report - .first_observed_block_at(final_active_block) - .context("active follower RPC monitor never observed final active block")?; - let restarted_final_at = restarted_rpc_report - .first_observed_block_at(final_active_block) - .context("restarted RPC monitor never observed final active block")?; - - tracing::info!( - attempts = load_stats.attempts, - tx_blocks = load_stats.blocks.len(), - first_tx_block = load_stats.blocks.first().copied(), - last_tx_block = load_stats.blocks.last().copied(), - blocks_before_restart = load_stats.blocks_before_restart, - blocks_after_restart = load_stats.blocks_after_restart, - elapsed_ms = load_stats.elapsed.as_millis(), - restart_started_ms = load_stats.restart_started_at.as_millis(), - restart_completed_ms = load_stats.restart_completed_at.as_millis(), - target_block_at_restart = load_stats.target_block_at_restart, - l2_caught_up_ms = load_stats.l2_caught_up_at.as_millis(), - rpc_caught_up_ms = load_stats.rpc_caught_up_at.as_millis(), - final_active_block, - active_leader_final_rpc_ms = active_leader_final_at.as_millis(), - active_follower_final_rpc_ms = active_follower_final_at.as_millis(), - restarted_final_rpc_ms = restarted_final_at.as_millis(), - "restarted consensus node caught up while transaction storm continued" - ); - - Ok(()) - } - .await; - let shutdown_result = cluster.shutdown_all().await; - result.and(shutdown_result) -} diff --git a/integration-tests/tests/suite.rs b/integration-tests/tests/suite.rs index abfaa2659..07ffb8e8e 100644 --- a/integration-tests/tests/suite.rs +++ b/integration-tests/tests/suite.rs @@ -1,4 +1,3 @@ -mod consensus_node; mod node; mod protocol; mod rpc; diff --git a/lib/contract_interface/src/l1_discovery.rs b/lib/contract_interface/src/l1_discovery.rs index 58b4ca5d2..dca35c2e2 100644 --- a/lib/contract_interface/src/l1_discovery.rs +++ b/lib/contract_interface/src/l1_discovery.rs @@ -363,33 +363,6 @@ impl L1State { } } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn batch_frontiers_allow_ordered_state() { - validate_batch_frontiers(10, 9, 8, 7).expect("ordered frontiers must be accepted"); - validate_batch_frontiers(0, 0, 0, 0).expect("empty fresh-chain frontiers must be accepted"); - } - - #[test] - fn batch_frontiers_reject_finalized_ahead_of_latest() { - let err = validate_batch_frontiers(1, 1, 1, 2) - .expect_err("finalized executed cannot be ahead of latest executed"); - assert!( - err.to_string().contains("finalized executed batch 2"), - "unexpected error: {err}" - ); - } - - #[test] - fn batch_frontiers_reject_non_monotonic_latest_counters() { - assert!(validate_batch_frontiers(1, 1, 2, 1).is_err()); - assert!(validate_batch_frontiers(1, 2, 1, 1).is_err()); - } -} - async fn fetch_finalized_executed_batch( zk_chain_sl: &ZkChain, ) -> anyhow::Result<(u64, u64)> { @@ -417,7 +390,7 @@ async fn fetch_finalized_executed_batch( .retry(retry_builder) .notify(|(), _| { retries = retries.saturating_add(1); - if retries == 1 || retries % 30 == 0 { + if retries == 1 || retries.is_multiple_of(30) { tracing::warn!(retries, "finalized SL block is not available yet; waiting"); } }) @@ -566,3 +539,30 @@ async fn wait_to_finalize = ( // SYSCOIN: non-fatal receipt wait result used to recover from L1 mempool eviction // and visible-but-stale transactions. +#[allow(clippy::large_enum_variant)] enum ReceiptWaitOutcome { Confirmed(TransactionReceipt), Dropped, @@ -440,6 +441,7 @@ pub async fn run_l1_sender( } // SYSCOIN: common L1 tx submission path used by the normal loop and by dropped-tx recovery. +#[allow(clippy::too_many_arguments)] async fn submit_l1_transaction( provider: &NodeProvider, operator_address: Address, @@ -649,6 +651,7 @@ fn notify_commit_submitted_batch( /// Waits for all pending L1 transaction receipts, validates them, logs balance/nonce /// metrics, and forwards the completed commands downstream. +#[allow(clippy::too_many_arguments)] async fn wait_for_txs_and_forward( pending_txs: Vec>, provider: &NodeProvider, @@ -927,6 +930,7 @@ where // SYSCOIN: nonce-reuse rebroadcast errors mean the original nonce may already be occupied. // Keep looking for the same-nonce tx instead of resubmitting the command at a later nonce or // re-arming a waiter for the dropped hash. +#[allow(clippy::too_many_arguments)] async fn recover_same_nonce_tx( provider: &NodeProvider, operator_address: Address, @@ -1031,6 +1035,7 @@ where // SYSCOIN: standard-RPC fallback for providers that do not implement sender+nonce lookup. // Scan recent mined blocks and accept only a transaction with the same sender, nonce, and calldata. +#[allow(clippy::too_many_arguments)] async fn find_matching_mined_sender_nonce_tx( provider: &NodeProvider, operator_address: Address, @@ -1133,6 +1138,7 @@ enum SameNonceTx { // SYSCOIN: if a rebroadcast reports nonce reuse, try to discover the tx currently occupying the // original sender nonce and track it only if it carries the same command calldata. +#[allow(clippy::too_many_arguments)] async fn find_matching_sender_nonce_tx( provider: &NodeProvider, operator_address: Address, diff --git a/lib/l1_watcher/src/util.rs b/lib/l1_watcher/src/util.rs index ea58430e6..7691e381f 100644 --- a/lib/l1_watcher/src/util.rs +++ b/lib/l1_watcher/src/util.rs @@ -209,11 +209,18 @@ pub async fn find_block_by_migration_number( )); let target = U256::from(migration_number); let latest = instance.provider().get_block_number().await?; - let latest_migration_number = instance + let latest_migration_number = match instance .migrationNumber(U256::from(chain_id)) .block(latest.into()) .call() - .await?; + .await + { + Ok(n) => n, + // Pre-V31 `ChainAssetHandler` does not expose `migrationNumber`. No Gateway migrations can + // exist in that era, so there is nothing to scan for — start from the latest block. + Err(err) if is_method_missing(&err) => return Ok(latest), + Err(err) => return Err(err.into()), + }; // If this migration has not been reached yet, return the latest block. if latest_migration_number < target { return Ok(latest); diff --git a/lib/merkle_tree_api/src/lib.rs b/lib/merkle_tree_api/src/lib.rs index 431a7a0a3..4b2969311 100644 --- a/lib/merkle_tree_api/src/lib.rs +++ b/lib/merkle_tree_api/src/lib.rs @@ -144,6 +144,14 @@ mod tests { }, ))) } + + fn prove_index( + &self, + _version: u64, + _index: u64, + ) -> anyhow::Result> { + Ok(None) + } } #[test] diff --git a/lib/rpc/src/config.rs b/lib/rpc/src/config.rs index 2176ab24e..d9ca77db3 100644 --- a/lib/rpc/src/config.rs +++ b/lib/rpc/src/config.rs @@ -1,22 +1,71 @@ +use crate::limits::Limits; use alloy::primitives::{Address, B256}; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroU32; use std::time::Duration; -/// A per-method rate limit entry. -#[derive(Clone, Debug)] -pub struct RpcRateLimit { - /// Exact RPC method name, e.g. `"eth_call"`. - pub method: String, - /// Maximum number of requests per second across all callers combined. - pub requests_per_second: NonZeroU32, +/// Rate-limit configuration. +#[derive(Clone, Debug, Default)] +pub enum RateLimits { + /// No rate limiting. + #[default] + None, + /// One global cap, plus per-method buckets: `m_rps` applied to each entry in + /// `m_methods`, and the explicit RPS in `custom_methods` for each entry there. + /// + /// Production example: + /// + /// ```yaml + /// rpc: + /// rate_limits: + /// type: Tiered + /// global_rps: 1000 + /// m_rps: 200 + /// m_methods: + /// - eth_call + /// - eth_estimateGas + /// - eth_getBlockReceipts + /// - eth_fillTransaction + /// - zks_getProof + /// - ots_getBlockTransactions + /// - txpool_inspect + /// custom_methods: + /// eth_getLogs: 200 + /// eth_simulateV1: 1 + /// debug_traceTransaction: 10 + /// debug_traceCall: 10 + /// debug_traceBlockByHash: 10 + /// debug_traceBlockByNumber: 10 + /// zks_getL2ToL1LogProof: 10 + /// ots_searchTransactionsBefore: 10 + /// ots_searchTransactionsAfter: 10 + /// txpool_content: 10 + /// ``` + Tiered { + global_rps: NonZeroU32, + m_rps: NonZeroU32, + m_methods: HashSet, + custom_methods: HashMap, + }, } -impl From<(String, NonZeroU32)> for RpcRateLimit { - fn from((method, requests_per_second): (String, NonZeroU32)) -> Self { - Self { - method, - requests_per_second, +impl RateLimits { + pub(crate) fn into_limits(self) -> Limits { + match self { + Self::None => Limits::default(), + Self::Tiered { + global_rps, + m_rps, + m_methods, + custom_methods, + } => Limits { + global_rps: Some(global_rps), + methods: m_methods + .into_iter() + .map(|name| (name, m_rps)) + .chain(custom_methods) + .collect(), + }, } } } @@ -107,9 +156,13 @@ pub struct RpcConfig { // before mempool admission unless every referenced Bitcoin DA blob is retrievable. pub edge_da_admission: Option, - /// Per-method rate limits. Use `"*"` as the method name for a global limit applied before - /// per-method limits. Empty means no rate limiting. - pub rate_limits: Vec, + /// Rate limits for incoming requests. + pub rate_limits: RateLimits, + + /// List of disabled methods. + /// Some stateful methods like `eth_newFilter` don't make sense when running in a cluster behind a load-balancer. + /// They get rejected with -32601 "Method disabled". + pub method_filter: HashSet, } impl RpcConfig { diff --git a/lib/rpc/src/lib.rs b/lib/rpc/src/lib.rs index b92fe4477..165d1578f 100644 --- a/lib/rpc/src/lib.rs +++ b/lib/rpc/src/lib.rs @@ -2,7 +2,7 @@ mod call_fees; mod config; -pub use config::{EdgeDaAdmissionConfig, RpcConfig, RpcRateLimit}; +pub use config::{EdgeDaAdmissionConfig, RateLimits, RpcConfig}; use std::sync::Arc; use tokio::sync::{Semaphore, watch}; @@ -20,7 +20,9 @@ mod simulate; pub use rpc_storage::{ReadRpcStorage, RpcStorage}; mod debug_impl; pub mod js_tracer; +mod limits; mod log_proof_utils; +mod method_filter_middleware; mod monitoring_middleware; mod net_impl; mod rate_limit_middleware; @@ -39,10 +41,12 @@ use crate::debug_impl::DebugNamespace; use crate::eth_filter::EthFilterNamespace; use crate::eth_impl::EthNamespace; use crate::eth_pubsub_impl::EthPubsubNamespace; +use crate::limits::{Limiter, LoggingLimiter}; +use crate::method_filter_middleware::MethodFiltering; use crate::monitoring_middleware::Monitoring; use crate::net_impl::NetNamespace; use crate::ots_impl::OtsNamespace; -use crate::rate_limit_middleware::{RateLimiting, build_limiters}; +use crate::rate_limit_middleware::RateLimiting; use crate::txpool_impl::TxpoolNamespace; use crate::unstable_impl::UnstableNamespace; use crate::web3_impl::Web3Namespace; @@ -164,8 +168,9 @@ pub async fn spawn( let blocking_rpcs_semaphore = Arc::new(Semaphore::new( config.max_concurrent_blocking_rpcs.max(1) as usize, )); - // Build once so all connections share the same token-bucket state. - let limiters = build_limiters(&config.rate_limits); + let limiter = LoggingLimiter::new(Limiter::new(config.rate_limits.clone().into_limits())); + let rate_limit_logging = LoggingLimiter::run(limiter.clone()); + let method_filter = Arc::new(config.method_filter.clone()); let rpc_middleware = RpcServiceBuilder::new() // Monitoring is outermost so rate-limited responses still appear in error metrics. .layer_fn(move |service| { @@ -175,7 +180,8 @@ pub async fn spawn( blocking_rpcs_semaphore.clone(), ) }) - .layer_fn(move |service| RateLimiting::new(service, limiters.clone())); + .layer_fn(move |service| MethodFiltering::new(service, method_filter.clone())) + .layer_fn(move |service| RateLimiting::new(service, limiter.clone())); let server_config = ServerConfigBuilder::default() .max_connections(config.max_connections) @@ -217,6 +223,9 @@ pub async fn spawn( _ = eth_filter.watch_and_clear_stale_filters() => { unreachable!("eth_filter.watch_and_clear_stale_filters() is an infinite loop") } + _ = rate_limit_logging => { + unreachable!("LoggingLimiter::run is an infinite loop") + } // Graceful shutdown was requested; stop accepting RPC traffic and wait for the server to exit. _guard = &mut shutdown => { server_handle.stop().expect("failed to stop server"); diff --git a/lib/rpc/src/limits.rs b/lib/rpc/src/limits.rs new file mode 100644 index 000000000..6ce683b38 --- /dev/null +++ b/lib/rpc/src/limits.rs @@ -0,0 +1,93 @@ +use governor::clock::{Clock, DefaultClock, QuantaInstant}; +use governor::{DefaultDirectRateLimiter, NotUntil, Quota, RateLimiter}; +use std::collections::HashMap; +use std::convert::Infallible; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; +use tokio::time::interval; + +/// Rate-limit spec consumed by [`Limiter`] at construction. +#[derive(Clone, Debug, Default)] +pub struct Limits { + pub global_rps: Option, + pub methods: HashMap, +} + +fn bucket(rps: NonZeroU32) -> DefaultDirectRateLimiter { + RateLimiter::direct(Quota::per_second(rps)) +} + +fn retry_after(not_until: NotUntil) -> u64 { + let now = DefaultClock::default().now(); + not_until + .wait_time_from(now) + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + +/// Stateful enforcer for a [`Limits`] spec. Owns the token buckets; middleware calls `check` +/// per request to gate it. +pub struct Limiter { + global: Option, + per_method: HashMap, +} + +impl Limiter { + pub fn new(limits: Limits) -> Self { + let global = limits.global_rps.map(bucket); + let per_method = limits + .methods + .into_iter() + .map(|(name, rps)| (name, bucket(rps))) + .collect(); + Self { global, per_method } + } + + fn check_global(&self) -> Option { + self.global.as_ref()?.check().err().map(retry_after) + } + + fn check_per_method(&self, name: &str) -> Option { + self.per_method.get(name)?.check().err().map(retry_after) + } + + pub fn check(&self, method: &str) -> Option { + self.check_global() + .or_else(|| self.check_per_method(method)) + } +} + +/// Wraps a [`Limiter`] with a rolling rejection counter, drained into a 1/s log line. +pub struct LoggingLimiter { + inner: Limiter, + rejections: AtomicU64, +} + +impl LoggingLimiter { + pub fn new(inner: Limiter) -> Arc { + Arc::new(Self { + inner, + rejections: AtomicU64::new(0), + }) + } + + pub(crate) fn check(&self, method: &str) -> Option { + self.inner.check(method).inspect(|_| { + self.rejections.fetch_add(1, Ordering::Relaxed); + }) + } + + pub async fn run(this: Arc) -> Infallible { + let mut ticker = interval(Duration::from_secs(1)); + loop { + ticker.tick().await; + let count = this.rejections.swap(0, Ordering::Relaxed); + if count > 0 { + tracing::warn!(count, "rpc requests rate-limited in last 1s"); + } + } + } +} diff --git a/lib/rpc/src/method_filter_middleware.rs b/lib/rpc/src/method_filter_middleware.rs new file mode 100644 index 000000000..19b4b6912 --- /dev/null +++ b/lib/rpc/src/method_filter_middleware.rs @@ -0,0 +1,82 @@ +use jsonrpsee::MethodResponse; +use jsonrpsee::core::middleware::{Batch, Notification}; +use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceT}; +use jsonrpsee::types::Request; +use jsonrpsee::types::error::{ErrorObject, METHOD_NOT_FOUND_CODE}; +use std::collections::HashSet; +use std::sync::Arc; + +fn method_disabled_err() -> ErrorObject<'static> { + ErrorObject::owned(METHOD_NOT_FOUND_CODE, "Method disabled", None::<()>) +} + +/// JSON-RPC middleware that rejects filtered methods with -32601 and emits a warning. +#[derive(Clone)] +pub(crate) struct MethodFiltering { + inner: S, + filter: Arc>, +} + +impl MethodFiltering { + pub(crate) fn new(inner: S, filter: Arc>) -> Self { + Self { inner, filter } + } +} + +impl RpcServiceT for MethodFiltering +where + S: RpcServiceT< + MethodResponse = MethodResponse, + NotificationResponse = MethodResponse, + BatchResponse = MethodResponse, + > + Clone + + Send + + 'static, +{ + type MethodResponse = MethodResponse; + type NotificationResponse = MethodResponse; + type BatchResponse = MethodResponse; + + fn call<'a>( + &self, + request: Request<'a>, + ) -> impl Future + Send + 'a { + let rejected = self.filter.contains(request.method_name()); + let inner = self.inner.clone(); + async move { + if rejected { + tracing::warn!( + method = request.method_name(), + "rpc method rejected by filter", + ); + let id = request.id.clone().into_owned(); + return MethodResponse::error(id, method_disabled_err()); + } + inner.call(request).await + } + } + + fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a { + let inner = self.inner.clone(); + async move { inner.batch(batch).await } + } + + fn notification<'a>( + &self, + n: Notification<'a>, + ) -> impl Future + Send + 'a { + // SYSCOIN: notifications cannot return -32601, but filtered methods must not execute. + let rejected = self.filter.contains(n.method_name()); + let inner = self.inner.clone(); + async move { + if rejected { + tracing::warn!( + method = n.method_name(), + "rpc notification rejected by filter" + ); + return MethodResponse::notification(); + } + inner.notification(n).await + } + } +} diff --git a/lib/rpc/src/rate_limit_middleware.rs b/lib/rpc/src/rate_limit_middleware.rs index 6bc304272..a26ece783 100644 --- a/lib/rpc/src/rate_limit_middleware.rs +++ b/lib/rpc/src/rate_limit_middleware.rs @@ -1,6 +1,4 @@ -use crate::config::RpcRateLimit; -use governor::clock::{Clock, DefaultClock}; -use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; +use crate::limits::LoggingLimiter; use jsonrpsee::MethodResponse; use jsonrpsee::core::middleware::{Batch, Notification}; use jsonrpsee::core::to_json_raw_value; @@ -8,30 +6,11 @@ use jsonrpsee::server::middleware::rpc::{RpcService, RpcServiceT}; use jsonrpsee::types::Request; use jsonrpsee::types::error::ErrorObject; use serde::Serialize; -use std::collections::HashMap; use std::sync::Arc; /// EIP-1474 "Limit exceeded" — the de facto Ethereum rate-limit error code used by Infura, Alchemy, etc. const RATE_LIMIT_ERROR_CODE: i32 = -32005; -/// Pre-builds the shared limiter map from config. Pass the returned `Arc` to every -/// `RateLimiting::new` call so all connections share the same token-bucket state. -pub(crate) fn build_limiters( - limits: &[RpcRateLimit], -) -> Arc> { - Arc::new( - limits - .iter() - .map(|l| { - ( - l.method.clone(), - RateLimiter::direct(Quota::per_second(l.requests_per_second)), - ) - }) - .collect(), - ) -} - #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct RetryData { @@ -43,40 +22,16 @@ fn rate_limited_err(retry_after_ms: u64) -> ErrorObject<'static> { ErrorObject::owned(RATE_LIMIT_ERROR_CODE, "Too many requests", Some(data)) } -/// JSON-RPC middleware that enforces per-method request rate limits globally across all connections. -/// -/// Build the limiter map once with [`build_limiters`] and share it across connections via `Arc`. -/// Sit this layer inside `Monitoring` so rate-limited responses are counted in error metrics. -/// `Monitoring` decomposes batch requests by calling `call()` per entry, so batch items are -/// rate-limited automatically. Any method absent from the map is unrestricted. +/// JSON-RPC middleware that enforces per-method rate limits. #[derive(Clone)] pub(crate) struct RateLimiting { inner: S, - limiters: Arc>, + limiter: Arc, } impl RateLimiting { - pub(crate) fn new(inner: S, limiters: Arc>) -> Self { - Self { inner, limiters } - } - - fn retry_after_ms(&self, method_name: &str) -> Option { - // Global limit ("*") is checked first; if it fires the per-method limiter is not touched. - let now = DefaultClock::default().now(); - let not_until_global = self.limiters.get("*").and_then(|l| l.check().err()); - let not_until_method = if not_until_global.is_none() { - self.limiters.get(method_name).and_then(|l| l.check().err()) - } else { - None - }; - - not_until_global.or(not_until_method).map(|not_until| { - not_until - .wait_time_from(now) - .as_millis() - .try_into() - .unwrap_or(u64::MAX) - }) + pub(crate) fn new(inner: S, limiter: Arc) -> Self { + Self { inner, limiter } } } @@ -98,15 +53,12 @@ where &self, request: Request<'a>, ) -> impl Future + Send + 'a { - // Check synchronously before the async block to avoid holding a borrow across an await. - let retry_after_ms = self.retry_after_ms(request.method_name()); + let retry_after_ms = self.limiter.check(request.method_name()); let inner = self.inner.clone(); async move { if let Some(ms) = retry_after_ms { - return MethodResponse::error( - request.id.clone().into_owned(), - rate_limited_err(ms), - ); + let id = request.id.clone().into_owned(); + return MethodResponse::error(id, rate_limited_err(ms)); } inner.call(request).await } @@ -121,7 +73,8 @@ where &self, n: Notification<'a>, ) -> impl Future + Send + 'a { - let rate_limited = self.retry_after_ms(n.method_name()).is_some(); + // SYSCOIN: notifications have no response id, but still consume ingress budget. + let rate_limited = self.limiter.check(n.method_name()).is_some(); let inner = self.inner.clone(); async move { // JSON-RPC notifications have no id, so the server must not emit an error response. diff --git a/node/bin/Cargo.toml b/node/bin/Cargo.toml index 7efa182b9..b6134379f 100644 --- a/node/bin/Cargo.toml +++ b/node/bin/Cargo.toml @@ -79,6 +79,7 @@ reth-network-peers.workspace = true reth-net-nat.workspace = true anyhow.workspace = true +backon.workspace = true alloy = { workspace = true, default-features = false, features = [ "rlp", "eips", diff --git a/node/bin/src/batcher/bitcoin_da_finality_gate.rs b/node/bin/src/batcher/bitcoin_da_finality_gate.rs index 4b0547023..ecf68e467 100644 --- a/node/bin/src/batcher/bitcoin_da_finality_gate.rs +++ b/node/bin/src/batcher/bitcoin_da_finality_gate.rs @@ -360,7 +360,7 @@ impl BitcoinDaFinalityGate { for edge_ref in edge_refs { for version_hash in edge_ref.blob_version_hashes.chunks_exact(32) { let version_hash = hex::encode(version_hash); - self.wait_for_edge_ref_finality(&client, &version_hash) + self.wait_for_edge_ref_finality(client, &version_hash) .await?; } } diff --git a/node/bin/src/config/mod.rs b/node/bin/src/config/mod.rs index 7b95c36fe..e4977f8c4 100644 --- a/node/bin/src/config/mod.rs +++ b/node/bin/src/config/mod.rs @@ -1255,17 +1255,53 @@ pub struct RpcConfig { #[config(default_t = false)] pub enable_txpool_namespace: bool, - /// Per-method rate limits: map from RPC method name to max requests per second (across all - /// callers). Use `"*"` for a global limit applied before per-method limits. Methods absent - /// from the map are unrestricted. - /// - /// Accepts a JSON object or a comma-separated `method=rps` string, e.g. - /// `*=500,eth_call=100,debug_traceTransaction=5`. - #[config(default, with = Entries::WELL_KNOWN.delimited(",", "="), validate( - rate_limits_within_global, - "each per-method limit must not exceed the global `*` limit" - ))] - pub rate_limits: HashMap, + /// Rate limits for incoming JSON-RPC requests. + #[config(nest)] + pub rate_limits: RpcRateLimitsConfig, + + /// List of disabled methods. + /// Some stateful methods like `eth_newFilter` don't make sense when running in a cluster behind a load-balancer. + /// They get rejected with -32601 "Method disabled". + #[config(default, with = Delimited::new(","))] + pub method_filter: HashSet, +} + +/// Rate-limit configuration for the JSON-RPC server. +#[derive(Clone, Debug, DescribeConfig, DeserializeConfig)] +#[config(tag = "type", derive(Default))] +pub enum RpcRateLimitsConfig { + /// No rate limiting. + #[config(default)] + None, + /// One global cap, plus per-method buckets: `m_rps` applied to each entry in + /// `m_methods`, and the explicit RPS in `custom_methods` for each entry there. + Tiered { + global_rps: NonZeroU32, + m_rps: NonZeroU32, + #[config(default, with = Delimited::new(","))] + m_methods: HashSet, + #[config(default, with = Entries::WELL_KNOWN.delimited(",", "="))] + custom_methods: HashMap, + }, +} + +impl From for zksync_os_rpc::RateLimits { + fn from(c: RpcRateLimitsConfig) -> Self { + match c { + RpcRateLimitsConfig::None => Self::None, + RpcRateLimitsConfig::Tiered { + global_rps, + m_rps, + m_methods, + custom_methods, + } => Self::Tiered { + global_rps, + m_rps, + m_methods, + custom_methods, + }, + } + } } /// L1 sender configuration. The signing key fields are only required on the Main Node @@ -1411,16 +1447,6 @@ fn is_greater_than_one_f64(&val: &f64) -> bool { val > 1.0 } -fn rate_limits_within_global(limits: &HashMap) -> bool { - let Some(&global) = limits.get("*") else { - return true; - }; - limits - .iter() - .filter(|(k, _)| k.as_str() != "*") - .all(|(_, &v)| v <= global) -} - /// Gateway sender configuration. Used by the L1Sender pipeline components when the chain is /// currently settling on a Gateway (as discovered from the L1 settlement layer interval at /// startup). When the chain is settling on L1 directly, this config is ignored and @@ -2078,7 +2104,7 @@ pub struct BatchVerificationConfig { ))] pub threshold: u64, /// [main node] Accepted signer pubkeys. - #[config(default_t = vec![], with = Delimited::new(","))] + #[config(default, with = Delimited::new(","))] // SYSCOIN #[config_validate(custom( |root: &Config, value: &Vec| { @@ -2327,7 +2353,8 @@ impl From for zksync_os_rpc::RpcConfig { enable_debug_namespace: c.enable_debug_namespace, enable_txpool_namespace: c.enable_txpool_namespace, edge_da_admission: None, - rate_limits: c.rate_limits.into_iter().map(Into::into).collect(), + rate_limits: c.rate_limits.into(), + method_filter: c.method_filter, } } } diff --git a/node/bin/src/en_remote_config.rs b/node/bin/src/en_remote_config.rs index c6e440811..b6ec4669b 100644 --- a/node/bin/src/en_remote_config.rs +++ b/node/bin/src/en_remote_config.rs @@ -1,22 +1,17 @@ use crate::config::GenesisConfig; +use crate::main_node_client::MainNodeClient; use alloy::primitives::Address; use anyhow::Context; -use jsonrpsee::http_client::HttpClient; use std::sync::Arc; use zksync_os_genesis::{FileGenesisInputSource, GenesisInput, GenesisInputSource}; -use zksync_os_rpc_api::eth::EthApiClient; -use zksync_os_rpc_api::zks::ZksApiClient; /// Returns /// (bridgehub_address, bytecode_supplier_address, chain_id, genesis_input_source) pub async fn load_remote_config( - main_node_rpc_url: &str, + main_node_client: &MainNodeClient, en_local_genesis_config: &GenesisConfig, ) -> anyhow::Result<(Address, Address, u64, Arc)> { - let main_node_rpc_client = - jsonrpsee::http_client::HttpClientBuilder::new().build(main_node_rpc_url)?; - - let remote_bridgehub_address = main_node_rpc_client.get_bridgehub_contract().await?; + let remote_bridgehub_address = main_node_client.bridgehub_contract().await?; if let Some(local_bridgehub_address) = en_local_genesis_config.bridgehub_address { anyhow::ensure!( remote_bridgehub_address == local_bridgehub_address, @@ -24,10 +19,7 @@ pub async fn load_remote_config( ); } - let bytecode_supplier_address = match main_node_rpc_client - .get_bytecode_supplier_contract() - .await - { + let bytecode_supplier_address = match main_node_client.bytecode_supplier_contract().await { Ok(result) => { if let Some(local_bytecode_supplier_address) = en_local_genesis_config.bytecode_supplier_address @@ -51,7 +43,7 @@ pub async fn load_remote_config( }; let remote_chain_id: u64 = u64::from_be_bytes( - main_node_rpc_client + main_node_client .chain_id() .await? .context("missing chain_id")? @@ -65,7 +57,7 @@ pub async fn load_remote_config( } let main_node_genesis_input_source = - Arc::new(MainNodeGenesisInputSource::new(main_node_rpc_client)); + Arc::new(MainNodeGenesisInputSource::new(main_node_client.clone())); let genesis_input_source: Arc = if let Some(local_genesis_path) = en_local_genesis_config.genesis_input_path.clone() { let remote_genesis_input = main_node_genesis_input_source.genesis_input().await?; @@ -116,11 +108,11 @@ impl GenesisInputSource for CachedGenesisInputSource { #[derive(Debug)] pub struct MainNodeGenesisInputSource { - rpc_client: HttpClient, + rpc_client: MainNodeClient, } impl MainNodeGenesisInputSource { - pub fn new(rpc_client: HttpClient) -> Self { + pub fn new(rpc_client: MainNodeClient) -> Self { Self { rpc_client } } } @@ -128,7 +120,7 @@ impl MainNodeGenesisInputSource { #[async_trait::async_trait] impl GenesisInputSource for MainNodeGenesisInputSource { async fn genesis_input(&self) -> anyhow::Result { - let genesis = self.rpc_client.get_genesis().await?; + let genesis = self.rpc_client.genesis_input().await?; Ok(genesis) } } diff --git a/node/bin/src/lib.rs b/node/bin/src/lib.rs index bd3afbfe7..26b1899cd 100644 --- a/node/bin/src/lib.rs +++ b/node/bin/src/lib.rs @@ -12,6 +12,7 @@ pub mod default_protocol_version; mod en_remote_config; mod init_tx_forwarder; mod l1_revert; +mod main_node_client; mod node_state_on_startup; mod priority_tree_pipeline_step; pub mod prover_api; @@ -38,6 +39,7 @@ use crate::config::{ use crate::en_remote_config::load_remote_config; use crate::init_tx_forwarder::{build_consensus_tx_forwarder, build_static_tx_forwarder}; use crate::l1_revert::revert_l1_on_startup; +use crate::main_node_client::MainNodeClient; use crate::node_state_on_startup::NodeStateOnStartup; use crate::prover_api::fake_fri_provers_pool::FakeFriProversPool; use crate::prover_api::fri_job_manager::FriJobManager; @@ -56,7 +58,6 @@ use alloy::consensus::BlobTransactionSidecar; use alloy::primitives::{Address, BlockHash, BlockNumber}; use alloy::providers::Provider; use anyhow::Context; -use jsonrpsee::http_client::HttpClient; use priority_tree_pipeline_step::PriorityTreePipelineStep; use reth_tasks::Runtime; use secrecy::ExposeSecret; @@ -113,7 +114,6 @@ use zksync_os_replay_archive::{ use zksync_os_reth_compat::provider::ZkProviderFactory; use zksync_os_revm_consistency_checker::node::RevmConsistencyChecker; use zksync_os_rpc::{EthCallHandler, RpcStorage}; -use zksync_os_rpc_api::eth::EthApiClient; use zksync_os_sequencer::execution::block_context_provider::{BlockContextProvider, LastBlockSeed}; use zksync_os_sequencer::execution::{BlockApplier, BlockCanonizer, BlockExecutor, FeeProvider}; use zksync_os_status_server::run_status_server; @@ -314,10 +314,6 @@ pub async fn run = @@ -357,12 +360,10 @@ pub async fn run anyhow::Result { async fn check( repo: &RepositoryManager, - main_node_client: &HttpClient, + main_node_client: &MainNodeClient, block_number: u64, ) -> anyhow::Result { let local_hash = repo @@ -2257,14 +2257,12 @@ async fn find_last_matching_main_node_block( } } - let main_node_rpc_client = - jsonrpsee::http_client::HttpClientBuilder::new().build(main_node_rpc_url)?; let last_block = repo.get_latest_block(); // Check last block first. Unless there was a reorg recently, this should return quickly. - if check(repo, &main_node_rpc_client, last_block).await? { + if check(repo, main_node_client, last_block).await? { return Ok(last_block); } - if !check(repo, &main_node_rpc_client, 0).await? { + if !check(repo, main_node_client, 0).await? { panic!("Genesis block mismatch between EN and main node"); } @@ -2274,7 +2272,7 @@ async fn find_last_matching_main_node_block( while left < right { #[allow(clippy::manual_div_ceil)] let mid = (left + right + 1) / 2; - if check(repo, &main_node_rpc_client, mid).await? { + if check(repo, main_node_client, mid).await? { left = mid; } else { right = mid - 1; diff --git a/node/bin/src/main_node_client.rs b/node/bin/src/main_node_client.rs new file mode 100644 index 000000000..c5ac01ff4 --- /dev/null +++ b/node/bin/src/main_node_client.rs @@ -0,0 +1,78 @@ +use alloy::eips::BlockNumberOrTag; +use alloy::primitives::{Address, U64}; +use backon::{ExponentialBuilder, Retryable}; +use jsonrpsee::core::ClientError; +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use std::future::Future; +use std::time::Duration; +use zksync_os_genesis::GenesisInput; +use zksync_os_rpc_api::eth::EthApiClient; +use zksync_os_rpc_api::types::ZkApiBlock; +use zksync_os_rpc_api::zks::ZksApiClient; + +/// No cap: wait for the main node instead of crash-looping. +const RETRY: ExponentialBuilder = ExponentialBuilder::new() + .with_min_delay(Duration::from_secs(1)) + .with_max_delay(Duration::from_secs(30)) + .without_max_times(); + +/// Whether `err` means "couldn't reach the main node" (worth retrying) rather than "the main node +/// answered with an error" (e.g. an old main node missing a method) or a local decoding error. +fn is_transient(err: &ClientError) -> bool { + matches!( + err, + ClientError::Transport(_) + | ClientError::RequestTimeout + | ClientError::RestartNeeded(_) + | ClientError::ServiceDisconnect + ) +} + +async fn with_retry(call: impl FnMut() -> Fut) -> Result +where + Fut: Future>, +{ + call.retry(RETRY) + .when(is_transient) + .notify(|err, after| { + tracing::warn!(%err, ?after, "main node unreachable; retrying in {after:?}: {err}") + }) + .await +} + +#[derive(Clone, Debug)] +pub struct MainNodeClient { + rpc: HttpClient, +} + +impl MainNodeClient { + pub fn new(url: &str) -> anyhow::Result { + Ok(Self { + rpc: HttpClientBuilder::new().build(url)?, + }) + } + + pub async fn bridgehub_contract(&self) -> Result { + with_retry(|| self.rpc.get_bridgehub_contract()).await + } + + pub async fn bytecode_supplier_contract(&self) -> Result { + with_retry(|| self.rpc.get_bytecode_supplier_contract()).await + } + + pub async fn chain_id(&self) -> Result, ClientError> { + with_retry(|| self.rpc.chain_id()).await + } + + pub async fn genesis_input(&self) -> Result { + with_retry(|| self.rpc.get_genesis()).await + } + + pub async fn block_by_number( + &self, + number: BlockNumberOrTag, + full: bool, + ) -> Result, ClientError> { + with_retry(|| self.rpc.block_by_number(number, full)).await + } +} diff --git a/node/bin/src/prover_api/fri_job_manager.rs b/node/bin/src/prover_api/fri_job_manager.rs index 594f09168..1c718f328 100644 --- a/node/bin/src/prover_api/fri_job_manager.rs +++ b/node/bin/src/prover_api/fri_job_manager.rs @@ -591,7 +591,7 @@ mod tests { async fn proof_storage_for_test() -> anyhow::Result { let dir = TempDir::new()?; let config = ProofStorageConfig { - path: dir.into_path(), + path: dir.keep(), ..ProofStorageConfig::default() }; ProofStorage::new(config).await diff --git a/node/bin/src/prover_api/fri_proof_verifier.rs b/node/bin/src/prover_api/fri_proof_verifier.rs index b412c6223..40f3bf7ff 100644 --- a/node/bin/src/prover_api/fri_proof_verifier.rs +++ b/node/bin/src/prover_api/fri_proof_verifier.rs @@ -8,7 +8,7 @@ pub fn verify_real_fri_proof_bytes( proof_bytes: &[u8], ) -> Result<(), SubmitError> { let program_proof = bincode::serde::decode_from_slice(proof_bytes, bincode::config::standard()) - .map_err(|err| SubmitError::DeserializationFailed(err))? + .map_err(SubmitError::DeserializationFailed)? .0; verify_fri_proof(previous_state_commitment, stored_batch_info, program_proof) diff --git a/node/bin/src/prover_api/fri_proving_pipeline_step.rs b/node/bin/src/prover_api/fri_proving_pipeline_step.rs index 6bc12c70d..5b324cf50 100644 --- a/node/bin/src/prover_api/fri_proving_pipeline_step.rs +++ b/node/bin/src/prover_api/fri_proving_pipeline_step.rs @@ -400,7 +400,7 @@ mod tests { async fn proof_storage_for_test() -> anyhow::Result { let dir = TempDir::new()?; let config = ProofStorageConfig { - path: dir.into_path(), + path: dir.keep(), ..ProofStorageConfig::default() }; ProofStorage::new(config).await diff --git a/node/bin/src/prover_api/snark_proving_pipeline_step.rs b/node/bin/src/prover_api/snark_proving_pipeline_step.rs index 94f7c2c4f..302f5a63b 100644 --- a/node/bin/src/prover_api/snark_proving_pipeline_step.rs +++ b/node/bin/src/prover_api/snark_proving_pipeline_step.rs @@ -100,19 +100,19 @@ impl SnarkProvingPipelineStep { return false; } // SYSCOIN - if let FriProof::Real(real) = &batch.data { - if let Err(err) = fri_proof_verifier::verify_real_fri_proof_bytes( + if let FriProof::Real(real) = &batch.data + && let Err(err) = fri_proof_verifier::verify_real_fri_proof_bytes( batch.batch.previous_stored_batch_info.state_commitment, local_stored_batch, real.proof(), - ) { - tracing::warn!( - batch_number = expected_batch_number, - ?err, - "skipping SNARK rehydration due to invalid stored FRI proof" - ); - return false; - } + ) + { + tracing::warn!( + batch_number = expected_batch_number, + ?err, + "skipping SNARK rehydration due to invalid stored FRI proof" + ); + return false; } true diff --git a/scripts/explorer/blockscout/deploy-zksys-en-rpc.sh b/scripts/explorer/blockscout/deploy-zksys-en-rpc.sh index 55b2e3b92..ff0f79230 100755 --- a/scripts/explorer/blockscout/deploy-zksys-en-rpc.sh +++ b/scripts/explorer/blockscout/deploy-zksys-en-rpc.sh @@ -488,9 +488,24 @@ for instance in (public, debug): ] if instance["rate_limits"]: insert_at = lines.index("status_server:") - rate_lines = [" rate_limits:"] - for method, rps in instance["rate_limits"].items(): - rate_lines.append(f" {q(method)}: {rps}") + # SYSCOIN: preserve the existing method=rps env-var input by mapping it to + # upstream's tagged Tiered config. An input "*=rps" remains the global cap; + # otherwise use a sentinel that avoids adding a practical global cap. + max_rps = 2**32 - 1 + custom_limits = { + method: rps for method, rps in instance["rate_limits"].items() if method != "*" + } + global_rps = instance["rate_limits"].get("*", max_rps) + rate_lines = [ + " rate_limits:", + " type: Tiered", + f" global_rps: {global_rps}", + f" m_rps: {max_rps}", + ] + if custom_limits: + rate_lines.append(" custom_methods:") + for method, rps in custom_limits.items(): + rate_lines.append(f" {q(method)}: {rps}") lines[insert_at:insert_at] = rate_lines write_secret(config_path, "\n".join(lines) + "\n") write_start_script(