diff --git a/docs/src/main/sphinx/connector/exasol.md b/docs/src/main/sphinx/connector/exasol.md index 5f0c29a89f4b..2f5aafeb0b83 100644 --- a/docs/src/main/sphinx/connector/exasol.md +++ b/docs/src/main/sphinx/connector/exasol.md @@ -221,3 +221,10 @@ The connector supports pushdown for a number of operations: - {ref}`limit-pushdown` - {ref}`topn-pushdown` +- {ref}`join-pushdown` + +```{include} pushdown-correctness-behavior.fragment +``` + +```{include} join-pushdown-enabled-true.fragment +``` diff --git a/plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java b/plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java index 915b7c58e72a..a85be0f8e07e 100644 --- a/plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java +++ b/plugin/trino-exasol/src/main/java/io/trino/plugin/exasol/ExasolClient.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableSet; import com.google.inject.Inject; import io.airlift.slice.Slices; +import io.trino.plugin.base.expression.ConnectorExpressionRewriter; import io.trino.plugin.base.mapping.IdentifierMapping; import io.trino.plugin.jdbc.BaseJdbcClient; import io.trino.plugin.jdbc.BaseJdbcConfig; @@ -35,6 +36,9 @@ import io.trino.plugin.jdbc.SliceWriteFunction; import io.trino.plugin.jdbc.WriteFunction; import io.trino.plugin.jdbc.WriteMapping; +import io.trino.plugin.jdbc.expression.JdbcConnectorExpressionRewriterBuilder; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.plugin.jdbc.expression.RewriteIn; import io.trino.plugin.jdbc.logging.RemoteQueryModifier; import io.trino.spi.TrinoException; import io.trino.spi.connector.AggregateFunction; @@ -43,6 +47,8 @@ import io.trino.spi.connector.ColumnPosition; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.type.DecimalType; import io.trino.spi.type.Type; import java.sql.Connection; @@ -65,6 +71,8 @@ import static io.trino.plugin.jdbc.StandardColumnMappings.defaultVarcharColumnMapping; import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping; import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction; import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping; import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling; import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR; @@ -84,6 +92,7 @@ public class ExasolClient .add("EXA_STATISTICS") .add("SYS") .build(); + private final ConnectorExpressionRewriter connectorExpressionRewriter; @Inject public ExasolClient( @@ -94,6 +103,25 @@ public ExasolClient( RemoteQueryModifier queryModifier) { super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false); + // Basic implementation required to enable JOIN pushdown support + // It is covered by "testJoinpushdown" integration tests. + // More detailed test case scenarios are covered by Unit tests in "TestExasolClient" + this.connectorExpressionRewriter = JdbcConnectorExpressionRewriterBuilder.newBuilder() + .addStandardRules(this::quoted) + .add(new RewriteIn()) + .map("$equal(left, right)").to("left = right") + .map("$not_equal(left, right)").to("left <> right") + // Exasol doesn't support "IS NOT DISTINCT FROM" expression, + // so "$identical(left, right)" is rewritten with equivalent "(left = right OR (left IS NULL AND right IS NULL))" expression + .map("$identical(left, right)").to("(left = right OR (left IS NULL AND right IS NULL))") + .map("$less_than(left, right)").to("left < right") + .map("$less_than_or_equal(left, right)").to("left <= right") + .map("$greater_than(left, right)").to("left > right") + .map("$greater_than_or_equal(left, right)").to("left >= right") + .map("$not($is_null(value))").to("value IS NOT NULL") + .map("$not(value: boolean)").to("NOT value") + .map("$is_null(value)").to("value IS NULL") + .build(); } @Override @@ -195,11 +223,16 @@ protected void renameTable(ConnectorSession session, Connection connection, Stri throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming tables"); } + @Override + public Optional convertPredicate(ConnectorSession session, ConnectorExpression expression, Map assignments) + { + return connectorExpressionRewriter.rewrite(session, expression, assignments); + } + @Override protected boolean isSupportedJoinCondition(ConnectorSession session, JdbcJoinCondition joinCondition) { - // Deactivated because test 'testJoinPushdown()' requires write access which is not implemented for Exasol - return false; + return true; } @Override @@ -311,7 +344,15 @@ private static SliceWriteFunction hashTypeWriteFunction() @Override public WriteMapping toWriteMapping(ConnectorSession session, Type type) { - throw new TrinoException(NOT_SUPPORTED, "This connector does not support writing"); + if (type instanceof DecimalType decimalType) { + String dataType = "decimal(%s, %s)".formatted(decimalType.getPrecision(), decimalType.getScale()); + if (decimalType.isShort()) { + return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType)); + } + return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType)); + } + + throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type.getDisplayName()); } @Override diff --git a/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolClient.java b/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolClient.java new file mode 100644 index 000000000000..3475eff7a14c --- /dev/null +++ b/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolClient.java @@ -0,0 +1,243 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.exasol; + +import com.google.common.collect.ImmutableList; +import io.trino.metadata.TestingFunctionResolution; +import io.trino.plugin.base.mapping.DefaultIdentifierMapping; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.DefaultQueryBuilder; +import io.trino.plugin.jdbc.JdbcClient; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcMetadataConfig; +import io.trino.plugin.jdbc.JdbcMetadataSessionProperties; +import io.trino.plugin.jdbc.JdbcTypeHandle; +import io.trino.plugin.jdbc.QueryParameter; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.plugin.jdbc.logging.RemoteQueryModifier; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.expression.ConnectorExpression; +import io.trino.spi.session.PropertyMetadata; +import io.trino.sql.ir.Comparison; +import io.trino.sql.ir.Constant; +import io.trino.sql.ir.Expression; +import io.trino.sql.ir.In; +import io.trino.sql.ir.IsNull; +import io.trino.sql.ir.Logical; +import io.trino.sql.ir.Reference; +import io.trino.sql.planner.ConnectorExpressionTranslator; +import io.trino.testing.TestingConnectorSession; +import org.junit.jupiter.api.Test; + +import java.sql.Types; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.spi.type.VarcharType.createVarcharType; +import static io.trino.sql.ir.IrExpressions.not; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; + +final class TestExasolClient +{ + private static final JdbcClient JDBC_CLIENT = new ExasolClient( + new BaseJdbcConfig(), + session -> { + throw new UnsupportedOperationException(); + }, + new DefaultQueryBuilder(RemoteQueryModifier.NONE), + new DefaultIdentifierMapping(), + RemoteQueryModifier.NONE); + + private static final ConnectorSession SESSION = TestingConnectorSession + .builder() + .setPropertyMetadata(ImmutableList.>builder() + .addAll(new JdbcMetadataSessionProperties(new JdbcMetadataConfig(), Optional.empty()).getSessionProperties()) + .build()) + .build(); + + private static final TestingFunctionResolution FUNCTIONS = new TestingFunctionResolution(); + + private static final JdbcColumnHandle BIGINT_COLUMN = + JdbcColumnHandle.builder() + .setColumnName("c_bigint") + .setColumnType(BIGINT) + .setJdbcTypeHandle(new JdbcTypeHandle(Types.BIGINT, Optional.of("int8"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty())) + .build(); + + private static final JdbcColumnHandle VARCHAR_COLUMN = + JdbcColumnHandle.builder() + .setColumnName("c_varchar") + .setColumnType(createVarcharType(10)) + .setJdbcTypeHandle(new JdbcTypeHandle(Types.VARCHAR, Optional.of("varchar"), Optional.of(10), Optional.empty(), Optional.empty(), Optional.empty())) + .build(); + + private static final JdbcColumnHandle VARCHAR_COLUMN2 = + JdbcColumnHandle.builder() + .setColumnName("c_varchar2") + .setColumnType(createVarcharType(10)) + .setJdbcTypeHandle(new JdbcTypeHandle(Types.VARCHAR, Optional.of("varchar"), Optional.of(10), Optional.empty(), Optional.empty(), Optional.empty())) + .build(); + + @Test + void testConvertOr() + { + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate( + SESSION, + translateToConnectorExpression( + new Logical( + Logical.Operator.OR, + List.of( + new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L)), + new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol_2"), new Constant(BIGINT, 415L))))), + Map.of( + "c_bigint_symbol", BIGINT_COLUMN, + "c_bigint_symbol_2", BIGINT_COLUMN)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("((\"c_bigint\") = (?)) OR ((\"c_bigint\") = (?))"); + assertThat(converted.parameters()).isEqualTo(List.of( + new QueryParameter(BIGINT, Optional.of(42L)), + new QueryParameter(BIGINT, Optional.of(415L)))); + } + + @Test + void testConvertOrWithAnd() + { + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate( + SESSION, + translateToConnectorExpression( + new Logical( + Logical.Operator.OR, + List.of( + new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L)), + new Logical( + Logical.Operator.AND, + List.of( + new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 43L)), + new Comparison(Comparison.Operator.EQUAL, new Reference(BIGINT, "c_bigint_symbol_2"), new Constant(BIGINT, 44L))))))), + Map.of( + "c_bigint_symbol", BIGINT_COLUMN, + "c_bigint_symbol_2", BIGINT_COLUMN)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("((\"c_bigint\") = (?)) OR (((\"c_bigint\") = (?)) AND ((\"c_bigint\") = (?)))"); + assertThat(converted.parameters()).isEqualTo(List.of( + new QueryParameter(BIGINT, Optional.of(42L)), + new QueryParameter(BIGINT, Optional.of(43L)), + new QueryParameter(BIGINT, Optional.of(44L)))); + } + + @Test + void testConvertComparison() + { + for (Comparison.Operator operator : Comparison.Operator.values()) { + Optional converted = JDBC_CLIENT.convertPredicate( + SESSION, + translateToConnectorExpression( + new Comparison(operator, new Reference(BIGINT, "c_bigint_symbol"), new Constant(BIGINT, 42L))), + Map.of("c_bigint_symbol", BIGINT_COLUMN)); + + switch (operator) { + case EQUAL: + case NOT_EQUAL: + case LESS_THAN: + case LESS_THAN_OR_EQUAL: + case GREATER_THAN: + case GREATER_THAN_OR_EQUAL: + assertThat(converted).isPresent(); + assertThat(converted.get().expression()).isEqualTo(format("(\"c_bigint\") %s (?)", operator.getValue())); + assertThat(converted.get().parameters()).isEqualTo(List.of(new QueryParameter(BIGINT, Optional.of(42L)))); + break; + case IDENTICAL: + assertThat(converted).isPresent(); + assertThat(converted.get().expression()).isEqualTo(format("((\"c_bigint\") = (?) OR ((\"c_bigint\") IS NULL AND (?) IS NULL))")); + assertThat(converted.get().parameters()).isEqualTo(List.of(new QueryParameter(BIGINT, Optional.of(42L)), + new QueryParameter(BIGINT, Optional.of(42L)))); + break; + } + } + } + + @Test + void testConvertIsNull() + { + // c_varchar IS NULL + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION, + translateToConnectorExpression( + new IsNull( + new Reference(VARCHAR, "c_varchar_symbol"))), + Map.of("c_varchar_symbol", VARCHAR_COLUMN)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IS NULL"); + assertThat(converted.parameters()).isEqualTo(List.of()); + } + + @Test + void testConvertIsNotNull() + { + // c_varchar IS NOT NULL + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION, + translateToConnectorExpression( + not(FUNCTIONS.getMetadata(), new IsNull(new Reference(VARCHAR, "c_varchar_symbol")))), + Map.of("c_varchar_symbol", VARCHAR_COLUMN)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IS NOT NULL"); + assertThat(converted.parameters()).isEqualTo(List.of()); + } + + @Test + void testConvertNotExpression() + { + // NOT(expression) + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate(SESSION, + translateToConnectorExpression( + not( + FUNCTIONS.getMetadata(), + not(FUNCTIONS.getMetadata(), new IsNull(new Reference(VARCHAR, "c_varchar_symbol"))))), + Map.of("c_varchar_symbol", VARCHAR_COLUMN)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("NOT ((\"c_varchar\") IS NOT NULL)"); + assertThat(converted.parameters()).isEqualTo(List.of()); + } + + @Test + void testConvertIn() + { + ParameterizedExpression converted = JDBC_CLIENT.convertPredicate( + SESSION, + translateToConnectorExpression( + new In( + new Reference(createVarcharType(10), "c_varchar"), + List.of( + new Constant(VARCHAR_COLUMN.getColumnType(), utf8Slice("value1")), + new Constant(VARCHAR_COLUMN.getColumnType(), utf8Slice("value2")), + new Reference(createVarcharType(10), "c_varchar2")))), + Map.of(VARCHAR_COLUMN.getColumnName(), VARCHAR_COLUMN, VARCHAR_COLUMN2.getColumnName(), VARCHAR_COLUMN2)) + .orElseThrow(); + assertThat(converted.expression()).isEqualTo("(\"c_varchar\") IN (?, ?, \"c_varchar2\")"); + assertThat(converted.parameters()).isEqualTo(List.of( + new QueryParameter(createVarcharType(10), Optional.of(utf8Slice("value1"))), + new QueryParameter(createVarcharType(10), Optional.of(utf8Slice("value2"))))); + } + + private ConnectorExpression translateToConnectorExpression(Expression expression) + { + return ConnectorExpressionTranslator.translate(TEST_SESSION, expression) + .orElseThrow(); + } +} diff --git a/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolConnectorTest.java b/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolConnectorTest.java index 175f9dd72e7a..cbf57d4e75cf 100644 --- a/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolConnectorTest.java +++ b/plugin/trino-exasol/src/test/java/io/trino/plugin/exasol/TestExasolConnectorTest.java @@ -13,7 +13,9 @@ */ package io.trino.plugin.exasol; +import io.trino.Session; import io.trino.plugin.jdbc.BaseJdbcConnectorTest; +import io.trino.plugin.jdbc.JoinOperator; import io.trino.sql.planner.plan.AggregationNode; import io.trino.sql.planner.plan.ProjectNode; import io.trino.testing.MaterializedResult; @@ -35,6 +37,7 @@ import static io.trino.testing.MaterializedResult.resultBuilder; import static io.trino.testing.TestingNames.randomNameSuffix; import static java.lang.String.format; +import static java.util.Arrays.asList; import static java.util.stream.Collectors.joining; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; @@ -60,6 +63,8 @@ protected QueryRunner createQueryRunner() protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { return switch (connectorBehavior) { + case SUPPORTS_JOIN_PUSHDOWN -> true; + // Parallel writing is not supported due to restrictions of the Exasol JDBC driver. case SUPPORTS_ADD_COLUMN, SUPPORTS_ARRAY, @@ -87,7 +92,16 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) protected TestTable newTrinoTable(String namePrefix, @Language("SQL") String tableDefinition, List rowsToInsert) { // Use Exasol executor because the connector does not support creating tables - return new TestTable(exasolServer.getSqlExecutor(), TEST_SCHEMA + "." + namePrefix, tableDefinition, rowsToInsert); + return new TestTable(exasolServer.getSqlExecutor(), TEST_SCHEMA + "." + namePrefix, + normalizeTableDefinition(tableDefinition), rowsToInsert); + } + + // Normalize to add test schema prefix to the possible name of the nation table in table definition sql + // Workaround to fix `testJoinpushdown` in `BaseJdbcConnectorTest` + // Exasol table definition sql for `nation` table is prefixed with test schema name to fix the test + private String normalizeTableDefinition(String original) + { + return original.replaceAll("FROM nation", "FROM %s.nation".formatted(TEST_SCHEMA)); } @Override @@ -393,6 +407,35 @@ public void testExecuteProcedureWithNamedArgument() } } + @Test + // These integration tests trigger "toWriteMapping" in ExasolClient for DECIMAL types + // These integration tests also trigger "convertPredicate" in ExasolClient for EQUAL predicate + // Basic implementations of "toWriteMapping" and "convertPredicate" are prerequisites for enabling JOIN pushdown support. + // These integration tests cover basic implementations of "toWriteMapping" and "convertPredicate" + // "testJoinPushdown" integration test cases additionally cover basic implementations of "toWriteMapping" and "convertPredicate" + void testToWriteMappingForDecimalType() + { + testToWriteMappingForDecimalType(16, 6, "123456.123456"); + testToWriteMappingForDecimalType(36, 12, "123456789012345612345678.901234567890"); + testToWriteMappingForDecimalType(19, 0, "1"); + testToWriteMappingForDecimalType(19, 0, "1234567890123456789"); + } + + private void testToWriteMappingForDecimalType(int precision, int scale, String decimalValue) + { + String tableDefinition = "(d_col decimal(%d, %d))".formatted(precision, scale); + try (TestTable testTable = new TestTable( + exasolServer::execute, + "tpch.test_to_write_mapping_decimal", + tableDefinition, + asList(decimalValue))) { + Session session = joinPushdownEnabled(getSession()); + assertJoinConditionallyPushedDown(session, + "SELECT n.d_col FROM %s n LEFT JOIN (SELECT * FROM orders WHERE orderkey = 1) o ON n.d_col = %s".formatted(testTable.getName(), decimalValue), + expectJoinPushdownOnEmptyProjection(JoinOperator.LEFT_JOIN)); + } + } + @Override protected SqlExecutor onRemoteDatabase() {