Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions integration_tests/dbt_project/macros/microbatch.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{#- Root-project shim for the microbatch incremental strategy.
    Delegates to elementary's implementation so that compiled code for
    microbatch models is captured into run results. Presumably required
    because dbt resolves this strategy macro from the root project first
    rather than from installed packages — see the note in the elementary
    package macro of the same name. -#}
{% macro get_incremental_microbatch_sql(arg_dict) %}
{{ return(elementary.get_incremental_microbatch_sql(arg_dict)) }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from contextlib import contextmanager
from pathlib import Path
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

import pytest

from dbt_project import DbtProject


def _microbatch_model_sql() -> str:
    """Return the raw SQL for a temporary incremental model using the
    microbatch strategy, with adapter-specific partitioning overrides
    for BigQuery and Athena."""
    model_sql = """
{% set model_config = {
"materialized": "incremental",
"incremental_strategy": "microbatch",
"event_time": "order_date",
"batch_size": "year",
"begin": "2025-03-01",
"unique_key": "order_id"
} %}
{% if target.type == "bigquery" %}
{% do model_config.update(
{"partition_by": {"field": "order_date", "data_type": "timestamp", "granularity": "year"}}
) %}
{% endif %}
{% if target.type == "athena" %}
{% do model_config.update({"partitioned_by": ["order_date"]}) %}
{% endif %}
{{ config(**model_config) }}

select
1 as order_id,
1 as customer_id,
42 as amount,
{{ dbt.current_timestamp() }} as order_date
from {{ ref('one') }}
"""
    return model_sql


def _run_microbatch_model_and_get_latest_success_result(
    dbt_project: DbtProject, test_id: str
):
    """Materialize the microbatch model under *test_id*, then fetch the most
    recent successful row recorded for it in the ``dbt_run_results`` table."""
    with dbt_project.create_temp_model_for_existing_table(
        test_id, raw_code=_microbatch_model_sql()
    ) as model_path:
        dbt_project.dbt_runner.run(select=str(model_path))

    model_unique_id = f"model.elementary_tests.{test_id}"
    return dbt_project.read_table(
        "dbt_run_results",
        where=f"unique_id = '{model_unique_id}' and status = 'success'",
        order_by="generated_at desc",
        limit=1,
    )


@contextmanager
def _without_microbatch_override_macro(dbt_project: DbtProject):
    """Temporarily remove the project's microbatch override macro file.

    On entry the macro file is renamed to a ``.sql.bak`` sibling; on exit it
    is renamed back, so tests can observe behavior without the override.

    Raises:
        FileNotFoundError: if the macro file is not where it is expected.
    """
    override_path = dbt_project.project_dir_path / "macros" / "microbatch.sql"
    if not override_path.exists():
        raise FileNotFoundError(f"Expected macro file at {override_path}")

    stashed_path = override_path.with_suffix(".sql.bak")
    override_path.rename(stashed_path)
    try:
        yield
    finally:
        # Only restore if the stash is still there (guards a partially
        # unwound entry or external cleanup).
        if stashed_path.exists():
            stashed_path.rename(override_path)


@pytest.mark.skip_targets(["vertica"])
@pytest.mark.skip_for_dbt_fusion
def test_microbatch_run_results_has_compiled_code(test_id: str, dbt_project: DbtProject):
    """With the override macro in place, a successful microbatch run should
    record its compiled code in dbt_run_results."""
    dbt_project.dbt_runner.vars["disable_run_results"] = False

    results = _run_microbatch_model_and_get_latest_success_result(dbt_project, test_id)
    assert results, "Expected a successful run result row for microbatch model"
    latest_row = results[0]
    assert latest_row["compiled_code"], (
        "Expected compiled_code to be populated for successful microbatch model run result"
    )


@pytest.mark.skip_targets(["vertica"])
@pytest.mark.skip_for_dbt_fusion
def test_microbatch_run_results_without_override_has_empty_compiled_code(
    test_id: str, dbt_project: DbtProject
):
    """Without the project-level override macro, compiled code should not be
    captured for microbatch runs."""
    dbt_project.dbt_runner.vars["disable_run_results"] = False

    with _without_microbatch_override_macro(dbt_project):
        results = _run_microbatch_model_and_get_latest_success_result(
            dbt_project, test_id
        )
        assert results, "Expected a successful run result row for microbatch model"
        latest_row = results[0]
        assert not latest_row["compiled_code"], (
            "Expected compiled_code to stay empty when microbatch override macro is absent"
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{#-
    NOTE FOR PACKAGE CONSUMERS:
    This package macro is not guaranteed to be picked up automatically by dbt's
    incremental strategy resolution in all projects.
    To apply this behavior, users should override `get_incremental_microbatch_sql`
    in their own project and delegate to `elementary.get_incremental_microbatch_sql(arg_dict)`
-#}
{% macro get_incremental_microbatch_sql(arg_dict) %}
{#- Capture the model's compiled code into elementary's cache before delegating,
    so run-results collection can report it later. Guarded on `execute` (skip
    during parsing) and on `model` being defined in the Jinja context. -#}
{% if execute and model is defined %}
{% do elementary.capture_microbatch_compiled_code_for_model() %}
{% endif %}

{#- Delegate to dbt's own adapter-dispatched microbatch implementation so the
    generated SQL is unchanged. -#}
{{ return(adapter.dispatch("get_incremental_microbatch_sql", "dbt")(arg_dict)) }}
{% endmacro %}


{#- Stash the current model's compiled code in elementary's graph cache, keyed
    by unique_id, so it can be attached to run results later. Only the first
    capture per unique_id is kept.
    NOTE(review): get_cache/set_cache are defined elsewhere in the package;
    if multiple models update this cached dict in parallel there may be a
    race on set_cache — confirm against how the cache is initialized
    (e.g. init_elementary_graph). -#}
{% macro capture_microbatch_compiled_code_for_model() %}
    {% set model_unique_id = model.get("unique_id") %}
    {% set model_compiled_code = model.get("compiled_code") %}

    {#- Nothing to record without a unique id and non-empty compiled code. -#}
    {% if model_unique_id is none or not model_compiled_code %}
        {{ return(none) }}
    {% endif %}

    {% set compiled_code_by_unique_id = elementary.get_cache("microbatch_compiled_code_by_unique_id", {}) %}

    {#- Keep only the first capture for this model; later batches are skipped. -#}
    {% if model_unique_id not in compiled_code_by_unique_id %}
        {% do compiled_code_by_unique_id.update({model_unique_id: model_compiled_code}) %}
        {% do elementary.set_cache("microbatch_compiled_code_by_unique_id", compiled_code_by_unique_id) %}
    {% endif %}
{% endmacro %}
5 changes: 5 additions & 0 deletions macros/utils/graph/get_compiled_code.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{% macro get_compiled_code(node, as_column_value=false) %}
{% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %}
{% if not compiled_code and node and node.get("unique_id") %}
{% set compiled_code = elementary.get_cache(
"microbatch_compiled_code_by_unique_id", {}
).get(node.get("unique_id")) %}
{% endif %}
Comment on lines +3 to +7
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Microbatch cache fallback bypasses Redshift %%% escaping

On Redshift, redshift__get_compiled_code (macros/utils/graph/get_compiled_code.sql:21-25) replaces % with %% to prevent SQL formatting errors when the compiled code is inserted as a string literal. The new microbatch cache fallback path (lines 3-7) retrieves compiled code directly from the cache without going through adapter.dispatch, so the Redshift-specific escaping is never applied. If a microbatch model's compiled SQL contains % characters, this will produce unescaped % in the INSERT into dbt_run_results, which can cause runtime SQL errors on Redshift (e.g., incomplete format or similar adapter-level failures).

Prompt for agents
In macros/utils/graph/get_compiled_code.sql, the new microbatch cache fallback (lines 3-7) returns raw compiled code, bypassing adapter-specific transformations such as the Redshift percent-escaping in redshift__get_compiled_code (lines 21-25). To fix this, the cached compiled code should be run through the same adapter-specific post-processing. One approach: after retrieving the code from the cache, apply the same adapter.dispatch or at minimum replicate the Redshift escaping logic. For example, you could introduce a new dispatchable macro like elementary.post_process_compiled_code(compiled_code) that is a no-op by default but does .replace("%", "%%") on Redshift, and call it on the cache-retrieved value. Alternatively, store the already-escaped code in the cache by calling the capture during redshift__get_compiled_code, but that would couple the cache to a specific adapter.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


{% set max_column_size = elementary.get_column_size() %}
{% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %}
Expand Down
Loading