Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .github/workflows/product-tests-specific-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,25 @@ jobs:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode -g hdfs_no_impersonation,avro,mixed_case
- name: Product Tests Specific 1.2
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-kerberos-hdfs-no-impersonation -g hdfs_no_impersonation
# temporarily disable this flaky run. see issue #20388 for details
# - name: Product Tests Specific 1.3
# run: presto-product-tests/bin/run_on_docker.sh singlenode-hdfs-impersonation -g storage_formats,cli,hdfs_impersonation
- name: Product Tests Specific 1.4
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-kerberos-hdfs-impersonation -g storage_formats,cli,hdfs_impersonation,authorization,hive_file_header
- name: Product Tests Specific 1.5
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-kerberos-hdfs-impersonation-cross-realm -g storage_formats,cli,hdfs_impersonation
- name: Product Tests Specific 1.6
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh multinode-tls-kerberos -g cli,group-by,join,tls
Expand Down Expand Up @@ -138,35 +138,35 @@ jobs:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-ldap -g ldap -x simba_jdbc
- name: Product Tests Specific 2.2
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh multinode-tls -g smoke,cli,group-by,join,tls
- name: Product Tests Specific 2.3
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-mysql -g mysql_connector,mysql
- name: Product Tests Specific 2.4
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-postgresql -g postgresql_connector
- name: Product Tests Specific 2.5
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-cassandra -g cassandra
# temporarily disable this flaky run. see issue #20388 for details
# - name: Product Tests Specific 2.6
# run: presto-product-tests/bin/run_on_docker.sh singlenode-kerberos-hdfs-impersonation-with-wire-encryption -g storage_formats,cli,hdfs_impersonation,authorization
- name: Product Tests Specific 2.7
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-kafka -g kafka
- name: Product Tests Specific 2.8
if: needs.changes.outputs.codechange == 'true'
if: needs.changes.outputs.codechange == 'true' && ${{ always() }}
env:
OVERRIDE_JDK_DIR: ${{ env.JAVA_HOME }}
run: presto-product-tests/bin/run_on_docker.sh singlenode-sqlserver -g sqlserver
Expand Down
28 changes: 4 additions & 24 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<!-- Changing joda version changes tzdata which must match deployed JVM tzdata
Do not change this without also making sure it matches -->
<dep.joda.version>2.14.0</dep.joda.version>
<dep.tempto.version>1.55</dep.tempto.version>
<dep.tempto.version>1.57</dep.tempto.version>
<dep.testng.version>7.5</dep.testng.version>
<dep.lucene.version>9.12.0</dep.lucene.version>
<dep.assertj-core.version>3.8.0</dep.assertj-core.version>
Expand Down Expand Up @@ -1108,33 +1108,13 @@
<dependency>
<groupId>com.facebook.presto.hive</groupId>
<artifactId>hive-apache</artifactId>
<version>3.0.0-12</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-common</artifactId>
<version>2.3.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
<version>4.0.1-1</version>
</dependency>

