diff --git a/CHANGELOG.md b/CHANGELOG.md index 85f09bed..e8fd9ae7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Reduce connection startup overhead from the `EXCHANGE TABLES` capability check. On ClickHouse Cloud (Shared engine), the check now short-circuits immediately after detecting the engine — skipping 5 DDL round-trips (2× `CREATE TABLE`, `EXCHANGE TABLES`, 2× `DROP TABLE`) that were previously run on every connection open. For all other engine types, the result is cached behind a process-level lock so the DDL test runs at most once per dbt invocation regardless of thread count. ([#653](https://github.com/ClickHouse/dbt-clickhouse/pull/653)). * `dbt clone` improvements: tables backed by a MergeTree-family engine are cloned with ClickHouse's zero-copy `CREATE OR REPLACE TABLE ... CLONE AS ...`; other engines and Distributed tables fall back to dbt's view behavior ([#655](https://github.com/ClickHouse/dbt-clickhouse/pull/655)). * Add relation-scoped catalog metadata support with `clickhouse__get_catalog_relations` ([#657](https://github.com/ClickHouse/dbt-clickhouse/pull/657)). +* Add a `reuse_connections` profile option (default `true`). When set to `false`, dbt closes the connection after each model so the next opens a fresh one — useful for multi-replica ClickHouse Cloud where connection-sticky load balancing would otherwise pin a `dbt run` to one replica. Per-model reconnects are kept cheap by caching the `EXISTS DATABASE` probe and dropping the now-redundant lightweight-delete capability check ([#669](https://github.com/ClickHouse/dbt-clickhouse/issues/669), [#670](https://github.com/ClickHouse/dbt-clickhouse/pull/670)). #### Repository maintenance * Replaced legacy `docker-compose` commands with `docker compose` (V2) and updated the GitHub Actions workflow to use Docker Compose V2 ([#647](https://github.com/ClickHouse/dbt-clickhouse/pull/647)). diff --git a/dbt/adapters/clickhouse/connections.py b/dbt/adapters/clickhouse/connections.py index bc0bf9e0..3b4bbebd 100644 --- a/dbt/adapters/clickhouse/connections.py +++ b/dbt/adapters/clickhouse/connections.py @@ -59,7 +59,12 @@ def cancel(self, connection): logger.debug('Cancel query \'{}\'', connection_name) def release(self): - pass # There is no "release" type functionality in the existing ClickHouse connectors + # Default: keep the connection open (historical behavior). With + # `reuse_connections: false`, fall through to the base release() + # which closes the handle so the next model picks a fresh replica. + if self.profile.credentials.reuse_connections: + return + super().release() @classmethod def get_table_from_response(cls, response, column_names) -> "agate.Table": diff --git a/dbt/adapters/clickhouse/credentials.py b/dbt/adapters/clickhouse/credentials.py index e9cf6496..46da5e74 100644 --- a/dbt/adapters/clickhouse/credentials.py +++ b/dbt/adapters/clickhouse/credentials.py @@ -38,6 +38,9 @@ class ClickHouseCredentials(Credentials): local_db_prefix: str = '' allow_automatic_deduplication: bool = False tcp_keepalive: Union[bool, tuple[int, int, int], list[int]] = False + # When False, close the connection after each model so the next opens a + # fresh TCP socket — lets a Cloud LB rebalance dbt across replicas. + reuse_connections: bool = True @property def type(self): @@ -87,4 +90,5 @@ def _connection_keys(self): 'use_lw_deletes', 'allow_automatic_deduplication', 'tcp_keepalive', + 'reuse_connections', ) diff --git a/dbt/adapters/clickhouse/dbclient.py b/dbt/adapters/clickhouse/dbclient.py index 05734631..df314cfc 100644 --- a/dbt/adapters/clickhouse/dbclient.py +++ b/dbt/adapters/clickhouse/dbclient.py @@ -5,22 +5,21 @@ from typing import Dict, Optional from dbt.adapters.clickhouse.credentials import ClickHouseCredentials -from dbt.adapters.clickhouse.errors import ( - lw_deletes_not_enabled_error, - lw_deletes_not_enabled_warning, - nd_mutations_not_enabled_error, - nd_mutations_not_enabled_warning, -) from dbt.adapters.clickhouse.logger import logger from dbt.adapters.clickhouse.query import quote_identifier from dbt.adapters.clickhouse.util import compare_versions, engine_can_atomic_exchange from dbt.adapters.exceptions import FailedToConnectError -from dbt_common.exceptions import DbtConfigError, DbtDatabaseError +from dbt_common.exceptions import DbtDatabaseError _exchange_lock = threading.Lock() _exchange_result: Optional[bool] = None -LW_DELETE_SETTING = 'allow_experimental_lightweight_delete' +# Databases whose existence has already been ensured in this process. Guarded by +# `_database_lock`. With `reuse_connections: false` a client is created per model, +# so without this cache the `EXISTS DATABASE` probe would run on every model. +_database_lock = threading.Lock() +_ensured_databases: set = set() + ND_MUTATION_SETTING = 'allow_nondeterministic_mutations' DEDUP_WINDOW_SETTING = 'replicated_deduplication_window' DEDUP_WINDOW_SETTING_SUPPORTED_MATERIALIZATION = [ @@ -89,6 +88,11 @@ def __init__(self, credentials: ClickHouseCredentials): self._conn_settings.setdefault('mutations_sync', '3') self._conn_settings.setdefault('alter_sync', '3') self._conn_settings.setdefault('insert_distributed_sync', '1') + # Lightweight deletes that read from other tables require nondeterministic + # mutations. Apply it via the connection settings (which both drivers read + # at client creation) so we don't probe + SET it on every client. + if credentials.use_lw_deletes: + self._conn_settings.setdefault(ND_MUTATION_SETTING, '1') self._client = self._create_client(credentials) check_exchange = credentials.check_exchange and not credentials.cluster_mode try: @@ -136,7 +140,10 @@ def get_ch_setting(self, setting_name): pass def database_dropped(self, database: str): - pass + # Forget the cached existence so a later model recreating the schema runs + # the EXISTS/CREATE path again instead of trusting a stale cache entry. + with _database_lock: + _ensured_databases.discard(database) @abstractmethod def close(self): @@ -163,69 +170,43 @@ def update_model_settings(self, model_settings: Dict[str, str], materialization_ model_settings[key] = value def _check_lightweight_deletes(self, requested: bool): - lw_deletes, lw_read_only = self.get_ch_setting(LW_DELETE_SETTING) - nd_mutations, nd_mutations_read_only = self.get_ch_setting(ND_MUTATION_SETTING) - if lw_deletes is None or nd_mutations is None: - if requested: - logger.warning(lw_deletes_not_enabled_error) - return False, False - lw_deletes = int(lw_deletes) > 0 - if not lw_deletes: - if lw_read_only: - lw_deletes = False - if requested: - raise DbtConfigError(lw_deletes_not_enabled_error) - logger.warning(lw_deletes_not_enabled_warning) - else: - try: - self.command(f'SET {LW_DELETE_SETTING} = 1') - self._conn_settings[LW_DELETE_SETTING] = '1' - lw_deletes = True - except DbtDatabaseError: - logger.warning(lw_deletes_not_enabled_warning) - nd_mutations = int(nd_mutations) > 0 - if lw_deletes and not nd_mutations: - if nd_mutations_read_only: - nd_mutations = False - if requested: - raise DbtConfigError(nd_mutations_not_enabled_error) - logger.warning(nd_mutations_not_enabled_warning) - else: - try: - self.command(f'SET {ND_MUTATION_SETTING} = 1') - self._conn_settings[ND_MUTATION_SETTING] = '1' - nd_mutations = True - except DbtDatabaseError: - logger.warning(nd_mutations_not_enabled_warning) - if lw_deletes and nd_mutations: - return True, requested - return False, False + # Lightweight deletes have been generally available since ClickHouse 23.3, + # which is older than every version this adapter supports, so there's no + # capability to probe. The `allow_nondeterministic_mutations` setting they + # may require is applied through the connection settings in __init__. + return True, requested def _ensure_database(self, database_engine, cluster_name) -> None: if not self.database: return - check_db = f'EXISTS DATABASE {quote_identifier(self.database)}' - try: - db_exists = self.command(check_db) - if not db_exists: - engine_clause = f' ENGINE {database_engine} ' if database_engine else '' - cluster_clause = ( - f' ON CLUSTER "{cluster_name}" ' - if cluster_name is not None and cluster_name.strip() != '' - else '' - ) - self.command( - f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}' - ) - db_exists = self.command(check_db) - if not db_exists: + # The existence check/create only needs to happen once per process; cache + # the result so a per-model client (reuse_connections: false) doesn't probe + # on every model. The cache is invalidated by `database_dropped`. + with _database_lock: + if self.database not in _ensured_databases: + check_db = f'EXISTS DATABASE {quote_identifier(self.database)}' + try: + db_exists = self.command(check_db) + if not db_exists: + engine_clause = f' ENGINE {database_engine} ' if database_engine else '' + cluster_clause = ( + f' ON CLUSTER "{cluster_name}" ' + if cluster_name is not None and cluster_name.strip() != '' + else '' + ) + self.command( + f'CREATE DATABASE IF NOT EXISTS {quote_identifier(self.database)}{cluster_clause}{engine_clause}' + ) + db_exists = self.command(check_db) + if not db_exists: + raise FailedToConnectError( + f'Failed to create database {self.database} for unknown reason' + ) + except DbtDatabaseError as ex: raise FailedToConnectError( - f'Failed to create database {self.database} for unknown reason' - ) - except DbtDatabaseError as ex: - raise FailedToConnectError( - f'Failed to create {self.database} database due to ClickHouse exception' - ) from ex + f'Failed to create {self.database} database due to ClickHouse exception' + ) from ex + _ensured_databases.add(self.database) self._set_client_database() def _check_atomic_exchange(self) -> bool: diff --git a/dbt/adapters/clickhouse/errors.py b/dbt/adapters/clickhouse/errors.py index 23248c53..ecb52239 100644 --- a/dbt/adapters/clickhouse/errors.py +++ b/dbt/adapters/clickhouse/errors.py @@ -22,24 +22,3 @@ Source columns not in target: {0} """ - -lw_deletes_not_enabled_error = """ -Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but -`light weight deletes` are either not available or not enabled on this ClickHouse server. -""" - -lw_deletes_not_enabled_warning = """ -`light weight deletes` are either not available or not enabled on this ClickHouse server. This prevents the use -of the delete+insert incremental strategy, which may negatively affect performance for incremental models. -""" - -nd_mutations_not_enabled_error = """ -Attempting to apply the configuration `use_lw_deletes` to enable the delete+insert incremental strategy, but -the required `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user -""" - -nd_mutations_not_enabled_warning = """ -The setting `allow_nondeterministic_mutations` is not enabled and is `read_only` for this user` This prevents the use -of `light weight deletes` and therefore the delete+insert incremental strategy. This may negatively affect performance -for incremental models -""" diff --git a/dbt/adapters/clickhouse/httpclient.py b/dbt/adapters/clickhouse/httpclient.py index 62015266..8e87cebe 100644 --- a/dbt/adapters/clickhouse/httpclient.py +++ b/dbt/adapters/clickhouse/httpclient.py @@ -52,6 +52,7 @@ def get_ch_setting(self, setting_name): return (setting.value, setting.readonly) if setting else (None, 0) def database_dropped(self, database: str): + super().database_dropped(database) # This is necessary for the http client to avoid exceptions when ClickHouse doesn't recognize the database # query parameter if self.database == database: diff --git a/dbt/adapters/clickhouse/impl.py b/dbt/adapters/clickhouse/impl.py index fdc6204d..c3e2a7f8 100644 --- a/dbt/adapters/clickhouse/impl.py +++ b/dbt/adapters/clickhouse/impl.py @@ -200,7 +200,7 @@ def validate_incremental_strategy( raise DbtRuntimeError( f"The incremental strategy '{strategy}' is not valid for ClickHouse." ) - if strategy in ('delete_insert', 'microbatch') and not conn.handle.has_lw_deletes: + if strategy in ('delete_insert', 'microbatch') and not conn.handle.use_lw_deletes: raise DbtRuntimeError( f"'{strategy}' strategy requires setting the profile config 'use_lw_deletes' to true." ) diff --git a/tests/unit/test_connections.py b/tests/unit/test_connections.py index ad12b898..8d898470 100644 --- a/tests/unit/test_connections.py +++ b/tests/unit/test_connections.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch from dbt.adapters.clickhouse.connections import ClickHouseConnectionManager +from dbt.adapters.sql import SQLConnectionManager def _make_manager_with_client(mock_client): @@ -59,3 +60,31 @@ def test_query_id_unique_per_call(self): r2, _ = manager.execute('SELECT 1') assert r1.query_id != r2.query_id + + +def test_reuse_connections_defaults_to_true(): + from dbt.adapters.clickhouse.credentials import ClickHouseCredentials + + creds = ClickHouseCredentials(host='localhost', schema='default') + assert creds.reuse_connections is True + + +def _make_manager_with_reuse(reuse_connections: bool) -> ClickHouseConnectionManager: + manager = ClickHouseConnectionManager.__new__(ClickHouseConnectionManager) + manager.profile = MagicMock() + manager.profile.credentials.reuse_connections = reuse_connections + return manager + + +class TestReleaseRespectsReuseConnections: + def test_release_is_a_noop_when_reuse_connections_is_true(self): + manager = _make_manager_with_reuse(reuse_connections=True) + with patch.object(SQLConnectionManager, 'release') as mock_super_release: + manager.release() + mock_super_release.assert_not_called() + + def test_release_calls_super_when_reuse_connections_is_false(self): + manager = _make_manager_with_reuse(reuse_connections=False) + with patch.object(SQLConnectionManager, 'release') as mock_super_release: + manager.release() + mock_super_release.assert_called_once() diff --git a/tests/unit/test_dbclient.py b/tests/unit/test_dbclient.py index 76cca427..cc8481a8 100644 --- a/tests/unit/test_dbclient.py +++ b/tests/unit/test_dbclient.py @@ -1,7 +1,9 @@ from unittest.mock import MagicMock, patch +import dbt.adapters.clickhouse.dbclient as dbclient_module import pytest from dbt.adapters.clickhouse.credentials import ClickHouseCredentials +from dbt.adapters.clickhouse.dbclient import ND_MUTATION_SETTING from dbt.adapters.clickhouse.httpclient import ChHttpClient @@ -12,6 +14,12 @@ def mock_ch_client(): yield mock_get_client +@pytest.fixture(autouse=True) +def reset_ensured_databases(): + dbclient_module._ensured_databases.clear() + yield + + def _get_settings(mock_get_client): return mock_get_client.call_args.kwargs['settings'] @@ -72,3 +80,75 @@ def test_default_engine_settings(mock_ch_client): assert settings['mutations_sync'] == '3' assert settings['alter_sync'] == '3' assert 'select_sequential_consistency' not in settings + + +def _lw_credentials(use_lw_deletes, schema='default'): + return ClickHouseCredentials( + host='localhost', + port=8123, + user='default', + password='', + schema=schema, + use_lw_deletes=use_lw_deletes, + check_exchange=False, + ) + + +def test_nd_mutation_setting_injected_when_lw_deletes_enabled(mock_ch_client): + """allow_nondeterministic_mutations is set on the client when lw deletes are requested.""" + ChHttpClient(_lw_credentials(use_lw_deletes=True)) + settings = _get_settings(mock_ch_client) + + assert settings[ND_MUTATION_SETTING] == '1' + + +def test_nd_mutation_setting_absent_when_lw_deletes_disabled(mock_ch_client): + """No nondeterministic mutation setting is added by default.""" + ChHttpClient(_lw_credentials(use_lw_deletes=False)) + settings = _get_settings(mock_ch_client) + + assert ND_MUTATION_SETTING not in settings + + +def test_check_lightweight_deletes_always_available(mock_ch_client): + """Lightweight deletes are assumed available; use_lw_deletes mirrors the request.""" + client = ChHttpClient(_lw_credentials(use_lw_deletes=True)) + assert client.has_lw_deletes is True + assert client.use_lw_deletes is True + + +def test_check_lightweight_deletes_has_lw_true_even_when_not_requested(mock_ch_client): + """has_lw_deletes is always True (GA on all supported versions) even when use_lw_deletes=False.""" + client = ChHttpClient(_lw_credentials(use_lw_deletes=False)) + assert client.has_lw_deletes is True + assert client.use_lw_deletes is False + + +def _exists_calls(mock_ch_client): + client = mock_ch_client.return_value + return [ + call + for call in client.command.call_args_list + if str(call.args[0]).startswith('EXISTS DATABASE') + ] + + +def test_ensure_database_probes_only_once_per_process(mock_ch_client): + """The EXISTS DATABASE probe is cached process-wide across client creations.""" + ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db')) + ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db')) + + assert len(_exists_calls(mock_ch_client)) == 1 + assert 'cache_test_db' in dbclient_module._ensured_databases + + +def test_database_dropped_invalidates_existence_cache(mock_ch_client): + """Dropping a schema forces the next client to probe again.""" + client = ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db')) + assert len(_exists_calls(mock_ch_client)) == 1 + + client.database_dropped('cache_test_db') + assert 'cache_test_db' not in dbclient_module._ensured_databases + + ChHttpClient(_lw_credentials(use_lw_deletes=False, schema='cache_test_db')) + assert len(_exists_calls(mock_ch_client)) == 2 diff --git a/tests/unit/test_incremental_strategy.py b/tests/unit/test_incremental_strategy.py new file mode 100644 index 00000000..d0c8dc2d --- /dev/null +++ b/tests/unit/test_incremental_strategy.py @@ -0,0 +1,60 @@ +from multiprocessing.context import SpawnContext +from unittest.mock import MagicMock, Mock + +import pytest +from dbt.adapters.clickhouse.impl import ClickHouseAdapter +from dbt_common.exceptions import DbtRuntimeError + + +def _make_adapter(use_lw_deletes: bool) -> ClickHouseAdapter: + adapter = ClickHouseAdapter.__new__(ClickHouseAdapter) + adapter.connections = MagicMock() + mock_conn = MagicMock() + mock_conn.handle.use_lw_deletes = use_lw_deletes + mock_conn.handle.has_lw_deletes = True + adapter.connections.get_if_exists.return_value = mock_conn + return adapter + + +class TestValidateIncrementalStrategy: + def test_delete_insert_raises_when_use_lw_deletes_false(self): + """delete_insert requires use_lw_deletes:true — the guard must check use_lw_deletes, + not has_lw_deletes (which is always True).""" + adapter = _make_adapter(use_lw_deletes=False) + with pytest.raises(DbtRuntimeError, match="use_lw_deletes"): + adapter.validate_incremental_strategy('delete_insert', [], 'id', None) + + def test_microbatch_raises_when_use_lw_deletes_false(self): + adapter = _make_adapter(use_lw_deletes=False) + with pytest.raises(DbtRuntimeError, match="use_lw_deletes"): + adapter.validate_incremental_strategy('microbatch', [], 'id', None) + + def test_delete_insert_passes_when_use_lw_deletes_true(self): + adapter = _make_adapter(use_lw_deletes=True) + adapter.validate_incremental_strategy('delete_insert', [], 'id', None) + + def test_microbatch_passes_when_use_lw_deletes_true(self): + adapter = _make_adapter(use_lw_deletes=True) + adapter.validate_incremental_strategy('microbatch', [], 'id', None) + + def test_legacy_does_not_require_use_lw_deletes(self): + adapter = _make_adapter(use_lw_deletes=False) + adapter.validate_incremental_strategy('legacy', [], None, None) + + def test_append_does_not_require_use_lw_deletes(self): + adapter = _make_adapter(use_lw_deletes=False) + adapter.validate_incremental_strategy('append', [], None, None) + + def test_insert_overwrite_does_not_require_use_lw_deletes(self): + adapter = _make_adapter(use_lw_deletes=False) + adapter.validate_incremental_strategy('insert_overwrite', [], None, 'date') + + def test_delete_insert_raises_without_unique_key(self): + adapter = _make_adapter(use_lw_deletes=True) + with pytest.raises(DbtRuntimeError, match="unique_key"): + adapter.validate_incremental_strategy('delete_insert', [], None, None) + + def test_unknown_strategy_raises(self): + adapter = _make_adapter(use_lw_deletes=True) + with pytest.raises(DbtRuntimeError, match="not valid"): + adapter.validate_incremental_strategy('invalid_strategy', [], 'id', None)