Skip to content

Commit e8990c4

Browse files
authored
perf: replace SMJ's join_filter_not_matched_map HashMap with Vec<FilterState> (#21517)
## Which issue does this PR close?

Partially addresses #20910, might be the last one for now.

## Rationale for this change

In full outer joins with filters, `BufferedBatch` tracks which buffered rows had all filter evaluations fail using a `HashMap<u64, bool>`. This map is read and written per-row in a hot loop during `freeze_streamed_matched`. The HashMap pays ~40-60 bytes per entry (8-byte u64 key + 1-byte bool value + hash table overhead), hashes every key twice per iteration (once for `get`, once for `insert`), and scatters entries across heap allocations with poor cache locality.

## What changes are included in this PR?

Replaces `HashMap<u64, bool>` with `Vec<FilterState>` indexed by absolute row position within the batch. `FilterState` is a `#[repr(u8)]` enum with three variants (`Unvisited`, `AllFailed`, `SomePassed`), so the Vec is 1 byte per row — allocated once, direct-indexed, no hashing. At the default batch size of 8192 rows the Vec is 8 KB (fits in L1 cache). Even at large batch sizes (32K+), 32 KB is still within L1 on most machines, while the HashMap at 32K entries would consume ~1-2 MB of scattered heap memory.

Three states are needed because a simple `Vec<bool>` cannot distinguish "never matched" (handled separately by `null_joined`) from "matched but all filters failed" (must be emitted as null-joined). The enum variant names are self-documenting, unlike `Option<bool>` where `None`/`Some(true)`/`Some(false)` would be opaque.

## Are these changes tested?

Existing tests.

## Are there any user-facing changes?

No.
1 parent 8939726 commit e8990c4

File tree

1 file changed

+45
-20
lines changed

1 file changed

+45
-20
lines changed

datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,24 @@ impl StreamedBatch {
202202
}
203203
}
204204

205+
/// Per-row filter outcome tracking for full outer joins.
206+
///
207+
/// In a full outer join with a filter, buffered rows that match on join
208+
/// keys but fail every filter evaluation must be emitted with NULLs on
209+
/// the streamed side. Three states are needed because a simple boolean
210+
/// cannot distinguish "never matched" (handled by [`BufferedBatch::null_joined`])
211+
/// from "matched but all filters failed" (must be emitted as null-joined).
212+
#[repr(u8)]
213+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
214+
pub(super) enum FilterState {
215+
/// Row never appeared in a matched pair.
216+
Unvisited = 0,
217+
/// Row matched streamed rows, but all filter evaluations failed.
218+
AllFailed = 1,
219+
/// Row matched and at least one filter evaluation passed.
220+
SomePassed = 2,
221+
}
222+
205223
/// A buffered batch that contains contiguous rows with same join key
206224
///
207225
/// `BufferedBatch` can exist as either an in-memory `RecordBatch` or a `RefCountedTempFile` on disk.
@@ -217,11 +235,9 @@ pub(super) struct BufferedBatch {
217235
pub null_joined: Vec<usize>,
218236
/// Size estimation used for reserving / releasing memory
219237
pub size_estimation: usize,
220-
/// The indices of buffered batch that the join filter doesn't satisfy.
221-
/// This is a map between right row index and a boolean value indicating whether all joined row
222-
/// of the right row does not satisfy the filter .
223-
/// When dequeuing the buffered batch, we need to produce null joined rows for these indices.
224-
pub join_filter_not_matched_map: HashMap<u64, bool>,
238+
/// Tracks filter outcomes for buffered rows in full outer joins.
239+
/// Indexed by absolute row position within the batch. See [`FilterState`].
240+
pub join_filter_status: Vec<FilterState>,
225241
/// Current buffered batch number of rows. Equal to batch.num_rows()
226242
/// but if batch is spilled to disk this property is preferable
227243
/// and less expensive
@@ -258,7 +274,7 @@ impl BufferedBatch {
258274
join_arrays,
259275
null_joined: vec![],
260276
size_estimation,
261-
join_filter_not_matched_map: HashMap::new(),
277+
join_filter_status: vec![FilterState::Unvisited; num_rows],
262278
num_rows,
263279
}
264280
}
@@ -1250,12 +1266,16 @@ impl MaterializingSortMergeJoinStream {
12501266
return Ok(());
12511267
}
12521268

1253-
// For buffered row which is joined with streamed side rows but all joined rows
1254-
// don't satisfy the join filter
1269+
// Collect buffered rows that matched on join keys but had every
1270+
// filter evaluation fail — these must be emitted with NULLs on
1271+
// the streamed side to satisfy full outer join semantics.
12551272
let not_matched_buffered_indices = buffered_batch
1256-
.join_filter_not_matched_map
1273+
.join_filter_status
12571274
.iter()
1258-
.filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
1275+
.enumerate()
1276+
.filter_map(|(i, state)| {
1277+
matches!(state, FilterState::AllFailed).then_some(i as u64)
1278+
})
12591279
.collect::<Vec<_>>();
12601280

12611281
let buffered_indices =
@@ -1270,7 +1290,9 @@ impl MaterializingSortMergeJoinStream {
12701290
self.joined_record_batches
12711291
.push_batch_with_null_metadata(record_batch, self.join_type);
12721292
}
1273-
buffered_batch.join_filter_not_matched_map.clear();
1293+
buffered_batch
1294+
.join_filter_status
1295+
.fill(FilterState::Unvisited);
12741296

12751297
Ok(())
12761298
}
@@ -1443,15 +1465,18 @@ impl MaterializingSortMergeJoinStream {
14431465
if right.is_null(i) {
14441466
continue;
14451467
}
1446-
let buffered_index = right.value(i);
1447-
buffered_batch.join_filter_not_matched_map.insert(
1448-
buffered_index,
1449-
*buffered_batch
1450-
.join_filter_not_matched_map
1451-
.get(&buffered_index)
1452-
.unwrap_or(&true)
1453-
&& !pre_mask.value(offset + i),
1454-
);
1468+
let idx = right.value(i) as usize;
1469+
match buffered_batch.join_filter_status[idx] {
1470+
FilterState::SomePassed => {}
1471+
_ if pre_mask.value(offset + i) => {
1472+
buffered_batch.join_filter_status[idx] =
1473+
FilterState::SomePassed;
1474+
}
1475+
_ => {
1476+
buffered_batch.join_filter_status[idx] =
1477+
FilterState::AllFailed;
1478+
}
1479+
}
14551480
}
14561481
offset += chunk_len;
14571482
}

0 commit comments

Comments
 (0)