<dependency>
<groupId>com.facebook.presto.orc</groupId>
<artifactId>orc-protobuf</artifactId>
<version>13</version>
<version>14</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -1232,7 +1212,7 @@
<dependency>
<groupId>com.facebook.hive</groupId>
<artifactId>hive-dwrf</artifactId>
<version>0.8.7</version>
<version>0.8.8</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ public void testSelectTime()
{
MaterializedResult actualRow = computeActual("SELECT * from event WHERE id = 1");
Session session = getSession();
TimeZoneKey timeZoneKey = session.getSqlFunctionProperties().isLegacyTimestamp() ? session.getTimeZoneKey() : TimeZoneKey.UTC_KEY;
MaterializedResult expectedRow = resultBuilder(session, INTEGER, DATE, TIME, TIMESTAMP)
.row(1,
getDate("2004-12-31"),
getTimeAtZone("23:59:59", session.getTimeZoneKey()),
getDateTimeAtZone("2005-12-31 23:59:59", session.getTimeZoneKey()))
getTimeAtZone("23:59:59", timeZoneKey),
getDateTimeAtZone("2005-12-31 23:59:59", timeZoneKey))
.build();
assertTrue(actualRow.equals(expectedRow));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public CassandraTableHandle getTableHandle(ConnectorSession session, SchemaTable
{
requireNonNull(tableName, "tableName is null");
try {
return cassandraSession.getTable(tableName).getTableHandle();
return cassandraSession.getTable(session, tableName).getTableHandle();
}
catch (TableNotFoundException | SchemaNotFoundException e) {
// table was not found
Expand All @@ -126,7 +126,7 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect

private ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName tableName)
{
CassandraTable table = cassandraSession.getTable(tableName);
CassandraTable table = cassandraSession.getTable(session, tableName);
List<ColumnMetadata> columns = table.getColumns().stream()
.map(column -> column.getColumnMetadata(normalizeIdentifier(session, cqlNameToSqlName(column.getName()))))
.collect(toImmutableList());
Expand Down Expand Up @@ -164,7 +164,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
requireNonNull(session, "session is null");
requireNonNull(tableHandle, "tableHandle is null");
CassandraTable table = cassandraSession.getTable(getTableName(tableHandle));
CassandraTable table = cassandraSession.getTable(session, getTableName(tableHandle));
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (CassandraColumnHandle columnHandle : table.getColumns()) {
String columnName = cqlNameToSqlName(columnHandle.getName());
Expand Down Expand Up @@ -212,7 +212,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
Optional<Set<ColumnHandle>> desiredColumns)
{
CassandraTableHandle handle = (CassandraTableHandle) table;
CassandraPartitionResult partitionResult = partitionManager.getPartitions(handle, constraint.getSummary());
CassandraPartitionResult partitionResult = partitionManager.getPartitions(handle, session, constraint.getSummary());

String clusteringKeyPredicates = "";
TupleDomain<ColumnHandle> unenforcedConstraint;
Expand All @@ -221,7 +221,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
}
else {
CassandraClusteringPredicatesExtractor clusteringPredicatesExtractor = new CassandraClusteringPredicatesExtractor(
cassandraSession.getTable(getTableName(handle)).getClusteringKeyColumns(),
cassandraSession.getTable(session, getTableName(handle)).getClusteringKeyColumns(),
partitionResult.getUnenforcedConstraint(),
cassandraSession.getCassandraVersion());
clusteringKeyPredicates = clusteringPredicatesExtractor.getClusteringKeyPredicates();
Expand Down Expand Up @@ -347,7 +347,7 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
}

SchemaTableName schemaTableName = new SchemaTableName(table.getSchemaName(), table.getTableName());
List<CassandraColumnHandle> columns = cassandraSession.getTable(schemaTableName).getColumns();
List<CassandraColumnHandle> columns = cassandraSession.getTable(session, schemaTableName).getColumns();
List<String> columnNames = columns.stream().map(CassandraColumnHandle::getName).collect(Collectors.toList());
List<Type> columnTypes = columns.stream().map(CassandraColumnHandle::getType).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
Expand All @@ -44,12 +45,14 @@
import static com.facebook.presto.cassandra.util.CassandraCqlUtils.validTableName;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
import static com.facebook.presto.common.type.DateType.DATE;
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
Expand All @@ -67,13 +70,15 @@ public class CassandraPageSink
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneId.of("UTC"));

private final CassandraSession cassandraSession;
private final ConnectorSession session;
private final PreparedStatement insert;
private final List<Type> columnTypes;
private final boolean generateUUID;
private final Function<Long, Object> toCassandraDate;

public CassandraPageSink(
CassandraSession cassandraSession,
ConnectorSession connectorSession,
ProtocolVersion protocolVersion,
String schemaName,
String tableName,
Expand All @@ -82,6 +87,7 @@ public CassandraPageSink(
boolean generateUUID)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession");
this.session = requireNonNull(connectorSession, "connectorSession is null");
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
requireNonNull(columnNames, "columnNames is null");
Expand Down Expand Up @@ -156,9 +162,12 @@ else if (REAL.equals(type)) {
else if (DATE.equals(type)) {
values.add(toCassandraDate.apply(type.getLong(block, position)));
}
else if (TIMESTAMP.equals(type)) {
else if (session.getSqlFunctionProperties().isLegacyTimestamp() && TIMESTAMP.equals(type)) {
values.add(new Timestamp(type.getLong(block, position)));
}
else if (!session.getSqlFunctionProperties().isLegacyTimestamp() && TIMESTAMP_WITH_TIME_ZONE.equals(type)) {
values.add(new Timestamp(unpackMillisUtc(type.getLong(block, position))));
}
else if (isVarcharType(type)) {
values.add(type.getSlice(block, position).toStringUtf8());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

return new CassandraPageSink(
cassandraSession,
session,
protocolVersion,
handle.getSchemaName(),
handle.getTableName(),
Expand All @@ -67,6 +68,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

return new CassandraPageSink(
cassandraSession,
session,
protocolVersion,
handle.getSchemaName(),
handle.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.facebook.presto.common.predicate.Range;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -49,11 +50,11 @@ public CassandraPartitionManager(CassandraSession cassandraSession)
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
}

public CassandraPartitionResult getPartitions(ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
public CassandraPartitionResult getPartitions(ConnectorTableHandle tableHandle, ConnectorSession connectorSession, TupleDomain<ColumnHandle> tupleDomain)
{
CassandraTableHandle cassandraTableHandle = (CassandraTableHandle) tableHandle;

CassandraTable table = cassandraSession.getTable(cassandraTableHandle.getSchemaTableName());
CassandraTable table = cassandraSession.getTable(connectorSession, cassandraTableHandle.getSchemaTableName());
List<CassandraColumnHandle> partitionKeys = table.getPartitionKeyColumns();

// fetch the partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,31 @@
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.facebook.presto.common.predicate.NullableValue;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.RecordCursor;
import io.airlift.slice.Slice;

import java.util.List;

import static com.facebook.presto.common.type.DateTimeEncoding.packDateTimeWithZone;
import static io.airlift.slice.Slices.utf8Slice;
import static java.lang.Float.floatToRawIntBits;
import static java.util.Objects.requireNonNull;

public class CassandraRecordCursor
implements RecordCursor
{
private final List<FullCassandraType> fullCassandraTypes;
private ConnectorSession session;
private final ResultSet rs;
private Row currentRow;
private long count;

public CassandraRecordCursor(CassandraSession cassandraSession, List<FullCassandraType> fullCassandraTypes, String cql)
public CassandraRecordCursor(CassandraSession cassandraSession, ConnectorSession connectorSession, List<FullCassandraType> fullCassandraTypes, String cql)
{
this.session = requireNonNull(connectorSession, "connectorSession is null");
this.fullCassandraTypes = fullCassandraTypes;
rs = cassandraSession.execute(cql);
currentRow = null;
Expand Down Expand Up @@ -104,6 +110,8 @@ public long getLong(int i)
return currentRow.getLong(i);
case TIMESTAMP:
return currentRow.getTimestamp(i).getTime();
case TIMESTAMP_WITH_TIMEZONE:
return packDateTimeWithZone(currentRow.getTimestamp(i).getTime(), TimeZoneKey.UTC_KEY);
case DATE:
return currentRow.getDate(i).getDaysSinceEpoch();
case FLOAT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.cassandra;

import com.facebook.presto.common.type.Type;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.google.common.collect.ImmutableList;
Expand All @@ -28,13 +29,15 @@ public class CassandraRecordSet
implements RecordSet
{
private final CassandraSession cassandraSession;
private final ConnectorSession session;
private final String cql;
private final List<FullCassandraType> cassandraTypes;
private final List<Type> columnTypes;

public CassandraRecordSet(CassandraSession cassandraSession, String cql, List<CassandraColumnHandle> cassandraColumns)
public CassandraRecordSet(CassandraSession cassandraSession, ConnectorSession connectorSession, String cql, List<CassandraColumnHandle> cassandraColumns)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.session = requireNonNull(connectorSession, "connectorSession is null");
this.cql = requireNonNull(cql, "cql is null");

requireNonNull(cassandraColumns, "cassandraColumns is null");
Expand All @@ -51,7 +54,7 @@ public List<Type> getColumnTypes()
/**
 * Creates a new cursor over the rows produced by executing this record set's
 * stored CQL query against Cassandra. Each call opens a fresh cursor.
 */
@Override
public RecordCursor cursor()
{
    CassandraRecordCursor recordCursor = new CassandraRecordCursor(cassandraSession, session, cassandraTypes, cql);
    return recordCursor;
}

private static <T, R> List<R> transformList(List<T> list, Function<T, R> function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
String cql = sb.toString();
log.debug("Creating record set: %s", cql);

return new CassandraRecordSet(cassandraSession, cql, cassandraColumns);
return new CassandraRecordSet(cassandraSession, session, cql, cassandraColumns);
}

@Override
Expand Down
Loading
Loading