Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ pub struct Start {
#[clap(long)]
pub nocdn: bool,

/// If set, the node syncs exclusively from the CDN and never starts P2P sync.
/// Whenever the CDN tip is reached or an error occurs, the node sleeps and retries.
#[clap(long, conflicts_with = "nocdn")]
pub onlycdn: bool,

/// Enables development mode used to set up test networks.
///
/// The purpose of this flag is to run multiple nodes on the same machine and in the same working directory.
Expand Down Expand Up @@ -854,9 +859,9 @@ impl Start {

// Initialize the node.
let node = match node_type {
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await,
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, self.onlycdn, signal_handler.clone()).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, node_data_dir, self.trusted_peers_only, self.dev, signal_handler.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, signal_handler.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, self.onlycdn, signal_handler.clone()).await,
NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, account, *genesis.header(), self.dev).await,
}?;

Expand Down
95 changes: 57 additions & 38 deletions node/cdn/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// https://github.com/rust-lang/rust-clippy/issues/6446
#![allow(clippy::await_holding_lock)]

use snarkos_utilities::{SignalHandler, Stoppable};
use snarkos_utilities::{SimpleStoppable, Stoppable};

use snarkvm::{
prelude::{Deserialize, DeserializeOwned, Ledger, Network, Serialize, block::Block, store::ConsensusStorage},
Expand Down Expand Up @@ -83,14 +83,19 @@ pub struct CdnBlockSync {

impl CdnBlockSync {
/// Spawn a background task that loads blocks from a CDN into the ledger.
///
/// When `only_cdn` is `true` the worker retries indefinitely after reaching the
/// CDN tip or encountering a transient error, sleeping 30 s between attempts.
/// It exits only when `stoppable.is_stopped()` returns `true`.
pub fn new<N: Network, C: ConsensusStorage<N>>(
base_url: http::Uri,
ledger: Ledger<N, C>,
stoppable: Arc<SignalHandler>,
stoppable: Arc<dyn Stoppable>,
only_cdn: bool,
) -> Self {
let task = {
let base_url = base_url.clone();
tokio::spawn(async move { Self::worker(base_url, ledger, stoppable).await })
tokio::spawn(async move { Self::worker(base_url, ledger, stoppable, only_cdn).await })
};

debug!("Started sync from CDN at {base_url}");
Expand Down Expand Up @@ -122,46 +127,60 @@ impl CdnBlockSync {
base_url: http::Uri,
ledger: Ledger<N, C>,
stoppable: Arc<dyn Stoppable>,
only_cdn: bool,
) -> SyncResult {
// Fetch the node height.
let start_height = ledger.latest_height() + 1;
// Load the blocks from the CDN into the ledger.
let ledger_clone = ledger.clone();
let result = load_blocks(&base_url, start_height, None, stoppable, move |block: Block<N>| {
ledger_clone
.advance_to_next_block(&block)
.with_context(|| format!("Failed to advance to block {} at height {}", block.hash(), block.height()))
})
.await;

// TODO (howardwu): Find a way to resolve integrity failures.
// If the sync failed, check the integrity of the ledger.
match result {
Ok(completed_height) => Ok(completed_height),
Err((completed_height, error)) => {
warn!("{}", flatten_error(error.context("Failed to sync block(s) from the CDN")));

// If the sync made any progress, then check the integrity of the ledger.
if completed_height != start_height {
debug!("Synced the ledger up to block {completed_height}");

// Retrieve the latest height, according to the ledger.
let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
// Check the integrity of the latest height.
if node_height != completed_height {
return Err((
completed_height,
anyhow!("The ledger height does not match the last sync height"),
));
}
loop {
// Fetch the node height.
let start_height = ledger.latest_height() + 1;
// In --onlycdn mode use a fresh per-attempt stoppable so CDN-internal stop() calls
// do not permanently poison the global stoppable used to detect user shutdown.
let attempt_stoppable: Arc<dyn Stoppable> =
if only_cdn { SimpleStoppable::new() } else { stoppable.clone() };
// Load the blocks from the CDN into the ledger.
let ledger_clone = ledger.clone();
let result = load_blocks(&base_url, start_height, None, attempt_stoppable, move |block: Block<N>| {
ledger_clone.advance_to_next_block(&block).with_context(|| {
format!("Failed to advance to block {} at height {}", block.hash(), block.height())
})
})
.await;

// TODO (howardwu): Find a way to resolve integrity failures.
// If the sync failed, check the integrity of the ledger.
let completed_height = match result {
Ok(completed_height) => completed_height,
Err((completed_height, error)) => {
error!("{}", flatten_error(error.context("Failed to sync block(s) from the CDN")));

// If the sync made any progress, then check the integrity of the ledger.
if completed_height != start_height {
debug!("Synced the ledger up to block {completed_height}");

// Retrieve the latest height, according to the ledger.
let node_height = *ledger.vm().block_store().heights().max().unwrap_or_default();
// Check the integrity of the latest height.
if node_height != completed_height {
// Log an error.
error!("The ledger height does not match the last sync height");
}

// Fetch the latest block from the ledger.
if let Err(err) = ledger.get_block(node_height) {
return Err((completed_height, err));
// Fetch the latest block from the ledger.
if let Err(err) = ledger.get_block(node_height) {
return Err((completed_height, err));
}
}

completed_height
}
};

Ok(completed_height)
// In --onlycdn mode, retry after a delay unless the node is shutting down.
if !only_cdn || stoppable.is_stopped() {
return Ok(completed_height);
}
tokio::time::sleep(Duration::from_secs(30)).await;
if stoppable.is_stopped() {
return Ok(completed_height);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
node_data_dir: NodeDataDir,
trusted_peers_only: bool,
dev: Option<u16>,
only_cdn: bool,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
// Initialize the ledger.
Expand Down Expand Up @@ -198,9 +199,11 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
};

// Perform sync with CDN (if enabled).
let cdn_sync = cdn.map(|base_url| {
// When only_cdn is true the CdnBlockSync worker retries indefinitely until
// signal_handler fires, so wait() below blocks until user-initiated shutdown.
let cdn_sync = cdn.as_ref().map(|base_url| {
trace!("CDN sync is enabled");
Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler))
Arc::new(CdnBlockSync::new(base_url.clone(), ledger.clone(), signal_handler.clone(), only_cdn))
});

// Initialize the REST server.
Expand All @@ -211,7 +214,7 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
);
}

// Set up everything else after CDN sync is done.
// Wait for CDN sync to complete (or, in --onlycdn mode, until shutdown).
if let Some(cdn_sync) = cdn_sync {
if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
crate::log_clean_error(&storage_mode);
Expand Down
4 changes: 4 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ impl<N: Network> Node<N> {
auto_db_checkpoints: Option<PathBuf>,
dev_txs: bool,
dev: Option<u16>,
only_cdn: bool,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
let validator = Arc::new(
Expand All @@ -116,6 +117,7 @@ impl<N: Network> Node<N> {
trusted_peers_only,
dev_txs,
dev,
only_cdn,
signal_handler,
)
.await?,
Expand Down Expand Up @@ -173,6 +175,7 @@ impl<N: Network> Node<N> {
trusted_peers_only: bool,
auto_db_checkpoints: Option<PathBuf>,
dev: Option<u16>,
only_cdn: bool,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
let client = Arc::new(
Expand All @@ -188,6 +191,7 @@ impl<N: Network> Node<N> {
node_data_dir,
trusted_peers_only,
dev,
only_cdn,
signal_handler,
)
.await?,
Expand Down
10 changes: 8 additions & 2 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
trusted_peers_only: bool,
dev_txs: bool,
dev: Option<u16>,
only_cdn: bool,
signal_handler: Arc<SignalHandler>,
) -> Result<Self> {
// Initialize the ledger.
Expand Down Expand Up @@ -152,7 +153,11 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
};

// Perform sync with CDN (if enabled).
let cdn_sync = cdn.map(|base_url| Arc::new(CdnBlockSync::new(base_url, ledger.clone(), signal_handler)));
// When only_cdn is true the CdnBlockSync worker retries indefinitely until
// signal_handler fires, so wait() below blocks until user-initiated shutdown.
let cdn_sync = cdn.as_ref().map(|base_url| {
Arc::new(CdnBlockSync::new(base_url.clone(), ledger.clone(), signal_handler.clone(), only_cdn))
});

// Initialize the transaction pool.
node.initialize_transaction_pool(dev, dev_txs)?;
Expand All @@ -173,7 +178,7 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
);
}

// Set up everything else after CDN sync is done.
// Wait for CDN sync to complete (or, in --onlycdn mode, until shutdown).
if let Some(cdn_sync) = cdn_sync {
if let Err(error) = cdn_sync.wait().await.with_context(|| "Failed to synchronize from the CDN") {
crate::log_clean_error(&storage_mode);
Expand Down Expand Up @@ -545,6 +550,7 @@ mod tests {
false,
dev_txs,
None,
false,
SignalHandler::new(None),
)
.await
Expand Down
2 changes: 2 additions & 0 deletions node/tests/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub async fn client() -> Client<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
NodeDataDir::new_test(None),
false, // Connect to untrusted peers.
None,
false, // No CDN-only mode.
SignalHandler::new(None),
)
.await
Expand Down Expand Up @@ -74,6 +75,7 @@ pub async fn validator() -> Validator<CurrentNetwork, ConsensusMemory<CurrentNet
false, // This test requires validators to connect to peers.
false, // No dev traffic in production mode.
None,
false, // No CDN-only mode.
SignalHandler::new(None),
)
.await
Expand Down