diff --git a/integration_tests/dbt_project/dbt_project.yml b/integration_tests/dbt_project/dbt_project.yml
index 6a82f50c6..bb2663282 100644
--- a/integration_tests/dbt_project/dbt_project.yml
+++ b/integration_tests/dbt_project/dbt_project.yml
@@ -29,3 +29,6 @@ models:
     +schema: elementary
     +enabled: "{{ var('elementary_enabled', True) }}"
     +file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}"
+
+flags:
+  require_batched_execution_for_custom_microbatch_strategy: True
\ No newline at end of file
diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py
new file mode 100644
index 000000000..dea35420a
--- /dev/null
+++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py
@@ -0,0 +1,143 @@
+from contextlib import contextmanager
+
+import pytest
+
+from dbt_project import DbtProject
+
+
+def _microbatch_source_model_sql() -> str:
+    return """
+{{ config(event_time='order_date') }}
+{% set event_time_data_type = 'datetime2' if target.type == 'sqlserver' else 'timestamp' %}
+
+select
+    1 as order_id,
+    1 as customer_id,
+    42 as amount,
+    cast('2024-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
+union all
+select
+    2 as order_id,
+    2 as customer_id,
+    84 as amount,
+    cast('2025-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
+"""
+
+
+def _microbatch_model_sql(source_model_name: str) -> str:
+    return """
+{% set model_config = {
+    "materialized": "incremental",
+    "incremental_strategy": "microbatch",
+    "event_time": "order_date",
+    "batch_size": "year",
+    "begin": "2024-01-01"
+} %}
+{% if target.type != "duckdb" %}
+    {% do model_config.update({"unique_key": "order_id"}) %}
+{% endif %}
+{{ config(**model_config) }}
+
+select
+    order_id,
+    customer_id,
+    amount,
+    order_date
+from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
+""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name)
+
+
+@contextmanager
+def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str):
+    source_model_name = f"mb_src_{model_suffix}"
+    target_model_name = f"mb_tgt_{model_suffix}"
+    source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql")
+    target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql")
+
+    source_model_path.write_text(_microbatch_source_model_sql())
+    target_model_path.write_text(_microbatch_model_sql(source_model_name))
+    relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path)
+    relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path)
+    try:
+        yield relative_source_model_path, relative_target_model_path, target_model_name
+    finally:
+        if source_model_path.exists():
+            source_model_path.unlink()
+        if target_model_path.exists():
+            target_model_path.unlink()
+
+
+def _run_microbatch_model_and_get_latest_success_result(
+    dbt_project: DbtProject, model_suffix: str
+):
+    with _with_microbatch_test_models(dbt_project, model_suffix) as (
+        source_model_path,
+        model_path,
+        target_model_name,
+    ):
+        dbt_project.dbt_runner.run(
+            select=f"{source_model_path} {model_path}"
+        )
+
+        unique_id = f"model.elementary_tests.{target_model_name}"
+        run_results = dbt_project.read_table(
+            "dbt_run_results",
+            where=f"unique_id = '{unique_id}' and status = 'success'",
+            order_by="generated_at desc",
+            limit=1,
+        )
+        return run_results
+
+
+@contextmanager
+def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
+    macro_path = (
+        dbt_project.project_dir_path / "macros" / "microbatch.sql"
+    )
+    macro_sql = """
+{% macro __MACRO_NAME__(arg_dict) %}
+    {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
+{% endmacro %}
+""".replace("__MACRO_NAME__", macro_name)
+    if macro_path.exists():
+        raise FileExistsError(f"Expected no macro file at {macro_path}")
+
+    macro_path.write_text(macro_sql)
+    try:
+        yield
+    finally:
+        if macro_path.exists():
+            macro_path.unlink()
+
+
+@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"])
+@pytest.mark.skip_for_dbt_fusion
+@pytest.mark.parametrize(
+    "macro_name,expected_compiled_code,model_suffix",
+    [
+        ("get_incremental_microbatch_sql", True, "with_override"),
+        ("get_incremental_microbatch_sql_not_used", False, "without_override"),
+    ],
+    ids=["with_override", "without_override"],
+)
+def test_microbatch_run_results_compiled_code_behavior(
+    dbt_project: DbtProject,
+    macro_name: str,
+    expected_compiled_code: bool,
+    model_suffix: str,
+):
+    dbt_project.dbt_runner.vars["disable_run_results"] = False
+
+    with _with_microbatch_macro_file(dbt_project, macro_name):
+        run_results = _run_microbatch_model_and_get_latest_success_result(
+            dbt_project, model_suffix
+        )
+        assert run_results, "Expected a successful run result row for microbatch model"
+        if expected_compiled_code:
+            assert run_results[0]["compiled_code"], (
+                "Expected compiled_code to be populated when override macro is present"
+            )
+        else:
+            assert not run_results[0]["compiled_code"], (
+                "Expected compiled_code to stay empty when override macro is absent"
+            )
diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql
new file mode 100644
index 000000000..856dd91cf
--- /dev/null
+++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql
@@ -0,0 +1,50 @@
+{#-
+    NOTE FOR PACKAGE CONSUMERS:
+    This package macro is not guaranteed to be picked up automatically by dbt's
+    incremental strategy resolution in all projects.
+    To apply this behavior, users should:
+    1) Override `get_incremental_microbatch_sql` in their own project and delegate to
+       `elementary.get_incremental_microbatch_sql(arg_dict)`.
+    2) Enable the dbt behavior flag `require_batched_execution_for_custom_microbatch_strategy`.
+
+    This flow is currently not supported for the following adapters:
+    - spark
+    - bigquery
+    - athena
+    - clickhouse
+    - dremio
+    - vertica
+
+    This flow is currently not supported for dbt Fusion.
+-#}
+{% macro get_incremental_microbatch_sql(arg_dict) %}
+    {% if execute and model is defined %}
+        {% do elementary.capture_microbatch_compiled_code_for_model() %}
+    {% endif %}
+
+    {{ return(adapter.dispatch("get_incremental_microbatch_sql", "dbt")(arg_dict)) }}
+{% endmacro %}
+
+
+{% macro capture_microbatch_compiled_code_for_model() %}
+    {% set model_unique_id = (
+        model.get("unique_id") if model is mapping else model.unique_id
+    ) | default(none, true) %}
+    {% set model_compiled_code = (
+        model.get("compiled_code") if model is mapping else model.compiled_code
+    ) | default(none, true) %}
+    {% if model_unique_id is none %}
+        {{ return(none) }}
+    {% endif %}
+    {% if not model_compiled_code %}
+        {{ return(none) }}
+    {% endif %}
+
+    {% set compiled_code_by_unique_id = elementary.get_cache(
+        "microbatch_compiled_code_by_unique_id"
+    ) %}
+    {% if model_unique_id in compiled_code_by_unique_id %}
+        {{ return(none) }}
+    {% endif %}
+    {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
+{% endmacro %}
diff --git a/macros/edr/tests/on_run_start/init_elementary_graph.sql b/macros/edr/tests/on_run_start/init_elementary_graph.sql
index 576cf2d38..00fda83de 100644
--- a/macros/edr/tests/on_run_start/init_elementary_graph.sql
+++ b/macros/edr/tests/on_run_start/init_elementary_graph.sql
@@ -11,6 +11,7 @@
         },
         "temp_test_table_relations_map": {},
         "duration_context_stack": {},
+        "microbatch_compiled_code_by_unique_id": {},
     },
 ) %}
 {% endmacro %}
diff --git a/macros/utils/graph/get_compiled_code.sql b/macros/utils/graph/get_compiled_code.sql
index 69ddcfb94..6ce92316e 100644
--- a/macros/utils/graph/get_compiled_code.sql
+++ b/macros/utils/graph/get_compiled_code.sql
@@ -1,5 +1,11 @@
 {% macro get_compiled_code(node, as_column_value=false) %}
-    {% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %}
+    {% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
+    {% if not compiled_code and node and node.get("unique_id") %}
+        {% set compiled_code = elementary.get_cache(
+            "microbatch_compiled_code_by_unique_id", {}
+        ).get(node.get("unique_id")) %}
+    {% endif %}
+    {% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %}
     {% set max_column_size = elementary.get_column_size() %}
     {% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
@@ -9,12 +15,11 @@
     {% do return(compiled_code) %}
 {% endmacro %}
-{% macro default__get_compiled_code(node) %}
-    {% do return(node.get("compiled_code") or node.get("compiled_sql")) %}
+{% macro default__format_compiled_code(compiled_code) %}
+    {% do return(compiled_code) %}
 {% endmacro %}
-{% macro redshift__get_compiled_code(node) %}
-    {% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
+{% macro redshift__format_compiled_code(compiled_code) %}
     {% if not compiled_code %}
         {% do return(none) %}
     {% else %}
         {% do return(compiled_code.replace("%", "%%")) %}
     {% endif %}
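
Usage sketch for package consumers: the note at the top of capture_microbatch_compiled_code.sql lists two adoption steps. A minimal sketch of step 1 follows; the file name below is illustrative (any .sql file under the consumer project's macros/ directory works), and the macro body mirrors the override that the integration test writes to macros/microbatch.sql:

    {# macros/microbatch_override.sql -- illustrative file name in a consumer project #}
    {# Routes dbt's microbatch strategy resolution through the elementary package macro, #}
    {# which caches the model's compiled code before dispatching dbt's built-in strategy. #}
    {% macro get_incremental_microbatch_sql(arg_dict) %}
        {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
    {% endmacro %}

Step 2 is enabling the require_batched_execution_for_custom_microbatch_strategy behavior flag in the consumer's dbt_project.yml, exactly as the dbt_project.yml hunk at the top of this diff does for the integration test project.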