crates/activate/src/lib.rs: 164 additions & 18 deletions
@@ -17,7 +17,7 @@ pub enum Change {
}

// JournalSplit describes a collection partition or a shard recovery log.
-#[derive(Debug, Default, Clone, serde::Serialize)]
+#[derive(Debug, Default, Clone, PartialEq, serde::Serialize)]
pub struct JournalSplit {
pub name: String,
pub labels: LabelSet,
@@ -177,6 +177,7 @@ pub async fn activate_collection(
)?;

let shards = apply_initial_splits(task_template, initial_splits, shards)?;
+    let partitions = apply_initial_partition_splits(task_spec, partitions)?;
let changes_1 = partition_changes(partition_template, partitions)?;
let changes_2 = task_changes(task_template, shards, recovery, ops_logs, ops_stats)?;

@@ -767,13 +768,23 @@ fn apply_initial_splits<'a>(
return Ok(shards);
}
// The task is being upsert-ed, it's not disabled, and no current shards
-    // have its template prefix.
+    // have its template prefix. Use the old-generation shard count if it
+    // exceeds `initial_splits`, preserving split parallelism across resets.
+    let effective_splits = shards.len().max(initial_splits);
+
+    if shards.len() > initial_splits {
+        tracing::info!(
+            task = template.shard.id,
+            old_shards = shards.len(),
+            effective_splits,
+            "preserving split count from previous generation"
+        );
+    }

-    // Invent `initial_splits` new shards.
-    for pivot in 0..initial_splits {
+    // Invent `effective_splits` new shards.
+    for pivot in 0..effective_splits {
let range = flow::RangeSpec {
-            key_begin: ((1 << 32) * (pivot + 0) / initial_splits) as u32,
-            key_end: (((1 << 32) * (pivot + 1) / initial_splits) - 1) as u32,
+            key_begin: ((1 << 32) * (pivot + 0) / effective_splits) as u32,
+            key_end: (((1 << 32) * (pivot + 1) / effective_splits) - 1) as u32,
r_clock_begin: 0,
r_clock_end: u32::MAX,
};
@@ -794,6 +805,79 @@ fn apply_initial_splits<'a>(
Ok(shards)
}
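
The range arithmetic above tiles the full `u32` key space with contiguous, non-overlapping ranges. A minimal standalone sketch of that computation, not part of the diff, assuming a 64-bit target so `1usize << 32` cannot overflow:

```rust
// Sketch of the key-range math used by `apply_initial_splits`: for N splits,
// pivot p covers [2^32 * p / N, 2^32 * (p + 1) / N - 1].
fn key_ranges(effective_splits: usize) -> Vec<(u32, u32)> {
    (0..effective_splits)
        .map(|pivot| {
            let begin = ((1usize << 32) * pivot / effective_splits) as u32;
            let end = (((1usize << 32) * (pivot + 1) / effective_splits) - 1) as u32;
            (begin, end)
        })
        .collect()
}

fn main() {
    // Three splits tile the key space with no gaps or overlaps:
    // 0x00000000..=0x55555554, 0x55555555..=0xaaaaaaa9, 0xaaaaaaaa..=0xffffffff.
    for (begin, end) in key_ranges(3) {
        println!("{begin:#010x}..={end:#010x}");
    }
}
```

Because `effective_splits` is `shards.len().max(initial_splits)`, a reset of a task that had previously been split wider than `initial_splits` keeps its fan-out rather than collapsing back to the default.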

/// Produce the partition set to upsert when activating a collection.
/// Mirrors old-generation key-range splits into the new generation only
/// when both old and new generations are unpartitioned, preserving
/// parallelism across a reset.
fn apply_initial_partition_splits(
task_spec: Option<&flow::CollectionSpec>,
mut partitions: Vec<JournalSplit>,
) -> anyhow::Result<Vec<JournalSplit>> {
// No spec: the collection is being deleted.
let Some(spec) = task_spec else {
return Ok(partitions);
};
let Some(template) = spec.partition_template.as_ref() else {
return Ok(partitions);
};
// Re-activation: new-generation journals already exist.
if partitions
.iter()
.any(|p| p.name.starts_with(&template.name))
{
return Ok(partitions);
}
// First-time activation: nothing to mirror from.
if partitions.is_empty() {
return Ok(partitions);
}
// New generation is logically partitioned. The set of partition
// journals is determined by the combinations of partition field values
// observed in incoming data, which we can't know at activation time.
// The runtime mapper creates each partition journal when the first
// document with that field-value combination arrives.
if !spec.partition_fields.is_empty() {
return Ok(partitions);
}
// Old generation was logically partitioned, new one isn't. The old
// journal layout shouldn't carry over.
if partitions.iter().any(|p| {
p.labels
.labels
.iter()
.any(|l| l.name.starts_with(labels::FIELD_PREFIX))
}) {
return Ok(partitions);
}

tracing::info!(
collection = template.name,
old_partitions = partitions.len(),
"preserving partition splits from previous generation"
);

let old_count = partitions.len();
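    // Iterate by index so the loop can push new splits onto `partitions`
    // while it reads the old ones.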
for idx in 0..old_count {
let old = &partitions[idx];
let mut new_labels = LabelSet::default();
for label in &old.labels.labels {
if label.name == labels::KEY_BEGIN || label.name == labels::KEY_END {
new_labels = labels::add_value(new_labels, &label.name, &label.value);
}
}
let name = labels::partition::full_name(&template.name, &new_labels)
.context("building new-generation partition name from old-generation labels")?;
partitions.push(JournalSplit {
name,
labels: new_labels,
mod_revision: 0,
suspend: None,
});
}

Ok(partitions)
}
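
For the unpartitioned-to-unpartitioned case that survives all of the guards above, the loop mirrors only the key-range labels of each old split; everything else (name, generation, logical-partition fields) is rebuilt from the new template. A self-contained sketch of that mirroring step, using stand-in types for `JournalSplit`/`LabelSet` and assumed label-name constants (the real ones come from the `labels` crate):

```rust
// Stand-ins for the crate's label types; the real ones live in `labels`.
#[derive(Debug, Clone, Default, PartialEq)]
struct Label {
    name: String,
    value: String,
}

#[derive(Debug, Clone, Default, PartialEq)]
struct LabelSet {
    labels: Vec<Label>,
}

// Assumed label names, standing in for `labels::KEY_BEGIN` / `labels::KEY_END`.
const KEY_BEGIN: &str = "estuary.dev/key-begin";
const KEY_END: &str = "estuary.dev/key-end";

// Keep only the key-range labels of an old-generation partition; the rest is
// re-derived from the new generation's partition template.
fn mirror_key_range(old: &LabelSet) -> LabelSet {
    LabelSet {
        labels: old
            .labels
            .iter()
            .filter(|l| l.name == KEY_BEGIN || l.name == KEY_END)
            .cloned()
            .collect(),
    }
}

fn main() {
    let label = |name: &str, value: &str| Label {
        name: name.to_string(),
        value: value.to_string(),
    };
    let old = LabelSet {
        labels: vec![
            label(KEY_BEGIN, "00000000"),
            label(KEY_END, "7fffffff"),
            // An illustrative extra label that must not carry over.
            label("estuary.dev/collection", "example/collection"),
        ],
    };
    // Only the key-range pair survives into the new generation.
    assert_eq!(mirror_key_range(&old).labels.len(), 2);
}
```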

/// Map a parent JournalSplit into two subdivided splits.
pub fn map_partition_to_split(
parent: &JournalSplit,
@@ -936,12 +1020,14 @@ mod test {
.get_key(&models::Collection::new("example/collection"))
.unwrap();

-        let Some(flow::CollectionSpec {
-            partition_template: Some(partition_template),
-            partition_fields,
-            projections,
-            ..
-        }) = spec
+        let Some(
+            collection_spec @ flow::CollectionSpec {
+                partition_template: Some(partition_template),
+                partition_fields,
+                projections,
+                ..
+            },
+        ) = spec
else {
unreachable!()
};
@@ -1319,12 +1405,12 @@ mod test {
insta::assert_json_snapshot!("delete", (partition_changes, task_changes));
}

-        // Case: test mixed deletion and creation.
+        // Case: test mixed deletion and creation, as happens during a reset.
+        // Simulates existing data-plane specs from an older generation being
+        // swapped out for a new generation. The fixture is logically
+        // partitioned, so partitions are not pre-created. Shard split counts
+        // are preserved: 3 old-gen shards with initial_splits=1 produce 3.
{
-            // Simulate existing data-plane specs which were created under an
-            // older initial publication ID, and which are now being swapped out.
-            // This emulates a deletion followed by a re-creation, where we failed
-            // to activate the intermediary deletion.
let mut all_partitions = all_partitions.clone();
let mut all_shards = all_shards.clone();
let mut all_recovery = all_recovery.clone();
@@ -1339,7 +1425,9 @@
*name = name.replace("2020202020202020", "replaced-pub-id");
}

-            let shards = apply_initial_splits(Some(task_template), 4, all_shards).unwrap();
+            let shards = apply_initial_splits(Some(task_template), 1, all_shards).unwrap();
+            let all_partitions =
+                apply_initial_partition_splits(Some(collection_spec), all_partitions).unwrap();

let partition_changes =
partition_changes(Some(&partition_template), all_partitions).unwrap();
@@ -1363,6 +1451,64 @@
insta::assert_json_snapshot!("create_and_delete", (partition_changes, task_changes));
}

// Case: reset of an unpartitioned collection mirrors key-range
// splits into the new generation. Builds a CollectionSpec with no
// partition_fields and old-gen partitions whose labels carry only
// KEY_BEGIN/KEY_END (the shape of an unpartitioned collection).
{
let mut unpartitioned_spec = collection_spec.clone();
unpartitioned_spec.partition_fields = Vec::new();

let make_unpartitioned = |key_begin: u32, key_end: u32| {
let labels = labels::partition::encode_key_range_labels(
LabelSet::default(),
key_begin,
key_end,
);
let name = labels::partition::full_name(&partition_template.name, &labels)
.unwrap()
.replace("2020202020202020", "replaced-pub-id");
JournalSplit {
name,
labels,
mod_revision: 222,
suspend: None,
}
};

let old_partitions = vec![
make_unpartitioned(0, 0x7fffffff),
make_unpartitioned(0x80000000, u32::MAX),
];

let new_partitions =
apply_initial_partition_splits(Some(&unpartitioned_spec), old_partitions).unwrap();

insta::assert_json_snapshot!("reset_unpartitioned", new_partitions);
}

// Case: reset that removes logical partitioning. Old generation has
// field-labeled partitions, new generation has no partition_fields.
// The function must not mirror old per-field-value partitions onto
        // the new unpartitioned topology; partition_changes will delete
// them and the runtime mapper creates the single new journal on
// first commit.
{
let mut unpartitioned_spec = collection_spec.clone();
unpartitioned_spec.partition_fields = Vec::new();

let mut old_partitions = all_partitions.clone();
for JournalSplit { name, .. } in old_partitions.iter_mut() {
*name = name.replace("2020202020202020", "replaced-pub-id");
}
let expected = old_partitions.clone();

let result =
apply_initial_partition_splits(Some(&unpartitioned_spec), old_partitions).unwrap();

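            // Equality relies on the `PartialEq` derive added to `JournalSplit` above.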
assert_eq!(result, expected);
}

// Case: split a shard on its key or clock.
{
let parent = all_shards.first().unwrap();