diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c09b411f3b6..33ed665c0c9 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -681,6 +681,7 @@ add_library( src/join/filter_join_indices_kernel_null_primitive.cu src/join/filter_join_indices_kernel_primitive.cu src/join/filtered_join.cu + src/join/hash_join/finalize_partitioned_full_join.cpp src/join/hash_join/full_join_match_context.cpp src/join/hash_join/full_join_retrieve.cu src/join/hash_join/full_join_size.cu @@ -693,6 +694,14 @@ add_library( src/join/hash_join/left_join_retrieve.cu src/join/hash_join/left_join_size.cu src/join/hash_join/match_context.cu + src/join/hash_join/partitioned_count.cu + src/join/hash_join/partitioned_count_outer.cu + src/join/hash_join/partitioned_full_join.cu + src/join/hash_join/partitioned_inner_join.cu + src/join/hash_join/partitioned_join_retrieve.cu + src/join/hash_join/partitioned_left_join.cu + src/join/hash_join/partitioned_retrieve.cu + src/join/hash_join/partitioned_retrieve_outer.cu src/join/mark_join.cu src/join/filter_join_indices_jit.cu src/join/join.cu diff --git a/cpp/benchmarks/join/join.cu b/cpp/benchmarks/join/join.cu index 88d2cf22dcd..13ee5533130 100644 --- a/cpp/benchmarks/join/join.cu +++ b/cpp/benchmarks/join/join.cu @@ -16,14 +16,33 @@ void nvbench_inner_join(nvbench::state& state, nvbench::enum_type>) { auto const num_keys = state.get_int64("num_keys"); + auto const mode = state.get_string("mode"); auto dtypes = cycle_dtypes(get_type_or_group(static_cast(DataType)), num_keys); - auto join = [](cudf::table_view const& left_input, - cudf::table_view const& right_input, - cudf::null_equality compare_nulls) { - return cudf::inner_join(left_input, right_input, compare_nulls); - }; - BM_join(state, dtypes, join); + if (mode == "normal") { + auto join = [](cudf::table_view const& left_input, + cudf::table_view const& right_input, + cudf::null_equality compare_nulls) { + return cudf::inner_join(left_input, right_input, compare_nulls); + }; + BM_join(state, dtypes, join); + } else if (mode == "partitioned") { + // Partitioned code path: build hash join, compute match context, then retrieve the + // entire probe table as a single partition. This exercises the two-phase + // count-then-retrieve flow used for chunked probing. + auto join = [](cudf::table_view const& left_input, + cudf::table_view const& right_input, + cudf::null_equality compare_nulls) { + auto hash_joiner = cudf::hash_join(right_input, compare_nulls); + auto match_ctx = hash_joiner.inner_join_match_context(left_input); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, left_input.num_rows()}; + return hash_joiner.partitioned_inner_join(part_ctx); + }; + BM_join(state, dtypes, join); + } else { + CUDF_FAIL("unrecognized mode: " + mode); + } } template @@ -92,7 +111,8 @@ NVBENCH_BENCH_TYPES(nvbench_inner_join, .add_int64_axis("num_keys", nvbench::range(1, 5, 1)) .add_int64_axis("left_size", JOIN_SIZE_RANGE) .add_int64_axis("right_size", JOIN_SIZE_RANGE) - .add_int64_axis("skip_large_sizes", {1}); + .add_int64_axis("skip_large_sizes", {1}) + .add_string_axis("mode", {"normal", "partitioned"}); NVBENCH_BENCH_TYPES(nvbench_left_join, NVBENCH_TYPE_AXES(JOIN_NULLABLE_RANGE, diff --git a/cpp/include/cudf/detail/join/hash_join.hpp b/cpp/include/cudf/detail/join/hash_join.hpp index b1b96ca7218..81ab53fd5f3 100644 --- a/cpp/include/cudf/detail/join/hash_join.hpp +++ b/cpp/include/cudf/detail/join/hash_join.hpp @@ -149,6 +149,33 @@ class hash_join { rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const; + /** + * @copydoc cudf::hash_join::partitioned_inner_join + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_inner_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + + /** + * @copydoc cudf::hash_join::partitioned_left_join + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_left_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + + /** + * @copydoc cudf::hash_join::partitioned_full_join + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_full_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + private: bool const _is_empty; ///< true if `_hash_table` is empty bool const _has_nulls; ///< true if nulls are present in either right table or any left table @@ -164,10 +191,17 @@ class hash_join { rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const; + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_join_retrieve(join_kind join, + cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + template [[nodiscard]] std::pair>, std::unique_ptr>> - join_retrieve(cudf::table_view const& left, + join_retrieve(cudf::table_view const& probe, std::optional output_size, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) const; diff --git a/cpp/include/cudf/join/hash_join.hpp b/cpp/include/cudf/join/hash_join.hpp index 0865fb784cb..d765d5d807d 100644 --- a/cpp/include/cudf/join/hash_join.hpp +++ b/cpp/include/cudf/join/hash_join.hpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -308,6 +309,118 @@ class hash_join { rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const; + /** + * @brief Performs an inner join on a partition of the probe table. + * + * This method executes an inner join between a specific partition of the probe table + * (defined by the join_partition_context) and the build table. The context must have been + * previously created by calling inner_join_match_context(). + * + * The returned left_indices are relative to the original complete probe table, not just the + * partition, so they can be used directly with the original probe table. + * + * @throw std::invalid_argument If `context.left_table_context` is null, if its + * `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds + * of the left table. + * + * @param context The partition context containing match information and partition bounds + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the join indices' device memory + * + * @return A pair of device vectors [`left_indices`, `right_indices`] for this partition + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_inner_join( + cudf::join_partition_context const& context, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const; + + /** + * @brief Performs a left join on a partition of the probe table. + * + * This method executes a left join between a specific partition of the probe table + * (defined by the join_partition_context) and the build table. The context must have been + * previously created by calling left_join_match_context(). + * + * The returned left_indices are relative to the original complete probe table. + * + * @throw std::invalid_argument If `context.left_table_context` is null, if its + * `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds + * of the left table. + * + * @param context The partition context containing match information and partition bounds + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the join indices' device memory + * + * @return A pair of device vectors [`left_indices`, `right_indices`] for this partition + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_left_join( + cudf::join_partition_context const& context, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const; + + /** + * @brief Performs a full join probe on a partition of the probe table. + * + * This method executes the probe-side of a full join between a specific partition of the probe + * table (defined by the join_partition_context) and the build table. The context must have been + * previously created by calling full_join_match_context(). + * + * @note This method does NOT include unmatched build rows (the complement). After all + * partitions have been processed, pass the collected results to + * `finalize_partitioned_full_join()` to obtain the complete full join output. + * + * The returned left_indices are relative to the original complete probe table. + * + * @throw std::invalid_argument If `context.left_table_context` is null, if its + * `_match_counts` is null, or if `[left_start_idx, left_end_idx)` is outside the bounds + * of the left table. + * + * @param context The partition context containing match information and partition bounds + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the join indices' device memory + * + * @return A pair of device vectors [`left_indices`, `right_indices`] for this partition + */ + [[nodiscard]] std::pair>, + std::unique_ptr>> + partitioned_full_join( + cudf::join_partition_context const& context, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()) const; + + /** + * @brief Finalizes a partitioned full join by concatenating all per-partition results + * and appending the unmatched right rows (the complement). + * + * Call this method after calling `partitioned_full_join()` for every partition. It combines + * the per-partition indices with the unmatched right row indices (a global property + * across all partitions) and returns a single `(left_indices, right_indices)` pair equivalent + * to the output of `full_join()`. + * + * @param left_partials Per-partition `left_indices` views produced by `partitioned_full_join()` + * @param right_partials Per-partition `right_indices` views produced by `partitioned_full_join()` + * @param left_table_num_rows Total number of rows in the original left table + * @param right_table_num_rows Total number of rows in the right table + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the result device memory + * + * @return A pair of device vectors [`left_indices`, `right_indices`] representing the full + * join output. + */ + [[nodiscard]] static std::pair>, + std::unique_ptr>> + finalize_partitioned_full_join( + cudf::host_span const> left_partials, + cudf::host_span const> right_partials, + size_type left_table_num_rows, + size_type right_table_num_rows, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + private: std::unique_ptr _impl; }; diff --git a/cpp/include/cudf/join/join.hpp b/cpp/include/cudf/join/join.hpp index 86bc2a5a8f5..16ed0ad0e2e 100644 --- a/cpp/include/cudf/join/join.hpp +++ b/cpp/include/cudf/join/join.hpp @@ -87,6 +87,14 @@ struct join_match_context { : _left_table{left_table}, _match_counts{std::move(match_counts)} { } + join_match_context(join_match_context const&) = delete; + join_match_context& operator=(join_match_context const&) = delete; + join_match_context(join_match_context&&) = default; ///< Move constructor + /** + * @brief Move assignment operator + * @return Reference to this object + */ + join_match_context& operator=(join_match_context&&) = default; virtual ~join_match_context() = default; ///< Virtual destructor for proper polymorphic deletion }; diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu index 96116028ba9..d45912139c4 100644 --- a/cpp/src/join/conditional_join.cu +++ b/cpp/src/join/conditional_join.cu @@ -23,6 +23,7 @@ #include #include +#include namespace cudf { namespace detail { @@ -267,9 +268,8 @@ conditional_join(table_view const& left, // For full joins, get the indices in the right table that were not joined to // by any row in the left table. if (join_type == join_kind::FULL_JOIN) { - auto complement_indices = detail::get_left_join_indices_complement( - join_indices.second, left.num_rows(), right.num_rows(), stream, mr); - join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); + join_indices = detail::finalize_full_join( + std::move(join_indices), left.num_rows(), right.num_rows(), stream, mr); } return join_indices; } diff --git a/cpp/src/join/hash_join/finalize_partitioned_full_join.cpp b/cpp/src/join/hash_join/finalize_partitioned_full_join.cpp new file mode 100644 index 00000000000..c5e36c5be74 --- /dev/null +++ b/cpp/src/join/hash_join/finalize_partitioned_full_join.cpp @@ -0,0 +1,35 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "join/join_common_utils.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace cudf { + +std::pair>, + std::unique_ptr>> +hash_join::finalize_partitioned_full_join( + cudf::host_span const> left_partials, + cudf::host_span const> right_partials, + size_type left_table_num_rows, + size_type right_table_num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return cudf::detail::finalize_full_join( + left_partials, right_partials, left_table_num_rows, right_table_num_rows, stream, mr); +} + +} // namespace cudf diff --git a/cpp/src/join/hash_join/hash_join.cu b/cpp/src/join/hash_join/hash_join.cu index 5c7679ae588..163b558eb18 100644 --- a/cpp/src/join/hash_join/hash_join.cu +++ b/cpp/src/join/hash_join/hash_join.cu @@ -259,4 +259,34 @@ cudf::join_match_context hash_join::full_join_match_context(cudf::table_view con return _impl->full_join_match_context(left, stream, mr); } +std::pair>, + std::unique_ptr>> +hash_join::partitioned_inner_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + CUDF_FUNC_RANGE(); + return _impl->partitioned_inner_join(context, stream, mr); +} + +std::pair>, + std::unique_ptr>> +hash_join::partitioned_left_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + CUDF_FUNC_RANGE(); + return _impl->partitioned_left_join(context, stream, mr); +} + +std::pair>, + std::unique_ptr>> +hash_join::partitioned_full_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + CUDF_FUNC_RANGE(); + return _impl->partitioned_full_join(context, stream, mr); +} + } // namespace cudf diff --git a/cpp/src/join/hash_join/kernels_common.cuh b/cpp/src/join/hash_join/kernels_common.cuh new file mode 100644 index 00000000000..cd74a5bde4e --- /dev/null +++ b/cpp/src/join/hash_join/kernels_common.cuh @@ -0,0 +1,24 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +// Custom hash-join probe kernels that give cudf direct control over kernel launches. +// Uses the cuco ref type for hash-table access (storage, probing scheme, predicate). + +#pragma once + +#include "join/join_common_utils.hpp" + +#include +#include +#include + +#include + +namespace cudf::detail { + +/// The probe key type stored in the hash table: {hash_value, row_index}. +using probe_key_type = cuco::pair; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/match_context.cu b/cpp/src/join/hash_join/match_context.cu index b6653be623c..5fc2dd5ba9c 100644 --- a/cpp/src/join/hash_join/match_context.cu +++ b/cpp/src/join/hash_join/match_context.cu @@ -6,24 +6,19 @@ #include "common.cuh" #include "dispatch.cuh" #include "join/join_common_utils.cuh" +#include "partitioned_count_kernels.hpp" #include #include #include +#include #include -#include +#include namespace cudf::detail { -namespace { -/// Functor that ensures a minimum count of 1 for LEFT/FULL join match counts. -struct clamp_zero_to_one { - __device__ size_type operator()(size_type count) const { return count == 0 ? 1 : count; } -}; -} // namespace - std::unique_ptr> make_join_match_counts( table_view const& right, std::shared_ptr const& preprocessed_right, @@ -55,24 +50,23 @@ std::unique_ptr> make_join_match_counts( auto const left_table_num_rows = left.num_rows(); auto count_matches = [&](auto equality, auto d_hasher) { - auto const iter = cudf::detail::make_counting_transform_iterator(0, pair_fn{d_hasher}); + // Precompute left keys: {hash(row_idx), row_idx} for each left row. + auto const n = static_cast(left_table_num_rows); + rmm::device_uvector left_keys(n, stream); + thrust::transform(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), + cuda::counting_iterator(0), + cuda::counting_iterator(left_table_num_rows), + left_keys.begin(), + pair_fn{d_hasher}); + + auto const ref = hash_table.ref(cuco::op::count) + .rebind_key_eq(equality) + .rebind_hash_function(hash_table.hash_function()); if (join == join_kind::INNER_JOIN) { - hash_table.count_each(iter, - iter + left_table_num_rows, - equality, - hash_table.hash_function(), - match_counts->begin(), - stream.value()); + launch_partitioned_count(left_keys.data(), n, match_counts->begin(), ref, stream); } else { - // For LEFT/FULL joins, fuse the clamp into the output to avoid a separate kernel launch. - auto const output = - thrust::make_transform_output_iterator(match_counts->begin(), clamp_zero_to_one{}); - hash_table.count_each(iter, - iter + left_table_num_rows, - equality, - hash_table.hash_function(), - output, - stream.value()); + // IsOuter=true handles the clamp (zero → 1) for LEFT/FULL joins internally. + launch_partitioned_count(left_keys.data(), n, match_counts->begin(), ref, stream); } }; diff --git a/cpp/src/join/hash_join/partitioned_count.cu b/cpp/src/join/hash_join/partitioned_count.cu new file mode 100644 index 00000000000..67af2bf0d05 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_count.cu @@ -0,0 +1,23 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "partitioned_count_kernels.cuh" +#include "ref_types.cuh" + +namespace cudf::detail { + +template void launch_partitioned_count(probe_key_type const*, + thread_index_type, + size_type*, + primitive_count_ref_t, + rmm::cuda_stream_view); + +template void launch_partitioned_count( + probe_key_type const*, thread_index_type, size_type*, nested_count_ref_t, rmm::cuda_stream_view); + +template void launch_partitioned_count( + probe_key_type const*, thread_index_type, size_type*, flat_count_ref_t, rmm::cuda_stream_view); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_count_kernels.cuh b/cpp/src/join/hash_join/partitioned_count_kernels.cuh new file mode 100644 index 00000000000..f9d3809a0a4 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_count_kernels.cuh @@ -0,0 +1,94 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "kernels_common.cuh" + +#include + +#include + +#include +#include + +namespace cudf::detail { + +/** + * @brief Count matching build-side rows for each probe key. + * + * Each probing tile (@p cg_size threads) calls `ref.count()` for one probe key + * and reduces the per-lane counts across the tile with a warp reduce. The result + * is written to @p output by a single elected thread via `invoke_one`. If + * @p IsOuter is true, keys with zero matches are recorded as 1 so every probe + * row contributes at least one output row in the subsequent retrieve pass. + * + * This is the first phase of the two-phase partitioned join: count then retrieve. + * The output array is consumed by `launch_partitioned_retrieve` to pre-allocate + * the output index buffers. + * + * @tparam IsOuter If true, zero-match keys produce a count of 1 + * @tparam Ref cuco open-addressing reference type (carries hash, equality, storage) + * @param keys Packed probe keys: `.first` = hash, `.second` = probe row index + * @param n Number of probe keys + * @param output Per-key match count output (one entry per probe key) + * @param ref cuco hash-table reference for counting + */ +template +CUDF_KERNEL void __launch_bounds__(DEFAULT_JOIN_BLOCK_SIZE) + partitioned_count_kernel(probe_key_type const* __restrict__ keys, + thread_index_type n, + size_type* __restrict__ output, + Ref ref) +{ + auto constexpr cg_size = DEFAULT_JOIN_CG_SIZE; + + auto idx = grid_1d::global_thread_id() / cg_size; + auto const stride = grid_1d::grid_stride() / cg_size; + + while (idx < n) { + auto const key = keys[idx]; + if constexpr (cg_size == 1) { + auto const match_count = ref.count(key); + if constexpr (IsOuter) { + output[idx] = (match_count == 0) ? size_type{1} : match_count; + } else { + output[idx] = match_count; + } + } else { + auto const tile = + cooperative_groups::tiled_partition(cooperative_groups::this_thread_block()); + auto const temp_count = static_cast(ref.count(tile, key)); + auto const match_count = + cooperative_groups::reduce(tile, temp_count, cooperative_groups::plus()); + cooperative_groups::invoke_one(tile, [&]() { + if constexpr (IsOuter) { + output[idx] = (match_count == 0) ? size_type{1} : match_count; + } else { + output[idx] = match_count; + } + }); + } + idx += stride; + } +} + +template +void launch_partitioned_count(probe_key_type const* keys, + thread_index_type n, + size_type* output, + Ref ref, + rmm::cuda_stream_view stream) +{ + if (n == 0) { return; } + + auto const config = + grid_1d{static_cast(n * DEFAULT_JOIN_CG_SIZE), DEFAULT_JOIN_BLOCK_SIZE}; + + partitioned_count_kernel + <<>>(keys, n, output, ref); +} + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_count_kernels.hpp b/cpp/src/join/hash_join/partitioned_count_kernels.hpp new file mode 100644 index 00000000000..d6bb30b7f00 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_count_kernels.hpp @@ -0,0 +1,22 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "kernels_common.cuh" + +#include + +namespace cudf::detail { + +/// Launch the partitioned_count kernel. +template +void launch_partitioned_count(probe_key_type const* keys, + thread_index_type n, + size_type* output, + Ref ref, + rmm::cuda_stream_view stream); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_count_outer.cu b/cpp/src/join/hash_join/partitioned_count_outer.cu new file mode 100644 index 00000000000..3f4204cb1fa --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_count_outer.cu @@ -0,0 +1,23 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "partitioned_count_kernels.cuh" +#include "ref_types.cuh" + +namespace cudf::detail { + +template void launch_partitioned_count(probe_key_type const*, + thread_index_type, + size_type*, + primitive_count_ref_t, + rmm::cuda_stream_view); + +template void launch_partitioned_count( + probe_key_type const*, thread_index_type, size_type*, nested_count_ref_t, rmm::cuda_stream_view); + +template void launch_partitioned_count( + probe_key_type const*, thread_index_type, size_type*, flat_count_ref_t, rmm::cuda_stream_view); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_full_join.cu b/cpp/src/join/hash_join/partitioned_full_join.cu new file mode 100644 index 00000000000..06a77a46121 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_full_join.cu @@ -0,0 +1,26 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common.cuh" + +namespace cudf::detail { + +template +std::pair>, + std::unique_ptr>> +hash_join::partitioned_full_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + return this->partitioned_join_retrieve(join_kind::FULL_JOIN, context, stream, mr); +} + +template std::pair>, + std::unique_ptr>> +hash_join::partitioned_full_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_inner_join.cu b/cpp/src/join/hash_join/partitioned_inner_join.cu new file mode 100644 index 00000000000..26a0f873de9 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_inner_join.cu @@ -0,0 +1,26 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common.cuh" + +namespace cudf::detail { + +template +std::pair>, + std::unique_ptr>> +hash_join::partitioned_inner_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + return this->partitioned_join_retrieve(join_kind::INNER_JOIN, context, stream, mr); +} + +template std::pair>, + std::unique_ptr>> +hash_join::partitioned_inner_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_join_retrieve.cu b/cpp/src/join/hash_join/partitioned_join_retrieve.cu new file mode 100644 index 00000000000..77dcae1e932 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_join_retrieve.cu @@ -0,0 +1,164 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common.cuh" +#include "dispatch.cuh" +#include "join/join_common_utils.cuh" +#include "join/join_common_utils.hpp" +#include "partitioned_retrieve_kernels.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace cudf::detail { +namespace { + +/** + * @brief Returns trivial left/right index pairs for an outer join when the build side is empty. + */ +std::pair>, + std::unique_ptr>> +make_trivial_outer_indices(size_type left_start_idx, + size_type partition_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto left_indices = std::make_unique>(partition_size, stream, mr); + auto right_indices = std::make_unique>(partition_size, stream, mr); + auto out = cuda::zip_iterator(left_indices->begin(), right_indices->begin()); + thrust::tabulate(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), + out, + out + partition_size, + cuda::proclaim_return_type>( + [left_start_idx] __device__(auto i) { + return cuda::std::tuple{static_cast(left_start_idx + i), + JoinNoMatch}; + })); + return std::pair(std::move(left_indices), std::move(right_indices)); +} + +} // namespace + +template +std::pair>, + std::unique_ptr>> +hash_join::partitioned_join_retrieve(join_kind join, + cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + CUDF_FUNC_RANGE(); + + CUDF_EXPECTS( + join == join_kind::INNER_JOIN || join == join_kind::LEFT_JOIN || join == join_kind::FULL_JOIN, + "Unsupported join kind for partitioned retrieve"); + + CUDF_EXPECTS(context.left_table_context != nullptr, + "join_partition_context is missing left_table_context", + std::invalid_argument); + + auto const& match_ctx = *context.left_table_context; + auto const left_start_idx = context.left_start_idx; + auto const left_end_idx = context.left_end_idx; + + CUDF_EXPECTS(match_ctx._match_counts != nullptr, + "join_match_context is missing match counts", + std::invalid_argument); + CUDF_EXPECTS(left_start_idx >= 0 && left_end_idx >= left_start_idx && + left_end_idx <= match_ctx._left_table.num_rows(), + "Invalid partition bounds", + std::invalid_argument); + + // Empty partition + if (left_start_idx >= left_end_idx) { + return std::pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); + } + + auto const partition_size = left_end_idx - left_start_idx; + + // Trivial case: build table is empty + if (_is_empty) { + if (join == join_kind::INNER_JOIN) { + return std::pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); + } else { + return make_trivial_outer_indices(left_start_idx, partition_size, stream, mr); + } + } + + // Slice the left table to the partition range + auto const left_partition_view = + cudf::slice(match_ctx._left_table, {left_start_idx, left_end_idx})[0]; + + validate_hash_join_probe(_right, left_partition_view, _has_nulls); + + auto const preprocessed_left = + cudf::detail::row::equality::preprocessed_table::create(left_partition_view, stream); + + // For FULL_JOIN, probe with LEFT_JOIN semantics (no complement here) + bool const is_outer = (join != join_kind::INNER_JOIN); + + // launch_partitioned_retrieve reduces match counts to compute output size + // (total = last_offset + last_count), allocates output buffers, and launches the kernel. + auto const* partition_counts = match_ctx._match_counts->data() + left_start_idx; + auto const n = static_cast(partition_size); + + std::pair>, + std::unique_ptr>> + join_indices; + + auto retrieve_partition = [&](auto equality, auto d_hasher) { + // Precompute left keys for this partition slice. + rmm::device_uvector left_keys(n, stream); + thrust::transform(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), + cuda::counting_iterator(0), + cuda::counting_iterator(partition_size), + left_keys.begin(), + pair_fn{d_hasher}); + + auto const ref = _impl->_hash_table.ref(cuco::op::count) + .rebind_key_eq(equality) + .rebind_hash_function(_impl->_hash_table.hash_function()); + + if (is_outer) { + join_indices = launch_partitioned_retrieve( + left_keys.data(), n, partition_counts, ref, left_start_idx, stream, mr); + } else { + join_indices = launch_partitioned_retrieve( + left_keys.data(), n, partition_counts, ref, left_start_idx, stream, mr); + } + }; + + dispatch_join_comparator(_right, + left_partition_view, + _preprocessed_right, + preprocessed_left, + _has_nulls, + _nulls_equal, + retrieve_partition); + + return join_indices; +} + +template std::pair>, + std::unique_ptr>> +hash_join::partitioned_join_retrieve(join_kind, + cudf::join_partition_context const&, + rmm::cuda_stream_view, + rmm::device_async_resource_ref) const; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_left_join.cu b/cpp/src/join/hash_join/partitioned_left_join.cu new file mode 100644 index 00000000000..1200a9f6a0e --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_left_join.cu @@ -0,0 +1,26 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "common.cuh" + +namespace cudf::detail { + +template +std::pair>, + std::unique_ptr>> +hash_join::partitioned_left_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const +{ + return this->partitioned_join_retrieve(join_kind::LEFT_JOIN, context, stream, mr); +} + +template std::pair>, + std::unique_ptr>> +hash_join::partitioned_left_join(cudf::join_partition_context const& context, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) const; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_retrieve.cu b/cpp/src/join/hash_join/partitioned_retrieve.cu new file mode 100644 index 00000000000..efb200f4c89 --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_retrieve.cu @@ -0,0 +1,41 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "partitioned_retrieve_kernels.cuh" +#include "ref_types.cuh" + +namespace cudf::detail { + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + primitive_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + nested_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + flat_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh b/cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh new file mode 100644 index 00000000000..7e822fed12b --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh @@ -0,0 +1,257 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "kernels_common.cuh" + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace cudf::detail { + +/** + * @brief Count the number of set bits below a given position in a bitmask. + */ +__device__ __forceinline__ int count_lower_set_bits(unsigned int mask, int pos) +{ + return cuda::std::popcount(mask & ((1u << pos) - 1)); +} + +/** + * @brief Retrieve matching build-side rows for each probe key. + * + * Each probing tile (@p cg_size threads) walks the hash table for one probe key, + * collecting matches via warp ballot. Matches are staged in a per-flushing-tile (warp) + * shared-memory buffer instead of being written directly to global memory. When the buffer + * nears capacity, the flushing tile claims a contiguous range in the global output arrays + * via a single atomic and flushes with coalesced writes, amortising atomic overhead across + * many matches. If @p IsOuter is true, probe rows with no matches emit a + * `(left_index, JoinNoMatch)` pair. + * + * @tparam IsOuter If true, unmatched probe rows emit a null-padded output row + * @tparam Ref cuco open-addressing reference type (carries hash, equality, storage) + * @param keys Packed left keys: `.first` = hash, `.second` = left row index + * @param n Number of probe keys + * @param left_offset Added to each probe row index to produce an absolute left index + * @param left_output Output buffer for left (probe-side) row indices + * @param right_output Output buffer for right (build-side) row indices + * @param output_counter Global atomic counter tracking total pairs written so far + * @param ref cuco hash-table reference for probing + */ +template +CUDF_KERNEL void __launch_bounds__(DEFAULT_JOIN_BLOCK_SIZE) + partitioned_retrieve_kernel(probe_key_type const* __restrict__ keys, + thread_index_type n, + size_type left_offset, + size_type* __restrict__ left_output, + size_type* __restrict__ right_output, + size_type* __restrict__ output_counter, + Ref ref) +{ + namespace cg = cooperative_groups; + + auto constexpr cg_size = Ref::cg_size; + auto constexpr bucket_size = Ref::bucket_size; + auto constexpr flushing_tile_size = 32; // full warp for coalesced flushes + static_assert(flushing_tile_size >= cg_size); + static_assert(flushing_tile_size % cg_size == 0, + "Every probing tile must sit inside a single flushing tile"); + static_assert(DEFAULT_JOIN_BLOCK_SIZE % flushing_tile_size == 0); + + auto constexpr num_flushing_tiles = DEFAULT_JOIN_BLOCK_SIZE / flushing_tile_size; + auto constexpr tiles_in_block = DEFAULT_JOIN_BLOCK_SIZE / cg_size; + auto constexpr max_matches_per_step = flushing_tile_size * bucket_size; + // buffer_size leaves headroom so one full probing step can't overflow. + auto constexpr buffer_size = max_matches_per_step + flushing_tile_size; + + using index_pair = cuco::pair; + __shared__ index_pair buffers[num_flushing_tiles][buffer_size]; + __shared__ cuda::std::int32_t counters[num_flushing_tiles]; + + auto const block = cg::this_thread_block(); + auto const flushing_tile = cg::tiled_partition(block); + auto const probing_tile = cg::tiled_partition(block); + auto const flushing_tile_id = flushing_tile.meta_group_rank(); + auto const empty_sentinel = ref.empty_key_sentinel(); + auto const key_equal = ref.key_eq(); + + if (flushing_tile.thread_rank() == 0) { counters[flushing_tile_id] = 0; } + flushing_tile.sync(); + + auto atomic_counter = cuda::atomic_ref{*output_counter}; + + auto flush_buffers = [&](auto const& tile) { + auto const count = counters[flushing_tile_id]; + auto const offset = cg::invoke_one_broadcast(tile, [&]() { + return atomic_counter.fetch_add(static_cast(count), cuda::memory_order_relaxed); + }); + auto const rank = tile.thread_rank(); + for (int i = rank; i < count; i += tile.size()) { + left_output[offset + i] = buffers[flushing_tile_id][i].first; + right_output[offset + i] = buffers[flushing_tile_id][i].second; + } + }; + + auto const grid_stride_tiles = static_cast(gridDim.x) * tiles_in_block; + auto idx = + static_cast(blockIdx.x) * tiles_in_block + probing_tile.meta_group_rank(); + + while (flushing_tile.any(idx < n)) { + bool const active = idx < n; + auto const active_flushing_tile = + cg::binary_partition(flushing_tile, active); + + if (active) { + auto const probe_key = keys[idx]; + auto const left_index = probe_key.second + left_offset; + + auto probing_iter = ref.probing_scheme().template make_iterator( + probing_tile, probe_key, ref.storage_ref().extent()); + auto const init_probing_idx = *probing_iter; + + bool running = true; + [[maybe_unused]] bool found_match = false; + + while (active_flushing_tile.any(running)) { + if (running) { + auto const bucket_slots = ref.storage_ref()[*probing_iter]; + + bool equals[bucket_size]; + for (int i = 0; i < bucket_size; ++i) { + equals[i] = false; + if (running) { + if (bucket_slots[i] == empty_sentinel) { + running = false; + } else if (key_equal(probe_key, bucket_slots[i])) { + equals[i] = true; + } + } + } + + probing_tile.sync(); + running = probing_tile.all(running); + + cuda::std::int32_t exists[bucket_size]; + cuda::std::int32_t num_matches[bucket_size]; + cuda::std::int32_t total_matches = 0; + for (int i = 0; i < bucket_size; ++i) { + exists[i] = probing_tile.ballot(equals[i]); + num_matches[i] = cuda::std::popcount(static_cast(exists[i])); + total_matches += num_matches[i]; + } + + auto const lane_id = probing_tile.thread_rank(); + + if (total_matches > 0) { + if constexpr (IsOuter) { found_match = true; } + + cuda::std::int32_t output_idx = 0; + if (lane_id == 0) { + auto shared_ref = cuda::atomic_ref{ + counters[flushing_tile_id]}; + output_idx = shared_ref.fetch_add(total_matches, cuda::memory_order_relaxed); + } + output_idx = probing_tile.shfl(output_idx, 0); + + cuda::std::int32_t matches_offset = 0; + for (int i = 0; i < bucket_size; ++i) { + if (equals[i]) { + auto const lane_offset = count_lower_set_bits(exists[i], lane_id); + buffers[flushing_tile_id][output_idx + matches_offset + lane_offset] = { + left_index, bucket_slots[i].second}; + } + matches_offset += num_matches[i]; + } + } + + if constexpr (IsOuter) { + if (!running && !found_match && lane_id == 0) { + auto shared_ref = cuda::atomic_ref{ + counters[flushing_tile_id]}; + auto const output_idx = shared_ref.fetch_add(1, cuda::memory_order_relaxed); + buffers[flushing_tile_id][output_idx] = {left_index, cudf::JoinNoMatch}; + } + } + } // if running + + active_flushing_tile.sync(); + if (counters[flushing_tile_id] > (buffer_size - max_matches_per_step)) { + flush_buffers(active_flushing_tile); + active_flushing_tile.sync(); + if (active_flushing_tile.thread_rank() == 0) { counters[flushing_tile_id] = 0; } + active_flushing_tile.sync(); + } + + ++probing_iter; + if (*probing_iter == init_probing_idx) { running = false; } + } // while running + } // if active + + idx += grid_stride_tiles; + } // while idx < n + + flushing_tile.sync(); + if (counters[flushing_tile_id] > 0) { flush_buffers(flushing_tile); } +} + +template +std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const* keys, + thread_index_type n, + size_type const* match_counts, + Ref ref, + size_type left_offset, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + if (n == 0) { + return std::pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); + } + + // Shared-memory buffered retrieve only needs the total output size, not + // per-row offsets. A reduce is cheaper than an exclusive_scan. + auto const total_output = + thrust::reduce(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), + match_counts, + match_counts + n, + size_type{0}); + + if (total_output == 0) { + return std::pair(std::make_unique>(0, stream, mr), + std::make_unique>(0, stream, mr)); + } + + auto left_indices = std::make_unique>(total_output, stream, mr); + auto right_indices = std::make_unique>(total_output, stream, mr); + + // Global atomic counter claimed in bulk by each flushing-tile buffer flush. + cudf::detail::device_scalar output_counter(size_type{0}, stream); + + auto constexpr tiles_in_block = DEFAULT_JOIN_BLOCK_SIZE / Ref::cg_size; + auto const num_blocks = static_cast((n + tiles_in_block - 1) / tiles_in_block); + + partitioned_retrieve_kernel<<>>( + keys, n, left_offset, left_indices->data(), right_indices->data(), output_counter.data(), ref); + + return std::pair(std::move(left_indices), std::move(right_indices)); +} + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_retrieve_kernels.hpp b/cpp/src/join/hash_join/partitioned_retrieve_kernels.hpp new file mode 100644 index 00000000000..edd8816924a --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_retrieve_kernels.hpp @@ -0,0 +1,41 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "kernels_common.cuh" + +#include + +#include +#include + +#include +#include + +namespace cudf::detail { + +/** + * @brief Probes the hash table for each key and writes matching index pairs. + * + * Reduces match_counts to derive the total output size, allocates output buffers, + * and launches the retrieve kernel. `left_offset` is added to each stored probe-row index when + * writing to `left_indices`, so callers can produce indices in the full probe + * table's coordinate space directly from a slice-local `keys` array. + * + * @return A pair of device vectors [left_indices, right_indices]. + */ +template +std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const* keys, + thread_index_type n, + size_type const* match_counts, + Ref ref, + size_type left_offset, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/partitioned_retrieve_outer.cu b/cpp/src/join/hash_join/partitioned_retrieve_outer.cu new file mode 100644 index 00000000000..68b8f591ada --- /dev/null +++ b/cpp/src/join/hash_join/partitioned_retrieve_outer.cu @@ -0,0 +1,41 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "partitioned_retrieve_kernels.cuh" +#include "ref_types.cuh" + +namespace cudf::detail { + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + primitive_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + nested_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +template std::pair>, + std::unique_ptr>> +launch_partitioned_retrieve(probe_key_type const*, + thread_index_type, + size_type const*, + flat_count_ref_t, + size_type, + rmm::cuda_stream_view, + rmm::device_async_resource_ref); + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/ref_types.cuh b/cpp/src/join/hash_join/ref_types.cuh new file mode 100644 index 00000000000..a1511c56dcb --- /dev/null +++ b/cpp/src/join/hash_join/ref_types.cuh @@ -0,0 +1,44 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION. + * SPDX-License-Identifier: Apache-2.0 + */ + +// Type aliases for the cuco hash table ref types and equality comparators +// used across hash join probe kernels. There are 3 dispatch paths: +// primitive, nested, non-nested. + +#pragma once + +#include "dispatch.cuh" +#include "hash_join_impl.cuh" + +#include + +#include + +namespace cudf::detail { + +// --- Equality types from the 3 dispatch paths --- + +using primitive_equality_t = primitive_pair_equal; + +using nested_equality_t = pair_equal>>; + +using flat_equality_t = pair_equal>>; + +// --- Count ref types (used by partitioned_count kernel) --- + +template +using count_ref_t = + decltype(std::declval() + .ref(cuco::op::count) + .rebind_key_eq(std::declval()) + .rebind_hash_function(std::declval().hash_function())); + +using primitive_count_ref_t = count_ref_t; +using nested_count_ref_t = count_ref_t; +using flat_count_ref_t = count_ref_t; + +} // namespace cudf::detail diff --git a/cpp/src/join/hash_join/retrieve_impl.cuh b/cpp/src/join/hash_join/retrieve_impl.cuh index 58b29562a2c..2ad60a33303 100644 --- a/cpp/src/join/hash_join/retrieve_impl.cuh +++ b/cpp/src/join/hash_join/retrieve_impl.cuh @@ -10,9 +10,16 @@ #include "join/join_common_utils.hpp" #include "size_impl.cuh" +#include #include +#include +#include +#include #include +#include +#include + #include #include #include @@ -182,9 +189,8 @@ hash_join::join_retrieve(cudf::table_view const& left, mr); if constexpr (Join == join_kind::FULL_JOIN) { - auto complement_indices = detail::get_left_join_indices_complement( - join_indices.second, left.num_rows(), _right.num_rows(), stream, mr); - return detail::concatenate_vector_pairs(join_indices, complement_indices, stream); + return detail::finalize_full_join( + std::move(join_indices), left.num_rows(), _right.num_rows(), stream, mr); } else { return join_indices; } diff --git a/cpp/src/join/join_common_utils.hpp b/cpp/src/join/join_common_utils.hpp index 6a40692b59f..94ba17d5245 100644 --- a/cpp/src/join/join_common_utils.hpp +++ b/cpp/src/join/join_common_utils.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -42,48 +43,54 @@ VectorPair get_trivial_left_join_indices(table_view const& left, rmm::device_async_resource_ref mr); /** - * @brief Takes two pairs of vectors and returns a single pair where the first - * element is a vector made from concatenating the first elements of both input - * pairs and the second element is a vector made from concatenating the second - * elements of both input pairs. + * @brief Finalize a full-join result from a single `(left, right)` index pair. * - * This function's primary use is for computing the indices of a full join by - * first performing a left join, then separately getting the complementary - * right join indices, then finally calling this function to concatenate the - * results. In this case, each input VectorPair contains the left and right - * indices from a join. + * Takes ownership of `indices`, resizes both vectors to `indices.first->size() + + * right_table_num_rows`, and appends the complement (unmatched right rows paired with + * `JoinNoMatch`) into the tail. The vectors are then resized down to the true output length. * - * Note that this is a destructive operation, in that at least one of a or b - * will be invalidated (by a move) by this operation. Calling code should - * assume that neither input VectorPair is valid after this function executes. + * Used by the non-partitioned full-join paths (hash/mixed/conditional); consuming the caller's + * buffers in-place avoids a redundant concat memcpy over the left-side data. * - * @param a The first pair of vectors. - * @param b The second pair of vectors. - * @param stream CUDA stream used for device memory operations and kernel launches + * @param indices `(left, right)` index vectors (consumed). + * @param left_table_num_rows Number of rows in the left table (0 → every right row is + * unmatched, fast path). + * @param right_table_num_rows Number of rows in the right table. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate working storage. * - * @return A pair of vectors containing the concatenated output. + * @return `[left_indices, right_indices]` of the complete full-join output. */ -VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream); +VectorPair finalize_full_join(VectorPair&& indices, + size_type left_table_num_rows, + size_type right_table_num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); /** - * @brief Creates a table containing the complement of left join indices. + * @brief Finalize a full-join result from per-partition index spans. + * + * Concatenates every `(left_partials[i], right_partials[i])` pair into the head of the output + * and appends the complement (unmatched right rows paired with `JoinNoMatch`) into the tail. + * Internally delegates to the `VectorPair&&` overload, so the mark/compact path is shared. * - * This table has two columns. The first one is filled with `JoinNoMatch` - * and the second one contains values from 0 to right_table_row_count - 1 - * excluding those found in the right_indices column. + * Used by `cudf::hash_join::finalize_partitioned_full_join` for partitioned full joins where the + * partials live in separate buffers and must be gathered. * - * @param right_indices Vector of indices - * @param left_table_row_count Number of rows of left table - * @param right_table_row_count Number of rows of right table + * @param left_partials Per-partition left index spans. + * @param right_partials Per-partition right index spans. + * @param left_table_num_rows Number of rows in the left table. + * @param right_table_num_rows Number of rows in the right table. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned vectors. * - * @return Pair of vectors containing the left join indices complement + * @return `[left_indices, right_indices]` sized `sum(left_partials[i].size()) + num_unmatched`. */ -VectorPair get_left_join_indices_complement( - std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, +VectorPair finalize_full_join( + cudf::host_span const> left_partials, + cudf::host_span const> right_partials, + size_type left_table_num_rows, + size_type right_table_num_rows, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr); diff --git a/cpp/src/join/join_utils.cu b/cpp/src/join/join_utils.cu index 0ebbd61464b..c345eb6a915 100644 --- a/cpp/src/join/join_utils.cu +++ b/cpp/src/join/join_utils.cu @@ -4,10 +4,15 @@ */ #include "join_common_utils.cuh" +#include "join_common_utils.hpp" +#include +#include +#include #include #include #include +#include #include #include @@ -16,12 +21,17 @@ #include #include -#include +#include +#include +#include +#include #include #include #include +#include #include +#include namespace cudf { namespace detail { @@ -45,100 +55,161 @@ VectorPair get_trivial_left_join_indices(table_view const& left, return std::pair(std::move(left_indices), std::move(right_indices)); } -VectorPair concatenate_vector_pairs(VectorPair& a, VectorPair& b, rmm::cuda_stream_view stream) -{ - CUDF_EXPECTS((a.first->size() == a.second->size()), - "Mismatch between sizes of vectors in vector pair"); - CUDF_EXPECTS((b.first->size() == b.second->size()), - "Mismatch between sizes of vectors in vector pair"); - if (a.first->is_empty()) { - return std::move(b); - } else if (b.first->is_empty()) { - return std::move(a); +namespace { + +// Predicate: right row `idx` is unmatched iff its flag slot is zero. +// We use an int32 flag (one per right row) rather than a packed bit or a byte: byte stores +// from a dense 32-wide scatter don't coalesce into full-word transactions, which costs 2–3× +// in the mark kernel for skewed left/right ratios. +struct unmatched_flag { + size_type const* flags; + __device__ bool operator()(size_type idx) const noexcept { return flags[idx] == 0; } +}; + +// Transform a selected (unmatched) right index into a (JoinNoMatch, idx) pair that is stored +// through a zip iterator over (left_out_tail, right_out_tail). +struct to_no_match_pair { + __device__ cuda::std::tuple operator()(size_type idx) const noexcept + { + return cuda::std::make_tuple(cudf::JoinNoMatch, idx); } - auto original_size = a.first->size(); - a.first->resize(a.first->size() + b.first->size(), stream); - a.second->resize(a.second->size() + b.second->size(), stream); - thrust::copy(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - b.first->begin(), - b.first->end(), - a.first->begin() + original_size); - thrust::copy(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - b.second->begin(), - b.second->end(), - a.second->begin() + original_size); - return std::move(a); -} +}; -VectorPair get_left_join_indices_complement( - std::unique_ptr>& right_indices, - size_type left_table_row_count, - size_type right_table_row_count, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) +} // namespace + +VectorPair finalize_full_join(VectorPair&& indices, + size_type left_table_num_rows, + size_type right_table_num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { - // Get array of indices that do not appear in right_indices - - // Vector allocated for unmatched result - auto right_indices_complement = - std::make_unique>(right_table_row_count, stream); - - // If left table is empty in a full join call then all rows of the right table - // should be represented in the joined indices. This is an optimization since - // if left table is empty and full join is called all the elements in - // right_indices will be cudf::JoinNoMatch, i.e. `cuda::std::numeric_limits::min()`. - // This if path should produce exactly the same result as the else path but will be faster. - if (left_table_row_count == 0) { + auto [left_out, right_out] = std::move(indices); + CUDF_EXPECTS(left_out->size() == right_out->size(), + "left/right index vectors must have equal size", + std::invalid_argument); + auto const match_total = left_out->size(); + + // Empty-left fast path: every right row is unmatched. + if (left_table_num_rows == 0) { + auto const tail = static_cast(right_table_num_rows); + left_out->resize(match_total + tail, stream); + right_out->resize(match_total + tail, stream); thrust::sequence(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - right_indices_complement->begin(), - right_indices_complement->end(), + right_out->begin() + match_total, + right_out->end(), 0); - } else { - // Assume all the indices in invalid_index_map are invalid - auto invalid_index_map = - std::make_unique>(right_table_row_count, stream); thrust::uninitialized_fill( rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - invalid_index_map->begin(), - invalid_index_map->end(), - int32_t{1}); - - // Functor to check for index validity since left joins can create invalid indices - valid_range valid(0, right_table_row_count); - - // invalid_index_map[index_ptr[i]] = 0 for i = 0 to right_table_row_count - // Thus specifying that those locations are valid - thrust::scatter_if(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - cuda::make_constant_iterator(0), - cuda::make_constant_iterator(0) + right_indices->size(), - right_indices->begin(), // Index locations - right_indices->begin(), // Stencil - Check if index location is valid - invalid_index_map->begin(), // Output indices - valid); // Stencil Predicate - size_type begin_counter = static_cast(0); - size_type end_counter = static_cast(right_table_row_count); - - // Create list of indices that have been marked as invalid - size_type indices_count = - thrust::copy_if(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - cuda::counting_iterator{begin_counter}, - cuda::counting_iterator{end_counter}, - invalid_index_map->begin(), - right_indices_complement->begin(), - cuda::std::identity{}) - - right_indices_complement->begin(); - right_indices_complement->resize(indices_count, stream); + left_out->begin() + match_total, + left_out->end(), + cudf::JoinNoMatch); + return std::pair(std::move(left_out), std::move(right_out)); } - auto left_invalid_indices = - std::make_unique>(right_indices_complement->size(), stream); - thrust::uninitialized_fill( - rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), - left_invalid_indices->begin(), - left_invalid_indices->end(), - cudf::JoinNoMatch); + if (right_table_num_rows == 0) { return std::pair(std::move(left_out), std::move(right_out)); } + + // Grow to the upper bound (match_total + right_table_num_rows); the complement is appended + // into the tail. If the caller pre-reserved this capacity (see the span overload below), + // these resizes don't reallocate. + auto const upper = match_total + static_cast(right_table_num_rows); + left_out->resize(upper, stream); + right_out->resize(upper, stream); + + // Mark matched right rows in an int32 flag array (one word per right row). Redundant stores + // of the same value are idempotent, so no atomics are needed. Word-sized stores coalesce into + // full 128-byte transactions per warp; byte-sized flags cost ~2–3× here because partial-word + // stores from dense scatters serialize within each 32-bit sector. + auto flags = cudf::detail::make_zeroed_device_uvector_async( + right_table_num_rows, stream, cudf::get_current_device_resource_ref()); + + thrust::scatter_if(rmm::exec_policy_nosync(stream, cudf::get_current_device_resource_ref()), + cuda::make_constant_iterator(size_type{1}), + cuda::make_constant_iterator(size_type{1}) + match_total, + right_out->begin(), + right_out->begin(), + flags.begin(), + valid_range{0, right_table_num_rows}); + + // Fused compaction: for each unmatched right row, emit (JoinNoMatch, right_idx) into + // (left_out_tail, right_out_tail) in a single CUB DeviceSelect pass. + auto zip_tail = + thrust::make_zip_iterator(left_out->data() + match_total, right_out->data() + match_total); + auto out_iter = thrust::make_transform_output_iterator(zip_tail, to_no_match_pair{}); + + auto const new_end = + cudf::detail::copy_if(cuda::counting_iterator{0}, + cuda::counting_iterator{right_table_num_rows}, + out_iter, + unmatched_flag{flags.data()}, + stream); + + auto const comp_size = cuda::std::distance(out_iter, new_end); + left_out->resize(match_total + comp_size, stream); + right_out->resize(match_total + comp_size, stream); + + return std::pair(std::move(left_out), std::move(right_out)); +} + +VectorPair finalize_full_join( + cudf::host_span const> left_partials, + cudf::host_span const> right_partials, + size_type left_table_num_rows, + size_type right_table_num_rows, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_EXPECTS(left_partials.size() == right_partials.size(), + "left_partials and right_partials must have the same length", + std::invalid_argument); + + std::size_t match_total = 0; + for (std::size_t i = 0; i < left_partials.size(); ++i) { + CUDF_EXPECTS(left_partials[i].size() == right_partials[i].size(), + "matching partials must have equal left/right sizes", + std::invalid_argument); + match_total += left_partials[i].size(); + } + + // Pre-allocate at the upper bound so the VectorPair overload's resize-up becomes a no-op + // (capacity is already there). + auto const upper = match_total + static_cast(right_table_num_rows); + auto left_out = std::make_unique>(upper, stream, mr); + auto right_out = std::make_unique>(upper, stream, mr); + + // Concatenate every partial into the head of the output via one batched memcpy. + if (match_total > 0) { + auto const n = left_partials.size(); + std::vector dsts; + std::vector srcs; + std::vector sizes; + dsts.reserve(2 * n); + srcs.reserve(2 * n); + sizes.reserve(2 * n); + std::size_t offset = 0; + for (std::size_t i = 0; i < n; ++i) { + auto const sz = left_partials[i].size() * sizeof(size_type); + dsts.push_back(left_out->data() + offset); + srcs.push_back(left_partials[i].data()); + sizes.push_back(sz); + dsts.push_back(right_out->data() + offset); + srcs.push_back(right_partials[i].data()); + sizes.push_back(sz); + offset += left_partials[i].size(); + } + CUDF_CUDA_TRY(cudf::detail::memcpy_batch_async( + dsts.data(), srcs.data(), sizes.data(), dsts.size(), stream)); + } + + // Shrink the uvectors' logical size to match_total (capacity stays at upper bound), then + // delegate to the VectorPair overload which resizes back up and appends the complement. + left_out->resize(match_total, stream); + right_out->resize(match_total, stream); - return std::pair(std::move(left_invalid_indices), std::move(right_indices_complement)); + return finalize_full_join(std::pair(std::move(left_out), std::move(right_out)), + left_table_num_rows, + right_table_num_rows, + stream, + mr); } } // namespace detail diff --git a/cpp/src/join/mixed_join.cu b/cpp/src/join/mixed_join.cu index e2aef657baf..79d359c7b01 100644 --- a/cpp/src/join/mixed_join.cu +++ b/cpp/src/join/mixed_join.cu @@ -34,6 +34,7 @@ #include #include +#include namespace cudf { namespace detail { @@ -522,9 +523,8 @@ mixed_join( // For full joins, get the indices in the right table that were not joined to // by any row in the left table. if (join_type == join_kind::FULL_JOIN) { - auto complement_indices = detail::get_left_join_indices_complement( - join_indices.second, left_num_rows, right_num_rows, stream, mr); - join_indices = detail::concatenate_vector_pairs(join_indices, complement_indices, stream); + join_indices = detail::finalize_full_join( + std::move(join_indices), left_num_rows, right_num_rows, stream, mr); } return join_indices; } diff --git a/cpp/tests/join/join_tests.cpp b/cpp/tests/join/join_tests.cpp index d08c7fcf8a6..231a52085e7 100644 --- a/cpp/tests/join/join_tests.cpp +++ b/cpp/tests/join/join_tests.cpp @@ -43,13 +43,12 @@ namespace { template -using column_wrapper = cudf::test::fixed_width_column_wrapper; -using strcol_wrapper = cudf::test::strings_column_wrapper; -using CVector = std::vector>; -using Table = cudf::table; -constexpr cudf::size_type NoneValue = - std::numeric_limits::min(); // TODO: how to test if this isn't public? -enum class algorithm { HASH, SORT_MERGE, MERGE }; +using column_wrapper = cudf::test::fixed_width_column_wrapper; +using strcol_wrapper = cudf::test::strings_column_wrapper; +using CVector = std::vector>; +using Table = cudf::table; +constexpr cudf::size_type NoneValue = cudf::JoinNoMatch; +enum class algorithm { HASH, HASH_PARTITIONED, SORT_MERGE, MERGE }; void expect_match_counts_equal(rmm::device_uvector const& actual_counts, std::vector const& expected_counts, @@ -139,6 +138,24 @@ std::unique_ptr inner_join( left_on, right_on, compare_nulls); + } else if (algo == algorithm::HASH_PARTITIONED) { + return join_and_gather( + [](cudf::table_view const& left, + cudf::table_view const& right, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + cudf::hash_join hash_joiner(right, compare_nulls, stream); + auto match_ctx = hash_joiner.inner_join_match_context(left, stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, left.num_rows()}; + return hash_joiner.partitioned_inner_join(part_ctx, stream, mr); + }, + left_input, + right_input, + left_on, + right_on, + compare_nulls); } return join_and_gather( [](cudf::table_view const& left, @@ -212,6 +229,24 @@ std::unique_ptr left_join( left_on, right_on, compare_nulls); + } else if (algo == algorithm::HASH_PARTITIONED) { + return join_and_gather( + [](cudf::table_view const& left, + cudf::table_view const& right, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + cudf::hash_join hash_joiner(right, compare_nulls, stream); + auto match_ctx = hash_joiner.left_join_match_context(left, stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, left.num_rows()}; + return hash_joiner.partitioned_left_join(part_ctx, stream, mr); + }, + left_input, + right_input, + left_on, + right_on, + compare_nulls); } return join_and_gather( [](cudf::table_view const& left, @@ -233,8 +268,35 @@ std::unique_ptr full_join( cudf::table_view const& right_input, std::vector const& full_on, std::vector const& right_on, - cudf::null_equality compare_nulls = cudf::null_equality::EQUAL) + cudf::null_equality compare_nulls = cudf::null_equality::EQUAL, + algorithm algo = algorithm::HASH) { + if (algo == algorithm::HASH_PARTITIONED) { + return join_and_gather( + [](cudf::table_view const& left, + cudf::table_view const& right, + cudf::null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + cudf::hash_join hash_joiner(right, compare_nulls, stream); + auto match_ctx = hash_joiner.full_join_match_context(left, stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, left.num_rows()}; + auto [left_idx, right_idx] = hash_joiner.partitioned_full_join(part_ctx, stream, mr); + + std::vector> left_partials{ + cudf::device_span{left_idx->data(), left_idx->size()}}; + std::vector> right_partials{ + cudf::device_span{right_idx->data(), right_idx->size()}}; + return cudf::hash_join::finalize_partitioned_full_join( + left_partials, right_partials, left.num_rows(), right.num_rows(), stream, mr); + }, + full_input, + right_input, + full_on, + right_on, + compare_nulls); + } return join_and_gather( [](cudf::table_view const& left, cudf::table_view const& right, @@ -310,11 +372,15 @@ struct JoinParameterizedTestSortedInput : public JoinTest, // Parametrize qualifying join tests for supported algorithms INSTANTIATE_TEST_CASE_P(InnerJoinParameterizedTest, JoinParameterizedTest, - ::testing::Values(algorithm::HASH, algorithm::SORT_MERGE)); + ::testing::Values(algorithm::HASH, + algorithm::HASH_PARTITIONED, + algorithm::SORT_MERGE)); INSTANTIATE_TEST_CASE_P(InnerJoinParameterizedTestSortedInput, JoinParameterizedTestSortedInput, - ::testing::Values(algorithm::HASH, algorithm::MERGE)); + ::testing::Values(algorithm::HASH, + algorithm::HASH_PARTITIONED, + algorithm::MERGE)); TEST_P(JoinParameterizedTestSortedInput, SortedKeys) { @@ -3137,4 +3203,365 @@ TEST_F(SortMergeJoinThreadSafetyTest, ConcurrentPartitionedJoins) } } +TEST_F(JoinTest, HashJoinPartitionedInnerJoin) +{ + column_wrapper col0_0{{3, 1, 2, 0, 2}}; + strcol_wrapper col0_1({"s1", "s1", "s0", "s4", "s0"}, {true, true, false, true, true}); + column_wrapper col0_2{{0, 1, 2, 4, 1}}; + + column_wrapper col1_0{{2, 2, 0, 4, 3}}; + strcol_wrapper col1_1({"s1", "s0", "s1", "s2", "s1"}, {true, false, true, true, true}); + column_wrapper col1_2{{1, 0, 1, 2, 1}, {true, false, true, true, true}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + + Table t0(std::move(cols0)); + Table t1(std::move(cols1)); + + auto const left_on = std::vector({0, 1}); + auto const right_on = std::vector({0, 1}); + auto const compare_nulls = cudf::null_equality::EQUAL; + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Reference result from full inner join + auto expected_result = inner_join(t0, t1, left_on, right_on, compare_nulls); + auto expected_sort_order = cudf::sorted_order(expected_result->view()); + auto expected_sorted = cudf::gather(expected_result->view(), *expected_sort_order); + + // Partitioned inner join + cudf::hash_join hash_joiner(t1.select(right_on), compare_nulls, stream); + auto match_ctx = hash_joiner.inner_join_match_context(t0.select(left_on), stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, 0}; + + auto join_and_gather = [&](cudf::join_partition_context const& ctx) { + auto const [left_idx, right_idx] = hash_joiner.partitioned_inner_join(ctx, stream, mr); + auto left_col = cudf::column_view{cudf::device_span{*left_idx}}; + auto right_col = cudf::column_view{cudf::device_span{*right_idx}}; + auto left_res = cudf::gather(t0, left_col, cudf::out_of_bounds_policy::DONT_CHECK); + auto right_res = cudf::gather(t1, right_col, cudf::out_of_bounds_policy::DONT_CHECK); + auto joined = left_res->release(); + auto right_c = right_res->release(); + joined.insert(joined.end(), + std::make_move_iterator(right_c.begin()), + std::make_move_iterator(right_c.end())); + return std::make_unique(std::move(joined)); + }; + + // Process row by row + std::vector> partials; + std::vector partial_views; + for (cudf::size_type i = 0; i < t0.num_rows(); i++) { + part_ctx.left_start_idx = i; + part_ctx.left_end_idx = i + 1; + partials.push_back(join_and_gather(part_ctx)); + partial_views.push_back(partials.back()->view()); + } + + auto concat_result = cudf::concatenate(partial_views, stream, mr); + auto concat_sort_order = cudf::sorted_order(concat_result->view()); + auto concat_sorted = cudf::gather(concat_result->view(), *concat_sort_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*expected_sorted, *concat_sorted); +} + +TEST_F(JoinTest, HashJoinPartitionedLeftJoin) +{ + column_wrapper col0_0{{3, 1, 2, 0, 2}}; + strcol_wrapper col0_1({"s1", "s1", "s0", "s4", "s0"}, {true, true, false, true, true}); + column_wrapper col0_2{{0, 1, 2, 4, 1}}; + + column_wrapper col1_0{{2, 2, 0, 4, 3}}; + strcol_wrapper col1_1({"s1", "s0", "s1", "s2", "s1"}, {true, false, true, true, true}); + column_wrapper col1_2{{1, 0, 1, 2, 1}, {true, false, true, true, true}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + + Table t0(std::move(cols0)); + Table t1(std::move(cols1)); + + auto const left_on = std::vector({0, 1}); + auto const right_on = std::vector({0, 1}); + auto const compare_nulls = cudf::null_equality::EQUAL; + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Reference result from full left join + auto expected_result = left_join(t0, t1, left_on, right_on, compare_nulls); + auto expected_sort_order = cudf::sorted_order(expected_result->view()); + auto expected_sorted = cudf::gather(expected_result->view(), *expected_sort_order); + + // Partitioned left join + cudf::hash_join hash_joiner(t1.select(right_on), compare_nulls, stream); + auto match_ctx = hash_joiner.left_join_match_context(t0.select(left_on), stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, 0}; + + auto join_and_gather = [&](cudf::join_partition_context const& ctx) { + auto const [left_idx, right_idx] = hash_joiner.partitioned_left_join(ctx, stream, mr); + auto left_col = cudf::column_view{cudf::device_span{*left_idx}}; + auto right_col = cudf::column_view{cudf::device_span{*right_idx}}; + auto left_res = cudf::gather(t0, left_col, cudf::out_of_bounds_policy::NULLIFY); + auto right_res = cudf::gather(t1, right_col, cudf::out_of_bounds_policy::NULLIFY); + auto joined = left_res->release(); + auto right_c = right_res->release(); + joined.insert(joined.end(), + std::make_move_iterator(right_c.begin()), + std::make_move_iterator(right_c.end())); + return std::make_unique(std::move(joined)); + }; + + std::vector> partials; + std::vector partial_views; + for (cudf::size_type i = 0; i < t0.num_rows(); i++) { + part_ctx.left_start_idx = i; + part_ctx.left_end_idx = i + 1; + partials.push_back(join_and_gather(part_ctx)); + partial_views.push_back(partials.back()->view()); + } + + auto concat_result = cudf::concatenate(partial_views, stream, mr); + auto concat_sort_order = cudf::sorted_order(concat_result->view()); + auto concat_sorted = cudf::gather(concat_result->view(), *concat_sort_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*expected_sorted, *concat_sorted); +} + +TEST_F(JoinTest, HashJoinPartitionedFullJoin) +{ + column_wrapper col0_0{{3, 1, 2, 0, 2}}; + strcol_wrapper col0_1({"s1", "s1", "s0", "s4", "s0"}, {true, true, false, true, true}); + column_wrapper col0_2{{0, 1, 2, 4, 1}}; + + column_wrapper col1_0{{2, 2, 0, 4, 3}}; + strcol_wrapper col1_1({"s1", "s0", "s1", "s2", "s1"}, {true, false, true, true, true}); + column_wrapper col1_2{{1, 0, 1, 2, 1}, {true, false, true, true, true}}; + + CVector cols0, cols1; + cols0.push_back(col0_0.release()); + cols0.push_back(col0_1.release()); + cols0.push_back(col0_2.release()); + cols1.push_back(col1_0.release()); + cols1.push_back(col1_1.release()); + cols1.push_back(col1_2.release()); + + Table t0(std::move(cols0)); + Table t1(std::move(cols1)); + + auto const left_on = std::vector({0, 1}); + auto const right_on = std::vector({0, 1}); + auto const compare_nulls = cudf::null_equality::EQUAL; + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Reference result from full join + auto expected_result = full_join(t0, t1, left_on, right_on, compare_nulls); + auto expected_sort_order = cudf::sorted_order(expected_result->view()); + auto expected_sorted = cudf::gather(expected_result->view(), *expected_sort_order); + + // Partitioned full join (probe side) + cudf::hash_join hash_joiner(t1.select(right_on), compare_nulls, stream); + auto match_ctx = hash_joiner.full_join_match_context(t0.select(left_on), stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, 0}; + + // Collect per-partition (left, right) indices for finalization. + std::vector>> left_idx_parts; + std::vector>> right_idx_parts; + for (cudf::size_type i = 0; i < t0.num_rows(); i++) { + part_ctx.left_start_idx = i; + part_ctx.left_end_idx = i + 1; + auto [left_idx, right_idx] = hash_joiner.partitioned_full_join(part_ctx, stream, mr); + left_idx_parts.push_back(std::move(left_idx)); + right_idx_parts.push_back(std::move(right_idx)); + } + + std::vector> left_partials; + std::vector> right_partials; + left_partials.reserve(left_idx_parts.size()); + right_partials.reserve(right_idx_parts.size()); + for (std::size_t i = 0; i < left_idx_parts.size(); ++i) { + left_partials.emplace_back(left_idx_parts[i]->data(), left_idx_parts[i]->size()); + right_partials.emplace_back(right_idx_parts[i]->data(), right_idx_parts[i]->size()); + } + + auto [final_left, final_right] = + cudf::hash_join::finalize_partitioned_full_join(left_partials, + right_partials, + t0.select(left_on).num_rows(), + t1.select(right_on).num_rows(), + stream, + mr); + + auto left_col = cudf::column_view{cudf::device_span{*final_left}}; + auto right_col = cudf::column_view{cudf::device_span{*final_right}}; + auto left_res = cudf::gather(t0, left_col, cudf::out_of_bounds_policy::NULLIFY); + auto right_res = cudf::gather(t1, right_col, cudf::out_of_bounds_policy::NULLIFY); + auto joined = left_res->release(); + auto right_c = right_res->release(); + joined.insert( + joined.end(), std::make_move_iterator(right_c.begin()), std::make_move_iterator(right_c.end())); + auto concat_result = std::make_unique(std::move(joined)); + auto concat_sort_order = cudf::sorted_order(concat_result->view()); + auto concat_sorted = cudf::gather(concat_result->view(), *concat_sort_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*expected_sorted, *concat_sorted); +} + +TEST_F(JoinTest, HashJoinPartitionedEmptyPartition) +{ + column_wrapper col0{{3, 1, 2, 0, 2}}; + column_wrapper col1{{2, 2, 0, 4, 3}}; + + CVector cols0, cols1; + cols0.push_back(col0.release()); + cols1.push_back(col1.release()); + Table t0(std::move(cols0)); + Table t1(std::move(cols1)); + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + cudf::hash_join hash_joiner(t1.select({0}), cudf::null_equality::EQUAL, stream); + auto match_ctx = hash_joiner.inner_join_match_context(t0.select({0}), stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, 0}; + + // Empty partition (start == end) + part_ctx.left_start_idx = 2; + part_ctx.left_end_idx = 2; + auto [left_idx, right_idx] = hash_joiner.partitioned_inner_join(part_ctx, stream, mr); + EXPECT_EQ(left_idx->size(), 0); + EXPECT_EQ(right_idx->size(), 0); +} + +TEST_F(JoinTest, HashJoinPartitionedWholeTable) +{ + column_wrapper col0{{3, 1, 2, 0, 2}}; + column_wrapper col1{{2, 2, 0, 4, 3}}; + + CVector cols0, cols1; + cols0.push_back(col0.release()); + cols1.push_back(col1.release()); + Table t0(std::move(cols0)); + Table t1(std::move(cols1)); + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Reference: full inner join + auto expected = inner_join(t0, t1, {0}, {0}); + auto expected_order = cudf::sorted_order(expected->view()); + auto expected_sort = cudf::gather(expected->view(), *expected_order); + + // Partitioned: entire table as one partition + cudf::hash_join hash_joiner(t1.select({0}), cudf::null_equality::EQUAL, stream); + auto match_ctx = hash_joiner.inner_join_match_context(t0.select({0}), stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, t0.num_rows()}; + + auto [left_idx, right_idx] = hash_joiner.partitioned_inner_join(part_ctx, stream, mr); + auto left_col = cudf::column_view{cudf::device_span{*left_idx}}; + auto right_col = cudf::column_view{cudf::device_span{*right_idx}}; + auto left_res = cudf::gather(t0, left_col, cudf::out_of_bounds_policy::DONT_CHECK); + auto right_res = cudf::gather(t1, right_col, cudf::out_of_bounds_policy::DONT_CHECK); + auto joined = left_res->release(); + auto right_c = right_res->release(); + joined.insert( + joined.end(), std::make_move_iterator(right_c.begin()), std::make_move_iterator(right_c.end())); + auto result = std::make_unique(std::move(joined)); + auto result_order = cudf::sorted_order(result->view()); + auto result_sort = cudf::gather(result->view(), *result_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*expected_sort, *result_sort); +} + +// Exercises both a sliced (non-zero offset) left view and a partition size large enough +// to span multiple kernel blocks. +TEST_F(JoinTest, HashJoinPartitionedSlicedMultiBlock) +{ + auto constexpr left_full_rows = 4000; + auto constexpr left_offset = 1234; + auto constexpr left_rows = 2500; + auto constexpr right_rows = 300; + + std::vector left_vals(left_full_rows); + for (cudf::size_type i = 0; i < left_full_rows; ++i) { + left_vals[i] = i % 200; + } + std::vector right_vals(right_rows); + for (cudf::size_type i = 0; i < right_rows; ++i) { + right_vals[i] = i; + } + + column_wrapper left_col(left_vals.begin(), left_vals.end()); + column_wrapper right_col(right_vals.begin(), right_vals.end()); + + CVector cols_left, cols_right; + cols_left.push_back(left_col.release()); + cols_right.push_back(right_col.release()); + Table left_full(std::move(cols_left)); + Table right(std::move(cols_right)); + + auto const stream = cudf::get_default_stream(); + auto const mr = cudf::get_current_device_resource_ref(); + + // Sliced left view with non-zero offset + auto const left_view = cudf::slice(left_full.view(), {left_offset, left_offset + left_rows})[0]; + + // Reference: full inner join on the sliced left + auto expected = inner_join(cudf::table_view{left_view}, right, {0}, {0}); + auto expected_order = cudf::sorted_order(expected->view()); + auto expected_sort = cudf::gather(expected->view(), *expected_order); + + cudf::hash_join hash_joiner(right.select({0}), cudf::null_equality::EQUAL, stream); + auto match_ctx = hash_joiner.inner_join_match_context(left_view, stream, mr); + auto part_ctx = cudf::join_partition_context{ + std::make_unique(std::move(match_ctx)), 0, 0}; + + // Two partitions covering the sliced left; each is large enough to span multiple GPU blocks. + auto const mid = left_rows / 2; + std::vector> const partitions = {{0, mid}, + {mid, left_rows}}; + + std::vector> partials; + std::vector partial_views; + for (auto [s, e] : partitions) { + part_ctx.left_start_idx = s; + part_ctx.left_end_idx = e; + auto const [left_idx, right_idx] = hash_joiner.partitioned_inner_join(part_ctx, stream, mr); + auto left_col_view = cudf::column_view{cudf::device_span{*left_idx}}; + auto right_col_view = cudf::column_view{cudf::device_span{*right_idx}}; + auto left_res = cudf::gather( + cudf::table_view{left_view}, left_col_view, cudf::out_of_bounds_policy::DONT_CHECK); + auto right_res = cudf::gather(right, right_col_view, cudf::out_of_bounds_policy::DONT_CHECK); + auto joined = left_res->release(); + auto right_c = right_res->release(); + joined.insert(joined.end(), + std::make_move_iterator(right_c.begin()), + std::make_move_iterator(right_c.end())); + partials.push_back(std::make_unique(std::move(joined))); + partial_views.push_back(partials.back()->view()); + } + + auto concat = cudf::concatenate(partial_views, stream, mr); + auto concat_order = cudf::sorted_order(concat->view()); + auto concat_sort = cudf::gather(concat->view(), *concat_order); + + CUDF_TEST_EXPECT_TABLES_EQUIVALENT(*expected_sort, *concat_sort); +} + CUDF_TEST_PROGRAM_MAIN()