Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## dbt-databricks 1.12.0 (TBD)

### Features

- Add support for metric views as a materialization ([#1285](https://github.com/databricks/dbt-databricks/pull/1285))
- Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294))
- Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336))
- Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339))
- Replace information_schema queries with DESCRIBE TABLE EXTENDED AS JSON for metadata fetching in incremental, materialized view, and view materializations (DBR 17.3+, falls back to info_schema on older runtimes)

## dbt-databricks 1.11.8 (TBD)

### Features
Expand Down Expand Up @@ -60,6 +70,7 @@
### Features

- Add `query_id` to `SQLQueryStatus` events to improve query tracing and debugging
- Add support for Row Filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294))

### Fixes

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.11.7"
version = "1.12.0a1"
12 changes: 8 additions & 4 deletions dbt/adapters/databricks/dbr_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
class DBRCapability(Enum):
"""Named capabilities that depend on DBR version."""

TIMESTAMPDIFF = "timestampdiff"
ICEBERG = "iceberg"
COMMENT_ON_COLUMN = "comment_on_column"
JSON_COLUMN_METADATA = "json_column_metadata"
STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata"
DESCRIBE_TABLE_EXTENDED_AS_JSON = "describe_table_extended_as_json"
ICEBERG = "iceberg"
INSERT_BY_NAME = "insert_by_name"
JSON_COLUMN_METADATA = "json_column_metadata"
REPLACE_ON = "replace_on"
STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata"
TIMESTAMPDIFF = "timestampdiff"


@dataclass
Expand Down Expand Up @@ -61,6 +62,9 @@ class DBRCapabilities:
DBRCapability.REPLACE_ON: CapabilitySpec(
min_version=(17, 1),
),
DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON: CapabilitySpec(
min_version=(17, 3),
),
}

def __init__(
Expand Down
216 changes: 200 additions & 16 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import posixpath
import re
from abc import ABC, abstractmethod
Expand All @@ -11,6 +12,7 @@
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, Optional, Union, cast
from uuid import uuid4

import agate
from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, log_code_execution
from dbt.adapters.base.meta import available
Expand Down Expand Up @@ -76,6 +78,7 @@
from dbt.adapters.databricks.relation_configs.materialized_view import (
MaterializedViewConfig,
)
from dbt.adapters.databricks.relation_configs.metric_view import MetricViewConfig
from dbt.adapters.databricks.relation_configs.streaming_table import (
StreamingTableConfig,
)
Expand Down Expand Up @@ -399,6 +402,23 @@ def require_capability(self, capability: DBRCapability) -> None:
f"Current connection does not meet this requirement."
)

def is_describe_as_json_supported(self, relation: DatabricksRelation) -> bool:
"""
Check if DESCRIBE TABLE EXTENDED AS JSON can be used for the relation.
"""
return (
not relation.is_hive_metastore()
and not relation.is_foreign_table
and self.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON)
)

def fetch_json_metadata(self, relation: DatabricksRelation) -> dict[str, Any]:
"""Fetch the JSON metadata for a relation using DESCRIBE TABLE EXTENDED AS JSON."""
kwargs = {"relation": relation}
describe_results = self.execute_macro("describe_table_extended_as_json", kwargs=kwargs)
json_metadata = json.loads(describe_results.rows[0].get("json_metadata"))
return json_metadata

def list_schemas(self, database: Optional[str]) -> list[str]:
results = self.execute_macro(LIST_SCHEMAS_MACRO_NAME, kwargs={"database": database})
return [row[0] for row in results]
Expand Down Expand Up @@ -937,6 +957,8 @@ def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelatio
return IncrementalTableAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.View:
return ViewAPI.get_from_relation(self, relation)
elif relation.type == DatabricksRelationType.MetricView:
return MetricViewAPI.get_from_relation(self, relation)
else:
raise NotImplementedError(f"Relation type {relation.type} is not supported.")

Expand All @@ -952,6 +974,8 @@ def get_config_from_model(self, model: RelationConfig) -> DatabricksRelationConf
return IncrementalTableAPI.get_from_relation_config(model)
elif model.config.materialized == "view":
return ViewAPI.get_from_relation_config(model)
elif model.config.materialized == "metric_view":
return MetricViewAPI.get_from_relation_config(model)
else:
raise NotImplementedError(
f"Materialization {model.config.materialized} is not supported."
Expand Down Expand Up @@ -1077,10 +1101,17 @@ def _describe_relation(
)

kwargs = {"relation": relation}
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
results["information_schema.views"] = (
DatabricksDescribeJsonMetadata.parse_view_description(json_metadata)
)
else:
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)
return results


Expand All @@ -1104,6 +1135,7 @@ def _describe_relation(
kwargs = {"relation": relation}

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)
return results


Expand All @@ -1126,16 +1158,27 @@ def _describe_relation(
results["information_schema.column_tags"] = adapter.execute_macro(
"fetch_column_tags", kwargs=kwargs
)
results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
results["primary_key_constraints"] = adapter.execute_macro(
"fetch_primary_key_constraints", kwargs=kwargs
)
results["foreign_key_constraints"] = adapter.execute_macro(
"fetch_foreign_key_constraints", kwargs=kwargs
)
results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs)
results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs)

if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
relation_metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata)
results["non_null_constraint_columns"] = relation_metadata.non_null_constraints
results["primary_key_constraints"] = relation_metadata.primary_key_constraints
results["foreign_key_constraints"] = relation_metadata.foreign_key_constraints
results["column_masks"] = relation_metadata.column_masks
else:
results["non_null_constraint_columns"] = adapter.execute_macro(
"fetch_non_null_constraint_columns", kwargs=kwargs
)
results["primary_key_constraints"] = adapter.execute_macro(
"fetch_primary_key_constraints", kwargs=kwargs
)
results["foreign_key_constraints"] = adapter.execute_macro(
"fetch_foreign_key_constraints", kwargs=kwargs
)
results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs)

