Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/src/main/sphinx/connector/exasol.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,10 @@ The connector supports pushdown for a number of operations:

- {ref}`limit-pushdown`
- {ref}`topn-pushdown`
- {ref}`join-pushdown`

```{include} pushdown-correctness-behavior.fragment
```

```{include} join-pushdown-enabled-true.fragment
```
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.slice.Slices;
import io.trino.plugin.base.expression.ConnectorExpressionRewriter;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.jdbc.BaseJdbcClient;
import io.trino.plugin.jdbc.BaseJdbcConfig;
Expand All @@ -35,6 +36,9 @@
import io.trino.plugin.jdbc.SliceWriteFunction;
import io.trino.plugin.jdbc.WriteFunction;
import io.trino.plugin.jdbc.WriteMapping;
import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder;
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
import io.trino.plugin.jdbc.expression.RewriteIn;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.AggregateFunction;
Expand All @@ -43,6 +47,8 @@
import io.trino.spi.connector.ColumnPosition;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Type;

import java.sql.Connection;
Expand All @@ -65,6 +71,8 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping;
import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
Expand All @@ -84,6 +92,7 @@ public class ExasolClient
.add("EXA_STATISTICS")
.add("SYS")
.build();
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;

@Inject
public ExasolClient(
Expand All @@ -94,6 +103,25 @@ public ExasolClient(
RemoteQueryModifier queryModifier)
{
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
// Basic implementation required to enable JOIN pushdown support
// It is covered by "testJoinpushdown" integration tests.
// More detailed test case scenarios are covered by Unit tests in "TestExasolClient"
this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder()
.addStandardRules(this::quoted)
.add(new RewriteIn())
.map("$equal(left, right)").to("left = right")
Comment thread
skyglass marked this conversation as resolved.
.map("$not_equal(left, right)").to("left <> right")
// Exasol doesn't support "IS NOT DISTINCT FROM" expression,
// so "$identical(left, right)" is rewritten with equivalent "(left = right OR (left IS NULL AND right IS NULL))" expression
.map("$identical(left, right)").to("(left = right OR (left IS NULL AND right IS NULL))")
.map("$less_than(left, right)").to("left < right")
.map("$less_than_or_equal(left, right)").to("left <= right")
.map("$greater_than(left, right)").to("left > right")
.map("$greater_than_or_equal(left, right)").to("left >= right")
.map("$not($is_null(value))").to("value IS NOT NULL")
.map("$not(value: boolean)").to("NOT value")
.map("$is_null(value)").to("value IS NULL")
.build();
Comment thread
skyglass marked this conversation as resolved.
}

@Override
Expand Down Expand Up @@ -195,11 +223,16 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri
throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming tables");
}

@Override
public Optional<ParameterizedExpression> convertPredicate(ConnectorSession session, ConnectorExpression expression, Map<String, ColumnHandle> assignments)
{
return connectorExpressionRewriter.rewrite(session, expression, assignments);
Comment thread
skyglass marked this conversation as resolved.
}

@Override
protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition)
{
// Deactivated because test 'testJoinPushdown()' requires write access which is not implemented for Exasol
return false;
return true;
}

@Override
Expand Down Expand Up @@ -311,7 +344,15 @@ private static SliceWriteFunction hashTypeWriteFunction()
@Override
public WriteMapping toWriteMapping(ConnectorSession session, Type type)
{
throw new TrinoException(NOT_SUPPORTED, "This connector does not support writing");
if (type instanceof DecimalType decimalType) {
Comment thread
skyglass marked this conversation as resolved.
String dataType = "decimal(%s, %s)".formatted(decimalType.getPrecision(), decimalType.getScale());
if (decimalType.isShort()) {
return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType));
}
return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType));
}

throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exasol;

