diff --git a/build.gradle b/build.gradle index 178f97bf..7adf0149 100644 --- a/build.gradle +++ b/build.gradle @@ -58,7 +58,7 @@ ext.versions = [ freemarker : '2.3.22', objenesis : '1.4', jackson : '2.4.4', - cassandra : '3.4.0', + cassandra : '4.19.2', commons_cli : '1.3.1', thrift : '0.9.3', kafka : '3.9.0', @@ -102,7 +102,7 @@ ext.libraries = [ objenesis : "org.objenesis:objenesis:${versions.objenesis}", jackson_databind : "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}", jackson_datatype_jdk8: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}", - cassandra_driver : "com.datastax.cassandra:cassandra-driver-core:${versions.cassandra}", + cassandra_driver : "org.apache.cassandra:java-driver-core:${versions.cassandra}", commons_cli : "commons-cli:commons-cli:${versions.commons_cli}", thrift : "org.apache.thrift:libthrift:${versions.thrift}", kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}", diff --git a/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java b/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java index 6141d123..2772d23f 100644 --- a/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java +++ b/tempto-core/src/main/java/io/prestodb/tempto/internal/fulfillment/table/cassandra/CassandraBatchLoader.java @@ -13,9 +13,11 @@ */ package io.prestodb.tempto.internal.fulfillment.table.cassandra; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Session; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BatchStatement; +import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; +import com.datastax.oss.driver.api.core.cql.DefaultBatchType; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; import java.util.Iterator; import java.util.List; @@ -28,12 +30,12 @@ public class CassandraBatchLoader { - private final Session session; + private final CqlSession session; private final String insertQuery; private final int columnsCount; private final int batchRowsCount; - public CassandraBatchLoader(Session session, String tableName, List columnNames, int batchRowsCount) + public CassandraBatchLoader(CqlSession session, String tableName, List columnNames, int batchRowsCount) { this.session = requireNonNull(session, "session is null"); requireNonNull(tableName, "tableName is null"); @@ -67,24 +69,28 @@ public void load(Iterator> rows) { PreparedStatement statement = session.prepare(insertQuery); - BatchStatement batch = createBatchStatement(); + BatchStatementBuilder batchBuilder = createBatchStatementBuilder(); + int currentBatchSize = 0; + while (rows.hasNext()) { - if (batch.size() >= batchRowsCount) { - session.execute(batch); - batch = createBatchStatement(); + if (currentBatchSize >= batchRowsCount) { + session.execute(batchBuilder.build()); + batchBuilder = createBatchStatementBuilder(); + currentBatchSize = 0; } List row = rows.next(); checkState(row.size() == columnsCount, "values count in a row is expected to be %d, but found: %d", columnsCount, row.size()); - batch.add(statement.bind(row.toArray())); + batchBuilder.addStatement(statement.bind(row.toArray())); + currentBatchSize++; } - if (batch.size() > 0) { - session.execute(batch); + if (currentBatchSize > 0) { + session.execute(batchBuilder.build()); } } - private static BatchStatement createBatchStatement() + private static BatchStatementBuilder createBatchStatementBuilder() { - return new BatchStatement(BatchStatement.Type.UNLOGGED); + return BatchStatement.builder(DefaultBatchType.UNLOGGED); } } diff --git a/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java b/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java index cc6b6f8b..e3351866 100644 --- a/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java +++ b/tempto-core/src/main/java/io/prestodb/tempto/internal/query/CassandraQueryExecutor.java @@ -13,25 +13,27 @@ */ package io.prestodb.tempto.internal.query; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ColumnDefinitions; -import com.datastax.driver.core.ColumnMetadata; -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.Metadata; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.Row; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.TableMetadata; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ColumnDefinition; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.api.core.type.DataType; +import com.datastax.oss.driver.api.core.type.DataTypes; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.prestodb.tempto.configuration.Configuration; import io.prestodb.tempto.query.QueryExecutionException; import io.prestodb.tempto.query.QueryResult; +import java.net.InetSocketAddress; import java.sql.JDBCType; import java.util.List; import java.util.Map; +import java.util.Optional; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Lists.newArrayList; @@ -42,27 +44,25 @@ public class CassandraQueryExecutor implements AutoCloseable { private static final Map typeMapping; - private final Cluster cluster; - private Session session; + private final CqlSession session; static { typeMapping = ImmutableMap.builder() - .put(DataType.ascii(), JDBCType.VARCHAR) - .put(DataType.bigint(), JDBCType.BIGINT) - .put(DataType.blob(), JDBCType.BLOB) - .put(DataType.cboolean(), JDBCType.BOOLEAN) - .put(DataType.counter(), JDBCType.BIGINT) - .put(DataType.date(), JDBCType.DATE) - .put(DataType.decimal(), JDBCType.DECIMAL) - .put(DataType.cdouble(), JDBCType.DOUBLE) - .put(DataType.cfloat(), JDBCType.REAL) - .put(DataType.cint(), JDBCType.INTEGER) - .put(DataType.smallint(), JDBCType.SMALLINT) - //.put(DataType.text(), JDBCType.NVARCHAR) - .put(DataType.time(), JDBCType.TIME) - .put(DataType.timestamp(), JDBCType.TIMESTAMP) - .put(DataType.tinyint(), JDBCType.TINYINT) - .put(DataType.varchar(), JDBCType.VARCHAR) + .put(DataTypes.ASCII, JDBCType.VARCHAR) + .put(DataTypes.BIGINT, JDBCType.BIGINT) + .put(DataTypes.BLOB, JDBCType.BLOB) + .put(DataTypes.BOOLEAN, JDBCType.BOOLEAN) + .put(DataTypes.COUNTER, JDBCType.BIGINT) + .put(DataTypes.DATE, JDBCType.DATE) + .put(DataTypes.DECIMAL, JDBCType.DECIMAL) + .put(DataTypes.DOUBLE, JDBCType.DOUBLE) + .put(DataTypes.FLOAT, JDBCType.REAL) + .put(DataTypes.INT, JDBCType.INTEGER) + .put(DataTypes.SMALLINT, JDBCType.SMALLINT) + .put(DataTypes.TEXT, JDBCType.VARCHAR) + .put(DataTypes.TIME, JDBCType.TIME) + .put(DataTypes.TIMESTAMP, JDBCType.TIMESTAMP) + .put(DataTypes.TINYINT, JDBCType.TINYINT) .build(); } @@ -77,25 +77,36 @@ public static class TypeNotSupportedException public CassandraQueryExecutor(Configuration configuration) { - cluster = Cluster.builder() - .addContactPoint(configuration.getStringMandatory("databases.cassandra.host")) - .withPort(configuration.getIntMandatory("databases.cassandra.port")) + String host = configuration.getStringMandatory("databases.cassandra.host"); + int port = configuration.getIntMandatory("databases.cassandra.port"); + String dc = configuration.getString("databases.cassandra.datacenter").orElse("datacenter1"); + + // Driver 4.x requires a local datacenter to be specified + // Using "datacenter1" as the default, which is the standard for single-datacenter deployments + session = CqlSession.builder() + .addContactPoint(new InetSocketAddress(host, port)) + .withLocalDatacenter(dc) .build(); } public QueryResult executeQuery(String sql) throws QueryExecutionException { - ensureConnected(); + checkState(!session.isClosed(), "Trying to execute query using closed Session"); ResultSet rs = session.execute(sql); - List definitions = rs.getColumnDefinitions().asList(); + List definitions = newArrayList(); + for (ColumnDefinition def : rs.getColumnDefinitions()) { + definitions.add(def); + } + List types = definitions.stream() .map(definition -> getJDBCType(definition.getType())) .collect(toList()); List columnNames = definitions.stream() - .map(ColumnDefinitions.Definition::getName) + .map(ColumnDefinition::getName) + .map(Object::toString) .collect(toList()); QueryResult.QueryResultBuilder resultBuilder = new QueryResult.QueryResultBuilder(types, columnNames); @@ -103,7 +114,7 @@ public QueryResult executeQuery(String sql) for (Row row : rs) { List builderRow = newArrayList(); for (int i = 0; i < types.size(); ++i) { - builderRow.add(row.getToken(i).getValue()); + builderRow.add(row.getObject(i)); } resultBuilder.addRow(builderRow); } @@ -111,59 +122,61 @@ public QueryResult executeQuery(String sql) return resultBuilder.build(); } - public Session getSession() + public CqlSession getSession() { return session; } - public List getColumnNames(String keySpace, String tableName) + public List getColumnNames(String keyspaceName, String tableName) { - checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName); - KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(keySpace); - TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName); - return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList()); + Optional keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName); + if (!keyspaceMetadata.isPresent()) { + throw new IllegalStateException(format("Keyspace %s does not exist", keyspaceName)); + } + Optional tableMetadata = keyspaceMetadata.get().getTable(tableName); + if (!tableMetadata.isPresent()) { + throw new IllegalStateException(format("Table %s.%s does not exist", keyspaceName, tableName)); + } + return tableMetadata.get().getColumns().values().stream() + .map(ColumnMetadata::getName) + .map(Object::toString) + .collect(toList()); } - public boolean tableExists(String keySpace, String tableName) + public boolean tableExists(String keyspaceName, String tableName) { - KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace); - if (keyspaceMetadata == null) { + Optional keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName); + if (!keyspaceMetadata.isPresent()) { return false; } - return keyspaceMetadata.getTable(tableName) != null; + return keyspaceMetadata.get().getTable(tableName).isPresent(); } - public List getTableNames(String keySpace) + public List getTableNames(String keyspaceName) { - Metadata clusterMetadata = cluster.getMetadata(); - KeyspaceMetadata keyspaceMetadata = clusterMetadata.getKeyspace(keySpace); - if (keyspaceMetadata == null) { + Metadata clusterMetadata = session.getMetadata(); + Optional keyspaceMetadata = clusterMetadata.getKeyspace(keyspaceName); + if (!keyspaceMetadata.isPresent()) { return ImmutableList.of(); } - return keyspaceMetadata.getTables().stream() + return keyspaceMetadata.get().getTables().values().stream() .map(TableMetadata::getName) + .map(Object::toString) .collect(toList()); } @Override public void close() { - cluster.close(); - } - - private void ensureConnected() - { - checkState(!cluster.isClosed(), "Trying to connect using closed Cluster"); - - if (session == null || session.isClosed()) { - session = cluster.connect(); + if (session != null && !session.isClosed()) { + session.close(); } } private static JDBCType getJDBCType(DataType type) { JDBCType jdbcType = typeMapping.get(type); - if (type == null) { + if (jdbcType == null) { throw new TypeNotSupportedException(type); } diff --git a/tempto-examples/docker/docker-compose.yml b/tempto-examples/docker/docker-compose.yml index be8810dc..d6c25f9a 100644 --- a/tempto-examples/docker/docker-compose.yml +++ b/tempto-examples/docker/docker-compose.yml @@ -32,7 +32,7 @@ services: cassandra: hostname: cassandra - image: 'cassandra:2.1.15' + image: 'cassandra:3.11.19' ports: - '9042:9042' - '9160:9160' diff --git a/tempto-examples/src/main/resources/tempto-configuration.yaml b/tempto-examples/src/main/resources/tempto-configuration.yaml index 63f1449f..9ea80b30 100644 --- a/tempto-examples/src/main/resources/tempto-configuration.yaml +++ b/tempto-examples/src/main/resources/tempto-configuration.yaml @@ -71,6 +71,9 @@ databases: cassandra: host: ${cluster.cassandra} port: 9042 + datacenter: datacenter1 + keyspace: tempto + table_manager_type: cassandra default_schema: test skip_create_schema: false table_manager_type: cassandra