diff --git a/CHANGELOG.md b/CHANGELOG.md index f7f620a0c..c6d18bf11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,17 +6,15 @@ - Add support for row filters ([#1294](https://github.com/databricks/dbt-databricks/pull/1294)) - Add support for Python UDFs ([#1336](https://github.com/databricks/dbt-databricks/pull/1336)) - Add support for key-only `databricks_tags` for table and column tagging. This can now be configured by setting tag values to empty strings `""` or `None`. ([#1339](https://github.com/databricks/dbt-databricks/pull/1339)) +- Fetch relation metadata like constraints, column masks, row filters, etc with a single `DESCRIBE TABLE EXTENDED ... AS JSON` call, replacing multiple `information_schema` queries. Falls back to `information_schema` on older runtimes. Gated behind `use_describe_as_json_for_relation_metadata` behavior flag, off by default. ([#1432](https://github.com/databricks/dbt-databricks/pull/1432)) - Support `SCHEDULE EVERY` and `TRIGGER ON UPDATE` refresh modes for materialized views and streaming tables, with parser and diff coverage so relations whose actual refresh is not CRON no longer crash on subsequent runs ([#1293](https://github.com/databricks/dbt-databricks/issues/1293)) ### Fixes +- Fix `metric_view` failing with `METRIC_VIEW_INVALID_VIEW_DEFINITION` when models use bare `{{ ref(...) }}` for the `source:` field ([#1361](https://github.com/databricks/dbt-databricks/issues/1361)) - Fix `RefreshConfig.__eq__` self/other typo where two configs with the same `cron` but different `time_zone_value` compared equal - Fix streaming-table DROP-SCHEDULE path that was silently filtered out of the changeset -### Fixes - -- Fix `metric_view` failing with `METRIC_VIEW_INVALID_VIEW_DEFINITION` when models use bare `{{ ref(...) }}` for the `source:` field ([#1361](https://github.com/databricks/dbt-databricks/issues/1361)) - ### Under the Hood - **BREAKING:** `databricks_tags` defined at different hierarchy levels (e.g. project-level and model-level) now merge additively instead of the child config completely replacing the parent. diff --git a/dbt/adapters/databricks/dbr_capabilities.py b/dbt/adapters/databricks/dbr_capabilities.py index 9b7c918d7..38bd133ef 100644 --- a/dbt/adapters/databricks/dbr_capabilities.py +++ b/dbt/adapters/databricks/dbr_capabilities.py @@ -13,13 +13,14 @@ class DBRCapability(Enum): """Named capabilities that depend on DBR version.""" - TIMESTAMPDIFF = "timestampdiff" - ICEBERG = "iceberg" COMMENT_ON_COLUMN = "comment_on_column" - JSON_COLUMN_METADATA = "json_column_metadata" - STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata" + DESCRIBE_TABLE_EXTENDED_AS_JSON = "describe_table_extended_as_json" + ICEBERG = "iceberg" INSERT_BY_NAME = "insert_by_name" + JSON_COLUMN_METADATA = "json_column_metadata" REPLACE_ON = "replace_on" + STREAMING_TABLE_JSON_METADATA = "streaming_table_json_metadata" + TIMESTAMPDIFF = "timestampdiff" @dataclass @@ -61,6 +62,9 @@ class DBRCapabilities: DBRCapability.REPLACE_ON: CapabilitySpec( min_version=(17, 1), ), + DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON: CapabilitySpec( + min_version=(17, 3), + ), } def __init__( diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 8a86d1fb6..5e67867fa 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -1,3 +1,4 @@ +import json import posixpath import re from abc import ABC, abstractmethod @@ -30,7 +31,7 @@ ) from dbt_common.behavior_flags import BehaviorFlag from dbt_common.contracts.config.base import BaseConfig, MergeBehavior -from dbt_common.exceptions import DbtConfigError, DbtInternalError +from dbt_common.exceptions import DbtConfigError, DbtInternalError, DbtRuntimeError from dbt_common.utils import executor from dbt_common.utils.dict import AttrDict from packaging import version @@ -156,6 +157,16 @@ ), ) # type: ignore[typeddict-item] +USE_DESCRIBE_AS_JSON_FOR_RELATION_METADATA = BehaviorFlag( + name="use_describe_as_json_for_relation_metadata", + default=False, + description=( + "Use DESCRIBE TABLE EXTENDED AS JSON when supported to fetch " + "relation metadata like constraints, column masks, row filters, " + "view definition, etc. When disabled, falls back to information_schema queries." + ), +) # type: ignore[typeddict-item] + class DatabricksRelationInfo(NamedTuple): table_name: str @@ -266,6 +277,7 @@ def _behavior_flags(self) -> list[BehaviorFlag]: USE_REPLACE_ON_FOR_INSERT_OVERWRITE, USE_MANAGED_ICEBERG, USE_CONCURRENT_MICROBATCH, + USE_DESCRIBE_AS_JSON_FOR_RELATION_METADATA, ] def supports(self, capability: Capability) -> bool: @@ -406,6 +418,28 @@ def require_capability(self, capability: DBRCapability) -> None: f"Current connection does not meet this requirement." ) + def is_describe_as_json_supported(self, relation: DatabricksRelation) -> bool: + """ + Check if DESCRIBE TABLE EXTENDED AS JSON can be used for the relation. + """ + return ( + not relation.is_hive_metastore() + and not relation.is_foreign_table + and self.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + and bool(self.behavior.use_describe_as_json_for_relation_metadata.no_warn) + ) + + def fetch_json_metadata(self, relation: DatabricksRelation) -> dict[str, Any]: + """Fetch the JSON metadata for a relation using DESCRIBE TABLE EXTENDED AS JSON.""" + kwargs = {"relation": relation} + describe_results = self.execute_macro("describe_table_extended_as_json", kwargs=kwargs) + try: + return json.loads(describe_results.rows[0].get("json_metadata")) + except Exception as e: + raise DbtRuntimeError( + "Failed to parse json metadata from describe table extended as json" + ) from e + def list_schemas(self, database: Optional[str]) -> list[str]: results = self.execute_macro(LIST_SCHEMAS_MACRO_NAME, kwargs={"database": database}) return [row[0] for row in results] @@ -1143,10 +1177,6 @@ def _describe_relation( ) kwargs = {"relation": relation} - results["information_schema.views"] = get_first_row( - adapter.execute_macro("get_view_description", kwargs=kwargs) - ) - results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) # To be backward compatible model_config can be None. In that case, tags should be fetched # to maintain backward compatibility. @@ -1156,7 +1186,19 @@ def _describe_relation( else: results["information_schema.tags"] = None - results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + if adapter.is_describe_as_json_supported(relation): + json_metadata = adapter.fetch_json_metadata(relation) + results["information_schema.views"] = ( + DatabricksDescribeJsonMetadata.parse_view_description(json_metadata) + ) + results["row_filters"] = DatabricksDescribeJsonMetadata.parse_row_filter(json_metadata) + else: + results["information_schema.views"] = get_first_row( + adapter.execute_macro("get_view_description", kwargs=kwargs) + ) + results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) return results @@ -1184,7 +1226,13 @@ def _describe_relation( kwargs = {"relation": relation} results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) - results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + + if adapter.is_describe_as_json_supported(relation): + json_metadata = adapter.fetch_json_metadata(relation) + results["row_filters"] = DatabricksDescribeJsonMetadata.parse_row_filter(json_metadata) + else: + results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + return results @@ -1228,23 +1276,34 @@ def _describe_relation( else: results["information_schema.column_tags"] = None - results["non_null_constraint_columns"] = adapter.execute_macro( - "fetch_non_null_constraint_columns", kwargs=kwargs - ) - results["primary_key_constraints"] = adapter.execute_macro( - "fetch_primary_key_constraints", kwargs=kwargs - ) - results["foreign_key_constraints"] = adapter.execute_macro( - "fetch_foreign_key_constraints", kwargs=kwargs - ) - results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs) - results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + if adapter.is_describe_as_json_supported(relation): + json_metadata = adapter.fetch_json_metadata(relation) + relation_metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + results["non_null_constraint_columns"] = relation_metadata.non_null_constraints + results["primary_key_constraints"] = relation_metadata.primary_key_constraints + results["foreign_key_constraints"] = relation_metadata.foreign_key_constraints + results["column_masks"] = relation_metadata.column_masks + results["row_filters"] = relation_metadata.row_filters + else: + results["non_null_constraint_columns"] = adapter.execute_macro( + "fetch_non_null_constraint_columns", kwargs=kwargs + ) + results["primary_key_constraints"] = adapter.execute_macro( + "fetch_primary_key_constraints", kwargs=kwargs + ) + results["foreign_key_constraints"] = adapter.execute_macro( + "fetch_foreign_key_constraints", kwargs=kwargs + ) + results["column_masks"] = adapter.execute_macro("fetch_column_masks", kwargs=kwargs) + results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) kwargs = {"table_name": relation} results["describe_extended"] = adapter.execute_macro( DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs ) + return results @@ -1265,10 +1324,6 @@ def _describe_relation( results = {} kwargs = {"relation": relation} - results["information_schema.views"] = get_first_row( - adapter.execute_macro("get_view_description", kwargs=kwargs) - ) - # To be backward compatible model_config can be None. In that case, tags should be fetched # to maintain backward compatibility. table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None @@ -1277,6 +1332,16 @@ def _describe_relation( else: results["information_schema.tags"] = None + if adapter.is_describe_as_json_supported(relation): + json_metadata = adapter.fetch_json_metadata(relation) + results["information_schema.views"] = ( + DatabricksDescribeJsonMetadata.parse_view_description(json_metadata) + ) + else: + results["information_schema.views"] = get_first_row( + adapter.execute_macro("get_view_description", kwargs=kwargs) + ) + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) kwargs = {"table_name": relation} @@ -1317,3 +1382,444 @@ def _describe_relation( ) return results + + +@dataclass +class DatabricksDescribeJsonMetadata: + column_masks: Optional["Table"] = None + foreign_key_constraints: Optional["Table"] = None + non_null_constraints: Optional["Table"] = None + primary_key_constraints: Optional["Table"] = None + row_filters: Optional["Table"] = None + view_description: Optional["Row"] = None + + @classmethod + def from_json_metadata(cls, json_metadata: dict[str, Any]) -> "DatabricksDescribeJsonMetadata": + """Parse and convert the json metadata into structured metadata for the adapter to use.""" + return DatabricksDescribeJsonMetadata( + column_masks=cls.parse_column_masks(json_metadata), + foreign_key_constraints=cls.parse_foreign_key_constraints(json_metadata), + non_null_constraints=cls.parse_non_null_constraints(json_metadata), + primary_key_constraints=cls.parse_primary_key_constraints(json_metadata), + row_filters=cls.parse_row_filter(json_metadata), + view_description=cls.parse_view_description(json_metadata), + ) + + @classmethod + def parse_column_masks(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of column masks (info_schema format).""" + try: + return cls._parse_column_masks(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse column masks from describe table extended as json" + ) from e + + @classmethod + def _parse_column_masks(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of column masks (info_schema format).""" + # Lazy load to improve startup time + from agate import Table as AgateTable + from agate import Text as AgateText + + raw_masks = json_metadata.get("column_masks", []) + rows = [] + for mask in raw_masks: + column_name = mask["column_name"] + fn = mask["function_name"] + mask_name = f"{fn['catalog_name']}.{fn['schema_name']}.{fn['function_name']}" + using_columns = ",".join(mask.get("using_column_names", [])) + rows.append((column_name, mask_name, using_columns or None)) + + return AgateTable( + rows=rows, + column_names=["column_name", "mask_name", "using_columns"], + column_types=[AgateText(), AgateText(), AgateText()], + ) + + @classmethod + def parse_foreign_key_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of FK constraints (info_schema format).""" + try: + return cls._parse_foreign_key_constraints(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse foreign key constraints from describe table extended as json" + ) from e + + @classmethod + def _parse_foreign_key_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of FK constraints (info_schema format).""" + # Lazy load to improve startup time + from agate import Table as AgateTable + from agate import Text as AgateText + + rows: list[list[str]] = [] + # For the worked example, _split_constraints yields: + # ("p-a,b@c(d", "PRIMARY KEY ...") -> skipped, leading token != FOREIGN + # ("fk1", "FOREIGN KEY (`x`, `y`) REFERENCES `cat`.`sch`.`tbl` (`a`, `b`)") + for name, body in cls._split_constraints(json_metadata.get("table_constraints", "")): + # `re.match` (not `re.search`) so the keyword must be the body's + # leading token — never a column-name false-positive. + if not re.match(r"FOREIGN\s+KEY\b", body): + continue + # Structurally-anchored split: REFERENCES sits between the + # closing `)` of the from-cols and the opening `` ` `` of the + # qualified relation. Lookbehind/lookahead require those chars, + # so a column whose name contains the substring REFERENCES + # (e.g. `MY_REFERENCES_COL`) cannot mis-split, and surrounding + # whitespace is optional. + # + # Worked example FK body splits as: + # parts[0] = "FOREIGN KEY (`x`, `y`)" + # parts[1] = "`cat`.`sch`.`tbl` (`a`, `b`)" + parts = re.split(r"(?<=\))\s*REFERENCES\s*(?=`)", body, maxsplit=1) + if len(parts) != 2: + raise DbtRuntimeError( + f"FOREIGN KEY constraint '{name}' is missing a REFERENCES clause: {body!r}" + ) + # Worked example: + # from_cols = ["x", "y"] + # ref_tokens = ["cat", "sch", "tbl", "a", "b"] + from_cols = cls._extract_backticked(parts[0]) + ref_tokens = cls._extract_backticked(parts[1]) + n = len(from_cols) + if n == 0: + raise DbtRuntimeError( + f"FOREIGN KEY constraint '{name}' has no from-columns: {body!r}" + ) + # Layout in the right half: catalog, schema, table, *to_cols. + # Anything other than exactly (3 + n) tokens means the referenced + # name is not a fully-qualified `catalog`.`schema`.`table` or the + # to-column count doesn't match the from-column count. + # + # Worked example: n=2, len(ref_tokens)=5, 5 == 2+3 ✓ + if len(ref_tokens) != n + 3: + raise DbtRuntimeError( + f"FOREIGN KEY constraint '{name}' must reference a 3-part " + f"`catalog`.`schema`.`table` with matching column counts " + f"(from={n}, ref tokens={len(ref_tokens)}): {body!r}" + ) + # Worked example unpack: + # to_catalog="cat", to_schema="sch", to_table="tbl", to_cols=["a","b"] + to_catalog, to_schema, to_table, *to_cols = ref_tokens + # Worked example zip yields two rows: + # ["fk1", "x", "cat", "sch", "tbl", "a"] + # ["fk1", "y", "cat", "sch", "tbl", "b"] + for from_col, to_col in zip(from_cols, to_cols): + rows.append([name, from_col, to_catalog, to_schema, to_table, to_col]) + + return AgateTable( + rows=rows, + column_names=[ + "constraint_name", + "from_column", + "to_catalog", + "to_schema", + "to_table", + "to_column", + ], + column_types=[AgateText()] * 6, + ) + + @classmethod + def parse_non_null_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of non-null constraints (info_schema format).""" + try: + return cls._parse_non_null_constraints(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse non-null constraints from describe table extended as json" + ) from e + + @classmethod + def _parse_non_null_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of non-null constraints (info_schema format).""" + # Lazy load to improve startup time + from agate import Table as AgateTable + from agate import Text as AgateText + + columns = json_metadata.get("columns", []) + + non_null_cols = [column["name"] for column in columns if not column.get("nullable")] + return AgateTable( + rows=[[col] for col in non_null_cols], + column_names=["column_name"], + column_types=[AgateText()], + ) + + @classmethod + def parse_primary_key_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of PK constraints (info_schema format).""" + try: + return cls._parse_primary_key_constraints(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse primary key constraints from describe table extended as json" + ) from e + + @classmethod + def _parse_primary_key_constraints(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of PK constraints (info_schema format).""" + # Lazy load to improve startup time + from agate import Table as AgateTable + from agate import Text as AgateText + + rows: list[list[str]] = [] + # `_split_constraints` yields, for the worked example: + # ("p-a,b@c(d", "PRIMARY KEY (`id``a`)") and + # ("fk1", "FOREIGN KEY ...") + for name, body in cls._split_constraints(json_metadata.get("table_constraints", "")): + # `re.match` (anchored at start) so the keyword must be the body's + # leading token. The FK body's leading token is "FOREIGN", so it + # is skipped here without inspecting its contents. + if re.match(r"PRIMARY\s+KEY\b", body): + # Worked example: body = "PRIMARY KEY (`id``a`)". + # _extract_backticked returns ["id`a"] (one column, with the + # doubled backtick un-escaped to a literal `). + # Row appended: ["p-a,b@c(d", "id`a"]. + for col in cls._extract_backticked(body): + rows.append([name, col]) + + return AgateTable( + rows=rows, + column_names=["constraint_name", "column_name"], + column_types=[AgateText(), AgateText()], + ) + + @classmethod + def parse_view_description(cls, json_metadata: dict[str, Any]) -> "Row": + """Parse json metadata into an agate Row for the view description (info_schema format).""" + try: + return cls._parse_view_description(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse view description from describe table extended as json" + ) from e + + @classmethod + def _parse_view_description(cls, json_metadata: dict[str, Any]) -> "Row": + """Parse json metadata into an agate Row for the view description (info_schema format).""" + # Lazy load to improve startup time + from agate import Row as AgateRow + + view_text = json_metadata.get("view_text", None) + if view_text is None: + return AgateRow(values=set()) + else: + return AgateRow(values=(view_text,), keys=("view_definition",)) + + @classmethod + def parse_row_filter(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of row filter (info_schema format).""" + try: + return cls._parse_row_filter(json_metadata) + except DbtRuntimeError: + raise + except Exception as e: + raise DbtRuntimeError( + "Failed to parse row filter from describe table extended as json" + ) from e + + @classmethod + def _parse_row_filter(cls, json_metadata: dict[str, Any]) -> "Table": + """Parse json metadata into an agate Table of row filter (info_schema format).""" + # Lazy load to improve startup time + from agate import Table as AgateTable + from agate import Text as AgateText + + row_filter_metadata = json_metadata.get("row_filter") + rows: list[Any] = [] + column_names = [ + "table_catalog", + "table_schema", + "table_name", + "filter_name", + "target_columns", + ] + column_types = [AgateText(), AgateText(), AgateText(), AgateText(), AgateText()] + + if not row_filter_metadata: + return AgateTable(rows=rows, column_names=column_names, column_types=column_types) + + table_catalog = json_metadata["catalog_name"] + table_schema = json_metadata["schema_name"] + table_name = json_metadata["table_name"] + + function_name = row_filter_metadata["function_name"] + filter_name = ( + function_name["catalog_name"] + + "." + + function_name["schema_name"] + + "." + + function_name["function_name"] + ) + filter_column_names = row_filter_metadata["column_names"] + + rows.append( + [table_catalog, table_schema, table_name, filter_name, ",".join(filter_column_names)] + ) + + return AgateTable( + rows=rows, + column_names=column_names, + column_types=column_types, + ) + + # --------------------------------------------------------------------- + # WORKED EXAMPLE — referenced by all parser comments below. + # + # Input: + # table_constraints = + # "[(p-a,b@c(d,PRIMARY KEY (`id``a`))," + # " (fk1,FOREIGN KEY (`x`, `y`)" + # " REFERENCES `cat`.`sch`.`tbl` (`a`, `b`))]" + # + # Single fixture, every corner case: + # - constraint name with hyphen, comma, paren, @ ("p-a,b@c(d") + # - column name with literal backtick, escaped as `` ("id``a" -> id`a) + # - composite FK (two from-columns paired with two to-columns) + # - two constraints in one array (PK first, FK second) + # + # Expected parse: + # PK rows: [("p-a,b@c(d", "id`a")] + # FK rows: [("fk1", "x", "cat", "sch", "tbl", "a"), + # ("fk1", "y", "cat", "sch", "tbl", "b")] + # --------------------------------------------------------------------- + + # Boundary anchor: the comma that precedes the body keyword. A constraint + # name cannot contain `,PRIMARY KEY` or `,FOREIGN KEY` because the space + # inside the keyword pair is illegal in a name, so this never fires inside + # a name. In the example, the name "p-a,b@c(d" contains TWO commas — the + # regex skips both because neither is followed by the keyword. + _CONSTRAINT_BOUNDARY = re.compile(r",\s*(?:PRIMARY|FOREIGN)\s+KEY\b") + + # A backticked identifier with embedded doubled-backticks treated as a + # single literal `. In the example, `id``a` matches as ONE token whose + # group(1) is "id``a"; the `.replace("``", "`")` step in + # `_extract_backticked` un-escapes it to "id`a". + _BACKTICKED = re.compile(r"`((?:[^`]|``)*)`") + + @classmethod + def _split_constraints(cls, table_constraints: str) -> list[tuple[str, str]]: + """Yield (name, body) pairs from a Databricks table_constraints value. + + The constraint name is captured as raw text — characters inside it + (commas, parens, backticks, @, hyphens) are not interpreted. The body + is then walked with paren-depth + backtick-state tracking to find the + block's outer closing `)`. + + See WORKED EXAMPLE block above for the fixture used in the comments + below. + """ + # Strip surrounding `[` `]`. For the worked example: + # inner = "(p-a,b@c(d,PRIMARY KEY (`id``a`))," + # " (fk1,FOREIGN KEY (`x`, `y`)" + # " REFERENCES `cat`.`sch`.`tbl` (`a`, `b`))" + s = table_constraints.strip() + if not (s.startswith("[") and s.endswith("]")): + return [] + inner = s[1:-1] + + pairs: list[tuple[str, str]] = [] + pos = 0 + while pos < len(inner): + # Skip whitespace + inter-block commas. Iter 1: `inner[0]` is + # already `(`, loop is a no-op. Iter 2: consumes `, ` between + # the two constraint blocks and stops at the next `(`. + while pos < len(inner) and (inner[pos].isspace() or inner[pos] == ","): + pos += 1 + if pos >= len(inner) or inner[pos] != "(": + break + + # Step into the block. Iter 1: name_start points one past `(`. + # The boundary regex finds the FIRST `,PRIMARY KEY` or + # `,FOREIGN KEY` — never inside the name. The two commas inside + # "p-a,b@c(d" are skipped because they aren't followed by the + # keyword pair (the pair contains a space, illegal in a name). + # + # Iter 1: name = "p-a,b@c(d" + # Iter 2: name = "fk1" + name_start = pos + 1 + m = cls._CONSTRAINT_BOUNDARY.search(inner, name_start) + if not m: + break + name = inner[name_start : m.start()].strip() + + # Walk the body forward to find the block's outer closing `)`. + # Body parens are balanced, so depth returns to 0 only at that `)`. + # Backticks come paired in the body (or doubled to mean a literal). + # + # Iter 1 walk through "PRIMARY KEY (`id``a`)": + # '(' depth 0 -> 1 + # '`' in_bt True + # 'id' literal chars + # '``' DOUBLED — skipped as one unit, in_bt stays True + # 'a' literal char + # '`' in_bt False + # ')' depth 1 -> 0 + # ')' depth 0 -> BREAK (this is the block's outer closer) + # -> body = "PRIMARY KEY (`id``a`)" + # + # Iter 2 walk through the FK body: + # '(' 0->1, `\`x\``, ',', `\`y\``, ')' 1->0, + # ' REFERENCES ' (depth 0, no parens), + # `\`cat\`.\`sch\`.\`tbl\`` (backticks toggle in pairs), + # '(' 0->1, `\`a\``, ',', `\`b\``, ')' 1->0, + # ')' depth 0 -> BREAK + # -> body = "FOREIGN KEY (`x`, `y`) REFERENCES `cat`.`sch`.`tbl` (`a`, `b`)" + depth = 0 + in_bt = False + i = m.start() + 1 + while i < len(inner): + c = inner[i] + if c == "`": + if i + 1 < len(inner) and inner[i + 1] == "`": + i += 2 # doubled = literal, don't toggle + continue + in_bt = not in_bt + elif not in_bt: + if c == "(": + depth += 1 + elif c == ")": + if depth == 0: + break # this is the block's outer closer + depth -= 1 + i += 1 + if i >= len(inner): + break + + body = inner[m.start() + 1 : i].strip() + pairs.append((name, body)) + pos = i + 1 # advance past the block's outer `)` + + # For the worked example, pairs is now: + # [("p-a,b@c(d", "PRIMARY KEY (`id``a`)"), + # ("fk1", "FOREIGN KEY (`x`, `y`) REFERENCES `cat`.`sch`.`tbl` (`a`, `b`)")] + return pairs + + @classmethod + def _extract_backticked(cls, s: str) -> list[str]: + """Every backticked identifier in s, with doubled backticks unescaped. + + Worked-example calls: + - On PK body "PRIMARY KEY (`id``a`)": + regex matches once, group(1) = "id``a"; + .replace("``", "`") -> "id`a"; + returns ["id`a"] (one column, with the literal backtick) + - On FK left half "FOREIGN KEY (`x`, `y`)": + returns ["x", "y"] + - On FK right half "`cat`.`sch`.`tbl` (`a`, `b`)": + returns ["cat", "sch", "tbl", "a", "b"] + """ + return [m.group(1).replace("``", "`") for m in cls._BACKTICKED.finditer(s)] diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index 91f3280c8..98fdd5f91 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -150,6 +150,10 @@ def is_metric_view(self) -> bool: def is_streaming_table(self) -> bool: return self.type == DatabricksRelationType.StreamingTable + @property + def is_foreign_table(self) -> bool: + return self.type == DatabricksRelationType.Foreign + @property def is_external_table(self) -> bool: return self.databricks_table_type == DatabricksTableType.External diff --git a/dbt/include/databricks/macros/adapters/metadata.sql b/dbt/include/databricks/macros/adapters/metadata.sql index 44f9c0a15..08f01d231 100644 --- a/dbt/include/databricks/macros/adapters/metadata.sql +++ b/dbt/include/databricks/macros/adapters/metadata.sql @@ -129,9 +129,17 @@ SELECT NULL ) AS databricks_table_type FROM `system`.`information_schema`.`tables` -WHERE table_catalog = '{{ relation.database|lower }}' +WHERE table_catalog = '{{ relation.database|lower }}' AND table_schema = '{{ relation.schema|lower }}' {%- if relation.identifier %} AND table_name = '{{ relation.identifier|lower }}' {% endif %} {% endmacro %} + +{% macro describe_table_extended_as_json(relation) %} + {{ return(run_query_as(describe_table_extended_as_json_sql(relation), 'describe_table_extended_as_json')) }} +{% endmacro %} + +{% macro describe_table_extended_as_json_sql(relation) %} +DESCRIBE TABLE EXTENDED {{ relation.render() }} AS JSON +{% endmacro %} diff --git a/tests/functional/adapter/fixtures.py b/tests/functional/adapter/fixtures.py index 35ec7c60a..9bcec8112 100644 --- a/tests/functional/adapter/fixtures.py +++ b/tests/functional/adapter/fixtures.py @@ -1,4 +1,7 @@ import pytest +from dbt.tests import util + +from dbt.adapters.databricks.dbr_capabilities import DBRCapability fail_if_tag_fetch_called_macros = """ {% macro fetch_tags(relation) %} @@ -33,3 +36,13 @@ class ManagedIcebergMixin: @pytest.fixture(scope="class") def project_config_update(self): return {"flags": {"use_managed_iceberg": True}} + + +class RequiresDescribeAsJsonCapabilityMixin: + """Skip the test class if the connected compute lacks DESCRIBE TABLE EXTENDED AS JSON.""" + + @pytest.fixture(scope="class", autouse=True) + def require_describe_as_json_capability(self, project): + with util.get_connection(project.adapter): + if not project.adapter.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON): + pytest.skip("DESCRIBE TABLE EXTENDED AS JSON not supported on this compute") diff --git a/tests/functional/adapter/incremental/test_incremental_column_masks.py b/tests/functional/adapter/incremental/test_incremental_column_masks.py index ad5c69ac9..6ec287026 100644 --- a/tests/functional/adapter/incremental/test_incremental_column_masks.py +++ b/tests/functional/adapter/incremental/test_incremental_column_masks.py @@ -1,7 +1,10 @@ import pytest from dbt.tests import util -from tests.functional.adapter.fixtures import MaterializationV2Mixin +from tests.functional.adapter.fixtures import ( + MaterializationV2Mixin, + RequiresDescribeAsJsonCapabilityMixin, +) from tests.functional.adapter.incremental import fixtures @@ -61,3 +64,17 @@ def test_changing_column_masks(self, project): assert result[0][1] == "hello" # name (unmasked) assert result[0][2] == "********@example.com" # email (partially masked) assert result[0][3] == "*****" # password (masked) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalColumnMasksDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestIncrementalColumnMasks +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } diff --git a/tests/functional/adapter/incremental/test_incremental_constraints.py b/tests/functional/adapter/incremental/test_incremental_constraints.py index 78c52b4d6..4f02f85d8 100644 --- a/tests/functional/adapter/incremental/test_incremental_constraints.py +++ b/tests/functional/adapter/incremental/test_incremental_constraints.py @@ -2,6 +2,7 @@ from dbt.contracts.results import RunStatus from dbt.tests import util +from tests.functional.adapter.fixtures import RequiresDescribeAsJsonCapabilityMixin from tests.functional.adapter.incremental import fixtures @@ -30,6 +31,20 @@ def test_add_non_null_constraint(self, project): assert "DELTA_NOT_NULL_CONSTRAINT_VIOLATED" in results.results[0].message +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalSetNonNullConstraintDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestIncrementalSetNonNullConstraint +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } + + @pytest.mark.skip_profile("databricks_cluster") class TestIncrementalUnsetNonNullConstraint: @pytest.fixture(scope="class") @@ -175,6 +190,20 @@ def test_update_primary_key_constraint(self, project): assert not any(constraint[0] == "pk_model" for constraint in primary_key_constraints) +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalUpdatePrimaryKeyConstraintDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestIncrementalUpdatePrimaryKeyConstraint +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } + + @pytest.mark.skip_profile("databricks_cluster") class TestCascadingConstraintDrop: @pytest.fixture(scope="class") diff --git a/tests/functional/adapter/materialized_view_tests/test_changes.py b/tests/functional/adapter/materialized_view_tests/test_changes.py index 85f4b08cf..b2c3cd3da 100644 --- a/tests/functional/adapter/materialized_view_tests/test_changes.py +++ b/tests/functional/adapter/materialized_view_tests/test_changes.py @@ -9,11 +9,13 @@ MaterializedViewChangesContinueMixin, MaterializedViewChangesFailMixin, ) +from dbt_common.contracts.config.materialization import OnConfigurationChangeOption from dbt.adapters.databricks.relation_configs.materialized_view import ( MaterializedViewConfig, ) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig +from tests.functional.adapter.fixtures import RequiresDescribeAsJsonCapabilityMixin from tests.functional.adapter.materialized_view_tests import fixtures @@ -86,6 +88,29 @@ class TestMaterializedViewApplyChanges( pass +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewApplyChangesDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestMaterializedViewApplyChanges +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}, + "flags": {"use_describe_as_json_for_relation_metadata": True}, + } + + @pytest.mark.skip(reason="Full-refresh bypasses get_configuration_changes(); JSON path unused") + def test_full_refresh_occurs_with_changes(self): + pass + + @pytest.mark.skip( + reason="Alter test only mutates refresh schedule. view_text is parsed but not asserted on" + ) + def test_change_is_applied_via_alter(self): + pass + + @pytest.mark.dlt @pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") class TestMaterializedViewContinueOnChanges( diff --git a/tests/functional/adapter/row_filters/test_row_filter.py b/tests/functional/adapter/row_filters/test_row_filter.py index 39b4f960b..d0283f3bc 100644 --- a/tests/functional/adapter/row_filters/test_row_filter.py +++ b/tests/functional/adapter/row_filters/test_row_filter.py @@ -1,7 +1,11 @@ import pytest from dbt.tests.util import run_dbt, write_file -from tests.functional.adapter.fixtures import MaterializationV1Mixin, MaterializationV2Mixin +from tests.functional.adapter.fixtures import ( + MaterializationV1Mixin, + MaterializationV2Mixin, + RequiresDescribeAsJsonCapabilityMixin, +) from tests.functional.adapter.row_filters.fixtures import ( base_model_mv, base_model_sql, @@ -142,6 +146,20 @@ def test_incremental_row_filter_lifecycle(self, project): assert len(filters) == 0 +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalRowFilterDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestIncrementalRowFilter +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } + + @pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") class TestMaterializedViewRowFilter(RowFilterMixin): """Test row filters on materialized view models.""" @@ -177,6 +195,20 @@ def test_mv_row_filter_lifecycle(self, project): assert len(filters) == 0 +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewRowFilterDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestMaterializedViewRowFilter +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } + + @pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") class TestStreamingTableRowFilter(RowFilterMixin): """Test row filters on streaming table models.""" @@ -228,6 +260,20 @@ def test_streaming_table_row_filter_lifecycle(self, project): assert len(filters) == 0 +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestStreamingTableRowFilterDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestStreamingTableRowFilter +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + } + } + + @pytest.mark.skip_profile("databricks_cluster") class TestViewRowFilterFailure(MaterializationV2Mixin): """Test that row filters on regular views fail with clear error.""" diff --git a/tests/functional/adapter/views/test_views.py b/tests/functional/adapter/views/test_views.py index 20baaed5f..e00c1ca62 100644 --- a/tests/functional/adapter/views/test_views.py +++ b/tests/functional/adapter/views/test_views.py @@ -2,6 +2,7 @@ from agate import Row from dbt.tests import util +from tests.functional.adapter.fixtures import RequiresDescribeAsJsonCapabilityMixin from tests.functional.adapter.views import fixtures @@ -127,6 +128,27 @@ def project_config_update(self): } +@pytest.mark.skip_profile("databricks_cluster") +class TestUpdateViewViaAlterDescriptionDescribeJsonOn( + RequiresDescribeAsJsonCapabilityMixin, TestUpdateViewViaAlterDescription +): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "use_materialization_v2": True, + "use_describe_as_json_for_relation_metadata": True, + }, + "models": { + "+view_update_via_alter": True, + "+persist_docs": { + "relation": True, + "columns": True, + }, + }, + } + + @pytest.mark.skip_profile("databricks_cluster") class TestUpdateViewViaAlterNothing(BaseUpdateNothing): @pytest.fixture(scope="class") diff --git a/tests/unit/macros/adapters/test_metadata_macros.py b/tests/unit/macros/adapters/test_metadata_macros.py index 23eb0b0e9..643f99148 100644 --- a/tests/unit/macros/adapters/test_metadata_macros.py +++ b/tests/unit/macros/adapters/test_metadata_macros.py @@ -197,6 +197,13 @@ def test_check_schema_exists_sql_with_hyphenated_database(self, template_bundle) expected_sql = "SHOW SCHEMAS IN `data_engineering-uc-dev` LIKE 'my_schema'" self.assert_sql_equal(result, expected_sql) + def test_describe_table_extended_as_json_sql(self, template_bundle, relation): + result = self.run_macro( + template_bundle.template, "describe_table_extended_as_json_sql", relation + ) + expected_sql = "DESCRIBE TABLE EXTENDED `some_database`.`some_schema`.`some_table` AS JSON" + self.assert_sql_equal(result, expected_sql) + def test_case_sensitivity(self, template_bundle): relation = Mock() relation.database = "TEST_DB" diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 60f7dcfa8..0c2b5ca80 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -11,7 +11,12 @@ from dbt.config import RuntimeConfig from dbt_common.clients.agate_helper import merge_tables from dbt_common.events.event_manager_client import add_callback_to_manager -from dbt_common.exceptions import DbtConfigError, DbtDatabaseError, DbtValidationError +from dbt_common.exceptions import ( + DbtConfigError, + DbtDatabaseError, + DbtRuntimeError, + DbtValidationError, +) from dbt.adapters.databricks import DatabricksAdapter, __version__, constants from dbt.adapters.databricks.catalogs._relation import DatabricksCatalogRelation @@ -1323,10 +1328,133 @@ def test_get_columns_reraises_other_database_errors( ) +class TestFetchJsonMetadata(DatabricksAdapterBase): + @pytest.fixture + def adapter(self): + with patch("dbt.adapters.databricks.connections.DatabricksConnectionManager"): + adapter = DatabricksAdapter(self._get_config(), get_context("spawn")) + yield adapter + + @pytest.fixture + def relation(self): + return DatabricksRelation.create( + database="catalog", + schema="schema", + identifier="table", + type=DatabricksRelation.Table, + ) + + def _macro_result(self, json_metadata): + """Build a minimal execute_macro return value with one row carrying json_metadata.""" + row = Mock() + row.get = Mock(return_value=json_metadata) + result = Mock() + result.rows = [row] + return result + + def test_valid_json_metadata_parsed(self, adapter, relation): + """Happy path: well-formed json_metadata is returned as a dict.""" + with patch.object(adapter, "execute_macro", return_value=self._macro_result('{"a": 1}')): + assert adapter.fetch_json_metadata(relation) == {"a": 1} + + def test_malformed_json_wrapped(self, adapter, relation): + """A malformed JSON string triggers json.JSONDecodeError, wrapped by the adapter.""" + with patch.object(adapter, "execute_macro", return_value=self._macro_result("not-json")): + with pytest.raises( + DbtRuntimeError, + match="Failed to parse json metadata from describe table extended as json", + ): + adapter.fetch_json_metadata(relation) + + def test_none_json_metadata_wrapped(self, adapter, relation): + """A missing json_metadata column (None) triggers TypeError, wrapped by the adapter.""" + with patch.object(adapter, "execute_macro", return_value=self._macro_result(None)): + with pytest.raises( + DbtRuntimeError, + match="Failed to parse json metadata from describe table extended as json", + ): + adapter.fetch_json_metadata(relation) + + def test_empty_rows_wrapped(self, adapter, relation): + """No rows from execute_macro triggers IndexError, wrapped by the adapter.""" + empty_result = Mock() + empty_result.rows = [] + with patch.object(adapter, "execute_macro", return_value=empty_result): + with pytest.raises( + DbtRuntimeError, + match="Failed to parse json metadata from describe table extended as json", + ): + adapter.fetch_json_metadata(relation) + + +class TestIsDescribeAsJsonSupported(DatabricksAdapterBase): + @pytest.fixture + def adapter(self): + with patch("dbt.adapters.databricks.connections.DatabricksConnectionManager"): + adapter = DatabricksAdapter(self._get_config(), get_context("spawn")) + yield adapter + + def test_supported_for_uc_table_with_capability(self, adapter): + relation = DatabricksRelation.create( + database="catalog", + schema="schema", + identifier="table", + type=DatabricksRelation.Table, + ) + adapter.behavior.use_describe_as_json_for_relation_metadata = Mock(no_warn=True) + with patch.object(adapter, "has_capability", return_value=True): + assert adapter.is_describe_as_json_supported(relation) is True + + def test_not_supported_without_capability(self, adapter): + relation = DatabricksRelation.create( + database="catalog", + schema="schema", + identifier="table", + type=DatabricksRelation.Table, + ) + adapter.behavior.use_describe_as_json_for_relation_metadata = Mock(no_warn=True) + with patch.object(adapter, "has_capability", return_value=False): + assert adapter.is_describe_as_json_supported(relation) is False + + def test_not_supported_for_hive_metastore(self, adapter): + relation = DatabricksRelation.create( + database="hive_metastore", + schema="schema", + identifier="table", + type=DatabricksRelation.Table, + ) + adapter.behavior.use_describe_as_json_for_relation_metadata = Mock(no_warn=True) + with patch.object(adapter, "has_capability", return_value=True): + assert adapter.is_describe_as_json_supported(relation) is False + + def test_not_supported_for_foreign_table(self, adapter): + relation = DatabricksRelation.create( + database="catalog", + schema="schema", + identifier="table", + type=DatabricksRelationType.Foreign, + ) + adapter.behavior.use_describe_as_json_for_relation_metadata = Mock(no_warn=True) + with patch.object(adapter, "has_capability", return_value=True): + assert adapter.is_describe_as_json_supported(relation) is False + + def test_not_supported_when_behavior_flag_disabled(self, adapter): + relation = DatabricksRelation.create( + database="catalog", + schema="schema", + identifier="table", + type=DatabricksRelation.Table, + ) + adapter.behavior.use_describe_as_json_for_relation_metadata = Mock(no_warn=False) + with patch.object(adapter, "has_capability", return_value=True): + assert adapter.is_describe_as_json_supported(relation) is False + + class TestDescribeRelationMetadataFetchPlanning: @staticmethod - def _create_adapter(): + def _create_adapter(describe_as_json_supported: bool = False): adapter = Mock() + adapter.is_describe_as_json_supported.return_value = describe_as_json_supported def execute_macro(macro_name, kwargs=None): if macro_name == "get_view_description": diff --git a/tests/unit/test_dbr_capabilities.py b/tests/unit/test_dbr_capabilities.py index 7f02d8dcf..0ba052b34 100644 --- a/tests/unit/test_dbr_capabilities.py +++ b/tests/unit/test_dbr_capabilities.py @@ -14,40 +14,57 @@ class TestDBRCapabilities: def test_capability_enum_values(self): """Test that all capabilities have the expected values.""" - assert DBRCapability.TIMESTAMPDIFF.value == "timestampdiff" - assert DBRCapability.ICEBERG.value == "iceberg" assert DBRCapability.COMMENT_ON_COLUMN.value == "comment_on_column" + assert ( + DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON.value == "describe_table_extended_as_json" + ) + assert DBRCapability.ICEBERG.value == "iceberg" + assert DBRCapability.INSERT_BY_NAME.value == "insert_by_name" assert DBRCapability.JSON_COLUMN_METADATA.value == "json_column_metadata" + assert DBRCapability.REPLACE_ON.value == "replace_on" + assert DBRCapability.STREAMING_TABLE_JSON_METADATA.value == "streaming_table_json_metadata" + assert DBRCapability.TIMESTAMPDIFF.value == "timestampdiff" def test_old_dbr_version(self): """Test capabilities with old DBR version.""" capabilities = DBRCapabilities(dbr_version=(10, 0)) # Should not have newer features - assert not capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) - assert not capabilities.has_capability(DBRCapability.ICEBERG) assert not capabilities.has_capability(DBRCapability.COMMENT_ON_COLUMN) + assert not capabilities.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + assert not capabilities.has_capability(DBRCapability.ICEBERG) + assert not capabilities.has_capability(DBRCapability.INSERT_BY_NAME) assert not capabilities.has_capability(DBRCapability.JSON_COLUMN_METADATA) + assert not capabilities.has_capability(DBRCapability.REPLACE_ON) + assert not capabilities.has_capability(DBRCapability.STREAMING_TABLE_JSON_METADATA) + assert not capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) def test_modern_dbr_version(self): """Test capabilities with modern DBR version.""" - capabilities = DBRCapabilities(dbr_version=(16, 2)) + capabilities = DBRCapabilities(dbr_version=(17, 3)) # Should have all features up to 16.2 - assert capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) - assert capabilities.has_capability(DBRCapability.ICEBERG) assert capabilities.has_capability(DBRCapability.COMMENT_ON_COLUMN) + assert capabilities.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + assert capabilities.has_capability(DBRCapability.ICEBERG) + assert capabilities.has_capability(DBRCapability.INSERT_BY_NAME) assert capabilities.has_capability(DBRCapability.JSON_COLUMN_METADATA) + assert capabilities.has_capability(DBRCapability.REPLACE_ON) + assert capabilities.has_capability(DBRCapability.STREAMING_TABLE_JSON_METADATA) + assert capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) def test_sql_warehouse(self): """Test that SQL warehouses are assumed to have latest features.""" capabilities = DBRCapabilities(is_sql_warehouse=True) # SQL warehouses should have all supported features - assert capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) - assert capabilities.has_capability(DBRCapability.ICEBERG) assert capabilities.has_capability(DBRCapability.COMMENT_ON_COLUMN) + assert capabilities.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + assert capabilities.has_capability(DBRCapability.ICEBERG) + assert capabilities.has_capability(DBRCapability.INSERT_BY_NAME) assert capabilities.has_capability(DBRCapability.JSON_COLUMN_METADATA) + assert capabilities.has_capability(DBRCapability.REPLACE_ON) + assert capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) def test_sql_warehouse_unsupported_features(self): """Test that some features are not supported on SQL warehouses.""" @@ -58,17 +75,36 @@ def test_sql_warehouse_unsupported_features(self): def test_get_required_version(self): """Test getting required version strings.""" - assert DBRCapabilities.get_required_version(DBRCapability.TIMESTAMPDIFF) == "DBR 10.4+" - assert DBRCapabilities.get_required_version(DBRCapability.ICEBERG) == "DBR 14.3+" assert DBRCapabilities.get_required_version(DBRCapability.COMMENT_ON_COLUMN) == "DBR 16.1+" + assert ( + DBRCapabilities.get_required_version(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + == "DBR 17.3+" + ) + assert DBRCapabilities.get_required_version(DBRCapability.ICEBERG) == "DBR 14.3+" + assert DBRCapabilities.get_required_version(DBRCapability.INSERT_BY_NAME) == "DBR 12.2+" + assert ( + DBRCapabilities.get_required_version(DBRCapability.JSON_COLUMN_METADATA) == "DBR 16.2+" + ) + assert DBRCapabilities.get_required_version(DBRCapability.REPLACE_ON) == "DBR 17.1+" + assert ( + DBRCapabilities.get_required_version(DBRCapability.STREAMING_TABLE_JSON_METADATA) + == "DBR 17.1+" + ) + assert DBRCapabilities.get_required_version(DBRCapability.TIMESTAMPDIFF) == "DBR 10.4+" def test_no_connection(self): """Test behavior when not connected (no version info).""" capabilities = DBRCapabilities(dbr_version=None) # Without connection info, assume no capabilities - assert not capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) + assert not capabilities.has_capability(DBRCapability.COMMENT_ON_COLUMN) + assert not capabilities.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) assert not capabilities.has_capability(DBRCapability.ICEBERG) + assert not capabilities.has_capability(DBRCapability.INSERT_BY_NAME) + assert not capabilities.has_capability(DBRCapability.JSON_COLUMN_METADATA) + assert not capabilities.has_capability(DBRCapability.REPLACE_ON) + assert not capabilities.has_capability(DBRCapability.STREAMING_TABLE_JSON_METADATA) + assert not capabilities.has_capability(DBRCapability.TIMESTAMPDIFF) def test_enabled_capabilities_property(self): """Test the enabled_capabilities method.""" @@ -78,15 +114,18 @@ def test_enabled_capabilities_property(self): # Should include all capabilities supported by DBR 16.2 expected = { - DBRCapability.TIMESTAMPDIFF, - DBRCapability.ICEBERG, DBRCapability.COMMENT_ON_COLUMN, + DBRCapability.ICEBERG, + DBRCapability.INSERT_BY_NAME, DBRCapability.JSON_COLUMN_METADATA, + DBRCapability.TIMESTAMPDIFF, } assert expected.issubset(enabled) # Should not include capabilities requiring newer versions + assert DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON not in enabled + assert DBRCapability.REPLACE_ON not in enabled assert DBRCapability.STREAMING_TABLE_JSON_METADATA not in enabled @@ -137,6 +176,15 @@ def test_version_requirements(self): assert specs[DBRCapability.ICEBERG].min_version == (14, 3) assert specs[DBRCapability.COMMENT_ON_COLUMN].min_version == (16, 1) assert specs[DBRCapability.JSON_COLUMN_METADATA].min_version == (16, 2) + assert specs[DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON].min_version == (17, 3) + + def test_describe_json_boundary(self): + """Test DESCRIBE_TABLE_EXTENDED_AS_JSON is available at 17.3 but not 17.2.""" + unsupported = DBRCapabilities(dbr_version=(17, 2)) + assert not unsupported.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) + + supported = DBRCapabilities(dbr_version=(17, 3)) + assert supported.has_capability(DBRCapability.DESCRIBE_TABLE_EXTENDED_AS_JSON) def test_sql_warehouse_support_flags(self): """Test that SQL warehouse support is correctly specified.""" diff --git a/tests/unit/test_describe_json_metadata.py b/tests/unit/test_describe_json_metadata.py new file mode 100644 index 000000000..6ae09143d --- /dev/null +++ b/tests/unit/test_describe_json_metadata.py @@ -0,0 +1,1692 @@ +""" +Unit tests for DatabricksDescribeJsonMetadata parser. + +Tests the parsing of DESCRIBE TABLE EXTENDED ... AS JSON responses into +agate Tables that match the format expected by existing processors. +""" + +import pytest +from dbt_common.contracts.constraints import ConstraintType +from dbt_common.exceptions import DbtRuntimeError + +from dbt.adapters.databricks.constraints import ( + ForeignKeyConstraint, + PrimaryKeyConstraint, +) +from dbt.adapters.databricks.impl import DatabricksDescribeJsonMetadata +from dbt.adapters.databricks.relation_configs.column_mask import ( + ColumnMaskConfig, + ColumnMaskProcessor, +) +from dbt.adapters.databricks.relation_configs.constraints import ( + ConstraintsConfig, + ConstraintsProcessor, +) +from dbt.adapters.databricks.relation_configs.query import QueryConfig, QueryProcessor +from dbt.adapters.databricks.relation_configs.row_filter import ( + RowFilterConfig, + RowFilterProcessor, +) + +# Fixtures: minimal JSON samples with only fields relevant to parsing. + + +EMAIL_ADDRESSES_JSON = { + "columns": [ + {"name": "address_id", "nullable": False}, + {"name": "remote_user_id", "nullable": True}, + {"name": "email_address", "nullable": True}, + ], + "table_constraints": ( + "[(email_ad_pk,PRIMARY KEY (`address_id`))," + " (email_fk,FOREIGN KEY (`remote_user_id`)" + " REFERENCES `main`.`default`.`users` (`user_id`))]" + ), +} + +COLUMN_MASK_JSON = { + "column_masks": [ + { + "column_name": "phone_number", + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "mask_phone", + }, + "using_column_names": ["city"], + } + ], +} + +ROW_FILTER_JSON = { + "table_name": "table_with_row_filter", + "catalog_name": "default_catalog", + "schema_name": "default", + "row_filter": { + "function_name": { + "catalog_name": "default_catalog", + "schema_name": "default", + "function_name": "filter_by_region", + }, + "column_names": ["region"], + }, +} + +ROW_FILTER_MULTI_COLUMN_JSON = { + "table_name": "table_with_row_filter", + "catalog_name": "default_catalog", + "schema_name": "default", + "row_filter": { + "function_name": { + "catalog_name": "default_catalog", + "schema_name": "default", + "function_name": "filter_by_dept_and_region", + }, + "column_names": ["department", "region"], + }, +} + + +MATERIALIZED_VIEW_JSON = { + "view_text": "SELECT id, name FROM main.default.source_table", +} + +REGULAR_VIEW_JSON = { + "view_text": "SELECT id, name FROM main.default.other_table", +} + +PLAIN_TABLE_JSON = { + "columns": [ + {"name": "id", "nullable": True}, + {"name": "value", "nullable": True}, + ], +} + + +COMPOSITE_PK_JSON = { + "columns": [ + {"name": "id", "nullable": False}, + {"name": "name", "nullable": False}, + {"name": "value", "nullable": True}, + ], + "table_constraints": "[(id_name_pk,PRIMARY KEY (`id`, `name`))]", +} + +COMPOSITE_FK_JSON = { + "columns": [ + {"name": "id", "nullable": True}, + {"name": "ref_id", "nullable": True}, + {"name": "ref_name", "nullable": True}, + ], + "table_constraints": ( + "[(fk_pk,PRIMARY KEY (`id`))," + " (child_fk,FOREIGN KEY (`ref_id`, `ref_name`)" + " REFERENCES `main`.`default`.`parents` (`id`, `name`))]" + ), +} + +ALL_FIELDS_JSON = { + "table_name": "source", + "catalog_name": "main", + "schema_name": "default", + "columns": [ + {"name": "id", "nullable": False}, + {"name": "secret", "nullable": True}, + ], + "table_constraints": ( + "[(pk1,PRIMARY KEY (`id`))," + " (fk1,FOREIGN KEY (`id`)" + " REFERENCES `main`.`default`.`other` (`other_id`))]" + ), + "column_masks": [ + { + "column_name": "secret", + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "mask_secret", + }, + "using_column_names": ["id"], + } + ], + "row_filter": { + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "filter_secret", + }, + "column_names": ["id"], + }, + "view_text": "SELECT id, secret FROM main.default.source", +} + +MIXED_PK_FK_JSON = { + "columns": [ + {"name": "id", "nullable": False}, + {"name": "ref_id", "nullable": True}, + ], + "table_constraints": ( + "[(pk1,PRIMARY KEY (`id`))," + " (fk1,FOREIGN KEY (`ref_id`)" + " REFERENCES `main`.`default`.`other` (`other_id`))]" + ), +} + + +class TestParsePrimaryKeyConstraints: + def test_single_primary_key(self): + """Test PRIMARY KEY parsing with a single primary key constraint.""" + json_metadata = {"table_constraints": "[(pk1,PRIMARY KEY (`address_id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0][0] == "pk1" + assert result.rows[0]["constraint_name"] == "pk1" + assert result.rows[0][1] == "address_id" + assert result.rows[0]["column_name"] == "address_id" + + def test_no_primary_key(self): + """Test PRIMARY KEY parsing with no primary key constraints.""" + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `main`.`default`.`t` (`id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 0 + + def test_no_table_constraints_field(self): + """Test PRIMARY KEY parsing with no table_constraints field.""" + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints({}) + assert len(result.rows) == 0 + + def test_empty_string(self): + """Test PRIMARY KEY parsing with an empty string.""" + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints( + {"table_constraints": ""} + ) + assert len(result.rows) == 0 + + def test_spaces(self): + """Test PRIMARY KEY parsing is robust to excessive spaces between 'PRIMARY' and 'KEY'.""" + for num_extra_spaces in range(0, 40): + es = " " * num_extra_spaces # extra spaces + constraint_entry = f"{es}({es}pk1{es},{es}PRIMARY {es}KEY{es}({es}`col_1`{es}){es}){es}" + json_metadata = {"table_constraints": f"[{constraint_entry}]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "pk1" + assert row["constraint_name"] == "pk1" + assert row[1] == "col_1" + assert row["column_name"] == "col_1" + + def test_many_constraints(self): + """Test PRIMARY KEY constraint parsing with many constraints in one string.""" + constraint_count = 20 + constraint_entries = [ + f"(pk{i},PRIMARY KEY (`col_{i}`))" for i in range(1, constraint_count + 1) + ] + json_metadata = {"table_constraints": f"[{', '.join(constraint_entries)}]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == constraint_count + for row_index in range(constraint_count): + expected_constraint_name = f"pk{row_index + 1}" + expected_column_name = f"col_{row_index + 1}" + row = result.rows[row_index] + assert row[0] == expected_constraint_name + assert row["constraint_name"] == expected_constraint_name + assert row[1] == expected_column_name + assert row["column_name"] == expected_column_name + + def test_composite_with_many_columns(self): + """Test composite PRIMARY KEY with 1 to 20 columns.""" + for num_cols in range(1, 21): + cols = ", ".join(f"`col_{i}`" for i in range(1, num_cols + 1)) + json_metadata = {"table_constraints": f"[(pk1,PRIMARY KEY ({cols}))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == num_cols + for i in range(num_cols): + assert result.rows[i][0] == "pk1" + assert result.rows[i][1] == f"col_{i + 1}" + + def test_underscores_on_names(self): + """ + Test that PRIMARY KEY parsing works when table/column names in constraints + are qualified with varying numbers of leading/trailing underscores. + """ + for i in range(0, 20): + usc = "_" * i # underscores + column_name = f"{usc}id{usc}" + constraint_entry = f"(pk1,PRIMARY KEY (`{column_name}`))" + + json_metadata = {"table_constraints": f"[{constraint_entry}]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "pk1" + assert row["constraint_name"] == "pk1" + assert row[1] == column_name + assert row["column_name"] == column_name + + def test_hyphen_in_constraint_name(self): + json_metadata = {"table_constraints": "[(my-pk,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "my-pk" + assert result.rows[0]["column_name"] == "id" + + def test_at_sign_in_constraint_name(self): + json_metadata = {"table_constraints": "[(pk@1,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "pk@1" + assert result.rows[0]["column_name"] == "id" + + def test_comma_in_constraint_name(self): + json_metadata = {"table_constraints": "[(a,b,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "a,b" + assert result.rows[0]["column_name"] == "id" + + def test_paren_in_constraint_name(self): + json_metadata = {"table_constraints": "[(a(b,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "a(b" + assert result.rows[0]["column_name"] == "id" + + def test_backtick_in_constraint_name(self): + json_metadata = {"table_constraints": "[(p`a,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "p`a" + assert result.rows[0]["column_name"] == "id" + + def test_column_with_escaped_backtick(self): + json_metadata = {"table_constraints": "[(p-a4,PRIMARY KEY (`id``a`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "p-a4" + assert result.rows[0]["column_name"] == "id`a" + + def test_unicode_cjk_constraint_name(self): + """Constraint names with CJK ideographs (valid Unicode in Databricks).""" + json_metadata = {"table_constraints": "[(用户_pk,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "用户_pk" + assert result.rows[0]["column_name"] == "id" + + def test_unicode_acute_accent_column_name(self): + """Column names with acute-accented characters (e.g. é) inside backticks.""" + json_metadata = {"table_constraints": "[(pk1,PRIMARY KEY (`prénom`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "pk1" + assert result.rows[0]["column_name"] == "prénom" + + def test_unicode_diaeresis_column_name(self): + """Column names with diaeresis/umlaut characters (e.g. ï, ë).""" + json_metadata = {"table_constraints": "[(pk1,PRIMARY KEY (`cliënt_id`))]"} + result = DatabricksDescribeJsonMetadata.parse_primary_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "pk1" + assert result.rows[0]["column_name"] == "cliënt_id" + + def test_unexpected_exception_wrapped(self): + """A non-string constraint triggers an inner AttributeError that the wrapper converts.""" + with pytest.raises( + DbtRuntimeError, + match="Failed to parse primary key constraints from describe table extended as json", + ): + DatabricksDescribeJsonMetadata.parse_primary_key_constraints({"table_constraints": 123}) + + +class TestParseForeignKeyConstraints: + def test_single_column_foreign_key(self): + """Test FOREIGN KEY parsing with a single foreign key constraint.""" + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `main`.`default`.`users` (`user_id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "fk1" + assert row["constraint_name"] == "fk1" + assert row[1] == "ref_id" + assert row["from_column"] == "ref_id" + assert row[2] == "main" + assert row["to_catalog"] == "main" + assert row[3] == "default" + assert row["to_schema"] == "default" + assert row[4] == "users" + assert row["to_table"] == "users" + assert row[5] == "user_id" + assert row["to_column"] == "user_id" + + def test_composite_foreign_key(self): + """Test FOREIGN KEY parsing many columns.""" + for num_cols in range(1, 21): + from_cols = ", ".join(f"`from_{i}`" for i in range(1, num_cols + 1)) + to_cols = ", ".join(f"`to_{i}`" for i in range(1, num_cols + 1)) + json_metadata = { + "table_constraints": ( + f"[(cfk,FOREIGN KEY ({from_cols})" + f" REFERENCES `main`.`default`.`parents` ({to_cols}))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == num_cols + for i in range(num_cols): + row = result.rows[i] + assert row[0] == "cfk" + assert row["constraint_name"] == "cfk" + assert row[1] == f"from_{i + 1}" + assert row["from_column"] == f"from_{i + 1}" + assert row[2] == "main" + assert row[3] == "default" + assert row[4] == "parents" + assert row[5] == f"to_{i + 1}" + assert row["to_column"] == f"to_{i + 1}" + + def test_schema_with_hyphens(self): + """Test FOREIGN KEY parsing when the referenced schema contains hyphens.""" + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `main`.`my-schema`.`users` (`user_id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[3] == "my-schema" + assert row["to_schema"] == "my-schema" + + def test_foreign_key_with_primary_key(self): + """Test FOREIGN KEY parsing with mixed primary and foreign key constraints.""" + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(MIXED_PK_FK_JSON) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "fk1" + assert row["constraint_name"] == "fk1" + assert row[1] == "ref_id" + assert row["from_column"] == "ref_id" + + def test_no_foreign_key(self): + """Test FOREIGN KEY parsing with no foreign key constraints.""" + json_metadata = {"table_constraints": "[(pk1,PRIMARY KEY (`id`))]"} + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 0 + + def test_no_table_constraints_field(self): + """Test FOREIGN KEY parsing with no table_constraints field.""" + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints({}) + assert len(result.rows) == 0 + + def test_empty_string(self): + """Test FOREIGN KEY parsing with an empty string.""" + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints( + {"table_constraints": ""} + ) + assert len(result.rows) == 0 + + def test_spaces(self): + """Test FOREIGN KEY parsing is robust to excessive spaces between keywords.""" + for num_extra_spaces in range(0, 40): + es = " " * num_extra_spaces + constraint_entry = ( + f"{es}({es}fk1{es},{es}FOREIGN {es}KEY{es}({es}`ref_id`{es})" + f"{es}REFERENCES{es}`main`{es}.{es}`default`{es}.{es}`users`{es}" + f"({es}`user_id`{es}){es}){es}" + ) + json_metadata = {"table_constraints": f"[{constraint_entry}]"} + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "fk1" + assert row["constraint_name"] == "fk1" + assert row[1] == "ref_id" + assert row["from_column"] == "ref_id" + assert row[2] == "main" + assert row["to_catalog"] == "main" + assert row[3] == "default" + assert row["to_schema"] == "default" + assert row[4] == "users" + assert row["to_table"] == "users" + assert row[5] == "user_id" + assert row["to_column"] == "user_id" + + def test_many_constraints(self): + """Test FOREIGN KEY parsing with many constraints in one string.""" + constraint_count = 20 + constraint_entries = [ + ( + f"(fk{i},FOREIGN KEY (`ref_col_{i}`)" + f" REFERENCES `main`.`default`.`users_{i}` (`user_col_{i}`))" + ) + for i in range(1, constraint_count + 1) + ] + json_metadata = {"table_constraints": f"[{', '.join(constraint_entries)}]"} + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == constraint_count + for row_index in range(constraint_count): + expected_constraint_name = f"fk{row_index + 1}" + expected_from_column = f"ref_col_{row_index + 1}" + expected_to_table = f"users_{row_index + 1}" + expected_to_column = f"user_col_{row_index + 1}" + row = result.rows[row_index] + assert row[0] == expected_constraint_name + assert row["constraint_name"] == expected_constraint_name + assert row[1] == expected_from_column + assert row["from_column"] == expected_from_column + assert row[2] == "main" + assert row["to_catalog"] == "main" + assert row[3] == "default" + assert row["to_schema"] == "default" + assert row[4] == expected_to_table + assert row["to_table"] == expected_to_table + assert row[5] == expected_to_column + assert row["to_column"] == expected_to_column + + def test_underscores_on_names(self): + """Test FOREIGN KEY parsing with varying leading and trailing underscores.""" + for i in range(0, 20): + underscores = "_" * i + from_column = f"{underscores}ref_id{underscores}" + to_catalog = f"{underscores}main{underscores}" + to_schema = f"{underscores}default{underscores}" + to_table = f"{underscores}users{underscores}" + to_column = f"{underscores}user_id{underscores}" + constraint_entry = ( + f"(fk1,FOREIGN KEY (`{from_column}`)" + f" REFERENCES `{to_catalog}`.`{to_schema}`.`{to_table}` (`{to_column}`))" + ) + + json_metadata = {"table_constraints": f"[{constraint_entry}]"} + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row[0] == "fk1" + assert row["constraint_name"] == "fk1" + assert row[1] == from_column + assert row["from_column"] == from_column + assert row[2] == to_catalog + assert row["to_catalog"] == to_catalog + assert row[3] == to_schema + assert row["to_schema"] == to_schema + assert row[4] == to_table + assert row["to_table"] == to_table + assert row[5] == to_column + assert row["to_column"] == to_column + + def test_hyphen_in_constraint_name(self): + json_metadata = { + "table_constraints": ("[(my-fk,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "my-fk" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_at_sign_in_constraint_name(self): + json_metadata = { + "table_constraints": ("[(fk@1,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "fk@1" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_comma_in_constraint_name(self): + json_metadata = { + "table_constraints": ("[(a,b,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "a,b" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_paren_in_constraint_name(self): + json_metadata = { + "table_constraints": ("[(a(b,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "a(b" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_backtick_in_constraint_name(self): + json_metadata = { + "table_constraints": ("[(p`a,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "p`a" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_unicode_diaeresis_constraint_name(self): + """FK constraint name with diaeresis/umlaut characters (e.g. ï, ë).""" + json_metadata = { + "table_constraints": ( + "[(cliënt_fk,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["constraint_name"] == "cliënt_fk" + assert result.rows[0]["from_column"] == "ref_id" + assert result.rows[0]["to_table"] == "t" + + def test_unicode_cjk_referenced_identifiers(self): + """CJK ideographs inside backticked catalog/schema/table/column.""" + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`用户_id`) REFERENCES `主目录`.`架构`.`用户` (`编号`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + row = result.rows[0] + assert row["constraint_name"] == "fk1" + assert row["from_column"] == "用户_id" + assert row["to_catalog"] == "主目录" + assert row["to_schema"] == "架构" + assert row["to_table"] == "用户" + assert row["to_column"] == "编号" + + def test_from_column_with_escaped_backtick(self): + json_metadata = { + "table_constraints": ("[(fk1,FOREIGN KEY (`ref``id`) REFERENCES `c`.`s`.`t` (`id`))]") + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["from_column"] == "ref`id" + assert result.rows[0]["to_catalog"] == "c" + assert result.rows[0]["to_schema"] == "s" + assert result.rows[0]["to_table"] == "t" + assert result.rows[0]["to_column"] == "id" + + def test_referenced_table_with_escaped_backtick(self): + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`a`) REFERENCES `c`.`s`.`weird``tbl` (`id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["to_catalog"] == "c" + assert result.rows[0]["to_schema"] == "s" + assert result.rows[0]["to_table"] == "weird`tbl" + assert result.rows[0]["to_column"] == "id" + + def test_from_column_named_references(self): + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`REFERENCES`) REFERENCES `c`.`s`.`t` (`id`))]" + ) + } + result = DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0]["from_column"] == "REFERENCES" + assert result.rows[0]["to_catalog"] == "c" + assert result.rows[0]["to_schema"] == "s" + assert result.rows[0]["to_table"] == "t" + assert result.rows[0]["to_column"] == "id" + + def test_missing_references_clause_raises(self): + """A FOREIGN KEY body without a REFERENCES clause is malformed.""" + json_metadata = {"table_constraints": "[(fk1,FOREIGN KEY (`ref_id`))]"} + with pytest.raises(DbtRuntimeError, match="missing a REFERENCES"): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + + def test_one_part_referenced_name_raises(self): + """A 1-part reference is missing catalog and schema; must raise.""" + json_metadata = { + "table_constraints": "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `t` (`id`))]" + } + with pytest.raises(DbtRuntimeError, match="3-part"): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + + def test_two_part_referenced_name_raises(self): + """A 2-part reference is missing catalog; must raise.""" + json_metadata = { + "table_constraints": "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `s`.`t` (`id`))]" + } + with pytest.raises(DbtRuntimeError, match="3-part"): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + + def test_four_part_referenced_name_raises(self): + """A 4-part reference exceeds catalog.schema.table; must raise.""" + json_metadata = { + "table_constraints": ( + "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `a`.`b`.`c`.`d` (`id`))]" + ) + } + with pytest.raises(DbtRuntimeError, match="3-part"): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + + def test_mismatched_column_counts_raises(self): + """from-cols count must equal to-cols count; mismatch must raise.""" + json_metadata = { + "table_constraints": ("[(fk1,FOREIGN KEY (`a`, `b`) REFERENCES `c`.`s`.`t` (`x`))]") + } + with pytest.raises(DbtRuntimeError, match="3-part"): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints(json_metadata) + + def test_unexpected_exception_wrapped(self): + """A non-string constraints triggers an inner AttributeError that the wrapper converts.""" + with pytest.raises( + DbtRuntimeError, + match="Failed to parse foreign key constraints from describe table extended as json", + ): + DatabricksDescribeJsonMetadata.parse_foreign_key_constraints({"table_constraints": 123}) + + +class TestParseNonNullConstraints: + def test_mixed_nullable(self): + """Test parsing of non-null constraints when some columns are nullable and some are not.""" + json_metadata = { + "columns": [ + {"name": "id", "nullable": False}, + {"name": "email", "nullable": True}, + ] + } + result = DatabricksDescribeJsonMetadata.parse_non_null_constraints(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0][0] == "id" + assert result.rows[0]["column_name"] == "id" + + def test_all_nullable(self): + """Test parsing of non-null constraints when all columns are nullable.""" + json_metadata = { + "columns": [ + {"name": "a", "nullable": True}, + {"name": "b", "nullable": True}, + ] + } + result = DatabricksDescribeJsonMetadata.parse_non_null_constraints(json_metadata) + assert len(result.rows) == 0 + + def test_multiple_non_null(self): + """Test parsing of non-null constraints when multiple columns are non-nullable.""" + json_metadata = { + "columns": [ + {"name": "id", "nullable": False}, + {"name": "email", "nullable": False}, + {"name": "msg", "nullable": True}, + ] + } + result = DatabricksDescribeJsonMetadata.parse_non_null_constraints(json_metadata) + assert len(result.rows) == 2 + assert result.rows[0][0] == "id" + assert result.rows[0]["column_name"] == "id" + assert result.rows[1][0] == "email" + assert result.rows[1]["column_name"] == "email" + + def test_no_columns_key(self): + """Test parsing of non-null constraints when there is no 'columns' key in the input.""" + result = DatabricksDescribeJsonMetadata.parse_non_null_constraints({}) + assert len(result.rows) == 0 + + def test_unexpected_exception_wrapped(self): + """A column entry missing 'name' triggers an inner KeyError that the wrapper converts.""" + with pytest.raises( + DbtRuntimeError, + match="Failed to parse non-null constraints from describe table extended as json", + ): + DatabricksDescribeJsonMetadata.parse_non_null_constraints( + {"columns": [{"nullable": False}]} + ) + + +class TestParseColumnMasks: + def test_mask_with_using_columns(self): + result = DatabricksDescribeJsonMetadata.parse_column_masks(COLUMN_MASK_JSON) + assert len(result.rows) == 1 + assert result.rows[0][0] == "phone_number" + assert result.rows[0]["column_name"] == "phone_number" + assert result.rows[0][1] == "main.db.mask_phone" + assert result.rows[0]["mask_name"] == "main.db.mask_phone" + assert result.rows[0][2] == "city" + assert result.rows[0]["using_columns"] == "city" + + def test_mask_without_using_columns(self): + json_metadata = { + "column_masks": [ + { + "column_name": "ssn", + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "mask_ssn", + }, + "using_column_names": [], + } + ] + } + result = DatabricksDescribeJsonMetadata.parse_column_masks(json_metadata) + assert len(result.rows) == 1 + assert result.rows[0][0] == "ssn" + assert result.rows[0]["column_name"] == "ssn" + assert result.rows[0][1] == "main.db.mask_ssn" + assert result.rows[0]["mask_name"] == "main.db.mask_ssn" + assert result.rows[0][2] is None + assert result.rows[0]["using_columns"] is None + + def test_multiple_masks(self): + json_metadata = { + "column_masks": [ + { + "column_name": "col_a", + "function_name": { + "catalog_name": "c", + "schema_name": "s", + "function_name": "fn_a", + }, + "using_column_names": ["x"], + }, + { + "column_name": "col_b", + "function_name": { + "catalog_name": "c", + "schema_name": "s", + "function_name": "fn_b", + }, + "using_column_names": [], + }, + ] + } + result = DatabricksDescribeJsonMetadata.parse_column_masks(json_metadata) + assert len(result.rows) == 2 + assert result.rows[0][0] == "col_a" + assert result.rows[0]["column_name"] == "col_a" + assert result.rows[0][1] == "c.s.fn_a" + assert result.rows[0]["mask_name"] == "c.s.fn_a" + assert result.rows[0][2] == "x" + assert result.rows[0]["using_columns"] == "x" + assert result.rows[1][0] == "col_b" + assert result.rows[1]["column_name"] == "col_b" + assert result.rows[1][1] == "c.s.fn_b" + assert result.rows[1]["mask_name"] == "c.s.fn_b" + assert result.rows[1][2] is None + assert result.rows[1]["using_columns"] is None + + def test_no_column_masks_field(self): + result = DatabricksDescribeJsonMetadata.parse_column_masks({}) + assert len(result.rows) == 0 + + def test_empty_column_masks(self): + result = DatabricksDescribeJsonMetadata.parse_column_masks({"column_masks": []}) + assert len(result.rows) == 0 + + def test_mask_with_multiple_using_columns(self): + json_input = { + "column_masks": [ + { + "column_name": "secret", + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "mask_fn", + }, + "using_column_names": ["col1", "col2", "col3"], + } + ] + } + result = DatabricksDescribeJsonMetadata.parse_column_masks(json_input) + assert len(result.rows) == 1 + assert result.rows[0][0] == "secret" + assert result.rows[0]["column_name"] == "secret" + assert result.rows[0][1] == "main.db.mask_fn" + assert result.rows[0]["mask_name"] == "main.db.mask_fn" + assert result.rows[0][2] == "col1,col2,col3" + assert result.rows[0]["using_columns"] == "col1,col2,col3" + + def test_mask_missing_using_column_names_key(self): + json_input = { + "column_masks": [ + { + "column_name": "secret", + "function_name": { + "catalog_name": "main", + "schema_name": "db", + "function_name": "mask_fn", + }, + } + ] + } + result = DatabricksDescribeJsonMetadata.parse_column_masks(json_input) + assert len(result.rows) == 1 + assert result.rows[0][0] == "secret" + assert result.rows[0]["column_name"] == "secret" + assert result.rows[0][1] == "main.db.mask_fn" + assert result.rows[0]["mask_name"] == "main.db.mask_fn" + assert result.rows[0][2] is None + assert result.rows[0]["using_columns"] is None + + def test_unexpected_exception_wrapped(self): + """A mask missing 'function_name' triggers an inner KeyError that the wrapper converts.""" + with pytest.raises( + DbtRuntimeError, + match="Failed to parse column masks from describe table extended as json", + ): + DatabricksDescribeJsonMetadata.parse_column_masks( + {"column_masks": [{"column_name": "x"}]} + ) + + +class TestParseRowFilter: + def test_row_filter_with_single_target_column(self): + result = DatabricksDescribeJsonMetadata.parse_row_filter(ROW_FILTER_JSON) + assert len(result.rows) == 1 + assert result.rows[0][0] == "default_catalog" + assert result.rows[0]["table_catalog"] == "default_catalog" + assert result.rows[0][1] == "default" + assert result.rows[0]["table_schema"] == "default" + assert result.rows[0][2] == "table_with_row_filter" + assert result.rows[0]["table_name"] == "table_with_row_filter" + assert result.rows[0][3] == "default_catalog.default.filter_by_region" + assert result.rows[0]["filter_name"] == "default_catalog.default.filter_by_region" + assert result.rows[0][4] == "region" + assert result.rows[0]["target_columns"] == "region" + + def test_row_filter_with_multiple_target_columns(self): + result = DatabricksDescribeJsonMetadata.parse_row_filter(ROW_FILTER_MULTI_COLUMN_JSON) + assert len(result.rows) == 1 + assert result.rows[0][0] == "default_catalog" + assert result.rows[0]["table_catalog"] == "default_catalog" + assert result.rows[0][1] == "default" + assert result.rows[0]["table_schema"] == "default" + assert result.rows[0][2] == "table_with_row_filter" + assert result.rows[0]["table_name"] == "table_with_row_filter" + assert result.rows[0][3] == "default_catalog.default.filter_by_dept_and_region" + assert result.rows[0]["filter_name"] == "default_catalog.default.filter_by_dept_and_region" + assert result.rows[0][4] == "department,region" + assert result.rows[0]["target_columns"] == "department,region" + + def test_no_row_filter_field(self): + result = DatabricksDescribeJsonMetadata.parse_row_filter(PLAIN_TABLE_JSON) + assert len(result.rows) == 0 + + def test_unexpected_exception_wrapped(self): + """A missing 'function_name' triggers an inner KeyError that the wrapper converts.""" + with pytest.raises( + DbtRuntimeError, + match="Failed to parse row filter from describe table extended as json", + ): + DatabricksDescribeJsonMetadata.parse_row_filter( + { + "catalog_name": "c", + "schema_name": "s", + "table_name": "t", + "row_filter": {"column_names": ["x"]}, + } + ) + + +class TestParseViewDescription: + def test_with_view_text(self): + json_metadata = {"view_text": "SELECT id, name FROM main.default.source_table"} + result = DatabricksDescribeJsonMetadata.parse_view_description(json_metadata) + assert result["view_definition"] == "SELECT id, name FROM main.default.source_table" + + def test_without_view_text(self): + json_metadata = { + "columns": [ + {"name": "id", "nullable": True}, + {"name": "value", "nullable": True}, + ], + } + result = DatabricksDescribeJsonMetadata.parse_view_description(json_metadata) + assert len(result.values()) == 0 + + def test_null_view_text(self): + result = DatabricksDescribeJsonMetadata.parse_view_description({"view_text": None}) + assert len(result.values()) == 0 + + +class TestFromJsonMetadata: + def test_table_with_column_masks(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COLUMN_MASK_JSON) + assert len(metadata.column_masks.rows) == 1 + assert metadata.column_masks.rows[0][0] == "phone_number" + assert metadata.column_masks.rows[0]["column_name"] == "phone_number" + assert metadata.column_masks.rows[0][1] == "main.db.mask_phone" + assert metadata.column_masks.rows[0]["mask_name"] == "main.db.mask_phone" + assert metadata.column_masks.rows[0][2] == "city" + assert metadata.column_masks.rows[0]["using_columns"] == "city" + assert len(metadata.primary_key_constraints.rows) == 0 + assert len(metadata.foreign_key_constraints.rows) == 0 + + def test_table_with_row_filter(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + assert len(metadata.row_filters.rows) == 1 + assert metadata.row_filters.rows[0][0] == "default_catalog" + assert metadata.row_filters.rows[0]["table_catalog"] == "default_catalog" + assert metadata.row_filters.rows[0][1] == "default" + assert metadata.row_filters.rows[0]["table_schema"] == "default" + assert metadata.row_filters.rows[0][2] == "table_with_row_filter" + assert metadata.row_filters.rows[0]["table_name"] == "table_with_row_filter" + assert metadata.row_filters.rows[0][3] == "default_catalog.default.filter_by_region" + assert ( + metadata.row_filters.rows[0]["filter_name"] + == "default_catalog.default.filter_by_region" + ) + assert metadata.row_filters.rows[0][4] == "region" + assert metadata.row_filters.rows[0]["target_columns"] == "region" + assert len(metadata.primary_key_constraints.rows) == 0 + assert len(metadata.foreign_key_constraints.rows) == 0 + assert len(metadata.column_masks.rows) == 0 + + def test_materialized_view(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(MATERIALIZED_VIEW_JSON) + assert metadata.view_description["view_definition"] == ( + "SELECT id, name FROM main.default.source_table" + ) + assert len(metadata.primary_key_constraints.rows) == 0 + assert len(metadata.column_masks.rows) == 0 + + def test_all_fields_populated(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ALL_FIELDS_JSON) + # PK + assert len(metadata.primary_key_constraints.rows) == 1 + assert metadata.primary_key_constraints.rows[0]["constraint_name"] == "pk1" + assert metadata.primary_key_constraints.rows[0]["column_name"] == "id" + # FK + assert len(metadata.foreign_key_constraints.rows) == 1 + fk = metadata.foreign_key_constraints.rows[0] + assert fk["constraint_name"] == "fk1" + assert fk["from_column"] == "id" + assert fk["to_catalog"] == "main" + assert fk["to_schema"] == "default" + assert fk["to_table"] == "other" + assert fk["to_column"] == "other_id" + # Non-null + assert len(metadata.non_null_constraints.rows) == 1 + assert metadata.non_null_constraints.rows[0]["column_name"] == "id" + # Column masks + assert len(metadata.column_masks.rows) == 1 + assert metadata.column_masks.rows[0]["column_name"] == "secret" + assert metadata.column_masks.rows[0]["mask_name"] == "main.db.mask_secret" + assert metadata.column_masks.rows[0]["using_columns"] == "id" + # Row filters + assert len(metadata.row_filters.rows) == 1 + assert metadata.row_filters.rows[0]["table_catalog"] == "main" + assert metadata.row_filters.rows[0]["table_schema"] == "default" + assert metadata.row_filters.rows[0]["table_name"] == "source" + assert metadata.row_filters.rows[0]["filter_name"] == "main.db.filter_secret" + assert metadata.row_filters.rows[0]["target_columns"] == "id" + # View description + assert metadata.view_description["view_definition"] == ( + "SELECT id, secret FROM main.default.source" + ) + + def test_plain_table(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(PLAIN_TABLE_JSON) + assert len(metadata.primary_key_constraints.rows) == 0 + assert len(metadata.foreign_key_constraints.rows) == 0 + assert len(metadata.non_null_constraints.rows) == 0 + assert len(metadata.row_filters.rows) == 0 + assert len(metadata.column_masks.rows) == 0 + assert len(metadata.view_description.values()) == 0 + + def test_pk_with_column_named_foreign_key(self): + json_metadata = { + "columns": [{"name": "FOREIGN KEY", "nullable": False}], + "table_constraints": "[(pk1,PRIMARY KEY (`FOREIGN KEY`))]", + } + # Must not raise. + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + assert len(metadata.primary_key_constraints.rows) == 1 + assert metadata.primary_key_constraints.rows[0]["constraint_name"] == "pk1" + assert metadata.primary_key_constraints.rows[0]["column_name"] == "FOREIGN KEY" + assert len(metadata.foreign_key_constraints.rows) == 0 + + def test_fk_with_column_named_primary_key(self): + json_metadata = { + "columns": [{"name": "PRIMARY KEY", "nullable": True}], + "table_constraints": ( + "[(fk1,FOREIGN KEY (`PRIMARY KEY`) REFERENCES `c`.`s`.`t` (`id`))]" + ), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + assert len(metadata.primary_key_constraints.rows) == 0 + assert len(metadata.foreign_key_constraints.rows) == 1 + fk = metadata.foreign_key_constraints.rows[0] + assert fk["constraint_name"] == "fk1" + assert fk["from_column"] == "PRIMARY KEY" + assert fk["to_catalog"] == "c" + assert fk["to_schema"] == "s" + assert fk["to_table"] == "t" + assert fk["to_column"] == "id" + + +class TestParserToConstraintsProcessor: + @staticmethod + def _build_results(metadata): + return { + "non_null_constraint_columns": metadata.non_null_constraints, + "primary_key_constraints": metadata.primary_key_constraints, + "foreign_key_constraints": metadata.foreign_key_constraints, + } + + def test_single_pk_roundtrip(self): + json_metadata = { + "columns": [{"name": "id", "nullable": False}], + "table_constraints": "[(pk1,PRIMARY KEY (`id`))]", + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"id"}, + set_constraints={ + PrimaryKeyConstraint(type=ConstraintType.primary_key, name="pk1", columns=["id"]), + }, + ) + + def test_composite_pk_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_PK_JSON) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"id", "name"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="id_name_pk", + columns=["id", "name"], + ) + }, + ) + + def test_single_fk_roundtrip(self): + json_metadata = { + "columns": [{"name": "ref_id", "nullable": True}], + "table_constraints": ( + "[(fk1,FOREIGN KEY (`ref_id`) REFERENCES `main`.`default`.`other` (`other_id`))]" + ), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="fk1", + columns=["ref_id"], + to="`main`.`default`.`other`", + to_columns=["other_id"], + ) + }, + ) + + def test_composite_fk_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_FK_JSON) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="fk_pk", + columns=["id"], + ), + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="child_fk", + columns=["ref_id", "ref_name"], + to="`main`.`default`.`parents`", + to_columns=["id", "name"], + ), + }, + ) + + def test_mixed_constraints_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(EMAIL_ADDRESSES_JSON) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config.set_non_nulls == {"address_id"} + assert any( + isinstance(c, PrimaryKeyConstraint) and c.name == "email_ad_pk" + for c in config.set_constraints + ) + assert any( + isinstance(c, ForeignKeyConstraint) + and c.name == "email_fk" + and c.to == "`main`.`default`.`users`" + for c in config.set_constraints + ) + + def test_no_constraints_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(PLAIN_TABLE_JSON) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig(set_non_nulls=set(), set_constraints=set()) + + # ----- Corner-case roundtrips (constraint name) ----- + + def _assert_pk_roundtrip(self, table_constraints: str, expected_name: str) -> None: + json_metadata = { + "columns": [{"name": "id", "nullable": False}], + "table_constraints": table_constraints, + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"id"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, name=expected_name, columns=["id"] + ), + }, + ) + + def _assert_fk_roundtrip( + self, table_constraints: str, expected_name: str, expected_to: str = "`c`.`s`.`t`" + ) -> None: + json_metadata = { + "columns": [{"name": "ref_id", "nullable": True}], + "table_constraints": table_constraints, + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name=expected_name, + columns=["ref_id"], + to=expected_to, + to_columns=["id"], + ) + }, + ) + + def test_pk_hyphen_name_roundtrip(self): + self._assert_pk_roundtrip("[(my-pk,PRIMARY KEY (`id`))]", "my-pk") + + def test_pk_at_sign_name_roundtrip(self): + self._assert_pk_roundtrip("[(pk@1,PRIMARY KEY (`id`))]", "pk@1") + + def test_pk_comma_name_roundtrip(self): + self._assert_pk_roundtrip("[(a,b,PRIMARY KEY (`id`))]", "a,b") + + def test_pk_paren_name_roundtrip(self): + self._assert_pk_roundtrip("[(a(b,PRIMARY KEY (`id`))]", "a(b") + + def test_pk_backtick_name_roundtrip(self): + self._assert_pk_roundtrip("[(p`a,PRIMARY KEY (`id`))]", "p`a") + + def test_fk_hyphen_name_roundtrip(self): + self._assert_fk_roundtrip( + "[(my-fk,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]", "my-fk" + ) + + def test_fk_at_sign_name_roundtrip(self): + self._assert_fk_roundtrip( + "[(fk@1,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]", "fk@1" + ) + + def test_fk_comma_name_roundtrip(self): + self._assert_fk_roundtrip( + "[(a,b,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]", "a,b" + ) + + def test_fk_paren_name_roundtrip(self): + self._assert_fk_roundtrip( + "[(a(b,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]", "a(b" + ) + + def test_fk_backtick_name_roundtrip(self): + self._assert_fk_roundtrip( + "[(p`a,FOREIGN KEY (`ref_id`) REFERENCES `c`.`s`.`t` (`id`))]", "p`a" + ) + + # ----- Corner-case roundtrips (escaped-backtick identifiers) ----- + + def test_pk_escaped_backtick_column_roundtrip(self): + """Column literally named id`a (emitted as `id``a`).""" + json_metadata = { + "columns": [{"name": "id`a", "nullable": False}], + "table_constraints": "[(pk1,PRIMARY KEY (`id``a`))]", + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"id`a"}, + set_constraints={ + PrimaryKeyConstraint(type=ConstraintType.primary_key, name="pk1", columns=["id`a"]), + }, + ) + + def test_fk_escaped_backtick_from_column_roundtrip(self): + """From-column literally named ref`id (emitted as `ref``id`).""" + json_metadata = { + "columns": [{"name": "ref`id", "nullable": True}], + "table_constraints": ("[(fk1,FOREIGN KEY (`ref``id`) REFERENCES `c`.`s`.`t` (`id`))]"), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="fk1", + columns=["ref`id"], + to="`c`.`s`.`t`", + to_columns=["id"], + ) + }, + ) + + def test_fk_column_named_references_roundtrip(self): + """From-column named REFERENCES; the substring must not mis-split.""" + json_metadata = { + "columns": [{"name": "REFERENCES", "nullable": True}], + "table_constraints": ( + "[(fk1,FOREIGN KEY (`REFERENCES`) REFERENCES `c`.`s`.`t` (`id`))]" + ), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="fk1", + columns=["REFERENCES"], + to="`c`.`s`.`t`", + to_columns=["id"], + ) + }, + ) + + # ----- Unicode roundtrips ----- + + def test_pk_unicode_cjk_roundtrip(self): + """Constraint name and column with CJK ideographs.""" + json_metadata = { + "columns": [{"name": "用户名", "nullable": False}], + "table_constraints": "[(用户_pk,PRIMARY KEY (`用户名`))]", + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"用户名"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, name="用户_pk", columns=["用户名"] + ), + }, + ) + + def test_pk_unicode_acute_accent_roundtrip(self): + """Column with acute-accented characters (e.g. é).""" + json_metadata = { + "columns": [{"name": "prénom", "nullable": False}], + "table_constraints": "[(pk1,PRIMARY KEY (`prénom`))]", + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls={"prénom"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, name="pk1", columns=["prénom"] + ), + }, + ) + + def test_fk_unicode_diaeresis_roundtrip(self): + """Constraint name and column with diaeresis/umlaut characters (ï, ë).""" + json_metadata = { + "columns": [{"name": "cliënt_id", "nullable": True}], + "table_constraints": ( + "[(cliënt_fk,FOREIGN KEY (`cliënt_id`) REFERENCES `c`.`s`.`klanten` (`id`))]" + ), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="cliënt_fk", + columns=["cliënt_id"], + to="`c`.`s`.`klanten`", + to_columns=["id"], + ) + }, + ) + + def test_fk_unicode_cjk_referenced_identifiers_roundtrip(self): + """Catalog/schema/table/column on the FK reference all in CJK ideographs.""" + json_metadata = { + "columns": [{"name": "用户_id", "nullable": True}], + "table_constraints": ( + "[(fk1,FOREIGN KEY (`用户_id`) REFERENCES `主目录`.`架构`.`用户` (`编号`))]" + ), + } + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(json_metadata) + config = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + assert config == ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="fk1", + columns=["用户_id"], + to="`主目录`.`架构`.`用户`", + to_columns=["编号"], + ) + }, + ) + + +class TestParserToColumnMaskProcessor: + def test_mask_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COLUMN_MASK_JSON) + config = ColumnMaskProcessor.from_relation_results({"column_masks": metadata.column_masks}) + assert config == ColumnMaskConfig( + set_column_masks={ + "phone_number": { + "function": "main.db.mask_phone", + "using_columns": "city", + } + } + ) + + def test_no_masks_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(PLAIN_TABLE_JSON) + config = ColumnMaskProcessor.from_relation_results({"column_masks": metadata.column_masks}) + assert config == ColumnMaskConfig(set_column_masks={}) + + def test_mask_no_false_diff(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COLUMN_MASK_JSON) + existing = ColumnMaskProcessor.from_relation_results( + {"column_masks": metadata.column_masks} + ) + model = ColumnMaskConfig( + set_column_masks={ + "phone_number": { + "function": "main.db.mask_phone", + "using_columns": "city", + } + } + ) + assert model.get_diff(existing) is None + + def test_mask_diff_change_function(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COLUMN_MASK_JSON) + existing = ColumnMaskProcessor.from_relation_results( + {"column_masks": metadata.column_masks} + ) + model = ColumnMaskConfig( + set_column_masks={ + "phone_number": { + "function": "main.db.new_mask_fn", + "using_columns": "city", + } + } + ) + diff = model.get_diff(existing) + assert diff is not None + assert diff.set_column_masks == { + "phone_number": { + "function": "main.db.new_mask_fn", + "using_columns": "city", + } + } + + def test_mask_diff_add_new_mask(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COLUMN_MASK_JSON) + existing = ColumnMaskProcessor.from_relation_results( + {"column_masks": metadata.column_masks} + ) + model = ColumnMaskConfig( + set_column_masks={ + "phone_number": { + "function": "main.db.mask_phone", + "using_columns": "city", + }, + "ssn": { + "function": "main.db.mask_ssn", + }, + } + ) + diff = model.get_diff(existing) + assert diff is not None + assert "ssn" in diff.set_column_masks + assert "phone_number" not in diff.set_column_masks + + +class TestParserToRowFilterProcessor: + def test_row_filter_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + config = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + assert config == RowFilterConfig( + function="default_catalog.default.filter_by_region", + columns=("region",), + ) + + def test_multi_column_row_filter_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_MULTI_COLUMN_JSON) + config = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + assert config == RowFilterConfig( + function="default_catalog.default.filter_by_dept_and_region", + columns=("department", "region"), + ) + + def test_no_row_filter_roundtrip(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(PLAIN_TABLE_JSON) + config = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + assert config == RowFilterConfig() + + def test_row_filter_no_false_diff(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + existing = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + model = RowFilterConfig( + function="default_catalog.default.filter_by_region", + columns=("region",), + ) + assert model.get_diff(existing) is None + + def test_row_filter_diff_change_function(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + existing = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + model = RowFilterConfig( + function="default_catalog.default.filter_by_department", + columns=("region",), + ) + diff = model.get_diff(existing) + assert diff is not None + assert diff == RowFilterConfig( + function="default_catalog.default.filter_by_department", + columns=("region",), + is_change=True, + ) + + def test_row_filter_diff_change_columns(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + existing = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + model = RowFilterConfig( + function="default_catalog.default.filter_by_region", + columns=("department", "region"), + ) + diff = model.get_diff(existing) + assert diff is not None + assert diff == RowFilterConfig( + function="default_catalog.default.filter_by_region", + columns=("department", "region"), + is_change=True, + ) + + def test_row_filter_diff_unset(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(ROW_FILTER_JSON) + existing = RowFilterProcessor.from_relation_results({"row_filters": metadata.row_filters}) + model = RowFilterConfig() + diff = model.get_diff(existing) + assert diff is not None + assert diff == RowFilterConfig(should_unset=True, is_change=True) + + +class TestParserToQueryProcessor: + def test_mv_view_text_roundtrip(self): + view_desc = DatabricksDescribeJsonMetadata.parse_view_description(MATERIALIZED_VIEW_JSON) + config = QueryProcessor.from_relation_results({"information_schema.views": view_desc}) + assert config == QueryConfig(query="SELECT id, name FROM main.default.source_table") + + def test_view_text_roundtrip(self): + view_desc = DatabricksDescribeJsonMetadata.parse_view_description(REGULAR_VIEW_JSON) + config = QueryProcessor.from_relation_results({"information_schema.views": view_desc}) + assert config == QueryConfig(query="SELECT id, name FROM main.default.other_table") + + def test_view_text_with_outer_parens(self): + view_desc = DatabricksDescribeJsonMetadata.parse_view_description( + {"view_text": "(SELECT id FROM t)"} + ) + config = QueryProcessor.from_relation_results({"information_schema.views": view_desc}) + assert config == QueryConfig(query="SELECT id FROM t") + + +class TestParserToQueryDiff: + def test_no_false_diff_on_identical_query(self): + view_desc = DatabricksDescribeJsonMetadata.parse_view_description(MATERIALIZED_VIEW_JSON) + existing = QueryProcessor.from_relation_results({"information_schema.views": view_desc}) + model = QueryConfig(query="SELECT id, name FROM main.default.source_table") + assert model.get_diff(existing) is None + + def test_detects_real_query_change(self): + view_desc = DatabricksDescribeJsonMetadata.parse_view_description(MATERIALIZED_VIEW_JSON) + existing = QueryProcessor.from_relation_results({"information_schema.views": view_desc}) + model = QueryConfig(query="SELECT id FROM different_table") + diff = model.get_diff(existing) + assert diff is not None + assert diff.query == "SELECT id FROM different_table" + + +class TestParserToConstraintsDiff: + @staticmethod + def _build_results(metadata): + return { + "non_null_constraint_columns": metadata.non_null_constraints, + "primary_key_constraints": metadata.primary_key_constraints, + "foreign_key_constraints": metadata.foreign_key_constraints, + } + + def test_composite_pk_no_false_diff(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_PK_JSON) + existing = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + model = ConstraintsConfig( + set_non_nulls={"id", "name"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="id_name_pk", + columns=["id", "name"], + ) + }, + ) + assert model.get_diff(existing) is None + + def test_composite_pk_diff_add_column(self): + """Model adds a column to PK — diff should set new PK, unset old PK, set new non-null.""" + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_PK_JSON) + existing = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + model = ConstraintsConfig( + set_non_nulls={"id", "name", "value"}, + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="new_pk", + columns=["id", "name", "value"], + ) + }, + ) + diff = model.get_diff(existing) + assert diff is not None + assert diff.set_non_nulls == {"value"} + assert diff.unset_non_nulls == set() + assert len(diff.unset_constraints) == 1 + unset = next(iter(diff.unset_constraints)) + assert isinstance(unset, PrimaryKeyConstraint) + assert unset.name == "id_name_pk" + assert unset.columns == ["id", "name"] + assert len(diff.set_constraints) == 1 + added = next(iter(diff.set_constraints)) + assert isinstance(added, PrimaryKeyConstraint) + assert added.name == "new_pk" + assert added.columns == ["id", "name", "value"] + + def test_composite_fk_diff_change_target(self): + """Model changes FK target — diff should unset old FK, set new FK.""" + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_FK_JSON) + existing = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + model = ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="fk_pk", + columns=["id"], + ), + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="new_fk", + columns=["ref_id"], + to="`main`.`default`.`other_table`", + to_columns=["other_id"], + ), + }, + ) + diff = model.get_diff(existing) + assert diff is not None + assert diff.set_non_nulls == set() + assert diff.unset_non_nulls == set() + # Old FK unset + unset_fks = {c for c in diff.unset_constraints if isinstance(c, ForeignKeyConstraint)} + assert len(unset_fks) == 1 + unset_fk = next(iter(unset_fks)) + assert unset_fk.name == "child_fk" + assert unset_fk.columns == ["ref_id", "ref_name"] + assert unset_fk.to == "`main`.`default`.`parents`" + # New FK set + set_fks = {c for c in diff.set_constraints if isinstance(c, ForeignKeyConstraint)} + assert len(set_fks) == 1 + set_fk = next(iter(set_fks)) + assert set_fk.name == "new_fk" + assert set_fk.columns == ["ref_id"] + assert set_fk.to == "`main`.`default`.`other_table`" + assert set_fk.to_columns == ["other_id"] + + def test_composite_fk_no_false_diff(self): + metadata = DatabricksDescribeJsonMetadata.from_json_metadata(COMPOSITE_FK_JSON) + existing = ConstraintsProcessor.from_relation_results(self._build_results(metadata)) + model = ConstraintsConfig( + set_non_nulls=set(), + set_constraints={ + PrimaryKeyConstraint( + type=ConstraintType.primary_key, + name="fk_pk", + columns=["id"], + ), + ForeignKeyConstraint( + type=ConstraintType.foreign_key, + name="child_fk", + columns=["ref_id", "ref_name"], + to="`main`.`default`.`parents`", + to_columns=["id", "name"], + ), + }, + ) + assert model.get_diff(existing) is None