Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Populate `query_id` in `AdapterResponse` for every executed query. The query ID is generated as a UUID4 and forwarded to ClickHouse, making it available via `adapter_response` in dbt artifacts and enabling tools like Elementary to correlate dbt model runs with entries in `system.query_log`.
* Replaced legacy `docker-compose` commands with `docker compose` (V2)
* Updated GitHub Actions workflow to use Docker Compose V2
* Add `index` parameter to the `projections` model config as syntax sugar for lightweight index projections (introduced in ClickHouse 25.6). When `index` is specified instead of `query`, dbt-clickhouse generates the appropriate `ADD PROJECTION` statement automatically: `SELECT _part_offset ORDER BY <cols>` on ClickHouse 25.6–26.0, and the cleaner `INDEX <cols> TYPE basic` syntax on ClickHouse 26.1+. `index` accepts either a single column name or a list of column names. Specifying both `query` and `index`, or neither, raises a compile-time error, as does using `index` on a ClickHouse version older than 25.6.

### Release [1.10.0], 2026-02-16

Expand Down
29 changes: 26 additions & 3 deletions dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
{{ adapter.rename_relation(existing_relation, backup_relation) }}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{% endif %}

{# If mv_on_schema_change is set, we apply the strategy #}
{% else %}
{%- set mv_on_schema_change = incremental_validate_on_schema_change(configured_mv_on_schema_change, default='ignore') -%}
Expand All @@ -86,7 +86,7 @@
{% endif %}
{%- endif %}
{% endif %}

{# Behaviour under full_refresh operations #}
{% else %}
{# Atomic full refresh with MV repopulation: when table is target of dbt MVs and repopulate_from_mvs_on_full_refresh is enabled #}
Expand Down Expand Up @@ -295,7 +295,30 @@
ALTER TABLE {{ relation }}
{%- if projections %}
{%- for projection in projections %}
ADD PROJECTION {{ projection.get('name') }} ({{ projection.get('query') }})
{%- set proj_name = projection.get('name') -%}
{%- set proj_query = projection.get('query') -%}
{%- set proj_index = projection.get('index') -%}
{%- if proj_query and proj_index -%}
{{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' cannot specify both 'query' and 'index'.") }}
{%- elif not proj_query and not proj_index -%}
{{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' must specify either 'query' or 'index'.") }}
{%- elif proj_query -%}
ADD PROJECTION {{ proj_name }} ({{ proj_query }})
{%- else -%}
{%- if adapter.is_before_version('25.6.1.1') -%}
{{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' with 'index' requires '_part_offset' virtual column available from ClickHouse 25.6 onwards.") }}
{%- endif -%}
{%- if proj_index is string -%}
{%- set proj_index = [proj_index] -%}
{%- endif -%}
{%- set cols_str = proj_index | join(', ') -%}
{%- set cols_index_expr = '(' ~ cols_str ~ ')' if proj_index | length > 1 else cols_str -%}
{%- if not adapter.is_before_version('26.1.1.1') -%}
ADD PROJECTION {{ proj_name }} INDEX {{ cols_index_expr }} TYPE basic
{%- else -%}
ADD PROJECTION {{ proj_name }} (SELECT _part_offset ORDER BY {{ cols_str }})
{%- endif -%}
{%- endif -%}
{%- if not loop.last or indexes | length > 0 -%}
,
{% endif %}
Expand Down
169 changes: 153 additions & 16 deletions tests/integration/adapter/projections/test_projections.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import contextlib
import os
import uuid

import pytest
from dbt.tests.util import relation_from_name, run_dbt

from tests.integration.adapter.helpers import DEFAULT_RETRY_CONFIG, retry_until_assertion_passes
from tests.integration.adapter.helpers import (
DEFAULT_RETRY_CONFIG,
below_version,
retry_until_assertion_passes,
)

PEOPLE_SEED_CSV = """
id,name,age,department
Expand Down Expand Up @@ -67,9 +72,61 @@
- name: people
"""

# Table model with one projection whose 'index' is given as a single column
# name (a plain string rather than a list).
PEOPLE_MODEL_WITH_SINGLE_INDEX_PROJECTION = """
{{ config(
materialized='table',
order_by='id',
projections=[
{
'name': 'proj_by_age',
'index': 'age'
}
]
) }}
select id, name, age, department from {{ source('raw', 'people') }}
"""

# Table model with one projection whose 'index' is given as a list of two
# column names.
PEOPLE_MODEL_WITH_MULTI_COLUMN_INDEX_PROJECTION = """
{{ config(
materialized='table',
order_by='id',
projections=[
{
'name': 'proj_by_dept_age',
'index': ['department', 'age']
}
]
) }}
select id, name, age, department from {{ source('raw', 'people') }}
"""

# Invalid config: the projection sets both 'query' and 'index'. Used to check
# the "cannot specify both" validation error.
PEOPLE_MODEL_WITH_QUERY_AND_INDEX = """
{{ config(
materialized='table',
order_by='id',
projections=[
{
'name': 'bad_proj',
'query': 'SELECT department ORDER BY department',
'index': 'age'
}
]
) }}
select id, name, age, department from {{ source('raw', 'people') }}
"""

# Invalid config: the projection has only a 'name', with neither 'query' nor
# 'index'. Used to check the "must specify either" validation error.
PEOPLE_MODEL_WITH_NO_QUERY_OR_INDEX = """
{{ config(
materialized='table',
order_by='id',
projections=[{'name': 'bad_proj'}]
) }}
select id, name, age, department from {{ source('raw', 'people') }}
"""

RETRY_CONFIG = (
{'max_retries': 30, 'delay': 1}
if os.environ.get('DBT_CH_TEST_CLOUD', '').lower() in ('1', 'true', 'yes')
{"max_retries": 30, "delay": 1}
if os.environ.get("DBT_CH_TEST_CLOUD", "").lower() in ("1", "true", "yes")
else DEFAULT_RETRY_CONFIG
)

Expand All @@ -95,13 +152,13 @@ def models(self):
def _get_table_reference(self, table: str) -> str:
return (
table
if os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == ''
if os.environ.get("DBT_CH_TEST_CLUSTER", "").strip() == ""
else f"clusterAllReplicas({os.environ.get('DBT_CH_TEST_CLUSTER')}, {table})"
)

def _flush_system_logs(self, project) -> None:
cluster = os.environ.get('DBT_CH_TEST_CLUSTER', '').strip()
cluster_clause = f'ON CLUSTER "{cluster}"' if cluster else ''
cluster = os.environ.get("DBT_CH_TEST_CLUSTER", "").strip()
cluster_clause = f'ON CLUSTER "{cluster}"' if cluster else ""
project.run_sql(f"SYSTEM FLUSH LOGS {cluster_clause}", fetch="all")

def test_create_and_verify_projection(self, project):
Expand All @@ -117,7 +174,11 @@ def test_create_and_verify_projection(self, project):
# Check that the projection works as expected
result = project.run_sql(query, fetch="all")
assert len(result) == 3 # We expect 3 departments in the result
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
assert result == [
("engineering", 43.666666666666664),
("malware", 40.0),
("sales", 25.0),
]

# check that the latest query used the projection
def check_that_the_latest_query_used_the_projection():
Expand All @@ -131,7 +192,7 @@ def check_that_the_latest_query_used_the_projection():
assert len(result) > 0
assert query in result[0][0]

assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_avg_age']
assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_avg_age"]

retry_until_assertion_passes(
check_that_the_latest_query_used_the_projection, **RETRY_CONFIG
Expand All @@ -152,7 +213,11 @@ def test_create_and_verify_multiple_projections(self, project):
# Check that the projection works as expected
result = project.run_sql(query, fetch="all")
assert len(result) == 3 # We expect 3 departments in the result
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
assert result == [
("engineering", 43.666666666666664),
("malware", 40.0),
("sales", 25.0),
]

# check that the latest query used the projection
def check_that_the_latest_query_used_the_projection():
Expand All @@ -166,7 +231,7 @@ def check_that_the_latest_query_used_the_projection():
assert len(result) > 0
assert query in result[0][0]

assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_avg_age']
assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_avg_age"]

retry_until_assertion_passes(
check_that_the_latest_query_used_the_projection, **RETRY_CONFIG
Expand All @@ -181,7 +246,7 @@ def check_that_the_latest_query_used_the_projection():
# Check that the projection works as expected
result = project.run_sql(query, fetch="all")
assert len(result) == 3 # We expect 3 departments in the result
assert result == [('engineering', 131), ('malware', 40), ('sales', 25)]
assert result == [("engineering", 131), ("malware", 40), ("sales", 25)]

def check_that_the_latest_query_used_the_projection():
self._flush_system_logs(project)
Expand All @@ -194,29 +259,34 @@ def check_that_the_latest_query_used_the_projection():
assert len(result) > 0
assert query in result[0][0]

assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_sum_age']
assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_sum_age"]

retry_until_assertion_passes(
check_that_the_latest_query_used_the_projection, **RETRY_CONFIG
)

@pytest.mark.xfail
@pytest.mark.skipif(
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
os.environ.get("DBT_CH_TEST_CLUSTER", "").strip() == "",
reason="Not on a cluster",
)
def test_create_and_verify_distributed_projection(self, project):
run_dbt(["seed"])
run_dbt()
relation = relation_from_name(project.adapter, "distributed_people_with_projection")
unique_query_identifier = str(uuid.uuid4())
query = f"""-- {unique_query_identifier}
SELECT department, avg(age) AS avg_age FROM {project.test_schema}.{relation.name} GROUP BY
SELECT department, avg(age) AS avg_age FROM {project.test_schema}.{relation.name} GROUP BY
department ORDER BY department"""

# Check that the projection works as expected
result = project.run_sql(query, fetch="all")
assert len(result) == 3 # We expect 3 departments in the result
assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)]
assert result == [
("engineering", 43.666666666666664),
("malware", 40.0),
("sales", 25.0),
]

def check_that_the_latest_query_used_the_projection():
self._flush_system_logs(project)
Expand All @@ -230,9 +300,76 @@ def check_that_the_latest_query_used_the_projection():
assert query in result[0][0]

assert result[0][1] == [
f'{project.test_schema}.{relation.name}_local.projection_avg_age'
f"{project.test_schema}.{relation.name}_local.projection_avg_age"
]

retry_until_assertion_passes(
check_that_the_latest_query_used_the_projection, **RETRY_CONFIG
)


class TestIndexProjections:
    """Integration tests for projections declared through the ``index`` shorthand."""

    @pytest.fixture(scope="class")
    def seeds(self):
        """Provide the people seed CSV and its schema file."""
        seed_files = {}
        seed_files["people.csv"] = PEOPLE_SEED_CSV
        seed_files["schema.yml"] = SEED_SCHEMA_YML
        return seed_files

    @pytest.fixture(scope="class")
    def models(self):
        """Provide one single-column and one multi-column index projection model."""
        model_files = {}
        model_files["people_with_index_projection.sql"] = PEOPLE_MODEL_WITH_SINGLE_INDEX_PROJECTION
        model_files["people_with_multi_index_projection.sql"] = (
            PEOPLE_MODEL_WITH_MULTI_COLUMN_INDEX_PROJECTION
        )
        return model_files

    @pytest.mark.parametrize(
        "model_name, proj_name",
        [
            ("people_with_index_projection", "proj_by_age"),
            ("people_with_multi_index_projection", "proj_by_dept_age"),
        ],
    )
    def test_index_projection(self, project, model_name, proj_name):
        """Run the model and check the projection is listed in ``system.projections``.

        When ``below_version(25, 6)`` is truthy, the run is instead expected to
        raise ``AssertionError`` and the lookup is skipped.
        """
        run_dbt(["seed"])
        run_args = ["run", "--select", model_name]

        # Unsupported server: the run must fail and there is nothing to look up.
        if below_version(25, 6):
            with pytest.raises(AssertionError):
                run_dbt(run_args)
            return

        run_dbt(run_args)
        lookup_sql = (
            f"SELECT name FROM system.projections "
            f"WHERE database = '{project.test_schema}' "
            f"AND table = '{model_name}' AND name = '{proj_name}'"
        )
        rows = project.run_sql(lookup_sql, fetch="all")
        assert len(rows) == 1


class TestIndexProjectionValidation:
    """Checks that invalid ``projections`` entries fail with the expected error message.

    Every test seeds the source table itself, so each one can run alone or in
    any order.
    """

    @pytest.fixture(scope="class")
    def seeds(self):
        """Provide the people seed CSV and its schema file."""
        return {
            "people.csv": PEOPLE_SEED_CSV,
            "schema.yml": SEED_SCHEMA_YML,
        }

    @pytest.fixture(scope="class")
    def models(self):
        """Provide the two deliberately invalid projection models."""
        return {
            "both_query_and_index.sql": PEOPLE_MODEL_WITH_QUERY_AND_INDEX,
            "no_query_or_index.sql": PEOPLE_MODEL_WITH_NO_QUERY_OR_INDEX,
        }

    def test_raises_when_both_query_and_index(self, project):
        """A projection with both 'query' and 'index' must fail the run."""
        run_dbt(["seed"])
        res = run_dbt(["run", "--select", "both_query_and_index"], expect_pass=False)
        assert any(
            "cannot specify both 'query' and 'index'" in (r.message or "") for r in res.results
        )

    def test_raises_when_neither_query_nor_index(self, project):
        """A projection with neither 'query' nor 'index' must fail the run."""
        # Seed here as well: the model selects from the 'people' source, so
        # without the seed table the run could fail on the missing table
        # rather than on the projection validation this test asserts.
        run_dbt(["seed"])
        res = run_dbt(["run", "--select", "no_query_or_index"], expect_pass=False)
        assert any(
            "must specify either 'query' or 'index'" in (r.message or "") for r in res.results
        )

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing seed call creates implicit test ordering dependency

Medium Severity

test_raises_when_neither_query_nor_index omits the run_dbt(["seed"]) call that every other test in this file includes. The model references {{ source('raw', 'people') }}, and the materialization template executes CREATE TABLE ... EMPTY AS (SELECT ... FROM people) before calling add_index_and_projections where the expected validation error is raised. Without the seed table in the database, the model fails at table creation with a missing-table error instead of the projection validation error, causing the assertion checking for "must specify either 'query' or 'index'" to fail for the wrong reason.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 06962eb. Configure here.