Skip to content
Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
- Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294))
- Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336))
- Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339))
- Support `SCHEDULE EVERY` and `TRIGGER ON UPDATE` refresh modes for materialized views and streaming tables, with parser and diff coverage so relations whose actual refresh is not CRON no longer crash on subsequent runs ([#1293](https://github.com/databricks/dbt-databricks/issues/1293))

### Fixes

- Fix `RefreshConfig.__eq__` self/other typo where two configs with the same `cron` but different `time_zone_value` compared equal
- Fix streaming-table DROP-SCHEDULE path that was silently filtered out of the changeset

### Fixes

Expand Down
232 changes: 191 additions & 41 deletions dbt/adapters/databricks/relation_configs/refresh.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,183 @@
import re
from enum import Enum
from typing import Any, ClassVar, Optional

from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs.config_base import RelationResults
from dbt_common.exceptions import DbtRuntimeError
from pydantic import model_validator

from dbt.adapters.databricks.relation_configs import base
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksComponentProcessor,
)

SCHEDULE_REGEX = re.compile(r"CRON '(.*)' AT TIME ZONE '(.*)'")

class RefreshMode(str, Enum):
MANUAL = "manual"
CRON = "cron"
EVERY = "every"
ON_UPDATE = "on_update"


CRON_REGEX = re.compile(r"^CRON '(.*)' AT TIME ZONE '(.*)'$")
Comment thread
sd-db marked this conversation as resolved.
EVERY_REGEX = re.compile(r"^EVERY (\d+) (HOURS?|DAYS?|WEEKS?)$", re.IGNORECASE)
TRIGGER_REGEX = re.compile(
r"^TRIGGER ON UPDATE(?: AT MOST EVERY INTERVAL (\d+) SECONDS?)?$",
Comment thread
sd-db marked this conversation as resolved.
re.IGNORECASE,
)

_QUANTITY_RE = re.compile(r"^\s*(\d+)\s+([A-Z]+)\s*$", re.IGNORECASE)
_SECONDS_PER_UNIT = {
"SECOND": 1,
"MINUTE": 60,
"HOUR": 3600,
"DAY": 86400,
"WEEK": 604800,
}
_EVERY_UNITS = {"HOUR", "DAY", "WEEK"}
# Databricks treats an absent time zone as UTC and emits 'Etc/UTC' back in DESCRIBE EXTENDED;
# these all canonicalize to the same UTC for equality.
_UTC_ALIASES = {"UTC", "ETC/UTC"}


def _canonical_tz(tz: Optional[str]) -> str:
s = (tz or "UTC").upper()
return "UTC" if s in _UTC_ALIASES else s


def _parse_quantity(value: str) -> tuple[int, str]:
"""Parse '<n> <unit>' (case-insensitive, singular or plural) into (n, singular_unit)."""
match = _QUANTITY_RE.match(value)
if not match:
raise DbtRuntimeError(f"Cannot parse interval {value!r}; expected '<integer> <unit>'.")
n, unit = int(match.group(1)), match.group(2).upper()
singular = unit[:-1] if unit.endswith("S") else unit
return n, singular


def _interval_seconds(value: str) -> int:
n, singular = _parse_quantity(value)
if singular not in _SECONDS_PER_UNIT:
raise DbtRuntimeError(
f"Unknown interval unit in {value!r};"
f" supported: SECOND, MINUTE, HOUR, DAY, WEEK (singular or plural)."
)
return n * _SECONDS_PER_UNIT[singular]


def _every_canonical(value: str) -> tuple[int, str]:
"""Return (n, plural_unit) for an EVERY clause, e.g. '1 DAY' -> (1, 'DAYS')."""
n, singular = _parse_quantity(value)
if singular not in _EVERY_UNITS:
raise DbtRuntimeError(
f"Cannot parse `every` value {value!r}; expected '<integer> {{HOURS|DAYS|WEEKS}}'."
)
return n, singular + "S"


class RefreshConfig(DatabricksComponentConfig):
"""Component encapsulating the refresh schedule of a relation."""
"""Component encapsulating the refresh schedule of a relation.

The mode is derived from which discriminator field is set:
- MANUAL - no fields set
- CRON - `cron` set, optional `time_zone_value`
- EVERY - `every` set, e.g. "2 HOURS"
- ON_UPDATE - `on_update=True`, optional `at_most_every` (e.g. "15 MINUTES")
"""

cron: Optional[str] = None
time_zone_value: Optional[str] = None
every: Optional[str] = None
on_update: bool = False
at_most_every: Optional[str] = None

# Property indicating whether the schedule change should be accomplished by an ADD SCHEDULE
# vs an ALTER SCHEDULE. This is only True when modifying an existing schedule, rather than
# switching from manual refresh to scheduled or vice versa.
# Render-time hint for the alter macro: True when both old and new states are scheduled
# (emit ALTER); False for ADD or DROP. Excluded from __eq__ / __hash__ so it doesn't
# affect identity.
is_altered: bool = False

@model_validator(mode="after")
def _validate_mode_fields(self) -> "RefreshConfig":
modes_set = [name for name, value in self._mode_signals() if value]
if len(modes_set) > 1:
raise DbtRuntimeError(
f"Refresh schedule must specify at most one of cron / every / on_update;"
f" got {modes_set}."
)
if self.time_zone_value is not None and self.cron is None:
Comment thread
sd-db marked this conversation as resolved.
raise DbtRuntimeError("`time_zone_value` is only valid when `cron` is set.")
if self.at_most_every is not None:
if not self.on_update:
raise DbtRuntimeError("`at_most_every` is only valid when `on_update` is True.")
seconds = _interval_seconds(self.at_most_every)
if seconds < 60:
raise DbtRuntimeError(
f"`at_most_every` must be at least 60 seconds (1 minute);"
f" got {self.at_most_every!r} ({seconds}s)."
)
return self

def _mode_signals(self) -> tuple[tuple[str, Any], ...]:
return (
("cron", self.cron),
("every", self.every),
("on_update", self.on_update),
)

@property
def mode(self) -> RefreshMode:
if self.cron is not None:
return RefreshMode.CRON
if self.every is not None:
return RefreshMode.EVERY
if self.on_update:
return RefreshMode.ON_UPDATE
return RefreshMode.MANUAL

@property
def auto_refreshed(self) -> bool:
"""True for modes where Databricks auto-manages refresh and a manual REFRESH is a no-op."""
return self.mode in (RefreshMode.EVERY, RefreshMode.ON_UPDATE)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, RefreshConfig):
if not isinstance(other, RefreshConfig) or self.mode != other.mode:
return False
if self.mode == RefreshMode.MANUAL:
return True
if self.mode == RefreshMode.CRON:
# Server fills 'Etc/UTC' when no time zone is given, so None/UTC/Etc/UTC all match.
return self.cron == other.cron and _canonical_tz(self.time_zone_value) == _canonical_tz(
other.time_zone_value
)
if self.mode == RefreshMode.EVERY:
assert self.every is not None and other.every is not None
return _every_canonical(self.every) == _every_canonical(other.every)
if (self.at_most_every is None) != (other.at_most_every is None):
return False
return self.cron == other.cron and (
if self.at_most_every is None:
return True
assert other.at_most_every is not None
return _interval_seconds(self.at_most_every) == _interval_seconds(other.at_most_every)

def __hash__(self) -> int:
return hash(
(
self.time_zone_value is None
and other.time_zone_value
and "utc" in other.time_zone_value.lower()
self.cron,
_canonical_tz(self.time_zone_value),
self.every,
self.on_update,
self.at_most_every,
)
or (other.time_zone_value == other.time_zone_value)
)

def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]:
if self != other:
return RefreshConfig(
cron=self.cron,
time_zone_value=self.time_zone_value,
is_altered=self.cron is not None and other.cron is not None,
)
return None
if self == other:
return None
is_altered = self.mode != RefreshMode.MANUAL and other.mode != RefreshMode.MANUAL
# model_construct skips re-validation; only is_altered changes, other fields stay valid.
return self.model_construct(**{**self.model_dump(), "is_altered": is_altered})


