Skip to content

Commit d399f52

Browse files
LLDayaskalt
authored and committed
feat: add ValuesSource datasource
This commit introduces `ValuesSource`, a new data source specifically designed to handle constant values that may contain placeholders. It replaces the previous `MemorySourceConfig::try_new_as_values` implementation, providing the necessary infrastructure to resolve physical placeholders during execution while maintaining performance for constant-only data by falling back to the default memory source.
1 parent 7c34911 commit d399f52

10 files changed

Lines changed: 896 additions & 172 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -77,6 +77,7 @@ use datafusion_common::{
7777
};
7878
use datafusion_datasource::file_groups::FileGroup;
7979
use datafusion_datasource::memory::MemorySourceConfig;
80+
use datafusion_datasource::values::ValuesSource;
8081
use datafusion_expr::dml::{CopyTo, InsertOp};
8182
use datafusion_expr::expr::{
8283
AggregateFunction, AggregateFunctionParams, Alias, GroupingSet, NullTreatment,
@@ -607,8 +608,7 @@ impl DefaultPhysicalPlanner {
607608
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()
608609
})
609610
.collect::<Result<Vec<_>>>()?;
610-
MemorySourceConfig::try_new_as_values(Arc::clone(schema.inner()), exprs)?
611-
as _
611+
ValuesSource::try_new_exec(Arc::clone(schema.inner()), exprs)?
612612
}
613613
LogicalPlan::EmptyRelation(EmptyRelation {
614614
produce_one_row: false,

datafusion/datasource/src/memory.rs

Lines changed: 13 additions & 96 deletions
Original file line number · Diff line number · Diff line change
@@ -27,24 +27,22 @@ use std::sync::Arc;
2727
use crate::sink::DataSink;
2828
use crate::source::{DataSource, DataSourceExec};
2929

30-
use arrow::array::{RecordBatch, RecordBatchOptions};
31-
use arrow::datatypes::{Schema, SchemaRef};
30+
use arrow::array::RecordBatch;
31+
use arrow::datatypes::SchemaRef;
3232
use datafusion_common::tree_node::TreeNodeRecursion;
33-
use datafusion_common::{
34-
Result, ScalarValue, assert_or_internal_err, plan_err, project_schema,
35-
};
33+
use datafusion_common::{Result, assert_or_internal_err, plan_err, project_schema};
3634
use datafusion_execution::TaskContext;
3735
use datafusion_physical_expr::equivalence::project_orderings;
3836
use datafusion_physical_expr::projection::ProjectionExprs;
3937
use datafusion_physical_expr::utils::collect_columns;
40-
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
38+
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
4139
use datafusion_physical_plan::memory::MemoryStream;
4240
use datafusion_physical_plan::projection::{
4341
all_alias_free_columns, new_projections_for_columns,
4442
};
4543
use datafusion_physical_plan::{
46-
ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
47-
SendableRecordBatchStream, Statistics, common,
44+
DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream, Statistics,
45+
common,
4846
};
4947

5048
use async_trait::async_trait;
@@ -300,61 +298,6 @@ impl MemorySourceConfig {
300298
Ok(DataSourceExec::from_data_source(source))
301299
}
302300

303-
/// Create a new execution plan from a list of constant values (`ValuesExec`)
304-
#[expect(clippy::needless_pass_by_value)]
305-
pub fn try_new_as_values(
306-
schema: SchemaRef,
307-
data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
308-
) -> Result<Arc<DataSourceExec>> {
309-
if data.is_empty() {
310-
return plan_err!("Values list cannot be empty");
311-
}
312-
313-
let n_row = data.len();
314-
let n_col = schema.fields().len();
315-
316-
// We have this single row batch as a placeholder to satisfy evaluation argument
317-
// and generate a single output row
318-
let placeholder_schema = Arc::new(Schema::empty());
319-
let placeholder_batch = RecordBatch::try_new_with_options(
320-
Arc::clone(&placeholder_schema),
321-
vec![],
322-
&RecordBatchOptions::new().with_row_count(Some(1)),
323-
)?;
324-
325-
// Evaluate each column
326-
let arrays = (0..n_col)
327-
.map(|j| {
328-
(0..n_row)
329-
.map(|i| {
330-
let expr = &data[i][j];
331-
let result = expr.evaluate(&placeholder_batch)?;
332-
333-
match result {
334-
ColumnarValue::Scalar(scalar) => Ok(scalar),
335-
ColumnarValue::Array(array) if array.len() == 1 => {
336-
ScalarValue::try_from_array(&array, 0)
337-
}
338-
ColumnarValue::Array(_) => {
339-
plan_err!("Cannot have array values in a values list")
340-
}
341-
}
342-
})
343-
.collect::<Result<Vec<_>>>()
344-
.and_then(ScalarValue::iter_to_array)
345-
})
346-
.collect::<Result<Vec<_>>>()?;
347-
348-
let batch = RecordBatch::try_new_with_options(
349-
Arc::clone(&schema),
350-
arrays,
351-
&RecordBatchOptions::new().with_row_count(Some(n_row)),
352-
)?;
353-
354-
let partitions = vec![batch];
355-
Self::try_new_from_batches(Arc::clone(&schema), partitions)
356-
}
357-
358301
/// Create a new plan using the provided schema and batches.
359302
///
360303
/// Errors if any of the batches don't match the provided schema, or if no
@@ -860,12 +803,13 @@ mod memory_source_tests {
860803
mod tests {
861804
use super::*;
862805
use crate::test_util::col;
863-
use crate::tests::{aggr_test_schema, make_partition};
806+
use crate::tests::make_partition;
807+
use crate::values::ValuesSource;
864808

865809
use arrow::array::{ArrayRef, Int32Array, Int64Array, StringArray};
866-
use arrow::datatypes::{DataType, Field};
867-
use datafusion_common::assert_batches_eq;
810+
use arrow::datatypes::{DataType, Field, Schema};
868811
use datafusion_common::stats::{ColumnStatistics, Precision};
812+
use datafusion_common::{ScalarValue, assert_batches_eq};
869813
use datafusion_physical_expr::PhysicalSortExpr;
870814
use datafusion_physical_plan::expressions::lit;
871815

@@ -898,14 +842,6 @@ mod tests {
898842
Ok(())
899843
}
900844

901-
#[tokio::test]
902-
async fn values_empty_case() -> Result<()> {
903-
let schema = aggr_test_schema();
904-
let empty = MemorySourceConfig::try_new_as_values(schema, vec![]);
905-
assert!(empty.is_err());
906-
Ok(())
907-
}
908-
909845
#[test]
910846
fn new_exec_with_batches() {
911847
let batch = make_partition(7);
@@ -934,27 +870,6 @@ mod tests {
934870
.unwrap_err();
935871
}
936872

937-
// Test issue: https://github.com/apache/datafusion/issues/8763
938-
#[test]
939-
fn new_exec_with_non_nullable_schema() {
940-
let schema = Arc::new(Schema::new(vec![Field::new(
941-
"col0",
942-
DataType::UInt32,
943-
false,
944-
)]));
945-
let _ = MemorySourceConfig::try_new_as_values(
946-
Arc::clone(&schema),
947-
vec![vec![lit(1u32)]],
948-
)
949-
.unwrap();
950-
// Test that a null value is rejected
951-
let _ = MemorySourceConfig::try_new_as_values(
952-
schema,
953-
vec![vec![lit(ScalarValue::UInt32(None))]],
954-
)
955-
.unwrap_err();
956-
}
957-
958873
#[test]
959874
fn values_stats_with_nulls_only() -> Result<()> {
960875
let data = vec![
@@ -965,7 +880,9 @@ mod tests {
965880
let rows = data.len();
966881
let schema =
967882
Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)]));
968-
let values = MemorySourceConfig::try_new_as_values(schema, data)?;
883+
884+
let values = ValuesSource::try_new_exec(schema, data)?;
885+
assert!(values.data_source().as_any().is::<MemorySourceConfig>());
969886

970887
assert_eq!(
971888
*values.partition_statistics(None)?,

datafusion/datasource/src/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -44,6 +44,7 @@ pub mod sink;
4444
pub mod source;
4545
mod statistics;
4646
pub mod table_schema;
47+
pub mod values;
4748

4849
#[cfg(test)]
4950
pub mod test_util;

0 commit comments

Comments (0)