
Commit f717a99

adriangb and claude committed
docs: rewrite comments to describe the code, not the change
Drop comments that read like PR-review notes ("X no longer Y", "the legacy CASE", "this drops the routing") in favour of comments that describe the current behaviour for someone reading the file cold. Trim some now-redundant field-level docs and tighten doc strings on `MultiMapLookupExpr`, `PushdownStrategy`, `build_partitioned_filter`, and `try_build_merged_inlist`. No functional change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ba75202 commit f717a99

4 files changed

Lines changed: 62 additions & 84 deletions


datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 8 additions & 12 deletions
@@ -1072,14 +1072,11 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
 .await
 .unwrap();
 
-// Now check what our filter looks like. When the cross-partition InList
-// is small enough (≤ MERGED_INLIST_MAX_TOTAL_LEN) we collapse it into a
-// single global `IN (SET)` regardless of how the data was repartitioned —
-// this lets the merged set participate in parquet stats / bloom-filter
-// pruning at the scan, which a per-partition `CASE` could not. Both the
-// normal repartition and the `force_hash_collisions` path produce the
-// same logical shape; they only differ in the partition-iteration order
-// that controls the InList element order.
+// The dynamic filter for a `Partitioned` hash join with a small enough
+// cross-partition InList collapses to a single global `struct(...) IN
+// (SET) ([...])`. The two `#[cfg]` arms differ only in the order of the
+// InList elements, which is determined by partition-iteration order
+// (normal repartition vs. the `force_hash_collisions` collapse).
 #[cfg(not(feature = "force_hash_collisions"))]
 insta::assert_snapshot!(
 format!("{}", format_plan_for_test(&plan)),
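The comment in the hunk above turns on element order: the merged set is the same however the data was repartitioned, only the order in which partitions are visited changes. A minimal standalone sketch of that effect (the key vectors and the `merge_in_order` helper are hypothetical, not DataFusion APIs):

```rust
/// Hypothetical stand-in for "concatenate each partition's InList values
/// in partition-iteration order"; the real merge also deduplicates.
fn merge_in_order(partitions: &[Vec<i64>]) -> Vec<i64> {
    partitions.iter().flatten().copied().collect()
}

fn main() {
    // Two partitionings of the same overall key set, split differently
    // (as a different repartitioning such as `force_hash_collisions` might).
    let normal = vec![vec![1, 4], vec![2, 3]];
    let collapsed = vec![vec![2, 3, 1, 4]];

    let a = merge_in_order(&normal); // [1, 4, 2, 3]
    let b = merge_in_order(&collapsed); // [2, 3, 1, 4]

    // Same membership, different element order, hence the two snapshots.
    assert_ne!(a, b);
    assert_eq!(
        a.iter().collect::<std::collections::BTreeSet<_>>(),
        b.iter().collect::<std::collections::BTreeSet<_>>()
    );
}
```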
@@ -2575,10 +2572,9 @@ async fn test_hashjoin_hash_table_pushdown_partitioned() {
 .await
 .unwrap();
 
-// Verify the all-Map fast path collapses per-partition routing into a
-// single shared `multi_hash_lookup` rather than a
-// `CASE hash_repartition % N WHEN p THEN hash_lookup ELSE false END`
-// expression.
+// The dynamic filter for an all-Map Partitioned hash join uses a single
+// shared `multi_hash_lookup` over every partition's hash table. There is
+// no per-row `hash_repartition` routing.
 let plan_str = format_plan_for_test(&plan).to_string();
 assert!(
 plan_str.contains("multi_hash_lookup"),

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 6 additions & 7 deletions
@@ -1316,8 +1316,7 @@ impl ExecutionPlan for HashJoinExec {
 .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition);
 
 // Initialize build_accumulator lazily with runtime partition counts
-// (only if enabled). The dynamic filter no longer routes by repartition
-// hash, so REPARTITION_RANDOM_STATE is not needed here.
+// (only when dynamic filter pushdown is enabled).
 let build_accumulator = enable_dynamic_filter_pushdown
 .then(|| {
 self.dynamic_filter.as_ref().map(|df| {
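The surrounding code (visible in the context lines) is the usual `bool::then` plus `Option::map` lazy-construction pattern: the accumulator is built only when pushdown is enabled and a dynamic filter exists. A self-contained sketch with made-up stand-in types, not the real `HashJoinExec` fields:

```rust
// Hypothetical stand-ins for the dynamic filter and the accumulator.
struct DynamicFilter;
struct Accumulator;

fn build_accumulator(filter: &DynamicFilter, partitions: usize) -> Accumulator {
    let _ = (filter, partitions);
    Accumulator
}

fn main() {
    let enable_pushdown = true;
    let dynamic_filter: Option<DynamicFilter> = Some(DynamicFilter);
    let partitions = 4;

    // `then` yields `Some(..)` only when the flag is true; the inner `map`
    // yields `Some(..)` only when a dynamic filter was installed. The
    // nested `Option<Option<Accumulator>>` is flattened to one level.
    let accumulator: Option<Accumulator> = enable_pushdown
        .then(|| {
            dynamic_filter
                .as_ref()
                .map(|df| build_accumulator(df, partitions))
        })
        .flatten();

    assert!(accumulator.is_some());
}
```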
@@ -2042,11 +2041,11 @@ async fn collect_left_input(
 
 let map = Arc::new(join_hash_map);
 
-// The `Map` is always built (the join itself uses it). The optional
-// `inlist` array is set when the build side fit under the per-partition
-// InList caps — that's our signal that this partition's keys are small
-// enough to participate in parquet stats / bloom-filter pruning when
-// collapsed across partitions on the probe-side scan.
+// The hash map is needed by the join itself, so it always travels in
+// `PushdownStrategy::map`. The optional `inlist` array is attached when
+// the build side fits under the per-partition InList caps; the
+// `SharedBuildAccumulator` may then merge those arrays across partitions
+// into a single `IN (SET)` for scan-side pruning.
 let membership = if num_rows == 0 {
 PushdownStrategy::empty()
 } else {
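The full `PushdownStrategy` is not shown in this diff; as a rough mental model of what the new comment describes, a per-partition payload with a mandatory map and an optional InList might look like the following (all names and types here are illustrative only):

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Illustrative stand-in; the real DataFusion types (`Map`, the InList
/// array, `PushdownStrategy`) carry arrow data and hash tables instead.
struct BuildSideMap(HashSet<i64>);

struct PartitionPushdown {
    /// Always present: the join itself probes this map.
    map: Arc<BuildSideMap>,
    /// Present only when the partition's key set stayed under the
    /// per-partition InList caps; the shared accumulator may later merge
    /// these into one cross-partition `IN (SET)` predicate.
    inlist: Option<Vec<i64>>,
}

fn make_pushdown(keys: Vec<i64>, inlist_cap: usize) -> PartitionPushdown {
    let map = Arc::new(BuildSideMap(keys.iter().copied().collect()));
    // Only ship the raw keys when they are small enough to be worth
    // merging into a scan-side `IN (SET)`.
    let inlist = (keys.len() <= inlist_cap).then_some(keys);
    PartitionPushdown { map, inlist }
}

fn main() {
    let small = make_pushdown(vec![1, 2, 3], 100);
    let large = make_pushdown((0..1_000).collect(), 100);
    assert!(small.inlist.is_some());
    assert!(large.inlist.is_none());
}
```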

datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs

Lines changed: 18 additions & 24 deletions
@@ -346,29 +346,24 @@ impl PhysicalExpr for HashTableLookupExpr {
 /// Physical expression that probes the same join keys against multiple [`Map`]s
 /// and returns `true` for any row whose join keys match at least one map.
 ///
-/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s but evaluates
-/// `create_hashes` exactly once for the whole batch — every [`Map::HashMap`]
-/// probe shares that hash buffer. Used for the global-first dynamic filter
-/// when every reported partition uses a hash-table pushdown strategy.
+/// Equivalent to `OR`-ing several [`HashTableLookupExpr`]s, but
+/// `create_hashes` runs exactly once for the whole batch and every
+/// [`Map::HashMap`] probe shares the same hash buffer. All `HashMap`
+/// entries must therefore have been built with the same `RandomState`;
+/// [`Map::ArrayMap`] entries are queried via `contain_keys` and do not
+/// consume hashes.
 pub struct MultiMapLookupExpr {
-/// Columns in the ON clause used to compute the join key for lookups
+/// Join-key expressions evaluated against each input batch.
 on_columns: Vec<PhysicalExprRef>,
-/// Random state for hashing — every map must have been built with the
-/// same `RandomState`, otherwise the shared hash buffer is meaningless.
+/// Hashing seed shared by every entry in `maps`.
 random_state: SeededRandomState,
-/// Maps to OR over (each is one partition's build-side data)
+/// Build-side maps to OR over, one per partition.
 maps: Vec<Arc<Map>>,
-/// Description for display
+/// Display name used in `EXPLAIN` output (e.g. `"multi_hash_lookup"`).
 description: String,
 }
 
 impl MultiMapLookupExpr {
-/// Create a new MultiMapLookupExpr.
-///
-/// `maps` is the (per-partition) sequence of build-side maps to probe.
-/// All `Map::HashMap` entries are expected to use the same `random_state`;
-/// `Map::ArrayMap` entries do not consume hashes and are queried via
-/// `contain_keys`.
 pub fn new(
 on_columns: Vec<PhysicalExprRef>,
 random_state: SeededRandomState,
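The "same `RandomState`" requirement in the doc comment above is the usual seeded-hashing constraint: a hash computed under one seed says nothing about a table built under another. A quick std-only illustration (DataFusion's `create_hashes` and `SeededRandomState` are not used here):

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

fn main() {
    let seed_a = RandomState::new();
    let seed_b = RandomState::new();

    let key = 42_u64;

    // The same key hashes to different values under different seeds, so a
    // hash buffer produced with `seed_a` is useless for probing a table
    // whose buckets were laid out with `seed_b`. Every map probed by the
    // shared buffer must therefore share one RandomState.
    let hash_a = seed_a.hash_one(key);
    let hash_b = seed_b.hash_one(key);
    println!("seed A: {hash_a:#x}, seed B: {hash_b:#x}");

    // Re-hashing under the same seed is, of course, stable.
    assert_eq!(hash_a, seed_a.hash_one(key));
}
```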
@@ -466,8 +461,8 @@ impl PhysicalExpr for MultiMapLookupExpr {
 let join_keys = evaluate_columns(&self.on_columns, batch)?;
 
 if self.maps.is_empty() || num_rows == 0 {
-// No maps to probe — this should not happen in practice but
-// returning all-false matches the semantics of an empty `OR`.
+// Empty `maps` would not be constructed by the dynamic-filter
+// builder — guard anyway: an empty OR is `false` for every row.
 let buffer = BooleanBufferBuilder::new(num_rows);
 let mut buffer = buffer;
 buffer.append_n(num_rows, false);
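The guard above materialises the empty-`OR` result as an all-false boolean array. Roughly, in arrow-rs terms (a sketch; exact re-export paths vary across arrow versions):

```rust
use arrow::array::{Array, BooleanArray, BooleanBufferBuilder};

/// Build an all-false `BooleanArray` of `num_rows` entries, the identity
/// of a logical OR over zero inputs.
fn empty_or(num_rows: usize) -> BooleanArray {
    let mut buffer = BooleanBufferBuilder::new(num_rows);
    buffer.append_n(num_rows, false);
    BooleanArray::new(buffer.finish(), None)
}

fn main() {
    let arr = empty_or(3);
    assert_eq!(arr.len(), 3);
    assert_eq!(arr.true_count(), 0);
}
```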
@@ -477,14 +472,13 @@ impl PhysicalExpr for MultiMapLookupExpr {
 ))));
 }
 
-// Whether any map needs hashes. We only compute hashes if at least
-// one map is a HashMap.
+// Hashes are only needed for `HashMap` probes; `ArrayMap` queries
+// its keys directly via `contain_keys`.
 let needs_hashes = self
 .maps
 .iter()
 .any(|m| matches!(m.as_ref(), Map::HashMap(_)));
 
-// Result buffer accumulates the OR of every map's `contain_*` result.
 let mut result = vec![false; num_rows];
 
 let process_one_map =
@@ -494,8 +488,8 @@ impl PhysicalExpr for MultiMapLookupExpr {
 let hashes = hashes
 .expect("hashes computed when at least one map is a HashMap");
 let arr = hm.contain_hashes(hashes);
-// OR into the running result. `arr` has no nulls
-// (`contain_hashes` returns a non-nullable BooleanArray).
+// `contain_hashes` always returns a non-null
+// `BooleanArray`; OR its bits into `result`.
 for (slot, hit) in result.iter_mut().zip(arr.values().iter()) {
 *slot |= hit;
 }
@@ -511,8 +505,8 @@ impl PhysicalExpr for MultiMapLookupExpr {
 };
 
 if needs_hashes {
-// Compute the join-key hashes ONCE, then probe every HashMap
-// against the same buffer. ArrayMap probes ignore the hashes.
+// Hash the join keys once and reuse the buffer for every
+// `HashMap` probe; `ArrayMap` probes pass `None` for hashes.
 with_hashes(&join_keys, self.random_state.random_state(), |hashes| {
 for map in &self.maps {
 process_one_map(&mut result, map, Some(hashes))?;
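Putting the hunks above together: hash every probe row once, then OR per-map membership into one boolean vector. A simplified standalone version with plain `HashSet<u64>`s standing in for the per-partition `Map`s (the real code probes hash tables and arrays via `contain_hashes` and `contain_keys`):

```rust
use std::collections::hash_map::RandomState;
use std::collections::HashSet;
use std::hash::BuildHasher;

/// OR membership of each row's key across several per-partition sets,
/// hashing every key exactly once and reusing that buffer for each probe.
fn multi_map_lookup(
    keys: &[u64],
    seed: &RandomState,
    maps: &[HashSet<u64>],
) -> Vec<bool> {
    // An empty OR is false for every row.
    let mut result = vec![false; keys.len()];

    // Hash the batch once (the real code calls `create_hashes` here).
    let hashes: Vec<u64> = keys.iter().map(|k| seed.hash_one(k)).collect();

    for map in maps {
        for (i, (key, _hash)) in keys.iter().zip(&hashes).enumerate() {
            // A real hash-table probe would use the precomputed hash to
            // find the bucket; `HashSet::contains` re-hashes internally,
            // so the shared buffer is only illustrative here.
            result[i] |= map.contains(key);
        }
    }
    result
}

fn main() {
    let seed = RandomState::new();
    let part_a: HashSet<u64> = [1, 2].into_iter().collect();
    let part_b: HashSet<u64> = [5].into_iter().collect();

    let hits = multi_map_lookup(&[1, 3, 5], &seed, &[part_a, part_b]);
    assert_eq!(hits, vec![true, false, true]);
}
```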

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 30 additions & 41 deletions
@@ -302,14 +302,11 @@ pub(crate) struct SharedBuildAccumulator {
 on_right: Vec<PhysicalExprRef>,
 /// Schema of the probe (right) side for evaluating filter expressions
 probe_schema: Arc<Schema>,
-/// Cap on the cross-partition merged-InList distinct count. Reuses
-/// `optimizer.hash_join_inlist_pushdown_max_distinct_values` (the same
-/// option that gates per-partition InList pushdown). When the union of
-/// every reported partition's deduplicated InList values stays at or
-/// below this many distinct entries we collapse them into a single
-/// `IN (SET)` predicate that can participate in parquet stats /
-/// bloom-filter pruning at the scan; otherwise we use
-/// `multi_hash_lookup` over every partition's hash table instead.
+/// Maximum distinct entries the cross-partition merged InList may
+/// contain before we fall back to `multi_hash_lookup`. Sourced from
+/// `optimizer.hash_join_inlist_pushdown_max_distinct_values` so the
+/// same threshold caps both per-partition and cross-partition InList
+/// pushdown.
 inlist_max_distinct_values: usize,
 }
 
@@ -733,31 +730,26 @@ impl SharedBuildAccumulator {
 }))
 }
 
-/// Build the dynamic filter for `PartitionMode::Partitioned`. The filter
-/// is decoupled from the repartition strategy: regardless of whether
-/// individual partitions chose Map or InList for their pushdown, we
-/// always emit `(global_minmax AND ([merged_in_list AND] multi_hash_lookup))`.
-/// This drops the legacy `CASE hash_repartition % N WHEN p THEN … END`
-/// routing expression entirely.
+/// Build the dynamic filter for `PartitionMode::Partitioned`. Emits
+/// `global_minmax AND ([merged_in_list AND] multi_hash_lookup)` —
+/// independent of how the build side was repartitioned.
 ///
 /// * `global_minmax` — envelope of every partition's per-column min/max.
 /// Cheap short-circuit and the only piece visible to scan-level
 /// `pruning_predicate` extraction.
-/// * `merged_in_list` — concatenated build keys when every reported
-/// partition produced an `InList` array and the cross-partition
-/// *deduplicated* set has at most `inlist_max_distinct_values` distinct
-/// entries (the same option that gates the per-partition InList path,
-/// `optimizer.hash_join_inlist_pushdown_max_distinct_values`). Worth
-/// carrying because a small `IN (SET)` participates in parquet stats /
-/// bloom-filter pruning, which `multi_hash_lookup` cannot.
-/// * `multi_hash_lookup` — runtime hash-table probe across every
-/// partition's `Map`, hashing the join keys once.
+/// * `merged_in_list` — concatenated, deduplicated build keys when every
+/// reported partition contributed an `InList` array and the
+/// cross-partition union fits under
+/// `optimizer.hash_join_inlist_pushdown_max_distinct_values`. A small
+/// `IN (SET)` participates in parquet stats / bloom-filter pruning,
+/// which `multi_hash_lookup` does not. When present it fully replaces
+/// the lookup.
+/// * `multi_hash_lookup` — hashes the join keys once and ORs
+/// `contain_hashes()` across every partition's hash table.
 ///
-/// The `has_canceled_unknown` case is the only one that can't safely use
-/// this shape (we'd be missing maps for the canceled partitions). We
-/// could keep a CASE just for that case, but the query is in the middle
-/// of being torn down — emit `lit(true)` and let the join do whatever
-/// filtering it can on its own.
+/// `has_canceled_unknown` partitions short-circuit to `lit(true)`: we
+/// don't have their maps, so we cannot include them in the lookup, and
+/// the query is being torn down anyway.
 fn build_partitioned_filter(
 &self,
 real_partitions: &[&PartitionData],
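The doc comment above describes a conjunction in which the merged InList, when available, replaces the hash-table lookup. A schematic of that selection and AND-chaining over a toy expression type (none of these names are the real DataFusion physical expressions):

```rust
/// Toy predicate tree standing in for `Arc<dyn PhysicalExpr>`.
#[derive(Debug)]
enum Expr {
    MinMaxBounds,
    MergedInList { distinct: usize },
    MultiHashLookup,
    And(Box<Expr>, Box<Expr>),
    True,
}

fn and(a: Expr, b: Expr) -> Expr {
    Expr::And(Box::new(a), Box::new(b))
}

/// `global_minmax AND ([merged_in_list | multi_hash_lookup])`, collapsing
/// to `lit(true)` when a canceled partition left us without its map.
fn build_partitioned_filter(
    has_canceled_unknown: bool,
    minmax: Option<Expr>,
    merged_inlist: Option<Expr>,
) -> Expr {
    if has_canceled_unknown {
        return Expr::True;
    }
    // The merged InList already covers every partition's keys, so it
    // stands alone; otherwise fall back to the multi-map hash probe.
    let membership = merged_inlist.unwrap_or(Expr::MultiHashLookup);
    match minmax {
        Some(bounds) => and(bounds, membership),
        None => membership,
    }
}

fn main() {
    let f = build_partitioned_filter(
        false,
        Some(Expr::MinMaxBounds),
        Some(Expr::MergedInList { distinct: 12 }),
    );
    println!("{f:?}"); // And(MinMaxBounds, MergedInList { distinct: 12 })
}
```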
@@ -776,12 +768,10 @@ impl SharedBuildAccumulator {
 .as_ref()
 .and_then(|b| create_bounds_predicate(&self.on_right, b));
 
-// Try to build a merged InList. Only fires when *every* reported
-// partition contributed an InList array AND the cross-partition
-// deduplicated union has at most `inlist_max_distinct_values`
-// entries. The merged InList already covers the union of every
-// partition's build-side keys, so when present it fully replaces
-// `multi_hash_lookup` — no need to AND a redundant probe on top.
+// The merged InList covers the union of every partition's
+// build-side keys, so when it fires it stands alone — there is no
+// need to also AND a `multi_hash_lookup` (which would just probe
+// the same data via a different structure).
 let membership_expr =
 if let Some(merged) = self.try_build_merged_inlist(real_partitions)? {
 Some(merged)
@@ -814,14 +804,13 @@ impl SharedBuildAccumulator {
 /// If every reported partition contributed an InList array, concatenate
 /// them, deduplicate by scalar value, and gate on the
 /// `inlist_max_distinct_values` cap. Returns the merged
-/// `(struct(...))? IN (SET) ([…])` predicate built over the *deduplicated*
-/// keys when the cap is satisfied; `None` otherwise.
+/// `(struct(...))? IN (SET) ([…])` predicate built over the
+/// deduplicated keys when the cap is satisfied; `None` otherwise.
 ///
-/// Per-partition arrays carry duplicates (the build side never dedups
-/// before shipping), so each partition's `arr.len()` is an upper bound on
-/// its distinct count. We start with a cheap pre-check (sum of lengths ≤
-/// some fast-reject limit) before the dedup walk to keep the cost
-/// proportional to actual partition sizes.
+/// Per-partition arrays carry duplicates — each partition ships its raw
+/// build-side join keys, dedup happens here. The dedup walk early-aborts
+/// the moment we cross the cap, so the cost stays bounded by
+/// `O(rows-until-cap+1-distinct-found)` rather than total input size.
 fn try_build_merged_inlist(
 &self,
 real_partitions: &[&PartitionData],
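The new doc comment above describes dedup with early abort: merge every partition's raw keys into one distinct set, but give up as soon as the distinct count crosses the cap. A simplified version over plain integer keys (the real code walks arrow arrays of join-key scalars):

```rust
use std::collections::HashSet;

/// Merge per-partition key lists into one deduplicated set, or return
/// `None` as soon as the distinct count would exceed `max_distinct`.
fn try_merge_inlist(partitions: &[Vec<i64>], max_distinct: usize) -> Option<Vec<i64>> {
    let mut distinct: HashSet<i64> = HashSet::new();
    for partition in partitions {
        for &key in partition {
            distinct.insert(key);
            // Early abort: no point scanning the remaining rows once the
            // merged set can no longer become a pushable `IN (SET)`.
            if distinct.len() > max_distinct {
                return None;
            }
        }
    }
    Some(distinct.into_iter().collect())
}

fn main() {
    // Duplicates across partitions collapse to 4 distinct keys.
    let parts = vec![vec![1, 2, 2], vec![2, 3, 4]];
    assert!(try_merge_inlist(&parts, 4).is_some());
    // A cap of 3 is crossed mid-walk, so the merge is abandoned.
    assert!(try_merge_inlist(&parts, 3).is_none());
}
```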
