Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0edf65f
Add partitioned probe support for hash joins
PointKernel Apr 10, 2026
2a3ef4c
Clean up rmm policy mr usage
PointKernel Apr 10, 2026
d89b384
Use custom kernels + file split
PointKernel Apr 14, 2026
903f5ec
Remove redundant thrust::reduce in partitioned join retrieve
PointKernel Apr 14, 2026
c03dd98
Use DEFAULT_JOIN_BLOCK_SIZE and DEFAULT_JOIN_CG_SIZE directly
PointKernel Apr 14, 2026
e586b56
Minor cleanup with modern CCCL
PointKernel Apr 15, 2026
a5879bc
Cleanups
PointKernel Apr 15, 2026
ef0d6f4
Improve retrieve kernel perf with shared memory buffer
PointKernel Apr 17, 2026
d91c171
Replace full_join_complement with full_join_finalize
PointKernel Apr 17, 2026
e0ecc1b
Use a common full join finalize logic
PointKernel Apr 17, 2026
fd73529
Speed up full_join finalize with consume-in-place and fused compact
PointKernel Apr 17, 2026
9ada51a
Rename full_join_finalize -> finalize_partitioned_full_join; sort CMa…
PointKernel Apr 28, 2026
d11bf76
Address lamarrr review: explicit mode check in benchmark, delete copy…
PointKernel Apr 28, 2026
36f212e
Use cooperative_groups::invoke_one instead of thread_rank() == 0
PointKernel Apr 28, 2026
2d670d2
Rename count_each -> partitioned_count kernel and associated files
PointKernel Apr 28, 2026
444d121
Rename retrieve -> partitioned_retrieve kernel and associated files; …
PointKernel Apr 28, 2026
04bcf07
Rename full_join_finalize.cpp -> partitioned_full_join_finalize.cpp
PointKernel Apr 28, 2026
f656c0b
Rename to finalize_partitioned_full_join.cpp to match API name
PointKernel Apr 28, 2026
3e6c606
Clean up stale comments: remove ported language, fix exclusive scan -…
PointKernel Apr 28, 2026
ed8ad93
Add doc comment to partitioned_count_kernel
PointKernel Apr 28, 2026
bb86c18
Formatting
PointKernel Apr 28, 2026
43d0e82
Address shrshi review nits and rename cnt -> match_count
PointKernel May 4, 2026
7be83f3
Replace raw int64_t with thread_index_type
PointKernel May 8, 2026
e567072
Use left/right naming
PointKernel May 9, 2026
55a3633
Address CodeRabbit review feedback
PointKernel May 11, 2026
f2c9fff
Fix probe_table_num_rows reference in match_context.cu
PointKernel May 19, 2026
3a47653
Use right/left naming for table nouns in finalize_full_join and tests
PointKernel May 19, 2026
87765a3
Simplify partitioned_count_kernel: drop tile.all short-circuit, fuse …
PointKernel May 20, 2026
594a532
Add tile size check + use cuda::std::distance
PointKernel May 20, 2026
c455b21
Merge remote-tracking branch 'upstream/release/26.06' into chunked-ha…
PointKernel May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ add_library(
src/join/filter_join_indices_kernel_null_primitive.cu
src/join/filter_join_indices_kernel_primitive.cu
src/join/filtered_join.cu
src/join/hash_join/finalize_partitioned_full_join.cpp
src/join/hash_join/full_join_match_context.cpp
src/join/hash_join/full_join_retrieve.cu
src/join/hash_join/full_join_size.cu
Expand All @@ -682,6 +683,14 @@ add_library(
src/join/hash_join/left_join_retrieve.cu
src/join/hash_join/left_join_size.cu
src/join/hash_join/match_context.cu
src/join/hash_join/partitioned_count.cu
src/join/hash_join/partitioned_count_outer.cu
src/join/hash_join/partitioned_full_join.cu
src/join/hash_join/partitioned_inner_join.cu
src/join/hash_join/partitioned_join_retrieve.cu
src/join/hash_join/partitioned_left_join.cu
src/join/hash_join/partitioned_retrieve.cu
src/join/hash_join/partitioned_retrieve_outer.cu
src/join/mark_join.cu
src/join/filter_join_indices_jit.cu
src/join/join.cu
Expand Down
34 changes: 27 additions & 7 deletions cpp/benchmarks/join/join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,33 @@ void nvbench_inner_join(nvbench::state& state,
nvbench::enum_type<DataType>>)
{
auto const num_keys = state.get_int64("num_keys");
auto const mode = state.get_string("mode");
auto dtypes = cycle_dtypes(get_type_or_group(static_cast<int32_t>(DataType)), num_keys);

auto join = [](cudf::table_view const& left_input,
cudf::table_view const& right_input,
cudf::null_equality compare_nulls) {
return cudf::inner_join(left_input, right_input, compare_nulls);
};
BM_join<Nullable, join_t::HASH, NullEquality>(state, dtypes, join);
if (mode == "normal") {
auto join = [](cudf::table_view const& left_input,
cudf::table_view const& right_input,
cudf::null_equality compare_nulls) {
return cudf::inner_join(left_input, right_input, compare_nulls);
};
BM_join<Nullable, join_t::HASH, NullEquality>(state, dtypes, join);
} else if (mode == "partitioned") {
// Partitioned code path: build hash join, compute match context, then retrieve the
// entire probe table as a single partition. This exercises the two-phase
// count-then-retrieve flow used for chunked probing.
auto join = [](cudf::table_view const& left_input,
cudf::table_view const& right_input,
cudf::null_equality compare_nulls) {
auto hash_joiner = cudf::hash_join(right_input, compare_nulls);
auto match_ctx = hash_joiner.inner_join_match_context(left_input);
auto part_ctx = cudf::join_partition_context{
std::make_unique<cudf::join_match_context>(std::move(match_ctx)), 0, left_input.num_rows()};
return hash_joiner.partitioned_inner_join(part_ctx);
};
BM_join<Nullable, join_t::HASH, NullEquality>(state, dtypes, join);
} else {
CUDF_FAIL("unrecognized mode: " + mode);
}
}

template <bool Nullable, cudf::null_equality NullEquality, data_type DataType>
Expand Down Expand Up @@ -92,7 +111,8 @@ NVBENCH_BENCH_TYPES(nvbench_inner_join,
.add_int64_axis("num_keys", nvbench::range(1, 5, 1))
.add_int64_axis("left_size", JOIN_SIZE_RANGE)
.add_int64_axis("right_size", JOIN_SIZE_RANGE)
.add_int64_axis("skip_large_sizes", {1});
.add_int64_axis("skip_large_sizes", {1})
.add_string_axis("mode", {"normal", "partitioned"});

NVBENCH_BENCH_TYPES(nvbench_left_join,
NVBENCH_TYPE_AXES(JOIN_NULLABLE_RANGE,
Expand Down
36 changes: 35 additions & 1 deletion cpp/include/cudf/detail/join/hash_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,33 @@ class hash_join {
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

/**
* @copydoc cudf::hash_join::partitioned_inner_join
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_inner_join(cudf::join_partition_context const& context,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

/**
* @copydoc cudf::hash_join::partitioned_left_join
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_left_join(cudf::join_partition_context const& context,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

/**
* @copydoc cudf::hash_join::partitioned_full_join
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_full_join(cudf::join_partition_context const& context,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

private:
bool const _is_empty; ///< true if `_hash_table` is empty
bool const _has_nulls; ///< true if nulls are present in either right table or any left table
Expand All @@ -164,10 +191,17 @@ class hash_join {
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_join_retrieve(join_kind join,
cudf::join_partition_context const& context,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;

template <join_kind Join>
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
join_retrieve(cudf::table_view const& left,
join_retrieve(cudf::table_view const& probe,
std::optional<std::size_t> output_size,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr) const;
Expand Down
113 changes: 113 additions & 0 deletions cpp/include/cudf/join/hash_join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/export.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
Expand Down Expand Up @@ -308,6 +309,118 @@ class hash_join {
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
* @brief Performs an inner join on a partition of the probe table.
*
* This method executes an inner join between a specific partition of the probe table
* (defined by the join_partition_context) and the build table. The context must have been
* previously created by calling inner_join_match_context().
*
* The returned left_indices are relative to the original complete probe table, not just the
* partition, so they can be used directly with the original probe table.
*
* @throw std::invalid_argument If `context.left_table_context` is null, if its
* `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds
* of the left table.
*
* @param context The partition context containing match information and partition bounds
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the join indices' device memory
*
* @return A pair of device vectors [`left_indices`, `right_indices`] for this partition
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
Comment thread
wence- marked this conversation as resolved.
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_inner_join(
cudf::join_partition_context const& context,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
* @brief Performs a left join on a partition of the probe table.
Comment thread
PointKernel marked this conversation as resolved.
*
* This method executes a left join between a specific partition of the probe table
* (defined by the join_partition_context) and the build table. The context must have been
* previously created by calling left_join_match_context().
*
* The returned left_indices are relative to the original complete probe table.
*
* @throw std::invalid_argument If `context.left_table_context` is null, if its
* `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds
* of the left table.
*
* @param context The partition context containing match information and partition bounds
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the join indices' device memory
*
* @return A pair of device vectors [`left_indices`, `right_indices`] for this partition
Comment thread
PointKernel marked this conversation as resolved.
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_left_join(
cudf::join_partition_context const& context,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
* @brief Performs a full join probe on a partition of the probe table.
*
* This method executes the probe-side of a full join between a specific partition of the probe
* table (defined by the join_partition_context) and the build table. The context must have been
* previously created by calling full_join_match_context().
*
* @note This method does NOT include unmatched build rows (the complement). After all
* partitions have been processed, pass the collected results to
* `finalize_partitioned_full_join()` to obtain the complete full join output.
*
* The returned left_indices are relative to the original complete probe table.
*
* @throw std::invalid_argument If `context.left_table_context` is null, if its
* `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds
* of the left table.
*
* @param context The partition context containing match information and partition bounds
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the join indices' device memory
*
* @return A pair of device vectors [`left_indices`, `right_indices`] for this partition
*/
[[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
partitioned_full_join(
cudf::join_partition_context const& context,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const;

/**
* @brief Finalizes a partitioned full join by concatenating all per-partition results
* and appending the unmatched right rows (the complement).
*
* Call this method after calling `partitioned_full_join()` for every partition. It combines
* the per-partition indices with the unmatched right row indices (a global property
* across all partitions) and returns a single `(left_indices, right_indices)` pair equivalent
* to the output of `full_join()`.
Comment thread
wence- marked this conversation as resolved.
*
* @param left_partials Per-partition `left_indices` views produced by `partitioned_full_join()`
* @param right_partials Per-partition `right_indices` views produced by `partitioned_full_join()`
* @param left_table_num_rows Total number of rows in the original left table
* @param right_table_num_rows Total number of rows in the right table
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the result device memory
*
* @return A pair of device vectors [`left_indices`, `right_indices`] representing the full
* join output.
*/
[[nodiscard]] static std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
std::unique_ptr<rmm::device_uvector<size_type>>>
finalize_partitioned_full_join(
cudf::host_span<cudf::device_span<size_type const> const> left_partials,
cudf::host_span<cudf::device_span<size_type const> const> right_partials,
size_type left_table_num_rows,
size_type right_table_num_rows,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());
Comment thread
PointKernel marked this conversation as resolved.

private:
std::unique_ptr<impl_type const> _impl;
};
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/join/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ struct join_match_context {
: _left_table{left_table}, _match_counts{std::move(match_counts)}
{
}
join_match_context(join_match_context const&) = delete;
join_match_context& operator=(join_match_context const&) = delete;
join_match_context(join_match_context&&) = default; ///< Move constructor
/**
* @brief Move assignment operator
* @return Reference to this object
*/
join_match_context& operator=(join_match_context&&) = default;
virtual ~join_match_context() = default; ///< Virtual destructor for proper polymorphic deletion
};

Expand Down
6 changes: 3 additions & 3 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rmm/cuda_stream_view.hpp>

#include <optional>
#include <vector>

namespace cudf {
namespace detail {
Expand Down Expand Up @@ -267,9 +268,8 @@ conditional_join(table_view const& left,
// For full joins, get the indices in the right table that were not joined to
// by any row in the left table.
if (join_type == join_kind::FULL_JOIN) {
auto complement_indices = detail::get_left_join_indices_complement(
join_indices.second, left.num_rows(), right.num_rows(), stream, mr);
join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream);
join_indices = detail::finalize_full_join(
std::move(join_indices), left.num_rows(), right.num_rows(), stream, mr);
}
return join_indices;
}
Expand Down
35 changes: 35 additions & 0 deletions cpp/src/join/hash_join/finalize_partitioned_full_join.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

#include "join/join_common_utils.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/join/hash_join.hpp>
#include <cudf/join/join.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/memory_resource.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

namespace cudf {

// Public entry point that completes a partitioned full join.
//
// This function contains no join logic of its own: after opening the function-scoped
// CUDF_FUNC_RANGE it hands every argument, unchanged and in the same order, to
// cudf::detail::finalize_full_join and returns whatever that helper produces.
//
// NOTE(review): no check is made here that `left_partials` and `right_partials` have the same
// number of entries; presumably the detail helper validates this -- confirm.
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
hash_join::finalize_partitioned_full_join(
  cudf::host_span<cudf::device_span<size_type const> const> left_partials,
  cudf::host_span<cudf::device_span<size_type const> const> right_partials,
  size_type left_table_num_rows,
  size_type right_table_num_rows,
  rmm::cuda_stream_view stream,
  rmm::device_async_resource_ref mr)
{
  CUDF_FUNC_RANGE();

  // Delegate to the shared detail implementation.
  auto full_join_indices = cudf::detail::finalize_full_join(
    left_partials, right_partials, left_table_num_rows, right_table_num_rows, stream, mr);
  return full_join_indices;
}

} // namespace cudf
30 changes: 30 additions & 0 deletions cpp/src/join/hash_join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,34 @@ cudf::join_match_context hash_join::full_join_match_context(cudf::table_view con
return _impl->full_join_match_context(left, stream, mr);
}

// Public `partitioned_inner_join`: opens the function-scoped CUDF_FUNC_RANGE, then forwards the
// partition context, stream and memory resource to the implementation object held in `_impl`.
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
hash_join::partitioned_inner_join(cudf::join_partition_context const& context,
                                  rmm::cuda_stream_view stream,
                                  rmm::device_async_resource_ref mr) const
{
  CUDF_FUNC_RANGE();

  // All work happens in the impl; this wrapper only relays the result.
  auto partition_indices = _impl->partitioned_inner_join(context, stream, mr);
  return partition_indices;
}

// Public `partitioned_left_join`: opens the function-scoped CUDF_FUNC_RANGE, then forwards the
// partition context, stream and memory resource to the implementation object held in `_impl`.
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
hash_join::partitioned_left_join(cudf::join_partition_context const& context,
                                 rmm::cuda_stream_view stream,
                                 rmm::device_async_resource_ref mr) const
{
  CUDF_FUNC_RANGE();

  // All work happens in the impl; this wrapper only relays the result.
  auto partition_indices = _impl->partitioned_left_join(context, stream, mr);
  return partition_indices;
}

// Public `partitioned_full_join`: opens the function-scoped CUDF_FUNC_RANGE, then forwards the
// partition context, stream and memory resource to the implementation object held in `_impl`.
std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
          std::unique_ptr<rmm::device_uvector<size_type>>>
hash_join::partitioned_full_join(cudf::join_partition_context const& context,
                                 rmm::cuda_stream_view stream,
                                 rmm::device_async_resource_ref mr) const
{
  CUDF_FUNC_RANGE();

  // All work happens in the impl; this wrapper only relays the result.
  auto partition_indices = _impl->partitioned_full_join(context, stream, mr);
  return partition_indices;
}

} // namespace cudf
24 changes: 24 additions & 0 deletions cpp/src/join/hash_join/kernels_common.cuh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/

// Custom hash-join probe kernels that give cudf direct control over kernel launches.
// Uses the cuco ref type for hash-table access (storage, probing scheme, predicate).

#pragma once

#include "join/join_common_utils.hpp"

#include <cudf/detail/join/join.hpp>
#include <cudf/hashing.hpp>
#include <cudf/types.hpp>

#include <cuco/pair.cuh>

namespace cudf::detail {

/// The probe key type stored in the hash table: {hash_value, row_index}.
///
/// Concretely this is a `cuco::pair` whose first member is a `hash_value_type` and whose
/// second member is a `size_type`.
using probe_key_type = cuco::pair<hash_value_type, size_type>;

} // namespace cudf::detail
Loading
Loading