diff --git a/CHANGELOG.md b/CHANGELOG.md index 115ea1738..03bdcf957 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Under the Hood - **BREAKING:** `databricks_tags` defined at different hierarchy levels (e.g. project-level and model-level) now merge additively instead of the child config completely replacing the parent. +- Skip `information_schema.tags` and `information_schema.column_tags` metadata fetches when table tags and column tags are not configured on a model. ([#1387](https://github.com/databricks/dbt-databricks/pull/1387)) ## dbt-databricks 1.11.8 (TBD) diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index c32029ddf..e70083e9b 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -81,6 +81,9 @@ StreamingTableConfig, ) from dbt.adapters.databricks.relation_configs.table_format import TableFormat +from dbt.adapters.databricks.relation_configs.tags import ( + TagsProcessor, +) from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig from dbt.adapters.databricks.relation_configs.view import ViewConfig from dbt.adapters.databricks.utils import ( @@ -967,17 +970,21 @@ def parse_columns_and_constraints( return enriched_columns, parsed_constraints @available.parse(lambda *a, **k: {}) - def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase: + def get_relation_config( + self, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, + ) -> DatabricksRelationConfigBase: if relation.type == DatabricksRelationType.MaterializedView: - return MaterializedViewAPI.get_from_relation(self, relation) + return MaterializedViewAPI.get_from_relation(self, relation, model_config) elif relation.type == DatabricksRelationType.StreamingTable: - return StreamingTableAPI.get_from_relation(self, relation) + return StreamingTableAPI.get_from_relation(self, relation, model_config) elif relation.type == DatabricksRelationType.Table: - return IncrementalTableAPI.get_from_relation(self, relation) + return IncrementalTableAPI.get_from_relation(self, relation, model_config) elif relation.type == DatabricksRelationType.View: - return ViewAPI.get_from_relation(self, relation) + return ViewAPI.get_from_relation(self, relation, model_config) elif relation.type == DatabricksRelationType.MetricView: - return MetricViewAPI.get_from_relation(self, relation) + return MetricViewAPI.get_from_relation(self, relation, model_config) else: raise NotImplementedError(f"Relation type {relation.type} is not supported.") @@ -1060,12 +1067,15 @@ def config_type(cls) -> type[DatabricksRelationConfig]: @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" assert relation.type == cls.relation_type - results = cls._describe_relation(adapter, relation) + results = cls._describe_relation(adapter, relation, model_config) return cls.config_type().from_results(results) @classmethod @@ -1077,7 +1087,10 @@ def get_from_relation_config(cls, relation_config: RelationConfig) -> Databricks @classmethod @abstractmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: """Describe the relation and return the results.""" @@ -1087,11 +1100,14 @@ def _describe_relation( class DeltaLiveTableAPIBase(RelationAPIBase[DatabricksRelationConfig]): @classmethod def get_from_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> DatabricksRelationConfig: """Get the relation config from the relation.""" - relation_config = super().get_from_relation(adapter, relation) + relation_config = super().get_from_relation(adapter, relation, model_config) # Ensure any current refreshes are completed before returning the relation config tblproperties = cast(TblPropertiesConfig, relation_config.config["tblproperties"]) @@ -1111,7 +1127,10 @@ def config_type(cls) -> type[MaterializedViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1123,9 +1142,18 @@ def _describe_relation( results["information_schema.views"] = get_first_row( adapter.execute_macro("get_view_description", kwargs=kwargs) ) - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) + + # To be backward compatible model_config can be None. In that case, tags should be fetched + # to maintain backward compatibility. + table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + results["row_filters"] = adapter.execute_macro("fetch_row_filters", kwargs=kwargs) + return results @@ -1138,7 +1166,10 @@ def config_type(cls) -> type[StreamingTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: kwargs = {"table_name": relation} results: RelationResults = dict() @@ -1162,16 +1193,37 @@ def config_type(cls) -> type[IncrementalTableConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: results = {} kwargs = {"relation": relation} if not relation.is_hive_metastore(): - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) - results["information_schema.column_tags"] = adapter.execute_macro( - "fetch_column_tags", kwargs=kwargs + # To be backward compatible model_config can be None. In that case, tags should be + # fetched to maintain backward compatibility. + table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro( + "fetch_tags", kwargs=kwargs + ) + else: + results["information_schema.tags"] = None + + # To be backward compatible model_config can be None. In that case, tags should be + # fetched to maintain backward compatibility. + column_tag_config = ( + model_config.config.get(ColumnTagsProcessor.name) if model_config else None ) + if column_tag_config is None or column_tag_config.requires_server_metadata_for_diff(): + results["information_schema.column_tags"] = adapter.execute_macro( + "fetch_column_tags", kwargs=kwargs + ) + else: + results["information_schema.column_tags"] = None + results["non_null_constraint_columns"] = adapter.execute_macro( "fetch_non_null_constraint_columns", kwargs=kwargs ) @@ -1201,7 +1253,10 @@ def config_type(cls) -> type[ViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: results = {} kwargs = {"relation": relation} @@ -1209,7 +1264,15 @@ def _describe_relation( results["information_schema.views"] = get_first_row( adapter.execute_macro("get_view_description", kwargs=kwargs) ) - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + + # To be backward compatible model_config can be None. In that case, tags should be fetched + # to maintain backward compatibility. + table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) kwargs = {"table_name": relation} @@ -1228,14 +1291,25 @@ def config_type(cls) -> type[MetricViewConfig]: @classmethod def _describe_relation( - cls, adapter: DatabricksAdapter, relation: DatabricksRelation + cls, + adapter: DatabricksAdapter, + relation: DatabricksRelation, + model_config: Optional[DatabricksRelationConfigBase] = None, ) -> RelationResults: results = {} kwargs = {"relation": relation} - results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) results["show_tblproperties"] = adapter.execute_macro("fetch_tbl_properties", kwargs=kwargs) + + kwargs = {"relation": relation} + table_tag_config = model_config.config.get(TagsProcessor.name) if model_config else None + if table_tag_config is None or table_tag_config.requires_server_metadata_for_diff(): + results["information_schema.tags"] = adapter.execute_macro("fetch_tags", kwargs=kwargs) + else: + results["information_schema.tags"] = None + kwargs = {"table_name": relation} results["describe_extended"] = adapter.execute_macro( DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs ) + return results diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py index 92be6706a..cd358b529 100644 --- a/dbt/adapters/databricks/relation_configs/base.py +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -34,6 +34,12 @@ def get_diff(self, other: Self) -> Optional[Self]: return self return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return True + class DatabricksRelationChangeSet(BaseModel): """Class for encapsulating the changes that need to be applied to a Databricks relation.""" diff --git a/dbt/adapters/databricks/relation_configs/column_tags.py b/dbt/adapters/databricks/relation_configs/column_tags.py index 26c8f6aa5..0da92da82 100644 --- a/dbt/adapters/databricks/relation_configs/column_tags.py +++ b/dbt/adapters/databricks/relation_configs/column_tags.py @@ -41,6 +41,12 @@ def get_diff(self, other: "ColumnTagsConfig") -> Optional["ColumnTagsConfig"]: return ColumnTagsConfig(set_column_tags=set_column_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_column_tags is not None and len(self.set_column_tags) > 0 + class ColumnTagsProcessor(DatabricksComponentProcessor[ColumnTagsConfig]): name: ClassVar[str] = "column_tags" diff --git a/dbt/adapters/databricks/relation_configs/tags.py b/dbt/adapters/databricks/relation_configs/tags.py index 757c3fdf0..294a1776c 100644 --- a/dbt/adapters/databricks/relation_configs/tags.py +++ b/dbt/adapters/databricks/relation_configs/tags.py @@ -22,6 +22,12 @@ def get_diff(self, other: "TagsConfig") -> Optional["TagsConfig"]: return TagsConfig(set_tags=self.set_tags) return None + def requires_server_metadata_for_diff(self) -> bool: + """ + Indicates whether server metadata is required to compute the diff for this component. + """ + return self.set_tags is not None and len(self.set_tags) > 0 + class TagsProcessor(DatabricksComponentProcessor[TagsConfig]): name: ClassVar[str] = "tags" diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index c018d86c9..7972474e8 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -133,8 +133,8 @@ {{ set_overwrite_mode('DYNAMIC') }} {%- endif -%} {#-- Relation must be merged --#} - {%- set _existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set _existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set _configuration_changes = model_config.get_changeset(_existing_config) -%} {%- call statement('create_temp_relation', language=language) -%} {{ create_table_as(True, temp_relation, compiled_code, language) }} @@ -241,8 +241,8 @@ {% macro process_config_changes(target_relation) %} {% set apply_config_changes = config.get('incremental_apply_config_changes', True) | as_bool %} {% if apply_config_changes %} - {%- set existing_config = adapter.get_relation_config(target_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(target_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {{ apply_config_changeset(target_relation, model, configuration_changes) }} {% endif %} diff --git a/dbt/include/databricks/macros/relations/config.sql b/dbt/include/databricks/macros/relations/config.sql index c5c867522..53c4a3a53 100644 --- a/dbt/include/databricks/macros/relations/config.sql +++ b/dbt/include/databricks/macros/relations/config.sql @@ -1,6 +1,6 @@ {%- macro get_configuration_changes(existing_relation) -%} - {%- set existing_config = adapter.get_relation_config(existing_relation) -%} {%- set model_config = adapter.get_config_from_model(config.model) -%} + {%- set existing_config = adapter.get_relation_config(existing_relation, model_config) -%} {%- set configuration_changes = model_config.get_changeset(existing_config) -%} {% do return(configuration_changes) %} {%- endmacro -%} diff --git a/tests/functional/adapter/fixtures.py b/tests/functional/adapter/fixtures.py index 408158654..35ec7c60a 100644 --- a/tests/functional/adapter/fixtures.py +++ b/tests/functional/adapter/fixtures.py @@ -1,5 +1,21 @@ import pytest +fail_if_tag_fetch_called_macros = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} +""" + +fail_if_tag_and_column_tag_fetch_called_macros = """ +{% macro fetch_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_tags should not be called") }} +{% endmacro %} + +{% macro fetch_column_tags(relation) %} + {{ exceptions.raise_compiler_error("fetch_column_tags should not be called") }} +{% endmacro %} +""" + class MaterializationV1Mixin: @pytest.fixture(scope="class") diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py index d50fa71f3..5ff35abc3 100644 --- a/tests/functional/adapter/incremental/fixtures.py +++ b/tests/functional/adapter/incremental/fixtures.py @@ -76,6 +76,47 @@ - name: color """ +metadata_fetch_incremental_sql = """ +{{ config( + materialized = 'incremental', + unique_key = 'id', +) }} + +select cast(1 as bigint) as id +""" + +metadata_fetch_no_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id +""" + +metadata_fetch_table_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + config: + databricks_tags: + classification: internal + columns: + - name: id +""" + +metadata_fetch_column_tags_schema = """ +version: 2 + +models: + - name: metadata_fetch_incremental + columns: + - name: id + databricks_tags: + classification: internal +""" + tblproperties_a = """ version: 2 diff --git a/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py new file mode 100644 index 000000000..ca31e18be --- /dev/null +++ b/tests/functional/adapter/incremental/test_incremental_metadata_fetch_skips.py @@ -0,0 +1,80 @@ +import pytest +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests import util + +from tests.functional.adapter.fixtures import ( + fail_if_tag_and_column_tag_fetch_called_macros, +) +from tests.functional.adapter.incremental import fixtures + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_no_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_and_column_tag_fetch_called_macros} + + def test_second_incremental_run_succeeds_without_tag_fetches(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresTableTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_table_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_and_column_tag_fetch_called_macros} + + def test_second_incremental_run_fails_when_table_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + + run_execution_results = util.run_dbt(["run"], expect_pass=False) + assert len(run_execution_results.results) == 1 + result = run_execution_results.results[0] + + assert result.status == RunStatus.Error + assert "tags should not be called" in result.message + + +@pytest.mark.skip_profile("databricks_cluster") +class TestIncrementalMetadataFetchRequiresColumnTags: + @pytest.fixture(scope="class") + def models(self): + return { + "metadata_fetch_incremental.sql": fixtures.metadata_fetch_incremental_sql, + "schema.yml": fixtures.metadata_fetch_column_tags_schema, + } + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_and_column_tag_fetch_called_macros} + + def test_second_incremental_run_fails_when_column_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["run"]) + + run_execution_results = util.run_dbt(["run"], expect_pass=False) + assert len(run_execution_results.results) == 1 + result = run_execution_results.results[0] + + assert result.status == RunStatus.Error + assert "tags should not be called" in result.message diff --git a/tests/functional/adapter/materialized_view_tests/fixtures.py b/tests/functional/adapter/materialized_view_tests/fixtures.py index 855e6edcc..203314f6a 100644 --- a/tests/functional/adapter/materialized_view_tests/fixtures.py +++ b/tests/functional/adapter/materialized_view_tests/fixtures.py @@ -146,3 +146,23 @@ def query_relation_type(project, relation: BaseRelation) -> Optional[str]: config: liquid_clustered_by: [] """ + +metadata_fetch_mv_seed_csv = """id,value +1,100 +2,200 +""".lstrip() + +metadata_fetch_materialized_view_without_tags_sql = """ +{{ config( + materialized='materialized_view', +) }} +select * from {{ ref('mv_metadata_fetch_seed') }} +""" + +metadata_fetch_materialized_view_with_tags_sql = """ +{{ config( + materialized='materialized_view', + databricks_tags={'classification': 'internal'}, +) }} +select * from {{ ref('mv_metadata_fetch_seed') }} +""" diff --git a/tests/functional/adapter/materialized_view_tests/test_materialized_view_metadata_fetch_skips.py b/tests/functional/adapter/materialized_view_tests/test_materialized_view_metadata_fetch_skips.py new file mode 100644 index 000000000..ac2aa09cb --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/test_materialized_view_metadata_fetch_skips.py @@ -0,0 +1,58 @@ +import pytest +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests import util + +from tests.functional.adapter.fixtures import fail_if_tag_fetch_called_macros +from tests.functional.adapter.materialized_view_tests import fixtures + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewMetadataFetchSkips: + @pytest.fixture(scope="class") + def seeds(self): + return {"mv_metadata_fetch_seed.csv": fixtures.metadata_fetch_mv_seed_csv} + + @pytest.fixture(scope="class") + def models(self): + return {"mv_metadata_fetch.sql": fixtures.metadata_fetch_materialized_view_without_tags_sql} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_fetch_called_macros} + + def test_second_materialized_view_run_succeeds_without_tag_fetches(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["seed"]) + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.dlt +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewMetadataFetchRequiresTags: + @pytest.fixture(scope="class") + def seeds(self): + return {"mv_metadata_fetch_seed.csv": fixtures.metadata_fetch_mv_seed_csv} + + @pytest.fixture(scope="class") + def models(self): + return {"mv_metadata_fetch.sql": fixtures.metadata_fetch_materialized_view_with_tags_sql} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_fetch_called_macros} + + def test_second_materialized_view_run_fails_when_tag_fetch_is_required(self, project): + # The first run creates the relation; the second run exercises the existing-relation + # path where adapter.get_relation_config() may attempt metadata fetches. + util.run_dbt(["seed"]) + util.run_dbt(["run"]) + + run_execution_results = util.run_dbt(["run"], expect_pass=False) + assert len(run_execution_results.results) == 1 + result = run_execution_results.results[0] + + assert result.status == RunStatus.Error + assert "fetch_tags should not be called" in result.message diff --git a/tests/functional/adapter/views/fixtures.py b/tests/functional/adapter/views/fixtures.py index d9c86dc38..95fcf5960 100644 --- a/tests/functional/adapter/views/fixtures.py +++ b/tests/functional/adapter/views/fixtures.py @@ -58,3 +58,19 @@ {{ config(materialized='view') }} select id from {{ ref('seed') }}; """ + + +view_without_tags_sql = """ +{{ config(materialized='view') }} + +select cast(1 as bigint) as id +""" + +view_with_tags_sql = """ +{{ config( + materialized='view', + databricks_tags={'classification': 'internal'} +) }} + +select cast(1 as bigint) as id +""" diff --git a/tests/functional/adapter/views/test_view_metadata_fetch_skips.py b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py new file mode 100644 index 000000000..b349634e9 --- /dev/null +++ b/tests/functional/adapter/views/test_view_metadata_fetch_skips.py @@ -0,0 +1,64 @@ +import pytest +from dbt.artifacts.schemas.results import RunStatus +from dbt.tests import util + +from tests.functional.adapter.fixtures import fail_if_tag_fetch_called_macros +from tests.functional.adapter.views.fixtures import view_with_tags_sql, view_without_tags_sql + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchSkips: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": view_without_tags_sql} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_fetch_called_macros} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_succeeds_without_tag_fetches(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. + util.run_dbt(["run"]) + util.run_dbt(["run"]) + + +@pytest.mark.skip_profile("databricks_cluster") +class TestViewMetadataFetchRequiresTags: + @pytest.fixture(scope="class") + def models(self): + return {"view_metadata_fetch.sql": view_with_tags_sql} + + @pytest.fixture(scope="class") + def macros(self): + return {"fail_if_tag_fetch_called.sql": fail_if_tag_fetch_called_macros} + + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": {"use_materialization_v2": True}, + "models": { + "+view_update_via_alter": True, + }, + } + + def test_second_view_run_fails_when_tag_fetch_is_required(self, project): + # The first run creates the view; the second run exercises the existing-relation + # alter/config-diff path where adapter.get_relation_config() may fetch tags. + util.run_dbt(["run"]) + + run_execution_results = util.run_dbt(["run"], expect_pass=False) + assert len(run_execution_results.results) == 1 + result = run_execution_results.results[0] + + assert result.status == RunStatus.Error + assert "fetch_tags should not be called" in result.message diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index 226ee34e6..60f7dcfa8 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -20,7 +20,11 @@ CATALOG_KEY_IN_SESSION_PROPERTIES, ) from dbt.adapters.databricks.impl import ( + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, DatabricksRelationInfo, + IncrementalTableAPI, + MaterializedViewAPI, + ViewAPI, get_identifier_list_string, ) from dbt.adapters.databricks.relation import ( @@ -28,6 +32,14 @@ DatabricksRelationType, DatabricksTableType, ) +from dbt.adapters.databricks.relation_configs.column_tags import ( + ColumnTagsConfig, + ColumnTagsProcessor, +) +from dbt.adapters.databricks.relation_configs.incremental import IncrementalTableConfig +from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig +from dbt.adapters.databricks.relation_configs.tags import TagsConfig, TagsProcessor +from dbt.adapters.databricks.relation_configs.view import ViewConfig from dbt.adapters.databricks.utils import check_not_found_error from tests.unit.utils import config_from_parts_or_dicts @@ -1311,6 +1323,233 @@ def test_get_columns_reraises_other_database_errors( ) +class TestDescribeRelationMetadataFetchPlanning: + @staticmethod + def _create_adapter(): + adapter = Mock() + + def execute_macro(macro_name, kwargs=None): + if macro_name == "get_view_description": + return Mock(rows=[("view_description",)]) + return f"{macro_name}_result" + + adapter.execute_macro.side_effect = execute_macro + return adapter + + @staticmethod + def _create_incremental_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_incremental_model", + type=DatabricksRelationType.Table, + ) + + @staticmethod + def _create_view_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_view_model", + type=DatabricksRelationType.View, + ) + + @staticmethod + def _create_mv_relation(database="main"): + return DatabricksRelation.create( + database=database, + schema="analytics", + identifier="my_mv_model", + type=DatabricksRelationType.MaterializedView, + ) + + @staticmethod + def _create_incremental_config( + tags: dict[str, str] | None = None, + column_tags: dict[str, dict[str, str]] | None = None, + ) -> IncrementalTableConfig: + return IncrementalTableConfig( + config={ + TagsProcessor.name: TagsConfig(set_tags=tags or {}), + ColumnTagsProcessor.name: ColumnTagsConfig(set_column_tags=column_tags or {}), + } + ) + + @staticmethod + def _create_view_config(tags: dict[str, str] | None = None) -> ViewConfig: + return ViewConfig(config={TagsProcessor.name: TagsConfig(set_tags=tags or {})}) + + @staticmethod + def _create_mv_config(tags: dict[str, str] | None = None) -> MaterializedViewConfig: + return MaterializedViewConfig(config={TagsProcessor.name: TagsConfig(set_tags=tags or {})}) + + @staticmethod + def _called_macro_names(adapter: Mock) -> list[str]: + return [call.args[0] for call in adapter.execute_macro.call_args_list] + + def test_incremental_describe_relation_skips_both_tag_queries_without_tags(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config() + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_incremental_describe_relation_fetches_only_table_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config(tags={"classification": "internal"}) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" not in called_macro_names + + def test_incremental_describe_relation_fetches_table_tags_from_project_level_cascade(self): + # Project-level databricks_tags cascade onto a model that doesn't declare + # its own. + adapter = self._create_adapter() + relation = self._create_incremental_relation() + + model = Mock() + model.config.extra = {"databricks_tags": {"team": "platform"}} + relation_config = IncrementalTableConfig( + config={ + TagsProcessor.name: TagsProcessor.from_relation_config(model), + ColumnTagsProcessor.name: ColumnTagsConfig(set_column_tags={}), + } + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" not in called_macro_names + + def test_incremental_describe_relation_fetches_only_column_tags_when_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + column_tags={"id": {"classification": "internal"}} + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_both_tag_queries_when_both_present(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_fetches_tag_queries_when_relation_config_is_none(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation() + + results = IncrementalTableAPI._describe_relation(adapter, relation, None) + + assert results["information_schema.tags"] == "fetch_tags_result" + assert results["information_schema.column_tags"] == "fetch_column_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "fetch_column_tags" in called_macro_names + + def test_incremental_describe_relation_skips_tag_queries_for_hive_metastore(self): + adapter = self._create_adapter() + relation = self._create_incremental_relation(database="hive_metastore") + relation_config = self._create_incremental_config( + tags={"classification": "internal"}, + column_tags={"id": {"classification": "internal"}}, + ) + + results = IncrementalTableAPI._describe_relation(adapter, relation, relation_config) + + assert "information_schema.tags" not in results + assert "information_schema.column_tags" not in results + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "fetch_column_tags" not in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + assert DESCRIBE_TABLE_EXTENDED_MACRO_NAME in called_macro_names + + def test_view_describe_relation_skips_tag_query_without_tags(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config() + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "get_view_description" in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + + def test_view_describe_relation_fetches_tag_query_when_tags_present(self): + adapter = self._create_adapter() + relation = self._create_view_relation() + relation_config = self._create_view_config(tags={"classification": "internal"}) + + results = ViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "get_view_description" in called_macro_names + + def test_mv_describe_relation_skips_tag_query_without_tags(self): + adapter = self._create_adapter() + relation = self._create_mv_relation() + relation_config = self._create_mv_config() + + results = MaterializedViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] is None + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" not in called_macro_names + assert "get_view_description" in called_macro_names + assert "fetch_tbl_properties" in called_macro_names + + def test_mv_describe_relation_fetches_tag_query_when_tags_present(self): + adapter = self._create_adapter() + relation = self._create_mv_relation() + relation_config = self._create_mv_config(tags={"classification": "internal"}) + + results = MaterializedViewAPI._describe_relation(adapter, relation, relation_config) + + assert results["information_schema.tags"] == "fetch_tags_result" + called_macro_names = self._called_macro_names(adapter) + assert "fetch_tags" in called_macro_names + assert "get_view_description" in called_macro_names + + class TestManagedIcebergBehaviorFlag(DatabricksAdapterBase): @pytest.fixture def adapter(self):