diff --git a/rs/artifact_pool/src/consensus_pool_cache.rs b/rs/artifact_pool/src/consensus_pool_cache.rs index fbcc91782af6..c6166baf3fe8 100644 --- a/rs/artifact_pool/src/consensus_pool_cache.rs +++ b/rs/artifact_pool/src/consensus_pool_cache.rs @@ -7,7 +7,7 @@ use ic_interfaces::consensus_pool::{ use ic_protobuf::types::v1 as pb; use ic_types::{ Height, Time, - consensus::{Block, CatchUpPackage, ConsensusMessage, Finalization, HasHeight, HashedBlock}, + consensus::{Block, CatchUpPackage, ConsensusMessage, Finalization, HasHeight}, }; use std::cmp::Ordering; use std::collections::BTreeMap; @@ -59,7 +59,6 @@ impl CachedData { struct CachedChainIterator<'a> { consensus_pool: &'a dyn ConsensusPool, finalized_chain: Arc, - to_block: Option, cursor: Option, } @@ -68,12 +67,10 @@ impl<'a> CachedChainIterator<'a> { consensus_pool: &'a dyn ConsensusPool, finalized_chain: Arc, from_block: Block, - to_block: Option, ) -> Self { CachedChainIterator { consensus_pool, finalized_chain, - to_block, cursor: Some(from_block), } } @@ -85,21 +82,6 @@ impl<'a> CachedChainIterator<'a> { } let parent_height = height.decrement(); let parent_hash = &block.parent; - if let Some(to_block) = &self.to_block { - match parent_height.cmp(&to_block.height()) { - std::cmp::Ordering::Less => { - return None; - } - std::cmp::Ordering::Equal => { - if parent_hash == to_block.get_hash() { - return Some(to_block.as_ref().clone()); - } else { - return None; - } - } - _ => (), - } - } // Use cached blocks if the height is finalized if parent_height <= self.finalized_chain.tip().height() && let Ok(block) = self.finalized_chain.get_block_by_height(parent_height) @@ -183,7 +165,6 @@ impl ConsensusPoolCache for ConsensusCacheImpl { pool, self.finalized_chain(), block, - Some(self.catch_up_package().content.block), )) } } @@ -335,7 +316,7 @@ pub(crate) fn update_summary_block( } // Otherwise, find the parent block at start_height - *summary_block = ChainIterator::new(consensus_pool, finalized_tip.clone(), 
None) + *summary_block = ChainIterator::new(consensus_pool, finalized_tip.clone()) .take_while(|block| block.height() >= start_height) .find(|block| block.height() == start_height) .unwrap_or_else(|| { @@ -397,7 +378,6 @@ impl ConsensusBlockChainImpl { consensus_pool, consensus_pool.as_block_cache().finalized_chain(), tip, - None, ) .take_while(|block| block.height() >= start_height) .map(|block| (block.height(), block)); @@ -468,7 +448,7 @@ impl ConsensusBlockChainImpl { if summary_height >= start_height && summary_height <= tip.height() { blocks.insert(summary_height, summary_block.clone()); } - ChainIterator::new(consensus_pool, tip.clone(), None) + ChainIterator::new(consensus_pool, tip.clone()) .take_while(|block| block.height() >= start_height) .for_each(|block| { blocks.insert(block.height(), block); diff --git a/rs/consensus/src/consensus/block_maker.rs b/rs/consensus/src/consensus/block_maker.rs index ad75c90b5c54..0632c9ad2fed 100644 --- a/rs/consensus/src/consensus/block_maker.rs +++ b/rs/consensus/src/consensus/block_maker.rs @@ -654,6 +654,7 @@ mod tests { signature::ThresholdSignature, *, }; + use mockall::Sequence; use rstest::rstest; use std::sync::Arc; @@ -889,14 +890,26 @@ mod tests { pool.insert_validated(summary); // Payload builder always returns a batch payload with some canister HTTP data + let mut sequence = Sequence::new(); let mut payload_builder = MockPayloadBuilder::new(); - let expected_payload = BatchPayload { + let expected_payload_1 = BatchPayload { canister_http: vec![1; 64], ..Default::default() }; + let expected_payload_2 = BatchPayload { + canister_http: vec![2; 64], + ..Default::default() + }; + payload_builder + .expect_get_payload() + .times(1) + .return_const(expected_payload_1.clone()) + .in_sequence(&mut sequence); payload_builder .expect_get_payload() - .return_const(expected_payload.clone()); + .times(1) + .return_const(expected_payload_2.clone()) + .in_sequence(&mut sequence); let certified_height = Height::from(1); 
state_manager .get_mut() @@ -952,10 +965,9 @@ mod tests { .expect("Block creation should succeed"); let block = proposal.content.into_inner(); let filled_batch_payload = &block.payload.as_ref().as_data().batch; - assert_eq!(filled_batch_payload, &expected_payload); + assert_eq!(filled_batch_payload, &expected_payload_1); - // Insert the cup at height 10, the batch payload should be empty - // Since payloads below the CUP are not returned + // Even with the CUP at height 10, the batch payload should include a payload pool.insert_validated(cup); let proposal = { let reader = PoolReader::new(&pool); @@ -963,8 +975,8 @@ mod tests { } .expect("Block creation should succeed"); let block = proposal.content.into_inner(); - let empty_batch_payload = &block.payload.as_ref().as_data().batch; - assert!(empty_batch_payload.is_empty()); + let filled_batch_payload = &block.payload.as_ref().as_data().batch; + assert_eq!(filled_batch_payload, &expected_payload_2); }); } diff --git a/rs/consensus/src/consensus/catchup_package_maker.rs b/rs/consensus/src/consensus/catchup_package_maker.rs index cfdbb4b3f1ad..64a9a1a4f1cf 100644 --- a/rs/consensus/src/consensus/catchup_package_maker.rs +++ b/rs/consensus/src/consensus/catchup_package_maker.rs @@ -172,13 +172,6 @@ impl CatchUpPackageMaker { // Skip if random beacon does not exist for the height let random_beacon = pool.get_random_beacon(height)?; - // Skip if the state referenced by finalization tip has not caught up to - // this height. This is to increase the chance that states are available to - // validate payloads at the chain tip. 
- if pool.get_finalized_tip().context.certified_height < height { - return None; - } - match self.state_manager.get_state_hash_at(height) { Err(StateHashError::Transient(StateNotCommittedYet(_))) => { // TODO: Setup a delay before retry @@ -336,10 +329,7 @@ mod tests { // Skip the first DKG interval pool.advance_round_normal_operation_n(interval_length); - let mut proposal = pool.make_next_block(); - let block = proposal.content.as_mut(); - block.context.certified_height = block.height(); - proposal.content = HashedBlock::new(ic_types::crypto::crypto_hash, block.clone()); + let proposal = pool.make_next_block(); pool.insert_validated(proposal.clone()); pool.notarize(&proposal); pool.finalize(&proposal); diff --git a/rs/consensus/src/consensus/purger.rs b/rs/consensus/src/consensus/purger.rs index f1a31e60005e..2d5ff4d73607 100644 --- a/rs/consensus/src/consensus/purger.rs +++ b/rs/consensus/src/consensus/purger.rs @@ -27,7 +27,7 @@ use ic_interfaces::{ }; use ic_interfaces_registry::RegistryClient; use ic_interfaces_state_manager::StateManager; -use ic_logger::{ReplicaLogger, error, trace, warn}; +use ic_logger::{ReplicaLogger, error, info, trace, warn}; use ic_metrics::MetricsRegistry; use ic_replicated_state::ReplicatedState; use ic_types::{ @@ -348,6 +348,19 @@ impl Purger { /// state removal, check: [`ic_state_manager::StateManagerImpl::remove_states_below`]. fn purge_checkpoints_below_cup_height(&self, pool: &PoolReader<'_>) { let cup_height = pool.get_catch_up_height(); + let finalized_certified_height = pool.get_finalized_tip().context.certified_height; + if finalized_certified_height < cup_height { + info!( + every_n_seconds => 5, + self.log, + "Finalized certified height {} is still below the CUP height {}. This \ + might be caused by a long checkpoint/manifest computation. 
Skipping \ + purging checkpoints until finalized certified height catches up.", + finalized_certified_height, + cup_height + ); + return; + } self.state_manager.remove_states_below(cup_height); trace!( self.log, @@ -742,10 +755,6 @@ mod tests { let expected_extra_heights = Arc::new(RwLock::new(BTreeSet::new())); let extra_heights_clone = Arc::clone(&expected_extra_heights); - state_manager - .get_mut() - .expect_latest_state_height() - .return_const(Height::from(0)); state_manager .get_mut() .expect_update_fast_forward_height() @@ -814,6 +823,83 @@ mod tests { }) } + #[test] + fn test_purge_checkpoints_below_cup_height() { + ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { + let Dependencies { + mut pool, + state_manager, + replica_config, + registry, + .. + } = dependencies(pool_config, 10); + + let purger = Purger::new( + replica_config, + state_manager.clone(), + Arc::new(MockMessageRouting::new()), + registry, + no_op_logger(), + MetricsRegistry::new(), + ); + + let expected_removed_height = Arc::new(RwLock::new(Height::from(0))); + let removed_height_clone = Arc::clone(&expected_removed_height); + state_manager + .get_mut() + .expect_remove_states_below() + .times(4) // This will be called exactly four times below + .withf(move |height| *height == *removed_height_clone.read().unwrap()) + .return_const(()); + + // Initially, we expect to purge below the genesis CUP of height 0 + *expected_removed_height.write().unwrap() = Height::from(0); + // Expectation called once: + purger.purge_checkpoints_below_cup_height(&PoolReader::new(&pool)); + + // Advance a bit (less than a DKG interval) + pool.advance_round_normal_operation_n(10); + // We still expect to purge below the genesis CUP of height 0 + *expected_removed_height.write().unwrap() = Height::from(0); + // Expectation called twice: + purger.purge_checkpoints_below_cup_height(&PoolReader::new(&pool)); + + // After a DKG interval, we expect to purge below the next CUP height of 60. 
+ // THOUGH, because the finalized tip still points to a smaller certified height (of 0 + // here), the purger should actually not call `state_manager`. + pool.advance_round_normal_operation_n(60); + // We do not expect the expectation to be called here! + purger.purge_checkpoints_below_cup_height(&PoolReader::new(&pool)); + + let mut proposal = pool.make_next_block(); + let block = proposal.content.as_mut(); + block.context.certified_height = Height::from(60); + proposal.content = HashedBlock::new(ic_types::crypto::crypto_hash, block.clone()); + pool.insert_validated(proposal.clone()); + pool.notarize(&proposal); + pool.finalize(&proposal); + // Now that a finalized block points to a height higher than or equal to (here equal to) the + // CUP height, we should purge below the CUP height. + *expected_removed_height.write().unwrap() = Height::from(60); + // Expectation called three times: + purger.purge_checkpoints_below_cup_height(&PoolReader::new(&pool)); + + // Same test but with a finalized certified height HIGHER than the CUP height. We also + // expect to purge below the CUP height. 
+ pool.insert_validated(pool.make_next_beacon()); + let mut proposal = pool.make_next_block(); + let block = proposal.content.as_mut(); + block.context.certified_height = Height::from(62); + proposal.content = HashedBlock::new(ic_types::crypto::crypto_hash, block.clone()); + pool.insert_validated(proposal.clone()); + pool.notarize(&proposal); + pool.finalize(&proposal); + *expected_removed_height.write().unwrap() = Height::from(60); + // Expectation called four times: + purger.purge_checkpoints_below_cup_height(&PoolReader::new(&pool)); + }) + } + #[test] fn purging_non_finalized_blocks_test() { ic_test_utilities::artifact_pool_config::with_test_pool_config(|pool_config| { diff --git a/rs/consensus/src/consensus/validator.rs b/rs/consensus/src/consensus/validator.rs index 63bae2cf6629..c36765681f33 100644 --- a/rs/consensus/src/consensus/validator.rs +++ b/rs/consensus/src/consensus/validator.rs @@ -2973,16 +2973,10 @@ pub mod test { let result = validator.check_block_validity(&PoolReader::new(&pool), &test_block); assert_matches!(result, Ok(())); - // Insert the cup at height 10, the payload validation should fail - // Since payloads below the CUP are not returned + // Even with the CUP at height 10, the validation should still succeed pool.insert_validated(cup); let result = validator.check_block_validity(&PoolReader::new(&pool), &test_block); - assert_matches!( - result, - Err(ValidationError::ValidationFailed( - ValidationFailure::MissingPastPayloads, - )) - ); + assert_matches!(result, Ok(())); }); } diff --git a/rs/interfaces/src/consensus_pool.rs b/rs/interfaces/src/consensus_pool.rs index ede2508457a4..ff289aaf2222 100644 --- a/rs/interfaces/src/consensus_pool.rs +++ b/rs/interfaces/src/consensus_pool.rs @@ -11,7 +11,7 @@ use ic_types::{ artifact::ConsensusMessageId, consensus::{ Block, BlockProposal, CatchUpPackage, CatchUpPackageShare, ConsensusMessage, ContentEq, - EquivocationProof, Finalization, FinalizationShare, HasHeight, HashedBlock, Notarization, 
+ EquivocationProof, Finalization, FinalizationShare, HasHeight, Notarization, NotarizationShare, RandomBeacon, RandomBeaconShare, RandomTape, RandomTapeShare, }, crypto::CryptoHashOf, @@ -388,11 +388,7 @@ pub trait ConsensusPoolCache: Send + Sync { pool: &'a dyn ConsensusPool, block: Block, ) -> Box + 'a> { - Box::new(ChainIterator::new( - pool, - block, - Some(self.catch_up_package().content.block), - )) + Box::new(ChainIterator::new(pool, block)) } } @@ -427,23 +423,15 @@ pub enum ConsensusBlockChainErr { /// An iterator for block ancestors. pub struct ChainIterator<'a> { consensus_pool: &'a dyn ConsensusPool, - to_block: Option, cursor: Option, } impl<'a> ChainIterator<'a> { - /// Return an iterator that iterates block ancestors, going backwards - /// from the `from_block` to the `to_block` (both inclusive), or until a - /// parent is not found in the consensus pool if the `to_block` is not - /// specified. - pub fn new( - consensus_pool: &'a dyn ConsensusPool, - from_block: Block, - to_block: Option, - ) -> Self { + /// Return an iterator that iterates block ancestors, going backwards from the `from_block` + /// until a parent is not found in the consensus pool. 
+ pub fn new(consensus_pool: &'a dyn ConsensusPool, from_block: Block) -> Self { ChainIterator { consensus_pool, - to_block, cursor: Some(from_block), } } @@ -455,32 +443,32 @@ impl<'a> ChainIterator<'a> { } let parent_height = height.decrement(); let parent_hash = &block.parent; - if let Some(to_block) = &self.to_block { - match parent_height.cmp(&to_block.height()) { - std::cmp::Ordering::Less => { - return None; - } - std::cmp::Ordering::Equal => { - if parent_hash == to_block.get_hash() { - return Some(to_block.as_ref().clone()); - } else { - return None; - } - } - _ => (), - } - } - self.consensus_pool + for proposal in self + .consensus_pool .validated() .block_proposal() .get_by_height(parent_height) - .find_map(|proposal| { - if proposal.content.get_hash() == parent_hash { - Some(proposal.content.into_inner()) - } else { - None - } - }) + { + if proposal.content.get_hash() == parent_hash { + return Some(proposal.content.into_inner()); + } + } + + // If the parent block is not found in the block proposals, it might be because we just + // started up and only have the initial CUP to work with. 
+ if let Ok(cup) = self + .consensus_pool + .validated() + .catch_up_package() + .get_only_by_height(parent_height) + { + let (hash, block) = cup.content.block.decompose(); + if hash == *parent_hash { + return Some(block); + } + } + + None } } diff --git a/rs/state_manager/src/lib.rs b/rs/state_manager/src/lib.rs index d48d408c9f0d..2271230824c4 100644 --- a/rs/state_manager/src/lib.rs +++ b/rs/state_manager/src/lib.rs @@ -1399,7 +1399,13 @@ impl StateManagerImpl { metrics.clone(), malicious_flags.clone(), ); - let (_hash_thread_handle, hash_channel) = spawn_hash_thread(metrics.clone(), log.clone()); + let persist_metadata_guard = Arc::new(Mutex::new(())); + let (_hash_thread_handle, hash_channel) = spawn_hash_thread( + metrics.clone(), + log.clone(), + state_layout.clone(), + persist_metadata_guard.clone(), + ); let starting_time = Instant::now(); let loaded_states_metadata = @@ -1613,8 +1619,6 @@ impl StateManagerImpl { tip: Some(tip_height_and_state.1), })); - let persist_metadata_guard = Arc::new(Mutex::new(())); - let deallocator_thread = DeallocatorThread::new("StateDeallocator", Duration::from_millis(1)); @@ -3474,20 +3478,20 @@ impl StateManager for StateManagerImpl { .tip_handler_queue_length .set(self.tip_channel.len() as i64); - let mut state_metadata_and_compute_manifest_request: Option<(StateMetadata, TipRequest)> = - None; + let mut state_metadata: Option = None; + let mut compute_manifest_request: Option = None; let mut follow_up_tip_requests = Vec::new(); let state = match scope { CertificationScope::Full => { let CreateCheckpointResult { state, - state_metadata, - compute_manifest_request, + state_metadata: metadata, + compute_manifest_request: req, tip_requests, } = self.create_checkpoint_and_switch(state, height); - state_metadata_and_compute_manifest_request = - Some((state_metadata, compute_manifest_request)); + state_metadata = Some(metadata); + compute_manifest_request = Some(req); follow_up_tip_requests = tip_requests; state @@ -3496,7 
+3500,10 @@ impl StateManager for StateManagerImpl { }; // Kick off hashing of the new state. This will also compare the result with the - // delivered hash, if present, in order to detect divergence. + // delivered hash, if present, in order to detect divergence. For checkpointing + // heights, the hash thread also inserts `state_metadata` into `states_metadata` + // under the same write lock as `certifications_metadata`, so the two maps stay + // consistent. let hash_req = HashRequest::HashState { state: Arc::clone(&state), states: Arc::clone(&self.states), @@ -3506,8 +3513,8 @@ impl StateManager for StateManagerImpl { latest_height_update_time: Arc::clone(&self.latest_height_update_time), reference_certification: Box::new(maybe_delivered_certification), scope: scope.clone(), - state_layout: Box::new(self.state_layout.clone()), max_certified_height_tx: Arc::clone(&self.max_certified_height_tx), + state_metadata, }; self.hash_channel.send(hash_req).unwrap(); @@ -3521,8 +3528,9 @@ impl StateManager for StateManagerImpl { (height, state.deref().clone()) }; - // For checkpoint heights, we await the state hash immediately. This may not be necessary, - // but it keeps the existing checkpointing behaviour as is. + // For checkpoint heights, we await the state hash immediately. This is also what + // ensures that `states_metadata` has been populated by the hash thread before we + // read it below. // Note: This must not be called while a write lock to `states` is being held. 
if scope == CertificationScope::Full { self.flush_hash_channel(); @@ -3534,15 +3542,22 @@ impl StateManager for StateManagerImpl { assert_tip_is_none(&states); - if let Some((state_metadata, compute_manifest_request)) = - state_metadata_and_compute_manifest_request - { - let metadata = states - .states_metadata - .entry(height) - .or_insert(state_metadata); + if let Some(compute_manifest_request) = compute_manifest_request { + // The hash thread inserted `states_metadata` for this height under the same + // lock, and we just awaited it above via `flush_hash_channel`, so an entry + // must be present. + debug_assert!(states.states_metadata.contains_key(&height)); debug_assert!(self.tip_channel.len() <= 2); - if metadata.bundled_manifest.is_none() { + // Skip the manifest request only if a manifest is already present (e.g. + // populated by state sync). Otherwise — including the case where the entry + // is unexpectedly missing in release — send the request; the tip thread + // handles a missing `states_metadata` entry gracefully. + let already_has_manifest = states + .states_metadata + .get(&height) + .and_then(|m| m.bundled_manifest.as_ref()) + .is_some(); + if !already_has_manifest { self.tip_channel .send(compute_manifest_request) .expect("failed to send ComputeManifestRequest message"); @@ -3559,9 +3574,10 @@ impl StateManager for StateManagerImpl { // tip if needed. states.tip_height = next_tip.0; states.tip = Some(next_tip.1); - if scope == CertificationScope::Full { - self.release_lock_and_persist_metadata(states); - } + // Note: for `Full` scope, `states_metadata` has already been persisted to disk by + // the hash thread (under the same write lock that inserted the entry), so there is + // no need to persist here. + drop(states); for req in follow_up_tip_requests { self.tip_channel .send(req) @@ -3638,9 +3654,12 @@ enum HashRequest { /// `MAX_CONSECUTIVE_ROUNDS_WITHOUT_STATE_CLONING`. 
reference_certification: Box>, scope: CertificationScope, - /// Boxed so that variants have similar size and we don't waste space when sending `HashRequest::Wait`. - state_layout: Box, max_certified_height_tx: Arc>, + /// For checkpointing heights, the `StateMetadata` to insert into `states_metadata` + /// atomically with `certifications_metadata` (i.e. under the same write lock on + /// `SharedState`). This guarantees that after the hash thread has processed the + /// request, both metadata maps contain a consistent entry for `height`. + state_metadata: Option, }, /// Wait for the message to be executed and notify back via `sender`. Wait { sender: Sender<()> }, @@ -3649,6 +3668,8 @@ enum HashRequest { fn spawn_hash_thread( metrics: StateManagerMetrics, log: ReplicaLogger, + state_layout: StateLayout, + persist_metadata_guard: Arc>, ) -> (JoinOnDrop<()>, Sender) { #[allow(clippy::disallowed_methods)] let (hash_req_sender, receiver) = unbounded(); @@ -3667,8 +3688,8 @@ fn spawn_hash_thread( latest_height_update_time, reference_certification, scope, - state_layout, max_certified_height_tx, + state_metadata, } => { let mut certification_metadata = StateManagerImpl::compute_certification_metadata( @@ -3719,6 +3740,17 @@ fn spawn_hash_thread( assert_prev_hash_matches(prev_hash, "previously computed"); } + // For checkpointing heights, insert `states_metadata` under the same + // write lock as `certifications_metadata`, so the two maps are always + // consistent. We use `or_insert` so that an existing entry (e.g. + // populated by state sync) is preserved. 
+ if let Some(state_metadata) = state_metadata { + states + .states_metadata + .entry(height) + .or_insert(state_metadata); + } + // Add state and hash to snapshots and certification_metadata if !states .snapshots @@ -3767,6 +3799,20 @@ fn spawn_hash_thread( ); } } + + // For checkpointing heights, persist `states_metadata` to disk while + // still holding (and then releasing) the same write lock used to insert + // the in-memory entry above. This keeps the persisted file in lockstep + // with the in-memory state. + if scope == CertificationScope::Full { + release_lock_and_persist_metadata( + &log, + &metrics, + &state_layout, + states, + &persist_metadata_guard, + ); + } } HashRequest::Wait { sender } => { sender.send(()).unwrap();