-
Notifications
You must be signed in to change notification settings - Fork 137
feat: add full ClickHouse support - remove all skip_targets markers (CORE-397) #934
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
dd75799
0a2dd80
41701e9
1d2fe6c
d43adc6
e36e33e
ca0db37
749d582
4cd2e2a
956c061
ad08596
92d0e89
48ea275
81219e3
8893238
1915c34
5b38158
4d6e7c2
601db95
207d9bd
335fbf3
6f886e7
8823952
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,17 +2,36 @@ | |
| {% set relation = ref(table_name) %} | ||
| {% set columns = adapter.get_columns_in_relation(relation) %} | ||
|
|
||
| {% for col in columns %} | ||
| {% set data_type = elementary.get_column_data_type(col) %} | ||
| {% set normalized_data_type = elementary.normalize_data_type(data_type) %} | ||
|
|
||
| {% if normalized_data_type == "string" %} | ||
| {% set update_query %} | ||
| update {{ relation }} | ||
| set {{ col["name"] }} = NULL | ||
| where {{ col["name"] }} = '' | ||
| {% endset %} | ||
| {% do elementary.run_query(update_query) %} | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% if target.type == "clickhouse" %} | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We normally solve this by using a macro with a "default__" and "clickhouse__" implementation, could we do the same here? |
||
| {# On ClickHouse, columns are non-Nullable by default so NULLs in CSV seeds become | ||
| empty strings. We first ALTER each string column to Nullable(String), then use | ||
| ALTER TABLE UPDATE to convert empty strings to NULLs. | ||
| We use statement blocks for DDL since dbt.run_query may not handle DDL on ClickHouse. #} | ||
| {% for col in columns %} | ||
| {% set data_type = elementary.get_column_data_type(col) %} | ||
| {% set normalized_data_type = elementary.normalize_data_type(data_type) %} | ||
| {% if normalized_data_type == "string" %} | ||
| {% call statement('alter_nullable_' ~ col['name'], fetch_result=False) %} | ||
| alter table {{ relation }} modify column {{ col['name'] }} Nullable(String) | ||
| {% endcall %} | ||
| {% call statement('update_nulls_' ~ col['name'], fetch_result=False) %} | ||
| alter table {{ relation }} update {{ col['name'] }} = NULL where {{ col['name'] }} = '' | ||
| {% endcall %} | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
| {% endif %} | ||
| {% endfor %} | ||
| {% else %} | ||
| {% for col in columns %} | ||
| {% set data_type = elementary.get_column_data_type(col) %} | ||
| {% set normalized_data_type = elementary.normalize_data_type(data_type) %} | ||
|
|
||
| {% if normalized_data_type == "string" %} | ||
| {% set update_query %} | ||
| update {{ relation }} | ||
| set {{ col["name"] }} = NULL | ||
| where {{ col["name"] }} = '' | ||
| {% endset %} | ||
| {% do elementary.run_query(update_query) %} | ||
| {% endif %} | ||
| {% endfor %} | ||
| {% endif %} | ||
| {% endmacro %} | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| <clickhouse> | ||
| <profiles> | ||
| <default> | ||
|
haritamar marked this conversation as resolved.
|
||
| <join_use_nulls>1</join_use_nulls> | ||
|
haritamar marked this conversation as resolved.
|
||
| <mutations_sync>1</mutations_sync> | ||
| </default> | ||
| </profiles> | ||
| </clickhouse> | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -260,16 +260,110 @@ def seed(self, data: List[dict], table_name: str): | |||||||||||||||||||||||||||||||||||||
| with DbtDataSeeder( | ||||||||||||||||||||||||||||||||||||||
| self.dbt_runner, self.project_dir_path, self.seeds_dir_path | ||||||||||||||||||||||||||||||||||||||
| ).seed(data, table_name): | ||||||||||||||||||||||||||||||||||||||
| self._fix_seed_if_needed(table_name) | ||||||||||||||||||||||||||||||||||||||
| self._fix_seed_if_needed(table_name, data) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| def _fix_seed_if_needed(self, table_name: str): | ||||||||||||||||||||||||||||||||||||||
| def _fix_seed_if_needed(self, table_name: str, data: Optional[List[dict]] = None): | ||||||||||||||||||||||||||||||||||||||
| # Hack for BigQuery - seems like we get empty strings instead of nulls in seeds, so we | ||||||||||||||||||||||||||||||||||||||
| # fix them here | ||||||||||||||||||||||||||||||||||||||
| # fix them here. | ||||||||||||||||||||||||||||||||||||||
| if self.runner_method == RunnerMethod.FUSION and self.target == "bigquery": | ||||||||||||||||||||||||||||||||||||||
| self.dbt_runner.run_operation( | ||||||||||||||||||||||||||||||||||||||
| "elementary_tests.replace_empty_strings_with_nulls", | ||||||||||||||||||||||||||||||||||||||
| macro_args={"table_name": table_name}, | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| # On ClickHouse, columns are non-Nullable by default, so NULL values in CSVs become | ||||||||||||||||||||||||||||||||||||||
| # default values (0 for Int, '' for String, etc.). We fix this by altering columns to | ||||||||||||||||||||||||||||||||||||||
| # Nullable and updating default values back to NULLs directly via the ClickHouse HTTP | ||||||||||||||||||||||||||||||||||||||
| # API, since dbt's run_query/statement don't reliably execute DDL on ClickHouse. | ||||||||||||||||||||||||||||||||||||||
| elif self.target == "clickhouse" and data: | ||||||||||||||||||||||||||||||||||||||
| self._fix_clickhouse_seed_nulls(table_name, data) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| def _fix_clickhouse_seed_nulls(self, table_name: str, data: List[dict]): | ||||||||||||||||||||||||||||||||||||||
| """Fix ClickHouse seed tables where NULL values became default values. | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| ClickHouse columns are non-Nullable by default, so NULL values in CSV seeds | ||||||||||||||||||||||||||||||||||||||
| become default values (0 for Int, '' for String, etc.). This method: | ||||||||||||||||||||||||||||||||||||||
| 1. Determines which columns had NULL values in the original data | ||||||||||||||||||||||||||||||||||||||
| 2. ALTERs those columns to Nullable types | ||||||||||||||||||||||||||||||||||||||
| 3. Rebuilds the table via INSERT SELECT with nullIf() to restore NULLs | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| Uses the ClickHouse HTTP API directly because dbt's run_query/statement | ||||||||||||||||||||||||||||||||||||||
| don't reliably execute DDL on ClickHouse. | ||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||
|
haritamar marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||
| import urllib.request | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| # Find columns that contain at least one NULL in the original data | ||||||||||||||||||||||||||||||||||||||
| nullable_columns: set = set() | ||||||||||||||||||||||||||||||||||||||
| for row in data: | ||||||||||||||||||||||||||||||||||||||
| for col_name, value in row.items(): | ||||||||||||||||||||||||||||||||||||||
| if value is None: | ||||||||||||||||||||||||||||||||||||||
| nullable_columns.add(col_name) | ||||||||||||||||||||||||||||||||||||||
| if not nullable_columns: | ||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| schema = f"default{SCHEMA_NAME_SUFFIX}" if SCHEMA_NAME_SUFFIX else "default" | ||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hardcoded The 🔧 Suggested approachExpose the effective schema via + logger.warning(
+ "ClickHouse fix: no columns found for %s.%s – "
+ "schema may be wrong (using '%s'). NULLs will not be repaired.",
+ schema, table_name, schema,
+ )
if not cols_result:
return🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||
| ch_url = "http://localhost:8123" | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| def ch_query(query: str) -> str: | ||||||||||||||||||||||||||||||||||||||
|
haritamar marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||
| encoded = query.encode("utf-8") | ||||||||||||||||||||||||||||||||||||||
| req = urllib.request.Request( | ||||||||||||||||||||||||||||||||||||||
| f"{ch_url}/?user=default&password=default&mutations_sync=1", | ||||||||||||||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||
| data=encoded, | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| with urllib.request.urlopen(req) as resp: | ||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Without a 🔧 Proposed fix- with urllib.request.urlopen(req) as resp:
+ with urllib.request.urlopen(req, timeout=60) as resp:📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.15.2)[error] 312-312: Audit URL open for permitted schemes. Allowing use of (S310) 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||
| return resp.read().decode("utf-8") | ||||||||||||||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| # Get all columns and their types | ||||||||||||||||||||||||||||||||||||||
| cols_result = ch_query( | ||||||||||||||||||||||||||||||||||||||
| f"SELECT name, type FROM system.columns " | ||||||||||||||||||||||||||||||||||||||
| f"WHERE database = '{schema}' AND table = '{table_name}'" | ||||||||||||||||||||||||||||||||||||||
| ).strip() | ||||||||||||||||||||||||||||||||||||||
| if not cols_result: | ||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| columns = [] | ||||||||||||||||||||||||||||||||||||||
| for line in cols_result.split("\n"): | ||||||||||||||||||||||||||||||||||||||
| parts = line.strip().split("\t") | ||||||||||||||||||||||||||||||||||||||
| if len(parts) == 2: | ||||||||||||||||||||||||||||||||||||||
| columns.append((parts[0], parts[1])) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| # Build SELECT expressions: use nullIf() for nullable columns | ||||||||||||||||||||||||||||||||||||||
| select_exprs = [] | ||||||||||||||||||||||||||||||||||||||
| for col_name, col_type in columns: | ||||||||||||||||||||||||||||||||||||||
| if col_name in nullable_columns: | ||||||||||||||||||||||||||||||||||||||
| # Get the default value for this type to use with nullIf | ||||||||||||||||||||||||||||||||||||||
| if col_type == "String": | ||||||||||||||||||||||||||||||||||||||
| default_val = "''" | ||||||||||||||||||||||||||||||||||||||
| elif col_type.startswith("Int") or col_type.startswith("UInt"): | ||||||||||||||||||||||||||||||||||||||
| default_val = "0" | ||||||||||||||||||||||||||||||||||||||
| elif col_type.startswith("Float"): | ||||||||||||||||||||||||||||||||||||||
| default_val = "0" | ||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||
| default_val = "defaultValueOfTypeName('" + col_type + "')" | ||||||||||||||||||||||||||||||||||||||
| select_exprs.append( | ||||||||||||||||||||||||||||||||||||||
| f"nullIf(`{col_name}`, {default_val})::Nullable({col_type}) as `{col_name}`" | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||
| select_exprs.append(f"`{col_name}`") | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| # Rebuild the table: CREATE temp AS SELECT with nullIf, EXCHANGE, DROP | ||||||||||||||||||||||||||||||||||||||
| tmp_name = f"{table_name}_tmp_nullable" | ||||||||||||||||||||||||||||||||||||||
| select_sql = ", ".join(select_exprs) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| logger.info( | ||||||||||||||||||||||||||||||||||||||
| "ClickHouse fix: rebuilding %s.%s with Nullable columns: %s", | ||||||||||||||||||||||||||||||||||||||
| schema, | ||||||||||||||||||||||||||||||||||||||
| table_name, | ||||||||||||||||||||||||||||||||||||||
| nullable_columns, | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}") | ||||||||||||||||||||||||||||||||||||||
| ch_query( | ||||||||||||||||||||||||||||||||||||||
| f"CREATE TABLE {schema}.{tmp_name} " | ||||||||||||||||||||||||||||||||||||||
| f"ENGINE = MergeTree() ORDER BY tuple() " | ||||||||||||||||||||||||||||||||||||||
| f"AS SELECT {select_sql} FROM {schema}.{table_name}" | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| ch_query(f"EXCHANGE TABLES {schema}.{table_name} AND {schema}.{tmp_name}") | ||||||||||||||||||||||||||||||||||||||
| ch_query(f"DROP TABLE {schema}.{tmp_name}") | ||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No cleanup on failure in the table-rebuild sequence. If 🔧 Proposed fix- ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}")
- ch_query(
- f"CREATE TABLE {schema}.{tmp_name} "
- f"ENGINE = MergeTree() ORDER BY tuple() "
- f"AS SELECT {select_sql} FROM {schema}.{table_name}"
- )
- ch_query(f"EXCHANGE TABLES {schema}.{table_name} AND {schema}.{tmp_name}")
- ch_query(f"DROP TABLE {schema}.{tmp_name}")
+ ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}")
+ try:
+ ch_query(
+ f"CREATE TABLE {schema}.{tmp_name} "
+ f"ENGINE = MergeTree() ORDER BY tuple() "
+ f"AS SELECT {select_sql} FROM {schema}.{table_name}"
+ )
+ ch_query(f"EXCHANGE TABLES {schema}.{table_name} AND {schema}.{tmp_name}")
+ finally:
+ ch_query(f"DROP TABLE IF EXISTS {schema}.{tmp_name}")📝 Committable suggestion
Suggested change
🧰 Tools🪛 Ruff (0.15.2)[error] 361-363: Possible SQL injection vector through string-based query construction (S608) 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| @contextmanager | ||||||||||||||||||||||||||||||||||||||
| def seed_context( | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore full warehouse matrix before merge (current single-target matrix is merge-blocking).
Line 90 limits CI to ClickHouse only, which drops regression coverage for all other warehouses/fusion variants. Keep this for WIP iteration if needed, but restore the full matrix before this PR is merged.
🤖 Prompt for AI Agents