diff --git a/CHANGELOG.md b/CHANGELOG.md index e70860e3..6d7ea54a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Populate `query_id` in `AdapterResponse` for every executed query. The query ID is generated as a UUID4 and forwarded to ClickHouse, making it available via `adapter_response` in dbt artifacts and enabling tools like Elementary to correlate dbt model runs with entries in `system.query_log`. * Replaced legacy `docker-compose` commands with `docker compose` (V2) * Updated GitHub Actions workflow to use Docker Compose V2 +* Add `index` parameter to the `projections` model config as syntax sugar for lightweight index projections (introduced in ClickHouse 25.6). When `index` is specified instead of `query`, dbt-clickhouse generates the appropriate `ADD PROJECTION` statement automatically: `SELECT _part_offset ORDER BY ` on ClickHouse 25.6–26.0, and the cleaner `INDEX TYPE basic` syntax on ClickHouse 26.1+. Specifying both `query` and `index`, or neither, raises a compile-time error. ### Release [1.10.0], 2026-02-16 diff --git a/dbt/include/clickhouse/macros/materializations/table.sql b/dbt/include/clickhouse/macros/materializations/table.sql index 6a80af49..b3663802 100644 --- a/dbt/include/clickhouse/macros/materializations/table.sql +++ b/dbt/include/clickhouse/macros/materializations/table.sql @@ -70,7 +70,7 @@ {{ adapter.rename_relation(existing_relation, backup_relation) }} {{ adapter.rename_relation(intermediate_relation, target_relation) }} {% endif %} - + {# If mv_on_schema_change is set, we apply the strategy #} {% else %} {%- set mv_on_schema_change = incremental_validate_on_schema_change(configured_mv_on_schema_change, default='ignore') -%} @@ -86,7 +86,7 @@ {% endif %} {%- endif %} {% endif %} - + {# Behaviour under full_refresh operations #} {% else %} {# Atomic full refresh with MV repopulation: when table is target of dbt MVs and repopulate_from_mvs_on_full_refresh is enabled #} @@ -295,7 +295,30 @@ ALTER TABLE {{ relation }} {%- if projections %} {%- for projection in projections %} - ADD PROJECTION {{ projection.get('name') }} ({{ projection.get('query') }}) + {%- set proj_name = projection.get('name') -%} + {%- set proj_query = projection.get('query') -%} + {%- set proj_index = projection.get('index') -%} + {%- if proj_query and proj_index -%} + {{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' cannot specify both 'query' and 'index'.") }} + {%- elif not proj_query and not proj_index -%} + {{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' must specify either 'query' or 'index'.") }} + {%- elif proj_query -%} + ADD PROJECTION {{ proj_name }} ({{ proj_query }}) + {%- else -%} + {%- if adapter.is_before_version('25.6.1.1') -%} + {{ exceptions.raise_compiler_error("Projection '" ~ proj_name ~ "' with 'index' requires '_part_offset' virtual column available from ClickHouse 25.6 onwards.") }} + {%- endif -%} + {%- if proj_index is string -%} + {%- set proj_index = [proj_index] -%} + {%- endif -%} + {%- set cols_str = proj_index | join(', ') -%} + {%- set cols_index_expr = '(' ~ cols_str ~ ')' if proj_index | length > 1 else cols_str -%} + {%- if not adapter.is_before_version('26.1.1.1') -%} + ADD PROJECTION {{ proj_name }} INDEX {{ cols_index_expr }} TYPE basic + {%- else -%} + ADD PROJECTION {{ proj_name }} (SELECT _part_offset ORDER BY {{ cols_str }}) + {%- endif -%} + {%- endif -%} {%- if not loop.last or indexes | length > 0 -%} , {% endif %} diff --git a/tests/integration/adapter/projections/test_projections.py b/tests/integration/adapter/projections/test_projections.py index c315d2fc..cf4d1b10 100644 --- a/tests/integration/adapter/projections/test_projections.py +++ b/tests/integration/adapter/projections/test_projections.py @@ -1,10 +1,15 @@ +import contextlib import os import uuid import pytest from dbt.tests.util import relation_from_name, run_dbt -from tests.integration.adapter.helpers import DEFAULT_RETRY_CONFIG, retry_until_assertion_passes +from tests.integration.adapter.helpers import ( + DEFAULT_RETRY_CONFIG, + below_version, + retry_until_assertion_passes, +) PEOPLE_SEED_CSV = """ id,name,age,department @@ -67,9 +72,61 @@ - name: people """ +PEOPLE_MODEL_WITH_SINGLE_INDEX_PROJECTION = """ +{{ config( + materialized='table', + order_by='id', + projections=[ + { + 'name': 'proj_by_age', + 'index': 'age' + } + ] +) }} +select id, name, age, department from {{ source('raw', 'people') }} +""" + +PEOPLE_MODEL_WITH_MULTI_COLUMN_INDEX_PROJECTION = """ +{{ config( + materialized='table', + order_by='id', + projections=[ + { + 'name': 'proj_by_dept_age', + 'index': ['department', 'age'] + } + ] +) }} +select id, name, age, department from {{ source('raw', 'people') }} +""" + +PEOPLE_MODEL_WITH_QUERY_AND_INDEX = """ +{{ config( + materialized='table', + order_by='id', + projections=[ + { + 'name': 'bad_proj', + 'query': 'SELECT department ORDER BY department', + 'index': 'age' + } + ] +) }} +select id, name, age, department from {{ source('raw', 'people') }} +""" + +PEOPLE_MODEL_WITH_NO_QUERY_OR_INDEX = """ +{{ config( + materialized='table', + order_by='id', + projections=[{'name': 'bad_proj'}] +) }} +select id, name, age, department from {{ source('raw', 'people') }} +""" + RETRY_CONFIG = ( - {'max_retries': 30, 'delay': 1} - if os.environ.get('DBT_CH_TEST_CLOUD', '').lower() in ('1', 'true', 'yes') + {"max_retries": 30, "delay": 1} + if os.environ.get("DBT_CH_TEST_CLOUD", "").lower() in ("1", "true", "yes") else DEFAULT_RETRY_CONFIG ) @@ -95,13 +152,13 @@ def models(self): def _get_table_reference(self, table: str) -> str: return ( table - if os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '' + if os.environ.get("DBT_CH_TEST_CLUSTER", "").strip() == "" else f"clusterAllReplicas({os.environ.get('DBT_CH_TEST_CLUSTER')}, {table})" ) def _flush_system_logs(self, project) -> None: - cluster = os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() - cluster_clause = f'ON CLUSTER "{cluster}"' if cluster else '' + cluster = os.environ.get("DBT_CH_TEST_CLUSTER", "").strip() + cluster_clause = f'ON CLUSTER "{cluster}"' if cluster else "" project.run_sql(f"SYSTEM FLUSH LOGS {cluster_clause}", fetch="all") def test_create_and_verify_projection(self, project): @@ -117,7 +174,11 @@ def test_create_and_verify_projection(self, project): # Check that the projection works as expected result = project.run_sql(query, fetch="all") assert len(result) == 3 # We expect 3 departments in the result - assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)] + assert result == [ + ("engineering", 43.666666666666664), + ("malware", 40.0), + ("sales", 25.0), + ] # check that the latest query used the projection def check_that_the_latest_query_used_the_projection(): @@ -131,7 +192,7 @@ def check_that_the_latest_query_used_the_projection(): assert len(result) > 0 assert query in result[0][0] - assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_avg_age'] + assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_avg_age"] retry_until_assertion_passes( check_that_the_latest_query_used_the_projection, **RETRY_CONFIG @@ -152,7 +213,11 @@ def test_create_and_verify_multiple_projections(self, project): # Check that the projection works as expected result = project.run_sql(query, fetch="all") assert len(result) == 3 # We expect 3 departments in the result - assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)] + assert result == [ + ("engineering", 43.666666666666664), + ("malware", 40.0), + ("sales", 25.0), + ] # check that the latest query used the projection def check_that_the_latest_query_used_the_projection(): @@ -166,7 +231,7 @@ def check_that_the_latest_query_used_the_projection(): assert len(result) > 0 assert query in result[0][0] - assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_avg_age'] + assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_avg_age"] retry_until_assertion_passes( check_that_the_latest_query_used_the_projection, **RETRY_CONFIG @@ -181,7 +246,7 @@ def check_that_the_latest_query_used_the_projection(): # Check that the projection works as expected result = project.run_sql(query, fetch="all") assert len(result) == 3 # We expect 3 departments in the result - assert result == [('engineering', 131), ('malware', 40), ('sales', 25)] + assert result == [("engineering", 131), ("malware", 40), ("sales", 25)] def check_that_the_latest_query_used_the_projection(): self._flush_system_logs(project) @@ -194,7 +259,7 @@ def check_that_the_latest_query_used_the_projection(): assert len(result) > 0 assert query in result[0][0] - assert result[0][1] == [f'{project.test_schema}.{relation.name}.projection_sum_age'] + assert result[0][1] == [f"{project.test_schema}.{relation.name}.projection_sum_age"] retry_until_assertion_passes( check_that_the_latest_query_used_the_projection, **RETRY_CONFIG @@ -202,7 +267,8 @@ def check_that_the_latest_query_used_the_projection(): @pytest.mark.xfail @pytest.mark.skipif( - os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster' + os.environ.get("DBT_CH_TEST_CLUSTER", "").strip() == "", + reason="Not on a cluster", ) def test_create_and_verify_distributed_projection(self, project): run_dbt(["seed"]) @@ -210,13 +276,17 @@ def test_create_and_verify_distributed_projection(self, project): relation = relation_from_name(project.adapter, "distributed_people_with_projection") unique_query_identifier = str(uuid.uuid4()) query = f"""-- {unique_query_identifier} - SELECT department, avg(age) AS avg_age FROM {project.test_schema}.{relation.name} GROUP BY + SELECT department, avg(age) AS avg_age FROM {project.test_schema}.{relation.name} GROUP BY department ORDER BY department""" # Check that the projection works as expected result = project.run_sql(query, fetch="all") assert len(result) == 3 # We expect 3 departments in the result - assert result == [('engineering', 43.666666666666664), ('malware', 40.0), ('sales', 25.0)] + assert result == [ + ("engineering", 43.666666666666664), + ("malware", 40.0), + ("sales", 25.0), + ] def check_that_the_latest_query_used_the_projection(): self._flush_system_logs(project) @@ -230,9 +300,76 @@ def check_that_the_latest_query_used_the_projection(): assert query in result[0][0] assert result[0][1] == [ - f'{project.test_schema}.{relation.name}_local.projection_avg_age' + f"{project.test_schema}.{relation.name}_local.projection_avg_age" ] retry_until_assertion_passes( check_that_the_latest_query_used_the_projection, **RETRY_CONFIG ) + + +class TestIndexProjections: + @pytest.fixture(scope="class") + def seeds(self): + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "people_with_index_projection.sql": PEOPLE_MODEL_WITH_SINGLE_INDEX_PROJECTION, + "people_with_multi_index_projection.sql": PEOPLE_MODEL_WITH_MULTI_COLUMN_INDEX_PROJECTION, + } + + @pytest.mark.parametrize( + "model_name, proj_name", + [ + ("people_with_index_projection", "proj_by_age"), + ("people_with_multi_index_projection", "proj_by_dept_age"), + ], + ) + def test_index_projection(self, project, model_name, proj_name): + run_dbt(["seed"]) + unsupported_version = below_version(25, 6) + ctx = pytest.raises(AssertionError) if unsupported_version else contextlib.nullcontext() + with ctx: + run_dbt(["run", "--select", model_name]) + if not unsupported_version: + result = project.run_sql( + f"SELECT name FROM system.projections " + f"WHERE database = '{project.test_schema}' " + f"AND table = '{model_name}' AND name = '{proj_name}'", + fetch="all", + ) + assert len(result) == 1 + + +class TestIndexProjectionValidation: + @pytest.fixture(scope="class") + def seeds(self): + return { + "people.csv": PEOPLE_SEED_CSV, + "schema.yml": SEED_SCHEMA_YML, + } + + @pytest.fixture(scope="class") + def models(self): + return { + "both_query_and_index.sql": PEOPLE_MODEL_WITH_QUERY_AND_INDEX, + "no_query_or_index.sql": PEOPLE_MODEL_WITH_NO_QUERY_OR_INDEX, + } + + def test_raises_when_both_query_and_index(self, project): + run_dbt(["seed"]) + res = run_dbt(["run", "--select", "both_query_and_index"], expect_pass=False) + assert any( + "cannot specify both 'query' and 'index'" in (r.message or "") for r in res.results + ) + + def test_raises_when_neither_query_nor_index(self, project): + res = run_dbt(["run", "--select", "no_query_or_index"], expect_pass=False) + assert any( + "must specify either 'query' or 'index'" in (r.message or "") for r in res.results + )