Skip to content

Commit 34fbf14

Browse files
EmilyMattmartin-g
authored andcommitted
feat: Add evaluate_to_arrays function (apache#18446)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes apache#18330 . ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Reduce code duplication. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> A util function replacing many calls which are using the same code. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> No logic should change whatsoever, so each area which now uses this code should have it's own tests and benchmarks unmodified. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Yes, there is now a new pub function. No other changes to API. --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> (cherry picked from commit 76b4156)
1 parent 1cbbf62 commit 34fbf14

13 files changed

Lines changed: 153 additions & 99 deletions

File tree

datafusion/expr-common/src/columnar_value.rs

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,12 @@ impl ColumnarValue {
113113
}
114114
}
115115

116-
/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
117-
/// number of rows. [`Self::Scalar`] is converted by repeating the same
118-
/// scalar multiple times which is not as efficient as handling the scalar
119-
/// directly.
116+
/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
117+
/// number of rows by repeating the same scalar multiple times,
118+
/// which is not as efficient as handling the scalar directly.
119+
/// [`Self::Array`] will just be returned as is.
120+
///
121+
/// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
120122
///
121123
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
122124
/// arrays of the same length.
@@ -135,6 +137,38 @@ impl ColumnarValue {
135137
/// number of rows. [`Self::Scalar`] is converted by repeating the same
136138
/// scalar multiple times which is not as efficient as handling the scalar
137139
/// directly.
140+
/// This validates that if this is [`Self::Array`], it has the expected length.
141+
///
142+
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
143+
/// arrays of the same length.
144+
///
145+
/// # Errors
146+
///
147+
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
148+
/// if the array length does not match the expected length
149+
pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
150+
match self {
151+
ColumnarValue::Array(array) => {
152+
if array.len() == num_rows {
153+
Ok(array)
154+
} else {
155+
internal_err!(
156+
"Array length {} does not match expected length {}",
157+
array.len(),
158+
num_rows
159+
)
160+
}
161+
}
162+
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
163+
}
164+
}
165+
166+
/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
167+
/// number of rows by repeating the same scalar multiple times,
168+
/// which is not as efficient as handling the scalar directly.
169+
/// [`Self::Array`] will just be returned as is.
170+
///
171+
/// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
138172
///
139173
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
140174
/// arrays of the same length.
@@ -149,6 +183,36 @@ impl ColumnarValue {
149183
})
150184
}
151185

