Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ async-trait = "0.1.88"
modular-bitfield = "0.11.2"
reth = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
reth-basic-payload-builder = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
reth-chain-state = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
reth-chainspec = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
reth-cli = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
reth-cli-commands = { git = "https://github.com/paradigmxyz/reth", rev = "536bebfcd" }
Expand Down Expand Up @@ -118,6 +119,7 @@ ignored = ["modular-bitfield"]
reth = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-rpc = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-basic-payload-builder = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-chain-state = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-chainspec = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-cli = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
reth-cli-commands = { git = "https://github.com/rezbera/reth", branch = "rezbera/modular-flashblocks" }
Expand Down
27 changes: 27 additions & 0 deletions src/rpc/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use reth::{
pool::{BlockingTaskGuard, BlockingTaskPool},
},
};
use reth_chain_state::BlockState;
use reth_optimism_flashblocks::{FlashBlockBuildInfo, FlashblocksListeners, PendingFlashBlock};
use reth_primitives_traits::{Recovered, WithEncoded};
use reth_rpc_eth_api::{
Expand All @@ -35,6 +36,7 @@ use reth_rpc_eth_types::{
EthApiError, EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock,
builder::config::PendingBlockKind, error::FromEvmError,
};
use reth_storage_api::StateProviderFactory;
use reth_transaction_pool::PoolPooledTx;
use std::{sync::Arc, time::Duration};
use tokio::time;
Expand Down Expand Up @@ -700,4 +702,29 @@ where
fn pending_block_kind(&self) -> PendingBlockKind {
self.inner.pending_block_kind()
}

/// Returns a state provider for the pending flashblock state.
///
/// This overlays the flashblock execution output on top of the parent block's historical state,
/// allowing RPC methods like `eth_getBalance`, `eth_getCode`, etc. to query pending state
/// when called with the "pending" block tag.
async fn local_pending_state(
&self,
) -> Result<Option<reth_storage_api::StateProviderBox>, Self::Error>
where
Self: SpawnBlocking,
{
let Ok(Some(pending_block)) = self.pending_flashblock().await else {
return Ok(None);
};

let latest_historical = self
.provider()
.history_by_block_hash(pending_block.block().parent_hash())
.map_err(Into::<EthApiError>::into)?;

let state = BlockState::from(pending_block);

Ok(Some(Box::new(state.state_provider(latest_historical))))
}
}
122 changes: 120 additions & 2 deletions tests/e2e/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

use crate::e2e::{setup_test_boilerplate, test_signer};
use alloy_consensus::BlockHeader;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{Address, B256, Bloom, Bytes, U256};
use alloy_eips::{BlockId, eip2718::Encodable2718};
use alloy_primitives::{Address, B256, Bloom, Bytes, TxKind, U256};
use alloy_provider::Provider;
use alloy_rpc_types_eth::TransactionRequest;
use bera_reth::{
engine::validator::BerachainEngineValidatorBuilder,
flashblocks::{
Expand Down Expand Up @@ -233,6 +234,123 @@ async fn test_rpc_returns_flashblock_pending_receipt() -> eyre::Result<()> {
Ok(())
}

/// Tests that state-reading RPC methods return flashblock pending state.
///
/// This test verifies that when a flashblock contains a transfer transaction:
/// - `eth_getBalance` returns the recipient's updated balance
/// - `eth_getTransactionCount` returns the sender's incremented nonce
#[tokio::test]
async fn test_rpc_returns_flashblock_pending_state() -> eyre::Result<()> {
let (tasks, chain_spec) = setup_test_boilerplate().await?;
let executor = tasks.executor();

let (pending_tx, pending_rx) = watch::channel(None);
let (in_progress_tx, in_progress_rx) = watch::channel(None);
let (unused_sequence_tx, _) =
broadcast::channel::<FlashBlockCompleteSequence<BerachainFlashblockPayload>>(1);
let (unused_received_tx, _) = broadcast::channel::<Arc<BerachainFlashblockPayload>>(1);

let listeners: FlashblocksListeners<BerachainPrimitives, BerachainFlashblockPayload> =
FlashblocksListeners::new(
pending_rx,
unused_sequence_tx,
in_progress_rx,
unused_received_tx,
);

let eth_api_builder = BerachainEthApiBuilder::default().with_flashblocks_listeners(listeners);
let add_ons = BerachainAddOns::<_, _, BerachainEngineValidatorBuilder>::new(eth_api_builder);

let node_config = NodeConfig::new(chain_spec.clone())
.with_unused_ports()
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http());

let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(executor.clone())
.with_types::<BerachainNode>()
.with_components(BerachainNode::default().components_builder())
.with_add_ons(add_ons)
.launch()
.await?;

let (fb_tx, fb_rx) = tokio::sync::mpsc::channel::<BerachainFlashblockPayload>(128);
let stream = MockFlashblockStream { rx: fb_rx };

let service = FlashBlockService::new(
stream,
node.evm_config.clone(),
node.provider().clone(),
executor.clone(),
false,
);

let mut service_in_progress_rx = service.subscribe_in_progress();

executor.spawn_critical(
"flashblock-service",
Box::pin(async move {
service.run(pending_tx).await;
}),
);

let latest = node.provider().latest_header()?.expect("should have genesis");
let latest_hash = latest.hash();
let next_block = latest.number() + 1;
let next_timestamp = latest.timestamp() + 2;

// Create a transfer transaction to a fresh recipient address
let recipient = Address::random();
let transfer_value = U256::from(100);

let signer = test_signer()?;
let sender = signer.address();
let chain_id = chain_spec.chain_id();

// Build a transfer transaction with a specific recipient
let tx_request = TransactionRequest {
nonce: Some(0),
value: Some(transfer_value),
to: Some(TxKind::Call(recipient)),
gas: Some(21000),
max_fee_per_gas: Some(20e9 as u128),
max_priority_fee_per_gas: Some(20e9 as u128),
chain_id: Some(chain_id),
..Default::default()
};
let tx = TransactionTestContext::sign_tx(signer, tx_request).await;
let tx_bytes = Bytes::from(tx.encoded_2718());

let payload_id = PayloadId::new([2u8; 8]);
let mut fb0 = create_test_flashblock(0, next_block, payload_id, latest_hash, next_timestamp);
fb0.diff.transactions = vec![tx_bytes];
fb0.diff.gas_used = 21000;

// Inject the flashblock
fb_tx.send(fb0).await?;

// Wait for service to signal it's building
tokio::time::timeout(Duration::from_millis(500), service_in_progress_rx.changed()).await??;
in_progress_tx.send(*service_in_progress_rx.borrow())?;

// Give the service time to build and publish the pending block
tokio::time::sleep(Duration::from_millis(100)).await;

let rpc_url =
format!("http://127.0.0.1:{}", node.rpc_server_handle().http_local_addr().unwrap().port());
let client = alloy_provider::ProviderBuilder::new().connect(&rpc_url).await?;

// Check recipient balance with "pending" block tag - should reflect the pending transfer
let balance = client.get_balance(recipient).block_id(BlockId::pending()).await?;
assert_eq!(balance, transfer_value);

// Verify that the nonce is 1 more in flashblock state
let nonce_latest = client.get_transaction_count(sender).block_id(BlockId::latest()).await?;
let nonce_pending = client.get_transaction_count(sender).block_id(BlockId::pending()).await?;
assert_eq!(nonce_pending, nonce_latest + 1);

Ok(())
}

/// Tests that flashblocks with invalid parent hashes are rejected.
#[tokio::test]
async fn test_flashblock_rejects_invalid_parent_hash() -> eyre::Result<()> {
Expand Down
Loading