Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 2 additions & 23 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use mz_catalog::builtin::{
MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONTINUAL_TASKS,
MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
Expand Down Expand Up @@ -405,9 +405,7 @@ impl CatalogState {
id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
privileges, diff, None,
),
CatalogItem::Index(index) => {
self.pack_index_update(id, oid, name, owner_id, index, diff)
}
CatalogItem::Index(index) => self.pack_index_update(id, index, diff),
CatalogItem::Table(table) => {
let mut updates = self
.pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
Expand Down Expand Up @@ -1454,9 +1452,6 @@ impl CatalogState {
fn pack_index_update(
&self,
id: CatalogItemId,
oid: u32,
name: &str,
owner_id: &RoleId,
index: &Index,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
Expand All @@ -1478,22 +1473,6 @@ impl CatalogState {
.expect("key_parts is filled in during planning"),
_ => unreachable!(),
};
let on_item_id = self.get_entry_by_global_id(&index.on).id();

updates.push(BuiltinTableUpdate::row(
&*MZ_INDEXES,
Row::pack_slice(&[
Datum::String(&id.to_string()),
Datum::UInt32(oid),
Datum::String(name),
Datum::String(&on_item_id.to_string()),
Datum::String(&index.cluster_id.to_string()),
Datum::String(&owner_id.to_string()),
Datum::String(&index.create_sql),
Datum::String(&create_stmt.to_ast_string_redacted()),
]),
diff,
));

let on_entry = self.get_entry_by_global_id(&index.on);
let on_desc = on_entry
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/src/catalog/open/builtin_schema_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,12 @@ static MIGRATIONS: LazyLock<Vec<MigrationStep>> = LazyLock::new(|| {
MZ_CATALOG_SCHEMA,
"mz_secrets",
),
MigrationStep::replacement(
"26.21.0-dev.0",
CatalogItemType::MaterializedView,
MZ_CATALOG_SCHEMA,
"mz_indexes",
),
]
});

Expand Down
140 changes: 99 additions & 41 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2495,47 +2495,105 @@ pub static MZ_COLUMNS: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});
pub static MZ_INDEXES: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
name: "mz_indexes",
schema: MZ_CATALOG_SCHEMA,
oid: oid::TABLE_MZ_INDEXES_OID,
desc: RelationDesc::builder()
.with_column("id", SqlScalarType::String.nullable(false))
.with_column("oid", SqlScalarType::Oid.nullable(false))
.with_column("name", SqlScalarType::String.nullable(false))
.with_column("on_id", SqlScalarType::String.nullable(false))
.with_column("cluster_id", SqlScalarType::String.nullable(false))
.with_column("owner_id", SqlScalarType::String.nullable(false))
.with_column("create_sql", SqlScalarType::String.nullable(false))
.with_column("redacted_create_sql", SqlScalarType::String.nullable(false))
.with_key(vec![0])
.with_key(vec![1])
.finish(),
column_comments: BTreeMap::from_iter([
("id", "Materialize's unique ID for the index."),
("oid", "A PostgreSQL-compatible OID for the index."),
("name", "The name of the index."),
(
"on_id",
"The ID of the relation on which the index is built.",
),
(
"cluster_id",
"The ID of the cluster in which the index is built.",
),
(
"owner_id",
"The role ID of the owner of the index. Corresponds to `mz_roles.id`.",
),
("create_sql", "The `CREATE` SQL statement for the index."),
(
"redacted_create_sql",
"The redacted `CREATE` SQL statement for the index.",
),
]),
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],

