Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 107 additions & 41 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ use std::{
collections::{HashMap, HashSet},
future::Future,
net::SocketAddr,
sync::{Arc, OnceLock},
sync::{
Arc,
OnceLock,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
#[cfg(not(feature = "locktick"))]
Expand Down Expand Up @@ -137,6 +141,11 @@ pub struct Primary<N: Network> {
propose_lock: Arc<TMutex<u64>>,
/// The node configuration directory.
node_data_dir: NodeDataDir,
/// Whether the proposal cache has been loaded (or there is no cache to load).
///
/// This is used to prevent the primary from being considered synced before the proposal cache
/// has been applied, avoiding proposing stale batches after restoring an old ledger snapshot.
proposal_cache_loaded: Arc<AtomicBool>,
}

impl<N: Network> Primary<N> {
Expand Down Expand Up @@ -186,10 +195,19 @@ impl<N: Network> Primary<N> {
handles: Default::default(),
propose_lock: Default::default(),
node_data_dir,
// Default to `true` (loaded / not pending): the proposal cache is not pending
// unless `load_proposal_cache` explicitly defers it.
proposal_cache_loaded: Arc::new(AtomicBool::new(true)),
})
}

/// Load the proposal cache file and update the Primary state with the stored data.
///
/// If the proposal cache is more than `MAX_GC_ROUNDS` ahead of the current ledger, loading is
/// deferred: `proposal_cache_loaded` is set to `false` and a background task is spawned that
/// awaits the first time the node becomes block-synced, then applies the cached state and
/// restores the flag. While the cache is pending, [`Self::is_synced`] returns `false` so
/// the primary will not propose or sign batches until the cache has been loaded.
async fn load_proposal_cache(&self) -> Result<()> {
// Fetch the signed proposals from the file system if it exists.
match ProposalCache::<N>::exists(&self.node_data_dir) {
Expand All @@ -200,42 +218,53 @@ impl<N: Network> Primary<N> {
let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
proposal_cache.into();

// Verify that the proposal cache is not too far ahead of the ledger.
// If the cache round exceeds the ledger round by more than MAX_GC_ROUNDS, the ledger
// snapshot is too old to recover from the cached state. The operator must restore a
// more recent ledger snapshot before restarting the node.
let ledger_round = self.ledger.latest_round();
let max_gc_rounds = BatchHeader::<N>::MAX_GC_ROUNDS as u64;

// Check whether the proposal cache is too far ahead of the current ledger.
if latest_certificate_round > ledger_round.saturating_add(max_gc_rounds) {
bail!(
"The proposal cache (round {latest_certificate_round}) is more than {max_gc_rounds} \
rounds ahead of the ledger (round {ledger_round}). \
Please restore a more recent ledger snapshot before restarting the node."
// The ledger snapshot is older than MAX_GC_ROUNDS relative to the cached
// proposal. Rather than failing hard, mark the cache as not yet loaded and
// defer applying it until the block sync reaches the required height.
info!(
"The proposal cache (round {latest_certificate_round}) is more than \
{max_gc_rounds} GC rounds ahead of the ledger (round {ledger_round}). \
Deferring proposal cache load until the node has finished syncing."
);
// Mark the cache as pending: `is_synced()` will return `false` until
// this flag is cleared after the cache is applied.
self.proposal_cache_loaded.store(false, Ordering::Release);
let self_ = self.clone();
self.spawn(async move {
// Wait for the block sync to reach the Synced state for the first time.
self_.sync.wait_until_synced().await;
info!(
"Loading the deferred proposal cache (round {latest_certificate_round}) \
now that the node has finished syncing to round {}",
self_.ledger.latest_round()
);
self_
.apply_proposal_cache(
latest_certificate_round,
proposed_batch,
signed_proposals,
pending_certificates,
)
.await;
// Mark the proposal cache as loaded so `is_synced()` can return `true`.
self_.proposal_cache_loaded.store(true, Ordering::Release);
});
return Ok(());
}

// Write the proposed batch.
*self.proposed_batch.write() = proposed_batch;
// Write the signed proposals.
*self.signed_proposals.write() = signed_proposals;
// Writ the propose lock.
*self.propose_lock.lock().await = latest_certificate_round;

// Update the storage with the pending certificates.
for certificate in pending_certificates {
let batch_id = certificate.batch_id();
// We use a dummy IP because the node should not need to request from any peers.
// The storage should have stored all the transmissions. If not, we simply
// skip the certificate.
if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
{
let err = err.context(format!(
"Failed to load stored certificate {} from proposal cache",
fmt_id(batch_id)
));
warn!("{}", &flatten_error(err));
}
}
// The cache is within GC range; apply it immediately.
self.apply_proposal_cache(
latest_certificate_round,
proposed_batch,
signed_proposals,
pending_certificates,
)
.await;
Ok(())
}
Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
Expand All @@ -245,6 +274,36 @@ impl<N: Network> Primary<N> {
}
}

/// Applies the proposal cache state to the primary: writes the proposed batch, signed
/// proposals, propose lock, and loads any pending certificates into storage.
async fn apply_proposal_cache(
&self,
latest_certificate_round: u64,
proposed_batch: Option<Proposal<N>>,
signed_proposals: SignedProposals<N>,
pending_certificates: IndexSet<BatchCertificate<N>>,
) {
// Write the proposed batch.
*self.proposed_batch.write() = proposed_batch;
// Write the signed proposals.
*self.signed_proposals.write() = signed_proposals;
// Write the propose lock.
*self.propose_lock.lock().await = latest_certificate_round;

// Update the storage with the pending certificates.
for certificate in pending_certificates {
let batch_id = certificate.batch_id();
// We use a dummy IP because the node should not need to request from any peers.
// The storage should have stored all the transmissions. If not, we simply
// skip the certificate.
if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await {
let err =
err.context(format!("Failed to load stored certificate {} from proposal cache", fmt_id(batch_id)));
warn!("{}", &flatten_error(err));
}
}
}

/// Run the primary instance.
pub async fn run(
&self,
Expand Down Expand Up @@ -293,12 +352,16 @@ impl<N: Network> Primary<N> {
let (sync_sender, sync_receiver) = init_sync_channels();
// Next, initialize the sync module and sync the storage from ledger.
self.sync.initialize(sync_callback)?;
// Next, load and process the proposal cache before running the sync module.
self.load_proposal_cache().await?;
// Next, run the sync module.
self.sync.run(ping, sync_receiver).await?;
// Next, initialize the gateway.
self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
// Next, load and process the proposal cache.
// Note: This is done after starting the sync module and gateway so that, if the proposal
// cache is more than MAX_GC_ROUNDS ahead of the current ledger, a background task can be
// spawned to defer loading until the ledger has synced within GC range of the cached
// proposal round.
self.load_proposal_cache().await?;
// Lastly, start the primary handlers.
// Note: This ensures the primary does not start communicating before syncing is complete.
self.start_handlers(primary_receiver);
Expand All @@ -312,8 +375,11 @@ impl<N: Network> Primary<N> {
}

/// Returns `true` if the primary is synced.
///
/// Returns `false` when the block sync is not yet complete, or when there is a pending
/// proposal cache that has not yet been applied (see [`Self::load_proposal_cache`]).
pub fn is_synced(&self) -> bool {
self.sync.is_synced()
self.proposal_cache_loaded.load(Ordering::Acquire) && self.sync.is_synced()
}

/// Returns the gateway.
Expand Down Expand Up @@ -1328,7 +1394,7 @@ impl<N: Network> Primary<N> {
self.spawn(async move {
while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
// If the primary is not synced, then do not process the primary ping.
if self_.sync.is_synced() {
if self_.is_synced() {
trace!("Processing new primary ping from '{peer_ip}'");
} else {
trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
Expand Down Expand Up @@ -1362,7 +1428,7 @@ impl<N: Network> Primary<N> {
loop {
tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
// If the primary is not synced, then do not broadcast the worker ping(s).
if !self_.sync.is_synced() {
if !self_.is_synced() {
trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
continue;
}
Expand All @@ -1381,7 +1447,7 @@ impl<N: Network> Primary<N> {
tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
let current_round = self_.current_round();
// If the primary is not synced, then do not propose a batch.
if !self_.sync.is_synced() {
if !self_.is_synced() {
debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
continue;
}
Expand All @@ -1408,7 +1474,7 @@ impl<N: Network> Primary<N> {
self.spawn(async move {
while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
// If the primary is not synced, then do not sign the batch.
if !self_.sync.is_synced() {
if !self_.is_synced() {
trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
continue;
}
Expand All @@ -1430,7 +1496,7 @@ impl<N: Network> Primary<N> {
self.spawn(async move {
while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
// If the primary is not synced, then do not store the signature.
if !self_.sync.is_synced() {
if !self_.is_synced() {
trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
continue;
}
Expand All @@ -1452,7 +1518,7 @@ impl<N: Network> Primary<N> {
self.spawn(async move {
while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
// If the primary is not synced, then do not store the certificate.
if !self_.sync.is_synced() {
if !self_.is_synced() {
trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
continue;
}
Expand Down Expand Up @@ -1489,7 +1555,7 @@ impl<N: Network> Primary<N> {
// Sleep briefly.
tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
// If the primary is not synced, then do not increment to the next round.
if !self_.sync.is_synced() {
if !self_.is_synced() {
trace!("Skipping round increment {}", "(node is syncing)".dimmed());
continue;
}
Expand Down
8 changes: 8 additions & 0 deletions node/bft/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,14 @@ impl<N: Network> Sync<N> {
self.block_sync.is_block_synced()
}

/// Waits asynchronously until the node is fully synced (block-level sync is complete).
///
/// Returns immediately if the node is already synced; otherwise suspends until the
/// block sync state transitions to [`snarkos_node_sync::BftSyncMode`] synced.
pub async fn wait_until_synced(&self) {
self.block_sync.wait_until_block_synced().await;
}

/// Returns the number of blocks the node is behind the greatest peer height.
pub fn num_blocks_behind(&self) -> Option<u32> {
self.block_sync.num_blocks_behind()
Expand Down
Loading