From 4ae5b5627372ff4d35640317c3189a08ddb747bb Mon Sep 17 00:00:00 2001 From: Kai Mast Date: Thu, 28 May 2026 21:25:55 -0700 Subject: [PATCH] fix(node/bft): suppress benign message about certificate already existing --- Cargo.lock | 1 + node/bft/Cargo.toml | 3 ++ node/bft/src/helpers/storage.rs | 54 +++++++++++++++++++++++++++------ node/bft/src/primary.rs | 37 +++++++++++++++------- 4 files changed, 75 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f223a9ba85..f890d8afc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4785,6 +4785,7 @@ dependencies = [ "snarkvm", "test-log", "test-strategy 0.4.5", + "thiserror 2.0.18", "time", "tokio", "tokio-stream", diff --git a/node/bft/Cargo.toml b/node/bft/Cargo.toml index 1d479bdd69..44bf085d90 100644 --- a/node/bft/Cargo.toml +++ b/node/bft/Cargo.toml @@ -159,6 +159,9 @@ features = [ "codec" ] [dependencies.tracing] workspace = true +[dependencies.thiserror] +workspace = true + [dev-dependencies.axum] workspace = true diff --git a/node/bft/src/helpers/storage.rs b/node/bft/src/helpers/storage.rs index 957a02f87d..070ca89685 100644 --- a/node/bft/src/helpers/storage.rs +++ b/node/bft/src/helpers/storage.rs @@ -43,6 +43,33 @@ use std::{ }, }; +/// Errors returned by [`Storage::check_certificate`] (and therefore [`Storage::insert_certificate`]). +/// +/// The `SameCertificate` and `SameAuthorAndRound` variants describe benign races: concurrent sync +/// paths regularly try to insert the same certificate, and the loser of that race should treat its +/// failure as a no-op (the certificate is in fact present in storage) rather than a hard error. +#[derive(Debug, thiserror::Error)] +pub enum CheckCertificateError { + #[error("Certificate round {round} already exists in storage (gc_round = {gc_round})")] + SameCertificate { round: u64, gc_round: u64 }, + #[error("Certificate with this author in round {round} is already in storage (gc_round = {gc_round})")] + SameAuthorAndRound { round: u64, gc_round: u64 }, + #[error("Certificate round {round} is at or below the GC round {gc_round}")] + RoundTooLow { round: u64, gc_round: u64 }, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl CheckCertificateError { + /// Whether the error indicates the certificate is already in storage (a benign sync race + /// rather than a hard failure). Callers should not log benign errors at ERROR. + pub fn is_benign(&self) -> bool { + //TODO(kaimast): `SameAuthorRound` should not be benign as it could be a different certificate ID. + // However, we do not hold any locks while performing these checks, so the error can also be caused by benign race conditions. + matches!(self, Self::SameCertificate { .. } | Self::SameAuthorAndRound { .. } | Self::RoundTooLow { .. }) + } +} + #[derive(Clone, Debug)] pub struct Storage(Arc>); @@ -589,6 +616,11 @@ impl Storage { /// - `transmissions`: The transmissions contained in the certificate. /// - `aborted_transmissions`: The aborted transmission contained in the certificate. /// + /// # Errors + /// Returns [`CheckCertificateError::SameCertificate`] or [`CheckCertificateError::SameAuthorAndRound`] + /// if the certificate (or one from the same author for the same round) is already in storage. + /// These are benign during concurrent sync; callers should not log them at ERROR. + /// /// # Invariants /// This method ensures the following invariants: /// - The certificate ID does not already exist in storage. @@ -608,7 +640,7 @@ impl Storage { certificate: &BatchCertificate, transmissions: HashMap, Transmission>, aborted_transmissions: HashSet>, - ) -> Result, Transmission>> { + ) -> Result, Transmission>, CheckCertificateError> { // Retrieve the round. let round = certificate.round(); // Retrieve the GC round. @@ -618,19 +650,19 @@ impl Storage { // Ensure the certificate ID does not already exist in storage. if self.contains_certificate(certificate.id()) { - bail!("Certificate for round {round} already exists in storage {gc_log}") + return Err(CheckCertificateError::SameCertificate { round, gc_round }); } // Ensure the storage does not already contain a certificate for this author in this round. if self.contains_certificate_in_round_from(round, certificate.author()) { - bail!("Certificate with this author for round {round} already exists in storage {gc_log}") + return Err(CheckCertificateError::SameAuthorAndRound { round, gc_round }); } // Ensure the batch header is well-formed. let Some(missing_transmissions) = self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)? else { - bail!("Certificate for round {round} already exists in storage {gc_log}") + return Err(CheckCertificateError::SameCertificate { round, gc_round }); }; // Check the timestamp for liveness. @@ -638,7 +670,7 @@ impl Storage { // Retrieve the committee lookback for the batch round. let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else { - bail!("Storage failed to retrieve the committee for round {round} {gc_log}") + return Err(anyhow!("Storage failed to retrieve the committee for round {round} {gc_log}").into()); }; // Initialize a set of the signers. @@ -652,7 +684,7 @@ impl Storage { let signer = signature.to_address(); // Ensure the signer is in the committee. if !committee_lookback.is_committee_member(signer) { - bail!("Signer {signer} is not in the committee for round {round} {gc_log}") + return Err(anyhow!("Signer {signer} is not in the committee for round {round} {gc_log}").into()); } // Append the signer. signers.insert(signer); @@ -660,7 +692,9 @@ impl Storage { // Ensure the signatures have reached the quorum threshold. if !committee_lookback.is_quorum_threshold_reached(&signers) { - bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}") + return Err( + anyhow!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}").into() + ); } Ok(missing_transmissions) @@ -688,9 +722,11 @@ impl Storage { certificate: BatchCertificate, transmissions: HashMap, Transmission>, aborted_transmissions: HashSet>, - ) -> Result<()> { + ) -> Result<(), CheckCertificateError> { // Ensure the certificate round is above the GC round. - ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round"); + if certificate.round() <= self.gc_round() { + return Err(CheckCertificateError::RoundTooLow { round: certificate.round(), gc_round: self.gc_round() }); + } // Ensure the certificate and its transmissions are valid. let missing_transmissions = self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?; diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index e539db0ab5..19dddbaf4a 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -1764,7 +1764,10 @@ impl Primary { // Store the certified batch. let (storage, certificate_) = (self.storage.clone(), certificate.clone()); - spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?; + tokio::task::spawn_blocking(move || { + storage.insert_certificate(certificate_, transmissions, Default::default()) + }) + .await??; debug!("Stored a batch certificate for round {}", certificate.round()); // The batch is now in storage, so late-arriving signatures can find it via contains_batch. // Transition from Certified back to None. @@ -1860,18 +1863,30 @@ impl Primary { self.sync_with_batch_header_from_peer::(peer_ip, batch_header).await?; // Check if the certificate needs to be stored. - if !self.storage.contains_certificate(certificate.id()) { - // Store the batch certificate. - let (storage, certificate_) = (self.storage.clone(), certificate.clone()); - spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?; - debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'"); - // If a BFT sender was provided, send the round and certificate to the BFT. - if let Some(cb) = self.primary_callback.get() { - cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?; + // Store the batch certificate. + let (storage, certificate_) = (self.storage.clone(), certificate.clone()); + match tokio::task::spawn_blocking(move || { + storage.insert_certificate(certificate_, missing_transmissions, Default::default()) + }) + .await + { + Ok(Ok(_)) => {} // continue + Ok(Err(err)) if err.is_benign() => { + trace!("Skipping insertion for certificate - {err}"); + return Ok(()); } - // Wake the round-increment task to re-check quorum. - self.round_increment_notify.notify_one(); + Ok(Err(err)) => return Err(anyhow!("{err}")), + Err(err) => return Err(anyhow!("[tokio::spawn_blocking] {err}")), } + + debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'"); + // If a BFT sender was provided, send the round and certificate to the BFT. + if let Some(cb) = self.primary_callback.get() { + cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?; + } + // Wake the round-increment task to re-check quorum. + self.round_increment_notify.notify_one(); + Ok(()) }