Skip to content

Commit 01ae92d

Browse files
authored
Merge branch 'main' into fix/sort-merge-reservation-starvation
2 parents 651e9c9 + a5f490e commit 01ae92d

File tree

82 files changed

+2181
-94
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

82 files changed

+2181
-94
lines changed

datafusion-cli/src/exec.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -196,6 +196,7 @@ pub async fn exec_from_repl(
196196
}
197197
Err(ReadlineError::Interrupted) => {
198198
println!("^C");
199+
rl.helper().unwrap().reset_hint();
199200
continue;
200201
}
201202
Err(ReadlineError::Eof) => {

datafusion-cli/src/helper.rs

Lines changed: 21 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
//! and auto-completion for file name during creating external table.
2020
2121
use std::borrow::Cow;
22+
use std::cell::Cell;
2223

2324
use crate::highlighter::{Color, NoSyntaxHighlighter, SyntaxHighlighter};
2425

@@ -40,6 +41,10 @@ pub struct CliHelper {
4041
completer: FilenameCompleter,
4142
dialect: Dialect,
4243
highlighter: Box<dyn Highlighter>,
44+
/// Tracks whether to show the default hint. Set to `false` once the user
45+
/// types anything, so the hint doesn't reappear after deleting back to
46+
/// an empty line. Reset to `true` when the line is submitted.
47+
show_hint: Cell<bool>,
4348
}
4449

4550
impl CliHelper {
@@ -53,6 +58,7 @@ impl CliHelper {
5358
completer: FilenameCompleter::new(),
5459
dialect: *dialect,
5560
highlighter,
61+
show_hint: Cell::new(true),
5662
}
5763
}
5864

@@ -62,6 +68,11 @@ impl CliHelper {
6268
}
6369
}
6470

71+
/// Re-enable the default hint for the next prompt.
72+
pub fn reset_hint(&self) {
73+
self.show_hint.set(true);
74+
}
75+
6576
fn validate_input(&self, input: &str) -> Result<ValidationResult> {
6677
if let Some(sql) = input.strip_suffix(';') {
6778
let dialect = match dialect_from_str(self.dialect) {
@@ -119,12 +130,11 @@ impl Hinter for CliHelper {
119130
type Hint = String;
120131

121132
fn hint(&self, line: &str, _pos: usize, _ctx: &Context<'_>) -> Option<String> {
122-
if line.trim().is_empty() {
123-
let suggestion = Color::gray(DEFAULT_HINT_SUGGESTION);
124-
Some(suggestion)
125-
} else {
126-
None
133+
if !line.is_empty() {
134+
self.show_hint.set(false);
127135
}
136+
(self.show_hint.get() && line.trim().is_empty())
137+
.then(|| Color::gray(DEFAULT_HINT_SUGGESTION))
128138
}
129139
}
130140

@@ -133,12 +143,9 @@ impl Hinter for CliHelper {
133143
fn is_open_quote_for_location(line: &str, pos: usize) -> bool {
134144
let mut sql = line[..pos].to_string();
135145
sql.push('\'');
136-
if let Ok(stmts) = DFParser::parse_sql(&sql)
137-
&& let Some(Statement::CreateExternalTable(_)) = stmts.back()
138-
{
139-
return true;
140-
}
141-
false
146+
DFParser::parse_sql(&sql).is_ok_and(|stmts| {
147+
matches!(stmts.back(), Some(Statement::CreateExternalTable(_)))
148+
})
142149
}
143150

144151
impl Completer for CliHelper {
@@ -161,7 +168,9 @@ impl Completer for CliHelper {
161168
impl Validator for CliHelper {
162169
fn validate(&self, ctx: &mut ValidationContext<'_>) -> Result<ValidationResult> {
163170
let input = ctx.input().trim_end();
164-
self.validate_input(input)
171+
let result = self.validate_input(input);
172+
self.reset_hint();
173+
result
165174
}
166175
}
167176

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -27,6 +27,7 @@ use async_trait::async_trait;
2727
use datafusion::arrow::array::{UInt8Builder, UInt64Builder};
2828
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2929
use datafusion::arrow::record_batch::RecordBatch;
30+
use datafusion::common::tree_node::TreeNodeRecursion;
3031
use datafusion::datasource::{TableProvider, TableType, provider_as_source};
3132
use datafusion::error::Result;
3233
use datafusion::execution::context::TaskContext;
@@ -283,4 +284,20 @@ impl ExecutionPlan for CustomExec {
283284
None,
284285
)?))
285286
}
287+
288+
fn apply_expressions(
289+
&self,
290+
f: &mut dyn FnMut(
291+
&dyn datafusion::physical_plan::PhysicalExpr,
292+
) -> Result<TreeNodeRecursion>,
293+
) -> Result<TreeNodeRecursion> {
294+
// Visit expressions in the output ordering from equivalence properties
295+
let mut tnr = TreeNodeRecursion::Continue;
296+
if let Some(ordering) = self.cache.output_ordering() {
297+
for sort_expr in ordering {
298+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
299+
}
300+
}
301+
Ok(tnr)
302+
}
286303
}

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@
2929
use arrow::record_batch::RecordBatch;
3030
use arrow_schema::SchemaRef;
3131
use datafusion::common::record_batch;
32+
use datafusion::common::tree_node::TreeNodeRecursion;
3233
use datafusion::common::{exec_datafusion_err, internal_err};
3334
use datafusion::datasource::{DefaultTableSource, memory::MemTable};
3435
use datafusion::error::Result;
@@ -296,4 +297,20 @@ impl ExecutionPlan for BufferingExecutionPlan {
296297
}),
297298
)))
298299
}
300+
301+
fn apply_expressions(
302+
&self,
303+
f: &mut dyn FnMut(
304+
&dyn datafusion::physical_plan::PhysicalExpr,
305+
) -> Result<TreeNodeRecursion>,
306+
) -> Result<TreeNodeRecursion> {
307+
// Visit expressions in the output ordering from equivalence properties
308+
let mut tnr = TreeNodeRecursion::Continue;
309+
if let Some(ordering) = self.properties.output_ordering() {
310+
for sort_expr in ordering {
311+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
312+
}
313+
}
314+
Ok(tnr)
315+
}
299316
}

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ use std::sync::Arc;
3838

