diff --git a/docs/src/main/sphinx/connector/clickhouse.md b/docs/src/main/sphinx/connector/clickhouse.md
index 0a24166ca246..ff3072561ebb 100644
--- a/docs/src/main/sphinx/connector/clickhouse.md
+++ b/docs/src/main/sphinx/connector/clickhouse.md
@@ -70,6 +70,15 @@ driver documentation](https://clickhouse.com/docs/en/interfaces/jdbc/)
```{include} jdbc-authentication.fragment
```
+### Cluster mode
+
+If the clickhouse connected by `connection-url` belongs to the cluster mode, it is recommended to add this item, so that the ddl statement will be executed on all nodes of the cluster by default,
+which will be as convenient as the stand-alone clickhouse.
+
+```properties
+cluster-name=examplecluster
+```
+
### Multiple ClickHouse servers
If you have multiple ClickHouse servers you need to configure one
diff --git a/plugin/trino-clickhouse/pom.xml b/plugin/trino-clickhouse/pom.xml
index a9e8d96958c5..4dff9117e419 100644
--- a/plugin/trino-clickhouse/pom.xml
+++ b/plugin/trino-clickhouse/pom.xml
@@ -59,6 +59,11 @@
jakarta.annotation-api
+
+ jakarta.validation
+ jakarta.validation-api
+
+
com.fasterxml.jackson.core
jackson-annotations
diff --git a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java
index e3bf999c3043..f8654f7257d8 100644
--- a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java
+++ b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java
@@ -211,20 +211,23 @@ public class ClickHouseClient
private final ConnectorExpressionRewriter connectorExpressionRewriter;
private final AggregateFunctionRewriter aggregateFunctionRewriter;
+ private final Optional clusterName;
private final Type uuidType;
private final Type ipAddressType;
private final AtomicReference clickHouseVersion = new AtomicReference<>();
@Inject
public ClickHouseClient(
- BaseJdbcConfig config,
+ ClickHouseConfig config,
+ BaseJdbcConfig baseJdbcConfig,
ConnectionFactory connectionFactory,
QueryBuilder queryBuilder,
TypeManager typeManager,
IdentifierMapping identifierMapping,
RemoteQueryModifier queryModifier)
{
- super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
+ super("\"", connectionFactory, queryBuilder, baseJdbcConfig.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
+ this.clusterName = config.getClusterName();
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -253,6 +256,11 @@ public ClickHouseClient(
.build());
}
+ private String getClusterInfo()
+ {
+ return clusterName.map(" ON CLUSTER %s "::formatted).orElse("");
+ }
+
@Override
public Optional implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map assignments)
{
@@ -340,8 +348,9 @@ protected void copyTableSchema(ConnectorSession session, Connection connection,
// 1. create table tbl as tbl2
// 2. create table tbl1 ENGINE= as select * from tbl2
String sql = format(
- "CREATE TABLE %s AS %s ",
+ "CREATE TABLE %s %s AS %s ",
quoted(null, schemaName, newTableName),
+ getClusterInfo(),
quoted(null, schemaName, tableName));
try {
execute(session, connection, sql);
@@ -396,7 +405,7 @@ protected List createTableSqls(RemoteTableName remoteTableName, List tableOptions.add("SAMPLE BY " + quoted(value)));
tableMetadata.getComment().ifPresent(comment -> tableOptions.add(format("COMMENT %s", clickhouseVarcharLiteral(comment))));
- return ImmutableList.of(format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build())));
+ return ImmutableList.of(format("CREATE TABLE %s %s (%s) %s", quoted(remoteTableName), getClusterInfo(), join(", ", columns), join(" ", tableOptions.build())));
}
@Override
@@ -461,8 +470,9 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
try (Connection connection = connectionFactory.openConnection(session)) {
String sql = format(
- "ALTER TABLE %s MODIFY %s",
+ "ALTER TABLE %s %s MODIFY %s",
quoted(handle.asPlainTable().getRemoteTableName()),
+ getClusterInfo(),
join(" ", tableOptions.build()));
execute(session, connection, sql);
}
@@ -495,7 +505,7 @@ protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata
protected void createSchema(ConnectorSession session, Connection connection, String remoteSchemaName)
throws SQLException
{
- execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName));
+ execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
}
@Override
@@ -510,14 +520,14 @@ protected void dropSchema(ConnectorSession session, Connection connection, Strin
}
}
}
- execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName));
+ execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
}
@Override
protected void renameSchema(ConnectorSession session, Connection connection, String remoteSchemaName, String newRemoteSchemaName)
throws SQLException
{
- execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName));
+ execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName) + getClusterInfo());
}
@Override
@@ -535,8 +545,9 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
try (Connection connection = connectionFactory.openConnection(session)) {
String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getName());
String sql = format(
- "ALTER TABLE %s ADD COLUMN %s",
+ "ALTER TABLE %s %s ADD COLUMN %s",
quoted(table),
+ getClusterInfo(),
getColumnDefinitionSql(session, column, remoteColumnName));
execute(session, connection, sql);
}
@@ -545,12 +556,42 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
}
}
+ @Override
+ protected void renameColumn(ConnectorSession session, Connection connection, RemoteTableName remoteTableName, String remoteColumnName, String newRemoteColumnName)
+ throws SQLException
+ {
+ execute(session, connection, format(
+ "ALTER TABLE %s RENAME COLUMN %s TO %s %s",
+ quoted(remoteTableName),
+ quoted(remoteColumnName),
+ quoted(newRemoteColumnName),
+ getClusterInfo()));
+ }
+
+ @Override
+ public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column)
+ {
+ try (Connection connection = connectionFactory.openConnection(session)) {
+ String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getColumnName());
+ String sql = format(
+ "ALTER TABLE %s %s DROP COLUMN %s",
+ quoted(handle.asPlainTable().getRemoteTableName()),
+ getClusterInfo(),
+ quoted(remoteColumnName));
+ execute(session, connection, sql);
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, e);
+ }
+ }
+
@Override
public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional comment)
{
String sql = format(
- "ALTER TABLE %s MODIFY COMMENT %s",
+ "ALTER TABLE %s %s MODIFY COMMENT %s",
quoted(handle.asPlainTable().getRemoteTableName()),
+ getClusterInfo(),
clickhouseVarcharLiteral(comment.orElse(NO_COMMENT)));
execute(session, sql);
}
@@ -559,8 +600,9 @@ public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Op
public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional comment)
{
String sql = format(
- "ALTER TABLE %s COMMENT COLUMN %s %s",
+ "ALTER TABLE %s %s COMMENT COLUMN %s %s",
quoted(handle.asPlainTable().getRemoteTableName()),
+ getClusterInfo(),
quoted(column.getColumnName()),
clickhouseVarcharLiteral(comment.orElse("")));
execute(session, sql);
@@ -594,11 +636,19 @@ protected Optional> getTableTypes()
protected void renameTable(ConnectorSession session, Connection connection, String catalogName, String remoteSchemaName, String remoteTableName, String newRemoteSchemaName, String newRemoteTableName)
throws SQLException
{
- execute(session, connection, format("RENAME TABLE %s TO %s",
+ execute(session, connection, format("RENAME TABLE %s %s TO %s",
quoted(catalogName, remoteSchemaName, remoteTableName),
+ getClusterInfo(),
quoted(catalogName, newRemoteSchemaName, newRemoteTableName)));
}
+ @Override
+ protected void dropTable(ConnectorSession session, RemoteTableName remoteTableName, boolean temporaryTable)
+ {
+ String sql = "DROP TABLE " + quoted(remoteTableName) + getClusterInfo();
+ execute(session, sql);
+ }
+
@Override
protected Optional> limitFunction()
{
diff --git a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseConfig.java b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseConfig.java
index 0e405b129fd7..15ca637cb810 100644
--- a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseConfig.java
+++ b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseConfig.java
@@ -16,6 +16,9 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
+import jakarta.validation.constraints.NotNull;
+
+import java.util.Optional;
@DefunctConfig("clickhouse.legacy-driver")
public class ClickHouseConfig
@@ -35,4 +38,20 @@ public ClickHouseConfig setMapStringAsVarchar(boolean mapStringAsVarchar)
this.mapStringAsVarchar = mapStringAsVarchar;
return this;
}
+
+ private Optional clusterName = Optional.empty();
+
+ @NotNull
+ public Optional getClusterName()
+ {
+ return clusterName;
+ }
+
+ @Config("clickhouse.cluster-name")
+ @ConfigDescription("ClickHouse cluster name")
+ public ClickHouseConfig setClusterName(String clusterName)
+ {
+ this.clusterName = Optional.ofNullable(clusterName);
+ return this;
+ }
}
diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConfig.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConfig.java
index b9f3e649cc82..10f8fe03b7ef 100644
--- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConfig.java
+++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConfig.java
@@ -42,7 +42,8 @@ public class TestClickHouseConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ClickHouseConfig.class)
- .setMapStringAsVarchar(false));
+ .setMapStringAsVarchar(false)
+ .setClusterName(null));
}
@Test
@@ -50,9 +51,11 @@ public void testExplicitPropertyMappings()
{
Map properties = ImmutableMap.builder()
.put("clickhouse.map-string-as-varchar", "true")
+ .put("clickhouse.cluster-name", "test")
.buildOrThrow();
ClickHouseConfig expected = new ClickHouseConfig()
+ .setClusterName("test")
.setMapStringAsVarchar(true);
assertFullMapping(properties, expected);