Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ joinType
| LEFT OUTER?
| RIGHT OUTER?
| FULL OUTER?
| ASOF LEFT OUTER?
| ASOF
;

joinCriteria
Expand Down Expand Up @@ -1070,6 +1072,7 @@ AND: 'AND';
ANY: 'ANY';
ARRAY: 'ARRAY';
AS: 'AS';
ASOF: 'ASOF';
ASC: 'ASC';
AT: 'AT';
AUTHORIZATION: 'AUTHORIZATION';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public void test()
"ANY",
"ARRAY",
"AS",
"ASOF",
"ASC",
"AT",
"AUTHORIZATION",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ protected Optional<PlanNodeStatsEstimate> doCalculate(JoinNode node, Context con
PlanNodeStatsEstimate crossJoinStats = crossJoinStats(node, leftStats, rightStats);

return switch (node.getType()) {
case INNER -> Optional.of(computeInnerJoinStats(node, crossJoinStats, context.session()));
case LEFT -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case INNER, ASOF -> Optional.of(computeInnerJoinStats(node, crossJoinStats, context.session()));
case LEFT, ASOF_LEFT -> Optional.of(computeLeftJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case RIGHT -> Optional.of(computeRightJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
case FULL -> Optional.of(computeFullJoinStats(node, leftStats, rightStats, crossJoinStats, context.session()));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class JoinOperatorType
public static JoinOperatorType ofJoinNodeType(JoinType joinNodeType, boolean outputSingleMatch, boolean waitForBuild)
{
return switch (joinNodeType) {
case INNER -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT -> probeOuterJoin(outputSingleMatch);
case INNER, ASOF -> innerJoin(outputSingleMatch, waitForBuild);
case LEFT, ASOF_LEFT -> probeOuterJoin(outputSingleMatch);
case RIGHT -> lookupOuterJoin(waitForBuild);
case FULL -> fullOuterJoin();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public PagesIndexOrdering createPagesIndexComparator(List<Integer> sortChannels,

public Supplier<LookupSource> createLookupSourceSupplier(Session session, List<Integer> joinChannels)
{
return createLookupSourceSupplier(session, joinChannels, Optional.empty(), Optional.empty(), ImmutableList.of());
return createLookupSourceSupplier(session, joinChannels, Optional.empty(), Optional.empty(), false, ImmutableList.of());
}

public PagesHashStrategy createPagesHashStrategy(List<Integer> joinChannels)
Expand Down Expand Up @@ -498,9 +498,10 @@ public LookupSourceSupplier createLookupSourceSupplier(
List<Integer> joinChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories)
{
return createLookupSourceSupplier(session, joinChannels, filterFunctionFactory, sortChannel, searchFunctionFactories, Optional.empty(), defaultHashArraySizeSupplier());
return createLookupSourceSupplier(session, joinChannels, filterFunctionFactory, sortChannel, sortedPositionLinksDescendingOrder, searchFunctionFactories, Optional.empty(), defaultHashArraySizeSupplier());
}

public PagesSpatialIndexSupplier createPagesSpatialIndex(
Expand All @@ -524,6 +525,7 @@ public LookupSourceSupplier createLookupSourceSupplier(
List<Integer> joinChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
Optional<List<Integer>> outputChannels,
HashArraySizeSupplier hashArraySizeSupplier)
Expand All @@ -536,6 +538,7 @@ public LookupSourceSupplier createLookupSourceSupplier(
channels,
filterFunctionFactory,
sortChannel,
sortedPositionLinksDescendingOrder,
searchFunctionFactories,
hashArraySizeSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public IndexSnapshot createIndexSnapshot(UnloadedIndexKeyRecordSet indexKeysReco
}
pages.clear();

LookupSource lookupSource = outputPagesIndex.createLookupSourceSupplier(session, keyOutputChannels, Optional.empty(), Optional.empty(), ImmutableList.of()).get();
LookupSource lookupSource = outputPagesIndex.createLookupSourceSupplier(session, keyOutputChannels, Optional.empty(), Optional.empty(), false, ImmutableList.of()).get();

// Build a page containing the keys that produced no output rows, so in future requests can skip these keys
verify(missingKeysPageBuilder.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static class HashBuilderOperatorFactory
private final List<Integer> hashChannels;
private final Optional<JoinFilterFunctionFactory> filterFunctionFactory;
private final Optional<Integer> sortChannel;
private final boolean sortedPositionLinksDescendingOrder;
private final List<JoinFilterFunctionFactory> searchFunctionFactories;
private final PagesIndex.Factory pagesIndexFactory;

Expand All @@ -97,6 +98,7 @@ public HashBuilderOperatorFactory(
List<Integer> hashChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
int expectedPositions,
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -115,6 +117,7 @@ public HashBuilderOperatorFactory(
this.hashChannels = ImmutableList.copyOf(requireNonNull(hashChannels, "hashChannels is null"));
this.filterFunctionFactory = requireNonNull(filterFunctionFactory, "filterFunctionFactory is null");
this.sortChannel = sortChannel;
this.sortedPositionLinksDescendingOrder = sortedPositionLinksDescendingOrder;
this.searchFunctionFactories = ImmutableList.copyOf(searchFunctionFactories);
this.pagesIndexFactory = requireNonNull(pagesIndexFactory, "pagesIndexFactory is null");
this.spillEnabled = spillEnabled;
Expand All @@ -141,6 +144,7 @@ public HashBuilderOperator createOperator(DriverContext driverContext)
hashChannels,
filterFunctionFactory,
sortChannel,
sortedPositionLinksDescendingOrder,
searchFunctionFactories,
expectedPositions,
pagesIndexFactory,
Expand Down Expand Up @@ -215,6 +219,7 @@ public enum State
private final List<Integer> hashChannels;
private final Optional<JoinFilterFunctionFactory> filterFunctionFactory;
private final Optional<Integer> sortChannel;
private final boolean sortedPositionLinksDescendingOrder;
private final List<JoinFilterFunctionFactory> searchFunctionFactories;

private final PagesIndex index;
Expand Down Expand Up @@ -247,6 +252,7 @@ public HashBuilderOperator(
List<Integer> hashChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
int expectedPositions,
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -261,6 +267,7 @@ public HashBuilderOperator(
this.partitionIndex = partitionIndex;
this.filterFunctionFactory = filterFunctionFactory;
this.sortChannel = sortChannel;
this.sortedPositionLinksDescendingOrder = sortedPositionLinksDescendingOrder;
this.searchFunctionFactories = searchFunctionFactories;
this.localUserMemoryContext = new CoarseGrainLocalMemoryContext(operatorContext.localUserMemoryContext(), memorySyncGranularity);
this.localRevocableMemoryContext = new CoarseGrainLocalMemoryContext(operatorContext.localRevocableMemoryContext(), memorySyncGranularity);
Expand Down Expand Up @@ -650,7 +657,7 @@ private void disposeUnspilledLookupSourceIfRequested()

private LookupSourceSupplier buildLookupSource()
{
LookupSourceSupplier partition = index.createLookupSourceSupplier(operatorContext.getSession(), hashChannels, filterFunctionFactory, sortChannel, searchFunctionFactories, Optional.of(outputChannels), hashArraySizeSupplier);
LookupSourceSupplier partition = index.createLookupSourceSupplier(operatorContext.getSession(), hashChannels, filterFunctionFactory, sortChannel, sortedPositionLinksDescendingOrder, searchFunctionFactories, Optional.of(outputChannels), hashArraySizeSupplier);
checkState(lookupSourceSupplier == null, "lookupSourceSupplier is already set");
this.lookupSourceSupplier = partition;
return partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public JoinHashSupplier(
List<ObjectArrayList<Block>> channels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
HashArraySizeSupplier hashArraySizeSupplier,
OptionalInt singleBigintJoinChannel)
Expand All @@ -79,7 +80,8 @@ public JoinHashSupplier(
positionLinksFactoryBuilder = SortedPositionLinks.builder(
addresses.size(),
pagesHashStrategy,
addresses);
addresses,
sortedPositionLinksDescendingOrder);
}
else {
positionLinksFactoryBuilder = ArrayPositionLinks.builder(addresses.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,15 @@ public static class FactoryBuilder
private final PositionComparator comparator;
private final PagesHashStrategy pagesHashStrategy;
private final LongArrayList addresses;
private final int orderingSign;

public FactoryBuilder(int size, PagesHashStrategy pagesHashStrategy, LongArrayList addresses)
public FactoryBuilder(int size, PagesHashStrategy pagesHashStrategy, LongArrayList addresses, boolean descendingOrder)
{
this.size = size;
this.comparator = new PositionComparator(pagesHashStrategy, addresses);
this.pagesHashStrategy = pagesHashStrategy;
this.addresses = addresses;
this.orderingSign = descendingOrder ? -1 : 1;
positionLinks = new Int2ObjectOpenHashMap<>();
}

Expand All @@ -80,8 +82,8 @@ public int link(int from, int to)
return from;
}

// make sure that from value is the smaller one
if (comparator.compare(from, to) > 0) {
// make sure that from value is the smaller (or larger depending on ordering) one
if (comparator.compare(from, to) * orderingSign > 0) {
// _from_ is larger so, just add to current chain _to_
positionLinks.computeIfAbsent(to, key -> new IntArrayList()).add(from);
return to;
Expand Down Expand Up @@ -121,7 +123,7 @@ public Factory build()
if (positions.length > 0) {
// Use the positionsList array for the merge sort temporary work buffer to avoid an extra redundant
// copy. This works because we know that initially it has the same values as the array being sorted
IntArrays.mergeSort(positions, 0, positions.length, comparator, positionsList.elements());
IntArrays.mergeSort(positions, 0, positions.length, (left, right) -> comparator.compare(left, right) * orderingSign, positionsList.elements());
// add link from starting position to position links chain
arrayPositionLinksFactoryBuilder.link(key, positions[0]);
// add links for the sorted internal elements
Expand Down Expand Up @@ -190,9 +192,9 @@ private static long sizeOfPositionLinks(int[][] sortedPositionLinks)
return retainedSize;
}

public static FactoryBuilder builder(int size, PagesHashStrategy pagesHashStrategy, LongArrayList addresses)
public static FactoryBuilder builder(int size, PagesHashStrategy pagesHashStrategy, LongArrayList addresses, boolean descendingOrder)
{
return new FactoryBuilder(size, pagesHashStrategy, addresses);
return new FactoryBuilder(size, pagesHashStrategy, addresses, descendingOrder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public static class HashBuilderOperatorFactory
private final List<Integer> hashChannels;
private final Optional<JoinFilterFunctionFactory> filterFunctionFactory;
private final Optional<Integer> sortChannel;
private final boolean sortedPositionLinksDescendingOrder;
private final List<JoinFilterFunctionFactory> searchFunctionFactories;
private final PagesIndex.Factory pagesIndexFactory;

Expand All @@ -76,6 +77,7 @@ public HashBuilderOperatorFactory(
List<Integer> hashChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
int expectedPositions,
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -92,6 +94,7 @@ public HashBuilderOperatorFactory(
this.hashChannels = ImmutableList.copyOf(requireNonNull(hashChannels, "hashChannels is null"));
this.filterFunctionFactory = requireNonNull(filterFunctionFactory, "filterFunctionFactory is null");
this.sortChannel = sortChannel;
this.sortedPositionLinksDescendingOrder = sortedPositionLinksDescendingOrder;
this.searchFunctionFactories = ImmutableList.copyOf(searchFunctionFactories);
this.pagesIndexFactory = requireNonNull(pagesIndexFactory, "pagesIndexFactory is null");
this.hashArraySizeSupplier = requireNonNull(hashArraySizeSupplier, "hashArraySizeSupplier is null");
Expand All @@ -116,6 +119,7 @@ public HashBuilderOperator createOperator(DriverContext driverContext)
hashChannels,
filterFunctionFactory,
sortChannel,
sortedPositionLinksDescendingOrder,
searchFunctionFactories,
expectedPositions,
pagesIndexFactory,
Expand Down Expand Up @@ -164,6 +168,7 @@ public enum State
private final List<Integer> hashChannels;
private final Optional<JoinFilterFunctionFactory> filterFunctionFactory;
private final Optional<Integer> sortChannel;
private final boolean sortedPositionLinksDescendingOrder;
private final List<JoinFilterFunctionFactory> searchFunctionFactories;
private final HashArraySizeSupplier hashArraySizeSupplier;

Expand All @@ -182,12 +187,13 @@ public HashBuilderOperator(
List<Integer> hashChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
int expectedPositions,
PagesIndex.Factory pagesIndexFactory,
HashArraySizeSupplier hashArraySizeSupplier)
{
this(operatorContext, lookupSourceFactory, partitionIndex, outputChannels, hashChannels, filterFunctionFactory, sortChannel, searchFunctionFactories, expectedPositions, pagesIndexFactory, hashArraySizeSupplier, DEFAULT_GRANULARITY);
this(operatorContext, lookupSourceFactory, partitionIndex, outputChannels, hashChannels, filterFunctionFactory, sortChannel, sortedPositionLinksDescendingOrder, searchFunctionFactories, expectedPositions, pagesIndexFactory, hashArraySizeSupplier, DEFAULT_GRANULARITY);
}

@VisibleForTesting
Expand All @@ -199,6 +205,7 @@ public HashBuilderOperator(
List<Integer> hashChannels,
Optional<JoinFilterFunctionFactory> filterFunctionFactory,
Optional<Integer> sortChannel,
boolean sortedPositionLinksDescendingOrder,
List<JoinFilterFunctionFactory> searchFunctionFactories,
int expectedPositions,
PagesIndex.Factory pagesIndexFactory,
Expand All @@ -211,6 +218,7 @@ public HashBuilderOperator(
this.partitionIndex = partitionIndex;
this.filterFunctionFactory = filterFunctionFactory;
this.sortChannel = sortChannel;
this.sortedPositionLinksDescendingOrder = sortedPositionLinksDescendingOrder;
this.searchFunctionFactories = searchFunctionFactories;
this.localUserMemoryContext = new CoarseGrainLocalMemoryContext(operatorContext.localUserMemoryContext(), memorySyncThreshold);

Expand Down Expand Up @@ -345,7 +353,7 @@ private void disposeLookupSourceIfRequested()
private LookupSourceSupplier buildLookupSource()
{
checkState(index != null, "index is null");
LookupSourceSupplier partition = index.createLookupSourceSupplier(operatorContext.getSession(), hashChannels, filterFunctionFactory, sortChannel, searchFunctionFactories, Optional.of(outputChannels), hashArraySizeSupplier);
LookupSourceSupplier partition = index.createLookupSourceSupplier(operatorContext.getSession(), hashChannels, filterFunctionFactory, sortChannel, sortedPositionLinksDescendingOrder, searchFunctionFactories, Optional.of(outputChannels), hashArraySizeSupplier);
checkState(lookupSourceSupplier == null, "lookupSourceSupplier is already set");
this.lookupSourceSupplier = partition;
return partition;
Expand Down
Loading
Loading