3 changes: 3 additions & 0 deletions integration_tests/dbt_project/dbt_project.yml
@@ -29,3 +29,6 @@ models:
+schema: elementary
+enabled: "{{ var('elementary_enabled', True) }}"
+file_format: "{{ 'delta' if target.type in ['spark', 'fabricspark'] else none }}"

flags:
require_batched_execution_for_custom_microbatch_strategy: True
@@ -0,0 +1,143 @@
from contextlib import contextmanager

import pytest

from dbt_project import DbtProject


def _microbatch_source_model_sql() -> str:
return """
{{ config(event_time='order_date') }}
{% set event_time_data_type = 'datetime2' if target.type == 'sqlserver' else 'timestamp' %}

select
1 as order_id,
1 as customer_id,
42 as amount,
cast('2024-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
union all
select
2 as order_id,
2 as customer_id,
84 as amount,
cast('2025-01-01 00:00:00' as {{ event_time_data_type }}) as order_date
"""


def _microbatch_model_sql(source_model_name: str) -> str:
return """
{% set model_config = {
"materialized": "incremental",
"incremental_strategy": "microbatch",
"event_time": "order_date",
"batch_size": "year",
"begin": "2024-01-01"
} %}
{% if target.type != "duckdb" %}
{% do model_config.update({"unique_key": "order_id"}) %}
{% endif %}
{{ config(**model_config) }}

select
order_id,
customer_id,
amount,
order_date
from {{ ref('__MICROBATCH_SOURCE_MODEL__') }}
""".replace("__MICROBATCH_SOURCE_MODEL__", source_model_name)


@contextmanager
def _with_microbatch_test_models(dbt_project: DbtProject, model_suffix: str):
source_model_name = f"mb_src_{model_suffix}"
target_model_name = f"mb_tgt_{model_suffix}"
source_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{source_model_name}.sql")
target_model_path = dbt_project.tmp_models_dir_path.joinpath(f"{target_model_name}.sql")

source_model_path.write_text(_microbatch_source_model_sql())
target_model_path.write_text(_microbatch_model_sql(source_model_name))
relative_source_model_path = source_model_path.relative_to(dbt_project.project_dir_path)
relative_target_model_path = target_model_path.relative_to(dbt_project.project_dir_path)
try:
yield relative_source_model_path, relative_target_model_path, target_model_name
finally:
if source_model_path.exists():
source_model_path.unlink()
if target_model_path.exists():
target_model_path.unlink()


def _run_microbatch_model_and_get_latest_success_result(
dbt_project: DbtProject, model_suffix: str
):
with _with_microbatch_test_models(dbt_project, model_suffix) as (
source_model_path,
model_path,
target_model_name,
):
dbt_project.dbt_runner.run(
select=f"{source_model_path} {model_path}"
)

unique_id = f"model.elementary_tests.{target_model_name}"
run_results = dbt_project.read_table(
"dbt_run_results",
where=f"unique_id = '{unique_id}' and status = 'success'",
order_by="generated_at desc",
limit=1,
)
return run_results


@contextmanager
def _with_microbatch_macro_file(dbt_project: DbtProject, macro_name: str):
macro_path = (
dbt_project.project_dir_path / "macros" / "microbatch.sql"
)
macro_sql = """
{% macro __MACRO_NAME__(arg_dict) %}
{{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}
""".replace("__MACRO_NAME__", macro_name)
if macro_path.exists():
raise FileExistsError(f"Expected no macro file at {macro_path}")

macro_path.write_text(macro_sql)
try:
yield
finally:
if macro_path.exists():
macro_path.unlink()


@pytest.mark.skip_targets(["spark", "vertica", "bigquery", "athena", "clickhouse", "dremio"])
@pytest.mark.skip_for_dbt_fusion
@pytest.mark.parametrize(
"macro_name,expected_compiled_code,model_suffix",
[
("get_incremental_microbatch_sql", True, "with_override"),
("get_incremental_microbatch_sql_not_used", False, "without_override"),
],
ids=["with_override", "without_override"],
)
def test_microbatch_run_results_compiled_code_behavior(
dbt_project: DbtProject,
macro_name: str,
expected_compiled_code: bool,
model_suffix: str,
):
dbt_project.dbt_runner.vars["disable_run_results"] = False

with _with_microbatch_macro_file(dbt_project, macro_name):
run_results = _run_microbatch_model_and_get_latest_success_result(
dbt_project, model_suffix
)
assert run_results, "Expected a successful run result row for microbatch model"
if expected_compiled_code:
assert run_results[0]["compiled_code"], (
"Expected compiled_code to be populated when override macro is present"
)
else:
assert not run_results[0]["compiled_code"], (
"Expected compiled_code to stay empty when override macro is absent"
)
@@ -0,0 +1,54 @@
{#-
NOTE FOR PACKAGE CONSUMERS:
This package macro is not guaranteed to be picked up automatically by dbt's
incremental strategy resolution in all projects.
To apply this behavior, users should:
1) Override `get_incremental_microbatch_sql` in their own project and delegate to
`elementary.get_incremental_microbatch_sql(arg_dict)`.
2) Enable the dbt behavior flag `require_batched_execution_for_custom_microbatch_strategy` (a minimal sketch of this setup follows this file).

This flow is currently not supported for the following adapters:
- spark
- bigquery
- athena
- clickhouse
- dremio
- vertica

This flow is currently not supported for dbt Fusion.
-#}
{% macro get_incremental_microbatch_sql(arg_dict) %}
{% if execute and model is defined %}
{% do elementary.capture_microbatch_compiled_code_for_model() %}
{% endif %}

{{ return(adapter.dispatch("get_incremental_microbatch_sql", "dbt")(arg_dict)) }}
{% endmacro %}


{% macro capture_microbatch_compiled_code_for_model() %}
{% set model_unique_id = (
model.get("unique_id") if model is mapping else model.unique_id
) | default(none, true) %}
{% set model_compiled_code = (
model.get("compiled_code") if model is mapping else model.compiled_code
) | default(none, true) %}
{% if model_unique_id is none %}
{{ return(none) }}
{% endif %}
{% if not model_compiled_code %}
{{ return(none) }}
{% endif %}

{% set compiled_code_by_unique_id = elementary.get_cache(
"microbatch_compiled_code_by_unique_id", {}
) %}
{% if model_unique_id in compiled_code_by_unique_id %}
{{ return(none) }}
{% endif %}

{% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
{% do elementary.set_cache(
Collaborator:
This set_cache could have a race if multiple dbt models run in parallel, and all of them update the initial dict.

I think instead you should create the microbatch_compiled_code_by_unique_id in the init_elementary_graph macro, and then I think you don't even need to do set_cache.

Collaborator Author:
fixed
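A minimal sketch of the reviewer's suggestion, assuming the package's init_elementary_graph macro runs once before models execute in parallel (the placement is hypothetical; only the individual calls shown here appear in this PR):

{# In init_elementary_graph (runs once, before parallel model execution), seed the cache entry: #}
{% do elementary.set_cache("microbatch_compiled_code_by_unique_id", {}) %}

{# In capture_microbatch_compiled_code_for_model, mutate the shared dict in place; no later set_cache is needed: #}
{% set compiled_code_by_unique_id = elementary.get_cache("microbatch_compiled_code_by_unique_id", {}) %}
{% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}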

"microbatch_compiled_code_by_unique_id", compiled_code_by_unique_id
) %}
{% endmacro %}
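For package consumers, the override described in the NOTE at the top of this file can be a thin delegating macro in the consumer project's own macros/ directory. A minimal sketch, mirroring the macro the integration test above writes (treat the file location as an assumption, not a requirement):

{# e.g. macros/get_incremental_microbatch_sql.sql in the consumer project (hypothetical path) #}
{% macro get_incremental_microbatch_sql(arg_dict) %}
    {{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}

Combined with require_batched_execution_for_custom_microbatch_strategy: True under flags: in dbt_project.yml (as in the integration test project above), this should cause dbt's incremental strategy resolution to pick up the project-level macro, which then delegates to the package.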
15 changes: 10 additions & 5 deletions macros/utils/graph/get_compiled_code.sql
@@ -1,5 +1,11 @@
{% macro get_compiled_code(node, as_column_value=false) %}
{% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %}
{% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
{% if not compiled_code and node and node.get("unique_id") %}
{% set compiled_code = elementary.get_cache(
"microbatch_compiled_code_by_unique_id", {}
).get(node.get("unique_id")) %}
{% endif %}
Comment on lines +3 to +7
Contributor:

🔴 Microbatch cache fallback bypasses Redshift %-escaping

On Redshift, redshift__get_compiled_code (macros/utils/graph/get_compiled_code.sql:21-25) replaces % with %% to prevent SQL formatting errors when the compiled code is inserted as a string literal. The new microbatch cache fallback path (lines 3-7) retrieves compiled code directly from the cache without going through adapter.dispatch, so the Redshift-specific escaping is never applied. If a microbatch model's compiled SQL contains % characters, this will produce unescaped % in the INSERT into dbt_run_results, which can cause runtime SQL errors on Redshift (e.g., incomplete format or similar adapter-level failures).

Prompt for agents
In macros/utils/graph/get_compiled_code.sql, the new microbatch cache fallback (lines 3-7) returns raw compiled code, bypassing adapter-specific transformations such as the Redshift percent-escaping in redshift__get_compiled_code (lines 21-25). To fix this, the cached compiled code should be run through the same adapter-specific post-processing. One approach: after retrieving the code from the cache, apply the same adapter.dispatch or at minimum replicate the Redshift escaping logic. For example, you could introduce a new dispatchable macro like elementary.post_process_compiled_code(compiled_code) that is a no-op by default but does .replace("%", "%%") on Redshift, and call it on the cache-retrieved value. Alternatively, store the already-escaped code in the cache by calling the capture during redshift__get_compiled_code, but that would couple the cache to a specific adapter.

Collaborator Author:
fixed

{% set compiled_code = adapter.dispatch("format_compiled_code", "elementary")(compiled_code) %}

{% set max_column_size = elementary.get_column_size() %}
{% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
@@ -9,12 +15,11 @@
{% do return(compiled_code) %}
{% endmacro %}

- {% macro default__get_compiled_code(node) %}
- {% do return(node.get("compiled_code") or node.get("compiled_sql")) %}
+ {% macro default__format_compiled_code(compiled_code) %}
+ {% do return(compiled_code) %}
{% endmacro %}

- {% macro redshift__get_compiled_code(node) %}
- {% set compiled_code = node.get("compiled_code") or node.get("compiled_sql") %}
+ {% macro redshift__format_compiled_code(compiled_code) %}
{% if not compiled_code %} {% do return(none) %}
{% else %} {% do return(compiled_code.replace("%", "%%")) %}
{% endif %}