Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/rfcs/035-safekeeper-dynamic-membership-change.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ Node management is similar to pageserver:
2) GET `/control/v1/safekeeper` lists safekeepers.
3) GET `/control/v1/safekeeper/:node_id` gets safekeeper.
4) PUT `/control/v1/safekeeper/:node_id/scheduling_policy` changes status to e.g.
`offline` or `decomissioned`. Initially it is simpler not to schedule any
`offline` or `decommissioned`. Initially it is simpler not to schedule any
migrations here.

Safekeeper deploy scripts should register safekeeper at storage_controller as
Expand Down
9 changes: 5 additions & 4 deletions libs/pageserver_api/src/controller_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ pub enum SkSchedulingPolicy {
Active,
Activating,
Pause,
Decomissioned,
#[serde(alias = "Decomissioned")]
Decommissioned,
}

impl FromStr for SkSchedulingPolicy {
Expand All @@ -449,10 +450,10 @@ impl FromStr for SkSchedulingPolicy {
"active" => Self::Active,
"activating" => Self::Activating,
"pause" => Self::Pause,
"decomissioned" => Self::Decomissioned,
"decommissioned" | "decomissioned" => Self::Decommissioned,
_ => {
return Err(anyhow::anyhow!(
"Unknown scheduling policy '{s}', try active,pause,decomissioned"
"Unknown scheduling policy '{s}', try active,pause,decommissioned"
));
}
})
Expand All @@ -466,7 +467,7 @@ impl From<SkSchedulingPolicy> for String {
Active => "active",
Activating => "activating",
Pause => "pause",
Decomissioned => "decomissioned",
Decommissioned => "decommissioned",
}
.to_string()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UPDATE safekeepers SET scheduling_policy = 'decomissioned' WHERE scheduling_policy = 'decommissioned';
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
UPDATE safekeepers SET scheduling_policy = 'decommissioned' WHERE scheduling_policy = 'decomissioned';
2 changes: 1 addition & 1 deletion storage_controller/src/heartbeater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl HeartBeat<Safekeeper, SafekeeperState> for HeartbeaterTask<Safekeeper, Safe

let mut heartbeat_futs = FuturesUnordered::new();
for (node_id, sk) in &*safekeepers {
if sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned {
if sk.scheduling_policy() == SkSchedulingPolicy::Decommissioned {
continue;
}
heartbeat_futs.push({
Expand Down
2 changes: 1 addition & 1 deletion storage_controller/src/service/safekeeper_reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub(crate) async fn load_schedule_requests(
for (op_persist, timeline_persist) in pending_ops_timelines {
let node_id = NodeId(op_persist.sk_id as u64);
let Some(sk) = safekeepers.get(&node_id) else {
// This shouldn't happen, at least the safekeeper should exist as decomissioned.
// This shouldn't happen, at least the safekeeper should exist as decommissioned.
tracing::warn!(
tenant_id = op_persist.tenant_id,
timeline_id = op_persist.timeline_id,
Expand Down
8 changes: 4 additions & 4 deletions storage_controller/src/service/safekeeper_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ impl Service {
.safekeeper_reconcilers
.start_reconciler(node_id, self);
}
SkSchedulingPolicy::Decomissioned
SkSchedulingPolicy::Decommissioned
| SkSchedulingPolicy::Pause
| SkSchedulingPolicy::Activating => {
locked.safekeeper_reconcilers.stop_reconciler(node_id);
Expand Down Expand Up @@ -1192,13 +1192,13 @@ impl Service {
.map(|&id| NodeId(id as u64))
.collect::<Vec<_>>();

// Validate that we are not migrating to a decomissioned safekeeper.
// Validate that we are not migrating to a decommissioned safekeeper.
for sk in new_safekeepers.iter() {
if !cur_sk_set.contains(&sk.get_id())
&& sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
&& sk.scheduling_policy() == SkSchedulingPolicy::Decommissioned
{
return Err(ApiError::BadRequest(anyhow::anyhow!(
"safekeeper {} is decomissioned",
"safekeeper {} is decommissioned",
sk.get_id()
)));
}
Expand Down
4 changes: 2 additions & 2 deletions test_runner/regress/test_safekeeper_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ def expect_fail(sk_set: list[int], match: str):
assert len(sk_set) == 2

decom_sk = [sk.id for sk in env.safekeepers if sk.id not in sk_set][0]
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decomissioned")
env.storage_controller.safekeeper_scheduling_policy(decom_sk, "Decommissioned")

expect_fail([sk_set[0], decom_sk], "decomissioned")
expect_fail([sk_set[0], decom_sk], "decommissioned")


def test_safekeeper_migration_common_set_failpoints(neon_env_builder: NeonEnvBuilder):
Expand Down
8 changes: 4 additions & 4 deletions test_runner/regress/test_storage_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3752,8 +3752,8 @@ def storcon_heartbeat():

wait_until(storcon_heartbeat)

# Now decomission it
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
# Now decommission it
target.safekeeper_scheduling_policy(inserted["id"], "Decommissioned")


@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
Expand Down Expand Up @@ -3803,8 +3803,8 @@ def safekeeper_is_active():

wait_until(safekeeper_is_active)

# Now decomission it
target.safekeeper_scheduling_policy(inserted["id"], "Decomissioned")
# Now decommission it
target.safekeeper_scheduling_policy(inserted["id"], "Decommissioned")


def eq_safekeeper_records(a: dict[str, Any], b: dict[str, Any]) -> bool:
Expand Down