Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,7 @@ fn add_new_remove_old_builtin_clusters_migration(
logging: default_logging_config(),
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
enable_upsert_v2: None,
}),
workload_class: None,
},
Expand Down
19 changes: 16 additions & 3 deletions src/adapter/src/coord/catalog_implications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use fail::fail_point;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{
CatalogItem, Cluster, ClusterReplica, Connection, ContinualTask, DataSourceDesc, Index,
MaterializedView, Secret, Sink, Source, StateDiff, Table, TableDataSource, View,
CatalogItem, Cluster, ClusterReplica, ClusterVariant, Connection, ContinualTask,
DataSourceDesc, Index, MaterializedView, Secret, Sink, Source, StateDiff, Table,
TableDataSource, View,
};
use mz_cloud_resources::VpcEndpointConfig;
use mz_compute_client::logging::LogVariant;
Expand Down Expand Up @@ -1379,12 +1380,18 @@ impl Coordinator {
let desc = desc.into_inline_connection(self.catalog().state());
let item_global_id = self.catalog().get_entry(&item_id).latest_global_id();

let ingestion = mz_storage_types::sources::IngestionDescription::new(
let mut ingestion = mz_storage_types::sources::IngestionDescription::new(
desc,
cluster_id,
item_global_id,
);

if let ClusterVariant::Managed(managed) =
&self.catalog().get_cluster(cluster_id).config.variant
{
ingestion.enable_upsert_v2 = managed.enable_upsert_v2;
}

DataSource::Ingestion(ingestion)
}
DataSourceDesc::OldSyntaxIngestion {
Expand All @@ -1410,6 +1417,12 @@ impl Coordinator {
progress_subsource,
);

if let ClusterVariant::Managed(managed) =
&self.catalog().get_cluster(cluster_id).config.variant
{
ingestion.enable_upsert_v2 = managed.enable_upsert_v2;
}

let legacy_export = SourceExport {
storage_metadata: (),
data_config,
Expand Down
15 changes: 15 additions & 0 deletions src/adapter/src/coord/sequencer/inner/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Coordinator {
replication_factor: 1,
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
enable_upsert_v2: None,
});
}
}
Expand All @@ -193,6 +194,7 @@ impl Coordinator {
replication_factor,
optimizer_feature_overrides: _,
schedule,
enable_upsert_v2,
}) => {
match &options.size {
Set(s) => size.clone_from(s),
Expand Down Expand Up @@ -231,6 +233,11 @@ impl Coordinator {
Reset => *schedule = Default::default(),
Unchanged => {}
}
match &options.enable_upsert_v2 {
Set(v) => *enable_upsert_v2 = Some(*v),
Reset => *enable_upsert_v2 = None,
Unchanged => {}
}
if !matches!(options.replicas, Unchanged) {
coord_bail!("Cannot change REPLICAS of managed clusters");
}
Expand All @@ -251,6 +258,9 @@ impl Coordinator {
if !matches!(options.replication_factor, Unchanged) {
coord_bail!("Cannot change REPLICATION FACTOR of unmanaged clusters");
}
if !matches!(options.enable_upsert_v2, Unchanged) {
coord_bail!("Cannot change UPSERT V2 of unmanaged clusters");
}
}
}

Expand Down Expand Up @@ -617,6 +627,7 @@ impl Coordinator {
replication_factor: plan.replication_factor,
optimizer_feature_overrides: plan.optimizer_feature_overrides.clone(),
schedule: plan.schedule.clone(),
enable_upsert_v2: plan.enable_upsert_v2,
})
}
CreateClusterVariant::Unmanaged(_) => ClusterVariant::Unmanaged,
Expand Down Expand Up @@ -656,6 +667,7 @@ impl Coordinator {
size,
optimizer_feature_overrides: _,
schedule: _,
enable_upsert_v2: _,
}: CreateClusterManagedPlan,
cluster_id: ClusterId,
mut ops: Vec<catalog::Op>,
Expand Down Expand Up @@ -1026,6 +1038,7 @@ impl Coordinator {
replication_factor,
optimizer_feature_overrides: _,
schedule: _,
enable_upsert_v2: _,
}) = &cluster.config.variant
else {
panic!("expected existing managed cluster config");
Expand All @@ -1037,6 +1050,7 @@ impl Coordinator {
logging: new_logging,
optimizer_feature_overrides: _,
schedule: _,
enable_upsert_v2: _,
}) = &new_config.variant
else {
panic!("expected new managed cluster config");
Expand Down Expand Up @@ -1203,6 +1217,7 @@ impl Coordinator {
logging: _,
optimizer_feature_overrides: _,
schedule: _,
enable_upsert_v2: _,
}) = &mut new_config.variant
else {
panic!("expected new managed cluster config");
Expand Down
6 changes: 5 additions & 1 deletion src/catalog-protos/objects_hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.rs",
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
"md5": "ce88c2a8e5d0933963f2cfe9ae0080a2"
},
{
"name": "objects_v74.rs",
Expand Down Expand Up @@ -34,5 +34,9 @@
{
"name": "objects_v81.rs",
"md5": "42585baa1b4b5e1b6da4e361a35ec546"
},
{
"name": "objects_v82.rs",
"md5": "ce88c2a8e5d0933963f2cfe9ae0080a2"
}
]
3 changes: 2 additions & 1 deletion src/catalog-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ pub mod objects_v78;
pub mod objects_v79;
pub mod objects_v80;
pub mod objects_v81;
pub mod objects_v82;
pub mod serialization;

/// The current version of the `Catalog`.
///
/// We will initialize new `Catalog`s with this version, and migrate existing `Catalog`s to this
/// version. Whenever the `Catalog` changes, e.g. the types we serialize in the `Catalog`
/// change, we need to bump this version.
pub const CATALOG_VERSION: u64 = 81;
pub const CATALOG_VERSION: u64 = 82;

/// The minimum `Catalog` version number that we support migrating from.
///
Expand Down
2 changes: 2 additions & 0 deletions src/catalog-protos/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ pub struct ManagedCluster {
pub logging: ReplicaLogging,
pub optimizer_feature_overrides: Vec<OptimizerFeatureOverride>,
pub schedule: ClusterSchedule,
#[serde(default)]
pub enable_upsert_v2: Option<bool>,
}

#[derive(
Expand Down
Loading
Loading