class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
Expand All @@ -54,35 +187,52 @@ class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]):
def from_relation_results(cls, results: RelationResults) -> RefreshConfig:
table = results["describe_extended"]
for row in table.rows:
if row[0] == "Refresh Schedule":
if row[1] == "MANUAL":
return RefreshConfig()

match = SCHEDULE_REGEX.match(row[1])
if row[0] != "Refresh Schedule":
continue
return cls._parse_schedule(row[1])

if match:
cron, time_zone_value = match.groups()
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)

raise DbtRuntimeError(
f"Could not parse schedule from description: {row[1]}."
" This is most likely a bug in the dbt-databricks adapter,"
" so please file an issue!"
)
raise DbtRuntimeError(
"Could not find Refresh Schedule in describe extended."
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
)

@staticmethod
def _parse_schedule(value: str) -> RefreshConfig:
if value == "MANUAL":
return RefreshConfig()
if (m := CRON_REGEX.match(value)) is not None:
cron, time_zone_value = m.groups()
return RefreshConfig(cron=cron, time_zone_value=time_zone_value)
if (m := EVERY_REGEX.match(value)) is not None:
n, unit = m.groups()
return RefreshConfig(every=f"{n} {unit.upper()}")
if (m := TRIGGER_REGEX.match(value)) is not None:
seconds = m.group(1)
if seconds is None:
return RefreshConfig(on_update=True)
return RefreshConfig(on_update=True, at_most_every=f"{seconds} SECOND")
raise DbtRuntimeError(
"Could not parse schedule for table."
" This is most likely a bug in the dbt-databricks adapter, so please file an issue!"
f"Could not parse refresh schedule from describe extended: {value!r}."
" Please file an issue at https://github.com/databricks/dbt-databricks/issues."
)