3939
use datafusion::common::Result;
4040
use datafusion::common::internal_err;
41+
use datafusion::common::tree_node::TreeNodeRecursion;
4142
use datafusion::execution::TaskContext;
4243
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
4344
use datafusion::prelude::SessionContext;
@@ -128,6 +129,15 @@ impl ExecutionPlan for ParentExec {
128129
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
129130
unreachable!()
130131
}
132+
133+
fn apply_expressions(
134+
&self,
135+
_f: &mut dyn FnMut(
136+
&dyn datafusion::physical_plan::PhysicalExpr,
137+
) -> Result<TreeNodeRecursion>,
138+
) -> Result<TreeNodeRecursion> {
139+
Ok(TreeNodeRecursion::Continue)
140+
}
131141
}
132142

133143
/// A PhysicalExtensionCodec that can serialize and deserialize ParentExec
@@ -204,6 +214,15 @@ impl ExecutionPlan for ChildExec {
204214
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
205215
unreachable!()
206216
}
217+
218+
fn apply_expressions(
219+
&self,
220+
_f: &mut dyn FnMut(
221+
&dyn datafusion::physical_plan::PhysicalExpr,
222+
) -> Result<TreeNodeRecursion>,
223+
) -> Result<TreeNodeRecursion> {
224+
Ok(TreeNodeRecursion::Continue)
225+
}
207226
}
208227

