Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/admin/src/cluster_controller/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ impl ClusterCtrlSvc for ClusterCtrlSvcHandler {
let response = GetClusterConfigurationResponse {
cluster_configuration: Some(ClusterConfiguration {
num_partitions: u32::from(partition_table.num_partitions()),
partition_replication: partition_table.replication().clone().into(),
partition_replication: Some(
partition_table
.replication_property(&Metadata::with_current(|m| m.nodes_config_ref()))
.into(),
Comment on lines +370 to +373

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve Everywhere on configuration reads

For clusters that still have legacy PartitionReplication::Everywhere metadata, this now returns a concrete replication property instead of omitting the field. If a client reads the cluster configuration and writes it back unchanged (for example while changing only bifrost_provider), update_cluster_configuration treats any Some value that is not already Limit(current) as a request to set Limit (crates/admin/src/cluster_controller/service.rs:699-701); previously the protobuf conversion encoded Everywhere as None (crates/types/src/protobuf.rs:134-140), which preserved the legacy dynamic behavior. That round-trip freezes replication to the current worker count, so later added workers are no longer automatically included as they were under Everywhere.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really want to get rid of the legacy everywhere.

),
bifrost_provider: Some(logs.configuration().default_provider.clone().into()),
}),
};
Expand Down
32 changes: 3 additions & 29 deletions crates/admin/src/cluster_controller/service/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use restate_types::net::partition_processor_manager::{
ControlProcessor, ControlProcessors, ProcessorCommand,
};
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, WorkerState};
use restate_types::partition_table::{PartitionReplication, PartitionTable};
use restate_types::partition_table::PartitionTable;
use restate_types::partitions::leadership_policy::{LeaderAffinity, LeadershipPolicy};
use restate_types::partitions::state::{PartitionReplicaSetStates, ReplicaSetState};
use restate_types::partitions::{PartitionConfiguration, worker_candidate_filter};
Expand Down Expand Up @@ -347,10 +347,7 @@ impl<T: TransportConnect> Scheduler<T> {
// make sure that we have a valid partition processor configuration
let mut occupied_entry = match entry {
Entry::Occupied(mut entry) if entry.get().current.is_valid() => {
let partition_replication = Self::partition_replication_to_replication_property(
nodes_config,
partition_table,
);
let partition_replication = partition_table.replication_property(nodes_config);
if Self::requires_reconfiguration(
partition_id,
entry.get(),
Expand Down Expand Up @@ -397,10 +394,7 @@ impl<T: TransportConnect> Scheduler<T> {
entry
}
entry => {
let partition_replication = Self::partition_replication_to_replication_property(
nodes_config,
partition_table,
);
let partition_replication = partition_table.replication_property(nodes_config);

// no or no valid current configuration, pick a valid configuration
if let Some(current) = Self::choose_partition_configuration(
Expand Down Expand Up @@ -506,26 +500,6 @@ impl<T: TransportConnect> Scheduler<T> {
all_current_workers_disabled || any_next_pp_active
}

fn partition_replication_to_replication_property(
nodes_config: &NodesConfiguration,
partition_table: &PartitionTable,
) -> ReplicationProperty {
match partition_table.replication() {
PartitionReplication::Everywhere => {
// only kept for backwards compatibility; this can be removed once
// we no longer need to support the Everywhere variant
// for everywhere we pick all current worker candidates but at least 1
let candidates = nodes_config
.iter()
.filter(|(node_id, node_config)| worker_candidate_filter(*node_id, node_config))
.count()
.max(1);
ReplicationProperty::new_unchecked(candidates.min(usize::from(u8::MAX)) as u8)
}
PartitionReplication::Limit(partition_replication) => partition_replication.clone(),
}
}

async fn load_partition_configuration(
metadata_store_client: &MetadataStoreClient,
partition_id: PartitionId,
Expand Down
4 changes: 2 additions & 2 deletions crates/types/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub enum DurabilityMode {
/// fetch the snapshot as usual.
///
/// [requires snapshot repository]
/// [default] if snapshot repository configured
/// [default] if restate-server is in cluster mode.
/// DurabilityPoint = Min(Max(ReplicaSetDurablePoints), SnapshotDurablePoint)
Balanced,

Expand All @@ -268,7 +268,7 @@ pub enum DurabilityMode {
///
/// default in standalone-mode with no snapshot repository configured
///
/// [default] if snapshot repository is not configured
/// [default] if restate-server is in single-node mode.
/// DurabilityPoint = Min(ReplicaSetDurablePoints)
// [Requires node-to-on-node sharing of ad-hoc snapshots] if used in cluster mode.
ReplicaSetOnly,
Expand Down
22 changes: 22 additions & 0 deletions crates/types/src/partition_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use crate::identifiers::{PartitionId, PartitionKey};
use crate::logs::LogId;
use crate::metadata::GlobalMetadata;
use crate::net::metadata::{MetadataContainer, MetadataKind};
use crate::nodes_config::NodesConfiguration;
use crate::partitions::worker_candidate_filter;
use crate::protobuf::common::DatabaseKind;
use crate::replication::ReplicationProperty;
use crate::{Version, Versioned, flexbuffers_storage_encode_decode};
Expand Down Expand Up @@ -156,10 +158,30 @@ impl PartitionTable {
self.partitions.contains_key(partition_id)
}

// todo: remove me after auto-migrating to replication property
pub fn replication(&self) -> &PartitionReplication {
&self.replication
}

// todo: Run migration on startup to remove PartitionReplication from metadata and use
// PartitionProperty directly.
pub fn replication_property(&self, nodes_config: &NodesConfiguration) -> ReplicationProperty {
match &self.replication {
PartitionReplication::Everywhere => {
// only kept for backwards compatibility; this can be removed once
// we no longer need to support the Everywhere variant
// for everywhere we pick all current worker candidates but at least 1
let candidates = nodes_config
.iter()
.filter(|(node_id, node_config)| worker_candidate_filter(*node_id, node_config))
.count()
.max(1);
ReplicationProperty::new_unchecked(candidates.min(usize::from(u8::MAX)) as u8)
}
PartitionReplication::Limit(partition_replication) => partition_replication.clone(),
}
}

pub fn into_builder(self) -> PartitionTableBuilder {
self.into()
}
Expand Down
54 changes: 23 additions & 31 deletions crates/worker/src/partition/leadership/durability_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use std::task::Poll;
use std::time::Duration;

use futures::{Stream, StreamExt};
use restate_clock::WallClock;
use tokio::sync::watch;
use tokio::time::{Instant, MissedTickBehavior};
use tokio_stream::wrappers::{IntervalStream, WatchStream};
use tracing::{debug, warn};

use restate_clock::WallClock;
use restate_core::Metadata;
use restate_types::config::{Configuration, DurabilityMode};
use restate_types::config::{Configuration, DurabilityMode, WorkerOptions};
use restate_types::identifiers::PartitionId;
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::nodes_config::Role;
Expand Down Expand Up @@ -74,14 +74,18 @@ impl DurabilityTracker {
}
}

fn sanitize_durability_mode(
&self,
input_durability_mode: Option<DurabilityMode>,
) -> DurabilityMode {
let configuration = Configuration::pinned();
let has_snapshot_repository = configuration.worker.snapshots.destination.is_some();
let is_cluster =
Metadata::with_current(|m| m.nodes_config_ref().iter_role(Role::Worker).count() > 1);
fn sanitize_durability_mode(&self, opts: &WorkerOptions) -> DurabilityMode {
let has_snapshot_repository = opts.snapshots.destination.is_some();
let is_cluster = Metadata::with_current(|m| {
let nodes_config = m.nodes_config_ref();
let replication_needed = m
.partition_table_ref()
.replication_property(&nodes_config)
.num_copies()
> 1;

replication_needed || nodes_config.iter_role(Role::Worker).count() > 1
});

let require_snapshots_notice = |durability_mode| {
if !has_snapshot_repository && should_emit_snapshot_warning() {
Expand All @@ -107,26 +111,15 @@ impl DurabilityTracker {
};

// ## Special cases:
// - A standalone node:
// - no snapshot repository configured. Trim after local durability -> `ReplicaSetOnly`
// - snapshot repository configured. Trim after a snapshot is taken + local durability -> `Balanced` || `SnapshotAndReplicaSet`
//
// Simply, if snapshot is not configured, our options are:
// - None
// - ReplicaSetOnly
//
// If snapshot is configured:
// - None
// - ReplicaSetOnly (not recommended, can't recover if cluster)
// - SnapshotOrReplicaSet (not supported / disabled)
// - SnapshotOnly
let durability_mode = input_durability_mode.unwrap_or({
// default depends on whether we have snapshot repository configured or not
if has_snapshot_repository {
DurabilityMode::Balanced
} else {
DurabilityMode::ReplicaSetOnly
}
// When in standalone (single-node) node we allow trimming to happen without relying on a
// snapshot store by defaulting to `ReplicaSetOnly`. If restate is configured to be used
// in a cluster setup, we push the user to user a snapshot store. In that setup we default
// to `Balanced`.
let durability_mode = opts.durability_mode.unwrap_or(if is_cluster {
DurabilityMode::Balanced
} else {
DurabilityMode::ReplicaSetOnly
});

match durability_mode {
Expand Down Expand Up @@ -173,8 +166,7 @@ impl Stream for DurabilityTracker {
(Poll::Pending, Poll::Pending) => return Poll::Pending,
}

let durability_mode =
self.sanitize_durability_mode(Configuration::pinned().worker.durability_mode);
let durability_mode = self.sanitize_durability_mode(&Configuration::pinned().worker);
let suggested = match durability_mode {
DurabilityMode::None => {
// Skip, maybe by next tick the durability mode changes
Expand Down
Loading