Skip to content

Commit e353eb0

Browse files
timsaucer and alamb authored
Project sort expressions in StreamingTable (#19719)
## Which issue does this PR close?

- Closes #19717

## Rationale for this change

If a `StreamingTable` has both physical sort expressions and a projection, executing the plan fails with an error when the sort expressions are not included in the projection.

## What changes are included in this PR?

When both a projection and physical sort expressions are present, project the schema and the sort expressions.

## Are these changes tested?

Unit test added.

## Are there any user-facing changes?

No

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 3d90d4b commit e353eb0

File tree

2 files changed

+67
-9
lines changed

2 files changed

+67
-9
lines changed

datafusion/catalog/src/streaming.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,18 @@
2020
use std::any::Any;
2121
use std::sync::Arc;
2222

23-
use crate::Session;
24-
use crate::TableProvider;
25-
2623
use arrow::datatypes::SchemaRef;
24+
use async_trait::async_trait;
2725
use datafusion_common::{DFSchema, Result, plan_err};
2826
use datafusion_expr::{Expr, SortExpr, TableType};
27+
use datafusion_physical_expr::equivalence::project_ordering;
2928
use datafusion_physical_expr::{LexOrdering, create_physical_sort_exprs};
3029
use datafusion_physical_plan::ExecutionPlan;
3130
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
32-
33-
use async_trait::async_trait;
3431
use log::debug;
3532

33+
use crate::{Session, TableProvider};
34+
3635
/// A [`TableProvider`] that streams a set of [`PartitionStream`]
3736
#[derive(Debug)]
3837
pub struct StreamingTable {
@@ -105,7 +104,22 @@ impl TableProvider for StreamingTable {
105104
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
106105
let eqp = state.execution_props();
107106

108-
create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?
107+
let original_sort_exprs =
108+
create_physical_sort_exprs(&self.sort_order, &df_schema, eqp)?;
109+
110+
if let Some(p) = projection {
111+
// When performing a projection, the output columns will not match
112+
// the original physical sort expression indices. Also the sort columns
113+
// may not be in the output projection. To correct for these issues
114+
// we need to project the ordering based on the output schema.
115+
let schema = Arc::new(self.schema.project(p)?);
116+
LexOrdering::new(original_sort_exprs)
117+
.and_then(|lex_ordering| project_ordering(&lex_ordering, &schema))
118+
.map(|lex_ordering| lex_ordering.to_vec())
119+
.unwrap_or_default()
120+
} else {
121+
original_sort_exprs
122+
}
109123
} else {
110124
vec![]
111125
};

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ use crate::physical_optimizer::test_utils::{
2929
spr_repartition_exec, stream_exec_ordered, union_exec,
3030
};
3131

32-
use arrow::compute::SortOptions;
32+
use arrow::compute::{SortOptions};
3333
use arrow::datatypes::{DataType, SchemaRef};
3434
use datafusion_common::config::{ConfigOptions, CsvOptions};
3535
use datafusion_common::tree_node::{TreeNode, TransformedResult};
36-
use datafusion_common::{Result, TableReference};
36+
use datafusion_common::{create_array, Result, TableReference};
3737
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3838
use datafusion_datasource::source::DataSourceExec;
3939
use datafusion_expr_common::operator::Operator;
@@ -58,7 +58,7 @@ use datafusion_physical_optimizer::enforce_distribution::EnforceDistribution;
5858
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
5959
use datafusion_physical_optimizer::PhysicalOptimizerRule;
6060
use datafusion::prelude::*;
61-
use arrow::array::{Int32Array, RecordBatch};
61+
use arrow::array::{record_batch, ArrayRef, Int32Array, RecordBatch};
6262
use arrow::datatypes::{Field};
6363
use arrow_schema::Schema;
6464
use datafusion_execution::TaskContext;
@@ -2805,3 +2805,47 @@ async fn test_partial_sort_with_homogeneous_batches() -> Result<()> {
28052805

28062806
Ok(())
28072807
}
2808+
2809+
#[tokio::test]
2810+
async fn test_sort_with_streaming_table() -> Result<()> {
2811+
let batch = record_batch!(("a", Int32, [1, 2, 3]), ("b", Int32, [1, 2, 3]))?;
2812+
2813+
let ctx = SessionContext::new();
2814+
2815+
let sort_order = vec![
2816+
SortExpr::new(
2817+
Expr::Column(datafusion_common::Column::new(
2818+
Option::<TableReference>::None,
2819+
"a",
2820+
)),
2821+
true,
2822+
false,
2823+
),
2824+
SortExpr::new(
2825+
Expr::Column(datafusion_common::Column::new(
2826+
Option::<TableReference>::None,
2827+
"b",
2828+
)),
2829+
true,
2830+
false,
2831+
),
2832+
];
2833+
let schema = batch.schema();
2834+
let batches = Arc::new(DummyStreamPartition {
2835+
schema: schema.clone(),
2836+
batches: vec![batch],
2837+
}) as _;
2838+
let provider = StreamingTable::try_new(schema.clone(), vec![batches])?
2839+
.with_sort_order(sort_order);
2840+
ctx.register_table("test_table", Arc::new(provider))?;
2841+
2842+
let sql = "SELECT a FROM test_table GROUP BY a ORDER BY a";
2843+
let results = ctx.sql(sql).await?.collect().await?;
2844+
2845+
assert_eq!(results.len(), 1);
2846+
assert_eq!(results[0].num_columns(), 1);
2847+
let expected = create_array!(Int32, vec![1, 2, 3]) as ArrayRef;
2848+
assert_eq!(results[0].column(0), &expected);
2849+
2850+
Ok(())
2851+
}

0 commit comments

Comments
 (0)