import com.google.common.collect.ImmutableList;
import io.trino.metadata.TestingFunctionResolution;
import io.trino.plugin.base.mapping.DefaultIdentifierMapping;
import io.trino.plugin.jdbc.BaseJdbcConfig;
import io.trino.plugin.jdbc.DefaultQueryBuilder;
import io.trino.plugin.jdbc.JdbcClient;
import io.trino.plugin.jdbc.JdbcColumnHandle;
import io.trino.plugin.jdbc.JdbcMetadataConfig;
import io.trino.plugin.jdbc.JdbcMetadataSessionProperties;
import io.trino.plugin.jdbc.JdbcTypeHandle;
import io.trino.plugin.jdbc.QueryParameter;
import io.trino.plugin.jdbc.expression.ParameterizedExpression;
import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.expression.ConnectorExpression;
import io.trino.spi.session.PropertyMetadata;
import io.trino.sql.ir.Comparison;
import io.trino.sql.ir.Constant;
import io.trino.sql.ir.Expression;
import io.trino.sql.ir.In;
import io.trino.sql.ir.IsNull;
import io.trino.sql.ir.Logical;
import io.trino.sql.ir.Reference;
import io.trino.sql.planner.ConnectorExpressionTranslator;
import io.trino.testing.TestingConnectorSession;
import org.junit.jupiter.api.Test;

import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.SessionTestUtils.TEST_SESSION;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.sql.ir.IrExpressions.not;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;

