diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 3a3224545ce4e..5854d2957c7f7 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -87,5 +87,9 @@ name = "binary_op" harness = false name = "simplify" +[[bench]] +harness = false +name = "string_concat" + [package.metadata.cargo-machete] ignored = ["half"] diff --git a/datafusion/physical-expr/benches/string_concat.rs b/datafusion/physical-expr/benches/string_concat.rs new file mode 100644 index 0000000000000..23f54c7637bdd --- /dev/null +++ b/datafusion/physical-expr/benches/string_concat.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::StringViewArray; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow::record_batch::RecordBatch; +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use datafusion_expr::Operator; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::expressions::{BinaryExpr, Column}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +const NUM_ROWS: usize = 8192; +const SEED: u64 = 42; + +fn create_string_view_array( + num_rows: usize, + str_len: usize, + null_density: f64, + seed: u64, +) -> StringViewArray { + let mut rng = StdRng::seed_from_u64(seed); + let values: Vec> = (0..num_rows) + .map(|_| { + if rng.random::() < null_density { + None + } else { + let s: String = (0..str_len) + .map(|_| rng.random_range(b'a'..=b'z') as char) + .collect(); + Some(s) + } + }) + .collect(); + StringViewArray::from_iter(values) +} + +fn bench_concat_utf8view(c: &mut Criterion) { + let mut group = c.benchmark_group("concat_utf8view"); + + let schema = Arc::new(Schema::new(vec![ + Field::new("left", DataType::Utf8View, true), + Field::new("right", DataType::Utf8View, true), + ])); + + // left || right + let expr = BinaryExpr::new( + Arc::new(Column::new("left", 0)), + Operator::StringConcat, + Arc::new(Column::new("right", 1)), + ); + + for null_density in [0.0, 0.1, 0.5] { + let left = create_string_view_array(NUM_ROWS, 16, null_density, SEED); + let right = create_string_view_array(NUM_ROWS, 16, null_density, SEED + 1); + + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(left), Arc::new(right)]) + .unwrap(); + + let label = format!("nulls_{}", (null_density * 100.0) as u32); + group.bench_with_input( + BenchmarkId::new("concat", &label), + &null_density, + |b, _| { + b.iter(|| { + black_box(expr.evaluate(black_box(&batch)).unwrap()); + }) + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_concat_utf8view); +criterion_main!(benches); diff --git a/datafusion/physical-expr/src/expressions/binary/kernels.rs b/datafusion/physical-expr/src/expressions/binary/kernels.rs index 39e3a6f16b5cf..9818861c2fdcd 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels.rs @@ -18,6 +18,7 @@ //! This module contains computation kernels that are specific to //! datafusion and not (yet) targeted to port upstream to arrow use arrow::array::*; +use arrow::buffer::NullBuffer; use arrow::compute::kernels::bitwise::{ bitwise_and, bitwise_and_scalar, bitwise_or, bitwise_or_scalar, bitwise_shift_left, bitwise_shift_left_scalar, bitwise_shift_right, bitwise_shift_right_scalar, @@ -177,24 +178,27 @@ pub fn concat_elements_utf8view( right.len() ))); } - let capacity = left.len(); - let mut result = StringViewBuilder::with_capacity(capacity); + let mut result = StringViewBuilder::with_capacity(left.len()); - // Avoid reallocations by writing to a reused buffer (note we - // could be even more efficient r by creating the view directly - // here and avoid the buffer but that would be more complex) + // Avoid reallocations by writing to a reused buffer (note we could be even + // more efficient by creating the view directly here and avoid the buffer + // but that would be more complex) let mut buffer = String::new(); - for (left, right) in left.iter().zip(right.iter()) { - if let (Some(left), Some(right)) = (left, right) { - use std::fmt::Write; + // Pre-compute combined null bitmap, so the per-row NULL check is more + // efficient + let nulls = NullBuffer::union(left.nulls(), right.nulls()); + + for i in 0..left.len() { + if nulls.as_ref().is_some_and(|n| n.is_null(i)) { + result.append_null(); + } else { + let l = left.value(i); + let r = right.value(i); buffer.clear(); - write!(&mut buffer, "{left}{right}") - .expect("writing into string buffer failed"); + buffer.push_str(l); + buffer.push_str(r); result.try_append_value(&buffer)?; - } else { - // at least one of the values is null, so the output is also null - result.append_null() } } Ok(result.finish())