Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 25 additions & 10 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use std::{
};
use tokio::{
runtime::{self, Runtime},
sync::oneshot,
task,
};
use tracing::warn;
Expand Down Expand Up @@ -287,13 +288,18 @@ impl Start {
let node_parse_error = || "Failed to parse node arguments";
let display_start_error = || "Failed to initialize the display";

let (shutdown_tx, shutdown_rx) = oneshot::channel();

// Clone the configurations.
let mut cli = self.clone();
// Parse the network.
match cli.network {
MainnetV0::ID => {
// Parse the node from the configurations.
let node = cli.parse_node::<MainnetV0>(shutdown.clone()).await.with_context(node_parse_error)?;
let node = cli
.parse_node::<MainnetV0>(shutdown.clone(), shutdown_tx)
.await
.with_context(node_parse_error)?;
// If the display is enabled, render the display.
if !cli.nodisplay {
// Initialize the display.
Expand All @@ -302,7 +308,10 @@ impl Start {
}
TestnetV0::ID => {
// Parse the node from the configurations.
let node = cli.parse_node::<TestnetV0>(shutdown.clone()).await.with_context(node_parse_error)?;
let node = cli
.parse_node::<TestnetV0>(shutdown.clone(), shutdown_tx)
.await
.with_context(node_parse_error)?;
// If the display is enabled, render the display.
if !cli.nodisplay {
// Initialize the display.
Expand All @@ -311,7 +320,10 @@ impl Start {
}
CanaryV0::ID => {
// Parse the node from the configurations.
let node = cli.parse_node::<CanaryV0>(shutdown.clone()).await.with_context(node_parse_error)?;
let node = cli
.parse_node::<CanaryV0>(shutdown.clone(), shutdown_tx)
.await
.with_context(node_parse_error)?;
// If the display is enabled, render the display.
if !cli.nodisplay {
// Initialize the display.
Expand All @@ -320,9 +332,10 @@ impl Start {
}
_ => panic!("Invalid network ID specified"),
};
// Note: Do not move this. The pending await must be here otherwise
// other snarkOS commands will not exit.
std::future::pending::<()>().await;

// Wait for the shutdown signal.
let _ = shutdown_rx.await;

Ok(String::new())
})
}
Expand Down Expand Up @@ -589,7 +602,7 @@ impl Start {

/// Returns the node type corresponding to the given configurations.
#[rustfmt::skip]
async fn parse_node<N: Network>(&mut self, shutdown: Arc<AtomicBool>) -> Result<Node<N>> {
async fn parse_node<N: Network>(&mut self, shutdown: Arc<AtomicBool>, shutdown_tx: oneshot::Sender<()>) -> Result<Node<N>> {
if !self.nobanner {
// Print the welcome banner.
println!("{}", crate::helpers::welcome_message());
Expand Down Expand Up @@ -735,11 +748,13 @@ impl Start {
println!("🪧 The terminal UI will not start until the node has finished syncing from the CDN. If this step takes too long, consider restarting with `--nodisplay`.");
}

let shutdown_tx = Some(shutdown_tx);

// Initialize the node.
match node_type {
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.trusted_peers_only, dev_txs, self.dev, shutdown.clone()).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, storage_mode, self.trusted_peers_only, self.dev, shutdown.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, self.trusted_peers_only, self.dev, shutdown).await,
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, self.trusted_peers_only, dev_txs, self.dev, shutdown.clone(), shutdown_tx).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, storage_mode, self.trusted_peers_only, self.dev, shutdown.clone(), shutdown_tx).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, self.trusted_peers_only, self.dev, shutdown, shutdown_tx).await,
NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, account, *genesis.header(), self.dev).await,
}
}
Expand Down
4 changes: 3 additions & 1 deletion node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use std::{
time::Duration,
};
use tokio::{
sync::oneshot,
task::JoinHandle,
time::{sleep, timeout},
};
Expand Down Expand Up @@ -143,9 +144,10 @@ impl<N: Network, C: ConsensusStorage<N>> Client<N, C> {
trusted_peers_only: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
// Initialize the signal handler.
let signal_node = Self::handle_signals(shutdown.clone());
let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx);

// Initialize the ledger.
let ledger = {
Expand Down
20 changes: 18 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::{
net::SocketAddr,
sync::{Arc, atomic::AtomicBool},
};
use tokio::sync::oneshot;

#[derive(Clone)]
pub enum Node<N: Network> {
Expand Down Expand Up @@ -69,6 +70,7 @@ impl<N: Network> Node<N> {
dev_txs: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
Ok(Self::Validator(Arc::new(
Validator::new(
Expand All @@ -86,6 +88,7 @@ impl<N: Network> Node<N> {
dev_txs,
dev,
shutdown,
shutdown_tx,
)
.await?,
)))
Expand All @@ -101,10 +104,21 @@ impl<N: Network> Node<N> {
trusted_peers_only: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
Ok(Self::Prover(Arc::new(
Prover::new(node_ip, account, trusted_peers, genesis, storage_mode, trusted_peers_only, dev, shutdown)
.await?,
Prover::new(
node_ip,
account,
trusted_peers,
genesis,
storage_mode,
trusted_peers_only,
dev,
shutdown,
shutdown_tx,
)
.await?,
)))
}

Expand All @@ -121,6 +135,7 @@ impl<N: Network> Node<N> {
trusted_peers_only: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
Ok(Self::Client(Arc::new(
Client::new(
Expand All @@ -135,6 +150,7 @@ impl<N: Network> Node<N> {
trusted_peers_only,
dev,
shutdown,
shutdown_tx,
)
.await?,
)))
Expand Down
5 changes: 3 additions & 2 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use std::{
atomic::{AtomicBool, AtomicU8, Ordering},
},
};
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};

/// A prover is a light node, capable of producing proofs for consensus.
#[derive(Clone)]
Expand Down Expand Up @@ -102,9 +102,10 @@ impl<N: Network, C: ConsensusStorage<N>> Prover<N, C> {
trusted_peers_only: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
// Initialize the signal handler.
let signal_node = Self::handle_signals(shutdown.clone());
let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx);

// Initialize the ledger service.
let ledger_service = Arc::new(ProverLedgerService::new());
Expand Down
7 changes: 5 additions & 2 deletions node/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::{
},
time::Duration,
};
use tokio::sync::oneshot;

#[async_trait]
pub trait NodeInterface<N: Network>: Routing<N> {
Expand Down Expand Up @@ -57,7 +58,7 @@ pub trait NodeInterface<N: Network>: Routing<N> {

/// Handles OS signals for the node to intercept and perform a clean shutdown.
/// The optional `shutdown_flag` flag can be used to cleanly terminate the syncing process.
fn handle_signals(shutdown_flag: Arc<AtomicBool>) -> Arc<OnceCell<Self>> {
fn handle_signals(shutdown_flag: Arc<AtomicBool>, shutdown_tx: Option<oneshot::Sender<()>>) -> Arc<OnceCell<Self>> {
// In order for the signal handler to be started as early as possible, a reference to the node needs
// to be passed to it at a later time.
let node: Arc<OnceCell<Self>> = Default::default();
Expand Down Expand Up @@ -109,7 +110,9 @@ pub trait NodeInterface<N: Network>: Routing<N> {
tokio::time::sleep(Duration::from_secs(3)).await;

// Terminate the process.
std::process::exit(0);
if let Some(tx) = shutdown_tx {
let _ = tx.send(());
}
}
Err(error) => error!("tokio::signal::ctrl_c encountered an error: {}", error),
}
Expand Down
6 changes: 4 additions & 2 deletions node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use std::{
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use tokio::task::JoinHandle;
use tokio::{sync::oneshot, task::JoinHandle};

/// A validator is a full node, capable of validating blocks.
#[derive(Clone)]
Expand Down Expand Up @@ -96,9 +96,10 @@ impl<N: Network, C: ConsensusStorage<N>> Validator<N, C> {
dev_txs: bool,
dev: Option<u16>,
shutdown: Arc<AtomicBool>,
shutdown_tx: Option<oneshot::Sender<()>>,
) -> Result<Self> {
// Initialize the signal handler.
let signal_node = Self::handle_signals(shutdown.clone());
let signal_node = Self::handle_signals(shutdown.clone(), shutdown_tx);

// Initialize the ledger.
let ledger = {
Expand Down Expand Up @@ -538,6 +539,7 @@ mod tests {
dev_txs,
None,
Default::default(),
None,
)
.await
.unwrap();
Expand Down
3 changes: 3 additions & 0 deletions node/tests/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub async fn client() -> Client<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
false, // Connect to untrusted peers.
None,
Default::default(),
None,
)
.await
.expect("couldn't create client instance")
Expand All @@ -49,6 +50,7 @@ pub async fn prover() -> Prover<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
false,
None,
Default::default(),
None,
)
.await
.expect("couldn't create prover instance")
Expand All @@ -70,6 +72,7 @@ pub async fn validator() -> Validator<CurrentNetwork, ConsensusMemory<CurrentNet
false, // No dev traffic in production mode.
None,
Default::default(),
None,
)
.await
.expect("couldn't create validator instance")
Expand Down