final class TestExasolClient
{
private static final JdbcClient JDBC_CLIENT = new ExasolClient(
new BaseJdbcConfig(),
session -> {
throw new UnsupportedOperationException();
},
new DefaultQueryBuilder(RemoteQueryModifier.NONE),
new DefaultIdentifierMapping(),
RemoteQueryModifier.NONE);

private static final ConnectorSession SESSION = TestingConnectorSession
.builder()
.setPropertyMetadata(ImmutableList.<PropertyMetadata<?>>builder()
.addAll(new JdbcMetadataSessionProperties(new JdbcMetadataConfig(), Optional.empty()).getSessionProperties())
.build())
.build();

private static final TestingFunctionResolution FUNCTIONS = new TestingFunctionResolution();

private static final JdbcColumnHandle BIGINT_COLUMN =
JdbcColumnHandle.builder()
.setColumnName("c_bigint")
.setColumnType(BIGINT)
.setJdbcTypeHandle(new JdbcTypeHandle(Types.BIGINT, Optional.of("int8"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()))
.build();

private static final JdbcColumnHandle VARCHAR_COLUMN =
JdbcColumnHandle.builder()
.setColumnName("c_varchar")
.setColumnType(createVarcharType(10))
.setJdbcTypeHandle(new JdbcTypeHandle(Types.VARCHAR, Optional.of("varchar"), Optional.of(10), Optional.empty(), Optional.empty(), Optional.empty()))
.build();

private static final JdbcColumnHandle VARCHAR_COLUMN2 =
JdbcColumnHandle.builder()
.setColumnName("c_varchar2")
.setColumnType(createVarcharType(10))
.setJdbcTypeHandle(new JdbcTypeHandle(Types.VARCHAR, Optional.of("varchar"), Optional.of(10), Optional.empty(), Optional.empty(), Optional.empty()))
.build();

@Test
void testConvertOr()
{
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(
SESSION,
translateToConnectorExpression(
new Logical(
Logical.Operator.OR,
List.of(
new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L)),
new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol_2"), new Constant(BIGINT, 415L))))),
Map.of(
"c_bigint_symbol", BIGINT_COLUMN,
"c_bigint_symbol_2", BIGINT_COLUMN))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("((\"c_bigint\") = (?)) OR ((\"c_bigint\") = (?))");
assertThat(converted.parameters()).isEqualTo(List.of(
new QueryParameter(BIGINT, Optional.of(42L)),
new QueryParameter(BIGINT, Optional.of(415L))));
}

@Test
void testConvertOrWithAnd()
{
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(
SESSION,
translateToConnectorExpression(
new Logical(
Logical.Operator.OR,
List.of(
new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L)),
new Logical(
Logical.Operator.AND,
List.of(
new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 43L)),
new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol_2"), new Constant(BIGINT, 44L))))))),
Map.of(
"c_bigint_symbol", BIGINT_COLUMN,
"c_bigint_symbol_2", BIGINT_COLUMN))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("((\"c_bigint\") = (?)) OR (((\"c_bigint\") = (?)) AND ((\"c_bigint\") = (?)))");
assertThat(converted.parameters()).isEqualTo(List.of(
new QueryParameter(BIGINT, Optional.of(42L)),
new QueryParameter(BIGINT, Optional.of(43L)),
new QueryParameter(BIGINT, Optional.of(44L))));
}

@Test
void testConvertComparison()
{
for (Comparison.Operator operator : Comparison.Operator.values()) {
Optional<ParameterizedExpression> converted = JDBC_CLIENT.convertPredicate(
SESSION,
translateToConnectorExpression(
new Comparison(operator, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L))),
Map.of("c_bigint_symbol", BIGINT_COLUMN));

switch (operator) {
case EQUAL:
case NOT_EQUAL:
case LESS_THAN:
case LESS_THAN_OR_EQUAL:
case GREATER_THAN:
case GREATER_THAN_OR_EQUAL:
assertThat(converted).isPresent();
assertThat(converted.get().expression()).isEqualTo(format("(\"c_bigint\") %s (?)", operator.getValue()));
assertThat(converted.get().parameters()).isEqualTo(List.of(new QueryParameter(BIGINT, Optional.of(42L))));
break;
case IDENTICAL:
assertThat(converted).isPresent();
assertThat(converted.get().expression()).isEqualTo(format("((\"c_bigint\") = (?) OR ((\"c_bigint\") IS NULL AND (?) IS NULL))"));
assertThat(converted.get().parameters()).isEqualTo(List.of(new QueryParameter(BIGINT, Optional.of(42L)),
new QueryParameter(BIGINT, Optional.of(42L))));
break;
}
}
}

@Test
void testConvertIsNull()
{
// c_varchar IS NULL
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION,
translateToConnectorExpression(
new IsNull(
new Reference(VARCHAR, "c_varchar_symbol"))),
Map.of("c_varchar_symbol", VARCHAR_COLUMN))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IS NULL");
assertThat(converted.parameters()).isEqualTo(List.of());
}

@Test
void testConvertIsNotNull()
{
// c_varchar IS NOT NULL
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION,
translateToConnectorExpression(
not(FUNCTIONS.getMetadata(), new IsNull(new Reference(VARCHAR, "c_varchar_symbol")))),
Map.of("c_varchar_symbol", VARCHAR_COLUMN))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IS NOT NULL");
assertThat(converted.parameters()).isEqualTo(List.of());
}

@Test
void testConvertNotExpression()
{
// NOT(expression)
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION,
translateToConnectorExpression(
not(
FUNCTIONS.getMetadata(),
not(FUNCTIONS.getMetadata(), new IsNull(new Reference(VARCHAR, "c_varchar_symbol"))))),
Map.of("c_varchar_symbol", VARCHAR_COLUMN))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("NOT ((\"c_varchar\") IS NOT NULL)");
assertThat(converted.parameters()).isEqualTo(List.of());
}

@Test
void testConvertIn()
{
ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(
SESSION,
translateToConnectorExpression(
new In(
new Reference(createVarcharType(10), "c_varchar"),
List.of(
new Constant(VARCHAR_COLUMN.getColumnType(), utf8Slice("value1")),
new Constant(VARCHAR_COLUMN.getColumnType(), utf8Slice("value2")),
new Reference(createVarcharType(10), "c_varchar2")))),
Map.of(VARCHAR_COLUMN.getColumnName(), VARCHAR_COLUMN, VARCHAR_COLUMN2.getColumnName(), VARCHAR_COLUMN2))
.orElseThrow();
assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IN (?, ?, \"c_varchar2\")");
assertThat(converted.parameters()).isEqualTo(List.of(
new QueryParameter(createVarcharType(10), Optional.of(utf8Slice("value1"))),
new QueryParameter(createVarcharType(10), Optional.of(utf8Slice("value2")))));
}

private ConnectorExpression translateToConnectorExpression(Expression expression)
{
return ConnectorExpressionTranslator.translate(TEST_SESSION, expression)
.orElseThrow();
}
}
Loading