Merged
4 changes: 2 additions & 2 deletions build.gradle
@@ -58,7 +58,7 @@ ext.versions = [
freemarker : '2.3.22',
objenesis : '1.4',
jackson : '2.4.4',
cassandra : '4.19.2',
cassandra : '3.4.0',
commons_cli : '1.3.1',
thrift : '0.16.0',
kafka : '3.9.0',
@@ -102,7 +102,7 @@ ext.libraries = [
objenesis : "org.objenesis:objenesis:${versions.objenesis}",
jackson_databind : "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}",
jackson_datatype_jdk8: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:${versions.jackson}",
cassandra_driver : "org.apache.cassandra:java-driver-core:${versions.cassandra}",
cassandra_driver : "com.datastax.cassandra:cassandra-driver-core:${versions.cassandra}",
commons_cli : "commons-cli:commons-cli:${versions.commons_cli}",
thrift : "org.apache.thrift:libthrift:${versions.thrift}",
kafka_clients : "org.apache.kafka:kafka-clients:${versions.kafka}",
@@ -13,11 +13,9 @@
*/
package io.prestodb.tempto.internal.fulfillment.table.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

import java.util.Iterator;
import java.util.List;
@@ -30,12 +28,12 @@

public class CassandraBatchLoader
{
private final CqlSession session;
private final Session session;
private final String insertQuery;
private final int columnsCount;
private final int batchRowsCount;

public CassandraBatchLoader(CqlSession session, String tableName, List<String> columnNames, int batchRowsCount)
public CassandraBatchLoader(Session session, String tableName, List<String> columnNames, int batchRowsCount)
{
this.session = requireNonNull(session, "session is null");
requireNonNull(tableName, "tableName is null");
@@ -69,28 +67,24 @@ public void load(Iterator<List<Object>> rows)
{
PreparedStatement statement = session.prepare(insertQuery);

BatchStatementBuilder batchBuilder = createBatchStatementBuilder();
int currentBatchSize = 0;

BatchStatement batch = createBatchStatement();
while (rows.hasNext()) {
if (currentBatchSize >= batchRowsCount) {
session.execute(batchBuilder.build());
batchBuilder = createBatchStatementBuilder();
currentBatchSize = 0;
if (batch.size() >= batchRowsCount) {
session.execute(batch);
batch = createBatchStatement();
}
List<Object> row = rows.next();
checkState(row.size() == columnsCount, "values count in a row is expected to be %d, but found: %d", columnsCount, row.size());
batchBuilder.addStatement(statement.bind(row.toArray()));
currentBatchSize++;
batch.add(statement.bind(row.toArray()));
}

if (currentBatchSize > 0) {
session.execute(batchBuilder.build());
if (batch.size() > 0) {
session.execute(batch);
}
}

private static BatchStatementBuilder createBatchStatementBuilder()
private static BatchStatement createBatchStatement()
{
return BatchStatement.builder(DefaultBatchType.UNLOGGED);
return new BatchStatement(BatchStatement.Type.UNLOGGED);
}
}
@@ -13,27 +13,25 @@
*/
package io.prestodb.tempto.internal.query;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TableMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestodb.tempto.configuration.Configuration;
import io.prestodb.tempto.query.QueryExecutionException;
import io.prestodb.tempto.query.QueryResult;

import java.net.InetSocketAddress;
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Lists.newArrayList;
@@ -44,25 +42,27 @@ public class CassandraQueryExecutor
implements AutoCloseable
{
private static final Map<DataType, JDBCType> typeMapping;
private final CqlSession session;
private final Cluster cluster;
private Session session;

static {
typeMapping = ImmutableMap.<DataType, JDBCType>builder()
.put(DataTypes.ASCII, JDBCType.VARCHAR)
.put(DataTypes.BIGINT, JDBCType.BIGINT)
.put(DataTypes.BLOB, JDBCType.BLOB)
.put(DataTypes.BOOLEAN, JDBCType.BOOLEAN)
.put(DataTypes.COUNTER, JDBCType.BIGINT)
.put(DataTypes.DATE, JDBCType.DATE)
.put(DataTypes.DECIMAL, JDBCType.DECIMAL)
.put(DataTypes.DOUBLE, JDBCType.DOUBLE)
.put(DataTypes.FLOAT, JDBCType.REAL)
.put(DataTypes.INT, JDBCType.INTEGER)
.put(DataTypes.SMALLINT, JDBCType.SMALLINT)
.put(DataTypes.TEXT, JDBCType.VARCHAR)
.put(DataTypes.TIME, JDBCType.TIME)
.put(DataTypes.TIMESTAMP, JDBCType.TIMESTAMP)
.put(DataTypes.TINYINT, JDBCType.TINYINT)
.put(DataType.ascii(), JDBCType.VARCHAR)
.put(DataType.bigint(), JDBCType.BIGINT)
.put(DataType.blob(), JDBCType.BLOB)
.put(DataType.cboolean(), JDBCType.BOOLEAN)
.put(DataType.counter(), JDBCType.BIGINT)
.put(DataType.date(), JDBCType.DATE)
.put(DataType.decimal(), JDBCType.DECIMAL)
.put(DataType.cdouble(), JDBCType.DOUBLE)
.put(DataType.cfloat(), JDBCType.REAL)
.put(DataType.cint(), JDBCType.INTEGER)
.put(DataType.smallint(), JDBCType.SMALLINT)
//.put(DataType.text(), JDBCType.NVARCHAR)
.put(DataType.time(), JDBCType.TIME)
.put(DataType.timestamp(), JDBCType.TIMESTAMP)
.put(DataType.tinyint(), JDBCType.TINYINT)
.put(DataType.varchar(), JDBCType.VARCHAR)
.build();
Comment on lines +59 to 66
Contributor

suggestion (bug_risk): Dropping the mapping for DataType.text() may cause unsupported-type failures for text columns.

The old mapping handled text via DataTypes.TEXT, but the new version removes DataType.text() entirely. Even though Cassandra treats text and varchar as aliases, schemas may still declare text, and the driver may treat DataType.text() separately from DataType.varchar(). In that case, text columns would fail with TypeNotSupportedException. Consider mapping both DataType.text() and DataType.varchar() to the same JDBC type (e.g., JDBCType.VARCHAR or NVARCHAR, depending on the intent) rather than dropping text support.

Suggested change
Current:
.put(DataType.cint(), JDBCType.INTEGER)
.put(DataType.smallint(), JDBCType.SMALLINT)
//.put(DataType.text(), JDBCType.NVARCHAR)
.put(DataType.time(), JDBCType.TIME)
.put(DataType.timestamp(), JDBCType.TIMESTAMP)
.put(DataType.tinyint(), JDBCType.TINYINT)
.put(DataType.varchar(), JDBCType.VARCHAR)
.build();
Suggested:
.put(DataType.cint(), JDBCType.INTEGER)
.put(DataType.smallint(), JDBCType.SMALLINT)
.put(DataType.text(), JDBCType.VARCHAR)
.put(DataType.time(), JDBCType.TIME)
.put(DataType.timestamp(), JDBCType.TIMESTAMP)
.put(DataType.tinyint(), JDBCType.TINYINT)
.put(DataType.varchar(), JDBCType.VARCHAR)
.build();

}

@@ -77,106 +77,93 @@ public static class TypeNotSupportedException

public CassandraQueryExecutor(Configuration configuration)
{
String host = configuration.getStringMandatory("databases.cassandra.host");
int port = configuration.getIntMandatory("databases.cassandra.port");
String dc = configuration.getString("databases.cassandra.datacenter").orElse("datacenter1");

// Driver 4.x requires a local datacenter to be specified
// Using "datacenter1" as the default, which is the standard for single-datacenter deployments
session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withLocalDatacenter(dc)
cluster = Cluster.builder()
.addContactPoint(configuration.getStringMandatory("databases.cassandra.host"))
.withPort(configuration.getIntMandatory("databases.cassandra.port"))
.build();
}

public QueryResult executeQuery(String sql)
throws QueryExecutionException
{
checkState(!session.isClosed(), "Trying to execute query using closed Session");
ensureConnected();

ResultSet rs = session.execute(sql);
List<ColumnDefinition> definitions = newArrayList();
for (ColumnDefinition def : rs.getColumnDefinitions()) {
definitions.add(def);
}

List<ColumnDefinitions.Definition> definitions = rs.getColumnDefinitions().asList();
List<JDBCType> types = definitions.stream()
.map(definition -> getJDBCType(definition.getType()))
.collect(toList());

List<String> columnNames = definitions.stream()
.map(ColumnDefinition::getName)
.map(Object::toString)
.map(ColumnDefinitions.Definition::getName)
.collect(toList());

QueryResult.QueryResultBuilder resultBuilder = new QueryResult.QueryResultBuilder(types, columnNames);

for (Row row : rs) {
List<Object> builderRow = newArrayList();
for (int i = 0; i < types.size(); ++i) {
builderRow.add(row.getObject(i));
builderRow.add(row.getToken(i).getValue());
Contributor

issue (bug_risk): Using row.getToken(i) instead of row.getObject(i) is likely incorrect for fetching column values.

getToken(i) only returns partitioner tokens for partition key columns and does not represent the actual column values, especially for non-key columns. This changes the semantics of the query result versus getObject(i) and will produce incorrect data unless you explicitly intend to expose partition tokens. Please revert to row.getObject(i) (or a type-specific accessor) for returning row values.
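For reference, a minimal sketch of the intended loop under the 3.x driver API used in this diff, reading values with row.getObject(i) as the previous version did:

// Copy each column value (not its partitioner token) into the result row.
for (Row row : rs) {
    List<Object> builderRow = newArrayList();
    for (int i = 0; i < types.size(); ++i) {
        builderRow.add(row.getObject(i));
    }
    resultBuilder.addRow(builderRow);
}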

}
resultBuilder.addRow(builderRow);
}

return resultBuilder.build();
}

public CqlSession getSession()
public Session getSession()
{
return session;
}
Comment on lines +114 to 117
Contributor

suggestion (bug_risk): getSession() may now return null due to lazy initialization.

With the previous eager construction, getSession() was guaranteed non-null; now it can return null unless ensureConnected() has been called first. If external callers depend on a non-null session, either call ensureConnected() from getSession() or clearly document/enforce that callers must use higher-level methods (e.g., executeQuery()) instead of getSession() directly.

Suggested change
Current:
public Session getSession()
{
return session;
}
Suggested:
public Session getSession()
{
ensureConnected();
return session;
}


public List<String> getColumnNames(String keyspaceName, String tableName)
public List<String> getColumnNames(String keySpace, String tableName)
{
Optional<KeyspaceMetadata> keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
throw new IllegalStateException(format("Keyspace %s does not exist", keyspaceName));
}
Optional<TableMetadata> tableMetadata = keyspaceMetadata.get().getTable(tableName);
if (!tableMetadata.isPresent()) {
throw new IllegalStateException(format("Table %s.%s does not exist", keyspaceName, tableName));
}
return tableMetadata.get().getColumns().values().stream()
.map(ColumnMetadata::getName)
.map(Object::toString)
.collect(toList());
checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName);
KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(keySpace);
Contributor

issue (bug_risk): getColumnNames uses session without ensuring it is initialized, risking an NPE.

ensureConnected() now handles session initialization, but this method only checks tableExists, which uses cluster and never initializes session. If getColumnNames is called before anything that triggers ensureConnected(), session will be null and session.getCluster() will throw. Please either call ensureConnected() at the start of this method or access metadata via cluster as in tableExists.
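A minimal sketch of one possible fix, assuming the 3.x API used elsewhere in this diff: read the metadata from cluster as tableExists() already does (calling ensureConnected() at the top of the method would work equally well):

public List<String> getColumnNames(String keySpace, String tableName)
{
    // Avoid the uninitialized session by reusing the same metadata source as tableExists().
    checkState(tableExists(keySpace, tableName), "table %s.%s does not exist", keySpace, tableName);
    KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace);
    TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName);
    return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList());
}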

TableMetadata tableMetadata = keyspaceMetadata.getTable(tableName);
return tableMetadata.getColumns().stream().map(ColumnMetadata::getName).collect(toList());
}

public boolean tableExists(String keyspaceName, String tableName)
public boolean tableExists(String keySpace, String tableName)
{
Optional<KeyspaceMetadata> keyspaceMetadata = session.getMetadata().getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
KeyspaceMetadata keyspaceMetadata = cluster.getMetadata().getKeyspace(keySpace);
if (keyspaceMetadata == null) {
return false;
}
return keyspaceMetadata.get().getTable(tableName).isPresent();
return keyspaceMetadata.getTable(tableName) != null;
}

public List<String> getTableNames(String keyspaceName)
public List<String> getTableNames(String keySpace)
{
Metadata clusterMetadata = session.getMetadata();
Optional<KeyspaceMetadata> keyspaceMetadata = clusterMetadata.getKeyspace(keyspaceName);
if (!keyspaceMetadata.isPresent()) {
Metadata clusterMetadata = cluster.getMetadata();
KeyspaceMetadata keyspaceMetadata = clusterMetadata.getKeyspace(keySpace);
if (keyspaceMetadata == null) {
return ImmutableList.of();
}
return keyspaceMetadata.get().getTables().values().stream()
return keyspaceMetadata.getTables().stream()
.map(TableMetadata::getName)
.map(Object::toString)
.collect(toList());
}

@Override
public void close()
{
if (session != null && !session.isClosed()) {
session.close();
cluster.close();
}

private void ensureConnected()
{
checkState(!cluster.isClosed(), "Trying to connect using closed Cluster");

if (session == null || session.isClosed()) {
session = cluster.connect();
}
}

private static JDBCType getJDBCType(DataType type)
{
JDBCType jdbcType = typeMapping.get(type);
if (jdbcType == null) {
if (type == null) {
Contributor

issue (bug_risk): The null check in getJDBCType is applied to the wrong variable.

The original logic correctly used jdbcType == null to detect unsupported Cassandra DataTypes. Changing the condition to type == null means unsupported but valid DataTypes now return a null jdbcType instead of throwing, which can lead to NPEs or incorrect type handling downstream. Please restore the check to if (jdbcType == null).
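A minimal sketch of the check as it should read, assuming the method simply returns the mapped type after the guard (the remainder of the method is not shown in this diff):

private static JDBCType getJDBCType(DataType type)
{
    JDBCType jdbcType = typeMapping.get(type);
    // An unmapped DataType should surface as TypeNotSupportedException, not as a null JDBCType.
    if (jdbcType == null) {
        throw new TypeNotSupportedException(type);
    }
    return jdbcType;
}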

throw new TypeNotSupportedException(type);
}

2 changes: 1 addition & 1 deletion tempto-examples/docker/docker-compose.yml
@@ -32,7 +32,7 @@ services:

cassandra:
hostname: cassandra
image: 'cassandra:3.11.19'
image: 'cassandra:2.1.15'
ports:
- '9042:9042'
- '9160:9160'
3 changes: 0 additions & 3 deletions tempto-examples/src/main/resources/tempto-configuration.yaml
@@ -71,9 +71,6 @@ databases:
cassandra:
host: ${cluster.cassandra}
port: 9042
datacenter: datacenter1
keyspace: tempto
table_manager_type: cassandra
default_schema: test
skip_create_schema: false
table_manager_type: cassandra