diff --git a/api/src/handlers/server_api.rs b/api/src/handlers/server_api.rs index a2746a4f6a..446c51d93d 100644 --- a/api/src/handlers/server_api.rs +++ b/api/src/handlers/server_api.rs @@ -13,7 +13,7 @@ // limitations under the License. use super::utils::w; -use crate::chain::{Chain, SyncState, SyncStatus}; +use crate::chain::{Chain, HeaderSyncMode, SyncState, SyncStatus}; use crate::p2p; use crate::rest::*; use crate::router::{Handler, ResponseFuture}; @@ -81,11 +81,19 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option ("awaiting_peers".to_string(), None), SyncStatus::HeaderSync { sync_head, + sync_mode, highest_height, .. } => ( "header_sync".to_string(), - Some(json!({ "current_height": sync_head.height, "highest_height": highest_height })), + Some(json!({ + "current_height": sync_head.height, + "highest_height": highest_height, + "header_sync_type": match sync_mode { + HeaderSyncMode::Legacy => "legacy", + HeaderSyncMode::Pihd => "pihd", + } + })), ), SyncStatus::TxHashsetPibd { aborted, @@ -149,7 +157,5 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option ("shutdown".to_string(), None), - // any other status is considered syncing (should be unreachable) - _ => ("syncing".to_string(), None), } } diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e566d1bc0b..529d626e42 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -1731,9 +1731,17 @@ fn setup_head( let head = batch.get_block_header(&head.last_block_h)?; let pibd_tip = store.pibd_head()?; let pibd_head = batch.get_block_header(&pibd_tip.last_block_h)?; + let pibd_mmr_in_progress = !resetting_pibd + && pibd_head.height >= head.height + && (txhashset.output_mmr_size() > head.output_mmr_size + || txhashset.rangeproof_mmr_size() > head.output_mmr_size + || txhashset.kernel_mmr_size() > head.kernel_mmr_size); if pibd_head.height > head.height && !resetting_pibd { pibd_in_progress = true; pibd_head + } else if pibd_mmr_in_progress { + pibd_in_progress = true; + head } else { head } diff --git a/chain/src/lib.rs b/chain/src/lib.rs index 1ece28456e..ceb82b010b 100644 --- a/chain/src/lib.rs +++ b/chain/src/lib.rs @@ -50,6 +50,6 @@ pub use crate::chain::{Chain, MAX_ORPHAN_SIZE}; pub use crate::error::Error; pub use crate::store::ChainStore; pub use crate::types::{ - BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetDownloadStats, - TxHashsetWriteStatus, + BlockStatus, ChainAdapter, HeaderSyncMode, Options, SyncState, SyncStatus, Tip, + TxHashsetDownloadStats, TxHashsetWriteStatus, }; diff --git a/chain/src/pibd_params.rs b/chain/src/pibd_params.rs index fe8dbc7a20..511ce6bfb7 100644 --- a/chain/src/pibd_params.rs +++ b/chain/src/pibd_params.rs @@ -29,19 +29,28 @@ pub const RANGEPROOF_SEGMENT_HEIGHT: u8 = 11; pub const KERNEL_SEGMENT_HEIGHT: u8 = 11; /// Maximum number of received segments to cache (across all trees) before we stop requesting others -pub const MAX_CACHED_SEGMENTS: usize = 15; +pub const MAX_CACHED_SEGMENTS: usize = 30; /// Number of segments to apply in a single LMDB transaction -pub const SEGMENT_APPLY_BATCH_SIZE: usize = 4; +pub const SEGMENT_APPLY_BATCH_SIZE: usize = 12; /// How long the state sync should wait after requesting a segment from a peer before /// deciding the segment isn't going to arrive. The syncer will then re-request the segment -pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20; +pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60; + +/// How long to wait before retrying a pending segment that may be blocking progress +pub const BLOCKING_SEGMENT_RETRY_SECS: i64 = 10; + +/// Maximum number of pending blocking segments to retry in one state sync loop +pub const BLOCKING_SEGMENT_RETRY_COUNT: usize = 2; /// Number of simultaneous requests for segments we should make. Note this is currently /// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments /// will always be requested first) -pub const SEGMENT_REQUEST_COUNT: usize = 15; +pub const SEGMENT_REQUEST_COUNT: usize = 9; + +/// How many blocks behind the tip a PIBD peer may be and still be considered usable. +pub const SYNC_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2; /// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds /// give up and revert back to the txhashset.zip download method diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index ca391397a8..f11de801a3 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -140,6 +140,14 @@ impl Desegmenter { self.bitmap_mmr_size } + /// Lightweight applied leaf count for progress display. + pub fn applied_leaf_count(&self) -> u64 { + let txhashset = self.txhashset.read(); + pmmr::n_leaves(txhashset.output_mmr_size()) + + pmmr::n_leaves(txhashset.rangeproof_mmr_size()) + + pmmr::n_leaves(txhashset.kernel_mmr_size()) + } + /// Whether we have all the segments we need pub fn is_complete(&self) -> bool { self.all_segments_complete @@ -593,22 +601,54 @@ impl Desegmenter { } } } - // Always ensure we explicitly ask for the very next kernel segment we are waiting on. - // The regular round-robin above can get saturated with outputs and rangeproofs while - // the desegmenter is blocked on a missing kernel, so we force this one in. - if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() { - let seg_id = SegmentIdentifier { - height: self.default_kernel_segment_height, - idx: next_kernel_idx, - }; - let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id); - if !self.has_kernel_segment_with_id(seg_id) - && !return_vec.iter().any(|x| x == &next_kernel_seg_id) - { - if return_vec.len() >= max_elements { - return_vec.pop(); + if self.bitmap_cache.is_some() { + // Always ensure we explicitly ask for the very next segment we are waiting on. + // The regular round-robin above can get saturated while the desegmenter is + // blocked on the next required segment, so we force these in. + if let Some(next_output_idx) = self.next_required_output_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_output_segment_height, + idx: next_output_idx, + }; + let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id); + if !self.has_output_segment_with_id(seg_id) + && !return_vec.iter().any(|x| x == &next_output_seg_id) + { + if return_vec.len() >= max_elements { + return_vec.pop(); + } + return_vec.push(next_output_seg_id); + } + } + if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_rangeproof_segment_height, + idx: next_rp_idx, + }; + let next_rp_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id); + if !self.has_rangeproof_segment_with_id(seg_id) + && !return_vec.iter().any(|x| x == &next_rp_seg_id) + { + if return_vec.len() >= max_elements { + return_vec.pop(); + } + return_vec.push(next_rp_seg_id); + } + } + if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() { + let seg_id = SegmentIdentifier { + height: self.default_kernel_segment_height, + idx: next_kernel_idx, + }; + let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id); + if !self.has_kernel_segment_with_id(seg_id) + && !return_vec.iter().any(|x| x == &next_kernel_seg_id) + { + if return_vec.len() >= max_elements { + return_vec.pop(); + } + return_vec.push(next_kernel_seg_id); } - return_vec.push(next_kernel_seg_id); } } if return_vec.is_empty() && self.bitmap_cache.is_some() { @@ -838,17 +878,20 @@ impl Desegmenter { ) }; - // When resuming, we need to ensure we're getting the previous segment if needed - let theoretical_pmmr_size = - SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height); - if local_output_mmr_size < theoretical_pmmr_size { - cur_segment_count -= 1; - } - let total_segment_count = SegmentIdentifier::count_segments_required( self.archive_header.output_mmr_size, self.default_output_segment_height, ); + + // When resuming, we need to ensure we're getting the previous segment if needed. + // Do not apply this to the final partial segment once the target size is reached. + if total_segment_count != cur_segment_count { + let theoretical_pmmr_size = + SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height); + if local_output_mmr_size < theoretical_pmmr_size { + cur_segment_count -= 1; + } + } trace!( "Next required output segment is {} of {}", cur_segment_count, @@ -957,17 +1000,22 @@ impl Desegmenter { ) }; - // When resuming, we need to ensure we're getting the previous segment if needed - let theoretical_pmmr_size = - SegmentIdentifier::pmmr_size(cur_segment_count, self.default_rangeproof_segment_height); - if local_rangeproof_mmr_size < theoretical_pmmr_size { - cur_segment_count -= 1; - } - let total_segment_count = SegmentIdentifier::count_segments_required( self.archive_header.output_mmr_size, self.default_rangeproof_segment_height, ); + + // When resuming, we need to ensure we're getting the previous segment if needed. + // Do not apply this to the final partial segment once the target size is reached. + if total_segment_count != cur_segment_count { + let theoretical_pmmr_size = SegmentIdentifier::pmmr_size( + cur_segment_count, + self.default_rangeproof_segment_height, + ); + if local_rangeproof_mmr_size < theoretical_pmmr_size { + cur_segment_count -= 1; + } + } trace!( "Next required rangeproof segment is {} of {}", cur_segment_count, diff --git a/chain/src/types.rs b/chain/src/types.rs index 370f4a954c..72a7688723 100644 --- a/chain/src/types.rs +++ b/chain/src/types.rs @@ -19,7 +19,9 @@ use chrono::Duration; use std::net::SocketAddr; use crate::core::core::hash::{Hash, Hashed, ZERO_HASH}; -use crate::core::core::{pmmr, Block, BlockHeader, HeaderVersion, SegmentTypeIdentifier}; +use crate::core::core::{ + pmmr, Block, BlockHeader, HeaderVersion, SegmentIdentifier, SegmentTypeIdentifier, +}; use crate::core::pow::Difficulty; use crate::core::ser::{self, PMMRIndexHashable, Readable, Reader, Writeable, Writer}; use crate::error::Error; @@ -39,6 +41,15 @@ bitflags! { } } +/// Header sync implementation currently being used. +#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)] +pub enum HeaderSyncMode { + /// Legacy locator-based header sync. + Legacy, + /// Parallel Initial Header Download. + Pihd, +} + /// Various status sync can be in, whether it's fast sync or archival. #[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)] pub enum SyncStatus { @@ -53,6 +64,8 @@ pub enum SyncStatus { HeaderSync { /// current sync head sync_head: Tip, + /// active header sync implementation + sync_mode: HeaderSyncMode, /// height of the most advanced peer highest_height: u64, /// diff of the most advanced peer @@ -171,6 +184,44 @@ impl PIBDSegmentContainer { } } +/// Recently rejected PIBD segment from a specific peer. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RejectedPIBDSegment { + /// Segment+Type Identifier + pub identifier: SegmentTypeIdentifier, + /// Peer that provided invalid data for this segment + pub peer_addr: SocketAddr, + /// Time at which this segment was rejected + pub reject_time: DateTime, +} + +const MAX_REJECTED_PIBD_SEGMENTS: usize = 1024; + +/// Container for pending PIHD header segment requests. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct PIHDHeaderSegmentContainer { + /// Header segment identifier + pub identifier: SegmentIdentifier, + /// Time at which this request was made + pub request_time: DateTime, + /// Peer that received this request + pub peer_addr: SocketAddr, + /// Highest header height advertised by the peer when requested + pub target_height: u64, +} + +impl PIHDHeaderSegmentContainer { + /// Return container with timestamp + pub fn new(identifier: SegmentIdentifier, peer_addr: SocketAddr, target_height: u64) -> Self { + Self { + identifier, + request_time: Utc::now(), + peer_addr, + target_height, + } + } +} + /// Current sync state. Encapsulates the current SyncStatus. pub struct SyncState { current: RwLock, @@ -182,6 +233,8 @@ pub struct SyncState { /// available where it will be needed (both in the adapter /// and the sync loop) requested_pibd_segments: RwLock>, + rejected_pibd_segments: RwLock>, + requested_pihd_header_segments: RwLock>, } impl SyncState { @@ -191,6 +244,8 @@ impl SyncState { current: RwLock::new(SyncStatus::Initial), sync_error: RwLock::new(None), requested_pibd_segments: RwLock::new(vec![]), + rejected_pibd_segments: RwLock::new(vec![]), + requested_pihd_header_segments: RwLock::new(vec![]), } } @@ -281,12 +336,56 @@ impl SyncState { }; } + /// Update lightweight PIBD leaf progress for TUI/API display. + pub fn update_pibd_leaf_progress(&self, completed_leaves: u64, archive_header: &BlockHeader) { + let leaves_required = pmmr::n_leaves(archive_header.output_mmr_size) * 2 + + pmmr::n_leaves(archive_header.kernel_mmr_size); + let status: &mut SyncStatus = &mut self.current.write(); + match status { + SyncStatus::TxHashsetPibd { + completed_leaves: current_completed_leaves, + leaves_required: current_leaves_required, + required_height, + .. + } => { + *current_completed_leaves = completed_leaves; + *current_leaves_required = leaves_required; + *required_height = archive_header.height; + } + _ => { + *status = SyncStatus::TxHashsetPibd { + aborted: false, + errored: false, + completed_leaves, + leaves_required, + completed_to_height: 0, + required_height: archive_header.height, + }; + } + } + } + /// Update PIBD segment list pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) { debug!("sync_state: tracking PIBD request for {:?}", id); - self.requested_pibd_segments - .write() - .push(PIBDSegmentContainer::new(id.clone(), Some(peer_addr))); + let mut requested_segments = self.requested_pibd_segments.write(); + if let Some(existing) = requested_segments.iter_mut().find(|i| &i.identifier == id) { + existing.request_time = Utc::now(); + existing.last_peer = Some(peer_addr); + } else { + requested_segments.push(PIBDSegmentContainer::new(id.clone(), Some(peer_addr))); + } + } + + /// Add or refresh PIBD segment request tracking + pub fn refresh_pibd_segment(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) { + let mut requested_segments = self.requested_pibd_segments.write(); + if let Some(existing) = requested_segments.iter_mut().find(|i| &i.identifier == id) { + existing.request_time = Utc::now(); + existing.last_peer = Some(peer_addr); + } else { + requested_segments.push(PIBDSegmentContainer::new(id.clone(), Some(peer_addr))); + } } /// Remove segment from list @@ -297,6 +396,18 @@ impl SyncState { .retain(|i| &i.identifier != id); } + /// Remove segment from list only if it is still pending for the given peer. + pub fn remove_pibd_segment_from(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) { + trace!( + "sync_state: removing PIBD request tracking for {:?} from {}", + id, + peer_addr + ); + self.requested_pibd_segments + .write() + .retain(|i| &i.identifier != id || i.last_peer != Some(peer_addr)); + } + /// Remove segments with request timestamps less than cutoff time pub fn remove_stale_pibd_requests( &self, @@ -324,6 +435,134 @@ impl SyncState { .any(|i| &i.identifier == id) } + /// Check whether segment is in request list for the given peer. + pub fn contains_pibd_segment_from( + &self, + id: &SegmentTypeIdentifier, + peer_addr: SocketAddr, + ) -> bool { + self.requested_pibd_segments + .read() + .iter() + .any(|i| &i.identifier == id && i.last_peer == Some(peer_addr)) + } + + /// Mark a requested PIBD segment as rejected for this peer. + pub fn reject_pibd_segment_from(&self, id: &SegmentTypeIdentifier, peer_addr: SocketAddr) { + self.remove_pibd_segment_from(id, peer_addr); + let mut rejected = self.rejected_pibd_segments.write(); + rejected.retain(|i| &i.identifier != id || i.peer_addr != peer_addr); + rejected.push(RejectedPIBDSegment { + identifier: id.clone(), + peer_addr, + reject_time: Utc::now(), + }); + if rejected.len() > MAX_REJECTED_PIBD_SEGMENTS { + rejected.remove(0); + } + } + + /// Check whether this peer recently provided invalid data for this PIBD segment. + pub fn rejected_pibd_segment_from( + &self, + id: &SegmentTypeIdentifier, + peer_addr: SocketAddr, + reject_seconds: i64, + ) -> bool { + let cutoff_time = Utc::now() - Duration::seconds(reject_seconds); + let mut rejected = self.rejected_pibd_segments.write(); + rejected.retain(|i| i.reject_time > cutoff_time); + rejected + .iter() + .any(|i| &i.identifier == id && i.peer_addr == peer_addr) + } + + /// Number of currently pending PIBD segment requests + pub fn pending_pibd_segment_count(&self) -> usize { + self.requested_pibd_segments.read().len() + } + + /// Pending PIBD segment requests older than the provided retry delay + pub fn retryable_pibd_segments( + &self, + retry_seconds: i64, + limit: usize, + ) -> Vec<(SegmentTypeIdentifier, Option)> { + let cutoff_time = Utc::now() - Duration::seconds(retry_seconds); + self.requested_pibd_segments + .read() + .iter() + .filter(|i| i.request_time <= cutoff_time) + .take(limit) + .map(|i| (i.identifier.clone(), i.last_peer)) + .collect() + } + + /// Track a pending PIHD header segment request. + pub fn add_pihd_header_segment( + &self, + id: SegmentIdentifier, + peer_addr: SocketAddr, + target_height: u64, + ) { + let mut requested_segments = self.requested_pihd_header_segments.write(); + if let Some(existing) = requested_segments + .iter_mut() + .find(|i| i.identifier == id && i.peer_addr == peer_addr) + { + existing.request_time = Utc::now(); + existing.target_height = target_height; + } else { + requested_segments.push(PIHDHeaderSegmentContainer::new( + id, + peer_addr, + target_height, + )); + } + } + + /// Remove a pending PIHD header segment request. + pub fn remove_pihd_header_segment(&self, id: SegmentIdentifier, peer_addr: SocketAddr) { + self.requested_pihd_header_segments + .write() + .retain(|i| i.identifier != id || i.peer_addr != peer_addr); + } + + /// Check whether a PIHD header segment was requested from the given peer. + pub fn contains_pihd_header_segment_from( + &self, + id: SegmentIdentifier, + peer_addr: SocketAddr, + ) -> bool { + self.requested_pihd_header_segments + .read() + .iter() + .any(|i| i.identifier == id && i.peer_addr == peer_addr) + } + + /// Highest header height expected for a pending PIHD header segment request. + pub fn pihd_header_segment_target_height( + &self, + id: SegmentIdentifier, + peer_addr: SocketAddr, + ) -> Option { + self.requested_pihd_header_segments + .read() + .iter() + .find(|i| i.identifier == id && i.peer_addr == peer_addr) + .map(|i| i.target_height) + } + + /// Remove completed or stale PIHD header segment requests. + pub fn retain_pihd_header_segments(&self, mut keep: F) + where + F: FnMut(&PIHDHeaderSegmentContainer) -> bool, + { + self.requested_pihd_header_segments + .write() + .retain(|i| keep(i)); + } + /// Communicate sync error pub fn set_sync_error(&self, error: Error) { *self.sync_error.write() = Some(error); @@ -661,3 +900,32 @@ impl BlockStatus { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::core::{SegmentIdentifier, SegmentType}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + #[test] + fn rejected_pibd_segment_tracking_is_bounded() { + let sync_state = SyncState::new(); + + for idx in 0..(MAX_REJECTED_PIBD_SEGMENTS + 10) { + let id = SegmentTypeIdentifier::new( + SegmentType::Kernel, + SegmentIdentifier { + height: 9, + idx: idx as u64, + }, + ); + let peer_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10_000 + idx as u16); + sync_state.reject_pibd_segment_from(&id, peer_addr); + } + + assert_eq!( + sync_state.rejected_pibd_segments.read().len(), + MAX_REJECTED_PIBD_SEGMENTS + ); + } +} diff --git a/p2p/src/codec.rs b/p2p/src/codec.rs index 53278d777e..7891e6f852 100644 --- a/p2p/src/codec.rs +++ b/p2p/src/codec.rs @@ -273,6 +273,8 @@ fn decode_message( Type::RangeProofSegment => Message::RangeProofSegment(msg.body()?), Type::GetKernelSegment => Message::GetKernelSegment(msg.body()?), Type::KernelSegment => Message::KernelSegment(msg.body()?), + Type::GetHeaderSegment => Message::GetHeaderSegment(msg.body()?), + Type::HeaderSegment => Message::HeaderSegment(msg.body()?), Type::Error | Type::Hand | Type::Shake | Type::Headers => { return Err(Error::UnexpectedMessage) } diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index a71e861be6..e0d8bc10cf 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -55,4 +55,5 @@ pub use crate::store::{PeerData, State}; pub use crate::types::{ Capabilities, ChainAdapter, Direction, Error, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, Seeding, TxHashSetRead, MAX_BLOCK_HEADERS, MAX_LOCATORS, MAX_PEER_ADDRS, + PIHD_HEADER_SEGMENT_HEIGHT, }; diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 69c448e927..d9e8e88619 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -94,6 +94,8 @@ enum_from_primitive! { RangeProofSegment = 26, GetKernelSegment = 27, KernelSegment = 28, + GetHeaderSegment = 29, + HeaderSegment = 30, } } @@ -139,6 +141,8 @@ fn max_msg_size(msg_type: Type) -> u64 { Type::RangeProofSegment => 2 * max_block_size(), Type::GetKernelSegment => 41, Type::KernelSegment => 2 * max_block_size(), + Type::GetHeaderSegment => 9, + Type::HeaderSegment => 11 + 365 * MAX_BLOCK_HEADERS as u64, } } @@ -649,6 +653,42 @@ impl Writeable for Headers { } } +/// Serializable wrapper for a deterministic header segment. +pub struct HeaderSegment { + pub identifier: SegmentIdentifier, + pub headers: Vec, +} + +impl Readable for HeaderSegment { + fn read(reader: &mut R) -> Result { + let identifier = SegmentIdentifier::read(reader)?; + let len = reader.read_u16()?; + if len > (MAX_BLOCK_HEADERS as u16) { + return Err(ser::Error::TooLargeReadErr); + } + let mut headers = Vec::with_capacity(len as usize); + for _ in 0..len { + let header: UntrustedBlockHeader = Readable::read(reader)?; + headers.push(header.into()); + } + Ok(HeaderSegment { + identifier, + headers, + }) + } +} + +impl Writeable for HeaderSegment { + fn write(&self, writer: &mut W) -> Result<(), ser::Error> { + self.identifier.write(writer)?; + writer.write_u16(self.headers.len() as u16)?; + for h in &self.headers { + h.write(writer)?; + } + Ok(()) + } +} + pub struct Ping { /// total difficulty accumulated by the sender, used to check whether sync /// may be needed @@ -926,6 +966,8 @@ pub enum Message { RangeProofSegment(SegmentResponse), GetKernelSegment(SegmentRequest), KernelSegment(SegmentResponse), + GetHeaderSegment(SegmentIdentifier), + HeaderSegment(HeaderSegment), } /// We receive 512 headers from a peer. @@ -970,6 +1012,8 @@ impl fmt::Display for Message { Message::RangeProofSegment(_) => write!(f, "range proof segment"), Message::GetKernelSegment(_) => write!(f, "get kernel segment"), Message::KernelSegment(_) => write!(f, "kernel segment"), + Message::GetHeaderSegment(_) => write!(f, "get header segment"), + Message::HeaderSegment(_) => write!(f, "header segment"), } } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index a0433d25ec..7ce8df9b61 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -36,8 +36,8 @@ use crate::msg::{ }; use crate::protocol::Protocol; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::{DateTime, Utc}; @@ -329,6 +329,11 @@ impl Peer { self.send(&Locator { hashes: locator }, msg::Type::GetHeaders) } + /// Sends a request for a deterministic header segment. + pub fn send_header_segment_request(&self, identifier: SegmentIdentifier) -> Result<(), Error> { + self.send(&identifier, msg::Type::GetHeaderSegment) + } + pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> { debug!( "Requesting tx (kernel hash) {} from peer {}.", @@ -570,6 +575,14 @@ impl ChainAdapter for TrackingAdapter { self.adapter.locate_headers(locator) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -650,9 +663,10 @@ impl ChainAdapter for TrackingAdapter { block_hash: Hash, output_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { self.adapter - .receive_bitmap_segment(block_hash, output_root, segment) + .receive_bitmap_segment(block_hash, output_root, segment, peer_info) } fn receive_output_segment( @@ -660,25 +674,39 @@ impl ChainAdapter for TrackingAdapter { block_hash: Hash, bitmap_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { self.adapter - .receive_output_segment(block_hash, bitmap_root, segment) + .receive_output_segment(block_hash, bitmap_root, segment, peer_info) } fn receive_rangeproof_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter.receive_rangeproof_segment(block_hash, segment) + self.adapter + .receive_rangeproof_segment(block_hash, segment, peer_info) } fn receive_kernel_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter.receive_kernel_segment(block_hash, segment) + self.adapter + .receive_kernel_segment(block_hash, segment, peer_info) + } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + self.adapter.receive_header_segment(id, headers, peer_info) } } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 0f10a84eca..7cb0ca919c 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -32,8 +32,8 @@ use crate::msg::PeerAddrs; use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, MAX_PEER_ADDRS, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; use crate::util::secp::pedersen::RangeProof; use chrono::prelude::*; @@ -45,6 +45,7 @@ pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, + blocked: RwLock, u32)>>, config: P2PConfig, } @@ -55,6 +56,7 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), + blocked: RwLock::new(HashMap::new()), } } @@ -141,6 +143,19 @@ impl Peers { self.iter().connected().by_addr(addr) } + /// Disconnect a connected peer without banning it. + pub fn disconnect_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> { + let mut peers = self.peers.try_write_for(LOCK_TIMEOUT).ok_or_else(|| { + error!("disconnect_peer: failed to get peers lock"); + Error::PeerException + })?; + if let Some(peer) = peers.remove(&peer_addr) { + warn!("disconnecting peer {} ({})", peer_addr, reason); + peer.stop(); + } + Ok(()) + } + pub fn is_banned(&self, peer_addr: PeerAddr) -> bool { if let Ok(peer) = self.store.get_peer(peer_addr) { return peer.flags == State::Banned; @@ -440,6 +455,56 @@ impl Peers { >= self.config.peer_min_preferred_outbound_count() as usize } + /// Whether this peer has been temporarily blocked. + pub fn is_blocked(&self, peer_addr: PeerAddr) -> bool { + match self.blocked.try_read_for(LOCK_TIMEOUT) { + Some(blocked) => match blocked.get(&peer_addr) { + None => false, + Some((expiry, _)) => expiry > &Utc::now(), + }, + None => { + error!("is_blocked: failed to get blocked lock"); + false + } + } + } + + /// Temporarily block a peer without banning it. + pub fn block_peer(&self, peer_addr: PeerAddr, reason: &str) -> Result<(), Error> { + let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| { + error!("block_peer: failed to get blocked lock"); + Error::PeerException + })?; + + let times = match blocked.get(&peer_addr) { + Some((_, times)) => times + 1, + None => 1, + }; + let duration = match times { + 1 => 60, + 2 => 180, + _ => 600, + }; + let expiry = Utc::now() + Duration::seconds(duration); + blocked.insert(peer_addr, (expiry, times)); + + warn!( + "state_sync: block peer {} ({}) for {} seconds after {} timeout(s)", + peer_addr, reason, duration, times + ); + Ok(()) + } + + /// Clear all temporarily blocked peers. + pub fn unblock_peers(&self) -> Result<(), Error> { + let mut blocked = self.blocked.try_write_for(LOCK_TIMEOUT).ok_or_else(|| { + error!("unblock_peers: failed to get blocked lock"); + Error::PeerException + })?; + blocked.clear(); + Ok(()) + } + /// Removes those peers that seem to have expired pub fn remove_expired(&self) { let now = Utc::now(); @@ -574,6 +639,14 @@ impl ChainAdapter for Peers { self.adapter.locate_headers(hs) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + self.adapter.locate_header_segment(id, peer_info) + } + fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option { self.adapter.get_block(h, peer_info) } @@ -664,9 +737,18 @@ impl ChainAdapter for Peers { block_hash: Hash, output_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter - .receive_bitmap_segment(block_hash, output_root, segment) + if !self + .adapter + .receive_bitmap_segment(block_hash, output_root, segment, peer_info)? + { + self.block_peer(peer_info.addr, "unexpected bitmap PIBD segment") + .map_err(|e| chain::Error::Other(format!("block peer error: {:?}", e)))?; + Ok(false) + } else { + Ok(true) + } } fn receive_output_segment( @@ -674,25 +756,73 @@ impl ChainAdapter for Peers { block_hash: Hash, bitmap_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter - .receive_output_segment(block_hash, bitmap_root, segment) + if !self + .adapter + .receive_output_segment(block_hash, bitmap_root, segment, peer_info)? + { + self.block_peer(peer_info.addr, "unexpected output PIBD segment") + .map_err(|e| chain::Error::Other(format!("block peer error: {:?}", e)))?; + Ok(false) + } else { + Ok(true) + } } fn receive_rangeproof_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter.receive_rangeproof_segment(block_hash, segment) + if !self + .adapter + .receive_rangeproof_segment(block_hash, segment, peer_info)? + { + self.block_peer(peer_info.addr, "unexpected rangeproof PIBD segment") + .map_err(|e| chain::Error::Other(format!("block peer error: {:?}", e)))?; + Ok(false) + } else { + Ok(true) + } } fn receive_kernel_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - self.adapter.receive_kernel_segment(block_hash, segment) + if !self + .adapter + .receive_kernel_segment(block_hash, segment, peer_info)? + { + self.block_peer(peer_info.addr, "unexpected kernel PIBD segment") + .map_err(|e| chain::Error::Other(format!("block peer error: {:?}", e)))?; + Ok(false) + } else { + Ok(true) + } + } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + match self + .adapter + .receive_header_segment(id, headers, peer_info)? + { + HeaderSegmentAcceptance::Accepted => Ok(HeaderSegmentAcceptance::Accepted), + HeaderSegmentAcceptance::Ban => { + self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader) + .map_err(|e| chain::Error::Other(format!("ban peer error: {:?}", e)))?; + Ok(HeaderSegmentAcceptance::Ban) + } + } } } @@ -819,6 +949,16 @@ impl>> PeersIter { } } + /// Custom filter. + pub fn with_filter( + self, + f: impl Fn(&Arc) -> bool, + ) -> PeersIter>> { + PeersIter { + iter: self.iter.filter(move |p| f(p)), + } + } + pub fn by_addr(&mut self, addr: PeerAddr) -> Option> { self.iter.find(|p| p.info.addr == addr) } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 31e069321b..16ed09ffab 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -17,8 +17,9 @@ use crate::conn::MessageHandler; use crate::core::core::{hash::Hashed, CompactBlock}; use crate::msg::{ - Consumed, Headers, Message, Msg, OutputBitmapSegmentResponse, OutputSegmentResponse, PeerAddrs, - Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, Type, + Consumed, HeaderSegment, Headers, Message, Msg, OutputBitmapSegmentResponse, + OutputSegmentResponse, PeerAddrs, Pong, SegmentRequest, SegmentResponse, TxHashSetArchive, + Type, }; use crate::types::{AttachmentMeta, Error, NetAdapter, PeerInfo}; use chrono::prelude::Utc; @@ -192,6 +193,28 @@ impl MessageHandler for Protocol { )?) } + Message::GetHeaderSegment(identifier) => { + if !self + .peer_info + .capabilities + .contains(crate::types::Capabilities::PIHD_HIST) + { + return Ok(Consumed::None); + } + if let Some(headers) = adapter.locate_header_segment(identifier, &self.peer_info)? { + Consumed::Response(Msg::new( + Type::HeaderSegment, + HeaderSegment { + identifier, + headers, + }, + self.peer_info.version, + )?) + } else { + Consumed::None + } + } + // "header first" block propagation - if we have not yet seen this block // we can go request it from some of our peers Message::Header(header) => { @@ -204,6 +227,15 @@ impl MessageHandler for Protocol { Consumed::None } + Message::HeaderSegment(segment) => { + adapter.receive_header_segment( + segment.identifier, + &segment.headers, + &self.peer_info, + )?; + Consumed::None + } + Message::GetPeerAddrs(get_peers) => { let peers = adapter.find_peer_addrs(get_peers.capabilities); Consumed::Response(Msg::new( @@ -382,7 +414,12 @@ impl MessageHandler for Protocol { block_hash, output_root ); - adapter.receive_bitmap_segment(block_hash, output_root, segment.into())?; + adapter.receive_bitmap_segment( + block_hash, + output_root, + segment.into(), + &self.peer_info, + )?; Consumed::None } Message::OutputSegment(req) => { @@ -399,6 +436,7 @@ impl MessageHandler for Protocol { response.block_hash, output_bitmap_root, response.segment.into(), + &self.peer_info, )?; Consumed::None } @@ -408,7 +446,7 @@ impl MessageHandler for Protocol { segment, } = req; trace!("Received Rangeproof Segment: bh: {}", block_hash); - adapter.receive_rangeproof_segment(block_hash, segment.into())?; + adapter.receive_rangeproof_segment(block_hash, segment.into(), &self.peer_info)?; Consumed::None } Message::KernelSegment(req) => { @@ -417,7 +455,7 @@ impl MessageHandler for Protocol { segment, } = req; trace!("Received Kernel Segment: bh: {}", block_hash); - adapter.receive_kernel_segment(block_hash, segment.into())?; + adapter.receive_kernel_segment(block_hash, segment.into(), &self.peer_info)?; Consumed::None } Message::Unknown(_) => Consumed::None, @@ -425,3 +463,85 @@ impl MessageHandler for Protocol { Ok(consumed) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::conn::Tracker; + use crate::core::core::SegmentIdentifier; + use crate::core::global; + use crate::core::pow::Difficulty; + use crate::core::ser::ProtocolVersion; + use crate::msg::{read_message, write_message}; + use crate::serv::DummyAdapter; + use crate::types::{ + Capabilities, Direction, PeerAddr, PeerLiveInfo, PIHD_HEADER_SEGMENT_HEIGHT, + }; + use crate::util::RwLock; + use std::io::Cursor; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::atomic::AtomicBool; + + fn test_peer_info(capabilities: Capabilities) -> PeerInfo { + PeerInfo { + capabilities, + user_agent: "test".to_owned(), + version: ProtocolVersion::local(), + addr: PeerAddr(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 10000)), + direction: Direction::Outbound, + live_info: Arc::new(RwLock::new(PeerLiveInfo::new(Difficulty::zero()))), + } + } + + #[test] + fn get_header_segment_returns_header_segment_response() { + global::set_local_chain_type(global::ChainTypes::AutomatedTesting); + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::default()), + Arc::new(AtomicBool::new(false)), + ); + let identifier = SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 2, + }; + + let consumed = protocol + .consume(Message::GetHeaderSegment(identifier)) + .expect("get header segment response"); + let response = match consumed { + Consumed::Response(response) => response, + other => panic!("expected response, got {:?}", other), + }; + + let mut bytes = vec![]; + write_message(&mut bytes, &response, Arc::new(Tracker::new())).expect("write response"); + let segment: HeaderSegment = read_message( + &mut Cursor::new(bytes), + ProtocolVersion::local(), + Type::HeaderSegment, + ) + .expect("read header segment response"); + + assert_eq!(segment.identifier, identifier); + assert!(segment.headers.is_empty()); + } + + #[test] + fn get_header_segment_requires_pihd_capability() { + let protocol = Protocol::new( + Arc::new(DummyAdapter {}), + test_peer_info(Capabilities::HEADER_HIST), + Arc::new(AtomicBool::new(false)), + ); + + let consumed = protocol + .consume(Message::GetHeaderSegment(SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0, + })) + .expect("get header segment handling"); + + assert!(matches!(consumed, Consumed::None)); + } +} diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 2de9c1f897..57ac9631c5 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -32,8 +32,8 @@ use crate::peer::Peer; use crate::peers::Peers; use crate::store::PeerStore; use crate::types::{ - Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, - TxHashSetRead, + Capabilities, ChainAdapter, Error, HeaderSegmentAcceptance, NetAdapter, P2PConfig, PeerAddr, + PeerInfo, ReasonForBan, TxHashSetRead, }; use crate::util::secp::pedersen::RangeProof; use crate::util::StopState; @@ -119,6 +119,27 @@ impl Server { } match self.handle_new_peer(stream) { Err(Error::ConnectionClose) => debug!("shutting down, ignoring a new peer"), + Err(Error::Connection(e)) => { + if matches!( + e.kind(), + io::ErrorKind::UnexpectedEof + | io::ErrorKind::ConnectionAborted + | io::ErrorKind::ConnectionReset + | io::ErrorKind::BrokenPipe | io::ErrorKind::TimedOut + ) { + debug!( + "Temporary peer connection error from {}: {:?}", + peer_addr, e + ); + } else { + debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); + let _ = + self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); + } + } + Err(Error::PeerWithSelf) | Err(Error::Timeout) | Err(Error::Send(_)) => { + debug!("Ignoring peer accept error from {}", peer_addr); + } Err(e) => { debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); @@ -341,6 +362,13 @@ impl ChainAdapter for DummyAdapter { fn locate_headers(&self, _: &[Hash]) -> Result, chain::Error> { Ok(vec![]) } + fn locate_header_segment( + &self, + _: SegmentIdentifier, + _: &PeerInfo, + ) -> Result>, chain::Error> { + Ok(Some(vec![])) + } fn get_block(&self, _: Hash, _: &PeerInfo) -> Option { None } @@ -419,6 +447,7 @@ impl ChainAdapter for DummyAdapter { _block_hash: Hash, _output_root: Hash, _segment: Segment, + _peer_info: &PeerInfo, ) -> Result { unimplemented!() } @@ -428,6 +457,7 @@ impl ChainAdapter for DummyAdapter { _block_hash: Hash, _bitmap_root: Hash, _segment: Segment, + _peer_info: &PeerInfo, ) -> Result { unimplemented!() } @@ -436,6 +466,7 @@ impl ChainAdapter for DummyAdapter { &self, _block_hash: Hash, _segment: Segment, + _peer_info: &PeerInfo, ) -> Result { unimplemented!() } @@ -444,9 +475,19 @@ impl ChainAdapter for DummyAdapter { &self, _block_hash: Hash, _segment: Segment, + _peer_info: &PeerInfo, ) -> Result { unimplemented!() } + + fn receive_header_segment( + &self, + _id: SegmentIdentifier, + _headers: &[core::BlockHeader], + _peer_info: &PeerInfo, + ) -> Result { + Ok(HeaderSegmentAcceptance::Accepted) + } } impl NetAdapter for DummyAdapter { diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 76cc3fe79f..15873527c9 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -40,6 +40,9 @@ use crate::util::RwLock; /// Maximum number of block headers a peer should ever send pub const MAX_BLOCK_HEADERS: u32 = 512; +/// Header segment height used for PIHD header segment requests. +pub const PIHD_HEADER_SEGMENT_HEIGHT: u8 = 9; + /// Maximum number of block bodies a peer should ever ask for and send #[allow(dead_code)] pub const MAX_BLOCK_BODIES: u32 = 16; @@ -396,6 +399,8 @@ bitflags! { const BLOCK_HIST = 0b0010_0000; /// As above, with crucial serialization fix #3705 applied const PIBD_HIST_1 = 0b0100_0000; + /// Can provide deterministic historical header segments. + const PIHD_HIST = 0b1000_0000; } } @@ -408,6 +413,7 @@ impl Default for Capabilities { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST } } @@ -547,6 +553,15 @@ pub struct TxHashSetRead { pub reader: File, } +/// Result of processing a PIHD header segment response. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HeaderSegmentAcceptance { + /// Segment was accepted or intentionally ignored. + Accepted, + /// Segment is malformed or invalid enough to justify banning the sender. + Ban, +} + /// Bridge between the networking layer and the rest of the system. Handles the /// forwarding or querying of blocks and transactions from the network among /// other things. @@ -606,6 +621,13 @@ pub trait ChainAdapter: Sync + Send { /// immediately. fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error>; + /// Finds a deterministic header segment based on the provided segment identifier. + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error>; + /// Gets a full block by its hash. /// Converts block to v2 compatibility if necessary (based on peer protocol version). fn get_block(&self, h: Hash, peer_info: &PeerInfo) -> Option; @@ -679,6 +701,7 @@ pub trait ChainAdapter: Sync + Send { block_hash: Hash, output_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result; fn receive_output_segment( @@ -686,19 +709,29 @@ pub trait ChainAdapter: Sync + Send { block_hash: Hash, bitmap_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result; fn receive_rangeproof_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result; fn receive_kernel_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result; + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result; } /// Additional methods required by the protocol that don't need to be diff --git a/p2p/tests/capabilities.rs b/p2p/tests/capabilities.rs index 5b20e099b4..c6a8bf74c1 100644 --- a/p2p/tests/capabilities.rs +++ b/p2p/tests/capabilities.rs @@ -43,6 +43,7 @@ fn default_capabilities() { assert!(x.contains(Capabilities::TX_KERNEL_HASH)); assert!(x.contains(Capabilities::PIBD_HIST)); assert!(x.contains(Capabilities::PIBD_HIST_1)); + assert!(x.contains(Capabilities::PIHD_HIST)); assert_eq!( x, @@ -52,5 +53,6 @@ fn default_capabilities() { | Capabilities::TX_KERNEL_HASH | Capabilities::PIBD_HIST | Capabilities::PIBD_HIST_1 + | Capabilities::PIHD_HIST ); } diff --git a/p2p/tests/ser_deser.rs b/p2p/tests/ser_deser.rs index 1d50aff6a4..613b183071 100644 --- a/p2p/tests/ser_deser.rs +++ b/p2p/tests/ser_deser.rs @@ -50,20 +50,20 @@ fn test_capabilities() { ); assert_eq!( p2p::types::Capabilities::from_bits_truncate(0b10000000 as u32), - p2p::types::Capabilities::UNKNOWN + p2p::types::Capabilities::PIHD_HIST ); assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b1011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); assert_eq!( expected, - p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32), + p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32), ); - assert!(p2p::types::Capabilities::from_bits_truncate(0b01011111 as u32).contains(expected)); + assert!(p2p::types::Capabilities::from_bits_truncate(0b11011111 as u32).contains(expected)); assert!( p2p::types::Capabilities::from_bits_truncate(0b00101111 as u32) diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 8b475007a1..c75563480a 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -16,9 +16,11 @@ //! events to consumers of those events. use crate::util::RwLock; +use std::collections::HashMap; use std::fs::File; +use std::net::SocketAddr; use std::path::PathBuf; -use std::sync::{Arc, Weak}; +use std::sync::{mpsc, Arc, Weak}; use std::thread; use std::time::Instant; @@ -38,7 +40,7 @@ use crate::core::pow::Difficulty; use crate::core::ser::ProtocolVersion; use crate::core::{core, global}; use crate::p2p; -use crate::p2p::types::PeerInfo; +use crate::p2p::types::{HeaderSegmentAcceptance, PeerInfo}; use crate::pool::{self, BlockChain, PoolAdapter}; use crate::util::secp::pedersen::RangeProof; use crate::util::OneTime; @@ -51,6 +53,202 @@ const KERNEL_SEGMENT_HEIGHT_RANGE: Range = 9..14; const BITMAP_SEGMENT_HEIGHT_RANGE: Range = 9..14; const OUTPUT_SEGMENT_HEIGHT_RANGE: Range = 11..16; const RANGEPROOF_SEGMENT_HEIGHT_RANGE: Range = 7..12; +const MAX_CACHED_HEADER_BATCHES: usize = 16; +const PIBD_SEGMENT_QUEUE_CAP: usize = 64; +const REJECTED_PIBD_SEGMENT_SECS: i64 = 600; +const HEADER_SEGMENT_REQUEST_WINDOW_SECS: i64 = 60; +const MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW: usize = 1000; +const HEADER_BATCH_CACHE_LOOKAHEAD: u64 = + MAX_CACHED_HEADER_BATCHES as u64 * p2p::MAX_BLOCK_HEADERS as u64; + +#[derive(Clone)] +struct HeaderBatch { + headers: Vec, + peer_info: PeerInfo, +} + +enum PibdSegment { + Bitmap { + block_hash: Hash, + output_root: Hash, + segment: Segment, + }, + Output { + block_hash: Hash, + bitmap_root: Hash, + segment: Segment, + }, + RangeProof { + block_hash: Hash, + segment: Segment, + }, + Kernel { + block_hash: Hash, + segment: Segment, + }, +} + +impl PibdSegment { + fn segment_id(&self) -> SegmentTypeIdentifier { + match self { + PibdSegment::Bitmap { segment, .. } => SegmentTypeIdentifier { + segment_type: SegmentType::Bitmap, + identifier: segment.identifier().clone(), + }, + PibdSegment::Output { segment, .. } => SegmentTypeIdentifier { + segment_type: SegmentType::Output, + identifier: segment.identifier().clone(), + }, + PibdSegment::RangeProof { segment, .. } => SegmentTypeIdentifier { + segment_type: SegmentType::RangeProof, + identifier: segment.identifier().clone(), + }, + PibdSegment::Kernel { segment, .. } => SegmentTypeIdentifier { + segment_type: SegmentType::Kernel, + identifier: segment.identifier().clone(), + }, + } + } +} + +struct QueuedPibdSegment { + received_at: Instant, + peer_info: PeerInfo, + segment: PibdSegment, +} + +fn spawn_pibd_segment_worker( + sync_state: Arc, + chain: Weak, + rx: mpsc::Receiver, +) { + thread::Builder::new() + .name("pibd_receive".to_string()) + .spawn(move || { + while let Ok(queued_segment) = rx.recv() { + let segment_id = queued_segment.segment.segment_id(); + let queued_ms = queued_segment.received_at.elapsed().as_millis(); + let started = Instant::now(); + if let Err(e) = process_queued_pibd_segment(&sync_state, &chain, queued_segment) { + error!("PIBD segment processing failed for {:?}: {}", segment_id, e); + } + trace!( + "PIBD segment {:?} processed after queued_ms={}, process_ms={}", + segment_id, + queued_ms, + started.elapsed().as_millis() + ); + } + debug!("PIBD receive worker shutting down"); + }) + .expect("failed to spawn PIBD receive worker"); +} + +fn process_queued_pibd_segment( + sync_state: &Arc, + chain: &Weak, + queued_segment: QueuedPibdSegment, +) -> Result<(), chain::Error> { + let total_started = Instant::now(); + let queued_before_process_ms = queued_segment.received_at.elapsed().as_millis(); + let peer_addr = queued_segment.peer_info.addr; + let chain_started = Instant::now(); + let chain = chain + .upgrade() + .ok_or_else(|| chain::Error::Other("chain not available".to_owned()))?; + let chain_upgrade_ms = chain_started.elapsed().as_millis(); + let archive_header_started = Instant::now(); + let archive_header = chain.txhashset_archive_header_header_only()?; + let archive_header_ms = archive_header_started.elapsed().as_millis(); + let segment_id = queued_segment.segment.segment_id(); + let desegmenter_started = Instant::now(); + let desegmenter = chain.desegmenter(&archive_header)?; + let desegmenter_lookup_ms = desegmenter_started.elapsed().as_millis(); + let lock_started = Instant::now(); + let mut desegmenter = desegmenter.write(); + let lock_wait_ms = lock_started.elapsed().as_millis(); + let validate_started = Instant::now(); + let res = if let Some(d) = desegmenter.as_mut() { + match queued_segment.segment { + PibdSegment::Bitmap { + block_hash, + output_root, + segment, + } => { + debug!( + "Received bitmap segment {} for block_hash: {}, output_root: {}", + segment.identifier().idx, + block_hash, + output_root + ); + d.add_bitmap_segment(segment, output_root) + } + PibdSegment::Output { + block_hash, + bitmap_root, + segment, + } => { + debug!( + "Received output segment {} for block_hash: {}, bitmap_root: {:?}", + segment.identifier().idx, + block_hash, + bitmap_root, + ); + d.add_output_segment(segment, Some(bitmap_root)) + } + PibdSegment::RangeProof { + block_hash, + segment, + } => { + debug!( + "Received proof segment {} for block_hash: {}", + segment.identifier().idx, + block_hash, + ); + d.add_rangeproof_segment(segment) + } + PibdSegment::Kernel { + block_hash, + segment, + } => { + debug!( + "Received kernel segment {} for block_hash: {}", + segment.identifier().idx, + block_hash, + ); + d.add_kernel_segment(segment) + } + } + } else { + Ok(()) + }; + let validate_cache_ms = validate_started.elapsed().as_millis(); + let remove_pending_ms = if res.is_ok() { + let remove_started = Instant::now(); + sync_state.remove_pibd_segment(&segment_id); + remove_started.elapsed().as_millis() + } else { + warn!( + "PIBD segment {:?} from peer {} failed validation and remains pending for retry", + segment_id, peer_addr + ); + sync_state.reject_pibd_segment_from(&segment_id, peer_addr.0); + 0 + }; + trace!( + "PIBD segment {:?} timing queued_before_process_ms={}, chain_upgrade_ms={}, archive_header_ms={}, desegmenter_lookup_ms={}, lock_wait_ms={}, validate_cache_ms={}, remove_pending_ms={}, total_process_ms={}", + segment_id, + queued_before_process_ms, + chain_upgrade_ms, + archive_header_ms, + desegmenter_lookup_ms, + lock_wait_ms, + validate_cache_ms, + remove_pending_ms, + total_started.elapsed().as_millis() + ); + res +} /// Implementation of the NetAdapter for the . Gets notified when new /// blocks and transactions are received and forwards to the chain and pool @@ -66,6 +264,9 @@ where peers: OneTime>, config: ServerConfig, hooks: Vec>, + header_batch_cache: RwLock>, + header_segment_requests: RwLock, usize)>>, + pibd_segment_tx: mpsc::SyncSender, } impl p2p::ChainAdapter for NetToChainAdapter @@ -329,27 +530,7 @@ where } }; - match self - .chain() - .sync_block_headers(bhs, sync_head, chain::Options::SYNC) - { - Ok(sync_head) => { - // If we have an updated sync_head after processing this batch of headers - // then update our sync_state so we can request relevant headers in the next batch. - if let Some(sync_head) = sync_head { - self.sync_state.update_header_sync(sync_head); - } - Ok(true) - } - Err(e) => { - debug!("Block headers refused by chain: {:?}", e); - if e.is_bad_data() { - Ok(false) - } else { - Err(e) - } - } - } + self.cache_and_process_header_batch(bhs, peer_info, sync_head) } fn locate_headers(&self, locator: &[Hash]) -> Result, chain::Error> { @@ -387,6 +568,60 @@ where Ok(headers) } + fn locate_header_segment( + &self, + id: SegmentIdentifier, + peer_info: &PeerInfo, + ) -> Result>, chain::Error> { + if !peer_info + .capabilities + .contains(p2p::Capabilities::PIHD_HIST) + || id.height != p2p::PIHD_HEADER_SEGMENT_HEIGHT + { + return Ok(None); + } + if !self.header_segment_request_allowed(peer_info.addr.0) { + warn!( + "throttling PIHD header segment request {:?} from {}", + id, peer_info.addr + ); + return Ok(None); + } + + let segment_capacity = id.segment_capacity(); + let start_height = match id + .idx + .checked_mul(segment_capacity) + .and_then(|height| height.checked_add(1)) + { + Some(height) => height, + None => return Ok(None), + }; + let max_height = self.chain().header_head()?.height; + let end_height = match start_height + .checked_add(segment_capacity) + .and_then(|height| height.checked_sub(1)) + { + Some(height) => std::cmp::min(height, max_height), + None => max_height, + }; + if start_height > end_height { + return Ok(Some(vec![])); + } + + let header_pmmr = self.chain().header_pmmr(); + let header_pmmr = header_pmmr.read(); + let mut headers = vec![]; + for h in start_height..=end_height { + if let Ok(hash) = header_pmmr.get_header_hash_by_height(h) { + headers.push(self.chain().get_block_header(&hash)?); + } else { + break; + } + } + Ok(Some(headers)) + } + /// Gets a full block by its hash. /// We only support v3 blocks since HF4. /// If a peer is requesting a block and only appears to support v2 @@ -566,34 +801,16 @@ where block_hash: Hash, output_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - debug!( - "Received bitmap segment {} for block_hash: {}, output_root: {}", - segment.identifier().idx, - block_hash, - output_root - ); - // TODO: Entire process needs to be restarted if the horizon block - // has changed (perhaps not here, NB this has to go somewhere) - let archive_header = self.chain().txhashset_archive_header_header_only()?; - let identifier = segment.identifier().clone(); - let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_bitmap_segment(segment, output_root); - if let Err(e) = res { - error!( - "Validation of incoming bitmap segment failed: {:?}, reason: {}", - identifier, e - ); - retval = Err(e); - } - } - // Remove segment from outgoing list - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::Bitmap, - identifier, - }); - retval + self.queue_pibd_segment( + PibdSegment::Bitmap { + block_hash, + output_root, + segment, + }, + peer_info, + ) } fn receive_output_segment( @@ -601,94 +818,116 @@ where block_hash: Hash, bitmap_root: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - debug!( - "Received output segment {} for block_hash: {}, bitmap_root: {:?}", - segment.identifier().idx, - block_hash, - bitmap_root, - ); - let archive_header = self.chain().txhashset_archive_header_header_only()?; - let identifier = segment.identifier().clone(); - let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_output_segment(segment, Some(bitmap_root)); - if let Err(e) = res { - error!( - "Validation of incoming output segment failed: {:?}, reason: {}", - identifier, e - ); - retval = Err(e); - } - } - // Remove segment from outgoing list - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::Output, - identifier, - }); - retval + self.queue_pibd_segment( + PibdSegment::Output { + block_hash, + bitmap_root, + segment, + }, + peer_info, + ) } fn receive_rangeproof_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - debug!( - "Received proof segment {} for block_hash: {}", - segment.identifier().idx, - block_hash, - ); - let archive_header = self.chain().txhashset_archive_header_header_only()?; - let identifier = segment.identifier().clone(); - let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_rangeproof_segment(segment); - if let Err(e) = res { - error!( - "Validation of incoming rangeproof segment failed: {:?}, reason: {}", - identifier, e - ); - retval = Err(e); - } - } - // Remove segment from outgoing list - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::RangeProof, - identifier, - }); - retval + self.queue_pibd_segment( + PibdSegment::RangeProof { + block_hash, + segment, + }, + peer_info, + ) } fn receive_kernel_segment( &self, block_hash: Hash, segment: Segment, + peer_info: &PeerInfo, ) -> Result { - debug!( - "Received kernel segment {} for block_hash: {}", - segment.identifier().idx, - block_hash, - ); - let archive_header = self.chain().txhashset_archive_header_header_only()?; - let identifier = segment.identifier().clone(); - let mut retval = Ok(true); - if let Some(d) = self.chain().desegmenter(&archive_header)?.write().as_mut() { - let res = d.add_kernel_segment(segment); - if let Err(e) = res { - error!( - "Validation of incoming rangeproof segment failed: {:?}, reason: {}", - identifier, e - ); - retval = Err(e); + self.queue_pibd_segment( + PibdSegment::Kernel { + block_hash, + segment, + }, + peer_info, + ) + } + + fn receive_header_segment( + &self, + id: SegmentIdentifier, + headers: &[core::BlockHeader], + peer_info: &PeerInfo, + ) -> Result { + if id.height != p2p::PIHD_HEADER_SEGMENT_HEIGHT { + return Ok(HeaderSegmentAcceptance::Ban); + } + if !self + .sync_state + .contains_pihd_header_segment_from(id, peer_info.addr.0) + { + debug!( + "ignoring unsolicited PIHD header segment {:?} from {}", + id, peer_info.addr + ); + return Ok(HeaderSegmentAcceptance::Accepted); + } + let expected_first_height = match id + .idx + .checked_mul(id.segment_capacity()) + .and_then(|height| height.checked_add(1)) + { + Some(height) => height, + None => return Ok(HeaderSegmentAcceptance::Ban), + }; + let target_height = self + .sync_state + .pihd_header_segment_target_height(id, peer_info.addr.0) + .unwrap_or(0); + if headers.is_empty() { + if expected_first_height > target_height { + self.sync_state + .remove_pihd_header_segment(id, peer_info.addr.0); + return Ok(HeaderSegmentAcceptance::Accepted); } + return Ok(HeaderSegmentAcceptance::Ban); + } + if headers[0].height != expected_first_height { + return Ok(HeaderSegmentAcceptance::Ban); + } + if !headers.windows(2).all(|w| w[1].height == w[0].height + 1) { + return Ok(HeaderSegmentAcceptance::Ban); + } + if headers + .last() + .map(|h| h.height >= target_height) + .unwrap_or(false) + { + self.sync_state + .remove_pihd_header_segment(id, peer_info.addr.0); + } + + let sync_head = match self.sync_state.status() { + SyncStatus::HeaderSync { sync_head, .. } => sync_head, + _ => return Ok(HeaderSegmentAcceptance::Accepted), + }; + let res = self.cache_and_process_header_batch(headers, peer_info, sync_head); + if res.as_ref().map(|accepted| *accepted).unwrap_or(false) { + self.sync_state + .remove_pihd_header_segment(id, peer_info.addr.0); + } + match res { + Ok(true) => Ok(HeaderSegmentAcceptance::Accepted), + Ok(false) => Ok(HeaderSegmentAcceptance::Ban), + Err(e) => Err(e), } - // Remove segment from outgoing list - self.sync_state.remove_pibd_segment(&SegmentTypeIdentifier { - segment_type: SegmentType::Kernel, - identifier, - }); - retval } } @@ -705,6 +944,8 @@ where config: ServerConfig, hooks: Vec>, ) -> Self { + let (pibd_segment_tx, pibd_segment_rx) = mpsc::sync_channel(PIBD_SEGMENT_QUEUE_CAP); + spawn_pibd_segment_worker(sync_state.clone(), Arc::downgrade(&chain), pibd_segment_rx); NetToChainAdapter { sync_state, chain: Arc::downgrade(&chain), @@ -712,6 +953,9 @@ where peers: OneTime::new(), config, hooks, + header_batch_cache: RwLock::new(vec![]), + header_segment_requests: RwLock::new(HashMap::new()), + pibd_segment_tx, } } @@ -734,6 +978,69 @@ where .expect("Failed to upgrade weak ref to our chain.") } + fn queue_pibd_segment( + &self, + segment: PibdSegment, + peer_info: &PeerInfo, + ) -> Result { + let segment_id = segment.segment_id(); + if self.sync_state.rejected_pibd_segment_from( + &segment_id, + peer_info.addr.0, + REJECTED_PIBD_SEGMENT_SECS, + ) { + debug!( + "ignoring rejected PIBD segment {:?} from {}", + segment_id, peer_info.addr + ); + return Ok(false); + } + if !self + .sync_state + .contains_pibd_segment_from(&segment_id, peer_info.addr.0) + { + debug!( + "ignoring unsolicited PIBD segment {:?} from {}", + segment_id, peer_info.addr + ); + return Ok(true); + } + let queued = QueuedPibdSegment { + received_at: Instant::now(), + peer_info: peer_info.clone(), + segment, + }; + match self.pibd_segment_tx.try_send(queued) { + Ok(()) => Ok(true), + Err(mpsc::TrySendError::Full(_)) => { + warn!( + "PIBD receive queue full, dropping segment {:?} from {}", + segment_id, peer_info.addr + ); + Ok(true) + } + Err(mpsc::TrySendError::Disconnected(_)) => Err(chain::Error::Other( + "PIBD receive queue disconnected".to_owned(), + )), + } + } + + fn header_segment_request_allowed(&self, peer_addr: SocketAddr) -> bool { + let now = Utc::now(); + let cutoff = now - Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS); + let mut requests = self.header_segment_requests.write(); + requests.retain(|_, (window_start, _)| *window_start > cutoff); + let entry = requests.entry(peer_addr).or_insert((now, 0)); + if now > entry.0 + Duration::seconds(HEADER_SEGMENT_REQUEST_WINDOW_SECS) { + *entry = (now, 0); + } + if entry.1 >= MAX_HEADER_SEGMENT_REQUESTS_PER_WINDOW { + return false; + } + entry.1 += 1; + true + } + // Find the first locator hash that refers to a known header on our main chain. fn find_common_header(&self, locator: &[Hash]) -> Option { let header_pmmr = self.chain().header_pmmr(); @@ -915,6 +1222,105 @@ where ), } } + + fn cache_and_process_header_batch( + &self, + headers: &[BlockHeader], + peer_info: &PeerInfo, + sync_head: chain::Tip, + ) -> Result { + let headers = headers + .iter() + .skip_while(|h| h.height <= sync_head.height) + .cloned() + .collect::>(); + if headers.is_empty() { + return Ok(true); + } + if headers + .first() + .map(|h| { + h.height + > sync_head + .height + .saturating_add(HEADER_BATCH_CACHE_LOOKAHEAD) + }) + .unwrap_or(false) + { + debug!( + "ignoring far-future header batch starting at height {} while sync head is {}", + headers[0].height, sync_head.height + ); + return Ok(true); + } + + { + let mut cache = self.header_batch_cache.write(); + let first = headers.first().map(|h| h.hash()); + let last = headers.last().map(|h| h.hash()); + if !cache.iter().any(|b| { + b.headers.first().map(|h| h.hash()) == first + || b.headers.last().map(|h| h.hash()) == last + }) { + if cache.len() >= MAX_CACHED_HEADER_BATCHES { + cache.remove(0); + } + cache.push(HeaderBatch { + headers, + peer_info: peer_info.clone(), + }); + } + } + + self.process_ready_header_batches(peer_info) + } + + fn process_ready_header_batches(&self, current_peer: &PeerInfo) -> Result { + loop { + let sync_head = match self.sync_state.status() { + SyncStatus::HeaderSync { sync_head, .. } => sync_head, + _ => return Ok(true), + }; + let batch = { + let mut cache = self.header_batch_cache.write(); + cache.sort_by_key(|b| b.headers.first().map(|h| h.height).unwrap_or(u64::MAX)); + let pos = cache.iter().position(|b| { + b.headers.first().map(|h| h.height) == Some(sync_head.height + 1) + }); + match pos { + Some(pos) => cache.remove(pos), + None => return Ok(true), + } + }; + + match self + .chain() + .sync_block_headers(&batch.headers, sync_head, chain::Options::SYNC) + { + Ok(sync_head) => { + if let Some(sync_head) = sync_head { + self.sync_state.update_header_sync(sync_head); + } + } + Err(e) => { + debug!("Block headers refused by chain: {:?}", e); + if e.is_bad_data() { + if batch.peer_info.addr == current_peer.addr { + return Ok(false); + } + if let Err(e) = self.peers().ban_peer( + batch.peer_info.addr, + p2p::types::ReasonForBan::BadBlockHeader, + ) { + error!("failed to ban peer {}: {:?}", batch.peer_info.addr, e); + } + } else { + return Err(e); + } + } + } + } + } } /// Implementation of the ChainAdapter for the network. Gets notified when the diff --git a/servers/src/common/stats.rs b/servers/src/common/stats.rs index 308a619819..6e7099e485 100644 --- a/servers/src/common/stats.rs +++ b/servers/src/common/stats.rs @@ -48,6 +48,8 @@ impl Default for ServerStateInfo { /// consumers might be interested in, such as test results or UI #[derive(Debug, Clone)] pub struct ServerStats { + /// Server uptime in seconds + pub uptime_seconds: u64, /// Number of peers pub peer_count: u32, /// Chain head diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 3d5fe8d85a..e9c0889d85 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -229,6 +229,8 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende // and queue them up for a connection attempt // intentionally make too many attempts (2x) as some (most?) will fail // as many nodes in our db are not publicly accessible + let default_peers = PeerAddrs::default(); + let peers_deny = config.peers_deny.as_ref().unwrap_or(&default_peers); let mut new_peers = vec![]; let max_peer_attempts = 128; let max_attempt_delay = Duration::hours(1).num_seconds(); @@ -241,7 +243,7 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende }) .choose_multiple(&mut thread_rng(), max_peer_attempts / 2) { - new_peers.push(&hp.addr); + new_peers.push(hp.addr); } // always check min 32 (max 96, if there are no healthy) random unknown peers received from peer list request. let req_unk_count = cmp::max( @@ -252,7 +254,7 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende .iter() .choose_multiple(&mut thread_rng(), req_unk_count) { - new_peers.push(upa); + new_peers.push(*upa); } debug!( "monitor_peers: check {} healthy, {} unknown, {} defuncts", @@ -269,7 +271,16 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende .filter(|p| Utc::now().timestamp() - p.last_attempt >= max_attempt_delay) .choose_multiple(&mut thread_rng(), max_peer_attempts - new_peers.len()) { - new_peers.push(&dp.addr); + new_peers.push(dp.addr); + } + + // If the peer db is stale or mostly defunct, include seeds as recovery candidates. + if !peers.enough_outbound_peers() { + for addr in seed_list(&config) { + if !peers_deny.as_slice().contains(&addr) && !new_peers.contains(&addr) { + new_peers.push(addr); + } + } } // Only queue up connection attempts for candidate peers where we @@ -277,8 +288,8 @@ fn monitor_peers(peers: Arc, config: p2p::P2PConfig, tx: mpsc::Sende // The call to is_known() may fail due to contention on the peers map. // Do not attempt any connection where is_known() fails for any reason. for pa in new_peers { - if let Ok(false) = peers.is_known(*pa) { - tx.send(*pa).unwrap(); + if let Ok(false) = peers.is_known(pa) { + tx.send(pa).unwrap(); } } } @@ -314,12 +325,31 @@ fn connect_to_seeds_and_peers( let peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 128); // if so, get their addresses, otherwise use our seeds - let peer_addrs = if peers.len() > 3 { + let mut peer_addrs = if peers.len() > 3 { peers.iter().map(|p| p.addr).collect::>() } else { seed_list(&config) }; + // If the peer db has too few healthy peer-list providers, fill from seeds. + let min_outbound = config.peer_min_preferred_outbound_count() as usize; + let allowed_peer_count = peer_addrs + .iter() + .filter(|addr| !peers_deny.as_slice().contains(addr)) + .count(); + if allowed_peer_count < min_outbound { + let mut allowed_peer_count = allowed_peer_count; + for addr in seed_list(&config) { + if !peers_deny.as_slice().contains(&addr) && !peer_addrs.contains(&addr) { + peer_addrs.push(addr); + allowed_peer_count += 1; + } + if allowed_peer_count >= min_outbound { + break; + } + } + } + if peer_addrs.is_empty() { warn!("No seeds were retrieved."); } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index d3510fe228..78cb6be622 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -75,6 +75,7 @@ pub struct Server { pub stop_state: Arc, /// Maintain a lock_file so we do not run multiple Grin nodes from same dir. lock_file: Arc, + start_time: time::Instant, connect_thread: Option>, sync_thread: JoinHandle<()>, dandelion_thread: JoinHandle<()>, @@ -155,6 +156,7 @@ impl Server { ) -> Result { // Obtain our lock_file or fail immediately with an error. let lock_file = Server::one_grin_at_a_time(&config)?; + let start_time = time::Instant::now(); // Defaults to None (optional) in config file. // This translates to false here. @@ -315,6 +317,7 @@ impl Server { }, stop_state, lock_file, + start_time, connect_thread, sync_thread, dandelion_thread, @@ -490,6 +493,7 @@ impl Server { stem_pool_kernels: pool.stempool.kernel_count(), }); + let sync_status = self.sync_state.status(); let head = self.chain.head_header()?; let head_stats = ChainStats { latest_timestamp: head.timestamp, @@ -498,13 +502,23 @@ impl Server { total_difficulty: head.total_difficulty(), }; - let header_head = self.chain.header_head()?; - let header = self.chain.get_block_header(&header_head.hash())?; - let header_stats = ChainStats { - latest_timestamp: header.timestamp, - height: header.height, - last_block_h: header.hash(), - total_difficulty: header.total_difficulty(), + let header_stats = match sync_status { + SyncStatus::HeaderSync { sync_head, .. } => ChainStats { + latest_timestamp: head.timestamp, + height: sync_head.height, + last_block_h: sync_head.last_block_h, + total_difficulty: sync_head.total_difficulty, + }, + _ => { + let header_head = self.chain.header_head()?; + let header = self.chain.get_block_header(&header_head.hash())?; + ChainStats { + latest_timestamp: header.timestamp, + height: header.height, + last_block_h: header.hash(), + total_difficulty: header.total_difficulty(), + } + } }; let disk_usage_bytes = WalkDir::new(&self.config.db_root) @@ -519,10 +533,11 @@ impl Server { let disk_usage_gb = format!("{:.*}", 3, (disk_usage_bytes as f64 / 1_000_000_000_f64)); Ok(ServerStats { + uptime_seconds: self.start_time.elapsed().as_secs(), peer_count: self.peer_count(), chain_stats: head_stats, header_stats: header_stats, - sync_status: self.sync_state.status(), + sync_status, disk_usage_gb: disk_usage_gb, stratum_stats: stratum_stats, peer_stats: peer_stats, diff --git a/servers/src/grin/sync/header_sync.rs b/servers/src/grin/sync/header_sync.rs index ed2cc1ffc4..81cf30d1ce 100644 --- a/servers/src/grin/sync/header_sync.rs +++ b/servers/src/grin/sync/header_sync.rs @@ -16,11 +16,36 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; use std::sync::Arc; -use crate::chain::{self, SyncState, SyncStatus}; +use crate::chain::{self, pibd_params, HeaderSyncMode, SyncState, SyncStatus}; use crate::common::types::Error; use crate::core::core::hash::Hash; +use crate::core::core::SegmentIdentifier; use crate::core::pow::Difficulty; -use crate::p2p::{self, types::ReasonForBan, Capabilities, Peer}; +use crate::p2p::{ + self, types::PeerAddr, types::ReasonForBan, Capabilities, Peer, PIHD_HEADER_SEGMENT_HEIGHT, +}; + +const PIHD_MAX_IN_FLIGHT_SEGMENTS: usize = 8; +const PIHD_MAX_REQUESTS_PER_TICK: usize = 8; +const PIHD_MAX_IN_FLIGHT_SEGMENTS_PER_PEER: usize = 2; +const HEADER_REQUEST_TIMEOUT_SECS: i64 = 10; +const PIHD_MAX_TIMED_OUT_SEGMENTS: usize = 3; +const PIHD_DISABLE_SECS: i64 = 120; +const PIHD_PEER_TIMEOUT_COOLDOWN_SECS: i64 = 30; +const PIHD_STALL_FALLBACK_SECS: i64 = 120; + +struct PihdHeaderRequest { + identifier: SegmentIdentifier, + peer_addr: PeerAddr, + requested_at: DateTime, + target_height: u64, +} + +struct LegacyHeaderRequest { + peer_addr: PeerAddr, + height: u64, + requested_at: DateTime, +} pub struct HeaderSync { sync_state: Arc, @@ -29,6 +54,13 @@ pub struct HeaderSync { prev_header_sync: (DateTime, u64, u64), syncing_peer: Option>, stalling_ts: Option>, + pending_pihd: Vec, + pending_legacy: Option, + pihd_failure_count: usize, + pihd_peer_timeout_until: Vec<(PeerAddr, DateTime)>, + pihd_stalling_ts: Option>, + pihd_disabled_until: Option>, + pihd_active: bool, } impl HeaderSync { @@ -44,6 +76,13 @@ impl HeaderSync { prev_header_sync: (Utc::now(), 0, 0), syncing_peer: None, stalling_ts: None, + pending_pihd: vec![], + pending_legacy: None, + pihd_failure_count: 0, + pihd_peer_timeout_until: vec![], + pihd_stalling_ts: None, + pihd_disabled_until: None, + pihd_active: false, } } @@ -63,9 +102,14 @@ impl HeaderSync { return Ok(false); } - // TODO - can we safely reuse the peer here across multiple runs? - let sync_peer = self.choose_sync_peer(); + self.cleanup_pending_requests(sync_head); + let sync_peer = self + .syncing_peer + .clone() + .filter(|p| self.peers.get_connected_peer(p.info.addr).is_some()) + .map(|p| Some(p)) + .unwrap_or_else(|| self.choose_sync_peer()); if let Some(sync_peer) = sync_peer { let (peer_height, peer_diff) = { let info = sync_peer.info.live_info.read(); @@ -74,6 +118,13 @@ impl HeaderSync { // Quick check - nothing to sync if we are caught up with the peer. if peer_diff <= sync_head.total_difficulty { + if self.pihd_active { + info!( + "sync: PIHD header sync completed at height {}, total difficulty {}", + sync_head.height, sync_head.total_difficulty + ); + self.pihd_active = false; + } return Ok(false); } @@ -81,18 +132,165 @@ impl HeaderSync { return Ok(false); } - self.sync_state.update(SyncStatus::HeaderSync { - sync_head, - highest_height: peer_height, - highest_diff: peer_diff, - }); - - self.header_sync(sync_head, sync_peer.clone()); - self.syncing_peer = Some(sync_peer.clone()); + let pihd_peers = if self.pihd_enabled() { + self.choose_pihd_peers(sync_head) + } else { + vec![] + }; + if pihd_peers.is_empty() { + if self.pihd_active { + info!( + "sync: PIHD header sync aborted at height {}; falling back to legacy header sync", + sync_head.height + ); + self.pihd_active = false; + } + self.pending_pihd.clear(); + self.sync_state.retain_pihd_header_segments(|_| false); + self.sync_state.update(SyncStatus::HeaderSync { + sync_head, + sync_mode: HeaderSyncMode::Legacy, + highest_height: peer_height, + highest_diff: peer_diff, + }); + if self.header_sync(sync_head, sync_peer.clone()) { + self.syncing_peer = Some(sync_peer.clone()); + } else { + self.syncing_peer = None; + } + } else { + if !self.pihd_active { + info!( + "sync: PIHD header sync started at height {} with {} eligible peer(s)", + sync_head.height, + pihd_peers.len() + ); + self.pihd_active = true; + } + self.sync_state.update(SyncStatus::HeaderSync { + sync_head, + sync_mode: HeaderSyncMode::Pihd, + highest_height: peer_height, + highest_diff: peer_diff, + }); + self.pihd_header_sync(sync_head, pihd_peers); + self.syncing_peer = None; + } } Ok(true) } + fn cleanup_pending_requests(&mut self, header_head: chain::Tip) { + let now = Utc::now(); + let peers = self.peers.clone(); + if header_head.height > self.prev_header_sync.1 { + self.pihd_failure_count = 0; + self.pihd_stalling_ts = None; + } + + let mut failed = 0; + let mut failed_peers = vec![]; + self.pending_pihd.retain(|req| { + let completed_height = req + .identifier + .idx + .saturating_mul(req.identifier.segment_capacity()) + .saturating_add(req.identifier.segment_capacity()) + .min(req.target_height); + let connected = peers.get_connected_peer(req.peer_addr).is_some(); + let complete = header_head.height >= completed_height; + let timeout = now > req.requested_at + Duration::seconds(HEADER_REQUEST_TIMEOUT_SECS); + if !complete && connected && timeout { + failed += 1; + failed_peers.push(req.peer_addr); + } + if !complete && !connected { + failed += 1; + failed_peers.push(req.peer_addr); + } + !complete && connected && !timeout + }); + self.sync_state.retain_pihd_header_segments(|req| { + let completed_height = req + .identifier + .idx + .saturating_mul(req.identifier.segment_capacity()) + .saturating_add(req.identifier.segment_capacity()); + let completed_height = completed_height.min(req.target_height); + let connected = peers.get_connected_peer(PeerAddr(req.peer_addr)).is_some(); + let complete = header_head.height >= completed_height; + let timeout = now > req.request_time + Duration::seconds(HEADER_REQUEST_TIMEOUT_SECS); + !complete && connected && !timeout + }); + if failed > 0 { + for peer_addr in failed_peers { + self.note_pihd_peer_failure(peer_addr, now); + } + self.pihd_failure_count += failed; + if self.pihd_stalling_ts.is_none() { + self.pihd_stalling_ts = Some(now); + } + let pihd_stalled = self + .pihd_stalling_ts + .map(|stalling_ts| now > stalling_ts + Duration::seconds(PIHD_STALL_FALLBACK_SECS)) + .unwrap_or(false); + if self.pihd_failure_count >= PIHD_MAX_TIMED_OUT_SEGMENTS && pihd_stalled { + info!( + "sync: disabling PIHD for {} seconds after {} failed header segment request(s) and {} seconds without header progress", + PIHD_DISABLE_SECS, self.pihd_failure_count, PIHD_STALL_FALLBACK_SECS + ); + if self.pihd_active { + info!( + "sync: PIHD header sync aborted at height {}; failed {} header segment request(s), falling back to legacy header sync", + header_head.height, + self.pihd_failure_count + ); + self.pihd_active = false; + } + self.pending_pihd.clear(); + self.sync_state.retain_pihd_header_segments(|_| false); + self.pihd_failure_count = 0; + self.pihd_stalling_ts = None; + self.pihd_disabled_until = Some(now + Duration::seconds(PIHD_DISABLE_SECS)); + } + } + + if let Some(req) = &self.pending_legacy { + let connected = self.peers.get_connected_peer(req.peer_addr).is_some(); + let complete = header_head.height > req.height; + let timed_out = now > req.requested_at + Duration::seconds(HEADER_REQUEST_TIMEOUT_SECS); + if complete || timed_out || !connected { + self.pending_legacy = None; + } + } + } + + fn note_pihd_peer_failure(&mut self, peer_addr: PeerAddr, now: DateTime) { + self.pihd_peer_timeout_until + .retain(|(addr, until)| *addr != peer_addr && *until > now); + self.pihd_peer_timeout_until.push(( + peer_addr, + now + Duration::seconds(PIHD_PEER_TIMEOUT_COOLDOWN_SECS), + )); + } + + fn pihd_peer_available(&self, peer_addr: PeerAddr, now: DateTime) -> bool { + !self + .pihd_peer_timeout_until + .iter() + .any(|(addr, until)| *addr == peer_addr && *until > now) + } + + fn pihd_enabled(&mut self) -> bool { + if let Some(disabled_until) = self.pihd_disabled_until { + if Utc::now() < disabled_until { + return false; + } + self.pihd_disabled_until = None; + } + true + } + fn header_sync_due(&mut self, header_head: chain::Tip) -> bool { let now = Utc::now(); let (timeout, latest_height, prev_height) = self.prev_header_sync; @@ -152,9 +350,9 @@ impl HeaderSync { } _ => (), } + self.syncing_peer = None; } } - self.syncing_peer = None; true } else { // resetting the timeout as long as we progress @@ -185,22 +383,141 @@ impl HeaderSync { }) } - fn header_sync(&self, sync_head: chain::Tip, peer: Arc) { + fn choose_pihd_peers(&self, sync_head: chain::Tip) -> Vec> { + let peers_iter = || { + self.peers + .iter() + .with_capabilities(Capabilities::HEADER_HIST) + .connected() + }; + let max_height = peers_iter() + .into_iter() + .map(|p| p.info.height()) + .max() + .unwrap_or(0); + let height_slack = pibd_params::SYNC_PEER_HEIGHT_SLACK_BLOCKS; + peers_iter() + .with_capabilities(Capabilities::PIHD_HIST) + .with_difficulty(|x| x > sync_head.total_difficulty) + .with_filter(|p| p.info.height() > sync_head.height) + .with_filter(|p| p.info.height().saturating_add(height_slack) >= max_height) + .into_iter() + .collect() + } + + fn header_sync(&mut self, sync_head: chain::Tip, peer: Arc) -> bool { if peer.info.total_difficulty() > sync_head.total_difficulty { - self.request_headers(sync_head, peer); + self.request_headers(sync_head, peer) + } else { + false + } + } + + fn pihd_header_sync(&mut self, sync_head: chain::Tip, peers: Vec>) { + let now = Utc::now(); + self.pihd_peer_timeout_until + .retain(|(_, until)| *until > now); + let preferred_peers = peers + .iter() + .filter(|peer| self.pihd_peer_available(peer.info.addr, now)) + .cloned() + .collect::>(); + let peers = if preferred_peers.is_empty() { + peers + } else { + preferred_peers + }; + if self.pending_pihd.len() >= PIHD_MAX_IN_FLIGHT_SEGMENTS { + return; + } + let mut sent = 0; + let mut segment_idx = sync_head.height / (p2p::MAX_BLOCK_HEADERS as u64); + while self.pending_pihd.len() < PIHD_MAX_IN_FLIGHT_SEGMENTS + && sent < PIHD_MAX_REQUESTS_PER_TICK + { + let identifier = SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: segment_idx, + }; + let start_height = match pihd_segment_start_height(identifier) { + Some(height) => height, + None => return, + }; + if self + .pending_pihd + .iter() + .any(|req| req.identifier == identifier) + { + segment_idx += 1; + continue; + } + let peer = match peers + .iter() + .find(|peer| { + peer.info.height() >= start_height + && self + .pending_pihd + .iter() + .filter(|req| req.peer_addr == peer.info.addr) + .count() < PIHD_MAX_IN_FLIGHT_SEGMENTS_PER_PEER + }) + .or_else(|| { + peers.iter().find(|peer| { + peer.info.height() >= start_height + && self + .pending_pihd + .iter() + .filter(|req| req.peer_addr == peer.info.addr) + .count() < PIHD_MAX_IN_FLIGHT_SEGMENTS + }) + }) { + Some(peer) => peer.clone(), + None => return, + }; + if peer.send_header_segment_request(identifier).is_ok() { + let target_height = peer.info.height(); + self.sync_state.add_pihd_header_segment( + identifier, + peer.info.addr.0, + target_height, + ); + self.pending_pihd.push(PihdHeaderRequest { + identifier, + peer_addr: peer.info.addr, + requested_at: Utc::now(), + target_height, + }); + sent += 1; + } + segment_idx += 1; } } /// Request some block headers from a peer to advance us. - fn request_headers(&self, sync_head: chain::Tip, peer: Arc) { + fn request_headers(&mut self, sync_head: chain::Tip, peer: Arc) -> bool { + if let Some(req) = &self.pending_legacy { + return req.peer_addr == peer.info.addr + && self.peers.get_connected_peer(peer.info.addr).is_some(); + } + if self.peers.get_connected_peer(peer.info.addr).is_none() { + return false; + } if let Ok(locator) = self.get_locator(sync_head) { debug!( "sync: request_headers: asking {} for headers, {:?}", peer.info.addr, locator, ); - let _ = peer.send_header_request(locator); + if peer.send_header_request(locator).is_ok() { + self.pending_legacy = Some(LegacyHeaderRequest { + peer_addr: peer.info.addr, + height: sync_head.height, + requested_at: Utc::now(), + }); + return true; + } } + false } /// Build a locator based on header_head. @@ -211,6 +528,12 @@ impl HeaderSync { } } +fn pihd_segment_start_height(id: SegmentIdentifier) -> Option { + id.idx + .checked_mul(id.segment_capacity()) + .and_then(|height| height.checked_add(1)) +} + // current height back to 0 decreasing in powers of 2 fn get_locator_heights(height: u64) -> Vec { let mut current = height; @@ -250,4 +573,22 @@ mod test { vec![10000, 9998, 9994, 9986, 9970, 9938, 9874, 9746, 9490, 8978, 7954, 5906, 1810, 0,] ); } + + #[test] + fn test_pihd_segment_start_height() { + assert_eq!( + pihd_segment_start_height(SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 0 + }), + Some(1) + ); + assert_eq!( + pihd_segment_start_height(SegmentIdentifier { + height: PIHD_HEADER_SEGMENT_HEIGHT, + idx: 1 + }), + Some((p2p::MAX_BLOCK_HEADERS as u64) + 1) + ); + } } diff --git a/servers/src/grin/sync/state_sync.rs b/servers/src/grin/sync/state_sync.rs index aaee2e6a0a..3ff96758c5 100644 --- a/servers/src/grin/sync/state_sync.rs +++ b/servers/src/grin/sync/state_sync.rs @@ -15,14 +15,17 @@ use chrono::prelude::{DateTime, Utc}; use chrono::Duration; use std::sync::Arc; +use std::time::Instant; use crate::chain::{self, pibd_params, SyncState, SyncStatus}; use crate::core::core::{hash::Hashed, pmmr::segment::SegmentType}; use crate::core::global; use crate::core::pow::Difficulty; -use crate::p2p::{self, Capabilities, Peer}; +use crate::p2p::{self, Capabilities, Peer, PeerAddr}; use crate::util::StopState; +const PIBD_PROGRESS_CHECK_SECS: u64 = 10; + /// Fast sync has 3 "states": /// * syncing headers /// * once all headers are sync'd, requesting the txhashset state @@ -39,6 +42,7 @@ pub struct StateSync { pibd_aborted: bool, earliest_zero_pibd_peer_time: Option>, + last_pibd_progress_check: Option, } impl StateSync { @@ -55,6 +59,7 @@ impl StateSync { state_sync_peer: None, pibd_aborted: false, earliest_zero_pibd_peer_time: None, + last_pibd_progress_check: None, } } @@ -76,15 +81,11 @@ impl StateSync { pub fn check_run( &mut self, header_head: &chain::Tip, - head: &chain::Tip, - tail: &chain::Tip, + _head: &chain::Tip, + _tail: &chain::Tip, highest_height: u64, stop_state: Arc, ) -> bool { - trace!("state_sync: head.height: {}, tail.height: {}. header_head.height: {}, highest_height: {}", - head.height, tail.height, header_head.height, highest_height, - ); - let mut sync_need_restart = false; // check sync error @@ -172,8 +173,14 @@ impl StateSync { let (launch, _download_timeout) = self.state_sync_due(); let archive_header = { self.chain.txhashset_archive_header_header_only().unwrap() }; if launch { + info!( + "state_sync: PIBD started for archive header {} at height {}", + archive_header.hash(), + archive_header.height + ); self.sync_state .update_pibd_progress(false, false, 0, 1, &archive_header); + self.last_pibd_progress_check = Some(Instant::now()); } // Continue our PIBD process (which returns true if all segments are in) if self.continue_pibd() { @@ -181,6 +188,11 @@ impl StateSync { // All segments in, validate if let Some(d) = desegmenter.write().as_mut() { if let Ok(true) = d.check_progress(self.sync_state.clone()) { + info!( + "state_sync: PIBD segments downloaded for archive header {} at height {}; validating final txhashset", + archive_header.hash(), + archive_header.height + ); if let Err(e) = d.check_update_leaf_set_state() { error!("error updating PIBD leaf set: {}", e); self.sync_state.update_pibd_progress( @@ -206,6 +218,11 @@ impl StateSync { ); return false; } + info!( + "state_sync: PIBD completed for archive header {} at height {}", + archive_header.hash(), + archive_header.height + ); return true; } }; @@ -255,50 +272,162 @@ impl StateSync { let stale_segments = self .sync_state .remove_stale_pibd_requests(pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS); + let retry_segments = self.sync_state.retryable_pibd_segments( + pibd_params::BLOCKING_SEGMENT_RETRY_SECS, + pibd_params::BLOCKING_SEGMENT_RETRY_COUNT, + ); + if !stale_segments.is_empty() { + for (seg_id, peer_addr) in stale_segments.iter() { + if let Some(peer_addr) = peer_addr { + let peer_addr = PeerAddr(*peer_addr); + let _ = self.peers.block_peer(peer_addr, "PIBD segment timeout"); + let is_outbound = self.peers.iter().outbound().by_addr(peer_addr).is_some(); + if is_outbound { + debug!( + "state_sync: disconnecting outbound peer {} after PIBD timeout for {:?}", + peer_addr, seg_id + ); + if let Err(e) = self + .peers + .disconnect_peer(peer_addr, "PIBD segment timeout") + { + debug!( + "state_sync: failed to disconnect timed-out peer {}: {:?}", + peer_addr, e + ); + } + } + } + } + } // Apply segments... TODO: figure out how this should be called, might // need to be a separate thread. if let Some(mut de) = desegmenter.try_write() { if let Some(d) = de.as_mut() { + let apply_started = Instant::now(); let res = d.apply_next_segments(); + trace!( + "state_sync: PIBD apply_next_segments completed in {}ms", + apply_started.elapsed().as_millis() + ); if let Err(e) = res { error!("error applying segment: {}", e); self.sync_state .update_pibd_progress(false, true, 0, 1, &archive_header); return false; } + self.sync_state + .update_pibd_leaf_progress(d.applied_leaf_count(), &archive_header); } } - // TODO and consider: number here depends on how many simultaneous - // requests we want to send to peers - let mut next_segment_ids = vec![]; - if let Some(d) = desegmenter.write().as_mut() { - if let Ok(true) = d.check_progress(self.sync_state.clone()) { - return true; + let pending_segment_count = self.sync_state.pending_pibd_segment_count(); + let progress_check_due = pending_segment_count == 0 + && self + .last_pibd_progress_check + .map(|last| last.elapsed().as_secs() >= PIBD_PROGRESS_CHECK_SECS) + .unwrap_or(true); + if progress_check_due { + if let Some(mut de) = desegmenter.try_write() { + if let Some(d) = de.as_mut() { + let progress_started = Instant::now(); + self.last_pibd_progress_check = Some(Instant::now()); + match d.check_progress(self.sync_state.clone()) { + Ok(true) => return true, + Ok(false) => trace!( + "state_sync: PIBD check_progress completed in {}ms", + progress_started.elapsed().as_millis() + ), + Err(e) => error!("state_sync: PIBD check_progress error: {}", e), + } + } + } else { + trace!("state_sync: PIBD check_progress skipped, desegmenter busy"); } - // Figure out the next segments we need - // (12 is divisible by 3, to try and evenly spread the requests among the 3 - // main pmmrs. Bitmaps segments will always be requested first) - next_segment_ids = d.next_desired_segments(pibd_params::SEGMENT_REQUEST_COUNT); - if !next_segment_ids.is_empty() { - debug!( - "state_sync: requesting next PIBD segments {:?}", - next_segment_ids - ); + } + + let request_budget = + pibd_params::SEGMENT_REQUEST_COUNT.saturating_sub(pending_segment_count); + + let mut next_segment_ids = vec![]; + if request_budget > 0 { + if let Some(mut de) = desegmenter.try_write() { + if let Some(d) = de.as_mut() { + // Figure out the next segments we need, looking past currently + // pending requests so we can keep the request window full. + next_segment_ids = d.next_desired_segments( + pibd_params::SEGMENT_REQUEST_COUNT + pending_segment_count, + ); + if !next_segment_ids.is_empty() { + trace!( + "state_sync: requesting {} PIBD segment candidate(s)", + next_segment_ids.len() + ); + } else { + trace!("state_sync: no PIBD segments requested this loop"); + } + } } else { - trace!("state_sync: no PIBD segments requested this loop"); + trace!("state_sync: PIBD request scheduling skipped, desegmenter busy"); } } // For each segment, pick a desirable peer and send message // (Provided we're not waiting for a response for this message from someone else) - for seg_id in next_segment_ids.iter() { - if self.sync_state.contains_pibd_segment(seg_id) { - debug!( - "state_sync: segment {:?} already requested, waiting for response", - seg_id - ); + let mut sent_requests = 0; + let mut request_candidates: Vec<_> = retry_segments + .into_iter() + .map(|(seg_id, peer)| (seg_id, peer, true)) + .collect(); + + let mut bitmap_candidates = vec![]; + let mut output_candidates = vec![]; + let mut rangeproof_candidates = vec![]; + let mut kernel_candidates = vec![]; + + for seg_id in next_segment_ids.into_iter() { + let excluded_peer = stale_segments + .iter() + .find(|(stale_id, _)| stale_id == &seg_id) + .and_then(|(_, addr)| *addr); + let candidate = (seg_id, excluded_peer, false); + match candidate.0.segment_type { + SegmentType::Bitmap => bitmap_candidates.push(candidate), + SegmentType::Output => output_candidates.push(candidate), + SegmentType::RangeProof => rangeproof_candidates.push(candidate), + SegmentType::Kernel => kernel_candidates.push(candidate), + } + } + + bitmap_candidates.reverse(); + output_candidates.reverse(); + rangeproof_candidates.reverse(); + kernel_candidates.reverse(); + loop { + let len_before = request_candidates.len(); + if let Some(candidate) = bitmap_candidates.pop() { + request_candidates.push(candidate); + } + if let Some(candidate) = kernel_candidates.pop() { + request_candidates.push(candidate); + } + if let Some(candidate) = output_candidates.pop() { + request_candidates.push(candidate); + } + if let Some(candidate) = rangeproof_candidates.pop() { + request_candidates.push(candidate); + } + if request_candidates.len() == len_before { + break; + } + } + + for (seg_id, excluded_peer, is_retry) in request_candidates.iter() { + if !is_retry && sent_requests >= request_budget { + continue; + } + if !is_retry && self.sync_state.contains_pibd_segment(seg_id) { trace!("Request list contains, continuing: {:?}", seg_id); continue; } @@ -317,12 +446,23 @@ impl StateSync { .with_capabilities(Capabilities::PIBD_HIST_1) .connected() }; + let height_slack = pibd_params::SYNC_PEER_HEIGHT_SLACK_BLOCKS; + let max_pibd_height = peers_iter_pibd() + .into_iter() + .map(|p| p.info.height()) + .max() + .unwrap_or(0); + let available_pibd_peers = || { + peers_iter_pibd().with_filter(|p| { + p.info.height().saturating_add(height_slack) >= max_pibd_height + }) + }; // If there are no suitable PIBD-Enabled peers, AND there hasn't been one for a minute, // abort PIBD and fall back to txhashset download // Waiting a minute helps ensures that the cancellation isn't simply due to a single non-PIBD enabled // peer having the max difficulty - if peers_iter_pibd().count() == 0 { + if available_pibd_peers().count() == 0 { if let None = self.earliest_zero_pibd_peer_time { self.set_earliest_zero_pibd_peer_time(Some(Utc::now())); } @@ -330,8 +470,12 @@ impl StateSync { + Duration::seconds(pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS) < Utc::now() { - // random abort test - info!("No PIBD-enabled max-difficulty peers for the past {} seconds - Aborting PIBD and falling back to TxHashset.zip download", pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS); + info!( + "state_sync: PIBD aborted for archive header {} at height {}; no PIBD-enabled max-difficulty peers for {} seconds, falling back to TxHashset.zip download", + archive_header.hash(), + archive_header.height, + pibd_params::TXHASHSET_ZIP_FALLBACK_TIME_SECS + ); self.sync_state .update_pibd_progress(true, true, 0, 1, &archive_header); self.sync_state @@ -343,26 +487,48 @@ impl StateSync { self.set_earliest_zero_pibd_peer_time(None) } - // Choose a random "most work" peer, excluding peer from stale segment and preferring outbound if at all possible. - let excluded_peer = stale_segments - .iter() - .find(|(stale_id, _)| stale_id == seg_id) - .and_then(|(_, addr)| *addr); - let peer = peers_iter_pibd() + // Choose a random "most work" peer, excluding peer from stale/retry segment + // and preferring outbound if at all possible. + let sync_state = self.sync_state.clone(); + let peer = available_pibd_peers() .outbound() - .exclude(excluded_peer) + .with_filter(|p| { + !peers.is_blocked(p.info.addr) + && !sync_state.rejected_pibd_segment_from( + seg_id, + p.info.addr.0, + pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS, + ) + }) + .exclude(*excluded_peer) .choose_random() .or_else(|| { - peers_iter_pibd() + available_pibd_peers() .inbound() - .exclude(excluded_peer) + .with_filter(|p| { + !peers.is_blocked(p.info.addr) + && !sync_state.rejected_pibd_segment_from( + seg_id, + p.info.addr.0, + pibd_params::SEGMENT_REQUEST_TIMEOUT_SECS, + ) + }) + .exclude(*excluded_peer) .choose_random() + .or_else(|| { + // If all otherwise eligible peers are blocked, keep sync moving. + available_pibd_peers() + .exclude(*excluded_peer) + .choose_random() + }) }); - trace!("Chosen peer is {:?}", peer); - if let Some(p) = peer { // add to list of segments that are being tracked - self.sync_state.add_pibd_segment(seg_id, p.info.addr.0); + if *is_retry { + self.sync_state.refresh_pibd_segment(seg_id, p.info.addr.0); + } else { + self.sync_state.add_pibd_segment(seg_id, p.info.addr.0); + } let res = match seg_id.segment_type { SegmentType::Bitmap => p.send_bitmap_segment_request( archive_header.hash(), @@ -387,8 +553,17 @@ impl StateSync { p.info.addr, e ); self.sync_state.remove_pibd_segment(seg_id); + } else if *is_retry { + if let Some(prev_peer) = excluded_peer { + if p.info.addr.0 != *prev_peer { + info!( + "state_sync: retrying blocking segment {:?} with new peer {} (previously {})", + seg_id, p.info.addr, prev_peer + ); + } + } } else if let Some(prev_peer) = excluded_peer { - if p.info.addr.0 != prev_peer { + if p.info.addr.0 != *prev_peer { info!( "state_sync: retrying segment {:?} with new peer {} (previously {})", seg_id, p.info.addr, prev_peer @@ -405,6 +580,9 @@ impl StateSync { seg_id, p.info.addr ); } + if !is_retry { + sent_requests += 1; + } } } false @@ -495,7 +673,9 @@ impl StateSync { } fn state_sync_reset(&mut self) { + let _ = self.peers.unblock_peers(); self.prev_state_sync = None; self.state_sync_peer = None; + self.last_pibd_progress_check = None; } } diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 8538e63458..9f08969c31 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -25,7 +25,7 @@ use std::borrow::Cow; use crate::tui::constants::VIEW_BASIC_STATUS; use crate::tui::types::TUIStatusListener; -use crate::chain::SyncStatus; +use crate::chain::{HeaderSyncMode, SyncStatus}; use crate::servers::ServerStats; const NANO_TO_MILLIS: f64 = 1.0 / 1_000_000.0; @@ -33,6 +33,16 @@ const NANO_TO_MILLIS: f64 = 1.0 / 1_000_000.0; pub struct TUIStatusView; impl TUIStatusView { + fn header_sync_mode_label(sync_status: SyncStatus) -> Cow<'static, str> { + match sync_status { + SyncStatus::HeaderSync { sync_mode, .. } => match sync_mode { + HeaderSyncMode::Legacy => Cow::Borrowed("Legacy"), + HeaderSyncMode::Pihd => Cow::Borrowed("PIHD"), + }, + _ => Cow::Borrowed("Inactive"), + } + } + pub fn update_sync_status(sync_status: SyncStatus) -> Cow<'static, str> { match sync_status { SyncStatus::Initial => Cow::Borrowed("Initializing"), @@ -40,6 +50,7 @@ impl TUIStatusView { SyncStatus::AwaitingPeers(_) => Cow::Borrowed("Waiting for peers"), SyncStatus::HeaderSync { sync_head, + sync_mode, highest_height, .. } => { @@ -48,7 +59,14 @@ impl TUIStatusView { } else { sync_head.height * 100 / highest_height }; - Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent)) + let sync_mode = match sync_mode { + HeaderSyncMode::Legacy => "Legacy", + HeaderSyncMode::Pihd => "PIHD", + }; + Cow::Owned(format!( + "Sync step 1/7: Downloading headers ({}) - {}%", + sync_mode, percent + )) } SyncStatus::TxHashsetPibd { aborted: _, @@ -172,11 +190,21 @@ impl TUIStatusView { pub fn create() -> impl View { let basic_status_view = ResizedView::with_full_screen( LinearLayout::new(Orientation::Vertical) + .child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("Runtime (s): ")) + .child(TextView::new("0").with_name("basic_runtime_seconds")), + ) .child( LinearLayout::new(Orientation::Horizontal) .child(TextView::new("Current Status: ")) .child(TextView::new("Starting").with_name("basic_current_status")), ) + .child( + LinearLayout::new(Orientation::Horizontal) + .child(TextView::new("Header Sync: ")) + .child(TextView::new("Inactive").with_name("basic_header_sync_mode")), + ) .child( LinearLayout::new(Orientation::Horizontal) .child(TextView::new("Connected Peers: ")) @@ -283,10 +311,17 @@ impl TUIStatusView { impl TUIStatusListener for TUIStatusView { fn update(c: &mut Cursive, stats: &ServerStats) { let basic_status = TUIStatusView::update_sync_status(stats.sync_status); + let header_sync_mode = TUIStatusView::header_sync_mode_label(stats.sync_status); + c.call_on_name("basic_runtime_seconds", |t: &mut TextView| { + t.set_content(stats.uptime_seconds.to_string()); + }); c.call_on_name("basic_current_status", |t: &mut TextView| { t.set_content(basic_status); }); + c.call_on_name("basic_header_sync_mode", |t: &mut TextView| { + t.set_content(header_sync_mode); + }); c.call_on_name("connected_peers", |t: &mut TextView| { t.set_content(stats.peer_count.to_string()); }); diff --git a/store/src/pmmr.rs b/store/src/pmmr.rs index 8a64d6822d..53c6871e13 100644 --- a/store/src/pmmr.rs +++ b/store/src/pmmr.rs @@ -399,6 +399,9 @@ impl PMMRBackend { self.hash_file.discard(); self.data_file.discard(); self.leaf_set.discard(); + if let Err(e) = self.prune_list.discard() { + error!("Discarding prune list changes: {}", e); + } } /// Takes the leaf_set at a given cutoff_pos and generates an updated diff --git a/store/src/prune_list.rs b/store/src/prune_list.rs index 2becf85f65..1066016e86 100644 --- a/store/src/prune_list.rs +++ b/store/src/prune_list.rs @@ -129,6 +129,25 @@ impl PruneList { Ok(()) } + /// Discard in-memory changes and restore the last flushed prune_list state. + pub fn discard(&mut self) -> io::Result<()> { + let path = match self.path.clone() { + Some(path) => path, + None => { + *self = PruneList::empty(); + return Ok(()); + } + }; + + let bitmap = if path.exists() { + read_bitmap(&path)? + } else { + Bitmap::new() + }; + *self = PruneList::new(Some(path), bitmap); + Ok(()) + } + /// Return the total shift from all entries in the prune_list. /// This is the shift we need to account for when adding new entries to our PMMR. pub fn get_total_shift(&self) -> u64 { diff --git a/store/tests/prune_list.rs b/store/tests/prune_list.rs index a781dfbf48..35161c247a 100644 --- a/store/tests/prune_list.rs +++ b/store/tests/prune_list.rs @@ -13,6 +13,7 @@ // limitations under the License. use grin_store as store; +use std::fs; use crate::store::prune_list::PruneList; @@ -61,6 +62,27 @@ fn test_is_pruned() { assert_eq!(pl.is_pruned(4), false); } +#[test] +fn test_discard_restores_flushed_state() { + let test_dir = "target/test_prune_list_discard"; + let _ = fs::remove_dir_all(test_dir); + fs::create_dir_all(test_dir).unwrap(); + let path = format!("{}/pmmr_prun.bin", test_dir); + + let mut pl = PruneList::new(Some(path.clone().into()), Default::default()); + pl.append(0); + pl.flush().unwrap(); + + pl.append(2); + assert!(pl.is_pruned_root(2)); + + pl.discard().unwrap(); + assert!(pl.is_pruned_root(0)); + assert!(!pl.is_pruned_root(2)); + + let _ = fs::remove_dir_all(test_dir); +} + #[test] fn test_get_leaf_shift() { let mut pl = PruneList::empty();