Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions api/src/handlers/server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use super::utils::w;
use crate::chain::{Chain, SyncState, SyncStatus};
use crate::chain::{Chain, HeaderSyncMode, SyncState, SyncStatus};
use crate::p2p;
use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
Expand Down Expand Up @@ -81,11 +81,19 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
SyncStatus::AwaitingPeers(_) => ("awaiting_peers".to_string(), None),
SyncStatus::HeaderSync {
sync_head,
sync_mode,
highest_height,
..
} => (
"header_sync".to_string(),
Some(json!({ "current_height": sync_head.height, "highest_height": highest_height })),
Some(json!({
"current_height": sync_head.height,
"highest_height": highest_height,
"header_sync_type": match sync_mode {
HeaderSyncMode::Legacy => "legacy",
HeaderSyncMode::Pihd => "pihd",
}
})),
),
SyncStatus::TxHashsetPibd {
aborted,
Expand Down Expand Up @@ -149,7 +157,5 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
Some(json!({ "current_height": current_height, "highest_height": highest_height })),
),
SyncStatus::Shutdown => ("shutdown".to_string(), None),
// any other status is considered syncing (should be unreachable)
_ => ("syncing".to_string(), None),
}
}
4 changes: 2 additions & 2 deletions chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ pub use crate::chain::{Chain, MAX_ORPHAN_SIZE};
pub use crate::error::Error;
pub use crate::store::ChainStore;
pub use crate::types::{
BlockStatus, ChainAdapter, Options, SyncState, SyncStatus, Tip, TxHashsetDownloadStats,
TxHashsetWriteStatus,
BlockStatus, ChainAdapter, HeaderSyncMode, Options, SyncState, SyncStatus, Tip,
TxHashsetDownloadStats, TxHashsetWriteStatus,
};
17 changes: 13 additions & 4 deletions chain/src/pibd_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,28 @@ pub const RANGEPROOF_SEGMENT_HEIGHT: u8 = 11;
pub const KERNEL_SEGMENT_HEIGHT: u8 = 11;

/// Maximum number of received segments to cache (across all trees) before we stop requesting others
pub const MAX_CACHED_SEGMENTS: usize = 15;
pub const MAX_CACHED_SEGMENTS: usize = 30;

/// Number of segments to apply in a single LMDB transaction
pub const SEGMENT_APPLY_BATCH_SIZE: usize = 4;
pub const SEGMENT_APPLY_BATCH_SIZE: usize = 12;

/// How long the state sync should wait after requesting a segment from a peer before
/// deciding the segment isn't going to arrive. The syncer will then re-request the segment
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 20;
pub const SEGMENT_REQUEST_TIMEOUT_SECS: i64 = 60;

/// How long to wait before retrying a pending segment that may be blocking progress
pub const BLOCKING_SEGMENT_RETRY_SECS: i64 = 10;

/// Maximum number of pending blocking segments to retry in one state sync loop
pub const BLOCKING_SEGMENT_RETRY_COUNT: usize = 2;

/// Number of simultaneous requests for segments we should make. Note this is currently
/// divisible by 3 to try and evenly spread requests amount the 3 main MMRs (Bitmap segments
/// will always be requested first)
pub const SEGMENT_REQUEST_COUNT: usize = 15;
pub const SEGMENT_REQUEST_COUNT: usize = 9;

/// How many blocks behind the tip a PIBD peer may be and still be considered usable.
pub const SYNC_PEER_HEIGHT_SLACK_BLOCKS: u64 = 2;

/// If the syncer hasn't seen a max work peer that supports PIBD in this number of seconds
/// give up and revert back to the txhashset.zip download method
Expand Down
106 changes: 77 additions & 29 deletions chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ impl Desegmenter {
self.bitmap_mmr_size
}

/// Lightweight applied leaf count for progress display.
pub fn applied_leaf_count(&self) -> u64 {
let txhashset = self.txhashset.read();
pmmr::n_leaves(txhashset.output_mmr_size())
+ pmmr::n_leaves(txhashset.rangeproof_mmr_size())
+ pmmr::n_leaves(txhashset.kernel_mmr_size())
}

/// Whether we have all the segments we need
pub fn is_complete(&self) -> bool {
self.all_segments_complete
Expand Down Expand Up @@ -593,22 +601,54 @@ impl Desegmenter {
}
}
}
// Always ensure we explicitly ask for the very next kernel segment we are waiting on.
// The regular round-robin above can get saturated with outputs and rangeproofs while
// the desegmenter is blocked on a missing kernel, so we force this one in.
if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
if !self.has_kernel_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
if self.bitmap_cache.is_some() {
// Always ensure we explicitly ask for the very next segment we are waiting on.
// The regular round-robin above can get saturated while the desegmenter is
// blocked on the next required segment, so we force these in.
if let Some(next_output_idx) = self.next_required_output_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_output_segment_height,
idx: next_output_idx,
};
let next_output_seg_id = SegmentTypeIdentifier::new(SegmentType::Output, seg_id);
if !self.has_output_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_output_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_output_seg_id);
}
}
if let Some(next_rp_idx) = self.next_required_rangeproof_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_rangeproof_segment_height,
idx: next_rp_idx,
};
let next_rp_seg_id = SegmentTypeIdentifier::new(SegmentType::RangeProof, seg_id);
if !self.has_rangeproof_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_rp_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_rp_seg_id);
}
}
if let Some(next_kernel_idx) = self.next_required_kernel_segment_index() {
let seg_id = SegmentIdentifier {
height: self.default_kernel_segment_height,
idx: next_kernel_idx,
};
let next_kernel_seg_id = SegmentTypeIdentifier::new(SegmentType::Kernel, seg_id);
if !self.has_kernel_segment_with_id(seg_id)
&& !return_vec.iter().any(|x| x == &next_kernel_seg_id)
{
if return_vec.len() >= max_elements {
return_vec.pop();
}
return_vec.push(next_kernel_seg_id);
}
return_vec.push(next_kernel_seg_id);
}
}
if return_vec.is_empty() && self.bitmap_cache.is_some() {
Expand Down Expand Up @@ -838,17 +878,20 @@ impl Desegmenter {
)
};

// When resuming, we need to ensure we're getting the previous segment if needed
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height);
if local_output_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}

let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_output_segment_height,
);

// When resuming, we need to ensure we're getting the previous segment if needed.
// Do not apply this to the final partial segment once the target size is reached.
if total_segment_count != cur_segment_count {
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_output_segment_height);
if local_output_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}
}
trace!(
"Next required output segment is {} of {}",
cur_segment_count,
Expand Down Expand Up @@ -957,17 +1000,22 @@ impl Desegmenter {
)
};

// When resuming, we need to ensure we're getting the previous segment if needed
let theoretical_pmmr_size =
SegmentIdentifier::pmmr_size(cur_segment_count, self.default_rangeproof_segment_height);
if local_rangeproof_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}

let total_segment_count = SegmentIdentifier::count_segments_required(
self.archive_header.output_mmr_size,
self.default_rangeproof_segment_height,
);

// When resuming, we need to ensure we're getting the previous segment if needed.
// Do not apply this to the final partial segment once the target size is reached.
if total_segment_count != cur_segment_count {
let theoretical_pmmr_size = SegmentIdentifier::pmmr_size(
cur_segment_count,
self.default_rangeproof_segment_height,
);
if local_rangeproof_mmr_size < theoretical_pmmr_size {
cur_segment_count -= 1;
}
}
trace!(
"Next required rangeproof segment is {} of {}",
cur_segment_count,
Expand Down
Loading
Loading