209228
/// A PhysicalExtensionCodec that can serialize and deserialize ChildExec

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -117,7 +117,7 @@ use datafusion::{
117117
};
118118
use datafusion_common::{
119119
DFSchemaRef, DataFusionError, Result, Statistics, internal_err, not_impl_err,
120-
plan_datafusion_err, plan_err,
120+
plan_datafusion_err, plan_err, tree_node::TreeNodeRecursion,
121121
};
122122
use datafusion_expr::{
123123
UserDefinedLogicalNode, UserDefinedLogicalNodeCore,
@@ -743,6 +743,22 @@ impl ExecutionPlan for SampleExec {
743743

744744
Ok(stats)
745745
}
746+
747+
fn apply_expressions(
748+
&self,
749+
f: &mut dyn FnMut(
750+
&dyn datafusion::physical_plan::PhysicalExpr,
751+
) -> Result<TreeNodeRecursion>,
752+
) -> Result<TreeNodeRecursion> {
753+
// Visit expressions in the output ordering from equivalence properties
754+
let mut tnr = TreeNodeRecursion::Continue;
755+
if let Some(ordering) = self.cache.output_ordering() {
756+
for sort_expr in ordering {
757+
tnr = tnr.visit_sibling(|| f(sort_expr.expr.as_ref()))?;
758+
}
759+
}
760+
Ok(tnr)
761+
}
746762
}
747763

748764
/// Bernoulli sampler: includes each row with probability `(upper - lower)`.

datafusion/catalog/src/memory/table.rs

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ use arrow::compute::{and, filter_record_batch};
3232
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
3333
use arrow::record_batch::RecordBatch;
3434
use datafusion_common::error::Result;
35+
use datafusion_common::tree_node::TreeNodeRecursion;
3536
use datafusion_common::{Constraints, DFSchema, SchemaExt, not_impl_err, plan_err};
3637
use datafusion_common_runtime::JoinSet;
3738
use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
@@ -46,7 +47,7 @@ use datafusion_physical_plan::repartition::RepartitionExec;
4647
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
4748
use datafusion_physical_plan::{
4849
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
49-
PlanProperties, common,
50+
PhysicalExpr, PlanProperties, common,
5051
};
5152
use datafusion_session::Session;
5253

@@ -400,10 +401,7 @@ impl TableProvider for MemTable {
400401
let df_schema = DFSchema::try_from(Arc::clone(&self.schema))?;
401402

402403
// Create physical expressions for assignments upfront (outside batch loop)
403-
let physical_assignments: HashMap<
404-
String,
405-
Arc<dyn datafusion_physical_plan::PhysicalExpr>,
406-
> = assignments
404+
let physical_assignments: HashMap<String, Arc<dyn PhysicalExpr>> = assignments
407405
.iter()
408406
.map(|(name, expr)| {
409407
let physical_expr =
@@ -638,4 +636,11 @@ impl ExecutionPlan for DmlResultExec {
638636
stream,
639637
)))
640638
}
639+
640+
fn apply_expressions(
641+
&self,
642+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
643+
) -> Result<TreeNodeRecursion> {
644+
Ok(TreeNodeRecursion::Continue)
645+
}
641646
}

datafusion/common/src/join_type.rs

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -97,6 +97,35 @@ impl JoinType {
9797
}
9898
}
9999

100+
/// Whether each side of the join is preserved for ON-clause filter pushdown.
101+
///
102+
/// It is only correct to push ON-clause filters below a join for preserved
103+
/// inputs.
104+
///
105+
/// # "Preserved" input definition
106+
///
107+
/// A join side is preserved if the join returns all or a subset of the rows
108+
/// from that side, such that each output row directly maps to an input row.
109+
/// If a side is not preserved, the join can produce extra null rows that
110+
/// don't map to any input row.
111+
///
112+
/// # Return Value
113+
///
114+
/// A tuple of booleans - (left_preserved, right_preserved).
115+
pub fn on_lr_is_preserved(&self) -> (bool, bool) {
116+
match self {
117+
JoinType::Inner => (true, true),
118+
JoinType::Left => (false, true),
119+
JoinType::Right => (true, false),
120+
JoinType::Full => (false, false),
121+
JoinType::LeftSemi | JoinType::RightSemi => (true, true),
122+
JoinType::LeftAnti => (false, true),
123+
JoinType::RightAnti => (true, false),
124+
JoinType::LeftMark => (false, true),
125+
JoinType::RightMark => (true, false),
126+
}
127+
}
128+
100129
/// Does the join type support swapping inputs?
101130
pub fn supports_swap(&self) -> bool {
102131
matches!(

0 commit comments

Comments
 (0)