Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Reduce connection startup overhead from the `EXCHANGE TABLES` capability check. On ClickHouse Cloud (Shared engine), the check now short-circuits immediately after detecting the engine — skipping 5 DDL round-trips (2× `CREATE TABLE`, `EXCHANGE TABLES`, 2× `DROP TABLE`) that were previously run on every connection open. For all other engine types, the result is cached behind a process-level lock so the DDL test runs at most once per dbt invocation regardless of thread count. ([#653](https://github.com/ClickHouse/dbt-clickhouse/pull/653)).
* `dbt clone` improvements: tables backed by a MergeTree-family engine are cloned with ClickHouse's zero-copy `CREATE OR REPLACE TABLE ... CLONE AS ...`; other engines and Distributed tables fall back to dbt's view behavior ([#655](https://github.com/ClickHouse/dbt-clickhouse/pull/655)).
* Add relation-scoped catalog metadata support with `clickhouse__get_catalog_relations` ([#657](https://github.com/ClickHouse/dbt-clickhouse/pull/657)).
* Add a `reuse_connections` profile option (default `true`). When set to `false`, dbt closes the connection after each model so the next opens a fresh one — useful for multi-replica ClickHouse Cloud where connection-sticky load balancing would otherwise pin a `dbt run` to one replica. Per-model reconnects are kept cheap by caching the `EXISTS DATABASE` probe and dropping the now-redundant lightweight-delete capability check ([#669](https://github.com/ClickHouse/dbt-clickhouse/issues/669), [#670](https://github.com/ClickHouse/dbt-clickhouse/pull/670)).

#### Repository maintenance
* Replaced legacy `docker-compose` commands with `docker compose` (V2) and updated the GitHub Actions workflow to use Docker Compose V2 ([#647](https://github.com/ClickHouse/dbt-clickhouse/pull/647)).
Expand Down
7 changes: 6 additions & 1 deletion dbt/adapters/clickhouse/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ def cancel(self, connection):
logger.debug('Cancel query \'{}\'', connection_name)

def release(self):
pass # There is no "release" type functionality in the existing ClickHouse connectors
# Default: keep the connection open (historical behavior). With
# `reuse_connections: false`, fall through to the base release()
# which closes the handle so the next model picks a fresh replica.
if self.profile.credentials.reuse_connections:
return
super().release()

@classmethod
def get_table_from_response(cls, response, column_names) -> "agate.Table":
Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class ClickHouseCredentials(Credentials):
local_db_prefix: str = ''
allow_automatic_deduplication: bool = False
tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False
# When False, close the connection after each model so the next opens a
# fresh TCP socket — lets a Cloud LB rebalance dbt across replicas.
reuse_connections: bool = True

@property
def type(self):
Expand Down Expand Up @@ -87,4 +90,5 @@ def _connection_keys(self):
'use_lw_deletes',
'allow_automatic_deduplication',
'tcp_keepalive',
'reuse_connections',
)
115 changes: 48 additions & 67 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@
from typing import Dict, Optional

from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.errors import (
lw_deletes_not_enabled_error,
lw_deletes_not_enabled_warning,
nd_mutations_not_enabled_error,
nd_mutations_not_enabled_warning,
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.util import compare_versions, engine_can_atomic_exchange
from dbt.adapters.exceptions import FailedToConnectError
from dbt_common.exceptions import DbtConfigError, DbtDatabaseError
from dbt_common.exceptions import DbtDatabaseError

_exchange_lock = threading.Lock()
_exchange_result: Optional[bool] = None

LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
# Databases whose existence has already been ensured in this process. Guarded by
# `_database_lock`. With `reuse_connections: false` a client is created per model,
# so without this cache the `EXISTS DATABASE` probe would run on every model.
_database_lock = threading.Lock()
_ensured_databases: set = set()

ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'
DEDUP_WINDOW_SETTING_SUPPORTED_MATERIALIZATION = [
Expand Down Expand Up @@ -89,6 +88,11 @@ def __init__(self, credentials: ClickHouseCredentials):
self._conn_settings.setdefault('mutations_sync', '3')
self._conn_settings.setdefault('alter_sync', '3')
self._conn_settings.setdefault('insert_distributed_sync', '1')
# Lightweight deletes that read from other tables require nondeterministic
# mutations. Apply it via the connection settings (which both drivers read
# at client creation) so we don't probe + SET it on every client.
if credentials.use_lw_deletes:
self._conn_settings.setdefault(ND_MUTATION_SETTING, '1')
self._client = self._create_client(credentials)
check_exchange = credentials.check_exchange and not credentials.cluster_mode
try:
Expand Down Expand Up @@ -136,7 +140,10 @@ def get_ch_setting(self, setting_name):
pass

def database_dropped(self, database: str):
pass
# Forget the cached existence so a later model recreating the schema runs
# the EXISTS/CREATE path again instead of trusting a stale cache entry.
with _database_lock:
_ensured_databases.discard(database)

@abstractmethod
def close(self):
Expand All @@ -163,69 +170,43 @@ def update_model_settings(self, model_settings: Dict[str, str], materialization_
model_settings[key] = value

def _check_lightweight_deletes(self, requested: bool):
lw_deletes, lw_read_only = self.get_ch_setting(LW_DELETE_SETTING)
nd_mutations, nd_mutations_read_only = self.get_ch_setting(ND_MUTATION_SETTING)
if lw_deletes is None or nd_mutations is None:
if requested:
logger.warning(lw_deletes_not_enabled_error)
return False, False
lw_deletes = int(lw_deletes) > 0
if not lw_deletes:
if lw_read_only:
lw_deletes = False
if requested:
raise DbtConfigError(lw_deletes_not_enabled_error)
logger.warning(lw_deletes_not_enabled_warning)
else:
try:
self.command(f'SET {LW_DELETE_SETTING} = 1')
self._conn_settings[LW_DELETE_SETTING] = '1'
lw_deletes = True
except DbtDatabaseError:
logger.warning(lw_deletes_not_enabled_warning)
nd_mutations = int(nd_mutations) > 0
if lw_deletes and not nd_mutations:
if nd_mutations_read_only:
nd_mutations = False
if requested:
raise DbtConfigError(nd_mutations_not_enabled_error)
logger.warning(nd_mutations_not_enabled_warning)
else:
try:
self.command(f'SET {ND_MUTATION_SETTING} = 1')
self._conn_settings[ND_MUTATION_SETTING] = '1'
nd_mutations = True
except DbtDatabaseError:
logger.warning(nd_mutations_not_enabled_warning)
if lw_deletes and nd_mutations:
return True, requested
return False, False
# Lightweight deletes have been generally available since ClickHouse 23.3,
# which is older than every version this adapter supports, so there's no
# capability to probe. The `allow_nondeterministic_mutations` setting they
# may require is applied through the connection settings in __init__.
return True, requested
Comment thread
cursor[bot] marked this conversation as resolved.

def _ensure_database(self, database_engine, cluster_name) -> None:
if not self.database:
return
check_db = f'EXISTS DATABASE {quote_identifier(self.database)}'
try:
db_exists = self.command(check_db)
if not db_exists:
engine_clause = f' ENGINE {database_engine} ' if database_engine else ''
cluster_clause = (
f' ON CLUSTER "{cluster_name}" '
if cluster_name is not None and cluster_name.strip() != ''
else ''
)
self.command(
f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}'
)
db_exists = self.command(check_db)
if not db_exists:
# The existence check/create only needs to happen once per process; cache
# the result so a per-model client (reuse_connections: false) doesn't probe
# on every model. The cache is invalidated by `database_dropped`.
with _database_lock:
if self.database not in _ensured_databases:
check_db = f'EXISTS DATABASE {quote_identifier(self.database)}'
try:
db_exists = self.command(check_db)
if not db_exists:
engine_clause = f' ENGINE {database_engine} ' if database_engine else ''
cluster_clause = (
f' ON CLUSTER "{cluster_name}" '
if cluster_name is not None and cluster_name.strip() != ''
else ''
)
self.command(
f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}'
)
db_exists = self.command(check_db)
if not db_exists:
raise FailedToConnectError(
f'Failed to create database {self.database} for unknown reason'
)
except DbtDatabaseError as ex:
raise FailedToConnectError(
f'Failed to create database {self.database} for unknown reason'
)
except DbtDatabaseError as ex:
raise FailedToConnectError(
f'Failed to create {self.database} database due to ClickHouse exception'
) from ex
f'Failed to create {self.database} database due to ClickHouse exception'
) from ex
_ensured_databases.add(self.database)
self._set_client_database()

def _check_atomic_exchange(self) -> bool:
Expand Down
21 changes: 0 additions & 21 deletions dbt/adapters/clickhouse/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,3 @@

Source columns not in target: {0}
"""

lw_deletes_not_enabled_error = """
Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but
`light weight deletes` are either not available or not enabled on this ClickHouse server.
"""

lw_deletes_not_enabled_warning = """
`light weight deletes` are either not available or not enabled on this ClickHouse server. This prevents the use
of the delete+insert incremental strategy, which may negatively affect performance for incremental models.
"""

nd_mutations_not_enabled_error = """
Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but
the required `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user
"""

nd_mutations_not_enabled_warning = """
The setting `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user` This prevents the use
of `light weight deletes` and therefore the delete+insert incremental strategy. This may negatively affect performance
for incremental models
"""
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_ch_setting(self, setting_name):
return (setting.value, setting.readonly) if setting else (None, 0)

def database_dropped(self, database: str):
super().database_dropped(database)
# This is necessary for the http client to avoid exceptions when ClickHouse doesn't recognize the database
# query parameter
if self.database == database:
Expand Down
22 changes: 22 additions & 0 deletions tests/unit/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest.mock import MagicMock, patch

from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.sql import SQLConnectionManager


def _make_manager_with_client(mock_client):
Expand Down Expand Up @@ -59,3 +60,24 @@ def test_query_id_unique_per_call(self):
r2, _ = manager.execute('SELECT 1')

assert r1.query_id != r2.query_id


def _make_manager_with_reuse(reuse_connections: bool) -> ClickHouseConnectionManager:
manager = ClickHouseConnectionManager.__new__(ClickHouseConnectionManager)
manager.profile = MagicMock()
manager.profile.credentials.reuse_connections = reuse_connections
return manager


class TestReleaseRespectsReuseConnections:
def test_release_is_a_noop_when_reuse_connections_is_true(self):
manager = _make_manager_with_reuse(reuse_connections=True)
with patch.object(SQLConnectionManager, 'release') as mock_super_release:
manager.release()
mock_super_release.assert_not_called()

def test_release_calls_super_when_reuse_connections_is_false(self):
manager = _make_manager_with_reuse(reuse_connections=False)
with patch.object(SQLConnectionManager, 'release') as mock_super_release:
manager.release()
mock_super_release.assert_called_once()
71 changes: 71 additions & 0 deletions tests/unit/test_dbclient.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from unittest.mock import MagicMock, patch

import dbt.adapters.clickhouse.dbclient as dbclient_module
import pytest
from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.dbclient import ND_MUTATION_SETTING
from dbt.adapters.clickhouse.httpclient import ChHttpClient


Expand Down Expand Up @@ -72,3 +74,72 @@ def test_default_engine_settings(mock_ch_client):
assert settings['mutations_sync'] == '3'
assert settings['alter_sync'] == '3'
assert 'select_sequential_consistency' not in settings


def _lw_credentials(use_lw_deletes, schema='default'):
return ClickHouseCredentials(
host='localhost',
port=8123,
user='default',
password='',
schema=schema,
use_lw_deletes=use_lw_deletes,
check_exchange=False,
)


def test_nd_mutation_setting_injected_when_lw_deletes_enabled(mock_ch_client):
"""allow_nondeterministic_mutations is set on the client when lw deletes are requested."""
ChHttpClient(_lw_credentials(use_lw_deletes=True))
settings = _get_settings(mock_ch_client)

assert settings[ND_MUTATION_SETTING] == '1'


def test_nd_mutation_setting_absent_when_lw_deletes_disabled(mock_ch_client):
"""No nondeterministic mutation setting is added by default."""
ChHttpClient(_lw_credentials(use_lw_deletes=False))
settings = _get_settings(mock_ch_client)

assert ND_MUTATION_SETTING not in settings


def test_check_lightweight_deletes_always_available(mock_ch_client):
"""Lightweight deletes are assumed available; use_lw_deletes mirrors the request."""
client = ChHttpClient(_lw_credentials(use_lw_deletes=True))
assert client.has_lw_deletes is True
assert client.use_lw_deletes is True


def _exists_calls(mock_ch_client):
client = mock_ch_client.return_value
return [
call
for call in client.command.call_args_list
if str(call.args[0]).startswith('EXISTS DATABASE')
]


def test_ensure_database_probes_only_once_per_process(mock_ch_client):
"""The EXISTS DATABASE probe is cached process-wide across client creations."""
dbclient_module._ensured_databases.clear()

ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))

assert len(_exists_calls(mock_ch_client)) == 1
assert 'cache_test_db' in dbclient_module._ensured_databases


def test_database_dropped_invalidates_existence_cache(mock_ch_client):
"""Dropping a schema forces the next client to probe again."""
dbclient_module._ensured_databases.clear()

client = ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
assert len(_exists_calls(mock_ch_client)) == 1

client.database_dropped('cache_test_db')
assert 'cache_test_db' not in dbclient_module._ensured_databases

ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
assert len(_exists_calls(mock_ch_client)) == 2