Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Reduce connection startup overhead from the `EXCHANGE TABLES` capability check. On ClickHouse Cloud (Shared engine), the check now short-circuits immediately after detecting the engine — skipping 5 DDL round-trips (2× `CREATE TABLE`, `EXCHANGE TABLES`, 2× `DROP TABLE`) that were previously run on every connection open. For all other engine types, the result is cached behind a process-level lock so the DDL test runs at most once per dbt invocation regardless of thread count. ([#653](https://github.com/ClickHouse/dbt-clickhouse/pull/653)).
* `dbt clone` improvements: tables backed by a MergeTree-family engine are cloned with ClickHouse's zero-copy `CREATE OR REPLACE TABLE ... CLONE AS ...`; other engines and Distributed tables fall back to dbt's view behavior ([#655](https://github.com/ClickHouse/dbt-clickhouse/pull/655)).
* Add relation-scoped catalog metadata support with `clickhouse__get_catalog_relations` ([#657](https://github.com/ClickHouse/dbt-clickhouse/pull/657)).
* Add a `reuse_connections` profile option (default `true`). When set to `false`, dbt closes the connection after each model so the next opens a fresh one — useful for multi-replica ClickHouse Cloud where connection-sticky load balancing would otherwise pin a `dbt run` to one replica. Per-model reconnects are kept cheap by caching the `EXISTS DATABASE` probe and dropping the now-redundant lightweight-delete capability check ([#669](https://github.com/ClickHouse/dbt-clickhouse/issues/669), [#670](https://github.com/ClickHouse/dbt-clickhouse/pull/670)).

#### Repository maintenance
* Replaced legacy `docker-compose` commands with `docker compose` (V2) and updated the GitHub Actions workflow to use Docker Compose V2 ([#647](https://github.com/ClickHouse/dbt-clickhouse/pull/647)).
Expand Down
7 changes: 6 additions & 1 deletion dbt/adapters/clickhouse/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ def cancel(self, connection):
logger.debug('Cancel query \'{}\'', connection_name)

def release(self):
pass # There is no "release" type functionality in the existing ClickHouse connectors
# Default: keep the connection open (historical behavior). With
# `reuse_connections: false`, fall through to the base release()
# which closes the handle so the next model picks a fresh replica.
if self.profile.credentials.reuse_connections:
return
super().release()

@classmethod
def get_table_from_response(cls, response, column_names) -> "agate.Table":
Expand Down
4 changes: 4 additions & 0 deletions dbt/adapters/clickhouse/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ class ClickHouseCredentials(Credentials):
local_db_prefix: str = ''
allow_automatic_deduplication: bool = False
tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False
# When False, close the connection after each model so the next opens a
# fresh TCP socket — lets a Cloud LB rebalance dbt across replicas.
reuse_connections: bool = True

@property
def type(self):
Expand Down Expand Up @@ -87,4 +90,5 @@ def _connection_keys(self):
'use_lw_deletes',
'allow_automatic_deduplication',
'tcp_keepalive',
'reuse_connections',
)
115 changes: 48 additions & 67 deletions dbt/adapters/clickhouse/dbclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,21 @@
from typing import Dict, Optional

from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.errors import (
lw_deletes_not_enabled_error,
lw_deletes_not_enabled_warning,
nd_mutations_not_enabled_error,
nd_mutations_not_enabled_warning,
)
from dbt.adapters.clickhouse.logger import logger
from dbt.adapters.clickhouse.query import quote_identifier
from dbt.adapters.clickhouse.util import compare_versions, engine_can_atomic_exchange
from dbt.adapters.exceptions import FailedToConnectError
from dbt_common.exceptions import DbtConfigError, DbtDatabaseError
from dbt_common.exceptions import DbtDatabaseError

_exchange_lock = threading.Lock()
_exchange_result: Optional[bool] = None

LW_DELETE_SETTING = 'allow_experimental_lightweight_delete'
# Databases whose existence has already been ensured in this process. Guarded by
# `_database_lock`. With `reuse_connections: false` a client is created per model,
# so without this cache the `EXISTS DATABASE` probe would run on every model.
_database_lock = threading.Lock()
_ensured_databases: set = set()

ND_MUTATION_SETTING = 'allow_nondeterministic_mutations'
DEDUP_WINDOW_SETTING = 'replicated_deduplication_window'
DEDUP_WINDOW_SETTING_SUPPORTED_MATERIALIZATION = [
Expand Down Expand Up @@ -89,6 +88,11 @@ def __init__(self, credentials: ClickHouseCredentials):
self._conn_settings.setdefault('mutations_sync', '3')
self._conn_settings.setdefault('alter_sync', '3')
self._conn_settings.setdefault('insert_distributed_sync', '1')
# Lightweight deletes that read from other tables require nondeterministic
# mutations. Apply it via the connection settings (which both drivers read
# at client creation) so we don't probe + SET it on every client.
if credentials.use_lw_deletes:
self._conn_settings.setdefault(ND_MUTATION_SETTING, '1')
self._client = self._create_client(credentials)
check_exchange = credentials.check_exchange and not credentials.cluster_mode
try:
Expand Down Expand Up @@ -136,7 +140,10 @@ def get_ch_setting(self, setting_name):
pass

def database_dropped(self, database: str):
pass
# Forget the cached existence so a later model recreating the schema runs
# the EXISTS/CREATE path again instead of trusting a stale cache entry.
with _database_lock:
_ensured_databases.discard(database)

@abstractmethod
def close(self):
Expand All @@ -163,69 +170,43 @@ def update_model_settings(self, model_settings: Dict[str, str], materialization_
model_settings[key] = value

def _check_lightweight_deletes(self, requested: bool):
lw_deletes, lw_read_only = self.get_ch_setting(LW_DELETE_SETTING)
nd_mutations, nd_mutations_read_only = self.get_ch_setting(ND_MUTATION_SETTING)
if lw_deletes is None or nd_mutations is None:
if requested:
logger.warning(lw_deletes_not_enabled_error)
return False, False
lw_deletes = int(lw_deletes) > 0
if not lw_deletes:
if lw_read_only:
lw_deletes = False
if requested:
raise DbtConfigError(lw_deletes_not_enabled_error)
logger.warning(lw_deletes_not_enabled_warning)
else:
try:
self.command(f'SET {LW_DELETE_SETTING} = 1')
self._conn_settings[LW_DELETE_SETTING] = '1'
lw_deletes = True
except DbtDatabaseError:
logger.warning(lw_deletes_not_enabled_warning)
nd_mutations = int(nd_mutations) > 0
if lw_deletes and not nd_mutations:
if nd_mutations_read_only:
nd_mutations = False
if requested:
raise DbtConfigError(nd_mutations_not_enabled_error)
logger.warning(nd_mutations_not_enabled_warning)
else:
try:
self.command(f'SET {ND_MUTATION_SETTING} = 1')
self._conn_settings[ND_MUTATION_SETTING] = '1'
nd_mutations = True
except DbtDatabaseError:
logger.warning(nd_mutations_not_enabled_warning)
if lw_deletes and nd_mutations:
return True, requested
return False, False
# Lightweight deletes have been generally available since ClickHouse 23.3,
# which is older than every version this adapter supports, so there's no
# capability to probe. The `allow_nondeterministic_mutations` setting they
# may require is applied through the connection settings in __init__.
return True, requested
Comment thread
cursor[bot] marked this conversation as resolved.

def _ensure_database(self, database_engine, cluster_name) -> None:
if not self.database:
return
check_db = f'EXISTS DATABASE {quote_identifier(self.database)}'
try:
db_exists = self.command(check_db)
if not db_exists:
engine_clause = f' ENGINE {database_engine} ' if database_engine else ''
cluster_clause = (
f' ON CLUSTER "{cluster_name}" '
if cluster_name is not None and cluster_name.strip() != ''
else ''
)
self.command(
f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}'
)
db_exists = self.command(check_db)
if not db_exists:
# The existence check/create only needs to happen once per process; cache
# the result so a per-model client (reuse_connections: false) doesn't probe
# on every model. The cache is invalidated by `database_dropped`.
with _database_lock:
if self.database not in _ensured_databases:
check_db = f'EXISTS DATABASE {quote_identifier(self.database)}'
try:
db_exists = self.command(check_db)
if not db_exists:
engine_clause = f' ENGINE {database_engine} ' if database_engine else ''
cluster_clause = (
f' ON CLUSTER "{cluster_name}" '
if cluster_name is not None and cluster_name.strip() != ''
else ''
)
self.command(
f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}'
)
db_exists = self.command(check_db)
if not db_exists:
raise FailedToConnectError(
f'Failed to create database {self.database} for unknown reason'
)
except DbtDatabaseError as ex:
raise FailedToConnectError(
f'Failed to create database {self.database} for unknown reason'
)
except DbtDatabaseError as ex:
raise FailedToConnectError(
f'Failed to create {self.database} database due to ClickHouse exception'
) from ex
f'Failed to create {self.database} database due to ClickHouse exception'
) from ex
_ensured_databases.add(self.database)
self._set_client_database()

def _check_atomic_exchange(self) -> bool:
Expand Down
21 changes: 0 additions & 21 deletions dbt/adapters/clickhouse/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,3 @@

Source columns not in target: {0}
"""

lw_deletes_not_enabled_error = """
Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but
`light weight deletes` are either not available or not enabled on this ClickHouse server.
"""

lw_deletes_not_enabled_warning = """
`light weight deletes` are either not available or not enabled on this ClickHouse server. This prevents the use
of the delete+insert incremental strategy, which may negatively affect performance for incremental models.
"""

nd_mutations_not_enabled_error = """
Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but
the required `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user
"""

nd_mutations_not_enabled_warning = """
The setting `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user` This prevents the use
of `light weight deletes` and therefore the delete+insert incremental strategy. This may negatively affect performance
for incremental models
"""
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_ch_setting(self, setting_name):
return (setting.value, setting.readonly) if setting else (None, 0)

def database_dropped(self, database: str):
super().database_dropped(database)
# This is necessary for the http client to avoid exceptions when ClickHouse doesn't recognize the database
# query parameter
if self.database == database:
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def validate_incremental_strategy(
raise DbtRuntimeError(
f"The incremental strategy '{strategy}' is not valid for ClickHouse."
)
if strategy in ('delete_insert', 'microbatch') and not conn.handle.has_lw_deletes:
if strategy in ('delete_insert', 'microbatch') and not conn.handle.use_lw_deletes:
raise DbtRuntimeError(
f"'{strategy}' strategy requires setting the profile config 'use_lw_deletes' to true."
)
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/test_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest.mock import MagicMock, patch

from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager
from dbt.adapters.sql import SQLConnectionManager


def _make_manager_with_client(mock_client):
Expand Down Expand Up @@ -59,3 +60,31 @@ def test_query_id_unique_per_call(self):
r2, _ = manager.execute('SELECT 1')

assert r1.query_id != r2.query_id


def test_reuse_connections_defaults_to_true():
from dbt.adapters.clickhouse.credentials import ClickHouseCredentials

creds = ClickHouseCredentials(host='localhost', schema='default')
assert creds.reuse_connections is True


def _make_manager_with_reuse(reuse_connections: bool) -> ClickHouseConnectionManager:
manager = ClickHouseConnectionManager.__new__(ClickHouseConnectionManager)
manager.profile = MagicMock()
manager.profile.credentials.reuse_connections = reuse_connections
return manager


class TestReleaseRespectsReuseConnections:
def test_release_is_a_noop_when_reuse_connections_is_true(self):
manager = _make_manager_with_reuse(reuse_connections=True)
with patch.object(SQLConnectionManager, 'release') as mock_super_release:
manager.release()
mock_super_release.assert_not_called()

def test_release_calls_super_when_reuse_connections_is_false(self):
manager = _make_manager_with_reuse(reuse_connections=False)
with patch.object(SQLConnectionManager, 'release') as mock_super_release:
manager.release()
mock_super_release.assert_called_once()
80 changes: 80 additions & 0 deletions tests/unit/test_dbclient.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from unittest.mock import MagicMock, patch

import dbt.adapters.clickhouse.dbclient as dbclient_module
import pytest
from dbt.adapters.clickhouse.credentials import ClickHouseCredentials
from dbt.adapters.clickhouse.dbclient import ND_MUTATION_SETTING
from dbt.adapters.clickhouse.httpclient import ChHttpClient


Expand All @@ -12,6 +14,12 @@ def mock_ch_client():
yield mock_get_client


@pytest.fixture(autouse=True)
def reset_ensured_databases():
dbclient_module._ensured_databases.clear()
yield


def _get_settings(mock_get_client):
return mock_get_client.call_args.kwargs['settings']

Expand Down Expand Up @@ -72,3 +80,75 @@ def test_default_engine_settings(mock_ch_client):
assert settings['mutations_sync'] == '3'
assert settings['alter_sync'] == '3'
assert 'select_sequential_consistency' not in settings


def _lw_credentials(use_lw_deletes, schema='default'):
return ClickHouseCredentials(
host='localhost',
port=8123,
user='default',
password='',
schema=schema,
use_lw_deletes=use_lw_deletes,
check_exchange=False,
)


def test_nd_mutation_setting_injected_when_lw_deletes_enabled(mock_ch_client):
"""allow_nondeterministic_mutations is set on the client when lw deletes are requested."""
ChHttpClient(_lw_credentials(use_lw_deletes=True))
settings = _get_settings(mock_ch_client)

assert settings[ND_MUTATION_SETTING] == '1'


def test_nd_mutation_setting_absent_when_lw_deletes_disabled(mock_ch_client):
"""No nondeterministic mutation setting is added by default."""
ChHttpClient(_lw_credentials(use_lw_deletes=False))
settings = _get_settings(mock_ch_client)

assert ND_MUTATION_SETTING not in settings


def test_check_lightweight_deletes_always_available(mock_ch_client):
"""Lightweight deletes are assumed available; use_lw_deletes mirrors the request."""
client = ChHttpClient(_lw_credentials(use_lw_deletes=True))
assert client.has_lw_deletes is True
assert client.use_lw_deletes is True


def test_check_lightweight_deletes_has_lw_true_even_when_not_requested(mock_ch_client):
"""has_lw_deletes is always True (GA on all supported versions) even when use_lw_deletes=False."""
client = ChHttpClient(_lw_credentials(use_lw_deletes=False))
assert client.has_lw_deletes is True
assert client.use_lw_deletes is False


def _exists_calls(mock_ch_client):
client = mock_ch_client.return_value
return [
call
for call in client.command.call_args_list
if str(call.args[0]).startswith('EXISTS DATABASE')
]


def test_ensure_database_probes_only_once_per_process(mock_ch_client):
"""The EXISTS DATABASE probe is cached process-wide across client creations."""
ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))

assert len(_exists_calls(mock_ch_client)) == 1
assert 'cache_test_db' in dbclient_module._ensured_databases


def test_database_dropped_invalidates_existence_cache(mock_ch_client):
"""Dropping a schema forces the next client to probe again."""
client = ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
assert len(_exists_calls(mock_ch_client)) == 1

client.database_dropped('cache_test_db')
assert 'cache_test_db' not in dbclient_module._ensured_databases

ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db'))
assert len(_exists_calls(mock_ch_client)) == 2
Loading
Loading