From 36e1956482e22cd350eb6dd45e7af113a9f2f676 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Sun, 3 May 2026 23:48:31 +0300 Subject: [PATCH 01/17] overriding microbatch macro for capturing the compiled code of the microbatch models --- .../dbt_project/macros/microbatch.sql | 3 ++ .../test_microbatch_compiled_code.py | 40 +++++++++++++++++++ .../capture_microbatch_compiled_code.sql | 38 ++++++++++++++++++ macros/utils/graph/get_compiled_code.sql | 5 +++ 4 files changed, 86 insertions(+) create mode 100644 integration_tests/dbt_project/macros/microbatch.sql create mode 100644 integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py create mode 100644 macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql diff --git a/integration_tests/dbt_project/macros/microbatch.sql b/integration_tests/dbt_project/macros/microbatch.sql new file mode 100644 index 000000000..a42567bae --- /dev/null +++ b/integration_tests/dbt_project/macros/microbatch.sql @@ -0,0 +1,3 @@ +{% macro get_incremental_microbatch_sql(arg_dict) %} + {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} +{% endmacro %} diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py new file mode 100644 index 000000000..e25841a98 --- /dev/null +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -0,0 +1,40 @@ +from dbt_project import DbtProject + + +def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): + dbt_project.dbt_runner.vars["disable_run_results"] = False + + model_sql = """ +{{ config( + materialized='incremental', + incremental_strategy='microbatch', + event_time='order_date', + batch_size='day', + begin='2025-03-01', + unique_key='order_id' +) }} + +select + order_id, + customer_id, + amount, + cast('2025-03-01 00:00:00+00:00' as timestamp) as order_date +from {{ ref('stg_orders') }} +""" + + with dbt_project.create_temp_model_for_existing_table( + test_id, raw_code=model_sql + ) as model_path: + dbt_project.dbt_runner.run(select=str(model_path)) + + unique_id = f"model.elementary_tests.{test_id}" + run_results = dbt_project.read_table( + "dbt_run_results", + where=f"unique_id = '{unique_id}' and status = 'success'", + order_by="generated_at desc", + limit=1, + ) + assert run_results, "Expected a successful run result row for microbatch model" + assert run_results[0]["compiled_code"], ( + "Expected compiled_code to be populated for successful microbatch model run result" + ) diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql new file mode 100644 index 000000000..c38c35ae7 --- /dev/null +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -0,0 +1,38 @@ +{#- + NOTE FOR PACKAGE CONSUMERS: + This package macro is not guaranteed to be picked up automatically by dbt's + incremental strategy resolution in all projects. + To apply this behavior, users should override `get_incremental_microbatch_sql` + in their own project and delegate to `elementary.get_incremental_microbatch_sql(arg_dict)` +-#} +{% macro get_incremental_microbatch_sql(arg_dict) %} + {% if execute and model is defined %} + {% do elementary.capture_microbatch_compiled_code_for_model() %} + {% endif %} + + {{ return(adapter.dispatch("get_incremental_microbatch_sql", "dbt")(arg_dict)) }} +{% endmacro %} + + +{% macro capture_microbatch_compiled_code_for_model() %} + {% set model_unique_id = model.get("unique_id") %} + {% set model_compiled_code = model.get("compiled_code") %} + {% if model_unique_id is none %} + {{ return(none) }} + {% endif %} + {% if not model_compiled_code %} + {{ return(none) }} + {% endif %} + + {% set compiled_code_by_unique_id = elementary.get_cache( + "microbatch_compiled_code_by_unique_id", {} + ) %} + {% if model_unique_id in compiled_code_by_unique_id %} + {{ return(none) }} + {% endif %} + + {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %} + {% do elementary.set_cache( + "microbatch_compiled_code_by_unique_id", compiled_code_by_unique_id + ) %} +{% endmacro %} diff --git a/macros/utils/graph/get_compiled_code.sql b/macros/utils/graph/get_compiled_code.sql index 69ddcfb94..199924393 100644 --- a/macros/utils/graph/get_compiled_code.sql +++ b/macros/utils/graph/get_compiled_code.sql @@ -1,5 +1,10 @@ {% macro get_compiled_code(node, as_column_value=false) %} {% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %} + {% if not compiled_code and node and node.get("unique_id") %} + {% set compiled_code = elementary.get_cache( + "microbatch_compiled_code_by_unique_id", {} + ).get(node.get("unique_id")) %} + {% endif %} {% set max_column_size = elementary.get_column_size() %} {% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %} From bbd163d00be93fbad01890039f9d69fbbf960331 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 00:53:41 +0300 Subject: [PATCH 02/17] fix test --- .../test_microbatch_compiled_code.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index e25841a98..cb19f17db 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -9,17 +9,17 @@ def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: Dbt materialized='incremental', incremental_strategy='microbatch', event_time='order_date', - batch_size='day', + batch_size='year', begin='2025-03-01', unique_key='order_id' ) }} select - order_id, - customer_id, - amount, - cast('2025-03-01 00:00:00+00:00' as timestamp) as order_date -from {{ ref('stg_orders') }} + cast(one as int) as order_id, + 1 as customer_id, + 42 as amount, + {{ dbt.current_timestamp() }} as order_date +from {{ ref('one') }} """ with dbt_project.create_temp_model_for_existing_table( From b6aac494e0b655a67290d53243781cecd67595c8 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 11:39:17 +0300 Subject: [PATCH 03/17] test fixes --- .../test_microbatch_compiled_code.py | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index cb19f17db..0e4328bc3 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -1,21 +1,34 @@ +import pytest + from dbt_project import DbtProject +@pytest.mark.skip_targets(["vertica"]) +@pytest.mark.skip_for_dbt_fusion def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_run_results"] = False model_sql = """ -{{ config( - materialized='incremental', - incremental_strategy='microbatch', - event_time='order_date', - batch_size='year', - begin='2025-03-01', - unique_key='order_id' -) }} +{% set model_config = { + "materialized": "incremental", + "incremental_strategy": "microbatch", + "event_time": "order_date", + "batch_size": "year", + "begin": "2025-03-01", + "unique_key": "order_id" +} %} +{% if target.type == "bigquery" %} + {% do model_config.update( + {"partition_by": {"field": "order_date", "data_type": "timestamp", "granularity": "year"}} + ) %} +{% endif %} +{% if target.type == "athena" %} + {% do model_config.update({"partitioned_by": ["order_date"]}) %} +{% endif %} +{{ config(**model_config) }} select - cast(one as int) as order_id, + cast({{ elementary.escape_reserved_keywords("one") }} as int) as order_id, 1 as customer_id, 42 as amount, {{ dbt.current_timestamp() }} as order_date From cecb3a6f08ade872dcd0304eb1ad5a2ff667c749 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 12:19:58 +0300 Subject: [PATCH 04/17] test fixes --- .../test_microbatch_compiled_code.py | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 0e4328bc3..180338a19 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -1,14 +1,13 @@ +from contextlib import contextmanager +from pathlib import Path + import pytest from dbt_project import DbtProject -@pytest.mark.skip_targets(["vertica"]) -@pytest.mark.skip_for_dbt_fusion -def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): - dbt_project.dbt_runner.vars["disable_run_results"] = False - - model_sql = """ +def _microbatch_model_sql() -> str: + return """ {% set model_config = { "materialized": "incremental", "incremental_strategy": "microbatch", @@ -28,15 +27,19 @@ def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: Dbt {{ config(**model_config) }} select - cast({{ elementary.escape_reserved_keywords("one") }} as int) as order_id, + 1 as order_id, 1 as customer_id, 42 as amount, {{ dbt.current_timestamp() }} as order_date from {{ ref('one') }} """ + +def _run_microbatch_model_and_get_latest_success_result( + dbt_project: DbtProject, test_id: str +): with dbt_project.create_temp_model_for_existing_table( - test_id, raw_code=model_sql + test_id, raw_code=_microbatch_model_sql() ) as model_path: dbt_project.dbt_runner.run(select=str(model_path)) @@ -47,7 +50,50 @@ def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: Dbt order_by="generated_at desc", limit=1, ) + return run_results + + +@contextmanager +def _without_microbatch_override_macro(dbt_project: DbtProject): + macro_path = ( + dbt_project.project_dir_path / "macros" / "microbatch.sql" + ) + backup_path = macro_path.with_suffix(".sql.bak") + if not macro_path.exists(): + raise FileNotFoundError(f"Expected macro file at {macro_path}") + + macro_path.rename(backup_path) + try: + yield + finally: + if backup_path.exists(): + backup_path.rename(macro_path) + + +@pytest.mark.skip_targets(["vertica"]) +@pytest.mark.skip_for_dbt_fusion +def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): + dbt_project.dbt_runner.vars["disable_run_results"] = False + + run_results = _run_microbatch_model_and_get_latest_success_result(dbt_project, test_id) assert run_results, "Expected a successful run result row for microbatch model" assert run_results[0]["compiled_code"], ( "Expected compiled_code to be populated for successful microbatch model run result" ) + + +@pytest.mark.skip_targets(["vertica"]) +@pytest.mark.skip_for_dbt_fusion +def test_microbatch_run_results_without_override_has_empty_compiled_code( + test_id: str, dbt_project: DbtProject +): + dbt_project.dbt_runner.vars["disable_run_results"] = False + + with _without_microbatch_override_macro(dbt_project): + run_results = _run_microbatch_model_and_get_latest_success_result( + dbt_project, test_id + ) + assert run_results, "Expected a successful run result row for microbatch model" + assert not run_results[0]["compiled_code"], ( + "Expected compiled_code to stay empty when microbatch override macro is absent" + ) From 4271d1f8e448f7eeb070e4528b1927207e45da4e Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 12:52:31 +0300 Subject: [PATCH 05/17] test fixes --- .../dbt_project/macros/microbatch.sql | 3 -- .../test_microbatch_compiled_code.py | 28 ++++++++++--------- 2 files changed, 15 insertions(+), 16 deletions(-) delete mode 100644 integration_tests/dbt_project/macros/microbatch.sql diff --git a/integration_tests/dbt_project/macros/microbatch.sql b/integration_tests/dbt_project/macros/microbatch.sql deleted file mode 100644 index a42567bae..000000000 --- a/integration_tests/dbt_project/macros/microbatch.sql +++ /dev/null @@ -1,3 +0,0 @@ -{% macro get_incremental_microbatch_sql(arg_dict) %} - {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} -{% endmacro %} diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 180338a19..4ea65577f 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -1,5 +1,4 @@ from contextlib import contextmanager -from pathlib import Path import pytest @@ -54,20 +53,23 @@ def _run_microbatch_model_and_get_latest_success_result( @contextmanager -def _without_microbatch_override_macro(dbt_project: DbtProject): +def _with_microbatch_override_macro(dbt_project: DbtProject): macro_path = ( dbt_project.project_dir_path / "macros" / "microbatch.sql" ) - backup_path = macro_path.with_suffix(".sql.bak") - if not macro_path.exists(): - raise FileNotFoundError(f"Expected macro file at {macro_path}") + macro_sql = """{% macro get_incremental_microbatch_sql(arg_dict) %} + {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} +{% endmacro %} +""" + if macro_path.exists(): + raise FileExistsError(f"Expected no macro file at {macro_path}") - macro_path.rename(backup_path) + macro_path.write_text(macro_sql) try: yield finally: - if backup_path.exists(): - backup_path.rename(macro_path) + if macro_path.exists(): + macro_path.unlink() @pytest.mark.skip_targets(["vertica"]) @@ -75,7 +77,10 @@ def _without_microbatch_override_macro(dbt_project: DbtProject): def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_run_results"] = False - run_results = _run_microbatch_model_and_get_latest_success_result(dbt_project, test_id) + with _with_microbatch_override_macro(dbt_project): + run_results = _run_microbatch_model_and_get_latest_success_result( + dbt_project, test_id + ) assert run_results, "Expected a successful run result row for microbatch model" assert run_results[0]["compiled_code"], ( "Expected compiled_code to be populated for successful microbatch model run result" @@ -89,10 +94,7 @@ def test_microbatch_run_results_without_override_has_empty_compiled_code( ): dbt_project.dbt_runner.vars["disable_run_results"] = False - with _without_microbatch_override_macro(dbt_project): - run_results = _run_microbatch_model_and_get_latest_success_result( - dbt_project, test_id - ) + run_results = _run_microbatch_model_and_get_latest_success_result(dbt_project, test_id) assert run_results, "Expected a successful run result row for microbatch model" assert not run_results[0]["compiled_code"], ( "Expected compiled_code to stay empty when microbatch override macro is absent" From 1c26b1a6e8123bb70fb4c1cecaaf3d29355ab487 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 13:26:36 +0300 Subject: [PATCH 06/17] test fixes --- .../test_microbatch_compiled_code.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 4ea65577f..5e63c12e1 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -53,13 +53,13 @@ def _run_microbatch_model_and_get_latest_success_result( @contextmanager -def _with_microbatch_override_macro(dbt_project: DbtProject): +def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path = ( dbt_project.project_dir_path / "macros" / "microbatch.sql" ) - macro_sql = """{% macro get_incremental_microbatch_sql(arg_dict) %} + macro_sql = f"""{{% macro {macro_name}(arg_dict) %}} {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} -{% endmacro %} +{{% endmacro %}} """ if macro_path.exists(): raise FileExistsError(f"Expected no macro file at {macro_path}") @@ -77,7 +77,7 @@ def _with_microbatch_override_macro(dbt_project: DbtProject): def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): dbt_project.dbt_runner.vars["disable_run_results"] = False - with _with_microbatch_override_macro(dbt_project): + with _with_microbatch_macro_file(dbt_project, "get_incremental_microbatch_sql"): run_results = _run_microbatch_model_and_get_latest_success_result( dbt_project, test_id ) @@ -94,7 +94,12 @@ def test_microbatch_run_results_without_override_has_empty_compiled_code( ): dbt_project.dbt_runner.vars["disable_run_results"] = False - run_results = _run_microbatch_model_and_get_latest_success_result(dbt_project, test_id) + with _with_microbatch_macro_file( + dbt_project, "get_incremental_microbatch_sql_not_used" + ): + run_results = _run_microbatch_model_and_get_latest_success_result( + dbt_project, test_id + ) assert run_results, "Expected a successful run result row for microbatch model" assert not run_results[0]["compiled_code"], ( "Expected compiled_code to stay empty when microbatch override macro is absent" From 7ae883bd49d9acb3e11b44a8b0cc6a69da426e6e Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 15:15:05 +0300 Subject: [PATCH 07/17] test fixes --- .../test_microbatch_compiled_code.py | 56 +++++++++++++++---- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 5e63c12e1..4ddef2b5d 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -5,14 +5,32 @@ from dbt_project import DbtProject -def _microbatch_model_sql() -> str: +def _microbatch_source_model_sql() -> str: + return """ +{{ config(event_time='order_date') }} + +select + 1 as order_id, + 1 as customer_id, + 42 as amount, + cast('2024-01-01 00:00:00' as timestamp) as order_date +union all +select + 2 as order_id, + 2 as customer_id, + 84 as amount, + cast('2025-01-01 00:00:00' as timestamp) as order_date +""" + + +def _microbatch_model_sql(source_model_name: str) -> str: return """ {% set model_config = { "materialized": "incremental", "incremental_strategy": "microbatch", "event_time": "order_date", "batch_size": "year", - "begin": "2025-03-01", + "begin": "2024-01-01", "unique_key": "order_id" } %} {% if target.type == "bigquery" %} @@ -26,20 +44,36 @@ def _microbatch_model_sql() -> str: {{ config(**model_config) }} select - 1 as order_id, - 1 as customer_id, - 42 as amount, - {{ dbt.current_timestamp() }} as order_date -from {{ ref('one') }} -""" + order_id, + customer_id, + amount, + order_date +from {{ ref('__MICROBATCH_SOURCE_MODEL__') }} +""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name) + + +@contextmanager +def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str): + source_model_name = f"{test_id}_source" + source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql") + target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{test_id}.sql") + + source_model_path.write_text(_microbatch_source_model_sql()) + target_model_path.write_text(_microbatch_model_sql(source_model_name)) + relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path) + try: + yield relative_target_model_path + finally: + if source_model_path.exists(): + source_model_path.unlink() + if target_model_path.exists(): + target_model_path.unlink() def _run_microbatch_model_and_get_latest_success_result( dbt_project: DbtProject, test_id: str ): - with dbt_project.create_temp_model_for_existing_table( - test_id, raw_code=_microbatch_model_sql() - ) as model_path: + with _with_microbatch_test_models(dbt_project, test_id) as model_path: dbt_project.dbt_runner.run(select=str(model_path)) unique_id = f"model.elementary_tests.{test_id}" From 677b1811519abcb1fc8e54567e9809af2db748cf Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 15:33:36 +0300 Subject: [PATCH 08/17] test fixes --- .../test_microbatch_compiled_code.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 4ddef2b5d..c4ab8197a 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -60,9 +60,10 @@ def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str): source_model_path.write_text(_microbatch_source_model_sql()) target_model_path.write_text(_microbatch_model_sql(source_model_name)) + relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path) relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path) try: - yield relative_target_model_path + yield relative_source_model_path, relative_target_model_path finally: if source_model_path.exists(): source_model_path.unlink() @@ -73,8 +74,13 @@ def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str): def _run_microbatch_model_and_get_latest_success_result( dbt_project: DbtProject, test_id: str ): - with _with_microbatch_test_models(dbt_project, test_id) as model_path: - dbt_project.dbt_runner.run(select=str(model_path)) + with _with_microbatch_test_models(dbt_project, test_id) as ( + source_model_path, + model_path, + ): + dbt_project.dbt_runner.run( + select=f"{source_model_path} {model_path}" + ) unique_id = f"model.elementary_tests.{test_id}" run_results = dbt_project.read_table( From a80518a483e2c42b2196a8ff461de58d23b4ce61 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 15:49:08 +0300 Subject: [PATCH 09/17] test fixes --- integration_tests/dbt_project/dbt_project.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integration_tests/dbt_project/dbt_project.yml b/integration_tests/dbt_project/dbt_project.yml index 6a82f50c6..bb2663282 100644 --- a/integration_tests/dbt_project/dbt_project.yml +++ b/integration_tests/dbt_project/dbt_project.yml @@ -29,3 +29,6 @@ models: +schema: elementary +enabled: "{{ var('elementary_enabled', True) }}" +file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}" + +flags: + require_batched_execution_for_custom_microbatch_strategy: True \ No newline at end of file From dd629fdb4073130c59fac8efa373bf32ecab2369 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 17:37:39 +0300 Subject: [PATCH 10/17] test fixes --- .../test_dbt_artifacts/test_microbatch_compiled_code.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index c4ab8197a..8702242fe 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -97,10 +97,11 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path = ( dbt_project.project_dir_path / "macros" / "microbatch.sql" ) - macro_sql = f"""{{% macro {macro_name}(arg_dict) %}} + macro_sql = """ +{% macro __MACRO_NAME__(arg_dict) %} {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }} -{{% endmacro %}} -""" +{% endmacro %} +""".replace("__MACRO_NAME__", macro_name) if macro_path.exists(): raise FileExistsError(f"Expected no macro file at {macro_path}") From 70ccd14fb4dc9ff3d362511123341a38094ef65c Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 19:27:54 +0300 Subject: [PATCH 11/17] test fixes --- .../test_microbatch_compiled_code.py | 76 ++++++++++--------- 1 file changed, 40 insertions(+), 36 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 8702242fe..f2efe3476 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -8,18 +8,19 @@ def _microbatch_source_model_sql() -> str: return """ {{ config(event_time='order_date') }} +{% set event_time_data_type = 'datetime2' if target.type == 'sqlserver' else 'timestamp' %} select 1 as order_id, 1 as customer_id, 42 as amount, - cast('2024-01-01 00:00:00' as timestamp) as order_date + cast('2024-01-01 00:00:00' as {{ event_time_data_type }}) as order_date union all select 2 as order_id, 2 as customer_id, 84 as amount, - cast('2025-01-01 00:00:00' as timestamp) as order_date + cast('2025-01-01 00:00:00' as {{ event_time_data_type }}) as order_date """ @@ -30,8 +31,7 @@ def _microbatch_model_sql(source_model_name: str) -> str: "incremental_strategy": "microbatch", "event_time": "order_date", "batch_size": "year", - "begin": "2024-01-01", - "unique_key": "order_id" + "begin": "2024-01-01" } %} {% if target.type == "bigquery" %} {% do model_config.update( @@ -41,6 +41,9 @@ def _microbatch_model_sql(source_model_name: str) -> str: {% if target.type == "athena" %} {% do model_config.update({"partitioned_by": ["order_date"]}) %} {% endif %} +{% if target.type != "duckdb" %} + {% do model_config.update({"unique_key": "order_id"}) %} +{% endif %} {{ config(**model_config) }} select @@ -53,17 +56,18 @@ def _microbatch_model_sql(source_model_name: str) -> str: @contextmanager -def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str): - source_model_name = f"{test_id}_source" +def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str): + source_model_name = f"mb_src_{model_suffix}" + target_model_name = f"mb_tgt_{model_suffix}" source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql") - target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{test_id}.sql") + target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql") source_model_path.write_text(_microbatch_source_model_sql()) target_model_path.write_text(_microbatch_model_sql(source_model_name)) relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path) relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path) try: - yield relative_source_model_path, relative_target_model_path + yield relative_source_model_path, relative_target_model_path, target_model_name finally: if source_model_path.exists(): source_model_path.unlink() @@ -72,17 +76,18 @@ def _with_microbatch_test_models(dbt_project: DbtProject, test_id: str): def _run_microbatch_model_and_get_latest_success_result( - dbt_project: DbtProject, test_id: str + dbt_project: DbtProject, model_suffix: str ): - with _with_microbatch_test_models(dbt_project, test_id) as ( + with _with_microbatch_test_models(dbt_project, model_suffix) as ( source_model_path, model_path, + target_model_name, ): dbt_project.dbt_runner.run( select=f"{source_model_path} {model_path}" ) - unique_id = f"model.elementary_tests.{test_id}" + unique_id = f"model.elementary_tests.{target_model_name}" run_results = dbt_project.read_table( "dbt_run_results", where=f"unique_id = '{unique_id}' and status = 'success'", @@ -113,35 +118,34 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path.unlink() -@pytest.mark.skip_targets(["vertica"]) -@pytest.mark.skip_for_dbt_fusion -def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject): - dbt_project.dbt_runner.vars["disable_run_results"] = False - - with _with_microbatch_macro_file(dbt_project, "get_incremental_microbatch_sql"): - run_results = _run_microbatch_model_and_get_latest_success_result( - dbt_project, test_id - ) - assert run_results, "Expected a successful run result row for microbatch model" - assert run_results[0]["compiled_code"], ( - "Expected compiled_code to be populated for successful microbatch model run result" - ) - - -@pytest.mark.skip_targets(["vertica"]) +@pytest.mark.skip_targets(["vertica", "bigquery", "athena", "clickhouse"]) @pytest.mark.skip_for_dbt_fusion -def test_microbatch_run_results_without_override_has_empty_compiled_code( - test_id: str, dbt_project: DbtProject +@pytest.mark.parametrize( + "macro_name,expected_compiled_code,model_suffix", + [ + ("get_incremental_microbatch_sql", True, "with_override"), + ("get_incremental_microbatch_sql_not_used", False, "without_override"), + ], + ids=["with_override", "without_override"], +) +def test_microbatch_run_results_compiled_code_behavior( + dbt_project: DbtProject, + macro_name: str, + expected_compiled_code: bool, + model_suffix: str, ): dbt_project.dbt_runner.vars["disable_run_results"] = False - with _with_microbatch_macro_file( - dbt_project, "get_incremental_microbatch_sql_not_used" - ): + with _with_microbatch_macro_file(dbt_project, macro_name): run_results = _run_microbatch_model_and_get_latest_success_result( - dbt_project, test_id + dbt_project, model_suffix ) assert run_results, "Expected a successful run result row for microbatch model" - assert not run_results[0]["compiled_code"], ( - "Expected compiled_code to stay empty when microbatch override macro is absent" - ) + if expected_compiled_code: + assert run_results[0]["compiled_code"], ( + "Expected compiled_code to be populated when override macro is present" + ) + else: + assert not run_results[0]["compiled_code"], ( + "Expected compiled_code to stay empty when override macro is absent" + ) From 736b2a18435735e9267635d9fb46b509ff9c1180 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 19:28:04 +0300 Subject: [PATCH 12/17] test fixes --- .../microbatch/capture_microbatch_compiled_code.sql | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql index c38c35ae7..3a8846ae4 100644 --- a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -15,8 +15,12 @@ {% macro capture_microbatch_compiled_code_for_model() %} - {% set model_unique_id = model.get("unique_id") %} - {% set model_compiled_code = model.get("compiled_code") %} + {% set model_unique_id = ( + model.get("unique_id") if model is mapping else model.unique_id + ) | default(none, true) %} + {% set model_compiled_code = ( + model.get("compiled_code") if model is mapping else model.compiled_code + ) | default(none, true) %} {% if model_unique_id is none %} {{ return(none) }} {% endif %} From 9c2d74f0920b5bb6765d487afb13cfe6f64d6471 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 20:47:50 +0300 Subject: [PATCH 13/17] improved comments and fixed tests for dremio and spark --- .../test_microbatch_compiled_code.py | 4 ++-- .../capture_microbatch_compiled_code.sql | 15 +++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index f2efe3476..50e6fd8db 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -33,7 +33,7 @@ def _microbatch_model_sql(source_model_name: str) -> str: "batch_size": "year", "begin": "2024-01-01" } %} -{% if target.type == "bigquery" %} +{% if target.type in ["bigquery", "spark"] %} {% do model_config.update( {"partition_by": {"field": "order_date", "data_type": "timestamp", "granularity": "year"}} ) %} @@ -118,7 +118,7 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path.unlink() -@pytest.mark.skip_targets(["vertica", "bigquery", "athena", "clickhouse"]) +@pytest.mark.skip_targets(["vertica", "bigquery", "athena", "clickhouse", "dremio"]) @pytest.mark.skip_for_dbt_fusion @pytest.mark.parametrize( "macro_name,expected_compiled_code,model_suffix", diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql index 3a8846ae4..665ce2a72 100644 --- a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -2,8 +2,19 @@ NOTE FOR PACKAGE CONSUMERS: This package macro is not guaranteed to be picked up automatically by dbt's incremental strategy resolution in all projects. - To apply this behavior, users should override `get_incremental_microbatch_sql` - in their own project and delegate to `elementary.get_incremental_microbatch_sql(arg_dict)` + To apply this behavior, users should: + 1) Override `get_incremental_microbatch_sql` in their own project and delegate to + `elementary.get_incremental_microbatch_sql(arg_dict)`. + 2) Enable dbt behavior flag `require_batched_execution_for_custom_microbatch_strategy`. + + This flow is currently not supported for adapters: + - bigquery + - athena + - clickhouse + - dremio + - vertica + + This flow is currently not supported for dbt Fusion. -#} {% macro get_incremental_microbatch_sql(arg_dict) %} {% if execute and model is defined %} From 20b2faedb20bed27716e0dba671587749b9dd2b1 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Mon, 4 May 2026 22:26:33 +0300 Subject: [PATCH 14/17] fix spark tests --- .../test_dbt_artifacts/test_microbatch_compiled_code.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 50e6fd8db..22c7561e4 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -33,13 +33,8 @@ def _microbatch_model_sql(source_model_name: str) -> str: "batch_size": "year", "begin": "2024-01-01" } %} -{% if target.type in ["bigquery", "spark"] %} - {% do model_config.update( - {"partition_by": {"field": "order_date", "data_type": "timestamp", "granularity": "year"}} - ) %} -{% endif %} -{% if target.type == "athena" %} - {% do model_config.update({"partitioned_by": ["order_date"]}) %} +{% if target.type == "spark" %} + {% do model_config.update({"partition_by": ["order_date"]}) %} {% endif %} {% if target.type != "duckdb" %} {% do model_config.update({"unique_key": "order_id"}) %} From 35a35ceaa553ff0244abab98d2ca5a99fd58ebb2 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Tue, 5 May 2026 13:32:13 +0300 Subject: [PATCH 15/17] skip spark as it has a different microbatch implementation --- .../test_dbt_artifacts/test_microbatch_compiled_code.py | 5 +---- .../microbatch/capture_microbatch_compiled_code.sql | 1 + 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py index 22c7561e4..dea35420a 100644 --- a/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py +++ b/integration_tests/tests/test_dbt_artifacts/test_microbatch_compiled_code.py @@ -33,9 +33,6 @@ def _microbatch_model_sql(source_model_name: str) -> str: "batch_size": "year", "begin": "2024-01-01" } %} -{% if target.type == "spark" %} - {% do model_config.update({"partition_by": ["order_date"]}) %} -{% endif %} {% if target.type != "duckdb" %} {% do model_config.update({"unique_key": "order_id"}) %} {% endif %} @@ -113,7 +110,7 @@ def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str): macro_path.unlink() -@pytest.mark.skip_targets(["vertica", "bigquery", "athena", "clickhouse", "dremio"]) +@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"]) @pytest.mark.skip_for_dbt_fusion @pytest.mark.parametrize( "macro_name,expected_compiled_code,model_suffix", diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql index 665ce2a72..44f6523ca 100644 --- a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -8,6 +8,7 @@ 2) Enable dbt behavior flag `require_batched_execution_for_custom_microbatch_strategy`. This flow is currently not supported for adapters: + - spark - bigquery - athena - clickhouse From b4407f080242c73db5cb5da56d119779b2022c4d Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Tue, 5 May 2026 15:59:42 +0300 Subject: [PATCH 16/17] fixed compiled code formatting in redshift in microbatch models --- macros/utils/graph/get_compiled_code.sql | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/macros/utils/graph/get_compiled_code.sql b/macros/utils/graph/get_compiled_code.sql index 199924393..6ce92316e 100644 --- a/macros/utils/graph/get_compiled_code.sql +++ b/macros/utils/graph/get_compiled_code.sql @@ -1,10 +1,11 @@ {% macro get_compiled_code(node, as_column_value=false) %} - {% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %} + {% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %} {% if not compiled_code and node and node.get("unique_id") %} {% set compiled_code = elementary.get_cache( "microbatch_compiled_code_by_unique_id", {} ).get(node.get("unique_id")) %} {% endif %} + {% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %} {% set max_column_size = elementary.get_column_size() %} {% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %} @@ -14,12 +15,11 @@ {% do return(compiled_code) %} {% endmacro %} -{% macro default__get_compiled_code(node) %} - {% do return(node.get("compiled_code") or node.get("compiled_sql")) %} +{% macro default__format_compiled_code(compiled_code) %} + {% do return(compiled_code) %} {% endmacro %} -{% macro redshift__get_compiled_code(node) %} - {% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %} +{% macro redshift__format_compiled_code(compiled_code) %} {% if not compiled_code %} {% do return(none) %} {% else %} {% do return(compiled_code.replace("%", "%%")) %} {% endif %} From 51d387213dcf90f6a9f01e38c0ef6a40a7b870a0 Mon Sep 17 00:00:00 2001 From: GuyEshdat Date: Tue, 5 May 2026 17:17:53 +0300 Subject: [PATCH 17/17] avoiding race condition when updating the microbatch_compiled_code_by_unique_id map. Instead, we will initalize it when initalizing the graph, and only update the dict itself instead of updating the cache --- .../microbatch/capture_microbatch_compiled_code.sql | 6 +----- macros/edr/tests/on_run_start/init_elementary_graph.sql | 1 + 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql index 44f6523ca..856dd91cf 100644 --- a/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql +++ b/macros/edr/dbt_artifacts/microbatch/capture_microbatch_compiled_code.sql @@ -41,14 +41,10 @@ {% endif %} {% set compiled_code_by_unique_id = elementary.get_cache( - "microbatch_compiled_code_by_unique_id", {} + "microbatch_compiled_code_by_unique_id" ) %} {% if model_unique_id in compiled_code_by_unique_id %} {{ return(none) }} {% endif %} - {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %} - {% do elementary.set_cache( - "microbatch_compiled_code_by_unique_id", compiled_code_by_unique_id - ) %} {% endmacro %} diff --git a/macros/edr/tests/on_run_start/init_elementary_graph.sql b/macros/edr/tests/on_run_start/init_elementary_graph.sql index 576cf2d38..00fda83de 100644 --- a/macros/edr/tests/on_run_start/init_elementary_graph.sql +++ b/macros/edr/tests/on_run_start/init_elementary_graph.sql @@ -11,6 +11,7 @@ }, "temp_test_table_relations_map": {}, "duration_context_stack": {}, + "microbatch_compiled_code_by_unique_id": {}, }, ) %} {% endmacro %}