Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/user/content/reference/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,7 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_activity_log_thinned -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_builtin_materialized_views -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_builtin_sources -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_catalog_raw -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_cluster_workload_classes -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_compute_error_counts_raw_unified -->
Expand Down
113 changes: 6 additions & 107 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use mz_catalog::builtin::{
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
Expand Down Expand Up @@ -401,10 +401,6 @@ impl CatalogState {
let privileges_row = self.pack_privilege_array_row(entry.privileges());
let privileges = privileges_row.unpack_first();
let mut updates = match entry.item() {
CatalogItem::Log(_) => self.pack_source_update(
id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
privileges, diff, None,
),
CatalogItem::Index(index) => {
self.pack_index_update(id, oid, name, owner_id, index, diff)
}
Expand Down Expand Up @@ -509,45 +505,7 @@ impl CatalogState {
updates
}
CatalogItem::Source(source) => {
let source_type = source.source_type();
let connection_id = source.connection_id();
let envelope = source.data_source.envelope();
let cluster_entry = match source.data_source {
// Ingestion exports don't have their own cluster, but
// run on their ingestion's cluster.
DataSourceDesc::IngestionExport { ingestion_id, .. } => {
self.get_entry(&ingestion_id)
}
DataSourceDesc::Ingestion { .. }
| DataSourceDesc::OldSyntaxIngestion { .. }
| DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Webhook { .. }
| DataSourceDesc::Catalog => entry,
};

let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

let (key_format, value_format) = source.data_source.formats();

let mut updates = self.pack_source_update(
id,
oid,
schema_id,
name,
source_type,
connection_id,
envelope,
key_format,
value_format,
cluster_id.as_deref(),
owner_id,
privileges,
diff,
source.create_sql.as_ref(),
);

updates.extend(match &source.data_source {
match &source.data_source {
DataSourceDesc::Ingestion { desc, .. }
| DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
GenericSourceConnection::Postgres(postgres) => {
Expand Down Expand Up @@ -628,9 +586,7 @@ impl CatalogState {
DataSourceDesc::Introspection(_)
| DataSourceDesc::Progress
| DataSourceDesc::Catalog => vec![],
});

updates
}
}
CatalogItem::View(view) => {
self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
Expand All @@ -647,13 +603,13 @@ impl CatalogState {
CatalogItem::Func(func) => {
self.pack_func_update(id, schema_id, name, owner_id, func, diff)
}
CatalogItem::Secret(_) => vec![],
CatalogItem::Connection(connection) => {
self.pack_connection_update(id, connection, diff)
}
CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
id, oid, schema_id, name, owner_id, privileges, ct, diff,
),
CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
};

if !entry.item().is_temporary() {
Expand Down Expand Up @@ -834,63 +790,6 @@ impl CatalogState {
)]
}

fn pack_source_update(
&self,
id: CatalogItemId,
oid: u32,
schema_id: &SchemaSpecifier,
name: &str,
source_desc_name: &str,
connection_id: Option<CatalogItemId>,
envelope: Option<&str>,
key_format: Option<&str>,
value_format: Option<&str>,
cluster_id: Option<&str>,
owner_id: &RoleId,
privileges: Datum,
diff: Diff,
create_sql: Option<&String>,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let redacted = create_sql.map(|create_sql| {
let create_stmt = mz_sql::parse::parse(create_sql)
.unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
.into_element()
.ast;
create_stmt.to_ast_string_redacted()
});
vec![BuiltinTableUpdate::row(
&*MZ_SOURCES,
Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(&schema_id.to_string()),
Datum::String(name),
Datum::String(source_desc_name),
Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
// This is the "source size", which is a remnant from linked
// clusters.
Datum::Null,
Datum::from(envelope),
Datum::from(key_format),
Datum::from(value_format),
Datum::from(cluster_id),
Datum::String(&owner_id.to_string()),
privileges,
if let Some(create_sql) = create_sql {
Datum::String(create_sql)
} else {
Datum::Null
},
if let Some(redacted) = &redacted {
Datum::String(redacted)
} else {
Datum::Null
},
]),
diff,
)]
}

fn pack_postgres_source_update(
&self,
id: CatalogItemId,
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/catalog/open/builtin_schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
MZ_CATALOG_SCHEMA,
"mz_secrets",
),
MigrationStep::replacement(
"26.21.0-dev.0",
CatalogItemType::MaterializedView,
MZ_CATALOG_SCHEMA,
"mz_sources",
),
]
});

Expand Down
Loading
Loading