Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use mz_license_keys::ValidatedLicenseKey;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
use mz_ore::result::ResultExt as _;
use mz_ore::soft_panic_or_log;
use mz_persist_client::PersistClient;
use mz_repr::adt::mz_acl_item::{AclMode, PrivilegeMap};
use mz_repr::explain::ExprHumanizer;
Expand Down Expand Up @@ -1311,32 +1310,28 @@ impl Catalog {
.deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
}

/// Cache global and, optionally, local expressions for the given `GlobalId`.
/// Cache global and, optionally, local expressions for the given
/// `GlobalId`.
///
/// This takes the required plans and metainfo from the catalog and expects that they were
/// previously stored via [`Catalog::set_optimized_plan`], [`Catalog::set_physical_plan`], and
/// [`Catalog::set_dataflow_metainfo`].
/// Takes the plans and metainfo directly as parameters (rather than
/// fishing them out of catalog state), so this can be called **before**
/// the catalog transaction that creates the item. Returns the future
/// returned by [`Catalog::update_expression_cache`]; callers should
/// `.await` it before the catalog transaction commits, so the durable
/// expression cache is observed to contain the entries by the time any
/// other process (or a subsequent bootstrap on this process) reads them.
pub(crate) fn cache_expressions(
&self,
id: GlobalId,
local_mir: Option<OptimizedMirRelationExpr>,
mut global_mir: DataflowDescription<OptimizedMirRelationExpr>,
mut physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
optimizer_features: OptimizerFeatures,
) {
let Some(mut global_mir) = self.try_get_optimized_plan(&id).cloned() else {
soft_panic_or_log!("optimized plan missing for ID {id}");
return;
};
let Some(mut physical_plan) = self.try_get_physical_plan(&id).cloned() else {
soft_panic_or_log!("physical plan missing for ID {id}");
return;
};
let Some(dataflow_metainfos) = self.try_get_dataflow_metainfo(&id).cloned() else {
soft_panic_or_log!("dataflow metainfo missing for ID {id}");
return;
};

// Make sure we're not caching the result of timestamp selection, as it will almost
// certainly be wrong if we re-install the dataflow at a later time.
) -> BoxFuture<'static, ()> {
// Make sure we're not caching the result of timestamp selection, as
// it will almost certainly be wrong if we re-install the dataflow at
// a later time.
global_mir.as_of = None;
global_mir.until = Default::default();
physical_plan.as_of = None;
Expand All @@ -1361,7 +1356,7 @@ impl Catalog {
optimizer_features,
},
)];
let _fut = self.update_expression_cache(local_exprs, global_exprs, Default::default());
self.update_expression_cache(local_exprs, global_exprs, Default::default())
}

