Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ features = [ "codec" ]
[dependencies.tracing]
workspace = true

[dependencies.thiserror]
workspace = true

[dev-dependencies.axum]
workspace = true

Expand Down
54 changes: 45 additions & 9 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,33 @@ use std::{
},
};

/// Errors returned by [`Storage::check_certificate`] (and therefore [`Storage::insert_certificate`]).
///
/// The `SameCertificate` and `SameAuthorAndRound` variants describe benign races: concurrent sync
/// paths regularly try to insert the same certificate, and the loser of that race should treat its
/// failure as a no-op (the certificate is in fact present in storage) rather than a hard error.
#[derive(Debug, thiserror::Error)]
pub enum CheckCertificateError {
#[error("Certificate round {round} already exists in storage (gc_round = {gc_round})")]
SameCertificate { round: u64, gc_round: u64 },
#[error("Certificate with this author in round {round} is already in storage (gc_round = {gc_round})")]
SameAuthorAndRound { round: u64, gc_round: u64 },
#[error("Certificate round {round} is at or below the GC round {gc_round}")]
RoundTooLow { round: u64, gc_round: u64 },
#[error(transparent)]
Other(#[from] anyhow::Error),
}

impl CheckCertificateError {
/// Whether the error indicates the certificate is already in storage (a benign sync race
/// rather than a hard failure). Callers should not log benign errors at ERROR.
pub fn is_benign(&self) -> bool {
//TODO(kaimast): `SameAuthorRound` should not be benign as it could be a different certificate ID.
// However, we do not hold any locks while performing these checks, so the error can also be caused by benign race conditions.
matches!(self, Self::SameCertificate { .. } | Self::SameAuthorAndRound { .. } | Self::RoundTooLow { .. })
}
}

#[derive(Clone, Debug)]
pub struct Storage<N: Network>(Arc<StorageInner<N>>);

Expand Down Expand Up @@ -589,6 +616,11 @@ impl<N: Network> Storage<N> {
/// - `transmissions`: The transmissions contained in the certificate.
/// - `aborted_transmissions`: The aborted transmission contained in the certificate.
///
/// # Errors
/// Returns [`CheckCertificateError::SameCertificate`] or [`CheckCertificateError::SameAuthorAndRound`]
/// if the certificate (or one from the same author for the same round) is already in storage.
/// These are benign during concurrent sync; callers should not log them at ERROR.
///
/// # Invariants
/// This method ensures the following invariants:
/// - The certificate ID does not already exist in storage.
Expand All @@ -608,7 +640,7 @@ impl<N: Network> Storage<N> {
certificate: &BatchCertificate<N>,
transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
aborted_transmissions: HashSet<TransmissionID<N>>,
) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
) -> Result<HashMap<TransmissionID<N>, Transmission<N>>, CheckCertificateError> {
// Retrieve the round.
let round = certificate.round();
// Retrieve the GC round.
Expand All @@ -618,27 +650,27 @@ impl<N: Network> Storage<N> {

// Ensure the certificate ID does not already exist in storage.
if self.contains_certificate(certificate.id()) {
bail!("Certificate for round {round} already exists in storage {gc_log}")
return Err(CheckCertificateError::SameCertificate { round, gc_round });
}

// Ensure the storage does not already contain a certificate for this author in this round.
if self.contains_certificate_in_round_from(round, certificate.author()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These subsequent checks happen without any locking. Otherwise, this second check would only trigger if there are two certificates with same author+round with different IDs.

It would make sense to merge the two checks to at least do this part atomically, and have a dedicated error for the more severe case where two certificates with different IDs exist.

bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
return Err(CheckCertificateError::SameAuthorAndRound { round, gc_round });
}

// Ensure the batch header is well-formed.
let Some(missing_transmissions) =
self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?
else {
bail!("Certificate for round {round} already exists in storage {gc_log}")
return Err(CheckCertificateError::SameCertificate { round, gc_round });
};

// Check the timestamp for liveness.
check_timestamp_for_liveness(certificate.timestamp())?;

// Retrieve the committee lookback for the batch round.
let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
return Err(anyhow!("Storage failed to retrieve the committee for round {round} {gc_log}").into());
};

// Initialize a set of the signers.
Expand All @@ -652,15 +684,17 @@ impl<N: Network> Storage<N> {
let signer = signature.to_address();
// Ensure the signer is in the committee.
if !committee_lookback.is_committee_member(signer) {
bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
return Err(anyhow!("Signer {signer} is not in the committee for round {round} {gc_log}").into());
}
// Append the signer.
signers.insert(signer);
}

// Ensure the signatures have reached the quorum threshold.
if !committee_lookback.is_quorum_threshold_reached(&signers) {
bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
return Err(
anyhow!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}").into()
);
}

Ok(missing_transmissions)
Expand Down Expand Up @@ -688,9 +722,11 @@ impl<N: Network> Storage<N> {
certificate: BatchCertificate<N>,
transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
aborted_transmissions: HashSet<TransmissionID<N>>,
) -> Result<()> {
) -> Result<(), CheckCertificateError> {
// Ensure the certificate round is above the GC round.
ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
if certificate.round() <= self.gc_round() {
return Err(CheckCertificateError::RoundTooLow { round: certificate.round(), gc_round: self.gc_round() });
}
// Ensure the certificate and its transmissions are valid.
let missing_transmissions =
self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
Expand Down
37 changes: 26 additions & 11 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,7 +1764,10 @@ impl<N: Network> Primary<N> {

// Store the certified batch.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
tokio::task::spawn_blocking(move || {
storage.insert_certificate(certificate_, transmissions, Default::default())
})
.await??;
debug!("Stored a batch certificate for round {}", certificate.round());
// The batch is now in storage, so late-arriving signatures can find it via contains_batch.
// Transition from Certified back to None.
Expand Down Expand Up @@ -1860,18 +1863,30 @@ impl<N: Network> Primary<N> {
self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

// Check if the certificate needs to be stored.
if !self.storage.contains_certificate(certificate.id()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now caught by insert_certficate and handled appropriately below.

// Store the batch certificate.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
// If a BFT sender was provided, send the round and certificate to the BFT.
if let Some(cb) = self.primary_callback.get() {
cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
// Store the batch certificate.
let (storage, certificate_) = (self.storage.clone(), certificate.clone());
match tokio::task::spawn_blocking(move || {
storage.insert_certificate(certificate_, missing_transmissions, Default::default())
})
.await
{
Ok(Ok(_)) => {} // continue
Ok(Err(err)) if err.is_benign() => {
trace!("Skipping insertion for certificate - {err}");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this should even be logged, but it can always be removed in the future

return Ok(());
}
// Wake the round-increment task to re-check quorum.
self.round_increment_notify.notify_one();
Ok(Err(err)) => return Err(anyhow!("{err}")),
Err(err) => return Err(anyhow!("[tokio::spawn_blocking] {err}")),
}

debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
// If a BFT sender was provided, send the round and certificate to the BFT.
if let Some(cb) = self.primary_callback.get() {
cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
}
// Wake the round-increment task to re-check quorum.
self.round_increment_notify.notify_one();

Ok(())
}

Expand Down