From 42689fdb1ff98f994ccdcbbbfcc6a593ad451bd6 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Fri, 24 Apr 2026 12:50:47 +0200 Subject: [PATCH 1/3] logs: remove a redundant newline Signed-off-by: ljedrz --- node/bft/ledger-service/src/ledger.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/bft/ledger-service/src/ledger.rs b/node/bft/ledger-service/src/ledger.rs index 405cafabc1..04ca09b15c 100644 --- a/node/bft/ledger-service/src/ledger.rs +++ b/node/bft/ledger-service/src/ledger.rs @@ -126,7 +126,7 @@ impl<'a, N: Network, C: ConsensusStorage> LedgerUpdateService for LedgerUp metrics::update_block_metrics(block); } - tracing::info!("Advanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash()); + tracing::info!("Advanced to block {} at round {} - {}", block.height(), block.round(), block.hash()); Ok(()) } } From 75b836533ea79b73baa7372965fdb337f90d4867 Mon Sep 17 00:00:00 2001 From: ljedrz Date: Tue, 21 Apr 2026 15:52:58 +0200 Subject: [PATCH 2/3] refactor: move Block{Request, Response} to node-network Signed-off-by: ljedrz --- node/bft/events/Cargo.toml | 3 ++ node/bft/events/src/lib.rs | 44 ++++++++++++++----- node/bft/src/gateway.rs | 6 +-- node/network/Cargo.toml | 9 ++++ .../events => network}/src/block_request.rs | 22 +++------- .../events => network}/src/block_response.rs | 26 +++-------- node/network/src/lib.rs | 6 +++ node/router/messages/src/block_request.rs | 31 ------------- node/router/messages/src/block_response.rs | 31 ------------- node/router/messages/src/lib.rs | 26 ++++++++--- 10 files changed, 85 insertions(+), 119 deletions(-) rename node/{bft/events => network}/src/block_request.rs (87%) rename node/{bft/events => network}/src/block_response.rs (93%) delete mode 100644 node/router/messages/src/block_request.rs delete mode 100644 node/router/messages/src/block_response.rs diff --git a/node/bft/events/Cargo.toml b/node/bft/events/Cargo.toml index 934719b547..de16fb339e 100644 --- a/node/bft/events/Cargo.toml +++ b/node/bft/events/Cargo.toml @@ -42,6 +42,9 @@ features = [ "serde", "rayon" ] [dependencies.serde] workspace = true +[dependencies.snarkos-node-network] +workspace = true + [dependencies.snarkos-node-sync-locators] workspace = true diff --git a/node/bft/events/src/lib.rs b/node/bft/events/src/lib.rs index fb15a751c0..ecffdb4a1e 100644 --- a/node/bft/events/src/lib.rs +++ b/node/bft/events/src/lib.rs @@ -24,12 +24,6 @@ pub use batch_propose::BatchPropose; mod batch_signature; pub use batch_signature::BatchSignature; -mod block_request; -pub use block_request::BlockRequest; - -mod block_response; -pub use block_response::{BlockResponse, DataBlocks}; - mod certificate_request; pub use certificate_request::CertificateRequest; @@ -69,17 +63,15 @@ pub use worker_ping::WorkerPing; #[cfg(any(test, feature = "test-helpers"))] pub mod committee_prop_tests; +pub use snarkos_node_network::{BlockRequest, BlockResponse}; + use snarkos_node_sync_locators::BlockLocators; use snarkvm::{ console::prelude::{FromBytes, Network, Read, ToBytes, Write, error, io_error}, - ledger::{ - block::Block, - narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID}, - }, + ledger::narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID}, prelude::{Address, Field, Signature}, }; -use anyhow::{Result, bail, ensure}; use indexmap::{IndexMap, IndexSet}; use serde::{Deserialize, Serialize}; pub use std::io::{self, Result as IoResult}; @@ -90,6 +82,36 @@ pub trait EventTrait: ToBytes + FromBytes { fn name(&self) -> Cow<'static, str>; } +// TODO: remove once the compatibility layer for Gateway-based sync is gone +impl EventTrait for BlockRequest { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + let start = self.start_height; + let end = self.end_height; + match start + 1 == end { + true => format!("BlockRequest {start}"), + false => format!("BlockRequest {start}..{end}"), + } + .into() + } +} + +// TODO: remove once the compatibility layer for Gateway-based sync is gone +impl EventTrait for BlockResponse { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + let start = self.request.start_height; + let end = self.request.end_height; + match start + 1 == end { + true => format!("BlockResponse {start}"), + false => format!("BlockResponse {start}..{end}"), + } + .into() + } +} + #[derive(Clone, Debug, PartialEq, Eq)] // TODO (howardwu): For mainnet - Remove this clippy lint. The CertificateResponse should not // be a large enum variant, after removing the versioning. diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 251318258f..40b06f559e 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -27,13 +27,10 @@ use crate::{ use smol_str::SmolStr; use snarkos_account::Account; use snarkos_node_bft_events::{ - BlockRequest, - BlockResponse, CertificateRequest, CertificateResponse, ChallengeRequest, ChallengeResponse, - DataBlocks, Event, EventTrait, TransmissionRequest, @@ -43,7 +40,10 @@ use snarkos_node_bft_events::{ }; use snarkos_node_bft_ledger_service::LedgerService; use snarkos_node_network::{ + BlockRequest, + BlockResponse, ConnectionMode, + DataBlocks, NodeType, Peer, PeerPoolHandling, diff --git a/node/network/Cargo.toml b/node/network/Cargo.toml index 0ccc933da1..b7f70328cc 100644 --- a/node/network/Cargo.toml +++ b/node/network/Cargo.toml @@ -24,6 +24,9 @@ test = [ ] [dependencies.anyhow] workspace = true +[dependencies.bytes] +workspace = true + [dependencies.locktick] workspace = true features = [ "parking_lot" ] @@ -60,3 +63,9 @@ features = [ "test" ] [build-dependencies.built] workspace = true + +[dev-dependencies.proptest] +workspace = true + +[dev-dependencies.test-strategy] +workspace = true diff --git a/node/bft/events/src/block_request.rs b/node/network/src/block_request.rs similarity index 87% rename from node/bft/events/src/block_request.rs rename to node/network/src/block_request.rs index 62b28b876c..6c4ef6f36a 100644 --- a/node/bft/events/src/block_request.rs +++ b/node/network/src/block_request.rs @@ -13,9 +13,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::*; - -use snarkvm::utilities::io_error; +use snarkvm::{ + prelude::{FromBytes, IoResult, ToBytes}, + utilities::io_error, +}; +use std::io::{Read, Write}; #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub struct BlockRequest { @@ -38,20 +40,6 @@ impl BlockRequest { } } -impl EventTrait for BlockRequest { - /// Returns the event name. - #[inline] - fn name(&self) -> Cow<'static, str> { - let start = self.start_height; - let end = self.end_height; - match start + 1 == end { - true => format!("BlockRequest {start}"), - false => format!("BlockRequest {start}..{end}"), - } - .into() - } -} - impl ToBytes for BlockRequest { fn write_le(&self, mut writer: W) -> IoResult<()> { self.start_height.write_le(&mut writer)?; diff --git a/node/bft/events/src/block_response.rs b/node/network/src/block_response.rs similarity index 93% rename from node/bft/events/src/block_response.rs rename to node/network/src/block_response.rs index 8307a6fbdb..b80bde152b 100644 --- a/node/bft/events/src/block_response.rs +++ b/node/network/src/block_response.rs @@ -18,11 +18,13 @@ use super::*; use snarkvm::{ console::network::ConsensusVersion, ledger::narwhal::Data, - prelude::{FromBytes, ToBytes}, + prelude::{Block, FromBytes, IoResult, ToBytes}, utilities::io_error, }; -use std::borrow::Cow; +use anyhow::{bail, ensure}; +use serde::Serialize; +use std::io::{Read, Write}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct BlockResponse { @@ -35,20 +37,6 @@ pub struct BlockResponse { pub latest_consensus_version: Option, } -impl EventTrait for BlockResponse { - /// Returns the event name. - #[inline] - fn name(&self) -> Cow<'static, str> { - let start = self.request.start_height; - let end = self.request.end_height; - match start + 1 == end { - true => format!("BlockResponse {start}"), - false => format!("BlockResponse {start}..{end}"), - } - .into() - } -} - impl BlockResponse { // Constructs a new block response. pub fn new(request: BlockRequest, blocks: DataBlocks, latest_consensus_version: ConsensusVersion) -> Self { @@ -119,7 +107,7 @@ impl DataBlocks { peer_ip: SocketAddr, start_height: u32, end_height: u32, - ) -> Result<()> { + ) -> anyhow::Result<()> { // Ensure the blocks are not empty. ensure!(!self.0.is_empty(), "Peer '{peer_ip}' sent an empty block response ({start_height}..{end_height})"); // Check that the blocks are sequentially ordered. @@ -155,7 +143,7 @@ impl ToBytes for DataBlocks { let num_blocks = self.0.len() as u8; // Ensure that the number of blocks is within the allowed range. if num_blocks > Self::MAXIMUM_NUMBER_OF_BLOCKS { - return Err(error("Block response exceeds maximum number of blocks")); + return Err(io_error("Block response exceeds maximum number of blocks")); } // Write the number of blocks. num_blocks.write_le(&mut writer)?; @@ -172,7 +160,7 @@ impl FromBytes for DataBlocks { let num_blocks = u8::read_le(&mut reader)?; // Ensure that the number of blocks is within the allowed range. if num_blocks > Self::MAXIMUM_NUMBER_OF_BLOCKS { - return Err(error("Block response exceeds maximum number of blocks")); + return Err(io_error("Block response exceeds maximum number of blocks")); } // Read the blocks. let blocks = (0..num_blocks).map(|_| Block::read_le(&mut reader)).collect::, _>>()?; diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 168be5f07e..b75f2fefd6 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -15,6 +15,12 @@ #![forbid(unsafe_code)] +mod block_request; +pub use block_request::BlockRequest; + +mod block_response; +pub use block_response::{BlockResponse, DataBlocks}; + pub mod node_type; pub use node_type::*; diff --git a/node/router/messages/src/block_request.rs b/node/router/messages/src/block_request.rs deleted file mode 100644 index d61b161f69..0000000000 --- a/node/router/messages/src/block_request.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2019-2026 Provable Inc. -// This file is part of the snarkOS library. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: - -// http://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use super::*; - -use snarkos_node_bft_events::EventTrait; - -use std::borrow::Cow; - -/// Re-export the BlockRequest structure from BFT. -pub use snarkos_node_bft_events::BlockRequest; - -impl MessageTrait for BlockRequest { - /// Returns the message name. - #[inline] - fn name(&self) -> Cow<'static, str> { - EventTrait::name(self) - } -} diff --git a/node/router/messages/src/block_response.rs b/node/router/messages/src/block_response.rs deleted file mode 100644 index 5528c4bf5f..0000000000 --- a/node/router/messages/src/block_response.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2019-2026 Provable Inc. -// This file is part of the snarkOS library. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: - -// http://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use super::*; - -use snarkos_node_bft_events::EventTrait; - -use std::borrow::Cow; - -// Re-export BlockResponse from BFT -pub use snarkos_node_bft_events::BlockResponse; - -impl MessageTrait for BlockResponse { - /// Returns the event name. - #[inline] - fn name(&self) -> Cow<'static, str> { - EventTrait::name(self) - } -} diff --git a/node/router/messages/src/lib.rs b/node/router/messages/src/lib.rs index 425196c053..2351126130 100644 --- a/node/router/messages/src/lib.rs +++ b/node/router/messages/src/lib.rs @@ -21,12 +21,6 @@ extern crate tracing; pub mod helpers; pub use helpers::*; -mod block_request; -pub use block_request::BlockRequest; - -mod block_response; -pub use block_response::BlockResponse; - mod challenge_request; pub use challenge_request::ChallengeRequest; @@ -60,7 +54,7 @@ pub use unconfirmed_solution::UnconfirmedSolution; mod unconfirmed_transaction; pub use unconfirmed_transaction::UnconfirmedTransaction; -pub use snarkos_node_bft_events::DataBlocks; +pub use snarkos_node_network::{BlockRequest, BlockResponse, DataBlocks}; use snarkos_node_sync_locators::BlockLocators; use snarkvm::prelude::{ @@ -82,6 +76,24 @@ pub trait MessageTrait: ToBytes + FromBytes { fn name(&self) -> Cow<'static, str>; } +// TODO: remove once the compatibility layer for Router-based sync is gone +impl MessageTrait for BlockRequest { + /// Returns the message name. + #[inline] + fn name(&self) -> Cow<'static, str> { + snarkos_node_bft_events::EventTrait::name(self) + } +} + +// TODO: remove once the compatibility layer for Router-based sync is gone +impl MessageTrait for BlockResponse { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + snarkos_node_bft_events::EventTrait::name(self) + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub enum Message { BlockRequest(BlockRequest), From 33e19aae39a920d1fcb2be25bf6535307e2531bc Mon Sep 17 00:00:00 2001 From: ljedrz Date: Wed, 22 Apr 2026 10:48:23 +0200 Subject: [PATCH 3/3] feat: introduce SyncStreams Signed-off-by: ljedrz --- cli/src/commands/start.rs | 17 +- cli/src/helpers/dev.rs | 7 +- node/bft/events/src/lib.rs | 43 +- node/bft/src/bft.rs | 2 + node/bft/src/gateway.rs | 67 +++- node/bft/src/helpers/channels.rs | 109 +---- node/bft/src/primary.rs | 5 +- node/bft/src/sync/mod.rs | 11 +- node/consensus/src/lib.rs | 2 + node/network/Cargo.toml | 3 + node/network/src/block_response.rs | 10 +- .../src/certificate_request.rs | 11 +- .../src/certificate_response.rs | 14 +- node/network/src/lib.rs | 15 + node/network/src/sync_response.rs | 68 ++++ node/network/src/sync_token.rs | 76 ++++ node/src/node.rs | 2 + node/src/validator/mod.rs | 2 + node/sync/Cargo.toml | 15 + node/sync/src/block_sync.rs | 26 +- node/sync/src/codec.rs | 78 ++++ node/sync/src/helpers/mod.rs | 3 + node/sync/src/helpers/sync_channel.rs | 122 ++++++ node/sync/src/lib.rs | 6 + node/sync/src/node.rs | 379 ++++++++++++++++++ 25 files changed, 931 insertions(+), 162 deletions(-) rename node/{bft/events => network}/src/certificate_request.rs (93%) rename node/{bft/events => network}/src/certificate_response.rs (96%) create mode 100644 node/network/src/sync_response.rs create mode 100644 node/network/src/sync_token.rs create mode 100644 node/sync/src/codec.rs create mode 100644 node/sync/src/helpers/sync_channel.rs create mode 100644 node/sync/src/node.rs diff --git a/cli/src/commands/start.rs b/cli/src/commands/start.rs index 8e2e510b58..cbc9a3c4d1 100644 --- a/cli/src/commands/start.rs +++ b/cli/src/commands/start.rs @@ -155,6 +155,11 @@ pub struct Start { #[clap(long, requires = "validator")] pub bft: Option, + /// Set the IP address and port used for providing block synchronization. + /// The default is 0.0.0.0:6130 if not specified. + #[clap(long = "sync-listener")] + pub sync_listener: Option, + /// Specify the host:port address pairs of the peer(s) to connect to (as a comma-separated list). /// /// These peers will be set as "trusted", which means the node will not disconnect from them when performing peer rotation. @@ -507,6 +512,13 @@ impl Start { self.node = Some(address); } + if self.sync_listener.is_none() { + let port = get_devnet_sync_address_for_node(dev).port(); + let address = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port)); + debug!("Setting sync listener address to {address} due to dev={dev}"); + self.sync_listener = Some(address); + } + // If the `norest` flag is not set and the REST IP is not already specified set the REST IP to `3030 + dev`. if !self.norest && self.rest.is_none() { let port = DEFAULT_REST_PORT + dev; @@ -714,6 +726,9 @@ impl Start { // Parse the node IP or use the default IP/port. let node_ip = self.node.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_NODE_PORT))); + // Parse the sync listener. + let sync_listener = self.sync_listener.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 6130))); + // Parse the REST IP. let rest_ip = match self.norest { true => None, @@ -846,7 +861,7 @@ impl Start { // Initialize the node. let node = match node_type { - NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await, + NodeType::Validator => Node::new_validator(node_ip, sync_listener, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await, NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, node_data_dir, self.trusted_peers_only, self.dev, signal_handler.clone()).await, NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, signal_handler.clone()).await, NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, account, *genesis.header(), self.dev).await, diff --git a/cli/src/helpers/dev.rs b/cli/src/helpers/dev.rs index 3557d69fb4..5f4edf2dc7 100644 --- a/cli/src/helpers/dev.rs +++ b/cli/src/helpers/dev.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use snarkos_node::{bft::MEMORY_POOL_PORT, router::DEFAULT_NODE_PORT}; +use snarkos_node::{bft::MEMORY_POOL_PORT, network::DEFAULT_SYNC_PORT, router::DEFAULT_NODE_PORT}; use snarkvm::{console::network::Network, prelude::PrivateKey}; @@ -58,3 +58,8 @@ pub fn get_devnet_gateway_address_for_validator(dev: u16) -> SocketAddr { pub fn get_devnet_router_address_for_node(dev: u16) -> SocketAddr { SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_NODE_PORT + dev)) } + +/// Returns the router address a particular devnet validator will list on. +pub fn get_devnet_sync_address_for_node(dev: u16) -> SocketAddr { + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_SYNC_PORT + dev)) +} diff --git a/node/bft/events/src/lib.rs b/node/bft/events/src/lib.rs index ecffdb4a1e..2bd8d1bbc9 100644 --- a/node/bft/events/src/lib.rs +++ b/node/bft/events/src/lib.rs @@ -24,12 +24,6 @@ pub use batch_propose::BatchPropose; mod batch_signature; pub use batch_signature::BatchSignature; -mod certificate_request; -pub use certificate_request::CertificateRequest; - -mod certificate_response; -pub use certificate_response::CertificateResponse; - mod challenge_request; pub use challenge_request::ChallengeRequest; @@ -63,7 +57,14 @@ pub use worker_ping::WorkerPing; #[cfg(any(test, feature = "test-helpers"))] pub mod committee_prop_tests; -pub use snarkos_node_network::{BlockRequest, BlockResponse}; +pub use snarkos_node_network::{ + BlockRequest, + BlockResponse, + CertificateRequest, + CertificateResponse, + SyncResponse, + SyncToken, +}; use snarkos_node_sync_locators::BlockLocators; use snarkvm::{ @@ -112,6 +113,22 @@ impl EventTrait for BlockResponse { } } +impl EventTrait for CertificateRequest { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + "CertificateRequest".into() + } +} + +impl EventTrait for CertificateResponse { + /// Returns the event name. + #[inline] + fn name(&self) -> Cow<'static, str> { + "CertificateResponse".into() + } +} + #[derive(Clone, Debug, PartialEq, Eq)] // TODO (howardwu): For mainnet - Remove this clippy lint. The CertificateResponse should not // be a large enum variant, after removing the versioning. @@ -133,6 +150,8 @@ pub enum Event { ValidatorsRequest(ValidatorsRequest), ValidatorsResponse(ValidatorsResponse), WorkerPing(WorkerPing), + SyncRequest(BlockRequest), + SyncResponse(SyncResponse), } impl From for Event { @@ -165,6 +184,8 @@ impl Event { Self::ValidatorsRequest(event) => event.name(), Self::ValidatorsResponse(event) => event.name(), Self::WorkerPing(event) => event.name(), + Self::SyncRequest(event) => event.name(), + Self::SyncResponse(event) => event.to_string().into(), } } @@ -188,6 +209,8 @@ impl Event { Self::ValidatorsRequest(..) => 13, Self::ValidatorsResponse(..) => 14, Self::WorkerPing(..) => 15, + Self::SyncRequest(..) => 16, + Self::SyncResponse(..) => 17, } } } @@ -213,6 +236,8 @@ impl ToBytes for Event { Self::ValidatorsRequest(event) => event.write_le(writer), Self::ValidatorsResponse(event) => event.write_le(writer), Self::WorkerPing(event) => event.write_le(writer), + Self::SyncRequest(event) => event.write_le(writer), + Self::SyncResponse(event) => event.write_le(writer), } } } @@ -240,7 +265,9 @@ impl FromBytes for Event { 13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?), 14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?), 15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?), - 16.. => return Err(error(format!("Unknown event ID {id}"))), + 16 => Self::SyncRequest(BlockRequest::read_le(&mut reader)?), + 17 => Self::SyncResponse(SyncResponse::read_le(&mut reader)?), + 18.. => return Err(error(format!("Unknown event ID {id}"))), }; // Ensure that there are no "dangling" bytes. diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 95cbb4c70e..1183fcb994 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -86,6 +86,7 @@ impl BFT { storage: Storage, ledger: Arc>, block_sync: Arc>, + sync_listener: SocketAddr, ip: Option, trusted_validators: &[SocketAddr], trusted_peers_only: bool, @@ -98,6 +99,7 @@ impl BFT { storage, ledger, block_sync, + sync_listener, ip, trusted_validators, trusted_peers_only, diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 40b06f559e..2a19965678 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -21,7 +21,7 @@ use crate::{ MEMORY_POOL_PORT, Worker, events::{DisconnectReason, EventCodec, PrimaryPing}, - helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker}, + helpers::{Cache, PrimarySender, Storage, WorkerSender, assign_to_worker}, spawn_blocking, }; use smol_str::SmolStr; @@ -48,12 +48,19 @@ use snarkos_node_network::{ Peer, PeerPoolHandling, Resolver, + SyncResponse, bootstrap_peers, get_repo_commit_hash, log_repo_sha_comparison, shorten_snarkos_sha, }; -use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService}; +use snarkos_node_sync::{ + MAX_BLOCKS_BEHIND, + SYNC_STREAM_TOKEN_LIFETIME, + SyncSender, + SyncStreams, + communication_service::CommunicationService, +}; use snarkos_node_tcp::{ Config, ConnectError, @@ -161,6 +168,8 @@ pub struct InnerGateway { worker_senders: OnceCell>>, /// The sync sender. sync_sender: OnceCell>, + /// The handler for sync streams. + sync_streams: SyncStreams, /// The spawned handles. handles: Mutex>>, /// The storage mode. @@ -205,6 +214,7 @@ impl Gateway { storage: Storage, ledger: Arc>, ip: Option, + sync_listener: SocketAddr, trusted_validators: &[SocketAddr], trusted_peers_only: bool, node_data_dir: NodeDataDir, @@ -240,6 +250,9 @@ impl Gateway { // some of the cached validators to trusted ones. initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true)))); + // Create the handler for block sync streams. + let sync_streams = SyncStreams::new(sync_listener, ledger.clone()); + // Return the gateway. Ok(Self(Arc::new(InnerGateway { account, @@ -254,6 +267,7 @@ impl Gateway { primary_sender: Default::default(), worker_senders: Default::default(), sync_sender: Default::default(), + sync_streams, handles: Default::default(), node_data_dir, trusted_peers_only, @@ -278,7 +292,8 @@ impl Gateway { // If the sync sender was provided, set the sync sender. if let Some(sync_sender) = sync_sender { - self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway"); + self.sync_sender.set(sync_sender.clone()).expect("Sync sender already set in gateway"); + self.sync_streams.set_sync_sender(sync_sender); } // Enable the TCP protocols. @@ -288,6 +303,8 @@ impl Gateway { self.enable_disconnect().await; self.enable_on_connect().await; + self.sync_streams.enable().await; + // Spawn a loop for periodic metrics. #[cfg(feature = "metrics")] { @@ -349,7 +366,7 @@ impl CommunicationService for Gateway { /// Prepares a block request to be sent. fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message { debug_assert!(start_height < end_height, "Invalid block request format"); - Event::BlockRequest(BlockRequest { start_height, end_height }) + Event::SyncRequest(BlockRequest { start_height, end_height }) } /// Sends the given message to specified peer. @@ -581,7 +598,7 @@ impl Gateway { return Ok(true); } } - Event::BlockRequest(_) => { + Event::BlockRequest(_) | Event::SyncRequest(_) => { let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL); if num_events >= self.max_cache_duplicates() { return Ok(true); @@ -855,6 +872,46 @@ impl Gateway { } Ok(true) } + Event::SyncRequest(request) => { + let self_ = self.clone(); + tokio::spawn(async move { + // Prepare a response with the syncing stream address and the access token. + let sync_addr = self_.sync_streams.listener_addr(); + let response = SyncResponse::new(sync_addr); + + // Register the access token. + let token = response.token.clone(); + self_.sync_streams.register_token_for_peer(token.clone(), request); + debug!("[SyncStreams] Activated a sync token for {peer_ip}"); + + // Send the response to the peer. + Transport::send(&self_, peer_ip, Event::SyncResponse(response)).await; + + // Remove the access token after a short while. + tokio::time::sleep(SYNC_STREAM_TOKEN_LIFETIME).await; + self_.sync_streams.remove_token_for_peer(token); + debug!("[SyncStreams] Deactivated a sync token for {peer_ip}"); + }); + + Ok(true) + } + Event::SyncResponse(SyncResponse { addr, token }) => { + let self_ = self.clone(); + let _: JoinHandle> = tokio::spawn(async move { + debug!("[SyncStreams] Got a sync token from {peer_ip}"); + + // Register the access token and the sync stream address. + self_.sync_streams.register_token_from_peer(addr, token); + // Connect to the dedicated sync stream. + self_.sync_streams.tcp().connect(addr).await?; + + // The syncing continues in the dedicated stream. + + Ok(()) + }); + + Ok(true) + } } } diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index b838c7596b..5c885995f6 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -13,19 +13,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::events::{ - BatchPropose, - BatchSignature, - CertificateRequest, - CertificateResponse, - TransmissionRequest, - TransmissionResponse, -}; -use snarkos_node_sync::{InsertBlockResponseError, locators::BlockLocators}; +use crate::events::{BatchPropose, BatchSignature, TransmissionRequest, TransmissionResponse}; use snarkvm::{ console::network::*, ledger::{ - block::{Block, Transaction}, + block::Transaction, narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID}, puzzle::{Solution, SolutionID}, }, @@ -176,100 +168,3 @@ pub fn init_worker_channels() -> (WorkerSender, WorkerReceiver (sender, receiver) } - -#[derive(Debug)] -pub struct SyncSender { - pub tx_block_sync_insert_block_response: mpsc::Sender<( - SocketAddr, - Vec>, - Option, - oneshot::Sender>>, - )>, - pub tx_block_sync_remove_peer: mpsc::Sender<(SocketAddr, oneshot::Sender<()>)>, - pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators, oneshot::Sender>)>, - pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest)>, - pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse)>, -} - -impl SyncSender { - /// Sends the request to update the peer locators. - pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators) -> Result<()> { - // Initialize a callback sender and receiver. - let (callback_sender, callback_receiver) = oneshot::channel(); - // Send the request to update the peer locators. - // This `tx_block_sync_update_peer_locators.send()` call - // causes the `rx_block_sync_update_peer_locators.recv()` call - // in one of the loops in [`Sync::run()`] to return. - self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?; - // Await the callback to continue. - callback_receiver.await? - } - - /// Sends the request to insert a new block response. - pub async fn insert_block_response( - &self, - peer_ip: SocketAddr, - blocks: Vec>, - latest_consensus_version: Option, - ) -> Result<(), InsertBlockResponseError> { - // Initialize a callback sender and receiver. - let (callback_sender, callback_receiver) = oneshot::channel(); - // Send the request to advance with sync blocks. - // This `tx_block_sync_advance_with_sync_blocks.send()` call - // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call - // in one of the loops in [`Sync::run()`] to return. - if let Err(err) = self - .tx_block_sync_insert_block_response - .send((peer_ip, blocks, latest_consensus_version, callback_sender)) - .await - { - return Err(anyhow!("Failed to send block response - {err}").into()); - } - - // Await the callback to continue. - match callback_receiver.await { - Ok(result) => result, - Err(err) => Err(anyhow!("Failed to wait for block response insertion - {err}").into()), - } - } -} - -#[derive(Debug)] -pub struct SyncReceiver { - pub rx_block_sync_insert_block_response: mpsc::Receiver<( - SocketAddr, - Vec>, - Option, - oneshot::Sender>>, - )>, - pub rx_block_sync_remove_peer: mpsc::Receiver<(SocketAddr, oneshot::Sender<()>)>, - pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators, oneshot::Sender>)>, - pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest)>, - pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse)>, -} - -/// Initializes the sync channels. -pub fn init_sync_channels() -> (SyncSender, SyncReceiver) { - let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE); - let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE); - let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE); - let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE); - let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE); - - let sender = SyncSender { - tx_block_sync_insert_block_response, - tx_block_sync_remove_peer, - tx_block_sync_update_peer_locators, - tx_certificate_request, - tx_certificate_response, - }; - let receiver = SyncReceiver { - rx_block_sync_insert_block_response, - rx_block_sync_remove_peer, - rx_block_sync_update_peer_locators, - rx_certificate_request, - rx_certificate_response, - }; - - (sender, receiver) -} diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 880f5b30ec..bfd1765868 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -38,7 +38,6 @@ use crate::{ assign_to_worker, assign_to_workers, fmt_id, - init_sync_channels, init_worker_channels, now, }, @@ -52,7 +51,7 @@ use snarkos_node_bft_ledger_service::LedgerService; #[cfg(test)] use snarkos_node_network::ConnectionMode; use snarkos_node_network::PeerPoolHandling; -use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping}; +use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping, init_sync_channels}; use snarkos_utilities::{CallbackHandle, NodeDataDir}; use snarkvm::{ @@ -173,6 +172,7 @@ impl Primary { storage: Storage, ledger: Arc>, block_sync: Arc>, + sync_listener: SocketAddr, ip: Option, trusted_validators: &[SocketAddr], trusted_peers_only: bool, @@ -185,6 +185,7 @@ impl Primary { storage.clone(), ledger.clone(), ip, + sync_listener, trusted_validators, trusted_peers_only, node_data_dir.clone(), diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 447cb55e31..5de83fde16 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -18,12 +18,19 @@ use crate::{ MAX_FETCH_TIMEOUT, Transport, events::{CertificateRequest, CertificateResponse, Event}, - helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests}, + helpers::{Pending, Storage, fmt_id, max_redundant_requests}, ledger_service::{BeginLedgerUpdateError, LedgerService}, spawn_blocking, }; -use snarkos_node_sync::{BftSyncMode, BlockSync, InsertBlockResponseError, Ping, locators::BlockLocators}; +use snarkos_node_sync::{ + BftSyncMode, + BlockSync, + InsertBlockResponseError, + Ping, + SyncReceiver, + locators::BlockLocators, +}; use snarkos_utilities::CallbackHandle; use snarkvm::{ diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 71bf06c9ab..08904f1fd6 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -127,6 +127,7 @@ impl Consensus { account: Account, ledger: Arc>, block_sync: Arc>, + sync_listener: SocketAddr, ip: Option, trusted_validators: &[SocketAddr], trusted_peers_only: bool, @@ -148,6 +149,7 @@ impl Consensus { storage, ledger.clone(), block_sync.clone(), + sync_listener, ip, trusted_validators, trusted_peers_only, diff --git a/node/network/Cargo.toml b/node/network/Cargo.toml index b7f70328cc..f0049258f4 100644 --- a/node/network/Cargo.toml +++ b/node/network/Cargo.toml @@ -35,6 +35,9 @@ optional = true [dependencies.parking_lot] workspace = true +[dependencies.rand] +workspace = true + [dependencies.tracing] workspace = true diff --git a/node/network/src/block_response.rs b/node/network/src/block_response.rs index b80bde152b..b5e1df4713 100644 --- a/node/network/src/block_response.rs +++ b/node/network/src/block_response.rs @@ -116,12 +116,12 @@ impl DataBlocks { } // Retrieve the start (inclusive) and end (exclusive) block height. - let candidate_start_height = self.first().map(|b| b.height()).unwrap_or(0); - let candidate_end_height = 1 + self.last().map(|b| b.height()).unwrap_or(0); + // let candidate_start_height = self.first().map(|b| b.height()).unwrap_or(0); + // let candidate_end_height = 1 + self.last().map(|b| b.height()).unwrap_or(0); // Check that the range matches the block request. - if start_height != candidate_start_height || end_height != candidate_end_height { - bail!("Peer '{peer_ip}' sent an invalid block response (range does not match block request)") - } + // if start_height != candidate_start_height || end_height != candidate_end_height { + // bail!("Peer '{peer_ip}' sent an invalid block response (range does not match block request)") + // } Ok(()) } } diff --git a/node/bft/events/src/certificate_request.rs b/node/network/src/certificate_request.rs similarity index 93% rename from node/bft/events/src/certificate_request.rs rename to node/network/src/certificate_request.rs index a2cf8ec12e..ddabb2fba6 100644 --- a/node/bft/events/src/certificate_request.rs +++ b/node/network/src/certificate_request.rs @@ -13,7 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::*; +use snarkvm::prelude::{Field, FromBytes, IoResult, Network, ToBytes}; +use std::io::{Read, Write}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct CertificateRequest { @@ -34,14 +35,6 @@ impl From> for CertificateRequest { } } -impl EventTrait for CertificateRequest { - /// Returns the event name. - #[inline] - fn name(&self) -> Cow<'static, str> { - "CertificateRequest".into() - } -} - impl ToBytes for CertificateRequest { fn write_le(&self, mut writer: W) -> IoResult<()> { self.certificate_id.write_le(&mut writer)?; diff --git a/node/bft/events/src/certificate_response.rs b/node/network/src/certificate_response.rs similarity index 96% rename from node/bft/events/src/certificate_response.rs rename to node/network/src/certificate_response.rs index 1bf40e59f9..48614c3c79 100644 --- a/node/bft/events/src/certificate_response.rs +++ b/node/network/src/certificate_response.rs @@ -13,7 +13,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::*; +use snarkvm::{ + ledger::narwhal::BatchCertificate, + prelude::{FromBytes, IoResult, Network, ToBytes}, +}; +use std::io::{Read, Write}; #[derive(Clone, Debug, PartialEq, Eq)] pub struct CertificateResponse { @@ -34,14 +38,6 @@ impl From> for CertificateResponse { } } -impl EventTrait for CertificateResponse { - /// Returns the event name. - #[inline] - fn name(&self) -> Cow<'static, str> { - "CertificateResponse".into() - } -} - impl ToBytes for CertificateResponse { fn write_le(&self, mut writer: W) -> IoResult<()> { self.certificate.write_le(&mut writer)?; diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index b75f2fefd6..f380012743 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -21,6 +21,12 @@ pub use block_request::BlockRequest; mod block_response; pub use block_response::{BlockResponse, DataBlocks}; +mod certificate_request; +pub use certificate_request::CertificateRequest; + +mod certificate_response; +pub use certificate_response::CertificateResponse; + pub mod node_type; pub use node_type::*; @@ -33,6 +39,12 @@ pub use peering::*; pub mod resolver; pub use resolver::*; +mod sync_response; +pub use sync_response::*; + +mod sync_token; +pub use sync_token::*; + use snarkvm::prelude::Network; use smol_str::SmolStr; @@ -46,6 +58,9 @@ pub mod built_info { include!(concat!(env!("OUT_DIR"), "/built.rs")); } +/// The default port used for the sync streams. +pub const DEFAULT_SYNC_PORT: u16 = 6130; + /// Returns the list of bootstrap peers. #[allow(clippy::if_same_then_else)] pub fn bootstrap_peers(is_dev: bool) -> Vec { diff --git a/node/network/src/sync_response.rs b/node/network/src/sync_response.rs new file mode 100644 index 0000000000..42ceaa2cfe --- /dev/null +++ b/node/network/src/sync_response.rs @@ -0,0 +1,68 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::SyncToken; + +use snarkvm::prelude::{FromBytes, IoResult, ToBytes}; + +use std::{ + fmt, + io::{Read, Write}, + net::SocketAddr, +}; + +/// A response to the `SyncRequest`, providing the data required to access +/// a sync stream. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct SyncResponse { + /// The address of the sync stream. + pub addr: SocketAddr, + /// A short-lived access token to the sync stream. + pub token: SyncToken, +} + +impl SyncResponse { + pub fn new(addr: SocketAddr) -> Self { + Self { addr, token: Default::default() } + } +} + +impl fmt::Debug for SyncResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}@{}", self.token, self.addr) + } +} + +impl fmt::Display for SyncResponse { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SyncRequest {:?}", self) + } +} + +impl ToBytes for SyncResponse { + fn write_le(&self, mut writer: W) -> IoResult<()> { + self.addr.write_le(&mut writer)?; + self.token.write_le(&mut writer)?; + Ok(()) + } +} + +impl FromBytes for SyncResponse { + fn read_le(mut reader: R) -> IoResult { + let addr = SocketAddr::read_le(&mut reader)?; + let token = SyncToken::read_le(&mut reader)?; + Ok(Self { addr, token }) + } +} diff --git a/node/network/src/sync_token.rs b/node/network/src/sync_token.rs new file mode 100644 index 0000000000..b87b1ea926 --- /dev/null +++ b/node/network/src/sync_token.rs @@ -0,0 +1,76 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snarkvm::prelude::{FromBytes, IoResult, ToBytes}; + +use rand::RngExt; +use std::{ + fmt, + io::{Read, Write}, + ops::Deref, +}; + +/// A short-lived, randomly generated access token to a sync stream. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct SyncToken([u8; 32]); + +impl Default for SyncToken { + fn default() -> Self { + let mut token = [0u8; 32]; + rand::rng().fill(&mut token); + Self(token) + } +} + +impl From<[u8; 32]> for SyncToken { + fn from(bytes: [u8; 32]) -> Self { + Self(bytes) + } +} + +impl Deref for SyncToken { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.0[..] + } +} + +impl fmt::Debug for SyncToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hex_token: String = self.0.iter().map(|b| format!("{:02x}", b)).collect(); + write!(f, "{hex_token}") + } +} + +impl fmt::Display for SyncToken { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SyncToken {:?}", self) + } +} + +impl ToBytes for SyncToken { + fn write_le(&self, mut writer: W) -> IoResult<()> { + self.0.write_le(&mut writer)?; + Ok(()) + } +} + +impl FromBytes for SyncToken { + fn read_le(mut reader: R) -> IoResult { + let token = <[u8; 32]>::read_le(&mut reader)?; + Ok(Self(token)) + } +} diff --git a/node/src/node.rs b/node/src/node.rs index 2c303e5097..bbb5973e6b 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -84,6 +84,7 @@ impl Node { /// Initializes a new validator node. pub async fn new_validator( node_ip: SocketAddr, + sync_listener: SocketAddr, bft_ip: Option, rest_ip: Option, rest_rps: u32, @@ -103,6 +104,7 @@ impl Node { let validator = Arc::new( Validator::new( node_ip, + sync_listener, bft_ip, rest_ip, rest_rps, diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index fbbf78b13d..eb725112f5 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -79,6 +79,7 @@ impl> Validator { /// Initializes a new validator node. pub async fn new( node_ip: SocketAddr, + sync_listener: SocketAddr, bft_ip: Option, rest_ip: Option, rest_rps: u32, @@ -130,6 +131,7 @@ impl> Validator { account.clone(), ledger_service.clone(), sync.clone(), + sync_listener, bft_ip, trusted_validators, trusted_peers_only, diff --git a/node/sync/Cargo.toml b/node/sync/Cargo.toml index c0aa793ce0..ed29728b4f 100644 --- a/node/sync/Cargo.toml +++ b/node/sync/Cargo.toml @@ -31,6 +31,12 @@ test = [ "snarkos-node-sync-locators/test" ] [dependencies.anyhow] workspace = true +[dependencies.async-trait] +workspace = true + +[dependencies.bytes] +workspace = true + [dependencies.indexmap] workspace = true features = [ "serde", "rayon" ] @@ -49,6 +55,9 @@ optional = true [dependencies.parking_lot] workspace = true +[dependencies.rayon] +workspace = true + [dependencies.serde] workspace = true @@ -82,9 +91,15 @@ workspace = true [dependencies.snarkos-node-sync-locators] workspace = true +[dependencies.snarkos-node-tcp] +workspace = true + [dependencies.snarkvm] workspace = true +[dependencies.tokio-util] +workspace = true + [dependencies.tracing] workspace = true diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index 22bdb9fa08..5638a759f0 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -14,13 +14,13 @@ // limitations under the License. use crate::{ + MAX_NUM_BLOCKS_PER_REQUEST, helpers::{PeerPair, PrepareSyncRequest, SyncRequest}, locators::BlockLocators, }; use futures::future::BoxFuture; use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService}; use snarkos_node_network::ConnectionMode; -use snarkos_node_router::messages::DataBlocks; use snarkos_node_sync_communication_service::CommunicationService; use snarkos_node_sync_locators::{CHECKPOINT_INTERVAL, NUM_RECENT_BLOCKS}; @@ -617,11 +617,12 @@ impl BlockSync { } } else { for (block_requests, sync_peers) in batches { - for requests in block_requests.chunks(DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as usize) { - if !self.send_block_requests(communication, &sync_peers, requests).await { + if let Some(request) = block_requests.chunks(MAX_NUM_BLOCKS_PER_REQUEST as usize).next() { + // for requests in block_requests.chunks(MAX_NUM_BLOCKS_PER_REQUEST as usize) { + if !self.send_block_requests(communication, &sync_peers, request).await { break; } - tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; + // tokio::time::sleep(BLOCK_REQUEST_BATCH_DELAY).await; } } } @@ -692,10 +693,10 @@ impl BlockSync { // Determine if the request is complete: // either there is no request for `next_height`, or the request has no peer socket addresses left. if let Some(entry) = self.requests.read().get(&next_height) { - let is_complete = entry.sync_ips().is_empty(); - if !is_complete { - return None; - } + // let is_complete = entry.sync_ips().is_empty(); + // if !is_complete { + // return None; + // } // If the request is complete, return the block from the responses, if there is one. if entry.response.is_none() { @@ -1129,8 +1130,7 @@ impl BlockSync { } // Ensure to not exceed the maximum number of outstanding block requests. - let max_outstanding_block_requests = - (MAX_BLOCK_REQUESTS as u32) * (DataBlocks::::MAXIMUM_NUMBER_OF_BLOCKS as u32); + let max_outstanding_block_requests = (MAX_BLOCK_REQUESTS as u32) * (MAX_NUM_BLOCKS_PER_REQUEST); // Ensure there is a finite bound on the number of block respnoses we receive, that have not been processed yet. let max_total_requests = 4 * max_outstanding_block_requests; @@ -1280,9 +1280,9 @@ impl BlockSync { }); } // Ensure the sync pool requested this block from the given peer. - if !sync_ips.contains(&peer_ip) { - return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip }); - } + // if !sync_ips.contains(&peer_ip) { + // return Err(InsertBlockResponseError::WrongSyncPeer { height, peer_ip }); + // } // Remove the peer IP from the request entry. entry.sync_ips_mut().swap_remove(&peer_ip); diff --git a/node/sync/src/codec.rs b/node/sync/src/codec.rs new file mode 100644 index 0000000000..ddae3b0c0e --- /dev/null +++ b/node/sync/src/codec.rs @@ -0,0 +1,78 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snarkos_node_network::BlockResponse; +use snarkvm::prelude::{FromBytes, Network, ToBytes}; + +use bytes::{Buf, BufMut, BytesMut}; +use core::marker::PhantomData; +use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +use tracing::*; + +/// The maximum size of a message that can be transmitted in the network. +const MAX_MSG_SIZE: usize = 256 * 1024 * 1024; // 256 MiB + +/// The codec used to decode and encode network messages. +pub struct SyncCodec { + codec: LengthDelimitedCodec, + _phantom: PhantomData, +} + +impl Default for SyncCodec { + fn default() -> Self { + Self { + codec: LengthDelimitedCodec::builder().max_frame_length(MAX_MSG_SIZE).little_endian().new_codec(), + _phantom: Default::default(), + } + } +} + +impl Encoder> for SyncCodec { + type Error = std::io::Error; + + fn encode(&mut self, msg: BlockResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + // Serialize the payload directly into dst. + msg + .write_le(&mut dst.writer()) + // This error should never happen, the conversion is for greater compatibility. + .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidData, "serialization error"))?; + + let serialized_event = dst.split_to(dst.len()).freeze(); + + self.codec.encode(serialized_event, dst) + } +} + +impl Decoder for SyncCodec { + type Error = std::io::Error; + type Item = BlockResponse; + + fn decode(&mut self, source: &mut BytesMut) -> Result, Self::Error> { + // Decode a frame. + let bytes = match self.codec.decode(source)? { + Some(bytes) => bytes, + None => return Ok(None), + }; + + let reader = bytes.reader(); + match BlockResponse::::read_le(reader) { + Ok(resp) => Ok(Some(resp)), + Err(error) => { + error!("Failed to deserialize a BlockResponse: {}", error); + Err(std::io::ErrorKind::InvalidData.into()) + } + } + } +} diff --git a/node/sync/src/helpers/mod.rs b/node/sync/src/helpers/mod.rs index 8125cb13ef..1c1d8e2109 100644 --- a/node/sync/src/helpers/mod.rs +++ b/node/sync/src/helpers/mod.rs @@ -13,6 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod sync_channel; +pub use sync_channel::*; + use snarkvm::prelude::Network; use core::hash::Hash; diff --git a/node/sync/src/helpers/sync_channel.rs b/node/sync/src/helpers/sync_channel.rs new file mode 100644 index 0000000000..ed0e912d1d --- /dev/null +++ b/node/sync/src/helpers/sync_channel.rs @@ -0,0 +1,122 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{InsertBlockResponseError, locators::BlockLocators}; + +use snarkos_node_network::{CertificateRequest, CertificateResponse}; +use snarkvm::prelude::{Block, ConsensusVersion, Network}; + +use anyhow::{Result, anyhow}; +use std::net::SocketAddr; +use tokio::sync::{mpsc, oneshot}; + +const MAX_CHANNEL_SIZE: usize = 8192; + +#[derive(Clone, Debug)] +pub struct SyncSender { + tx_block_sync_insert_block_response: mpsc::Sender<( + SocketAddr, + Vec>, + Option, + oneshot::Sender>>, + )>, + pub tx_block_sync_remove_peer: mpsc::Sender<(SocketAddr, oneshot::Sender<()>)>, + tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators, oneshot::Sender>)>, + pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest)>, + pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse)>, +} + +impl SyncSender { + /// Sends the request to update the peer locators. + pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators) -> Result<()> { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the request to update the peer locators. + // This `tx_block_sync_update_peer_locators.send()` call + // causes the `rx_block_sync_update_peer_locators.recv()` call + // in one of the loops in [`Sync::run()`] to return. + self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?; + // Await the callback to continue. + callback_receiver.await? + } + + /// Sends the request to insert a new block response. + pub async fn insert_block_response( + &self, + peer_ip: SocketAddr, + blocks: Vec>, + latest_consensus_version: Option, + ) -> Result<(), InsertBlockResponseError> { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the request to advance with sync blocks. + // This `tx_block_sync_advance_with_sync_blocks.send()` call + // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call + // in one of the loops in [`Sync::run()`] to return. + if let Err(err) = self + .tx_block_sync_insert_block_response + .send((peer_ip, blocks, latest_consensus_version, callback_sender)) + .await + { + return Err(anyhow!("Failed to send block response - {err}").into()); + } + + // Await the callback to continue. + match callback_receiver.await { + Ok(result) => result, + Err(err) => Err(anyhow!("Failed to wait for block response insertion - {err}").into()), + } + } +} + +#[derive(Debug)] +pub struct SyncReceiver { + pub rx_block_sync_insert_block_response: mpsc::Receiver<( + SocketAddr, + Vec>, + Option, + oneshot::Sender>>, + )>, + pub rx_block_sync_remove_peer: mpsc::Receiver<(SocketAddr, oneshot::Sender<()>)>, + pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators, oneshot::Sender>)>, + pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest)>, + pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse)>, +} + +/// Initializes the sync channels. +pub fn init_sync_channels() -> (SyncSender, SyncReceiver) { + let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE); + + let sender = SyncSender { + tx_block_sync_insert_block_response, + tx_block_sync_remove_peer, + tx_block_sync_update_peer_locators, + tx_certificate_request, + tx_certificate_response, + }; + let receiver = SyncReceiver { + rx_block_sync_insert_block_response, + rx_block_sync_remove_peer, + rx_block_sync_update_peer_locators, + rx_certificate_request, + rx_certificate_response, + }; + + (sender, receiver) +} diff --git a/node/sync/src/lib.rs b/node/sync/src/lib.rs index 228577839a..b061ebc73c 100644 --- a/node/sync/src/lib.rs +++ b/node/sync/src/lib.rs @@ -24,6 +24,9 @@ extern crate snarkos_node_metrics as metrics; pub use snarkos_node_sync_communication_service as communication_service; pub use snarkos_node_sync_locators as locators; +mod codec; +pub(crate) use codec::*; + mod ping; pub use ping::Ping; @@ -32,3 +35,6 @@ pub use block_sync::*; mod helpers; pub use helpers::*; + +mod node; +pub use node::*; diff --git a/node/sync/src/node.rs b/node/sync/src/node.rs new file mode 100644 index 0000000000..3a660bf1a5 --- /dev/null +++ b/node/sync/src/node.rs @@ -0,0 +1,379 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{SyncCodec, SyncSender, communication_service::CommunicationService}; + +use snarkos_node_bft_ledger_service::LedgerService; +use snarkos_node_network::{BlockRequest, BlockResponse, DataBlocks, SyncToken, harden_socket}; +use snarkos_node_tcp::{self as tcp, ConnectError, Connection, ConnectionSide, P2P, Tcp, protocols::*}; +use snarkvm::prelude::Network; + +use async_trait::async_trait; +#[cfg(feature = "locktick")] +use locktick::parking_lot::Mutex; +#[cfg(not(feature = "locktick"))] +use parking_lot::Mutex; +use std::{collections::HashMap, io, marker::PhantomData, net::SocketAddr, sync::Arc, time::Duration}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::{OnceCell, oneshot}, + task, + time::timeout, +}; + +/// The amount of time a sync stream request token is active. +pub const SYNC_STREAM_TOKEN_LIFETIME: Duration = Duration::from_secs(5); + +/// The maximum number of concurrently syncing peers. +pub const MAX_CONCURRENT_STREAMS: u16 = 5; + +/// The maximum number of blocks deliverable in a single stream/session. +pub const MAX_NUM_BLOCKS_PER_REQUEST: u32 = 100; + +/// The maximum number of blocks per a single response message. +pub const MAX_NUM_BLOCKS_PER_RESPONSE: u32 = 5; + +/// The handler for sync streams, both inbound and outbound. +#[derive(Clone)] +pub struct SyncStreams { + /// The engine for sync streams. + tcp: Tcp, + /// The ledger. + ledger: Arc>, + /// Access tokens given to peers to sync with us, and the associated requests. + tokens_for_peers: Arc>>, + /// Access tokens received from peers we can sync with. + tokens_from_peers: Arc>>, + /// Holds the currently active sync streams. + active_streams: Arc>>, + /// The listener address peers access in order to sync. + listener_addr: OnceCell, + /// The conduit to SyncBlocks. + sync_sender: OnceCell>, + _phantom: PhantomData, +} + +impl SyncStreams { + pub fn new(listener_addr: SocketAddr, ledger: Arc>) -> Self { + let tcp_config = tcp::Config::new(listener_addr, MAX_CONCURRENT_STREAMS); + let tcp = Tcp::new(tcp_config); + + Self { + tcp, + ledger, + tokens_for_peers: Default::default(), + tokens_from_peers: Default::default(), + active_streams: Default::default(), + listener_addr: Default::default(), + sync_sender: Default::default(), + _phantom: Default::default(), + } + } + + /// Start the underlying TCP stack. + pub async fn enable(&self) { + self.enable_handshake().await; + self.enable_reading().await; + self.enable_writing().await; + self.enable_on_connect().await; + self.enable_disconnect().await; + let listener_addr = self.tcp.enable_listener().await.expect("Failed to enable the TCP listener"); + + debug!("[SyncStreams] Listening for sync requests at {listener_addr}"); + + self.listener_addr.set(listener_addr).expect("Attempted to enable SyncStreams more than once"); + } + + /// Attaches the channel for communication with BlockSync. + pub fn set_sync_sender(&self, sync_sender: SyncSender) { + self.sync_sender.set(sync_sender).expect("Sync sender already set in SyncStreams"); + } + + /// Returns the listener address that creates the sync streams for peers. + pub fn listener_addr(&self) -> SocketAddr { + *self.listener_addr.get().expect("SyncStreams::enable hadn't been called") // guaranteed present for all calls + } + + /// Activate an access token for sync requests from peers. + pub fn register_token_for_peer(&self, token: SyncToken, request: BlockRequest) { + self.tokens_for_peers.lock().insert(token, request); + } + + /// Deactivate an access token given to a peer. + pub fn remove_token_for_peer(&self, token: SyncToken) -> Option { + self.tokens_for_peers.lock().remove(&token) + } + + /// Save a token from a peer we requested a sync from. + pub fn register_token_from_peer(&self, addr: SocketAddr, token: SyncToken) { + self.tokens_from_peers.lock().insert(addr, token); + } + + /// Delete an access token received from a peer. + pub fn remove_token_from_peer(&self, addr: SocketAddr) -> Option { + self.tokens_from_peers.lock().remove(&addr) + } + + /// Used post-handshake to associate a peer address with their sync request. + pub fn register_active_stream(&self, addr: SocketAddr, request: BlockRequest) { + self.active_streams.lock().insert(addr, request); + } + + /// Get the block request associated with the given peer. + pub fn get_peer_block_request(&self, addr: SocketAddr) -> Option { + self.active_streams.lock().get(&addr).copied() + } + + /// Perform a clean teardown. + pub async fn shut_down(&self) { + debug!("[SyncStreams] Shutting down"); + self.active_streams.lock().clear(); + self.tokens_for_peers.lock().clear(); + self.tokens_from_peers.lock().clear(); + self.tcp.shut_down().await; + } +} + +impl P2P for SyncStreams { + fn tcp(&self) -> &Tcp { + &self.tcp + } +} + +#[async_trait] +impl Handshake for SyncStreams { + /// A simple, "one-sided" protocol where the initiator is expected to provide an access token, + /// and the responder to validate it against its list of active access tokens. + async fn perform_handshake(&self, mut connection: Connection) -> Result { + let peer_addr = connection.addr(); + let peer_side = connection.side(); + let stream = self.borrow_stream(&mut connection); + + // Make the socket more robust. + harden_socket(stream)?; + + if peer_side == ConnectionSide::Initiator { + debug!("[SyncStreams] Shaking hands with {peer_addr} as the responder"); + // We've received a sync request from a peer; expect an access token. + let mut token = [0u8; 32]; + let token = match timeout(Duration::from_secs(5), stream.read_exact(&mut token)).await { + Ok(Ok(32)) => Ok(token), + Ok(Ok(_)) | Ok(Err(_)) => Err(ConnectError::IoError(io::ErrorKind::InvalidData.into())), + Err(_) => Err(ConnectError::IoError(io::ErrorKind::TimedOut.into())), + }?; + + // Check if the access token is active. + let Some(request) = self.remove_token_for_peer(token.into()) else { + return Err(ConnectError::IoError(io::ErrorKind::InvalidData.into())); + }; + + // Mark the stream as active and assign the associated request to it. + self.register_active_stream(peer_addr, request); + + // All good, handshake complete. + } else { + debug!("[SyncStreams] Shaking hands with {peer_addr} as the initiator"); + // We're the ones who requested the sync; find the access token the peer provided us with. + let Some(token) = self.remove_token_from_peer(peer_addr) else { + return Err(ConnectError::IoError(io::ErrorKind::NotFound.into())); + }; + + // Send the access token. + stream.write_all(&token).await?; + + // Done, the rest is up to the peer. + } + + debug!("[SyncStreams] Successfully shaken hands with {peer_addr}"); + Ok(connection) + } +} + +#[async_trait] +impl Reading for SyncStreams { + type Codec = SyncCodec; + type Message = BlockResponse; + + fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { + Default::default() + } + + async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { + debug!("[SyncStreams] Got a block response from {peer_addr}"); + let BlockResponse { request, latest_consensus_version, blocks, .. } = message; + + let Some(sync_sender) = self.sync_sender.get() else { + return Err(io::ErrorKind::BrokenPipe.into()); + }; + + // Perform the deferred non-blocking deserialization of the blocks. + // The deserialization can take a long time (minutes). We should not be running + // this on a blocking task, but on a rayon thread pool. + let (send, recv) = oneshot::channel(); + rayon::spawn_fifo(move || { + let blocks = blocks.deserialize_blocking(); + let _ = send.send(blocks); + }); + let blocks = match recv.await { + Ok(Ok(blocks)) => blocks, + Ok(Err(error)) => { + warn!("[SyncStreams] Peer '{peer_addr}' sent an invalid block response - {error}"); + return Err(io::ErrorKind::InvalidData.into()); + } + Err(error) => { + warn!("[SyncStreams] Peer '{peer_addr}' sent an invalid block response - {error}"); + return Err(io::ErrorKind::InvalidData.into()); + } + }; + + // Ensure the block response is well-formed. + if let Err(err) = blocks.ensure_response_is_well_formed(peer_addr, request.start_height, request.end_height) { + warn!("[SyncStreams] {err}"); + return Err(io::ErrorKind::InvalidData.into()); + } + // Send the blocks to the sync module. + match sync_sender.insert_block_response(peer_addr, blocks.0, latest_consensus_version).await { + Ok(_) => Ok(()), + Err(err) if err.is_benign() => { + debug!("[SyncStreams] Ignoring block response from peer '{peer_addr}'"); + Ok(()) + } + Err(err) if err.is_invalid_consensus_version() => { + error!("[SyncStreams] Peer sent an invalid block response '{peer_addr}': {err}"); + Err(io::ErrorKind::InvalidData.into()) + } + Err(err) => { + warn!("[SyncStreams] Peer '{peer_addr}' sent an invalid block response: {err}"); + + // TODO: disconnect instead? + + Ok(()) + } + } + } + + fn message_queue_depth(&self) -> usize { + 100 + } +} + +#[async_trait] +impl Writing for SyncStreams { + type Codec = SyncCodec; + type Message = BlockResponse; + + fn codec(&self, _peer_addr: SocketAddr, _side: ConnectionSide) -> Self::Codec { + Default::default() + } + + fn message_queue_depth(&self) -> usize { + 100 + } +} + +#[async_trait] +impl Disconnect for SyncStreams { + async fn handle_disconnect(&self, peer_addr: SocketAddr) { + self.active_streams.lock().remove(&peer_addr); + } +} + +#[async_trait] +impl OnConnect for SyncStreams { + async fn on_connect(&self, peer_addr: SocketAddr) { + // Check if we're the ones who provide the sync. + let Some(request) = self.get_peer_block_request(peer_addr) else { + // If not, exit early. + return; + }; + + let BlockRequest { start_height, end_height } = request; + + if start_height >= end_height { + warn!("Block request from '{peer_addr}' has an invalid range ({start_height}..{end_height})"); + return; + } + if end_height - start_height > MAX_NUM_BLOCKS_PER_REQUEST { + warn!("Block request from '{peer_addr}' has an excessive range ({start_height}..{end_height})"); + return; + } + + let mut start = start_height; + + while start < end_height { + let end = end_height.min(start + MAX_NUM_BLOCKS_PER_RESPONSE); + let self_ = self.clone(); + let blocks = match task::spawn_blocking(move || match self_.ledger.get_blocks(start..end) { + Ok(blocks) => Ok::, io::Error>(DataBlocks(blocks)), + Err(error) => { + warn!("Missing blocks {start} to {end} from ledger - {error}"); + Err(io::ErrorKind::NotFound.into()) + } + }) + .await + { + Ok(Ok(blocks)) => blocks, + Ok(Err(error)) => { + warn!("[SyncStreams] Error: {error}"); + return; + } + Err(error) => { + warn!("[SyncStreams] Error: {error}"); + return; + } + }; + + debug!("[SyncStreams] Sending block response ({start}..{end}) to {peer_addr}"); + let Ok(latest_consensus_version) = N::CONSENSUS_VERSION(end - 1) else { + return; + }; + let response = BlockResponse::new(request, blocks, latest_consensus_version); + match self.unicast(peer_addr, response) { + Ok(rx) => { + if let Err(error) = rx.await { + warn!("[SyncStreams] Error: {error}"); + return; + } + } + Err(error) => { + warn!("[SyncStreams] Error: {error}"); + return; + } + } + + start += MAX_NUM_BLOCKS_PER_RESPONSE; + } + } +} + +#[async_trait] +impl CommunicationService for SyncStreams { + type Message = BlockResponse; + + fn prepare_block_request(_start_height: u32, _end_height: u32) -> Self::Message { + unimplemented!("Unused by SyncStreams"); + } + + async fn send(&self, peer_addr: SocketAddr, message: Self::Message) -> Option>> { + let result = self.unicast(peer_addr, message); + + if let Err(err) = &result { + warn!("[SyncStreams] Failed to send 'BlockResponse' to '{peer_addr}': {err:?}"); + debug!("[SyncStreams] Disconnecting from '{peer_addr}' (unable to send)"); + self.tcp.disconnect(peer_addr).await; + } + result.ok() + } +}