pub(crate) fn update_expression_cache<'a, 'b>(
Expand Down
67 changes: 49 additions & 18 deletions src/adapter/src/catalog/builtin_table_updates/notice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::sync::Arc;

use mz_catalog::builtin::BuiltinTable;
use mz_catalog::builtin::notice::MZ_OPTIMIZER_NOTICES;
use mz_ore::now::EpochMillis;
use mz_repr::explain::ExprHumanizer;
use mz_repr::{Datum, Diff, GlobalId, Row};
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{
Expand All @@ -23,11 +25,44 @@ use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState};
impl Catalog {
/// Transform the [`DataflowMetainfo`] by rendering an [`OptimizerNotice`]
/// for each [`RawOptimizerNotice`].
///
/// Thin adapter over [`CatalogState::render_notices_core`]: uses a
/// system-session [`ExprHumanizer`] and the catalog's `now` clock.
pub fn render_notices(
&self,
df_meta: DataflowMetainfo<RawOptimizerNotice>,
notice_ids: Vec<GlobalId>,
item_id: Option<GlobalId>,
) -> DataflowMetainfo<Arc<OptimizerNotice>> {
// These notices will be persisted in a system table, so should not be
// relative to any user's session.
let conn_catalog = self.for_system_session();
CatalogState::render_notices_core(
&conn_catalog,
(self.config().now)(),
&df_meta,
notice_ids,
item_id,
)
}
}

impl CatalogState {
/// Render the raw optimizer notices in `df_meta` into fully-formatted
/// [`OptimizerNotice`]s, using the given `humanizer` to resolve object
/// names and `now` as the `created_at` timestamp for every rendered
/// notice.
///
/// This is the humanizer-agnostic core of [`Catalog::render_notices`]; it
/// can be called before the new item is in the catalog by wrapping a
/// base humanizer with an [`mz_repr::explain::ExprHumanizerExt`] that
/// knows about the to-be-created item.
pub fn render_notices_core(
humanizer: &dyn ExprHumanizer,
now: EpochMillis,
df_meta: &DataflowMetainfo<RawOptimizerNotice>,
notice_ids: Vec<GlobalId>,
item_id: Option<GlobalId>,
) -> DataflowMetainfo<Arc<OptimizerNotice>> {
// The caller should supply a pre-allocated GlobalId for each notice.
assert_eq!(notice_ids.len(), df_meta.optimizer_notices.len());
Expand All @@ -37,35 +72,31 @@ impl Catalog {
if &x != y { Some(x) } else { None }
}

// These notices will be persisted in a system table, so should not be
// relative to any user's session.
let conn_catalog = self.for_system_session();

let optimizer_notices = std::iter::zip(df_meta.optimizer_notices, notice_ids)
let optimizer_notices = std::iter::zip(&df_meta.optimizer_notices, notice_ids)
.map(|(notice, id)| {
// Render non-redacted fields.
let message = notice.message(&conn_catalog, false).to_string();
let hint = notice.hint(&conn_catalog, false).to_string();
let action = match notice.action_kind(&conn_catalog) {
let message = notice.message(humanizer, false).to_string();
let hint = notice.hint(humanizer, false).to_string();
let action = match notice.action_kind(humanizer) {
ActionKind::SqlStatements => {
Action::SqlStatements(notice.action(&conn_catalog, false).to_string())
Action::SqlStatements(notice.action(humanizer, false).to_string())
}
ActionKind::PlainText => {
Action::PlainText(notice.action(&conn_catalog, false).to_string())
Action::PlainText(notice.action(humanizer, false).to_string())
}
ActionKind::None => {
Action::None // No concrete action.
}
};
// Render redacted fields.
let message_redacted = notice.message(&conn_catalog, true).to_string();
let hint_redacted = notice.hint(&conn_catalog, true).to_string();
let action_redacted = match notice.action_kind(&conn_catalog) {
let message_redacted = notice.message(humanizer, true).to_string();
let hint_redacted = notice.hint(humanizer, true).to_string();
let action_redacted = match notice.action_kind(humanizer) {
ActionKind::SqlStatements => {
Action::SqlStatements(notice.action(&conn_catalog, true).to_string())
Action::SqlStatements(notice.action(humanizer, true).to_string())
}
ActionKind::PlainText => {
Action::PlainText(notice.action(&conn_catalog, true).to_string())
Action::PlainText(notice.action(humanizer, true).to_string())
}
ActionKind::None => {
Action::None // No concrete action.
Expand All @@ -74,7 +105,7 @@ impl Catalog {
// Assemble the rendered notice.
OptimizerNotice {
id,
kind: OptimizerNoticeKind::from(&notice),
kind: OptimizerNoticeKind::from(notice),
item_id,
dependencies: notice.dependencies(),
message_redacted: some_if_neq(message_redacted, &message),
Expand All @@ -83,15 +114,15 @@ impl Catalog {
message,
hint,
action,
created_at: (self.config().now)(),
created_at: now,
}
})
.map(From::from) // Wrap each notice into an `Arc`.
.collect();

DataflowMetainfo {
optimizer_notices,
index_usage_types: df_meta.index_usage_types,
index_usage_types: df_meta.index_usage_types.clone(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ where
pub(crate) fn emit_optimizer_notices(
catalog: &Catalog,
session: &Session,
notices: &Vec<RawOptimizerNotice>,
notices: &[RawOptimizerNotice],
) {
// `for_session` below is expensive, so return early if there's nothing to do.
if notices.is_empty() {
Expand Down
42 changes: 27 additions & 15 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ use mz_storage_types::AlterCompatible;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::controller::StorageError;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::{OptimizerNotice, RawOptimizerNotice};
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{oneshot, watch};
Expand Down Expand Up @@ -4778,24 +4779,35 @@ impl Coordinator {
}

impl Coordinator {
/// Process the metainfo from a newly created non-transient dataflow.
async fn process_dataflow_metainfo(
/// Emit the raw optimizer notices in `notices` to the user's session, if
/// any.
///
/// This intentionally consumes `RawOptimizerNotice`s (not pre-rendered
/// ones) because the user-facing rendering goes through the user's
/// session-aware humanizer, which produces e.g. schema-qualified names
/// relative to the user's current database/schema.
pub(crate) fn emit_raw_optimizer_notices_to_user(
&self,
ctx: &ExecuteContext,
notices: &[RawOptimizerNotice],
) {
emit_optimizer_notices(&*self.catalog, ctx.session(), notices);
}

/// Persist already-rendered optimizer notices for a newly created
/// non-transient dataflow.
///
/// This:
/// - packs builtin-table updates for `mz_optimizer_notices` (if enabled),
/// - stores the rendered metainfo on the catalog object via
/// `set_dataflow_metainfo`,
/// - and returns a future that resolves once the builtin-table append
/// has been observed, or `None` if nothing was appended.
async fn persist_dataflow_metainfo(
&mut self,
df_meta: DataflowMetainfo,
df_meta: DataflowMetainfo<Arc<OptimizerNotice>>,
export_id: GlobalId,
ctx: Option<&mut ExecuteContext>,
notice_ids: Vec<GlobalId>,
) -> Option<BuiltinTableAppendNotify> {
// Emit raw notices to the user.
if let Some(ctx) = ctx {
emit_optimizer_notices(&*self.catalog, ctx.session(), &df_meta.optimizer_notices);
}

// Create a metainfo with rendered notices.
let df_meta = self
.catalog()
.render_notices(df_meta, notice_ids, Some(export_id));

// Attend to optimization notice builtin tables and save the metainfo in the catalog's
// in-memory state.
if self.catalog().state().system_config().enable_mz_notices()
Expand Down
79 changes: 67 additions & 12 deletions src/adapter/src/coord/sequencer/inner/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use mz_sql::names::ResolvedIds;
use mz_sql::plan;
use tracing::Span;

use crate::catalog::CatalogState;
use crate::command::ExecuteResponse;
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{
Expand Down Expand Up @@ -442,6 +443,9 @@ impl Coordinator {
} = stage;
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

let on_entry = self.catalog().get_entry_by_global_id(&on);
let owner_id = *on_entry.owner_id();

let ops = vec![catalog::Op::CreateItem {
id: item_id,
name: name.clone(),
Expand All @@ -459,7 +463,7 @@ impl Coordinator {
physical_plan: None,
dataflow_metainfo: None,
}),
owner_id: *self.catalog().get_entry_by_global_id(&on).owner_id(),
owner_id,
}];

// Pre-allocate a vector of transient GlobalIds for each notice.
Expand All @@ -468,11 +472,60 @@ impl Coordinator {
.take(global_lir_plan.df_meta().optimizer_notices.len())
.collect::<Vec<_>>();

// Render optimizer notices before the catalog transaction, using an
// `ExprHumanizerExt` that knows about the to-be-created index. This
// way the notice text produced here (and persisted in
// `mz_optimizer_notices`) resolves the new index's own `global_id` to
// its intended human-readable name, rather than to the bare transient
// id that `for_system_session()` would produce on its own.
//
// We keep `raw_df_meta` live so that on success we can emit its raw
// notices to the user session (rendered against the user's
// session-aware humanizer). We deliberately do NOT emit to the user
// here, so that if the catalog transaction below fails the user
// isn't shown confusing notices about an item that wasn't actually
// created.
let (mut df_desc, raw_df_meta) = global_lir_plan.unapply();
let df_meta = {
let system_catalog = self.catalog().for_system_session();
let full_name = self.catalog().resolve_full_name(&name, None);
let on_desc = on_entry
.relation_desc()
.expect("can only create indexes on items with a valid description");
let transient_items = btreemap! {
global_id => TransientItem::new(
Some(full_name.into_parts()),
Some(on_desc.iter_names().map(|c| c.to_string()).collect()),
)
};
let humanizer = ExprHumanizerExt::new(transient_items, &system_catalog);
CatalogState::render_notices_core(
&humanizer,
(self.catalog().config().now)(),
&raw_df_meta,
notice_ids,
Some(global_id),
)
};

// Populate the durable expression cache before the catalog
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the transaction fails? My assumption is that it's ok to cache the expressions of failed transactions... just confirming.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, how do we handle notices if the transaction doesn't succeed? Will this state just remain around forever?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good questions!

Expression cache on failed transaction: Yes, harmless. The cache entry is keyed by (build_version, GlobalId, expr_type). If the transaction fails, that GlobalId never lands in the catalog, so the entry is never looked up — get_global_expressions (in the follow-up PR) only queries GlobalIds from Op::CreateItem ops that are actually being committed. The orphaned entry sits inert in the persist shard until the next version bump cleans it up via remove_prior_versions.

Notices on failed transaction: This is actually the exact bug this PR fixes! Pre-fix, emit_optimizer_notices ran before the catalog transaction, so the user saw spurious notices (e.g. "identical index already exists") even when IF NOT EXISTS hit a name collision and the item wasn't created. Post-fix:

  • Raw notices to the user (emit_raw_optimizer_notices_to_user) are emitted only in the Ok(_) arm after the transaction succeeds.
  • Rendered notices persisted to mz_optimizer_notices (via persist_dataflow_metainfo) only happen inside the side-effect closure, which only runs on success.
  • Rendered notices in the expression cache are only consumed by parse_item during apply_updates for committed transactions, so also inert on failure.

The new notice.pt test at the bottom of the PR pins this invariant for both CREATE INDEX IF NOT EXISTS and CREATE MATERIALIZED VIEW IF NOT EXISTS.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah lol, Claude just went ahead and posted this when I asked it to draft a reply :D

Copy link
Copy Markdown
Contributor Author

@ggevay ggevay May 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yeah, its answer is mostly right: the important thing is that this is only a small amount of orphaned data, and it gets cleaned up on the next version upgrade, because the expression cache is scoped to a version.

// transaction and await the write. This way any other envd (or a
// subsequent bootstrap here) will observe the cached plans +
// rendered notices as soon as the item becomes visible.
self.catalog()
.cache_expressions(
global_id,
None,
global_mir_plan.df_desc().clone(),
df_desc.clone(),
df_meta.clone(),
optimizer_features,
)
.await;

let transact_result = self
.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, ctx| {
.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
Box::pin(async move {
let (mut df_desc, df_meta) = global_lir_plan.unapply();

// Save plan structures.
coord
.catalog_mut()
Expand All @@ -481,13 +534,8 @@ impl Coordinator {
.catalog_mut()
.set_physical_plan(global_id, df_desc.clone());

let notice_builtin_updates_fut = coord
.process_dataflow_metainfo(df_meta, global_id, ctx, notice_ids)
.await;

coord
.catalog()
.cache_expressions(global_id, None, optimizer_features);
let notice_builtin_updates_fut =
coord.persist_dataflow_metainfo(df_meta, global_id).await;

// We're putting in place read holds, such that ship_dataflow,
// below, which calls update_read_capabilities, can successfully
Expand Down Expand Up @@ -524,7 +572,14 @@ impl Coordinator {
.await;

match transact_result {
Ok(_) => Ok(StageResult::Response(ExecuteResponse::CreatedIndex)),
Ok(_) => {
// Only emit optimizer notices to the user now that the
// catalog transaction has succeeded. If the transaction had
// failed, emitting notices would confuse the user with
// information about an item that wasn't actually created.
self.emit_raw_optimizer_notices_to_user(ctx, &raw_df_meta.optimizer_notices);
Ok(StageResult::Response(ExecuteResponse::CreatedIndex))
}
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
})) if if_not_exists => {
Expand Down
Loading
Loading