Skip to content

Commit 48a6029

Browse files
authored
Merge branch 'main' into optimize_count_distinct
2 parents bf0f95c + e8990c4 commit 48a6029

4 files changed

Lines changed: 83 additions & 69 deletions

File tree

datafusion/functions/src/string/common.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::sync::Arc;
2121

22-
use crate::strings::make_and_append_view;
22+
use crate::strings::append_view;
2323
use arrow::array::{
2424
Array, ArrayRef, GenericStringArray, GenericStringBuilder, NullBufferBuilder,
2525
OffsetSizeTrait, StringViewArray, StringViewBuilder, new_null_array,
@@ -152,13 +152,8 @@ fn string_view_trim<Tr: Trimmer>(args: &[ArrayRef]) -> Result<ArrayRef> {
152152
{
153153
if let Some(src_str) = src_str_opt {
154154
let (trimmed, offset) = Tr::trim_ascii_char(src_str, b' ');
155-
make_and_append_view(
156-
&mut views_buf,
157-
&mut null_builder,
158-
raw_view,
159-
trimmed,
160-
offset,
161-
);
155+
append_view(&mut views_buf, raw_view, trimmed, offset);
156+
null_builder.append_non_null();
162157
} else {
163158
null_builder.append_null();
164159
views_buf.push(0);
@@ -204,13 +199,8 @@ fn string_view_trim<Tr: Trimmer>(args: &[ArrayRef]) -> Result<ArrayRef> {
204199
pattern.clear();
205200
pattern.extend(characters.chars());
206201
let (trimmed, offset) = Tr::trim(src_str, &pattern);
207-
make_and_append_view(
208-
&mut views_buf,
209-
&mut null_builder,
210-
raw_view,
211-
trimmed,
212-
offset,
213-
);
202+
append_view(&mut views_buf, raw_view, trimmed, offset);
203+
null_builder.append_non_null();
214204
} else {
215205
null_builder.append_null();
216206
views_buf.push(0);
@@ -261,7 +251,8 @@ fn trim_and_append_view<Tr: Trimmer>(
261251
) {
262252
if let Some(src_str) = src_str_opt {
263253
let (trimmed, offset) = Tr::trim(src_str, pattern);
264-
make_and_append_view(views_buf, null_builder, original_view, trimmed, offset);
254+
append_view(views_buf, original_view, trimmed, offset);
255+
null_builder.append_non_null();
265256
} else {
266257
null_builder.append_null();
267258
views_buf.push(0);

datafusion/functions/src/strings.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use datafusion_common::{Result, exec_datafusion_err, internal_err};
2121

2222
use arrow::array::{
2323
Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray,
24-
NullBufferBuilder, StringArray, StringViewArray, StringViewBuilder, make_view,
24+
StringArray, StringViewArray, StringViewBuilder, make_view,
2525
};
2626
use arrow::buffer::{MutableBuffer, NullBuffer};
2727
use arrow::datatypes::DataType;
@@ -372,7 +372,9 @@ impl LargeStringArrayBuilder {
372372
}
373373
}
374374

375-
/// Append a new view to the views buffer with the given substr
375+
/// Append a new view to the views buffer with the given substr.
376+
///
377+
/// Callers are responsible for their own null tracking.
376378
///
377379
/// # Safety
378380
///
@@ -381,13 +383,15 @@ impl LargeStringArrayBuilder {
381383
///
382384
/// # Arguments
383385
/// - views_buffer: The buffer to append the new view to
384-
/// - null_builder: The buffer to append the null value to
385386
/// - original_view: The original view value
386387
/// - substr: The substring to append. Must be a valid substring of the original view
387388
/// - start_offset: The start offset of the substring in the view
388-
pub fn make_and_append_view(
389+
///
390+
/// LLVM is apparently overly eager to inline this function into some hot loops,
391+
/// which bloats them and regresses performance, so we disable inling for now.
392+
#[inline(never)]
393+
pub fn append_view(
389394
views_buffer: &mut Vec<u128>,
390-
null_builder: &mut NullBufferBuilder,
391395
original_view: &u128,
392396
substr: &str,
393397
start_offset: u32,
@@ -401,11 +405,9 @@ pub fn make_and_append_view(
401405
view.offset + start_offset,
402406
)
403407
} else {
404-
// inline value does not need block id or offset
405408
make_view(substr.as_bytes(), 0, 0)
406409
};
407410
views_buffer.push(sub_view);
408-
null_builder.append_non_null();
409411
}
410412

411413
#[derive(Debug)]

datafusion/functions/src/unicode/substr.rs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@
1717

1818
use std::sync::Arc;
1919

20-
use crate::strings::make_and_append_view;
20+
use crate::strings::append_view;
2121
use crate::utils::make_scalar_function;
2222
use arrow::array::{
23-
Array, ArrayRef, AsArray, Int64Array, NullBufferBuilder, StringArrayType,
24-
StringViewArray, StringViewBuilder,
23+
Array, ArrayRef, AsArray, Int64Array, StringArrayType, StringViewArray,
24+
StringViewBuilder,
2525
};
26-
use arrow::buffer::ScalarBuffer;
26+
use arrow::buffer::{NullBuffer, ScalarBuffer};
2727
use arrow::datatypes::DataType;
2828
use datafusion_common::cast::as_int64_array;
2929
use datafusion_common::types::{
@@ -278,39 +278,32 @@ fn string_view_substr(
278278
let enable_ascii_fast_path =
279279
enable_ascii_fast_path(&string_view_array, start_array, count_array_opt);
280280

281+
// Combine null bitmaps from all inputs in bulk.
282+
let nulls = NullBuffer::union(
283+
NullBuffer::union(string_view_array.nulls(), start_array.nulls()).as_ref(),
284+
count_array_opt.and_then(|a| a.nulls()),
285+
);
286+
281287
let mut views_buf = Vec::with_capacity(string_view_array.len());
282-
let mut null_builder = NullBufferBuilder::new(string_view_array.len());
283-
284-
for i in 0..string_view_array.len() {
285-
if string_view_array.is_null(i)
286-
|| start_array.is_null(i)
287-
|| count_array_opt.map(|a| a.is_null(i)).unwrap_or(false)
288-
{
289-
null_builder.append_null();
288+
289+
for (i, raw_view) in string_view_array.views().iter().enumerate() {
290+
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
290291
views_buf.push(0);
291292
continue;
292293
}
293294

294295
let string = string_view_array.value(i);
295296
let start = start_array.value(i);
296297
let count = count_array_opt.map(|a| a.value(i));
297-
let raw_view = string_view_array.views()[i];
298298

299299
let (start, end) =
300300
get_true_start_end(string, start, count, enable_ascii_fast_path)?;
301301
let substr = &string[start..end];
302302

303-
make_and_append_view(
304-
&mut views_buf,
305-
&mut null_builder,
306-
&raw_view,
307-
substr,
308-
start as u32,
309-
);
303+
append_view(&mut views_buf, raw_view, substr, start as u32);
310304
}
311305

312306
let views_buf = ScalarBuffer::from(views_buf);
313-
let nulls_buf = null_builder.finish();
314307

315308
// Safety:
316309
// (1) The blocks of the given views are all provided
@@ -320,7 +313,7 @@ fn string_view_substr(
320313
let array = StringViewArray::new_unchecked(
321314
views_buf,
322315
string_view_array.data_buffers().to_vec(),
323-
nulls_buf,
316+
nulls,
324317
);
325318
Ok(Arc::new(array) as ArrayRef)
326319
}
@@ -336,13 +329,16 @@ where
336329
let enable_ascii_fast_path =
337330
enable_ascii_fast_path(&string_array, start_array, count_array_opt);
338331

332+
// Combine null bitmaps from all inputs in bulk.
333+
let nulls = NullBuffer::union(
334+
NullBuffer::union(string_array.nulls(), start_array.nulls()).as_ref(),
335+
count_array_opt.and_then(|a| a.nulls()),
336+
);
337+
339338
let mut result_builder = StringViewBuilder::new();
340339

341340
for i in 0..string_array.len() {
342-
if string_array.is_null(i)
343-
|| start_array.is_null(i)
344-
|| count_array_opt.map(|a| a.is_null(i)).unwrap_or(false)
345-
{
341+
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
346342
result_builder.append_null();
347343
continue;
348344
}

datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,24 @@ impl StreamedBatch {
202202
}
203203
}
204204

205+
/// Per-row filter outcome tracking for full outer joins.
206+
///
207+
/// In a full outer join with a filter, buffered rows that match on join
208+
/// keys but fail every filter evaluation must be emitted with NULLs on
209+
/// the streamed side. Three states are needed because a simple boolean
210+
/// cannot distinguish "never matched" (handled by [`BufferedBatch::null_joined`])
211+
/// from "matched but all filters failed" (must be emitted as null-joined).
212+
#[repr(u8)]
213+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
214+
pub(super) enum FilterState {
215+
/// Row never appeared in a matched pair.
216+
Unvisited = 0,
217+
/// Row matched streamed rows, but all filter evaluations failed.
218+
AllFailed = 1,
219+
/// Row matched and at least one filter evaluation passed.
220+
SomePassed = 2,
221+
}
222+
205223
/// A buffered batch that contains contiguous rows with same join key
206224
///
207225
/// `BufferedBatch` can exist as either an in-memory `RecordBatch` or a `RefCountedTempFile` on disk.
@@ -217,11 +235,9 @@ pub(super) struct BufferedBatch {
217235
pub null_joined: Vec<usize>,
218236
/// Size estimation used for reserving / releasing memory
219237
pub size_estimation: usize,
220-
/// The indices of buffered batch that the join filter doesn't satisfy.
221-
/// This is a map between right row index and a boolean value indicating whether all joined row
222-
/// of the right row does not satisfy the filter .
223-
/// When dequeuing the buffered batch, we need to produce null joined rows for these indices.
224-
pub join_filter_not_matched_map: HashMap<u64, bool>,
238+
/// Tracks filter outcomes for buffered rows in full outer joins.
239+
/// Indexed by absolute row position within the batch. See [`FilterState`].
240+
pub join_filter_status: Vec<FilterState>,
225241
/// Current buffered batch number of rows. Equal to batch.num_rows()
226242
/// but if batch is spilled to disk this property is preferable
227243
/// and less expensive
@@ -258,7 +274,7 @@ impl BufferedBatch {
258274
join_arrays,
259275
null_joined: vec![],
260276
size_estimation,
261-
join_filter_not_matched_map: HashMap::new(),
277+
join_filter_status: vec![FilterState::Unvisited; num_rows],
262278
num_rows,
263279
}
264280
}
@@ -1250,12 +1266,16 @@ impl MaterializingSortMergeJoinStream {
12501266
return Ok(());
12511267
}
12521268

1253-
// For buffered row which is joined with streamed side rows but all joined rows
1254-
// don't satisfy the join filter
1269+
// Collect buffered rows that matched on join keys but had every
1270+
// filter evaluation fail — these must be emitted with NULLs on
1271+
// the streamed side to satisfy full outer join semantics.
12551272
let not_matched_buffered_indices = buffered_batch
1256-
.join_filter_not_matched_map
1273+
.join_filter_status
12571274
.iter()
1258-
.filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
1275+
.enumerate()
1276+
.filter_map(|(i, state)| {
1277+
matches!(state, FilterState::AllFailed).then_some(i as u64)
1278+
})
12591279
.collect::<Vec<_>>();
12601280

12611281
let buffered_indices =
@@ -1270,7 +1290,9 @@ impl MaterializingSortMergeJoinStream {
12701290
self.joined_record_batches
12711291
.push_batch_with_null_metadata(record_batch, self.join_type);
12721292
}
1273-
buffered_batch.join_filter_not_matched_map.clear();
1293+
buffered_batch
1294+
.join_filter_status
1295+
.fill(FilterState::Unvisited);
12741296

12751297
Ok(())
12761298
}
@@ -1443,15 +1465,18 @@ impl MaterializingSortMergeJoinStream {
14431465
if right.is_null(i) {
14441466
continue;
14451467
}
1446-
let buffered_index = right.value(i);
1447-
buffered_batch.join_filter_not_matched_map.insert(
1448-
buffered_index,
1449-
*buffered_batch
1450-
.join_filter_not_matched_map
1451-
.get(&buffered_index)
1452-
.unwrap_or(&true)
1453-
&& !pre_mask.value(offset + i),
1454-
);
1468+
let idx = right.value(i) as usize;
1469+
match buffered_batch.join_filter_status[idx] {
1470+
FilterState::SomePassed => {}
1471+
_ if pre_mask.value(offset + i) => {
1472+
buffered_batch.join_filter_status[idx] =
1473+
FilterState::SomePassed;
1474+
}
1475+
_ => {
1476+
buffered_batch.join_filter_status[idx] =
1477+
FilterState::AllFailed;
1478+
}
1479+
}
14551480
}
14561481
offset += chunk_len;
14571482
}

0 commit comments

Comments
 (0)