/// The `mz_catalog.mz_indexes` builtin, defined as a materialized view.
///
/// The view's query unions two sources:
///
/// * `user_indexes`: rows of `mz_internal.mz_catalog_raw` with kind `Item`
///   whose `create_sql` is reported as type `index` by
///   `mz_internal.parse_catalog_create_sql`.
/// * `builtin_indexes`: rows of `mz_internal.mz_builtin_indexes`, joined
///   against the `GidMapping` rows of `mz_internal.mz_catalog_raw` (for the
///   index ID and the ID of the indexed relation) and against `mz_clusters`
///   (for the cluster ID). These are reported as owned by
///   `MZ_SYSTEM_ROLE_ID`.
pub static MZ_INDEXES: LazyLock<BuiltinMaterializedView> = LazyLock::new(|| {
    // Relation shape: eight non-nullable columns, keyed by `id` and by `oid`.
    let desc = RelationDesc::builder()
        .with_column("id", SqlScalarType::String.nullable(false))
        .with_column("oid", SqlScalarType::Oid.nullable(false))
        .with_column("name", SqlScalarType::String.nullable(false))
        .with_column("on_id", SqlScalarType::String.nullable(false))
        .with_column("cluster_id", SqlScalarType::String.nullable(false))
        .with_column("owner_id", SqlScalarType::String.nullable(false))
        .with_column("create_sql", SqlScalarType::String.nullable(false))
        .with_column("redacted_create_sql", SqlScalarType::String.nullable(false))
        .with_key(vec![0])
        .with_key(vec![1])
        .finish();

    let column_comments = BTreeMap::from_iter([
        ("id", "Materialize's unique ID for the index."),
        ("oid", "A PostgreSQL-compatible OID for the index."),
        ("name", "The name of the index."),
        (
            "on_id",
            "The ID of the relation on which the index is built.",
        ),
        (
            "cluster_id",
            "The ID of the cluster in which the index is built.",
        ),
        (
            "owner_id",
            "The role ID of the owner of the index. Corresponds to `mz_roles.id`.",
        ),
        ("create_sql", "The `CREATE` SQL statement for the index."),
        (
            "redacted_create_sql",
            "The redacted `CREATE` SQL statement for the index.",
        ),
    ]);

    // `format!` is only needed to splice in the system role ID; the rest of
    // the text is a fixed query.
    let sql = format!("
IN CLUSTER mz_catalog_server
WITH (
ASSERT NOT NULL id,
ASSERT NOT NULL oid,
ASSERT NOT NULL name,
ASSERT NOT NULL on_id,
ASSERT NOT NULL cluster_id,
ASSERT NOT NULL owner_id,
ASSERT NOT NULL create_sql,
ASSERT NOT NULL redacted_create_sql
) AS
WITH
user_indexes AS (
SELECT
mz_internal.parse_catalog_id(data->'key'->'gid') AS id,
(data->'value'->>'oid')::oid AS oid,
data->'value'->>'name' AS name,
mz_internal.parse_catalog_create_sql(data->'value'->'definition'->'V1'->>'create_sql')->>'on_id' AS on_id,
mz_internal.parse_catalog_create_sql(data->'value'->'definition'->'V1'->>'create_sql')->>'cluster_id' AS cluster_id,
mz_internal.parse_catalog_id(data->'value'->'owner_id') AS owner_id,
data->'value'->'definition'->'V1'->>'create_sql' AS create_sql,
mz_internal.redact_sql(data->'value'->'definition'->'V1'->>'create_sql') AS redacted_create_sql
FROM mz_internal.mz_catalog_raw
WHERE
data->>'kind' = 'Item' AND
mz_internal.parse_catalog_create_sql(data->'value'->'definition'->'V1'->>'create_sql')->>'type' = 'index'
),
builtin_mappings AS (
SELECT
data->'key'->>'schema_name' AS schema_name,
data->'key'->>'object_name' AS name,
's' || (data->'value'->>'catalog_id') AS id
FROM mz_internal.mz_catalog_raw
WHERE data->>'kind' = 'GidMapping'
),
builtin_indexes AS (
SELECT
m.id,
i.oid,
i.name,
onm.id AS on_id,
c.id AS cluster_id,
'{MZ_SYSTEM_ROLE_ID}' AS owner_id,
i.create_sql,
mz_internal.redact_sql(i.create_sql) AS redacted_create_sql
FROM mz_internal.mz_builtin_indexes i
JOIN builtin_mappings m USING (schema_name, name)
JOIN builtin_mappings onm ON onm.schema_name = i.on_schema_name AND onm.name = i.on_name
JOIN mz_clusters c ON c.name = i.cluster_name
)
SELECT * FROM user_indexes
UNION ALL
SELECT * FROM builtin_indexes").into_boxed_str();

    BuiltinMaterializedView {
        name: "mz_indexes",
        schema: MZ_CATALOG_SCHEMA,
        oid: oid::MV_MZ_INDEXES_OID,
        desc,
        column_comments,
        // Leaked once: this initializer runs a single time for the static.
        sql: Box::leak(sql),
        is_retained_metrics_object: false,
        access: vec![PUBLIC_SELECT],
    }
});

pub static MZ_INDEX_COLUMNS: LazyLock<BuiltinTable> = LazyLock::new(|| BuiltinTable {
name: "mz_index_columns",
schema: MZ_CATALOG_SCHEMA,
Expand Down Expand Up @@ -14376,7 +14434,6 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::MaterializedView(&MZ_DATABASES),
Builtin::MaterializedView(&MZ_SCHEMAS),
Builtin::Table(&MZ_COLUMNS),
Builtin::Table(&MZ_INDEXES),
Builtin::Table(&MZ_INDEX_COLUMNS),
Builtin::Table(&MZ_TABLES),
Builtin::Table(&MZ_SOURCES),
Expand Down Expand Up @@ -14428,6 +14485,7 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::Table(&MZ_COMMENTS),
Builtin::Table(&MZ_WEBHOOKS_SOURCES),
Builtin::Table(&MZ_HISTORY_RETENTION_STRATEGIES),
Builtin::MaterializedView(&MZ_INDEXES),
Builtin::MaterializedView(&MZ_MATERIALIZED_VIEWS),
Builtin::Table(&MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES),
Builtin::Table(&MZ_CONTINUAL_TASKS),
Expand Down
71 changes: 68 additions & 3 deletions src/catalog/src/builtin/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,97 @@ use mz_pgrepr::oid;
use mz_repr::adt::mz_acl_item::MzAclItem;
use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
use mz_repr::{RelationDesc, SqlScalarType};
use mz_sql::ast::Statement;
use mz_sql::ast::display::{AstDisplay, escaped_string_literal};
use mz_sql::ast::{RawItemName, Statement};
use mz_sql::catalog::{NameReference, ObjectType};
use mz_sql::rbac;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;

use crate::builtin::{Builtin, BuiltinMaterializedView, BuiltinView, PUBLIC_SELECT};
use crate::builtin::{Builtin, BuiltinIndex, BuiltinMaterializedView, BuiltinView, PUBLIC_SELECT};

/// Generate builtin views reporting the given builtins.
///
/// Used in the [`super::BUILTINS_STATIC`] initializer.
pub(super) fn builtins(
builtin_items: &[Builtin<NameReference>],
) -> impl Iterator<Item = Builtin<NameReference>> {
let idx_iter = builtin_items.iter().filter_map(|b| match b {
Builtin::Index(x) => Some(*x),
_ => None,
});
let mv_iter = builtin_items.iter().filter_map(|b| match b {
Builtin::MaterializedView(x) => Some(*x),
_ => None,
});

let indexes = make_builtin_indexes(idx_iter);
let materialized_views = make_builtin_materialized_views(mv_iter);

[materialized_views].into_iter().map(|v| {
[indexes, materialized_views].into_iter().map(|v| {
let static_ref = Box::leak(Box::new(v));
Builtin::View(static_ref)
})
}

fn make_builtin_indexes(iter: impl Iterator<Item = &'static BuiltinIndex>) -> BuiltinView {
let values = iter
.map(|idx| {
let create_sql_str = idx.create_sql();
let stmt = mz_sql::parse::parse(&create_sql_str)
.expect("valid sql")
.into_element()
.ast;
let Statement::CreateIndex(stmt) = stmt else {
panic!("invalid builtin index SQL");
};

let create_sql = stmt.to_ast_string_stable();
let create_sql = escaped_string_literal(&create_sql);

let cluster_name = stmt.in_cluster.expect("builtin index has cluster");

let RawItemName::Name(on_name) = stmt.on_name else {
panic!("builtin index SQL must have unresolved ON name");
};
let Ok([on_schema_name, on_name]) = <[_; 2]>::try_from(on_name.0) else {
panic!("builtin index ON name must be schema-qualified");
};

format!(
"({}::oid, '{}', '{}', '{}', '{}', '{}', {})",
idx.oid, idx.schema, idx.name, on_schema_name, on_name, cluster_name, create_sql
)
})
.join(",");
let sql = format!(
"
SELECT oid, schema_name, name, on_schema_name, on_name, cluster_name, create_sql
FROM (VALUES {values}) AS v(oid, schema_name, name, on_schema_name, on_name, cluster_name, create_sql)"
);

BuiltinView {
name: "mz_builtin_indexes",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_BUILTIN_INDEXES_OID,
desc: RelationDesc::builder()
.with_column("oid", SqlScalarType::Oid.nullable(false))
.with_column("schema_name", SqlScalarType::String.nullable(false))
.with_column("name", SqlScalarType::String.nullable(false))
.with_column("on_schema_name", SqlScalarType::String.nullable(false))
.with_column("on_name", SqlScalarType::String.nullable(false))
.with_column("cluster_name", SqlScalarType::String.nullable(false))
.with_column("create_sql", SqlScalarType::String.nullable(false))
.with_key(vec![0])
.with_key(vec![2])
.with_key(vec![4])
.with_key(vec![6])
.finish(),
column_comments: Default::default(),
sql: Box::leak(sql.into_boxed_str()),
access: vec![PUBLIC_SELECT],
}
}

fn make_builtin_materialized_views<'a>(
iter: impl Iterator<Item = &'a BuiltinMaterializedView>,
) -> BuiltinView {
Expand Down
35 changes: 26 additions & 9 deletions src/expr/src/scalar/func/impls/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
use mz_repr::role_id::RoleId;
use mz_repr::{ArrayRustType, Datum, Row, RowPacker, SqlColumnType, SqlScalarType, strconv};
use mz_sql_parser::ast::RawClusterName;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{RawClusterName, RawItemName};
use serde::{Deserialize, Serialize};
use serde_json::json;

Expand Down Expand Up @@ -368,6 +368,21 @@ fn parse_catalog_privileges<'a>(a: JsonbRef<'a>) -> Result<ArrayRustType<MzAclIt
// Consider moving all the `parse_catalog_*` functions into their own module.
#[sqlfunc]
fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
fn get_cluster_id(in_cluster: Option<RawClusterName>) -> Result<String, &'static str> {
match in_cluster {
Some(RawClusterName::Resolved(s)) => Ok(s),
Some(RawClusterName::Unresolved(_)) => Err("unresolved IN CLUSTER"),
None => Err("missing IN CLUSTER"),
}
}

/// Extract the item ID from an item reference.
///
/// Returns the ID carried by an ID-form reference; a reference that is still
/// a plain name is reported as an error.
fn get_item_id(item: RawItemName) -> Result<String, &'static str> {
    let RawItemName::Id(item_id, _, _) = item else {
        return Err("unresolved item name");
    };
    Ok(item_id)
}

let parse = || -> Result<serde_json::Value, String> {
let mut stmts = mz_sql_parser::parser::parse_statements(a)
.map_err(|e| format!("failed to parse create_sql: {e}"))?;
Expand All @@ -389,13 +404,7 @@ fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
}
CreateView(_) => "view",
CreateMaterializedView(stmt) => {
let Some(in_cluster) = stmt.in_cluster else {
return Err("missing IN CLUSTER".into());
};
let cluster_id = match in_cluster {
RawClusterName::Unresolved(ident) => ident.into_string(),
RawClusterName::Resolved(s) => s,
};
let cluster_id = get_cluster_id(stmt.in_cluster)?;
info.insert("cluster_id", json!(cluster_id));

let mut definition = stmt.query.to_ast_string_stable();
Expand All @@ -409,7 +418,15 @@ fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
CreateSource(_) | CreateWebhookSource(_) => "source",
CreateSubsource(_) => "subsource",
CreateSink(_) => "sink",
CreateIndex(_) => "index",
CreateIndex(stmt) => {
let cluster_id = get_cluster_id(stmt.in_cluster)?;
info.insert("cluster_id", json!(cluster_id));

let on_id = get_item_id(stmt.on_name)?;
info.insert("on_id", json!(on_id));

"index"
}
CreateType(_) => "type",
_ => return Err("not a CREATE item statement".into()),
};
Expand Down
Loading
Loading