diff --git a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 index 02b83dbdb7fe..a4254f481270 100644 --- a/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 +++ b/core/trino-grammar/src/main/antlr4/io/trino/grammar/sql/SqlBase.g4 @@ -365,6 +365,7 @@ selectItem relation : left=relation ( CROSS JOIN right=sampledRelation + | ASOF JOIN rightRelation=relation joinCriteria | joinType JOIN rightRelation=relation joinCriteria | NATURAL joinType JOIN right=sampledRelation ) #joinRelation @@ -1070,6 +1071,7 @@ AND: 'AND'; ANY: 'ANY'; ARRAY: 'ARRAY'; AS: 'AS'; +ASOF: 'ASOF'; ASC: 'ASC'; AT: 'AT'; AUTHORIZATION: 'AUTHORIZATION'; diff --git a/core/trino-grammar/src/test/java/io/trino/grammar/sql/TestSqlKeywords.java b/core/trino-grammar/src/test/java/io/trino/grammar/sql/TestSqlKeywords.java index cb44176dc6ef..f092a6e89d85 100644 --- a/core/trino-grammar/src/test/java/io/trino/grammar/sql/TestSqlKeywords.java +++ b/core/trino-grammar/src/test/java/io/trino/grammar/sql/TestSqlKeywords.java @@ -38,6 +38,7 @@ public void test() "ARRAY", "AS", "ASC", + "ASOF", "AT", "AUTHORIZATION", "AUTO", diff --git a/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java b/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java index be4a580bcb2a..b8ad564d503b 100644 --- a/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java +++ b/core/trino-main/src/main/java/io/trino/cost/JoinStatsRule.java @@ -85,7 +85,7 @@ protected Optional doCalculate(JoinNode node, Context con return switch (node.getType()) { case INNER -> Optional.of(computeInnerJoinStats(node, crossJoinStats, context.session())); - case LEFT -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session())); + case LEFT, ASOF -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session())); case RIGHT -> Optional.of(computeRightJoinStats(node, leftStats, rightStats, crossJoinStats, context.session())); case FULL -> Optional.of(computeFullJoinStats(node, leftStats, rightStats, crossJoinStats, context.session())); }; diff --git a/core/trino-main/src/main/java/io/trino/cost/SpatialJoinStatsRule.java b/core/trino-main/src/main/java/io/trino/cost/SpatialJoinStatsRule.java index 0f5d740417b3..b89934503b38 100644 --- a/core/trino-main/src/main/java/io/trino/cost/SpatialJoinStatsRule.java +++ b/core/trino-main/src/main/java/io/trino/cost/SpatialJoinStatsRule.java @@ -44,7 +44,7 @@ protected Optional doCalculate(SpatialJoinNode node, Cont return switch (node.getType()) { case INNER -> Optional.of(statsCalculator.filterStats(crossJoinStats, node.getFilter(), context.session())); - case LEFT -> Optional.of(PlanNodeStatsEstimate.unknown()); + case LEFT, ASOF -> Optional.of(PlanNodeStatsEstimate.unknown()); }; } diff --git a/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java b/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java index 3c2797bcf670..2e15baad5438 100644 --- a/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java +++ b/core/trino-main/src/main/java/io/trino/operator/JoinOperatorType.java @@ -32,7 +32,7 @@ public static JoinOperatorType ofJoinNodeType(JoinType joinNodeType, boolean out { return switch (joinNodeType) { case INNER -> innerJoin(outputSingleMatch, waitForBuild); - case LEFT -> probeOuterJoin(outputSingleMatch); + case LEFT, ASOF -> probeOuterJoin(outputSingleMatch); case RIGHT -> lookupOuterJoin(waitForBuild); case FULL -> fullOuterJoin(); }; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/EffectivePredicateExtractor.java b/core/trino-main/src/main/java/io/trino/sql/planner/EffectivePredicateExtractor.java index 3df5380c757e..e5ac3bdaea38 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/EffectivePredicateExtractor.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/EffectivePredicateExtractor.java @@ -302,7 +302,7 @@ public Expression visitUnion(UnionNode node, Void context) public Expression visitUnnest(UnnestNode node, Void context) { return switch (node.getJoinType()) { - case INNER, LEFT -> pullExpressionThroughSymbols(node.getSource().accept(this, context), node.getOutputSymbols()); + case INNER, LEFT, ASOF -> pullExpressionThroughSymbols(node.getSource().accept(this, context), node.getOutputSymbols()); case RIGHT, FULL -> TRUE; }; } @@ -324,7 +324,7 @@ public Expression visitJoin(JoinNode node, Void context) .add(combineConjuncts(joinConjuncts)) .add(node.getFilter().orElse(TRUE)) .build()), node.getOutputSymbols()); - case LEFT -> combineConjuncts(ImmutableList.builder() + case LEFT, ASOF -> combineConjuncts(ImmutableList.builder() .add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols())) .addAll(pullNullableConjunctsThroughOuterJoin(extractConjuncts(rightPredicate), node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains)) .addAll(pullNullableConjunctsThroughOuterJoin(joinConjuncts, node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains)) @@ -536,7 +536,7 @@ public Expression visitSpatialJoin(SpatialJoinNode node, Void context) .add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols())) .add(pullExpressionThroughSymbols(rightPredicate, node.getOutputSymbols())) .build()); - case LEFT -> combineConjuncts(ImmutableList.builder() + case LEFT, ASOF -> combineConjuncts(ImmutableList.builder() .add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols())) .addAll(pullNullableConjunctsThroughOuterJoin(extractConjuncts(rightPredicate), node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains)) .build()); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java index bc5284ada326..119ce09309b5 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java @@ -2563,7 +2563,7 @@ public PhysicalOperation visitJoin(JoinNode node, LocalExecutionPlanContext cont List rightSymbols = Lists.transform(clauses, JoinNode.EquiJoinClause::getRight); return switch (node.getType()) { - case INNER, LEFT, RIGHT, FULL -> + case INNER, LEFT, RIGHT, FULL, ASOF -> createLookupJoin(node, node.getLeft(), leftSymbols, node.getRight(), rightSymbols, localDynamicFilters, context); }; } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java index 138f6dff9b8c..98a6bd165889 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/PlanOptimizers.java @@ -220,6 +220,7 @@ import io.trino.sql.planner.iterative.rule.ReplaceRedundantJoinWithProject; import io.trino.sql.planner.iterative.rule.ReplaceRedundantJoinWithSource; import io.trino.sql.planner.iterative.rule.ReplaceWindowWithRowNumber; +import io.trino.sql.planner.iterative.rule.RewriteAsofJoinToLeftJoinWithTop1; import io.trino.sql.planner.iterative.rule.RewriteExcludeColumnsFunctionToProjection; import io.trino.sql.planner.iterative.rule.RewriteSpatialPartitioningAggregation; import io.trino.sql.planner.iterative.rule.RewriteTableFunctionToTableScan; @@ -854,6 +855,16 @@ public PlanOptimizers( // new join nodes without JoinNode.maySkipOutputDuplicates flag set new OptimizeDuplicateInsensitiveJoins()))); + // Rewrite ASOF join semantics into LEFT join with window row_number + filter (Top-1 per left row) + // Run this before exchanges so that the physical planning operates on the rewritten plan + builder.add(new IterativeOptimizer( + plannerContext, + ruleStats, + statsCalculator, + costCalculator, + ImmutableSet.of( + new RewriteAsofJoinToLeftJoinWithTop1(plannerContext)))); + // Previous invocations of PushPredicateIntoTableScan do not prune using predicate expression. The invocation in AddExchanges // does this pruning - and we may end up with empty union branches after that. We invoke PushPredicateIntoTableScan // and rules to remove empty branches here to get empty values node through pushdown and then prune them. diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java index 599913025f57..bd6657c9e1b4 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/RelationPlanner.java @@ -266,6 +266,7 @@ public static JoinType mapJoinType(Join.Type joinType) case LEFT -> JoinType.LEFT; case RIGHT -> JoinType.RIGHT; case FULL -> JoinType.FULL; + case ASOF -> JoinType.ASOF; }; } @@ -1112,6 +1113,10 @@ private RelationPlan planJoinUsing(Join node, RelationPlan left, RelationPlan ri If casts are redundant (due to column type and common type being equal), they will be removed by optimization passes. + + For ASOF joins: l ASOF JOIN r USING (k1, ..., kn, ts) + The last column (ts) is treated as an inequality: r.ts <= l.ts + All other columns are treated as equi-join criteria. */ List joinColumns = ((JoinUsing) node.getCriteria().orElseThrow()).getColumns(); @@ -1128,6 +1133,11 @@ If casts are redundant (due to column type and common type being equal), leftCoercions.putIdentities(left.getRoot().getOutputSymbols()); rightCoercions.putIdentities(right.getRoot().getOutputSymbols()); + + // For ASOF joins, the last column in USING is treated as an inequality condition + boolean isAsofJoin = node.getType() == Join.Type.ASOF; + int equiColumnCount = isAsofJoin ? joinColumns.size() - 1 : joinColumns.size(); + for (int i = 0; i < joinColumns.size(); i++) { Identifier identifier = joinColumns.get(i); Type type = analysis.getType(identifier); @@ -1144,12 +1154,24 @@ If casts are redundant (due to column type and common type being equal), rightCoercions.put(rightOutput, new Cast(right.getSymbol(rightField).toSymbolReference(), type)); rightJoinColumns.put(identifier, rightOutput); - clauses.add(new JoinNode.EquiJoinClause(leftOutput, rightOutput)); + // For ASOF joins, only add equi-join clauses for all but the last column + if (i < equiColumnCount) { + clauses.add(new JoinNode.EquiJoinClause(leftOutput, rightOutput)); + } } ProjectNode leftCoercion = new ProjectNode(idAllocator.getNextId(), left.getRoot(), leftCoercions.build()); ProjectNode rightCoercion = new ProjectNode(idAllocator.getNextId(), right.getRoot(), rightCoercions.build()); + // For ASOF joins, create an inequality filter for the last column: right.ts <= left.ts + Optional filter = Optional.empty(); + if (isAsofJoin) { + Identifier lastColumn = joinColumns.get(joinColumns.size() - 1); + Symbol leftTs = leftJoinColumns.get(lastColumn); + Symbol rightTs = rightJoinColumns.get(lastColumn); + filter = Optional.of(new Comparison(LESS_THAN_OR_EQUAL, rightTs.toSymbolReference(), leftTs.toSymbolReference())); + } + JoinNode join = new JoinNode( idAllocator.getNextId(), mapJoinType(node.getType()), @@ -1159,7 +1181,7 @@ If casts are redundant (due to column type and common type being equal), leftCoercion.getOutputSymbols(), rightCoercion.getOutputSymbols(), false, - Optional.empty(), + filter, Optional.empty(), Optional.empty(), ImmutableMap.of(), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushJoinIntoTableScan.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushJoinIntoTableScan.java index 37f0054ae67d..ecadebc816c0 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushJoinIntoTableScan.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/PushJoinIntoTableScan.java @@ -253,6 +253,7 @@ private JoinType getJoinType(JoinNode joinNode) return switch (joinNode.getType()) { case INNER -> JoinType.INNER; case LEFT -> JoinType.LEFT_OUTER; + case ASOF -> JoinType.ASOF; case RIGHT -> JoinType.RIGHT_OUTER; case FULL -> JoinType.FULL_OUTER; }; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveRedundantJoin.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveRedundantJoin.java index 75a2e25b8f15..058830ac6e74 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveRedundantJoin.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RemoveRedundantJoin.java @@ -51,7 +51,7 @@ private boolean canRemoveJoin(JoinNode joinNode, Lookup lookup) PlanNode right = joinNode.getRight(); return switch (joinNode.getType()) { case INNER -> isEmpty(left, lookup) || isEmpty(right, lookup); - case LEFT -> isEmpty(left, lookup); + case LEFT, ASOF -> isEmpty(left, lookup); case RIGHT -> isEmpty(right, lookup); case FULL -> isEmpty(left, lookup) && isEmpty(right, lookup); }; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceJoinOverConstantWithProject.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceJoinOverConstantWithProject.java index 702a06386e45..680abedfd07d 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceJoinOverConstantWithProject.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceJoinOverConstantWithProject.java @@ -114,7 +114,7 @@ public Result apply(JoinNode node, Captures captures, Context context) } yield Result.empty(); } - case LEFT -> { + case LEFT, ASOF -> { if (canInlineLeftSource && rightCardinality.isAtLeastScalar()) { yield Result.ofPlanNode(appendProjection(right, node.getRightOutputSymbols(), left, node.getLeftOutputSymbols(), context.getIdAllocator())); } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithProject.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithProject.java index 5335060f6d4c..fecc30e80ced 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithProject.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithProject.java @@ -56,7 +56,7 @@ public Result apply(JoinNode node, Captures captures, Context context) return switch (node.getType()) { case INNER -> Result.empty(); - case LEFT -> !isEmpty(left, lookup) && isEmpty(right, lookup) ? + case LEFT, ASOF -> !isEmpty(left, lookup) && isEmpty(right, lookup) ? Result.ofPlanNode(appendNulls( left, node.getLeftOutputSymbols(), diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithSource.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithSource.java index fd8d1d803475..e09143171c5c 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithSource.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/ReplaceRedundantJoinWithSource.java @@ -89,7 +89,7 @@ else if (rightSourceScalarWithNoOutputs) { } yield Result.ofPlanNode(restrictOutputs(context.getIdAllocator(), source, ImmutableSet.copyOf(sourceOutputs)).orElse(source)); } - case LEFT -> rightSourceScalarWithNoOutputs ? + case LEFT, ASOF -> rightSourceScalarWithNoOutputs ? Result.ofPlanNode(restrictOutputs(context.getIdAllocator(), node.getLeft(), ImmutableSet.copyOf(node.getLeftOutputSymbols())) .orElse(node.getLeft())) : Result.empty(); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteAsofJoinToLeftJoinWithTop1.java b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteAsofJoinToLeftJoinWithTop1.java new file mode 100644 index 000000000000..23652ea089f7 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/sql/planner/iterative/rule/RewriteAsofJoinToLeftJoinWithTop1.java @@ -0,0 +1,238 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.planner.iterative.rule; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.trino.matching.Captures; +import io.trino.matching.Pattern; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.SortOrder; +import io.trino.sql.PlannerContext; +import io.trino.sql.ir.Comparison; +import io.trino.sql.ir.Constant; +import io.trino.sql.ir.Expression; +import io.trino.sql.ir.IrUtils; +import io.trino.sql.ir.Reference; +import io.trino.sql.planner.OrderingScheme; +import io.trino.sql.planner.Symbol; +import io.trino.sql.planner.iterative.Rule; +import io.trino.sql.planner.plan.AssignUniqueId; +import io.trino.sql.planner.plan.Assignments; +import io.trino.sql.planner.plan.DataOrganizationSpecification; +import io.trino.sql.planner.plan.FilterNode; +import io.trino.sql.planner.plan.JoinNode; +import io.trino.sql.planner.plan.PlanNode; +import io.trino.sql.planner.plan.ProjectNode; +import io.trino.sql.planner.plan.WindowNode; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static com.google.common.base.Preconditions.checkArgument; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.sql.ir.Comparison.Operator.GREATER_THAN; +import static io.trino.sql.ir.Comparison.Operator.GREATER_THAN_OR_EQUAL; +import static io.trino.sql.ir.Comparison.Operator.LESS_THAN; +import static io.trino.sql.ir.Comparison.Operator.LESS_THAN_OR_EQUAL; +import static io.trino.sql.planner.plan.JoinType.ASOF; +import static io.trino.sql.planner.plan.JoinType.LEFT; +import static io.trino.sql.planner.plan.Patterns.Join.type; +import static io.trino.sql.planner.plan.Patterns.join; +import static java.util.Objects.requireNonNull; + +/** + * Rewrites an ASOF join into: + * AssignUniqueId(left) -> AssignUniqueId(right) -> LEFT JOIN (with original equi-criteria and inequality) + * -> Window(row_number() PARTITION BY left_uid ORDER BY right_ts DESC NULLS LAST, right_uid DESC NULLS LAST) + * -> Filter(row_number = 1) + * -> Project(original outputs) + */ +public class RewriteAsofJoinToLeftJoinWithTop1 + implements Rule +{ + private static final Pattern PATTERN = join().with(type().equalTo(ASOF)); + + private final PlannerContext plannerContext; + + public RewriteAsofJoinToLeftJoinWithTop1(PlannerContext plannerContext) + { + this.plannerContext = requireNonNull(plannerContext, "plannerContext is null"); + } + + @Override + public Pattern getPattern() + { + return PATTERN; + } + + @Override + public Result apply(JoinNode node, Captures captures, Context context) + { + // Validate equi-criteria + if (node.getCriteria().isEmpty()) { + throw new TrinoException(NOT_SUPPORTED, "ASOF join requires at least one equi-join criterion"); + } + + // Extract and validate non-equi inequality predicate + Comparison inequality = extractSupportedInequality(node) + .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "ASOF join requires a single supported inequality predicate")); + + // Determine which side provides the right-side timestamp symbol and ensure orderable + RightOrderingColumns orderingColumns = resolveOrderingColumns(node, inequality); + if (!orderingColumns.rightTs.type().isOrderable()) { + throw new TrinoException(NOT_SUPPORTED, "ASOF join inequality requires an orderable type on the build side"); + } + + // Assign unique ids for partitioning and deterministic tie-breaking + Symbol leftUid = context.getSymbolAllocator().newSymbol("asof_left_uid", BIGINT); + PlanNode leftWithUid = new AssignUniqueId(context.getIdAllocator().getNextId(), node.getLeft(), leftUid); + + Symbol rightUid = context.getSymbolAllocator().newSymbol("asof_right_uid", BIGINT); + PlanNode rightWithUid = new AssignUniqueId(context.getIdAllocator().getNextId(), node.getRight(), rightUid); + + // Build LEFT join preserving original equi-criteria and filter + List newLeftOutputs = ImmutableList.builder() + .addAll(node.getLeftOutputSymbols()) + .add(leftUid) + .build(); + + // Ensure right outputs include the timestamp used for ordering and right uid + ImmutableList.Builder rightOutputsBuilder = ImmutableList.builder() + .addAll(node.getRightOutputSymbols()); + if (!node.getRightOutputSymbols().contains(orderingColumns.rightTs)) { + rightOutputsBuilder.add(orderingColumns.rightTs); + } + rightOutputsBuilder.add(rightUid); + List newRightOutputs = rightOutputsBuilder.build(); + + JoinNode leftJoin = new JoinNode( + node.getId(), + LEFT, + leftWithUid, + rightWithUid, + node.getCriteria(), + newLeftOutputs, + newRightOutputs, + node.isMaySkipOutputDuplicates(), + node.getFilter(), // keep original inequality filter as-is + node.getDistributionType(), + node.isSpillable(), + node.getDynamicFilters(), + node.getReorderJoinStatsAndCost()); + + // Window: row_number over partition by leftUid ordered by rightTs desc nulls last, rightUid desc nulls last + Symbol rankSymbol = context.getSymbolAllocator().newSymbol("asof_rank", BIGINT); + WindowNode.Function rowNumber = new WindowNode.Function( + plannerContext.getMetadata().resolveBuiltinFunction("row_number", ImmutableList.of()), + ImmutableList.of(), + Optional.empty(), + WindowNode.Frame.DEFAULT_FRAME, + false, + false); + + OrderingScheme orderingScheme = new OrderingScheme( + ImmutableList.of(orderingColumns.rightTs, rightUid), + ImmutableMap.of(orderingColumns.rightTs, SortOrder.DESC_NULLS_LAST, rightUid, SortOrder.DESC_NULLS_LAST)); + + WindowNode windowNode = new WindowNode( + context.getIdAllocator().getNextId(), + leftJoin, + new DataOrganizationSpecification(ImmutableList.of(leftUid), Optional.of(orderingScheme)), + ImmutableMap.of(rankSymbol, rowNumber), + ImmutableSet.of(), + 0); + + // Filter: rank = 1 + FilterNode filterNode = new FilterNode( + context.getIdAllocator().getNextId(), + windowNode, + new Comparison(LESS_THAN_OR_EQUAL, rankSymbol.toSymbolReference(), new Constant(BIGINT, 1L))); + + // Project back to original outputs (drop helper symbols) + List finalOutputs = ImmutableList.builder() + .addAll(node.getLeftOutputSymbols()) + .addAll(node.getRightOutputSymbols()) + .build(); + ProjectNode projectNode = new ProjectNode( + context.getIdAllocator().getNextId(), + filterNode, + Assignments.identity(finalOutputs)); + + return Result.ofPlanNode(projectNode); + } + + private Optional extractSupportedInequality(JoinNode node) + { + Optional filter = node.getFilter(); + if (filter.isEmpty()) { + return Optional.empty(); + } + List conjuncts = IrUtils.extractConjuncts(filter.get()); + List comparisons = conjuncts.stream() + .filter(Comparison.class::isInstance) + .map(Comparison.class::cast) + .filter(c -> c.operator() == LESS_THAN || c.operator() == LESS_THAN_OR_EQUAL || c.operator() == GREATER_THAN || c.operator() == GREATER_THAN_OR_EQUAL) + .collect(Collectors.toList()); + if (comparisons.size() != 1) { + return Optional.empty(); + } + return Optional.of(comparisons.get(0)); + } + + private RightOrderingColumns resolveOrderingColumns(JoinNode node, Comparison inequality) + { + Set leftSymbols = ImmutableSet.copyOf(node.getLeft().getOutputSymbols()); + Set rightSymbols = ImmutableSet.copyOf(node.getRight().getOutputSymbols()); + + Expression leftExpr = inequality.left(); + Expression rightExpr = inequality.right(); + + boolean leftExprFromRight = referencesOnly(leftExpr, rightSymbols); + boolean rightExprFromLeft = referencesOnly(rightExpr, leftSymbols); + boolean leftExprFromLeft = referencesOnly(leftExpr, leftSymbols); + boolean rightExprFromRight = referencesOnly(rightExpr, rightSymbols); + + // Supported forms: + // 1) right.ts <= left.ts (or <) + if ((inequality.operator() == LESS_THAN || inequality.operator() == LESS_THAN_OR_EQUAL) && leftExprFromRight && rightExprFromLeft) { + Symbol rightTs = symbolFromReference(leftExpr); + return new RightOrderingColumns(rightTs); + } + // 2) left.ts >= right.ts (or >) + if ((inequality.operator() == GREATER_THAN || inequality.operator() == GREATER_THAN_OR_EQUAL) && leftExprFromLeft && rightExprFromRight) { + Symbol rightTs = symbolFromReference(rightExpr); + return new RightOrderingColumns(rightTs); + } + + throw new TrinoException(NOT_SUPPORTED, "ASOF join inequality must compare build-side timestamp to probe-side timestamp with <=/< or >=/>"); + } + + private boolean referencesOnly(Expression expression, Set allowed) + { + return io.trino.sql.planner.SymbolsExtractor.extractUnique(expression).stream().allMatch(allowed::contains); + } + + private Symbol symbolFromReference(Expression expression) + { + checkArgument(expression instanceof Reference, "Expected symbol reference, got: %s", expression); + return Symbol.from(expression); + } + + private record RightOrderingColumns(Symbol rightTs) {} +} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/IndexJoinOptimizer.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/IndexJoinOptimizer.java index 4f81a4489a33..fa1aad2a6f24 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/IndexJoinOptimizer.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/IndexJoinOptimizer.java @@ -160,7 +160,7 @@ else if (leftIndexCandidate.isPresent()) { } break; - case LEFT: + case LEFT, ASOF: // We cannot use indices for outer joins until index join supports in-line filtering if (node.getFilter().isEmpty() && rightIndexCandidate.isPresent()) { return createIndexJoinWithExpectedOutputs(node.getOutputSymbols(), IndexJoinNode.Type.SOURCE_OUTER, leftRewritten, rightIndexCandidate.get(), createEquiJoinClause(leftJoinSymbols, rightJoinSymbols), idAllocator); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PredicatePushDown.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PredicatePushDown.java index 4fd2c1e2a56e..f53ebc502d79 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PredicatePushDown.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PredicatePushDown.java @@ -105,6 +105,7 @@ import static io.trino.sql.planner.SymbolsExtractor.extractUnique; import static io.trino.sql.planner.iterative.rule.CanonicalizeExpressionRewriter.canonicalizeExpression; import static io.trino.sql.planner.iterative.rule.UnwrapCastInComparison.unwrapCasts; +import static io.trino.sql.planner.plan.JoinType.ASOF; import static io.trino.sql.planner.plan.JoinType.FULL; import static io.trino.sql.planner.plan.JoinType.INNER; import static io.trino.sql.planner.plan.JoinType.LEFT; @@ -445,7 +446,7 @@ public PlanNode visitJoin(JoinNode node, RewriteContext context) postJoinPredicate = innerJoinPushDownResult.getPostJoinPredicate(); newJoinPredicate = innerJoinPushDownResult.getJoinPredicate(); } - case LEFT -> { + case LEFT, ASOF -> { OuterJoinPushDownResult leftOuterJoinPushDownResult = processLimitedOuterJoin( inheritedPredicate, leftEffectivePredicate, @@ -1155,7 +1156,7 @@ private Expression extractJoinPredicate(JoinNode joinNode) private JoinNode tryNormalizeToOuterToInnerJoin(JoinNode node, Expression inheritedPredicate) { - checkArgument(EnumSet.of(INNER, RIGHT, LEFT, FULL).contains(node.getType()), "Unsupported join type: %s", node.getType()); + checkArgument(EnumSet.of(INNER, RIGHT, LEFT, FULL, ASOF).contains(node.getType()), "Unsupported join type: %s", node.getType()); if (node.getType() == JoinType.INNER) { return node; @@ -1199,7 +1200,7 @@ private JoinNode tryNormalizeToOuterToInnerJoin(JoinNode node, Expression inheri node.getReorderJoinStatsAndCost()); } - if (node.getType() == JoinType.LEFT && !canConvertOuterToInner(node.getRight().getOutputSymbols(), inheritedPredicate) || + if ((node.getType() == JoinType.LEFT || node.getType() == JoinType.ASOF) && !canConvertOuterToInner(node.getRight().getOutputSymbols(), inheritedPredicate) || node.getType() == JoinType.RIGHT && !canConvertOuterToInner(node.getLeft().getOutputSymbols(), inheritedPredicate)) { return node; } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java index 8f64190b7512..ddfa10e379c1 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java @@ -567,7 +567,7 @@ public ActualProperties visitJoin(JoinNode node, List inputPro .unordered(unordered) .build(); } - case LEFT -> ActualProperties.builderFrom(probeProperties.translate(column -> filterIfMissing(node.getOutputSymbols(), column))) + case LEFT, ASOF -> ActualProperties.builderFrom(probeProperties.translate(column -> filterIfMissing(node.getOutputSymbols(), column))) .unordered(unordered) .build(); case RIGHT -> ActualProperties.builderFrom(buildProperties.translate(column -> filterIfMissing(node.getOutputSymbols(), column))) @@ -608,7 +608,7 @@ public ActualProperties visitSpatialJoin(SpatialJoinNode node, List ActualProperties.builderFrom(probeProperties.translate(column -> filterIfMissing(node.getOutputSymbols(), column))) + case LEFT, ASOF -> ActualProperties.builderFrom(probeProperties.translate(column -> filterIfMissing(node.getOutputSymbols(), column))) .build(); }; } @@ -843,7 +843,7 @@ public ActualProperties visitUnnest(UnnestNode node, List inpu }); return switch (node.getJoinType()) { - case INNER, LEFT -> translatedProperties; + case INNER, LEFT, ASOF -> translatedProperties; case RIGHT, FULL -> ActualProperties.builderFrom(translatedProperties) .local(ImmutableList.of()) .build(); @@ -926,7 +926,7 @@ static boolean spillPossible(Session session, JoinType joinType) return false; } return switch (joinType) { - case INNER, LEFT -> true; + case INNER, LEFT, ASOF -> true; // Even though join might not have "spillable" property set yet // it might still be set as spillable later on by AddLocalExchanges. case RIGHT, FULL -> false; // Currently there is no spill support for outer on the build side. diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java index 6884394b9f7d..171bdb0be52a 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/StreamPropertyDerivations.java @@ -239,7 +239,7 @@ public StreamProperties visitJoin(JoinNode node, List inputPro case INNER -> leftProperties .translate(column -> PropertyDerivations.filterOrRewrite(node.getOutputSymbols(), node.getCriteria(), column)) .unordered(unordered); - case LEFT -> leftProperties + case LEFT, ASOF -> leftProperties .translate(column -> PropertyDerivations.filterIfMissing(node.getOutputSymbols(), column)) .unordered(unordered); case RIGHT -> @@ -272,7 +272,7 @@ public StreamProperties visitSpatialJoin(SpatialJoinNode node, List leftProperties.translate(column -> PropertyDerivations.filterIfMissing(node.getOutputSymbols(), column)); + case INNER, LEFT, ASOF -> leftProperties.translate(column -> PropertyDerivations.filterIfMissing(node.getOutputSymbols(), column)); }; } @@ -537,7 +537,7 @@ public StreamProperties visitUnnest(UnnestNode node, List inpu }); return switch (node.getJoinType()) { - case INNER, LEFT -> translatedProperties; + case INNER, LEFT, ASOF -> translatedProperties; case RIGHT, FULL -> translatedProperties.unordered(true); }; } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinNode.java index 49b383198a03..9f6c4bb137d9 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinNode.java @@ -35,6 +35,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED; +import static io.trino.sql.planner.plan.JoinType.ASOF; import static io.trino.sql.planner.plan.JoinType.FULL; import static io.trino.sql.planner.plan.JoinType.INNER; import static io.trino.sql.planner.plan.JoinType.LEFT; @@ -165,6 +166,7 @@ private static JoinType flipType(JoinType type) return switch (type) { case INNER -> INNER; case FULL -> FULL; + case ASOF -> ASOF; case LEFT -> RIGHT; case RIGHT -> LEFT; }; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinType.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinType.java index 5796a9baba62..dd5e5a8f113e 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinType.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/JoinType.java @@ -18,7 +18,8 @@ public enum JoinType INNER("InnerJoin"), LEFT("LeftJoin"), RIGHT("RightJoin"), - FULL("FullJoin"); + FULL("FullJoin"), + ASOF("AsofJoin"); private final String joinLabel; diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/SpatialJoinNode.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SpatialJoinNode.java index 6fc13a914382..2be20ba5d802 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/SpatialJoinNode.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/SpatialJoinNode.java @@ -36,7 +36,8 @@ public class SpatialJoinNode public enum Type { INNER("SpatialInnerJoin"), - LEFT("SpatialLeftJoin"); + LEFT("SpatialLeftJoin"), + ASOF("SpatialAsofJoin"); private final String joinLabel; @@ -55,6 +56,7 @@ public static Type fromJoinNodeType(JoinType joinNodeType) return switch (joinNodeType) { case INNER -> Type.INNER; case LEFT -> Type.LEFT; + case ASOF -> Type.ASOF; default -> throw new IllegalArgumentException("Unsupported spatial join type: " + joinNodeType); }; } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/TestLogicalPlanner.java b/core/trino-main/src/test/java/io/trino/sql/planner/TestLogicalPlanner.java index 9c2c9b06e8d7..1f3d016de802 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/TestLogicalPlanner.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/TestLogicalPlanner.java @@ -104,6 +104,7 @@ import static io.trino.spi.predicate.Domain.multipleValues; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; import static io.trino.spi.type.DoubleType.DOUBLE; import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RealType.REAL; @@ -177,6 +178,7 @@ import static io.trino.sql.planner.plan.FrameBoundType.UNBOUNDED_FOLLOWING; import static io.trino.sql.planner.plan.JoinNode.DistributionType.PARTITIONED; import static io.trino.sql.planner.plan.JoinNode.DistributionType.REPLICATED; +import static io.trino.sql.planner.plan.JoinType.ASOF; import static io.trino.sql.planner.plan.JoinType.INNER; import static io.trino.sql.planner.plan.JoinType.LEFT; import static io.trino.sql.planner.plan.RowsPerMatch.WINDOW; @@ -191,6 +193,7 @@ import static io.trino.type.UnknownType.UNKNOWN; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestLogicalPlanner extends BasePlanTest @@ -615,6 +618,95 @@ public void testJoinWithOrderBySameKey() tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey"))))))); } + @Test + public void testAsofJoin() + { + assertPlan("SELECT o.orderkey, l.extendedprice FROM orders o ASOF JOIN lineitem l ON l.orderkey = o.orderkey AND l.shipdate <= o.orderdate", + CREATED, + anyTree( + join(ASOF, builder -> builder + .equiCriteria("ORDERS_OK", "LINEITEM_OK") + .filter(new Comparison(Comparison.Operator.LESS_THAN_OR_EQUAL, new Reference(DATE, "LINEITEM_DATE"), new Reference(DATE, "ORDERS_DATE"))) + .left( + anyTree( + tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey", "ORDERS_DATE", "orderdate")))) + .right( + anyTree( + tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey", "LINEITEM_EP", "extendedprice", "LINEITEM_DATE", "shipdate"))))))); + } + + @Test + public void testAsofJoinOptimizedPlan() + { + assertPlan( + "SELECT o.orderkey, l.extendedprice FROM orders o ASOF JOIN lineitem l ON l.orderkey = o.orderkey AND l.shipdate <= o.orderdate", + anyTree( + node(FilterNode.class, + project( + node(WindowNode.class, + anyTree( + join(LEFT, builder -> builder + .equiCriteria("ORDERS_OK", "LINEITEM_OK") + .filter(new Comparison(Comparison.Operator.LESS_THAN_OR_EQUAL, new Reference(DATE, "LINEITEM_DATE"), new Reference(DATE, "ORDERS_DATE"))) + .left( + anyTree( + tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey", "ORDERS_DATE", "orderdate")))) + .right( + anyTree( + tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey", "LINEITEM_EP", "extendedprice", "LINEITEM_DATE", "shipdate"))))))))))); + } + + @Test + public void testAsofJoinUsingOptimizedPlan() + { + assertPlan(""" + WITH orders_norm AS ( + SELECT orderkey, orderdate as date FROM orders + ), lineitem_norm AS ( + SELECT orderkey, extendedprice, shipdate as date FROM lineitem + ) + SELECT orderkey, extendedprice FROM orders_norm ASOF JOIN lineitem_norm USING (orderkey, date) + """, + anyTree( + node(FilterNode.class, + project( + node(WindowNode.class, + anyTree( + join(LEFT, builder -> builder + .equiCriteria("ORDERS_OK", "LINEITEM_OK") + .filter(new Comparison(Comparison.Operator.LESS_THAN_OR_EQUAL, new Reference(DATE, "LINEITEM_DATE"), new Reference(DATE, "ORDERS_DATE"))) + .left( + anyTree( + tableScan("orders", ImmutableMap.of("ORDERS_OK", "orderkey", "ORDERS_DATE", "orderdate")))) + .right( + anyTree( + tableScan("lineitem", ImmutableMap.of("LINEITEM_OK", "orderkey", "LINEITEM_EP", "extendedprice", "LINEITEM_DATE", "shipdate")))) + ) + ) + ) + ) + ) + )); + } + + @Test + public void testAsofJoinNegativeNoEquiCriteria() + { + assertThatThrownBy(() -> assertPlan( + "SELECT o.orderkey FROM orders o ASOF JOIN lineitem l ON l.shipdate <= o.orderdate", + anyTree(any()))) + .hasMessageContaining("ASOF join requires at least one equi-join criterion"); + } + + @Test + public void testAsofJoinNegativeMultipleInequalities() + { + assertThatThrownBy(() -> assertPlan( + "SELECT o.orderkey, l.extendedprice FROM orders o ASOF JOIN lineitem l ON l.orderkey = o.orderkey AND l.shipdate <= o.orderdate AND l.linenumber < o.custkey", + anyTree(any()))) + .hasMessageContaining("ASOF join requires a single supported inequality predicate"); + } + @Test public void testInequalityPredicatePushdownWithOuterJoin() { @@ -1325,14 +1417,14 @@ public void testRemoveEmptyUnionBranch() { assertThat(countOfMatchingNodes( plan(""" - SELECT * - FROM ( - SELECT n.name, CAST(null AS varchar) AS comment FROM nation n WHERE n.nationkey <= 3 - UNION ALL - SELECT r.name, r.comment FROM region r - ) - WHERE comment IN (SELECT r.comment FROM region r) - """), + SELECT * + FROM ( + SELECT n.name, CAST(null AS varchar) AS comment FROM nation n WHERE n.nationkey <= 3 + UNION ALL + SELECT r.name, r.comment FROM region r + ) + WHERE comment IN (SELECT r.comment FROM region r) + """), ValuesNode.class::isInstance)).isEqualTo(0); } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPushJoinIntoTableScan.java b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPushJoinIntoTableScan.java index 7e0981f7d9b3..2d5a01354f14 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPushJoinIntoTableScan.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/iterative/rule/TestPushJoinIntoTableScan.java @@ -626,6 +626,7 @@ private JoinType toSpiJoinType(io.trino.sql.planner.plan.JoinType joinType) { return switch (joinType) { case INNER -> JoinType.INNER; + case ASOF -> JoinType.ASOF; case LEFT -> JoinType.LEFT_OUTER; case RIGHT -> JoinType.RIGHT_OUTER; case FULL -> JoinType.FULL_OUTER; diff --git a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java index 8a33fe5e4916..e7c6f5753e25 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java +++ b/core/trino-parser/src/main/java/io/trino/sql/parser/AstBuilder.java @@ -1963,7 +1963,10 @@ else if (context.joinCriteria().USING() != null) { } Join.Type joinType; - if (context.joinType().LEFT() != null) { + if (context.ASOF() != null) { + joinType = Join.Type.ASOF; + } + else if (context.joinType().LEFT() != null) { joinType = Join.Type.LEFT; } else if (context.joinType().RIGHT() != null) { diff --git a/core/trino-parser/src/main/java/io/trino/sql/tree/Join.java b/core/trino-parser/src/main/java/io/trino/sql/tree/Join.java index 8063052b3bfd..1b9674789576 100644 --- a/core/trino-parser/src/main/java/io/trino/sql/tree/Join.java +++ b/core/trino-parser/src/main/java/io/trino/sql/tree/Join.java @@ -28,7 +28,7 @@ public class Join { public enum Type { - CROSS, INNER, LEFT, RIGHT, FULL, IMPLICIT + CROSS, INNER, LEFT, RIGHT, FULL, IMPLICIT, ASOF } public Join(Type type, Relation left, Relation right, Optional criteria) diff --git a/core/trino-parser/src/test/java/io/trino/sql/parser/TestAsofJoin.java b/core/trino-parser/src/test/java/io/trino/sql/parser/TestAsofJoin.java new file mode 100644 index 000000000000..e2ccde713481 --- /dev/null +++ b/core/trino-parser/src/test/java/io/trino/sql/parser/TestAsofJoin.java @@ -0,0 +1,83 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.sql.parser; + +import io.trino.sql.tree.AllColumns; +import io.trino.sql.tree.ComparisonExpression; +import io.trino.sql.tree.Identifier; +import io.trino.sql.tree.Join; +import io.trino.sql.tree.JoinOn; +import io.trino.sql.tree.JoinUsing; +import io.trino.sql.tree.LogicalExpression; +import io.trino.sql.tree.QualifiedName; +import io.trino.sql.tree.Table; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Optional; + +import static io.trino.sql.QueryUtil.equal; +import static io.trino.sql.QueryUtil.nameReference; +import static io.trino.sql.QueryUtil.selectList; +import static io.trino.sql.QueryUtil.simpleQuery; +import static io.trino.sql.parser.ParserAssert.statement; +import static io.trino.sql.parser.TreeNodes.location; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for ASOF JOIN syntax. + * These tests are disabled until the ASOF JOIN feature is implemented. + */ +public class TestAsofJoin +{ + @Test + public void testAsofJoinWithExplicitCondition() + { + assertThat(statement("SELECT * FROM a ASOF JOIN b ON a.key = b.key AND a.ts >= b.ts")) + .ignoringLocation() + .isEqualTo(simpleQuery( + selectList(new AllColumns()), + new Join( + Join.Type.ASOF, + new Table(QualifiedName.of("a")), + new Table(QualifiedName.of("b")), + Optional.of(new JoinOn( + new LogicalExpression( + location(1, 32), + LogicalExpression.Operator.AND, + List.of( + equal(nameReference("a", "key"), nameReference("b", "key")), + new ComparisonExpression( + ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL, + nameReference("a", "ts"), + nameReference("b", "ts"))))))))); + } + + @Test + public void testAsofJoinWithUsingClause() + { + assertThat(statement("SELECT * FROM a ASOF JOIN b USING (key, ts)")) + .ignoringLocation() + .isEqualTo(simpleQuery( + selectList(new AllColumns()), + new Join( + Join.Type.ASOF, + new Table(QualifiedName.of("a")), + new Table(QualifiedName.of("b")), + Optional.of(new JoinUsing( + List.of( + new Identifier("key"), + new Identifier("ts"))))))); + } +} diff --git a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParserErrorHandling.java b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParserErrorHandling.java index c9e0f2e6dd2f..49dad3193f5c 100644 --- a/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParserErrorHandling.java +++ b/core/trino-parser/src/test/java/io/trino/sql/parser/TestSqlParserErrorHandling.java @@ -50,7 +50,7 @@ private static Stream statements() Arguments.of("select * from 'oops", "line 1:15: mismatched input '''. Expecting: '(', 'JSON_TABLE', 'LATERAL', 'TABLE', 'UNNEST', "), Arguments.of("select *\nfrom x\nfrom", - "line 3:1: mismatched input 'from'. Expecting: ',', '.', 'AS', 'CROSS', 'EXCEPT', 'FETCH', 'FOR', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', " + + "line 3:1: mismatched input 'from'. Expecting: ',', '.', 'AS', 'ASOF', 'CROSS', 'EXCEPT', 'FETCH', 'FOR', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', " + "'LIMIT', 'MATCH_RECOGNIZE', 'NATURAL', 'OFFSET', 'ORDER', 'RIGHT', 'TABLESAMPLE', 'UNION', 'WHERE', 'WINDOW', , "), Arguments.of("select *\nfrom x\nwhere from", "line 3:7: mismatched input 'from'. Expecting: "), @@ -121,7 +121,7 @@ private static Stream statements() Arguments.of("SELECT foo(*) filter (", "line 1:23: mismatched input ''. Expecting: 'WHERE'"), Arguments.of("SELECT * FROM t t x", - "line 1:19: mismatched input 'x'. Expecting: '(', ',', 'CROSS', 'EXCEPT', 'FETCH', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', 'LIMIT', " + + "line 1:19: mismatched input 'x'. Expecting: '(', ',', 'ASOF', 'CROSS', 'EXCEPT', 'FETCH', 'FULL', 'GROUP', 'HAVING', 'INNER', 'INTERSECT', 'JOIN', 'LEFT', 'LIMIT', " + "'MATCH_RECOGNIZE', 'NATURAL', 'OFFSET', 'ORDER', 'RIGHT', 'TABLESAMPLE', 'UNION', 'WHERE', 'WINDOW', "), Arguments.of("SELECT * FROM t WHERE EXISTS (", "line 1:31: mismatched input ''. Expecting: "), diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/JoinType.java b/core/trino-spi/src/main/java/io/trino/spi/connector/JoinType.java index 4e699324580f..fee0d38cae25 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/JoinType.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/JoinType.java @@ -19,4 +19,5 @@ public enum JoinType LEFT_OUTER, RIGHT_OUTER, FULL_OUTER, + ASOF, } diff --git a/docs/src/main/sphinx/language/reserved.md b/docs/src/main/sphinx/language/reserved.md index 5edee925719d..1e73d2b2e81b 100644 --- a/docs/src/main/sphinx/language/reserved.md +++ b/docs/src/main/sphinx/language/reserved.md @@ -12,6 +12,7 @@ be quoted (using double quotes) in order to be used as an identifier. | `ALTER` | reserved | reserved | | `AND` | reserved | reserved | | `AS` | reserved | reserved | +| `ASOF` | | | | `AUTO` | | | | `BETWEEN` | reserved | reserved | | `BY` | reserved | reserved | diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java index 3a9c658a0abe..3d20aa80d45a 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java @@ -375,6 +375,7 @@ protected String formatJoinType(JoinType joinType) return switch (joinType) { case INNER -> "INNER JOIN"; case LEFT_OUTER -> "LEFT JOIN"; + case ASOF -> "ASOF JOIN"; case RIGHT_OUTER -> "RIGHT JOIN"; case FULL_OUTER -> "FULL JOIN"; }; diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java index 89f099cc897f..fd6c44c30683 100644 --- a/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestEventListenerBasic.java @@ -1542,7 +1542,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of(new PlanNodeStatsAndCostSummary(10., 90., 90., 0., 0.)), ImmutableList.of(new JsonRenderedNode( - "173", + "175", "LocalExchange", ImmutableMap.of( "partitioning", "[connectorHandleType = SystemPartitioningHandle, partitioning = SINGLE, function = SINGLE]", @@ -1552,7 +1552,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of(new PlanNodeStatsAndCostSummary(10., 90., 0., 0., 0.)), ImmutableList.of(new JsonRenderedNode( - "140", + "142", "RemoteSource", ImmutableMap.of("sourceFragmentIds", "[1]"), ImmutableList.of(new Symbol(DOUBLE, "symbol_1")), @@ -1560,7 +1560,7 @@ public void testAnonymizedJsonPlan() ImmutableList.of(), ImmutableList.of()))))))), "1", new JsonRenderedNode( - "139", + "141", "LimitPartial", ImmutableMap.of( "count", "10", diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestAsofJoinQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestAsofJoinQueries.java new file mode 100644 index 000000000000..7665069e393f --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestAsofJoinQueries.java @@ -0,0 +1,166 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests; + +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; + +import static io.trino.SystemSessionProperties.JOIN_REORDERING_STRATEGY; +import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy.NONE; +import static io.trino.testing.TestingSession.testSessionBuilder; + +public class TestAsofJoinQueries + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + { + QueryRunner queryRunner = new io.trino.testing.StandaloneQueryRunner(testSessionBuilder() + .setSystemProperty(JOIN_REORDERING_STRATEGY, NONE.toString()) + .build()); + + queryRunner.installPlugin(new io.trino.plugin.tpch.TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch", com.google.common.collect.ImmutableMap.of("tpch.splits-per-node", "1")); + + return queryRunner; + } + + @org.junit.jupiter.api.Test + public void testRightLteLeft() + { + assertQuery( + """ + WITH + left_t(k, ts, v) AS ( + VALUES + (1, DATE '1992-01-01', 10), + (1, DATE '1992-01-03', 20), + (2, DATE '1992-01-02', 30), + (3, DATE '1992-01-01', 40) -- no match case + ), + right_t(k, ts, x) AS ( + VALUES + (1, DATE '1992-01-01', 100), + (1, DATE '1992-01-02', 200), + (2, DATE '1992-01-01', 300) + ) + SELECT l.k, l.ts, r.x + FROM left_t l ASOF JOIN right_t r + ON l.k = r.k AND r.ts <= l.ts + ORDER BY l.k, l.ts""", + """ + VALUES + (1, DATE '1992-01-01', 100), + (1, DATE '1992-01-03', 200), + (2, DATE '1992-01-02', 300), + (3, DATE '1992-01-01', CAST(NULL AS INTEGER)) + """); + } + + @org.junit.jupiter.api.Test + public void testLeftGteRight() + { + assertQuery( + """ + WITH + left_t(k, ts, v) AS ( + VALUES + (1, DATE '1992-01-01', 10), + (1, DATE '1992-01-03', 20), + (2, DATE '1992-01-02', 30), + (3, DATE '1992-01-01', 40) -- no match case + ), + right_t(k, ts, x) AS ( + VALUES + (1, DATE '1992-01-01', 100), + (1, DATE '1992-01-02', 200), + (2, DATE '1992-01-01', 300) + ) + SELECT l.k, l.ts, r.x + FROM left_t l ASOF JOIN right_t r + ON l.k = r.k AND l.ts >= r.ts + ORDER BY l.k, l.ts""", + """ + VALUES + (1, DATE '1992-01-01', 100), + (1, DATE '1992-01-03', 200), + (2, DATE '1992-01-02', 300), + (3, DATE '1992-01-01', CAST(NULL AS INTEGER)) + """); + } + + @org.junit.jupiter.api.Test + public void testRightLtLeft() + { + assertQuery( + """ + WITH + left_t(k, ts, v) AS ( + VALUES + (1, DATE '1992-01-01', 10), -- no match case + (1, DATE '1992-01-03', 20), + (2, DATE '1992-01-02', 30), + (3, DATE '1992-01-01', 40) -- no match case + ), + right_t(k, ts, x) AS ( + VALUES + (1, DATE '1992-01-01', 100), + (1, DATE '1992-01-02', 200), + (2, DATE '1992-01-01', 300) + ) + SELECT l.k, l.ts, r.x + FROM left_t l ASOF JOIN right_t r + ON l.k = r.k AND r.ts < l.ts + ORDER BY l.k, l.ts""", + """ + VALUES + (1, DATE '1992-01-01', CAST(NULL AS INTEGER)), + (1, DATE '1992-01-03', 200), + (2, DATE '1992-01-02', 300), + (3, DATE '1992-01-01', CAST(NULL AS INTEGER)) + """); + } + + @org.junit.jupiter.api.Test + public void testCompositeEquiKeys() + { + assertQuery( + """ + WITH + left_t(k1, k2, ts, v) AS ( + VALUES + (1, 10, DATE '1992-01-01', 10), -- no match case + (1, 10, DATE '1992-01-03', 20), + (2, 20, DATE '1992-01-02', 30), + (3, 30, DATE '1992-01-01', 40) -- no match case + ), + right_t(k1, k2, ts, x) AS ( + VALUES + (1, 10, DATE '1992-01-01', 100), + (1, 10, DATE '1992-01-02', 200), + (2, 20, DATE '1992-01-01', 300) + ) + SELECT l.k1, l.k2, l.ts, r.x + FROM left_t l ASOF JOIN right_t r + ON l.k1 = r.k1 AND l.k2 = r.k2 AND r.ts < l.ts + ORDER BY l.k1, l.k2, l.ts""", + """ + VALUES + (1, 10, DATE '1992-01-01', CAST(NULL AS INTEGER)), + (1, 10, DATE '1992-01-03', 200), + (2, 20, DATE '1992-01-02', 300), + (3, 30, DATE '1992-01-01', CAST(NULL AS INTEGER)) + """); + } +}