diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index ba02602cfeb1..613284baaabe 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -839,70 +839,114 @@ impl LayerMap { /// we'll need to visit for any page reconstruction in this region. /// We use this heuristic to decide whether to create an image layer. pub fn count_deltas(&self, key: &Range, lsn: &Range, limit: Option) -> usize { - // We get the delta coverage of the region, and for each part of the coverage - // we recurse right underneath the delta. The recursion depth is limited by - // the largest result this function could return, which is in practice between - // 3 and 10 (since we usually try to create an image when the number gets larger). - if lsn.is_empty() || key.is_empty() || limit == Some(0) { return 0; } - let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) { - Some(v) => v, - None => return 0, - }; + struct Frame { + max_stacked_deltas: usize, + } - let start = key.start.to_i128(); - let end = key.end.to_i128(); + enum Task { + Process { + key: Range, + lsn: Range, + limit: Option, + }, + Accumulate { + frame_idx: usize, + base_count: usize, + }, + FinishFrame { + frame_idx: usize, + }, + } - // Initialize loop variables - let mut max_stacked_deltas = 0; - let mut current_key = start; - let mut current_val = version.delta_coverage.query(start); - - // Loop through the delta coverage and recurse on each part - for (change_key, change_val) in version.delta_coverage.range(start..end) { - // If there's a relevant delta in this part, add 1 and recurse down - if let Some(val) = ¤t_val { - if val.get_lsn_range().end > lsn.start { - let kr = Key::from_i128(current_key)..Key::from_i128(change_key); - let lr = lsn.start..val.get_lsn_range().start; - if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(val, key) as usize; - let new_limit = limit.map(|l| l - base_count); - let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit); - max_stacked_deltas = std::cmp::max( - max_stacked_deltas, - base_count + max_stacked_deltas_underneath, - ); + let mut last_result = 0; + let mut frames: Vec = Vec::new(); + let mut tasks = vec![Task::Process { + key: key.clone(), + lsn: lsn.clone(), + limit, + }]; + + while let Some(task) = tasks.pop() { + match task { + Task::Process { key, lsn, limit } => { + if lsn.is_empty() || key.is_empty() || limit == Some(0) { + last_result = 0; + continue; } - } - } - current_key = change_key; - current_val.clone_from(&change_val); - } + let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) { + Some(v) => v, + None => { + last_result = 0; + continue; + } + }; - // Consider the last part - if let Some(val) = ¤t_val { - if val.get_lsn_range().end > lsn.start { - let kr = Key::from_i128(current_key)..Key::from_i128(end); - let lr = lsn.start..val.get_lsn_range().start; - - if !kr.is_empty() { - let base_count = Self::is_reimage_worthy(val, key) as usize; - let new_limit = limit.map(|l| l - base_count); - let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit); - max_stacked_deltas = std::cmp::max( - max_stacked_deltas, - base_count + max_stacked_deltas_underneath, + let start = key.start.to_i128(); + let end = key.end.to_i128(); + + let mut partitions = Vec::new(); + let mut current_key = start; + let mut current_val = version.delta_coverage.query(start); + + let changes = version.delta_coverage.range(start..end) + .chain(std::iter::once((end, None))); + + for (change_key, next_val) in changes { + if let Some(val) = ¤t_val { + if val.get_lsn_range().end > lsn.start { + let kr = Key::from_i128(current_key)..Key::from_i128(change_key); + let lr = lsn.start..val.get_lsn_range().start; + if !kr.is_empty() { + let base_count = Self::is_reimage_worthy(val, &key) as usize; + let new_limit = limit.map(|l| l.saturating_sub(base_count)); + partitions.push((kr, lr, new_limit, base_count)); + } + } + } + current_key = change_key; + current_val = next_val; + } + + if partitions.is_empty() { + last_result = 0; + continue; + } + + let frame_idx = frames.len(); + frames.push(Frame { max_stacked_deltas: 0 }); + + tasks.push(Task::FinishFrame { frame_idx }); + + for (kr, lr, new_limit, base_count) in partitions.into_iter().rev() { + tasks.push(Task::Accumulate { + frame_idx, + base_count, + }); + tasks.push(Task::Process { + key: kr, + lsn: lr, + limit: new_limit, + }); + } + } + Task::Accumulate { frame_idx, base_count } => { + frames[frame_idx].max_stacked_deltas = std::cmp::max( + frames[frame_idx].max_stacked_deltas, + base_count + last_result, ); } + Task::FinishFrame { frame_idx } => { + last_result = frames[frame_idx].max_stacked_deltas; + } } } - max_stacked_deltas + last_result } /* BEGIN_HADRON */ @@ -1808,6 +1852,34 @@ mod tests { assert_eq!(Lsn(100), image_consistent_lsn); } + #[test] + fn test_count_deltas_stack_overflow() { + let mut layer_map = LayerMap::default(); + let mut updates = layer_map.batch_update(); + + let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); + let timeline_id = TimelineId::generate(); + + // Insert 10,000 overlapping delta layers with increasing LSN ranges + // Use a non-L0 key range so that all layers are considered reimage-worthy at all partition ranges. + let key_range = Key::from_i128(0)..Key::from_i128(100); + for i in 1..10000 { + let desc = PersistentLayerDesc::new_delta( + tenant_shard_id, + timeline_id, + key_range.clone(), + Lsn(i * 10)..Lsn(i * 10 + 5), + 1024, + ); + updates.insert_historic(desc); + } + updates.flush(); + + // This call will recursively traverse all 10,000 layers in the original code, causing stack overflow + let count = layer_map.count_deltas(&key_range, &(Lsn(0)..Lsn(100000)), None); + assert_eq!(count, 9999); + } + /* END_HADRON */ }