Add partitioned probe support for hash joins #22108
Conversation
Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually. Contributors can view more details about this message here.

/ok to test f10754d

/ok to test c14cbba

/ok to test f9a260f

/ok to test 5b5e0af
shrshi
left a comment
Two non-blocking nits, but looks great otherwise! :)
/ok to test 6cd86fb
wence-
left a comment
Requesting a few clarifications in the docstrings because every time I read the join docstrings I get confused.
>  *
>  * @return A pair of device vectors [`left_indices`, `right_indices`] for this partition
>  */
> [[nodiscard]] std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
I am trying to understand how to use this function, and I must admit I am struggling with this docstring.
Suppose I am doing an inner join left ∩ right. I think what I do is:

1. Create hash join object `hasher` with the build table `right`
2. Call `hasher.inner_join_match_context` with the complete probe table `left`
3. Having observed that the output size is "too big", call `hasher.partitioned_inner_join(partition_ctx)` to obtain gather indices for the `left` and `right` tables (probe and build). The `partition_ctx` is the `join_match_context` plus slice bounds of the probe (left) table.
4. Gather from `left` and `right` (unsliced)

What does the `join_partition_context` buy me over just `cudf::slice`ing the left table once I have determined that the join output will be too big?
Is it that `partitioned_left_join` and `partitioned_full_join` need to manage more internal state so you want a symmetric interface?
join_partition_context holds per-row match counts computed once by inner_join_match_context() over the full probe table. Each partitioned_inner_join uses a slice of it to size outputs and compute offsets, avoiding re-probing for counts. Calling inner_join per partition would redo that work. The full-join variant also uses the same context to carry build-side complement info, hence the symmetric design.
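The sizing-and-offsets reuse described above can be illustrated with a small host-side sketch. This is plain C++, not the cudf implementation; the helper names `partition_output_size` and `partition_offsets` are hypothetical, and real code does this work on device vectors:

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical helper: given per-probe-row match counts (computed once over the
// whole probe table) and a partition [start, end), return the total number of
// join-output rows that partition produces. This is the sizing work that a
// join_partition_context lets each partitioned join call reuse instead of
// re-probing the hash table for counts.
inline std::int64_t partition_output_size(std::vector<int> const& match_counts,
                                          std::size_t start,
                                          std::size_t end)
{
  return std::accumulate(
    match_counts.begin() + start, match_counts.begin() + end, std::int64_t{0});
}

// An exclusive prefix sum over a partition's counts gives each probe row's
// write offset into that partition's output buffer.
inline std::vector<std::int64_t> partition_offsets(std::vector<int> const& match_counts,
                                                   std::size_t start,
                                                   std::size_t end)
{
  std::vector<std::int64_t> offsets(end - start, 0);
  std::int64_t running = 0;
  for (std::size_t i = start; i < end; ++i) {
    offsets[i - start] = running;
    running += match_counts[i];
  }
  return offsets;
}
```

Slicing the probe table instead would force each partition to probe twice (once to count, once to gather); keeping the counts in the context does the counting pass exactly once.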
>  * Call this method after calling `partitioned_full_join()` for every partition. It combines
>  * the per-partition probe indices with the unmatched build row indices (a global property
>  * across all partitions) and returns a single `(left_indices, right_indices)` pair equivalent
>  * to the output of `full_join()`.
question: Could we use this same facility to perform a left-join (say) where the left table is the "build" table and we want to keep track of which rows in the left table have no match in any right table so we can emit them with nulls appropriately?
We don't have the utility to support the left table as the build table for full join, so it's not supported in the current setup.
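For intuition, the combining step the docstring describes can be modeled in host-side C++. This is an illustrative sketch, not the cudf implementation; `kNoMatch` and `finalize_full_join_sketch` are hypothetical names, the real API operates on device spans, and cudf represents "no match" with an out-of-bounds gather index that a nullifying gather turns into null:

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Sentinel standing in for "no matching probe row".
constexpr int kNoMatch = -1;

// Hypothetical finalize: concatenate the per-partition (left, right) index
// pairs, then append one (kNoMatch, build_row) pair for every build row that
// no partition matched -- the "global property across all partitions".
inline std::pair<std::vector<int>, std::vector<int>> finalize_full_join_sketch(
  std::vector<std::vector<int>> const& left_partials,
  std::vector<std::vector<int>> const& right_partials,
  int build_table_num_rows)
{
  std::vector<int> left_out, right_out;
  std::vector<bool> build_matched(build_table_num_rows, false);
  for (std::size_t p = 0; p < left_partials.size(); ++p) {
    for (std::size_t i = 0; i < left_partials[p].size(); ++i) {
      left_out.push_back(left_partials[p][i]);
      right_out.push_back(right_partials[p][i]);
      build_matched[right_partials[p][i]] = true;  // mark build row as seen
    }
  }
  for (int b = 0; b < build_table_num_rows; ++b) {
    if (!build_matched[b]) {  // build row unmatched by every partition
      left_out.push_back(kNoMatch);
      right_out.push_back(b);
    }
  }
  return {left_out, right_out};
}
```

The key point is that "unmatched build rows" can only be decided after all partitions have run, which is why this lives in a separate finalize call rather than in each per-partition join.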
> auto count_matches = [&](auto equality, auto d_hasher) {
>   auto const iter = cudf::detail::make_counting_transform_iterator(0, pair_fn{d_hasher});
>   // Precompute probe keys: {hash(row_idx), row_idx} for each probe row.
>   auto const n = static_cast<cuda::std::int64_t>(probe_table_num_rows);
```diff
-  auto const n = static_cast<cuda::std::int64_t>(probe_table_num_rows);
+  auto const n = static_cast<cuda::std::size_t>(probe_table_num_rows);
```

?
This is intentional, as we use the signed int64_t for all kernel thread indices to avoid overflow with large size_type.

cudf/cpp/include/cudf/types.hpp, line 87 in aa3cdee

Let me update it to `thread_index_type` for clarity.
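The overflow concern can be seen without a GPU. This host-side sketch simulates the global-index arithmetic (the helper names are hypothetical; the grid numbers are only illustrative of one large enough to exceed INT32_MAX):

```cpp
#include <cstdint>

// Widening to a signed 64-bit index type before multiplying keeps the
// arithmetic exact for grids covering more than 2^31 elements.
inline std::int64_t global_index_64(std::int64_t block_idx,
                                    std::int64_t block_dim,
                                    std::int64_t thread_idx)
{
  return block_idx * block_dim + thread_idx;
}

// Deliberately 32-bit: the product wraps for large grids. (The unsigned
// casts make the wraparound well-defined so the effect can be observed.)
inline std::int32_t global_index_32(std::int32_t block_idx,
                                    std::int32_t block_dim,
                                    std::int32_t thread_idx)
{
  return static_cast<std::int32_t>(
    static_cast<std::uint32_t>(block_idx) * static_cast<std::uint32_t>(block_dim) +
    static_cast<std::uint32_t>(thread_idx));
}
```

With 10,000,000 blocks of 256 threads, the true index of the first thread in the last block is 2,560,000,000, which does not fit in a signed 32-bit integer; the 64-bit version is exact while the 32-bit version wraps negative.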
Actionable comments posted: 5
🧹 Nitpick comments (3)
cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh (1)

15-16: ⚡ Quick win: Use `cudf::detail::device_scalar` for the temporary counter.

This temporary counter should follow the repo convention instead of using `rmm::device_scalar` directly. As per coding guidelines, "Use cudf::detail::device_scalar instead of rmm::device_scalar."

Suggested fix:

```diff
-#include <rmm/device_scalar.hpp>
+#include <cudf/detail/utilities/device_scalar.hpp>
@@
-  rmm::device_scalar<size_type> output_counter(size_type{0}, stream);
+  cudf::detail::device_scalar<size_type> output_counter(size_type{0}, stream);
```

Also applies to: 243-243

cpp/src/join/hash_join/hash_join.cu (1)

262-287: ⚡ Quick win: Add `CUDF_FUNC_RANGE()` to the new public hash-join entry points.

These methods are public API shims that delegate directly to `_impl`, so they should open an NVTX range like the other public entry points under `cpp/src/`. As per coding guidelines for `cpp/src/**/*.{cu,cpp}`: "Use CUDF_FUNC_RANGE() in public functions before delegating to detail:: functions."

Suggested fix:

```diff
 std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
           std::unique_ptr<rmm::device_uvector<size_type>>>
 hash_join::partitioned_inner_join(cudf::join_partition_context const& context,
                                   rmm::cuda_stream_view stream,
                                   rmm::device_async_resource_ref mr) const
 {
+  CUDF_FUNC_RANGE();
   return _impl->partitioned_inner_join(context, stream, mr);
 }

 std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
           std::unique_ptr<rmm::device_uvector<size_type>>>
 hash_join::partitioned_left_join(cudf::join_partition_context const& context,
                                  rmm::cuda_stream_view stream,
                                  rmm::device_async_resource_ref mr) const
 {
+  CUDF_FUNC_RANGE();
   return _impl->partitioned_left_join(context, stream, mr);
 }

 std::pair<std::unique_ptr<rmm::device_uvector<size_type>>,
           std::unique_ptr<rmm::device_uvector<size_type>>>
 hash_join::partitioned_full_join(cudf::join_partition_context const& context,
                                  rmm::cuda_stream_view stream,
                                  rmm::device_async_resource_ref mr) const
 {
+  CUDF_FUNC_RANGE();
   return _impl->partitioned_full_join(context, stream, mr);
 }
```

cpp/src/join/hash_join/finalize_partitioned_full_join.cpp (1)

19-30: ⚡ Quick win: Instrument this public finalizer with `CUDF_FUNC_RANGE()`.

This is another public API wrapper that immediately forwards into `detail::finalize_full_join`, so it should open an NVTX range as well. As per coding guidelines for `cpp/src/**/*.{cu,cpp}`: "Use CUDF_FUNC_RANGE() in public functions before delegating to detail:: functions."

Suggested fix:

```diff
 #include "join/join_common_utils.hpp"

+#include <cudf/detail/nvtx/ranges.hpp>
 #include <cudf/join/hash_join.hpp>
 #include <cudf/join/join.hpp>
 #include <cudf/types.hpp>
 #include <cudf/utilities/memory_resource.hpp>
 #include <cudf/utilities/span.hpp>
@@
 hash_join::finalize_partitioned_full_join(
   cudf::host_span<cudf::device_span<size_type const> const> left_partials,
   cudf::host_span<cudf::device_span<size_type const> const> right_partials,
   size_type probe_table_num_rows,
   size_type build_table_num_rows,
   rmm::cuda_stream_view stream,
   rmm::device_async_resource_ref mr)
 {
+  CUDF_FUNC_RANGE();
   return cudf::detail::finalize_full_join(
     left_partials, right_partials, probe_table_num_rows, build_table_num_rows, stream, mr);
 }
```
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@cpp/include/cudf/join/hash_join.hpp`:
- Around line 312-410: Update the Doxygen for the four new public
APIs—partitioned_inner_join, partitioned_left_join, partitioned_full_join, and
finalize_partitioned_full_join—to include `@throw` tags that enumerate the
precondition/invalid-context failures and the exception types users should
expect; specifically document cases such as: context not created by the
corresponding *_match_context, context partition bounds outside the probe table,
mismatched sizes or inconsistent left_partials/right_partials spans, invalid
probe_table_num_rows or build_table_num_rows, and any std::invalid_argument or
std::logic_error (or specific cudf exception types used by these functions)
thrown on those violations so callers know what to catch. Ensure each function’s
comment block contains a brief `@throw` line describing the condition and the
exception type and reference the function names (partitioned_inner_join,
partitioned_left_join, partitioned_full_join, finalize_partitioned_full_join) so
the reader can map failures to the correct API.
In `@cpp/src/join/hash_join/partitioned_count_kernels.cuh`:
- Around line 89-90: After launching partitioned_count_kernel<IsOuter> add an
immediate kernel launch error check by calling
CUDF_CUDA_TRY(cudaGetLastError()); right after the <<<...>>> invocation, and add
the required include <cudf/utilities/error.hpp> at the top of the file so
CUDF_CUDA_TRY is available; this ensures any launch failures are caught and
reported immediately for the partitioned_count_kernel<IsOuter> invocation.
In `@cpp/src/join/hash_join/partitioned_join_retrieve.cu`:
- Around line 69-105: Validate the join_partition_context (ensure
context.left_table_context is non-null and that left_start_idx and left_end_idx
form a valid range within the left table and within match_counts size) before
dereferencing match_ctx or slicing; if the context is malformed (null
left_table_context, left_start_idx > left_end_idx, or either index outside the
left table row count or match_counts length) return the appropriate empty-result
(same as the current empty-partition/empty-build branches) or throw a clear
error, and only then proceed to use match_ctx,
cudf::slice(probe_partition_view), _match_counts (partition_counts),
validate_hash_join_probe, and
cudf::detail::row::equality::preprocessed_table::create.
In `@cpp/src/join/hash_join/ref_types.cuh`:
- Around line 12-16: Add a direct include of <utility> to ref_types.cuh to bring
in std::declval (used in the decltype expressions around the ref type
detection), removing the transitive dependency on dispatch.cuh; update the top
of ref_types.cuh (alongside the existing includes of "dispatch.cuh",
"hash_join_impl.cuh", and <cuco/static_multiset.cuh>) to include <utility> so
declval is available even if dispatch.cuh no longer exposes it.
In `@cpp/tests/join/join_tests.cpp`:
- Around line 3206-3490: Add two new partitioned-probe tests: one that uses a
sliced probe table/view (non-zero row offset) to exercise offset handling and
one that uses a larger partition size that spans multiple device blocks to
exercise kernel-boundary logic. Implement the sliced-probe test alongside
HashJoinPartitionedInnerJoin/LeftJoin/FullJoin by creating a table with an
offset slice for the probe side and calling hash_joiner.partitioned_*_join with
part_ctx.left_start_idx/left_end_idx that reference non-zero offsets; validate
by comparing against the corresponding full join
(inner_join/left_join/full_join) sorted/equivalenced results. Implement the
multi-block-partition test similar to HashJoinPartitionedWholeTable but
construct probe and build tables large enough (or set
left_start_idx/left_end_idx large) so the partitioned join produces indices
spanning multiple GPU blocks, then gather and compare to the full join result.
Use the existing helpers: cudf::hash_join,
partitioned_inner_join/left_join/full_join, finalize_partitioned_full_join, and
CUDF_TEST_EXPECT_TABLES_EQUIVALENT to validate behavior.
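The partition-bounds precondition called out above for `partitioned_join_retrieve.cu` reduces to a simple range check. A host-side sketch (the helper name `valid_partition` is hypothetical; the field names follow the `left_start_idx`/`left_end_idx` mentioned in the comment):

```cpp
#include <cstdint>

// A partition [left_start_idx, left_end_idx) is well-formed when it is a
// non-inverted range that lies entirely within the probe table (and hence
// within the match_counts array, which has one entry per probe row).
inline bool valid_partition(std::int64_t left_start_idx,
                            std::int64_t left_end_idx,
                            std::int64_t probe_num_rows)
{
  return 0 <= left_start_idx && left_start_idx <= left_end_idx &&
         left_end_idx <= probe_num_rows;
}
```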
---
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 266e6c51-490c-4ecc-b78f-88e91fe12a8c
📒 Files selected for processing (28)
- cpp/CMakeLists.txt
- cpp/benchmarks/join/join.cu
- cpp/include/cudf/detail/join/hash_join.hpp
- cpp/include/cudf/join/hash_join.hpp
- cpp/include/cudf/join/join.hpp
- cpp/src/join/conditional_join.cu
- cpp/src/join/hash_join/finalize_partitioned_full_join.cpp
- cpp/src/join/hash_join/hash_join.cu
- cpp/src/join/hash_join/kernels_common.cuh
- cpp/src/join/hash_join/match_context.cu
- cpp/src/join/hash_join/partitioned_count.cu
- cpp/src/join/hash_join/partitioned_count_kernels.cuh
- cpp/src/join/hash_join/partitioned_count_kernels.hpp
- cpp/src/join/hash_join/partitioned_count_outer.cu
- cpp/src/join/hash_join/partitioned_full_join.cu
- cpp/src/join/hash_join/partitioned_inner_join.cu
- cpp/src/join/hash_join/partitioned_join_retrieve.cu
- cpp/src/join/hash_join/partitioned_left_join.cu
- cpp/src/join/hash_join/partitioned_retrieve.cu
- cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh
- cpp/src/join/hash_join/partitioned_retrieve_kernels.hpp
- cpp/src/join/hash_join/partitioned_retrieve_outer.cu
- cpp/src/join/hash_join/ref_types.cuh
- cpp/src/join/hash_join/retrieve_impl.cuh
- cpp/src/join/join_common_utils.hpp
- cpp/src/join/join_utils.cu
- cpp/src/join/mixed_join.cu
- cpp/tests/join/join_tests.cpp
- Validate join_partition_context bounds in partitioned_join_retrieve
- Document throw conditions on new partitioned_*_join APIs
- Add CUDF_FUNC_RANGE to new partitioned shims and finalize_*_full_join
- Use cudf::detail::device_scalar and direct <utility> include
- Add sliced + multi-block partitioned probe test
🧹 Nitpick comments (1)
cpp/src/join/hash_join/partitioned_join_retrieve.cu (1)
124-131: 💤 Low value: Specify memory resource for temporary `left_keys` allocation.

Per coding guidelines, temporary memory should explicitly use `cudf::get_current_device_resource_ref()` rather than relying on the implicit default.

Suggested fix:

```diff
-  rmm::device_uvector<probe_key_type> left_keys(n, stream);
+  rmm::device_uvector<probe_key_type> left_keys(n, stream, cudf::get_current_device_resource_ref());
```

As per coding guidelines: "Allocate temporary memory using cudf::get_current_device_resource_ref(), not the passed-in MR parameter"
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 1f736ad3-e0fa-412b-85a5-042a49fda753
📒 Files selected for processing (7)
- cpp/include/cudf/join/hash_join.hpp
- cpp/src/join/hash_join/finalize_partitioned_full_join.cpp
- cpp/src/join/hash_join/hash_join.cu
- cpp/src/join/hash_join/partitioned_join_retrieve.cu
- cpp/src/join/hash_join/partitioned_retrieve_kernels.cuh
- cpp/src/join/hash_join/ref_types.cuh
- cpp/tests/join/join_tests.cpp
✅ Files skipped from review due to trivial changes (2)
- cpp/src/join/hash_join/finalize_partitioned_full_join.cpp
- cpp/src/join/hash_join/hash_join.cu
🚧 Files skipped from review as they are similar to previous changes (3)
- cpp/include/cudf/join/hash_join.hpp
- cpp/tests/join/join_tests.cpp
- cpp/src/join/hash_join/ref_types.cuh
/ok to test 0a266a8
Description
Closes #18677
This introduces partitioned probe support for hash joins and refactors the join internals to reduce duplication.
Checklist