@classmethod
def from_relation_config(cls, relation_config: RelationConfig) -> RefreshConfig:
schedule = base.get_config_value(relation_config, "schedule")
if schedule:
if "cron" not in schedule:
raise DbtRuntimeError(f"Schedule config must contain a 'cron' key, got {schedule}")
return RefreshConfig(
cron=schedule["cron"], time_zone_value=schedule.get("time_zone_value")
)
else:
if not schedule:
return RefreshConfig()
if not isinstance(schedule, dict):
raise DbtRuntimeError(f"`schedule` must be a dict; got {schedule!r}.")

kwargs: dict[str, Any] = {
field: schedule[field]
for field in ("cron", "time_zone_value", "every", "on_update", "at_most_every")
if field in schedule
}

if not kwargs:
raise DbtRuntimeError(
"Schedule config must contain one of `cron`, `every`, or `on_update`;"
f" got {schedule}"
)
return RefreshConfig(**kwargs)
29 changes: 15 additions & 14 deletions dbt/adapters/databricks/relation_configs/streaming_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
PartitionedByProcessor,
)
from dbt.adapters.databricks.relation_configs.query import DescribeQueryProcessor
from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig, RefreshProcessor
from dbt.adapters.databricks.relation_configs.refresh import RefreshProcessor
from dbt.adapters.databricks.relation_configs.row_filter import RowFilterProcessor
from dbt.adapters.databricks.relation_configs.tags import TagsProcessor
from dbt.adapters.databricks.relation_configs.tblproperties import (
Expand All @@ -42,22 +42,23 @@ def get_changeset(
current state of the dbt project.
"""
changes: dict[str, DatabricksComponentConfig] = {}
requires_refresh = False
requires_replace = False
requires_full_refresh = False
has_changes = False

for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if key == "partition_by" and diff is not None:
requires_refresh = True
if diff and diff != RefreshConfig():
requires_replace = True
diff = diff or value
if diff != RefreshConfig():
if diff is not None:
has_changes = True
if key == "partition_by":
requires_full_refresh = True
changes[key] = diff
if requires_replace:
return DatabricksRelationChangeSet(
changes=changes, requires_full_refresh=requires_refresh
)
return None
else:
changes[key] = value

if not has_changes:
return None
return DatabricksRelationChangeSet(
changes=changes, requires_full_refresh=requires_full_refresh
)
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}

{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
{% if configuration_changes is none %}
{% set build_sql = refresh_materialized_view(target_relation) %}
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
{%- if refresh.auto_refreshed -%}
{% set build_sql = '' %}
{%- else -%}
{% set build_sql = refresh_materialized_view(target_relation) %}
{%- endif -%}

{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@
-- get config options
{% set on_configuration_change = config.get('on_configuration_change') %}
{% set configuration_changes = get_configuration_changes(existing_relation) %}
{# Skip manual REFRESH on no-op re-runs for auto-refreshed modes. #}
{% if configuration_changes is none %}
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
{%- set refresh = adapter.get_config_from_model(config.model).config["refresh"] -%}
{%- if refresh.auto_refreshed -%}
{% set build_sql = '' %}
{%- else -%}
{{ log("REFRESHING STREAMING TABLE: " ~ target_relation) }}
{% set build_sql = refresh_streaming_table(target_relation, sql) %}
{%- endif -%}

{% elif on_configuration_change == 'apply' %}
{% set build_sql = get_alter_streaming_table_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %}
{%- if cron -%}
SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{% macro get_create_sql_refresh_schedule(refresh) %}
{%- if refresh.cron -%}
SCHEDULE CRON '{{ refresh.cron }}'{%- if refresh.time_zone_value %} AT TIME ZONE '{{ refresh.time_zone_value }}'{%- endif -%}
{%- elif refresh.every -%}
SCHEDULE EVERY {{ refresh.every }}
{%- elif refresh.on_update -%}
TRIGGER ON UPDATE{%- if refresh.at_most_every %} AT MOST EVERY INTERVAL {{ refresh.at_most_every }}{%- endif -%}
{%- endif -%}
{% endmacro %}

{% macro get_alter_sql_refresh_schedule(cron, time_zone_value, is_altered) %}
{%- if cron -%}
{%- if is_altered -%}
ALTER SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{%- else -%}
ADD SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{%- endif -%}
{%- else -%}
{% macro get_alter_sql_refresh_schedule(refresh) %}
{%- if not (refresh.cron or refresh.every or refresh.on_update) -%}
DROP SCHEDULE
{%- else -%}
{{- 'ALTER ' if refresh.is_altered else 'ADD ' -}}{{- get_create_sql_refresh_schedule(refresh) -}}
{%- endif -%}
{% endmacro %}
Loading
Loading