186+
/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
187+
/// number of rows. [`Self::Scalar`] is converted by repeating the same
188+
/// scalar multiple times which is not as efficient as handling the scalar
189+
/// directly.
190+
/// This validates that if this is [`Self::Array`], it has the expected length.
191+
///
192+
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
193+
/// arrays of the same length.
194+
///
195+
/// # Errors
196+
///
197+
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
198+
/// if the array length does not match the expected length
199+
pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
200+
match self {
201+
ColumnarValue::Array(array) => {
202+
if array.len() == num_rows {
203+
Ok(Arc::clone(array))
204+
} else {
205+
internal_err!(
206+
"Array length {} does not match expected length {}",
207+
array.len(),
208+
num_rows
209+
)
210+
}
211+
}
212+
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
213+
}
214+
}
215+
152216
/// Null columnar values are implemented as a null array in order to pass batch
153217
/// num_rows
154218
pub fn create_null_array(num_rows: usize) -> Self {
@@ -249,6 +313,34 @@ mod tests {
249313
use super::*;
250314
use arrow::array::Int32Array;
251315

316+
#[test]
317+
fn into_array_of_size() {
318+
// Array case
319+
let arr = make_array(1, 3);
320+
let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
321+
assert_eq!(&arr_columnar_value.into_array_of_size(3).unwrap(), &arr);
322+
323+
// Scalar case
324+
let scalar_columnar_value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
325+
let expected_array = make_array(42, 100);
326+
assert_eq!(
327+
&scalar_columnar_value.into_array_of_size(100).unwrap(),
328+
&expected_array
329+
);
330+
331+
// Array case with wrong size
332+
let arr = make_array(1, 3);
333+
let arr_columnar_value = ColumnarValue::Array(Arc::clone(&arr));
334+
let result = arr_columnar_value.into_array_of_size(5);
335+
let err = result.unwrap_err();
336+
assert!(
337+
err.to_string().starts_with(
338+
"Internal error: Array length 3 does not match expected length 5"
339+
),
340+
"Found: {err}"
341+
);
342+
}
343+
252344
#[test]
253345
fn values_to_arrays() {
254346
// (input, expected)

datafusion/physical-expr-common/src/utils.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::tree_node::ExprContext;
2222

2323
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
2424
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
25+
use arrow::record_batch::RecordBatch;
2526
use datafusion_common::Result;
2627
use datafusion_expr_common::sort_properties::ExprProperties;
2728

@@ -91,6 +92,26 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
9192
Ok(make_array(data))
9293
}
9394

95+
/// Evaluates expressions against a record batch.
96+
/// This will convert the resulting ColumnarValues to ArrayRefs,
97+
/// duplicating any ScalarValues that may have been returned,
98+
/// and validating that the returned arrays all have the same
99+
/// number of rows as the input batch.
100+
#[inline]
101+
pub fn evaluate_expressions_to_arrays<'a>(
102+
exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
103+
batch: &RecordBatch,
104+
) -> Result<Vec<ArrayRef>> {
105+
let num_rows = batch.num_rows();
106+
exprs
107+
.into_iter()
108+
.map(|e| {
109+
e.evaluate(batch)
110+
.and_then(|col| col.into_array_of_size(num_rows))
111+
})
112+
.collect::<Result<Vec<ArrayRef>>>()
113+
}
114+
94115
#[cfg(test)]
95116
mod tests {
96117
use std::sync::Arc;

datafusion/physical-expr/src/window/standard_window_function_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch;
2323
use datafusion_common::Result;
2424
use datafusion_expr::{LimitEffect, PartitionEvaluator};
2525

26+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
2627
use std::any::Any;
2728
use std::sync::Arc;
2829

@@ -57,13 +58,7 @@ pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug {
5758
///
5859
/// Typically, the resulting vector is a single element vector.
5960
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
60-
self.expressions()
61-
.iter()
62-
.map(|e| {
63-
e.evaluate(batch)
64-
.and_then(|v| v.into_array(batch.num_rows()))
65-
})
66-
.collect()
61+
evaluate_expressions_to_arrays(&self.expressions(), batch)
6762
}
6863

6964
/// Create a [`PartitionEvaluator`] for evaluating the function on

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use datafusion_expr::window_state::{
4141
use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
4242
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
4343

44+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
4445
use indexmap::IndexMap;
4546

4647
/// Common trait for [window function] implementations
@@ -90,13 +91,7 @@ pub trait WindowExpr: Send + Sync + Debug {
9091
/// Evaluate the window function arguments against the batch and return
9192
/// array ref, normally the resulting `Vec` is a single element one.
9293
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
93-
self.expressions()
94-
.iter()
95-
.map(|e| {
96-
e.evaluate(batch)
97-
.and_then(|v| v.into_array(batch.num_rows()))
98-
})
99-
.collect()
94+
evaluate_expressions_to_arrays(&self.expressions(), batch)
10095
}
10196

10297
/// Evaluate the window function values against the batch

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::{
5959
};
6060

6161
use datafusion_expr::utils::AggregateOrderSensitivity;
62+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
6263
use itertools::Itertools;
6364

6465
pub mod group_values;
@@ -1434,25 +1435,14 @@ pub fn finalize_aggregation(
14341435
}
14351436
}
14361437

1437-
/// Evaluates expressions against a record batch.
1438-
fn evaluate(
1439-
expr: &[Arc<dyn PhysicalExpr>],
1440-
batch: &RecordBatch,
1441-
) -> Result<Vec<ArrayRef>> {
1442-
expr.iter()
1443-
.map(|expr| {
1444-
expr.evaluate(batch)
1445-
.and_then(|v| v.into_array(batch.num_rows()))
1446-
})
1447-
.collect()
1448-
}
1449-
1450-
/// Evaluates expressions against a record batch.
1438+
/// Evaluates groups of expressions against a record batch.
14511439
pub fn evaluate_many(
14521440
expr: &[Vec<Arc<dyn PhysicalExpr>>],
14531441
batch: &RecordBatch,
14541442
) -> Result<Vec<Vec<ArrayRef>>> {
1455-
expr.iter().map(|expr| evaluate(expr, batch)).collect()
1443+
expr.iter()
1444+
.map(|expr| evaluate_expressions_to_arrays(expr, batch))
1445+
.collect()
14561446
}
14571447

14581448
fn evaluate_optional(
@@ -1506,23 +1496,14 @@ pub fn evaluate_group_by(
15061496
group_by: &PhysicalGroupBy,
15071497
batch: &RecordBatch,
15081498
) -> Result<Vec<Vec<ArrayRef>>> {
1509-
let exprs: Vec<ArrayRef> = group_by
1510-
.expr
1511-
.iter()
1512-
.map(|(expr, _)| {
1513-
let value = expr.evaluate(batch)?;
1514-
value.into_array(batch.num_rows())
1515-
})
1516-
.collect::<Result<Vec<_>>>()?;
1517-
1518-
let null_exprs: Vec<ArrayRef> = group_by
1519-
.null_expr
1520-
.iter()
1521-
.map(|(expr, _)| {
1522-
let value = expr.evaluate(batch)?;
1523-
value.into_array(batch.num_rows())
1524-
})
1525-
.collect::<Result<Vec<_>>>()?;
1499+
let exprs = evaluate_expressions_to_arrays(
1500+
group_by.expr.iter().map(|(expr, _)| expr),
1501+
batch,
1502+
)?;
1503+
let null_exprs = evaluate_expressions_to_arrays(
1504+
group_by.null_expr.iter().map(|(expr, _)| expr),
1505+
batch,
1506+
)?;
15261507

15271508
group_by
15281509
.groups

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ use std::borrow::Cow;
3333
use std::sync::Arc;
3434
use std::task::{Context, Poll};
3535

36+
use super::AggregateExec;
3637
use crate::filter::batch_filter;
3738
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
39+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
3840
use futures::stream::{Stream, StreamExt};
3941

40-
use super::AggregateExec;
41-
4242
/// stream struct for aggregation without grouping columns
4343
pub(crate) struct AggregateStream {
4444
stream: BoxStream<'static, Result<RecordBatch>>,
@@ -219,13 +219,8 @@ fn aggregate_batch(
219219
None => Cow::Borrowed(&batch),
220220
};
221221

222-
let n_rows = batch.num_rows();
223-
224222
// 1.3
225-
let values = expr
226-
.iter()
227-
.map(|e| e.evaluate(&batch).and_then(|v| v.into_array(n_rows)))
228-
.collect::<Result<Vec<_>>>()?;
223+
let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?;
229224

230225
// 1.4
231226
let size_pre = accum.size();

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
7777

7878
use ahash::RandomState;
7979
use datafusion_physical_expr_common::physical_expr::fmt_sql;
80+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8081
use futures::TryStreamExt;
8182
use parking_lot::Mutex;
8283

@@ -1465,13 +1466,7 @@ async fn collect_left_input(
14651466
BooleanBufferBuilder::new(0)
14661467
};
14671468

1468-
let left_values = on_left
1469-
.iter()
1470-
.map(|c| {
1471-
c.evaluate(&single_batch)?
1472-
.into_array(single_batch.num_rows())
1473-
})
1474-
.collect::<Result<Vec<_>>>()?;
1469+
let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
14751470

14761471
// Compute bounds for dynamic filter if enabled
14771472
let bounds = match bounds_accumulators {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use datafusion_common::{
5151
use datafusion_physical_expr::PhysicalExprRef;
5252

5353
use ahash::RandomState;
54+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
5455
use futures::{ready, Stream, StreamExt};
5556

5657
/// Represents build-side of hash join.
@@ -447,11 +448,7 @@ impl HashJoinStream {
447448
}
448449
Some(Ok(batch)) => {
449450
// Precalculate hash values for fetched batch
450-
let keys_values = self
451-
.on_right
452-
.iter()
453-
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
454-
.collect::<Result<Vec<_>>>()?;
451+
let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;
455452

456453
self.hashes_buffer.clear();
457454
self.hashes_buffer.resize(batch.num_rows(), 0);

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
7878
use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
7979

8080
use ahash::RandomState;
81+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8182
use futures::{ready, Stream, StreamExt};
8283
use parking_lot::Mutex;
8384

@@ -1065,14 +1066,8 @@ fn lookup_join_hashmap(
10651066
hashes_buffer: &mut Vec<u64>,
10661067
deleted_offset: Option<usize>,
10671068
) -> Result<(UInt64Array, UInt32Array)> {
1068-
let keys_values = probe_on
1069-
.iter()
1070-
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
1071-
.collect::<Result<Vec<_>>>()?;
1072-
let build_join_values = build_on
1073-
.iter()
1074-
.map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
1075-
.collect::<Result<Vec<_>>>()?;
1069+
let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
1070+
let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;
10761071

10771072
hashes_buffer.clear();
10781073
hashes_buffer.resize(probe_batch.num_rows(), 0);

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ use datafusion_physical_expr::{
7575
};
7676

7777
use datafusion_physical_expr_common::datum::compare_op_for_nested;
78+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
7879
use futures::future::{BoxFuture, Shared};
7980
use futures::{ready, FutureExt};
8081
use parking_lot::Mutex;
@@ -1674,10 +1675,7 @@ pub fn update_hash(
16741675
fifo_hashmap: bool,
16751676
) -> Result<()> {
16761677
// evaluate the keys
1677-
let keys_values = on
1678-
.iter()
1679-
.map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
1680-
.collect::<Result<Vec<_>>>()?;
1678+
let keys_values = evaluate_expressions_to_arrays(on, batch)?;
16811679

16821680
// calculate the hash values
16831681
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

0 commit comments

Comments
 (0)