Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 122 additions & 50 deletions pageserver/src/tenant/layer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,70 +839,114 @@ impl LayerMap {
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
// We get the delta coverage of the region, and for each part of the coverage
// we recurse right underneath the delta. The recursion depth is limited by
// the largest result this function could return, which is in practice between
// 3 and 10 (since we usually try to create an image when the number gets larger).

if lsn.is_empty() || key.is_empty() || limit == Some(0) {
return 0;
}

let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return 0,
};
struct Frame {
max_stacked_deltas: usize,
}

let start = key.start.to_i128();
let end = key.end.to_i128();
enum Task {
Process {
key: Range<Key>,
lsn: Range<Lsn>,
limit: Option<usize>,
},
Accumulate {
frame_idx: usize,
base_count: usize,
},
FinishFrame {
frame_idx: usize,
},
}

// Initialize loop variables
let mut max_stacked_deltas = 0;
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);

// Loop through the delta coverage and recurse on each part
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = &current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
);
let mut last_result = 0;
let mut frames: Vec<Frame> = Vec::new();
let mut tasks = vec![Task::Process {
key: key.clone(),
lsn: lsn.clone(),
limit,
}];

while let Some(task) = tasks.pop() {
match task {
Task::Process { key, lsn, limit } => {
if lsn.is_empty() || key.is_empty() || limit == Some(0) {
last_result = 0;
continue;
}
}
}

current_key = change_key;
current_val.clone_from(&change_val);
}
let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => {
last_result = 0;
continue;
}
};

// Consider the last part
if let Some(val) = &current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;

if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
max_stacked_deltas = std::cmp::max(
max_stacked_deltas,
base_count + max_stacked_deltas_underneath,
let start = key.start.to_i128();
let end = key.end.to_i128();

let mut partitions = Vec::new();
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);

let changes = version.delta_coverage.range(start..end)
.chain(std::iter::once((end, None)));

for (change_key, next_val) in changes {
if let Some(val) = &current_val {
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(val, &key) as usize;
let new_limit = limit.map(|l| l.saturating_sub(base_count));
partitions.push((kr, lr, new_limit, base_count));
}
}
}
current_key = change_key;
current_val = next_val;
}

if partitions.is_empty() {
last_result = 0;
continue;
}

let frame_idx = frames.len();
frames.push(Frame { max_stacked_deltas: 0 });

tasks.push(Task::FinishFrame { frame_idx });

for (kr, lr, new_limit, base_count) in partitions.into_iter().rev() {
tasks.push(Task::Accumulate {
frame_idx,
base_count,
});
tasks.push(Task::Process {
key: kr,
lsn: lr,
limit: new_limit,
});
}
}
Task::Accumulate { frame_idx, base_count } => {
frames[frame_idx].max_stacked_deltas = std::cmp::max(
frames[frame_idx].max_stacked_deltas,
base_count + last_result,
);
}
Task::FinishFrame { frame_idx } => {
last_result = frames[frame_idx].max_stacked_deltas;
}
}
}

max_stacked_deltas
last_result
}

/* BEGIN_HADRON */
Expand Down Expand Up @@ -1808,6 +1852,34 @@ mod tests {
assert_eq!(Lsn(100), image_consistent_lsn);
}

#[test]
fn test_count_deltas_stack_overflow() {
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();

let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let timeline_id = TimelineId::generate();

// Insert 10,000 overlapping delta layers with increasing LSN ranges
// Use a non-L0 key range so that all layers are considered reimage-worthy at all partition ranges.
let key_range = Key::from_i128(0)..Key::from_i128(100);
for i in 1..10000 {
let desc = PersistentLayerDesc::new_delta(
tenant_shard_id,
timeline_id,
key_range.clone(),
Lsn(i * 10)..Lsn(i * 10 + 5),
1024,
);
updates.insert_historic(desc);
}
updates.flush();

// This call will recursively traverse all 10,000 layers in the original code, causing stack overflow
let count = layer_map.count_deltas(&key_range, &(Lsn(0)..Lsn(100000)), None);
assert_eq!(count, 9999);
}

/* END_HADRON */
}

Expand Down