Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ selectItem
relation
: left=relation
( CROSS JOIN right=sampledRelation
| ASOF JOIN rightRelation=relation joinCriteria
| joinType JOIN rightRelation=relation joinCriteria
| NATURAL joinType JOIN right=sampledRelation
) #joinRelation
Expand Down Expand Up @@ -1070,6 +1071,7 @@ AND: 'AND';
ANY: 'ANY';
ARRAY: 'ARRAY';
AS: 'AS';
ASOF: 'ASOF';
ASC: 'ASC';
AT: 'AT';
AUTHORIZATION: 'AUTHORIZATION';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void test()
"ARRAY",
"AS",
"ASC",
"ASOF",
"AT",
"AUTHORIZATION",
"AUTO",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected Optional<PlanNodeStatsEstimate> doCalculate(JoinNode node, Context con

return switch (node.getType()) {
case INNER -> Optional.of(computeInnerJoinStats(node, crossJoinStats, context.session()));
case LEFT -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case LEFT, ASOF -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case RIGHT -> Optional.of(computeRightJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case FULL -> Optional.of(computeFullJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected Optional<PlanNodeStatsEstimate> doCalculate(SpatialJoinNode node, Cont

return switch (node.getType()) {
case INNER -> Optional.of(statsCalculator.filterStats(crossJoinStats, node.getFilter(), context.session()));
case LEFT -> Optional.of(PlanNodeStatsEstimate.unknown());
case LEFT, ASOF -> Optional.of(PlanNodeStatsEstimate.unknown());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static JoinOperatorType ofJoinNodeType(JoinType joinNodeType, boolean out
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case LEFT, ASOF -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public Expression visitUnion(UnionNode node, Void context)
public Expression visitUnnest(UnnestNode node, Void context)
{
return switch (node.getJoinType()) {
case INNER, LEFT -> pullExpressionThroughSymbols(node.getSource().accept(this, context), node.getOutputSymbols());
case INNER, LEFT, ASOF -> pullExpressionThroughSymbols(node.getSource().accept(this, context), node.getOutputSymbols());
case RIGHT, FULL -> TRUE;
};
}
Expand All @@ -324,7 +324,7 @@ public Expression visitJoin(JoinNode node, Void context)
.add(combineConjuncts(joinConjuncts))
.add(node.getFilter().orElse(TRUE))
.build()), node.getOutputSymbols());
case LEFT -> combineConjuncts(ImmutableList.<Expression>builder()
case LEFT, ASOF -> combineConjuncts(ImmutableList.<Expression>builder()
.add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols()))
.addAll(pullNullableConjunctsThroughOuterJoin(extractConjuncts(rightPredicate), node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains))
.addAll(pullNullableConjunctsThroughOuterJoin(joinConjuncts, node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains))
Expand Down Expand Up @@ -536,7 +536,7 @@ public Expression visitSpatialJoin(SpatialJoinNode node, Void context)
.add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols()))
.add(pullExpressionThroughSymbols(rightPredicate, node.getOutputSymbols()))
.build());
case LEFT -> combineConjuncts(ImmutableList.<Expression>builder()
case LEFT, ASOF -> combineConjuncts(ImmutableList.<Expression>builder()
.add(pullExpressionThroughSymbols(leftPredicate, node.getOutputSymbols()))
.addAll(pullNullableConjunctsThroughOuterJoin(extractConjuncts(rightPredicate), node.getOutputSymbols(), node.getRight().getOutputSymbols()::contains))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2563,7 +2563,7 @@ public PhysicalOperation visitJoin(JoinNode node, LocalExecutionPlanContext cont
List<Symbol> rightSymbols = Lists.transform(clauses, JoinNode.EquiJoinClause::getRight);

return switch (node.getType()) {
case INNER, LEFT, RIGHT, FULL ->
case INNER, LEFT, RIGHT, FULL, ASOF ->
createLookupJoin(node, node.getLeft(), leftSymbols, node.getRight(), rightSymbols, localDynamicFilters, context);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@
import io.trino.sql.planner.iterative.rule.ReplaceRedundantJoinWithProject;
import io.trino.sql.planner.iterative.rule.ReplaceRedundantJoinWithSource;
import io.trino.sql.planner.iterative.rule.ReplaceWindowWithRowNumber;
import io.trino.sql.planner.iterative.rule.RewriteAsofJoinToLeftJoinWithTop1;
import io.trino.sql.planner.iterative.rule.RewriteExcludeColumnsFunctionToProjection;
import io.trino.sql.planner.iterative.rule.RewriteSpatialPartitioningAggregation;
import io.trino.sql.planner.iterative.rule.RewriteTableFunctionToTableScan;
Expand Down Expand Up @@ -854,6 +855,16 @@ public PlanOptimizers(
// new join nodes without JoinNode.maySkipOutputDuplicates flag set
new OptimizeDuplicateInsensitiveJoins())));

// Rewrite ASOF join semantics into LEFT join with window row_number + filter (Top-1 per left row)
// Run this before exchanges so that the physical planning operates on the rewritten plan
builder.add(new IterativeOptimizer(
plannerContext,
ruleStats,
statsCalculator,
costCalculator,
ImmutableSet.of(
new RewriteAsofJoinToLeftJoinWithTop1(plannerContext))));

// Previous invocations of PushPredicateIntoTableScan do not prune using predicate expression. The invocation in AddExchanges
// does this pruning - and we may end up with empty union branches after that. We invoke PushPredicateIntoTableScan
// and rules to remove empty branches here to get empty values node through pushdown and then prune them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public static JoinType mapJoinType(Join.Type joinType)
case LEFT -> JoinType.LEFT;
case RIGHT -> JoinType.RIGHT;
case FULL -> JoinType.FULL;
case ASOF -> JoinType.ASOF;
};
}

Expand Down Expand Up @@ -1112,6 +1113,10 @@ private RelationPlan planJoinUsing(Join node, RelationPlan left, RelationPlan ri

If casts are redundant (due to column type and common type being equal),
they will be removed by optimization passes.

For ASOF joins: l ASOF JOIN r USING (k1, ..., kn, ts)
The last column (ts) is treated as an inequality: r.ts <= l.ts
All other columns are treated as equi-join criteria.
*/

List<Identifier> joinColumns = ((JoinUsing) node.getCriteria().orElseThrow()).getColumns();
Expand All @@ -1128,6 +1133,11 @@ If casts are redundant (due to column type and common type being equal),

leftCoercions.putIdentities(left.getRoot().getOutputSymbols());
rightCoercions.putIdentities(right.getRoot().getOutputSymbols());

// For ASOF joins, the last column in USING is treated as an inequality condition
boolean isAsofJoin = node.getType() == Join.Type.ASOF;
int equiColumnCount = isAsofJoin ? joinColumns.size() - 1 : joinColumns.size();

for (int i = 0; i < joinColumns.size(); i++) {
Identifier identifier = joinColumns.get(i);
Type type = analysis.getType(identifier);
Expand All @@ -1144,12 +1154,24 @@ If casts are redundant (due to column type and common type being equal),
rightCoercions.put(rightOutput, new Cast(right.getSymbol(rightField).toSymbolReference(), type));
rightJoinColumns.put(identifier, rightOutput);

clauses.add(new JoinNode.EquiJoinClause(leftOutput, rightOutput));
// For ASOF joins, only add equi-join clauses for all but the last column
if (i < equiColumnCount) {
clauses.add(new JoinNode.EquiJoinClause(leftOutput, rightOutput));
}
}

ProjectNode leftCoercion = new ProjectNode(idAllocator.getNextId(), left.getRoot(), leftCoercions.build());
ProjectNode rightCoercion = new ProjectNode(idAllocator.getNextId(), right.getRoot(), rightCoercions.build());

// For ASOF joins, create an inequality filter for the last column: right.ts <= left.ts
Optional<Expression> filter = Optional.empty();
if (isAsofJoin) {
Identifier lastColumn = joinColumns.get(joinColumns.size() - 1);
Symbol leftTs = leftJoinColumns.get(lastColumn);
Symbol rightTs = rightJoinColumns.get(lastColumn);
filter = Optional.of(new Comparison(LESS_THAN_OR_EQUAL, rightTs.toSymbolReference(), leftTs.toSymbolReference()));
}

JoinNode join = new JoinNode(
idAllocator.getNextId(),
mapJoinType(node.getType()),
Expand All @@ -1159,7 +1181,7 @@ If casts are redundant (due to column type and common type being equal),
leftCoercion.getOutputSymbols(),
rightCoercion.getOutputSymbols(),
false,
Optional.empty(),
filter,
Optional.empty(),
Optional.empty(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ private JoinType getJoinType(JoinNode joinNode)
return switch (joinNode.getType()) {
case INNER -> JoinType.INNER;
case LEFT -> JoinType.LEFT_OUTER;
case ASOF -> JoinType.ASOF;
case RIGHT -> JoinType.RIGHT_OUTER;
case FULL -> JoinType.FULL_OUTER;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private boolean canRemoveJoin(JoinNode joinNode, Lookup lookup)
PlanNode right = joinNode.getRight();
return switch (joinNode.getType()) {
case INNER -> isEmpty(left, lookup) || isEmpty(right, lookup);
case LEFT -> isEmpty(left, lookup);
case LEFT, ASOF -> isEmpty(left, lookup);
case RIGHT -> isEmpty(right, lookup);
case FULL -> isEmpty(left, lookup) && isEmpty(right, lookup);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Result apply(JoinNode node, Captures captures, Context context)
}
yield Result.empty();
}
case LEFT -> {
case LEFT, ASOF -> {
if (canInlineLeftSource && rightCardinality.isAtLeastScalar()) {
yield Result.ofPlanNode(appendProjection(right, node.getRightOutputSymbols(), left, node.getLeftOutputSymbols(), context.getIdAllocator()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Result apply(JoinNode node, Captures captures, Context context)

return switch (node.getType()) {
case INNER -> Result.empty();
case LEFT -> !isEmpty(left, lookup) && isEmpty(right, lookup) ?
case LEFT, ASOF -> !isEmpty(left, lookup) && isEmpty(right, lookup) ?
Result.ofPlanNode(appendNulls(
left,
node.getLeftOutputSymbols(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ else if (rightSourceScalarWithNoOutputs) {
}
yield Result.ofPlanNode(restrictOutputs(context.getIdAllocator(), source, ImmutableSet.copyOf(sourceOutputs)).orElse(source));
}
case LEFT -> rightSourceScalarWithNoOutputs ?
case LEFT, ASOF -> rightSourceScalarWithNoOutputs ?
Result.ofPlanNode(restrictOutputs(context.getIdAllocator(), node.getLeft(), ImmutableSet.copyOf(node.getLeftOutputSymbols()))
.orElse(node.getLeft())) :
Result.empty();
Expand Down
Loading