Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ext.versions = [
freemarker : '2.3.22',
objenesis : '1.4',
jackson : '2.4.4',
cassandra : '3.4.0',
cassandra : '4.19.2',
commons_cli : '1.3.1',
thrift : '0.9.3',
kafka : '3.9.0',
Expand Down Expand Up @@ -102,7 +102,7 @@ ext.libraries = [
objenesis : "org.objenesis:objenesis:${versions.objenesis}",
jackson_databind : "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}",
jackson_datatype_jdk8: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}",
cassandra_driver : "com.datastax.cassandra:cassandra-driver-core:${versions.cassandra}",
cassandra_driver : "org.apache.cassandra:java-driver-core:${versions.cassandra}",
commons_cli : "commons-cli:commons-cli:${versions.commons_cli}",
thrift : "org.apache.thrift:libthrift:${versions.thrift}",
kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
*/
package io.prestodb.tempto.internal.fulfillment.table.cassandra;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;

import java.util.Iterator;
import java.util.List;
Expand All @@ -28,12 +30,12 @@

public class CassandraBatchLoader
{
private final Session session;
private final CqlSession session;
private final String insertQuery;
private final int columnsCount;
private final int batchRowsCount;

public CassandraBatchLoader(Session session, String tableName, List<String> columnNames, int batchRowsCount)
public CassandraBatchLoader(CqlSession session, String tableName, List<String> columnNames, int batchRowsCount)
{
this.session = requireNonNull(session, "session is null");
requireNonNull(tableName, "tableName is null");
Expand Down Expand Up @@ -67,24 +69,28 @@ public void load(Iterator<List<Object>> rows)
{
PreparedStatement statement = session.prepare(insertQuery);

BatchStatement batch = createBatchStatement();
BatchStatementBuilder batchBuilder = createBatchStatementBuilder();
int currentBatchSize = 0;

while (rows.hasNext()) {
if (batch.size() >= batchRowsCount) {
session.execute(batch);
batch = createBatchStatement();
if (currentBatchSize >= batchRowsCount) {
session.execute(batchBuilder.build());
batchBuilder = createBatchStatementBuilder();
currentBatchSize = 0;
}
List<Object> row = rows.next();
checkState(row.size() == columnsCount, "values count in a row is expected to be %d, but found: %d", columnsCount, row.size());
batch.add(statement.bind(row.toArray()));
batchBuilder.addStatement(statement.bind(row.toArray()));
currentBatchSize++;
}

if (batch.size() > 0) {
session.execute(batch);
if (currentBatchSize > 0) {
session.execute(batchBuilder.build());
}
}

private static BatchStatement createBatchStatement()
private static BatchStatementBuilder createBatchStatementBuilder()
{
return new BatchStatement(BatchStatement.Type.UNLOGGED);
return BatchStatement.builder(DefaultBatchType.UNLOGGED);
Comment thread
pdabre12 marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,27 @@
*/
package io.prestodb.tempto.internal.query;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestodb.tempto.configuration.Configuration;
import io.prestodb.tempto.query.QueryExecutionException;
import io.prestodb.tempto.query.QueryResult;

import java.net.InetSocketAddress;
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
Expand All @@ -42,27 +44,25 @@ public class CassandraQueryExecutor
implements AutoCloseable
{
private static final Map<DataType, JDBCType> typeMapping;
private final Cluster cluster;
private Session session;
private final CqlSession session;

static {
typeMapping = ImmutableMap.<DataType, JDBCType>builder()
.put(DataType.ascii(), JDBCType.VARCHAR)
.put(DataType.bigint(), JDBCType.BIGINT)
.put(DataType.blob(), JDBCType.BLOB)
.put(DataType.cboolean(), JDBCType.BOOLEAN)
.put(DataType.counter(), JDBCType.BIGINT)
.put(DataType.date(), JDBCType.DATE)
.put(DataType.decimal(), JDBCType.DECIMAL)
.put(DataType.cdouble(), JDBCType.DOUBLE)
.put(DataType.cfloat(), JDBCType.REAL)
.put(DataType.cint(), JDBCType.INTEGER)
.put(DataType.smallint(), JDBCType.SMALLINT)
//.put(DataType.text(), JDBCType.NVARCHAR)
.put(DataType.time(), JDBCType.TIME)
.put(DataType.timestamp(), JDBCType.TIMESTAMP)
.put(DataType.tinyint(), JDBCType.TINYINT)
.put(DataType.varchar(), JDBCType.VARCHAR)
.put(DataTypes.ASCII, JDBCType.VARCHAR)
.put(DataTypes.BIGINT, JDBCType.BIGINT)
.put(DataTypes.BLOB, JDBCType.BLOB)
.put(DataTypes.BOOLEAN, JDBCType.BOOLEAN)
.put(DataTypes.COUNTER, JDBCType.BIGINT)
.put(DataTypes.DATE, JDBCType.DATE)
.put(DataTypes.DECIMAL, JDBCType.DECIMAL)
.put(DataTypes.DOUBLE, JDBCType.DOUBLE)
.put(DataTypes.FLOAT, JDBCType.REAL)
.put(DataTypes.INT, JDBCType.INTEGER)
.put(DataTypes.SMALLINT, JDBCType.SMALLINT)
.put(DataTypes.TEXT, JDBCType.VARCHAR)
.put(DataTypes.TIME, JDBCType.TIME)
.put(DataTypes.TIMESTAMP, JDBCType.TIMESTAMP)
.put(DataTypes.TINYINT, JDBCType.TINYINT)
.build();
}

Expand All @@ -77,93 +77,106 @@ public static class TypeNotSupportedException

public CassandraQueryExecutor(Configuration configuration)
{
cluster = Cluster.builder()
.addContactPoint(configuration.getStringMandatory("databases.cassandra.host"))
.withPort(configuration.getIntMandatory("databases.cassandra.port"))
String host = configuration.getStringMandatory("databases.cassandra.host");
int port = configuration.getIntMandatory("databases.cassandra.port");
String dc = configuration.getString("databases.cassandra.datacenter").orElse("datacenter1");

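// Driver 4.x requires an explicit local datacenter when contact points are set programmatically.
// It is read from "databases.cassandra.datacenter" and falls back to "datacenter1",
// the datacenter name reported by Cassandra's default SimpleSnitch (single-datacenter deployments).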
session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withLocalDatacenter(dc)
.build();
}

public QueryResult executeQuery(String sql)
throws QueryExecutionException
{
ensureConnected();
checkState(!session.isClosed(), "Trying to execute query using closed Session");

ResultSet rs = session.execute(sql);
List<ColumnDefinitions.Definition> definitions = rs.getColumnDefinitions().asList();
List<ColumnDefinition> definitions = newArrayList();
for (ColumnDefinition def : rs.getColumnDefinitions()) {
definitions.add(def);
}

List<JDBCType> types = definitions.stream()
.map(definition -> getJDBCType(definition.getType()))
.collect(toList());

List<String> columnNames = definitions.stream()
.map(ColumnDefinitions.Definition::getName)
.map(ColumnDefinition::getName)
.map(Object::toString)
.collect(toList());

QueryResult.QueryResultBuilder resultBuilder = new QueryResult.QueryResultBuilder(types, columnNames);

for (Row row : rs) {
List<Object> builderRow = newArrayList();
for (int i = 0; i < types.size(); ++i) {
builderRow.add(row.getToken(i).getValue());
builderRow.add(row.getObject(i));
}
resultBuilder.addRow(builderRow);
}

return resultBuilder.build();
}

public Session getSession()
public CqlSession getSession()
{
return session;
}

public List<String> getColumnNames(String keySpace, String tableName)
public List<String> getColumnNames(String keyspaceName, String tableName)
{
checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName);
KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(keySpace);
TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName);
return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList());
Optional<KeyspaceMetadata> keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
throw new IllegalStateException(format("Keyspace %s does not exist", keyspaceName));
}
Optional<TableMetadata> tableMetadata = keyspaceMetadata.get().getTable(tableName);
if (!tableMetadata.isPresent()) {
throw new IllegalStateException(format("Table %s.%s does not exist", keyspaceName, tableName));
}
return tableMetadata.get().getColumns().values().stream()
.map(ColumnMetadata::getName)
.map(Object::toString)
.collect(toList());
}

public boolean tableExists(String keySpace, String tableName)
public boolean tableExists(String keyspaceName, String tableName)
{
KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace);
if (keyspaceMetadata == null) {
Optional<KeyspaceMetadata> keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
return false;
}
return keyspaceMetadata.getTable(tableName) != null;
return keyspaceMetadata.get().getTable(tableName).isPresent();
}

public List<String> getTableNames(String keySpace)
public List<String> getTableNames(String keyspaceName)
{
Metadata clusterMetadata = cluster.getMetadata();
KeyspaceMetadata keyspaceMetadata = clusterMetadata.getKeyspace(keySpace);
if (keyspaceMetadata == null) {
Metadata clusterMetadata = session.getMetadata();
Optional<KeyspaceMetadata> keyspaceMetadata = clusterMetadata.getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
return ImmutableList.of();
}
return keyspaceMetadata.getTables().stream()
return keyspaceMetadata.get().getTables().values().stream()
.map(TableMetadata::getName)
.map(Object::toString)
.collect(toList());
}

@Override
public void close()
{
cluster.close();
}

private void ensureConnected()
{
checkState(!cluster.isClosed(), "Trying to connect using closed Cluster");

if (session == null || session.isClosed()) {
session = cluster.connect();
if (session != null && !session.isClosed()) {
session.close();
}
}

private static JDBCType getJDBCType(DataType type)
{
JDBCType jdbcType = typeMapping.get(type);
if (type == null) {
if (jdbcType == null) {
throw new TypeNotSupportedException(type);
}

Expand Down
2 changes: 1 addition & 1 deletion tempto-examples/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:

cassandra:
hostname: cassandra
image: 'cassandra:2.1.15'
image: 'cassandra:3.11.19'
ports:
- '9042:9042'
- '9160:9160'
Expand Down
3 changes: 3 additions & 0 deletions tempto-examples/src/main/resources/tempto-configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ databases:
cassandra:
host: ${cluster.cassandra}
port: 9042
datacenter: datacenter1
keyspace: tempto
default_schema: test
skip_create_schema: false
table_manager_type: cassandra
Expand Down