Skip to content

Commit 414fc69

Browse files
Dandandanclaude
andcommitted
Optimize ArrowBytesViewMap insert_if_new with direct value access
Three optimizations for the BytesView hash map hot path: 1. Direct value bytes access: Replace values.value(i).as_ref() with direct pointer arithmetic on input_views + input_buffers, avoiding the GenericByteViewArray::value() accessor overhead on every hash table probe for >12 byte strings. 2. Skip append for inline strings: For strings <=12 bytes, the input view is self-contained. Instead of decoding to &[u8] and re-encoding via append_value -> make_view, push the input view directly. 3. Simplify make_payload_fn: Change from FnMut(Option<&[u8]>) to FnMut() since no caller uses the value bytes parameter. This eliminates unnecessary value decoding on the insert path. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 7056098 commit 414fc69

2 files changed

Lines changed: 48 additions & 18 deletions

File tree

datafusion/physical-expr-common/src/binary_view_map.rs

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl ArrowBytesViewSet {
4242

4343
/// Inserts each value from `values` into the set
4444
pub fn insert(&mut self, values: &ArrayRef) {
45-
fn make_payload_fn(_value: Option<&[u8]>) {}
45+
fn make_payload_fn() {}
4646
fn observe_payload_fn(_payload: ()) {}
4747
self.0
4848
.insert_if_new(values, make_payload_fn, observe_payload_fn);
@@ -209,7 +209,7 @@ where
209209
make_payload_fn: MP,
210210
observe_payload_fn: OP,
211211
) where
212-
MP: FnMut(Option<&[u8]>) -> V,
212+
MP: FnMut() -> V,
213213
OP: FnMut(V),
214214
{
215215
// Sanity check array type
@@ -248,7 +248,7 @@ where
248248
mut make_payload_fn: MP,
249249
mut observe_payload_fn: OP,
250250
) where
251-
MP: FnMut(Option<&[u8]>) -> V,
251+
MP: FnMut() -> V,
252252
OP: FnMut(V),
253253
B: ByteViewType,
254254
{
@@ -266,6 +266,35 @@ where
266266

267267
// Get raw views buffer for direct comparison
268268
let input_views = values.views();
269+
let input_buffers = values.data_buffers();
270+
271+
// Decode input value bytes directly from view + buffers,
272+
// avoiding the overhead of values.value(i) accessor.
273+
let input_value_bytes = |idx: usize| -> &[u8] {
274+
let view = input_views[idx];
275+
let len = view as u32;
276+
if len <= 12 {
277+
// Inline: bytes are stored at offset 4 in the view.
278+
// Reference the view in input_views (not a stack copy)
279+
// so the returned slice has a valid lifetime.
280+
// SAFETY: input_views[idx] is valid for the function's lifetime,
281+
// and the inline data occupies bytes 4..4+len of the u128 view.
282+
unsafe {
283+
let ptr = (input_views.as_ptr().add(idx)) as *const u8;
284+
std::slice::from_raw_parts(ptr.add(4), len as usize)
285+
}
286+
} else {
287+
let byte_view = ByteView::from(view);
288+
let buf_idx = byte_view.buffer_index as usize;
289+
let offset = byte_view.offset as usize;
290+
// SAFETY: view comes from a valid array
291+
unsafe {
292+
input_buffers
293+
.get_unchecked(buf_idx)
294+
.get_unchecked(offset..offset + len as usize)
295+
}
296+
}
297+
};
269298

270299
// Ensure lengths are equivalent
271300
assert_eq!(values.len(), self.hashes_buffer.len());
@@ -279,7 +308,7 @@ where
279308
let payload = if let Some(&(payload, _offset)) = self.null.as_ref() {
280309
payload
281310
} else {
282-
let payload = make_payload_fn(None);
311+
let payload = make_payload_fn();
283312
let null_index = self.views.len();
284313
self.views.push(0);
285314
self.nulls.append_null();
@@ -329,8 +358,7 @@ where
329358
} else {
330359
&in_progress[offset..offset + stored_len]
331360
};
332-
let input_value: &[u8] = values.value(i).as_ref();
333-
stored_value == input_value
361+
stored_value == input_value_bytes(i)
334362
})
335363
.map(|entry| entry.payload)
336364
};
@@ -339,11 +367,18 @@ where
339367
payload
340368
} else {
341369
// no existing value, make a new one
342-
let value: &[u8] = values.value(i).as_ref();
343-
let payload = make_payload_fn(Some(value));
344-
345-
// Create view pointing to our buffers
346-
let new_view = self.append_value(value);
370+
let (new_view, payload) = if len <= 12 {
371+
// Inline string: the view is self-contained, no need
372+
// to decode bytes or copy to buffers — just reuse the
373+
// input view directly.
374+
self.views.push(view_u128);
375+
self.nulls.append_non_null();
376+
(view_u128, make_payload_fn())
377+
} else {
378+
let value = input_value_bytes(i);
379+
let new_view = self.append_value(value);
380+
(new_view, make_payload_fn())
381+
};
347382
let new_header = Entry {
348383
view: new_view,
349384
hash,
@@ -726,16 +761,12 @@ mod tests {
726761
}
727762

728763
// insert the values into the map, recording what we did
729-
let mut seen_new_strings = vec![];
730764
let mut seen_indexes = vec![];
731765
self.map.insert_if_new(
732766
&arr,
733-
|s| {
734-
let value = s
735-
.map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string"));
767+
|| {
736768
let index = next_index;
737769
next_index += 1;
738-
seen_new_strings.push(value);
739770
TestPayload { index }
740771
},
741772
|payload| {
@@ -744,7 +775,6 @@ mod tests {
744775
);
745776

746777
assert_eq!(actual_seen_indexes, seen_indexes);
747-
assert_eq!(actual_new_strings, seen_new_strings);
748778
}
749779

750780
/// Call `self.map.into_array()` validating that the strings are in the same

datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl GroupValues for GroupValuesBytesView {
5757
self.map.insert_if_new(
5858
arr,
5959
// called for each new group
60-
|_value| {
60+
|| {
6161
// assign new group index on each insert
6262
let group_idx = self.num_groups;
6363
self.num_groups += 1;

0 commit comments

Comments
 (0)