results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

kwargs = {"table_name": relation}
Expand All @@ -1159,9 +1202,16 @@ def _describe_relation(
results = {}
kwargs = {"relation": relation}

results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)
if adapter.is_describe_as_json_supported(relation):
json_metadata = adapter.fetch_json_metadata(relation)
results["information_schema.views"] = (
DatabricksDescribeJsonMetadata.parse_view_description(json_metadata)
)
else:
results["information_schema.views"] = get_first_row(
adapter.execute_macro("get_view_description", kwargs=kwargs)
)

results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)

Expand All @@ -1170,3 +1220,137 @@ def _describe_relation(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs
)
return results


class MetricViewAPI(RelationAPIBase[MetricViewConfig]):
relation_type = DatabricksRelationType.MetricView

@classmethod
def config_type(cls) -> type[MetricViewConfig]:
return MetricViewConfig

@classmethod
def _describe_relation(
cls, adapter: DatabricksAdapter, relation: DatabricksRelation
) -> RelationResults:
results = {}
kwargs = {"relation": relation}
results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs)
results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs)
kwargs = {"table_name": relation}
results["describe_extended"] = adapter.execute_macro(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs
)
return results

@dataclass
class DatabricksDescribeJsonMetadata:
column_masks: Optional["agate.Table"] = None
foreign_key_constraints: Optional["agate.Table"] = None
non_null_constraints: Optional["agate.Table"] = None
primary_key_constraints: Optional["agate.Table"] = None
view_description: Optional["agate.Row"] = None

@classmethod
def from_json_metadata(cls, json_metadata: dict[str, Any]) -> "DatabricksDescribeJsonMetadata":
"""Parse and convert the json metadata into structured metadata for the adapter to use."""
return DatabricksDescribeJsonMetadata(
column_masks=cls.parse_column_masks(json_metadata),
foreign_key_constraints=cls.parse_foreign_key_constraints(json_metadata),
non_null_constraints=cls.parse_non_null_constraints(json_metadata),
primary_key_constraints=cls.parse_primary_key_constraints(json_metadata),
view_description=cls.parse_view_description(json_metadata),
)

@classmethod
def parse_column_masks(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of column masks (info_schema format)."""
raw_masks = json_metadata.get("column_masks", [])
rows = []
for mask in raw_masks:
column_name = mask["column_name"]
fn = mask["function_name"]
mask_name = f"{fn['catalog_name']}.{fn['schema_name']}.{fn['function_name']}"
using_columns = ",".join(mask.get("using_column_names", []))
rows.append((column_name, mask_name, using_columns or None))

return agate.Table(
rows=rows,
column_names=["column_name", "mask_name", "using_columns"],
column_types=[agate.Text(), agate.Text(), agate.Text()],
)

@classmethod
def parse_foreign_key_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of FK constraints (info_schema format)."""
table_constraint = re.sub(r"\s+", " ", json_metadata.get("table_constraints", "").strip())
pairs = re.findall(r"\(\s*(\w+)\s*,(.*?)\)(?=\s*,\s*\(|\s*\])", table_constraint)
fk_rows = []
for name, constraint in pairs:
constraint = constraint.strip()
if re.search(r"FOREIGN\s+KEY", constraint):
fk_part, ref_part = constraint.split("REFERENCES", 1)
from_cols = re.findall(r"`([^`]+)`", fk_part)
ref_parts = re.findall(r"`([^`]+)`", ref_part)
to_catalog = ref_parts[0]
to_schema = ref_parts[1]
to_table = ref_parts[2]
to_cols = ref_parts[3:]
for from_col, to_col in zip(from_cols, to_cols):
fk_rows.append([name, from_col, to_catalog, to_schema, to_table, to_col])

fk_column_names = [
"constraint_name",
"from_column",
"to_catalog",
"to_schema",
"to_table",
"to_column",
]
fk_columns_types = [
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
agate.Text(),
]
return agate.Table(fk_rows, fk_column_names, fk_columns_types)

@classmethod
def parse_non_null_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of non-null constraints (info_schema format)."""
columns = json_metadata.get("columns", [])

non_null_cols = [column["name"] for column in columns if not column.get("nullable")]
return agate.Table(
rows=[[col] for col in non_null_cols],
column_names=["column_name"],
column_types=[agate.Text()],
)

@classmethod
def parse_primary_key_constraints(cls, json_metadata: dict[str, Any]) -> agate.Table:
"""Parse json metadata into an agate Table of PK constraints (info_schema format)."""
table_constraint = re.sub(r"\s+", " ", json_metadata.get("table_constraints", "").strip())
pairs = re.findall(r"\(\s*(\w+)\s*,(.*?)\)(?=\s*,\s*\(|\s*\])", table_constraint)
pk_rows = []
for name, constraint in pairs:
constraint = constraint.strip()
parts = re.findall(r"`([^`]+)`", constraint)
if re.search(r"PRIMARY\s+KEY", constraint):
for col in parts:
pk_rows.append([name, col])

pk_column_names = ["constraint_name", "column_name"]
pk_columns_types = [agate.Text(), agate.Text()]
return agate.Table(pk_rows, pk_column_names, pk_columns_types)

@classmethod
def parse_view_description(cls, json_metadata: dict[str, Any]) -> "agate.Row":
"""Parse json metadata into an agate Row for the view description (info_schema format)."""
view_text = json_metadata.get("view_text", None)
if view_text is None:
return agate.Row(values=set())
else:
return agate.Row(values=(view_text,), keys=("view_definition",))
Loading
Loading