diff --git a/.github/workflows/audit.yml b/.github/workflows/audit.yml index 0610d4d37f0ae..2f12a8b5d2209 100644 --- a/.github/workflows/audit.yml +++ b/.github/workflows/audit.yml @@ -44,4 +44,4 @@ jobs: - name: Run audit check # Ignored until https://github.com/apache/datafusion/issues/15571 # ignored py03 warning until arrow 55 upgrade - run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 + run: cargo audit --ignore RUSTSEC-2024-0370 --ignore RUSTSEC-2025-0020 --ignore RUSTSEC-2025-0047 diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index cdd8e72a06cc9..109ed8e4c4464 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -539,7 +539,7 @@ config_namespace! { /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, /// and `Binary/BinaryLarge` with `BinaryView`. - pub schema_force_view_types: bool, default = true + pub schema_force_view_types: bool, default = false /// (reading) If true, parquet reader will read columns of /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. @@ -2521,6 +2521,10 @@ config_namespace! { // The input regex for Nulls when loading CSVs. pub null_regex: Option<String>, default = None pub comment: Option<u8>, default = None + // Whether to allow truncated rows when parsing. + // By default this is set to false and will error if the CSV rows have different lengths. + // When set to true, it will allow records with fewer than the expected number of columns and fill the missing columns with nulls. + pub truncated_rows: Option<bool>, default = None } } @@ -2613,6 +2617,15 @@ impl CsvOptions { self } + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true, it will allow records with fewer than the expected number of columns and fill the missing columns with nulls. + /// If the record's schema is not nullable, an error will still be returned. + pub fn with_truncated_rows(mut self, allow: bool) -> Self { + self.truncated_rows = Some(allow); + self + } + /// The delimiter character.
pub fn delimiter(&self) -> u8 { self.delimiter diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 23ba9e6ec8736..6c4897f711c5c 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -48,7 +48,7 @@ mod tests { use datafusion_physical_plan::{collect, ExecutionPlan}; use arrow::array::{ - BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, + Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, }; use arrow::compute::concat_batches; use arrow::csv::ReaderBuilder; @@ -1256,4 +1256,181 @@ mod tests { .build_decoder(); DecoderDeserializer::new(CsvDecoder::new(decoder)) } + + fn csv_deserializer_with_truncated( + batch_size: usize, + schema: &Arc, + ) -> impl BatchDeserializer { + // using Arrow's ReaderBuilder and enabling truncated_rows + let decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(batch_size) + .with_truncated_rows(true) // <- enable runtime truncated_rows + .build_decoder(); + DecoderDeserializer::new(CsvDecoder::new(decoder)) + } + + #[tokio::test] + async fn infer_schema_with_truncated_rows_true() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + // CSV: header has 3 columns, but first data row has only 2 columns, second row has 3 + let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n"); + let variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // Construct CsvFormat and enable truncated_rows via CsvOptions + let csv_options = CsvOptions::default().with_truncated_rows(true); + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_options(csv_options) + .with_schema_infer_max_rec(10); + + let inferred_schema = csv_format + .infer_schema( + &state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await?; + + // header has 3 columns; inferred schema should also have 3 + assert_eq!(inferred_schema.fields().len(), 3); + + // inferred columns should be nullable + for f in inferred_schema.fields() { + assert!(f.is_nullable()); + } + + Ok(()) + } + #[test] + fn test_decoder_truncated_rows_runtime() -> Result<()> { + // Synchronous test: Decoder API used here is synchronous + let schema = csv_schema(); // helper already defined in file + + // Construct a decoder that enables truncated_rows at runtime + let mut deserializer = csv_deserializer_with_truncated(10, &schema); + + // Provide two rows: first row complete, second row missing last column + let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n"); + deserializer.digest(input); + + // Finish and collect output + deserializer.finish(); + + let output = deserializer.next()?; + match output { + DeserializerOutput::RecordBatch(batch) => { + // ensure at least two rows present + assert!(batch.num_rows() >= 2); + // column 4 (index 3) should be a StringArray where second row is NULL + let col4 = batch + .column(3) + .as_any() + .downcast_ref::() + .expect("column 4 should be StringArray"); + + // first row present, second row should be null + assert!(!col4.is_null(0)); + assert!(col4.is_null(1)); + } + other => panic!("expected RecordBatch but got {other:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn infer_schema_truncated_rows_false_error() -> Result<()> { + let session_ctx = SessionContext::new(); 
+ let state = session_ctx.state(); + + // CSV: header has 4 cols, first data row has 3 cols -> truncated at end + let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n"); + let variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // CsvFormat without enabling truncated_rows (default behavior = false) + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_schema_infer_max_rec(10); + + let res = csv_format + .infer_schema( + &state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await; + + // Expect an error due to unequal lengths / incorrect number of fields + assert!( + res.is_err(), + "expected infer_schema to error on truncated rows when disabled" + ); + + // Optional: check message contains indicative text (two known possibilities) + if let Err(err) = res { + let msg = format!("{err}"); + assert!( + msg.contains("Encountered unequal lengths") + || msg.contains("incorrect number of fields"), + "unexpected error message: {msg}", + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> { + use std::io::Write; + + // create a SessionContext + let ctx = SessionContext::new(); + + // Create a temp file with a .csv suffix so the reader accepts it + let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv + // CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete. + write!(tmp, "a,b,c\n1,2\n3,4,5\n")?; + let path = tmp.path().to_str().unwrap().to_string(); + + // Build CsvReadOptions: header present, enable truncated_rows. + // (Use the exact builder method your crate exposes: `truncated_rows(true)` here, + // if the method name differs in your codebase use the appropriate one.) + let options = CsvReadOptions::default().truncated_rows(true); + + println!("options: {}, path: {path}", options.truncated_rows); + + // Call the API under test + let df = ctx.read_csv(&path, options).await?; + + // Collect the results and combine batches so we can inspect columns + let batches = df.collect().await?; + let combined = concat_batches(&batches[0].schema(), &batches)?; + + // Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL. + let col_c = combined.column(2); + assert!( + col_c.is_null(0), + "expected first row column 'c' to be NULL due to truncated row" + ); + + // Also ensure we read at least one row + assert!(combined.num_rows() >= 2); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 02b792823a827..8c1bb02ef0737 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> { pub file_sort_order: Vec>, /// Optional regex to match null values pub null_regex: Option, + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. 
+ pub truncated_rows: bool, } impl Default for CsvReadOptions<'_> { @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> { file_sort_order: vec![], comment: None, null_regex: None, + truncated_rows: false, } } @@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> { self.null_regex = null_regex; self } + + /// Configure whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + pub fn truncated_rows(mut self, truncated_rows: bool) -> Self { + self.truncated_rows = truncated_rows; + self + } } /// Options that control the reading of Parquet files. @@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()) - .with_null_regex(self.null_regex.clone()); + .with_null_regex(self.null_regex.clone()) + .with_truncated_rows(self.truncated_rows); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 088c4408fff57..8200c0627b8c0 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -581,11 +581,11 @@ mod tests { assert_eq!(string_truncation_stats.null_count, Precision::Exact(2)); assert_eq!( string_truncation_stats.max_value, - Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c"))) + Precision::Inexact(Utf8(Some("b".repeat(63) + "c"))) ); assert_eq!( string_truncation_stats.min_value, - Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64)))) + Precision::Inexact(Utf8(Some("a".repeat(64)))) ); Ok(()) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index a7b3bdeeace84..a45461967394f 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -67,6 +67,9 @@ use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::optimizer::PhysicalOptimizer; use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_physical_plan::ExecutionPlan; use datafusion_session::Session; use datafusion_sql::parser::{DFParserBuilder, Statement}; @@ -647,9 +650,12 @@ impl SessionState { logical_plan: &LogicalPlan, ) -> datafusion_common::Result> { let logical_plan = self.optimize(logical_plan)?; - self.query_planner + let physical_plan = self + .query_planner .create_physical_plan(&logical_plan, self) - .await + .await?; + let mut id_annotator = NodeIdAnnotator::new(); + annotate_node_id_for_execution_plan(&physical_plan, &mut id_annotator) } /// Create a [`PhysicalExpr`] from an [`Expr`] after applying type diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs index 27bee10234b57..6a99c1f8d61fd 100644 --- a/datafusion/core/tests/parquet/page_pruning.rs +++ b/datafusion/core/tests/parquet/page_pruning.rs @@ -165,7 +165,9 @@ async fn 
page_index_filter_one_col() { // 5.create filter date_string_col == "01/01/09"`; // Note this test doesn't apply type coercion so the literal must match the actual view type - let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8view("01/01/09"))); + // xudong: use new_utf8, because schema_force_view_types was changed to false now. + // qi: when schema_force_view_types setting to true, we should change back to utf8view + let filter = col("date_string_col").eq(lit(ScalarValue::new_utf8("01/01/09"))); let batches = get_filter_results(&state, filter.clone(), false).await; assert_eq!(batches[0].num_rows(), 14); diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index e0826c90dd8d2..6773e1e47ad53 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -3615,10 +3615,11 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { ); // Apply the function - let result = replace_order_preserving_variants(dist_context)?; + let result = replace_order_preserving_variants(dist_context, false)?; // Verify the plan was transformed to CoalescePartitionsExec result + .0 .plan .as_any() .downcast_ref::() @@ -3626,7 +3627,7 @@ fn test_replace_order_preserving_variants_with_fetch() -> Result<()> { // Verify fetch was preserved assert_eq!( - result.plan.fetch(), + result.0.plan.fetch(), Some(5), "Fetch value was not preserved after transformation" ); diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 4eeb431584ba7..e09ac3af7c661 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -222,6 +222,11 @@ impl CsvFormat { self } + pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self { + self.options.truncated_rows = Some(truncated_rows); + self + } + /// Set the regex to use for null values in the CSV reader. /// - default to treat empty values as null. pub fn with_null_regex(mut self, null_regex: Option) -> Self { @@ -291,6 +296,13 @@ impl CsvFormat { self } + /// Set whether rows should be truncated to the column width + /// - defaults to false + pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self { + self.options.truncated_rows = Some(truncate_rows); + self + } + /// The delimiter character. 
pub fn delimiter(&self) -> u8 { self.options.delimiter @@ -426,11 +438,13 @@ impl FileFormat for CsvFormat { .with_file_compression_type(self.options.compression.into()) .with_newlines_in_values(newlines_in_values); + let truncated_rows = self.options.truncated_rows.unwrap_or(false); let source = Arc::new( CsvSource::new(has_header, self.options.delimiter, self.options.quote) .with_escape(self.options.escape) .with_terminator(self.options.terminator) - .with_comment(self.options.comment), + .with_comment(self.options.comment) + .with_truncate_rows(truncated_rows), ); let config = conf_builder.with_source(source).build(); @@ -509,7 +523,8 @@ impl CsvFormat { .unwrap_or_else(|| state.config_options().catalog.has_header), ) .with_delimiter(self.options.delimiter) - .with_quote(self.options.quote); + .with_quote(self.options.quote) + .with_truncated_rows(self.options.truncated_rows.unwrap_or(false)); if let Some(null_regex) = &self.options.null_regex { let regex = Regex::new(null_regex.as_str()) diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 8b95d9ba91ff2..e3c2b398c1b6e 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -94,6 +94,7 @@ pub struct CsvSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, + truncate_rows: bool, } impl CsvSource { @@ -111,6 +112,11 @@ impl CsvSource { pub fn has_header(&self) -> bool { self.has_header } + + /// Whether truncated rows are allowed when reading + pub fn truncate_rows(&self) -> bool { + self.truncate_rows + } /// A column delimiter pub fn delimiter(&self) -> u8 { self.delimiter @@ -156,6 +162,13 @@ impl CsvSource { conf.comment = comment; conf } + + /// Set whether to allow truncated rows when reading CSV files + pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self { + let mut conf = self.clone(); + conf.truncate_rows = truncate_rows; + conf + } } impl CsvSource { @@ -175,7 +188,8 @@ impl CsvSource { .expect("Batch size must be set before initializing builder"), ) .with_header(self.has_header) - .with_quote(self.quote); + .with_quote(self.quote) + .with_truncated_rows(self.truncate_rows); if let Some(terminator) = self.terminator { builder = builder.with_terminator(terminator); } @@ -340,6 +354,7 @@ impl FileOpener for CsvOpener { let config = CsvSource { has_header: csv_has_header, + truncate_rows: self.config.truncate_rows, ..(*self.config).clone() }; diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 13404bccac610..28f844dae9321 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -249,6 +249,20 @@ impl ExecutionPlan for DataSinkExec { fn metrics(&self) -> Option<MetricsSet> { self.sink.metrics() } + + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = DataSinkExec::new( + Arc::clone(self.input()), + Arc::clone(&self.sink), + self.sort_order.clone(), + ); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Create a output record batch with a count diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 20d9a1d6e53f0..702253da41d01 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -72,8 +72,8 @@ use datafusion_physical_plan::filter_pushdown::{ /// ```text /// ┌─────────────────────┐ -----► execute path /// │ │ ┄┄┄┄┄► init path -/// │ DataSourceExec
│ -/// │ │ +/// │ DataSourceExec │ +/// │ │ /// └───────▲─────────────┘ /// ┊ │ /// ┊ │ @@ -328,6 +328,15 @@ impl ExecutionPlan for DataSourceExec { } } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = DataSourceExec::new(Arc::clone(&self.data_source)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/expr-common/src/accumulator.rs b/datafusion/expr-common/src/accumulator.rs index 2829a9416f033..99c121e3e81e8 100644 --- a/datafusion/expr-common/src/accumulator.rs +++ b/datafusion/expr-common/src/accumulator.rs @@ -311,4 +311,15 @@ pub trait Accumulator: Send + Sync + Debug { fn supports_retract_batch(&self) -> bool { false } + + fn as_serializable(&self) -> Option<&dyn SerializableAccumulator> { + None + } +} + +pub trait SerializableAccumulator: Accumulator { + fn serialize(&self) -> Result>; + fn deserialize(bytes: &[u8]) -> Result> + where + Self: Sized; } diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs index 05a9425452a14..7d1539acf5940 100644 --- a/datafusion/expr/src/expr_rewriter/mod.rs +++ b/datafusion/expr/src/expr_rewriter/mod.rs @@ -148,6 +148,25 @@ pub fn replace_col(expr: Expr, replace_map: &HashMap<&Column, &Column>) -> Resul .data() } +pub fn replace_col_with_expr( + expr: Expr, + replace_map: &HashMap, +) -> Result { + expr.transform(|expr| { + Ok({ + if let Expr::Column(c) = &expr { + match replace_map.get(c) { + Some(new_expr) => Transformed::yes((**new_expr).to_owned()), + None => Transformed::no(expr), + } + } else { + Transformed::no(expr) + } + }) + }) + .data() +} + /// Recursively 'unnormalize' (remove all qualifiers) from an /// expression tree. 
/// diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index b4ad8387215ec..6b6859987487d 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -79,6 +79,7 @@ pub mod window_state; pub use datafusion_doc::{DocSection, Documentation, DocumentationBuilder}; pub use datafusion_expr_common::accumulator::Accumulator; +pub use datafusion_expr_common::accumulator::SerializableAccumulator; pub use datafusion_expr_common::columnar_value::ColumnarValue; pub use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; pub use datafusion_expr_common::operator::Operator; diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 27c2499c8a26a..352f0c8ce89ee 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -31,7 +31,7 @@ use datafusion_common::{ internal_err, plan_err, qualified_name, Column, DFSchema, Result, }; use datafusion_expr::expr::WindowFunction; -use datafusion_expr::expr_rewriter::replace_col; +use datafusion_expr::expr_rewriter::{replace_col, replace_col_with_expr}; use datafusion_expr::logical_plan::{Join, JoinType, LogicalPlan, TableScan, Union}; use datafusion_expr::utils::{ conjunction, expr_to_columns, split_conjunction, split_conjunction_owned, @@ -41,9 +41,8 @@ use datafusion_expr::{ }; use crate::optimizer::ApplyOrder; -use crate::simplify_expressions::simplify_predicates; use crate::utils::{has_all_column_refs, is_restrict_null_predicate}; -use crate::{OptimizerConfig, OptimizerRule}; +use crate::{simplify_expressions::simplify_predicates, OptimizerConfig, OptimizerRule}; /// Optimizer rule for pushing (moving) filter expressions down in a plan so /// they are applied as early as possible. @@ -799,7 +798,7 @@ impl OptimizerRule for PushDownFilter { // remove duplicated filters let child_predicates = split_conjunction_owned(child_filter.predicate); - let new_predicates = parents_predicates + let mut new_predicates = parents_predicates .into_iter() .chain(child_predicates) // use IndexSet to remove dupes while preserving predicate order @@ -807,6 +806,8 @@ impl OptimizerRule for PushDownFilter { .into_iter() .collect::>(); + new_predicates = infer_predicates_from_equalities(new_predicates)?; + let Some(new_predicate) = conjunction(new_predicates) else { return plan_err!("at least one expression exists"); }; @@ -1420,6 +1421,73 @@ fn contain(e: &Expr, check_map: &HashMap) -> bool { is_contain } +/// Infers new predicates by substituting equalities. +/// For example, with predicates `t2.b = 3` and `t1.b > t2.b`, +/// we can infer `t1.b > 3`. 
+fn infer_predicates_from_equalities(predicates: Vec<Expr>) -> Result<Vec<Expr>> { + // Map from columns to their literal values (from equality predicates) + let mut equality_map: HashMap<Column, Expr> = + HashMap::with_capacity(predicates.len()); + let mut final_predicates = Vec::with_capacity(predicates.len()); + // First pass: collect column=literal equalities + for predicate in predicates.iter() { + if let Expr::BinaryExpr(BinaryExpr { + left, + op: Operator::Eq, + right, + }) = predicate + { + if let Expr::Column(col) = left.as_ref() { + // Only add to map if right side is a literal + if matches!(right.as_ref(), Expr::Literal(_, _)) { + equality_map.insert(col.clone(), *right.clone()); + final_predicates.push(predicate.clone()); + } + } else if let Expr::Column(col) = right.as_ref() { + // Only add to map if left side is a literal + if matches!(left.as_ref(), Expr::Literal(_, _)) { + equality_map.insert(col.clone(), *left.clone()); + final_predicates.push(predicate.clone()); + } + } + } + } + + // If no equality mappings found, nothing to infer + if equality_map.is_empty() { + return Ok(predicates); + } + + // Second pass: apply substitutions to create new predicates + for predicate in predicates { + // Skip equality predicates we already used for mapping + if final_predicates.contains(&predicate) { + continue; + } + + // Try to replace columns with their literal values + let mut columns_in_expr = HashSet::new(); + expr_to_columns(&predicate, &mut columns_in_expr)?; + + // Create a combined replacement map for all columns in this predicate + let replace_map: HashMap<_, _> = columns_in_expr + .into_iter() + .filter_map(|col| equality_map.get(&col).map(|lit| (col, lit))) + .collect(); + + if replace_map.is_empty() { + final_predicates.push(predicate); + continue; + } + // Apply all substitutions at once to get the fully substituted predicate + let new_pred = replace_col_with_expr(predicate, &replace_map)?; + + final_predicates.push(new_pred); + } + + Ok(final_predicates) +} + #[cfg(test)] mod tests { use std::any::Any; diff --git a/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs b/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs index 9d9e840636b8a..32b2315e15d58 100644 --- a/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs +++ b/datafusion/optimizer/src/simplify_expressions/simplify_predicates.rs @@ -204,16 +204,17 @@ fn find_most_restrictive_predicate( if let Some(scalar) = scalar_value { if let Some(current_best) = best_value { - let comparison = scalar.try_cmp(current_best)?; - let is_better = if find_greater { - comparison == std::cmp::Ordering::Greater - } else { - comparison == std::cmp::Ordering::Less - }; - - if is_better { - best_value = Some(scalar); - most_restrictive_idx = idx; + if let Some(comparison) = scalar.partial_cmp(current_best) { + let is_better = if find_greater { + comparison == std::cmp::Ordering::Greater + } else { + comparison == std::cmp::Ordering::Less + }; + + if is_better { + best_value = Some(scalar); + most_restrictive_idx = idx; + } } } else { best_value = Some(scalar); diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index c783862d579da..15bb32c25d26b 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -332,6 +332,7 @@ pub trait AggregateWindowExpr: WindowExpr { return value.to_array_of_size(record_batch.num_rows()); } let order_bys =
get_orderby_values(self.order_by_columns(record_batch)?); + let most_recent_row_order_bys = most_recent_row .map(|batch| self.order_by_columns(batch)) .transpose()? diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 898386e2f9880..34c4f52824e8b 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -945,9 +945,12 @@ fn add_hash_on_top( /// /// # Returns /// -/// Updated node with an execution plan, where the desired single distribution -/// requirement is satisfied. -fn add_merge_on_top(input: DistributionContext) -> DistributionContext { +/// Updated node with an execution plan, where desired single +/// distribution is satisfied by adding [`SortPreservingMergeExec`]. +fn add_merge_on_top( + input: DistributionContext, + fetch: &mut Option, +) -> DistributionContext { // Apply only when the partition count is larger than one. if input.plan.output_partitioning().partition_count() > 1 { // When there is an existing ordering, we preserve ordering @@ -957,10 +960,10 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext { // - Usage of order preserving variants is not desirable // (determined by flag `config.optimizer.prefer_existing_sort`) let new_plan = if let Some(req) = input.plan.output_ordering() { - Arc::new(SortPreservingMergeExec::new( - req.clone(), - Arc::clone(&input.plan), - )) as _ + Arc::new( + SortPreservingMergeExec::new(req.clone(), Arc::clone(&input.plan)) + .with_fetch(fetch.take()), + ) as _ } else { // If there is no input order, we can simply coalesce partitions: Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _ @@ -989,20 +992,37 @@ fn add_merge_on_top(input: DistributionContext) -> DistributionContext { /// ```text /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] fn remove_dist_changing_operators( mut distribution_context: DistributionContext, -) -> Result { +) -> Result<( + DistributionContext, + Option, + Option>, +)> { + let mut fetch = None; + let mut spm: Option> = None; while is_repartition(&distribution_context.plan) || is_coalesce_partitions(&distribution_context.plan) || is_sort_preserving_merge(&distribution_context.plan) { + if is_sort_preserving_merge(&distribution_context.plan) { + if let Some(child_fetch) = distribution_context.plan.fetch() { + if fetch.is_none() { + fetch = Some(child_fetch); + spm = Some(distribution_context.plan); + } else { + fetch = Some(fetch.unwrap().min(child_fetch)); + } + } + } // All of above operators have a single child. First child is only child. // Remove any distribution changing operators at the beginning: distribution_context = distribution_context.children.swap_remove(0); // Note that they will be re-inserted later on if necessary or helpful. } - Ok(distribution_context) + Ok((distribution_context, fetch, spm)) } /// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable. 
@@ -1023,27 +1043,36 @@ fn remove_dist_changing_operators( /// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] pub fn replace_order_preserving_variants( mut context: DistributionContext, -) -> Result { - context.children = context - .children - .into_iter() - .map(|child| { - if child.data { - replace_order_preserving_variants(child) - } else { - Ok(child) - } - }) - .collect::>>()?; - + ordering_satisfied: bool, +) -> Result<(DistributionContext, Option)> { + let mut children = vec![]; + let mut fetch = None; + for child in context.children.into_iter() { + if child.data { + let (child, inner_fetch) = + replace_order_preserving_variants(child, ordering_satisfied)?; + children.push(child); + fetch = inner_fetch; + } else { + children.push(child); + } + } + context.children = children; if is_sort_preserving_merge(&context.plan) { + // Keep the fetch value of the SortPreservingMerge operator, maybe it will be used later. + let fetch = context.plan.fetch(); let child_plan = Arc::clone(&context.children[0].plan); - context.plan = Arc::new( - CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()), - ); - return Ok(context); + if !ordering_satisfied { + // It's safe to unwrap because `CoalescePartitionsExec` supports `fetch`. + context.plan = + Arc::new(CoalescePartitionsExec::new(child_plan).with_fetch(fetch)); + return Ok((context, None)); + } + context.plan = Arc::new(CoalescePartitionsExec::new(child_plan)); + return Ok((context, fetch)); } else if let Some(repartition) = context.plan.as_any().downcast_ref::() { @@ -1052,11 +1081,11 @@ pub fn replace_order_preserving_variants( Arc::clone(&context.children[0].plan), repartition.partitioning().clone(), )?); - return Ok(context); + return Ok((context, None)); } } - context.update_plan_from_children() + Ok((context.update_plan_from_children()?, fetch)) } /// A struct to keep track of repartition requirements for each child node. @@ -1198,11 +1227,15 @@ pub fn ensure_distribution( unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort; // Remove unnecessary repartition from the physical plan if any - let DistributionContext { - mut plan, - data, - children, - } = remove_dist_changing_operators(dist_context)?; + let ( + DistributionContext { + mut plan, + data, + children, + }, + mut fetch, + spm, + ) = remove_dist_changing_operators(dist_context)?; if let Some(exec) = plan.as_any().downcast_ref::() { if let Some(updated_window) = get_best_fitting_window( @@ -1235,43 +1268,39 @@ pub fn ensure_distribution( plan.maintains_input_order(), repartition_status_flags.into_iter() ) - .map( - |( - mut child, - required_input_ordering, - maintains, - RepartitionRequirementStatus { - requirement, - roundrobin_beneficial, - roundrobin_beneficial_stats, - hash_necessary, - }, - )| { - let add_roundrobin = enable_round_robin - // Operator benefits from partitioning (e.g. filter): - && roundrobin_beneficial - && roundrobin_beneficial_stats - // Unless partitioning increases the partition count, it is not beneficial: - && child.plan.output_partitioning().partition_count() < target_partitions; - - // When `repartition_file_scans` is set, attempt to increase - // parallelism at the source. - // - // If repartitioning is not possible (a.k.a. 
None is returned from `ExecutionPlan::repartitioned`) - // then no repartitioning will have occurred. As the default implementation returns None, it is only - // specific physical plan nodes, such as certain datasources, which are repartitioned. - if repartition_file_scans && roundrobin_beneficial_stats { - if let Some(new_child) = - child.plan.repartitioned(target_partitions, config)? - { - child.plan = new_child; + .map( + |( + mut child, + required_input_ordering, + maintains, + RepartitionRequirementStatus { + requirement, + roundrobin_beneficial, + roundrobin_beneficial_stats, + hash_necessary, + }, + )| { + let add_roundrobin = enable_round_robin + // Operator benefits from partitioning (e.g. filter): + && roundrobin_beneficial + && roundrobin_beneficial_stats + // Unless partitioning increases the partition count, it is not beneficial: + && child.plan.output_partitioning().partition_count() < target_partitions; + + // When `repartition_file_scans` is set, attempt to increase + // parallelism at the source. + if repartition_file_scans && roundrobin_beneficial_stats { + if let Some(new_child) = + child.plan.repartitioned(target_partitions, config)? + { + child.plan = new_child; + } } - } // Satisfy the distribution requirement if it is unmet. match &requirement { Distribution::SinglePartition => { - child = add_merge_on_top(child); + child = add_merge_on_top(child, &mut fetch); } Distribution::HashPartitioned(exprs) => { if add_roundrobin { @@ -1308,7 +1337,8 @@ pub fn ensure_distribution( if (!ordering_satisfied || !order_preserving_variants_desirable) && child.data { - child = replace_order_preserving_variants(child)?; + let (replaced_child, fetch) = replace_order_preserving_variants(child, ordering_satisfied)?; + child = replaced_child; // If ordering requirements were satisfied before repartitioning, // make sure ordering requirements are still satisfied after. if ordering_satisfied { @@ -1316,10 +1346,7 @@ pub fn ensure_distribution( child = add_sort_above_with_check( child, sort_req, - plan.as_any() - .downcast_ref::() - .map(|output| output.fetch()) - .unwrap_or(None), + fetch, )?; } } @@ -1331,20 +1358,22 @@ pub fn ensure_distribution( // Operator requires specific distribution. Distribution::SinglePartition | Distribution::HashPartitioned(_) => { // Since there is no ordering requirement, preserving ordering is pointless - child = replace_order_preserving_variants(child)?; + child = replace_order_preserving_variants(child, false)?.0; } Distribution::UnspecifiedDistribution => { // Since ordering is lost, trying to preserve ordering is pointless if !maintains || plan.as_any().is::() { - child = replace_order_preserving_variants(child)?; + child = replace_order_preserving_variants(child,false)?.0; } + } - } + }; } + Ok(child) - }, - ) - .collect::>>()?; + }, + ) + .collect::>>()?; let children_plans = children .iter() @@ -1383,9 +1412,20 @@ pub fn ensure_distribution( plan.with_new_children(children_plans)? }; - Ok(Transformed::yes(DistributionContext::new( - plan, data, children, - ))) + let mut optimized_distribution_ctx = + DistributionContext::new(Arc::clone(&plan), data, children); + + // If `fetch` was not consumed, it means that there was `SortPreservingMergeExec` with fetch before + // It was removed by `remove_dist_changing_operators` + // and we need to add it back. + if fetch.is_some() { + // It's safe to unwrap because `spm` is set only if `fetch` is set. 
+ let plan = spm.unwrap().with_fetch(fetch.take()).unwrap(); + optimized_distribution_ctx = + DistributionContext::new(plan, data, vec![optimized_distribution_ctx]); + } + + Ok(Transformed::yes(optimized_distribution_ctx)) } /// Keeps track of distribution changing operators (like `RepartitionExec`, diff --git a/datafusion/physical-plan/src/aggregates/group_values/mod.rs b/datafusion/physical-plan/src/aggregates/group_values/mod.rs index f2f489b7223c3..c64be0de1e83f 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/mod.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/mod.rs @@ -119,7 +119,7 @@ pub trait GroupValues: Send { /// - If group by single column, and type of this column has /// the specific [`GroupValues`] implementation, such implementation /// will be chosen. -/// +/// /// - If group by multiple columns, and all column types have the specific /// `GroupColumn` implementations, `GroupValuesColumn` will be chosen. /// diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 878bccc1d1778..c2fe5b42dfb23 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1025,6 +1025,29 @@ impl ExecutionPlan for AggregateExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::LowerEqual } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = AggregateExec { + mode: self.mode, + group_by: self.group_by.clone(), + aggr_expr: self.aggr_expr.clone(), + filter_expr: self.filter_expr.clone(), + input_order_mode: self.input_order_mode.clone(), + input: Arc::clone(&self.input), + input_schema: Arc::clone(&self.input_schema), + schema: Arc::clone(&self.schema), + cache: self.cache.clone(), + limit: self.limit, + required_input_ordering: self.required_input_ordering.clone(), + metrics: self.metrics.clone(), + }; + + let new_props: PlanProperties = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } fn create_schema( diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index c095afe5e716e..9bb4c573b399a 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -209,6 +209,21 @@ impl ExecutionPlan for AnalyzeExec { futures::stream::once(output), ))) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = AnalyzeExec::new( + self.verbose, + self.show_statistics, + Arc::clone(self.input()), + Arc::clone(&self.schema), + ); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Creates the output of AnalyzeExec as a RecordBatch diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index d98530d28e918..2f598a8ecea88 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -227,7 +227,17 @@ impl ExecutionPlan for CoalesceBatchesExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } - + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + CoalesceBatchesExec::new(Arc::clone(self.input()), self.target_batch_size) + .with_fetch(self.fetch()); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } 
fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index 976ff70502b75..685e751832eb4 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -224,6 +224,16 @@ impl ExecutionPlan for CoalescePartitionsExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = CoalescePartitionsExec::new(Arc::clone(self.input())); + new_plan.fetch = self.fetch; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap `projection` with its input, which is known to be a /// [`CoalescePartitionsExec`]. If possible, performs the swap and returns diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 1cad0ee85c0da..816da7cf8eb09 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -389,6 +389,12 @@ impl ExecutionPlanVisitor for IndentVisitor<'_, '_> { fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { write!(self.f, "{:indent$}", "", indent = self.indent * 2)?; plan.fmt_as(self.t, self.f)?; + + // MAX: disable this for now since we don't need it displayed + it fails many DF tests + //if let Some(node_id) = plan.properties().node_id() { + // write!(self.f, ", node_id={}", node_id)?; + //} + match self.show_metrics { ShowMetrics::None => {} ShowMetrics::Aggregated => { diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index 40b4ec61dc102..acd00ac3fa255 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -175,6 +175,16 @@ impl ExecutionPlan for EmptyExec { None, )) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = EmptyExec::new(Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index e708bbd3ec865..6bf613f9aaf88 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -511,6 +511,14 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Unknown } + /// If supported, returns a copy of this `ExecutionPlan` node with the specified + /// node_id. Returns `None` otherwise. + fn with_node_id( + self: Arc, + _node_id: usize, + ) -> Result>> { + Ok(None) + } /// Attempts to push down the given projection into the input of this `ExecutionPlan`. /// @@ -733,6 +741,11 @@ pub trait ExecutionPlanProperties { /// /// [`FilterExec`]: crate::filter::FilterExec fn equivalence_properties(&self) -> &EquivalenceProperties; + + // Node Id of this ExecutionPlan node. 
See also [`ExecutionPlan::with_node_id`] + fn node_id(&self) -> Option<usize> { + None + } } impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> { @@ -755,6 +768,10 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option<usize> { + self.properties().node_id() + } } impl ExecutionPlanProperties for &dyn ExecutionPlan { @@ -777,6 +794,10 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan { fn equivalence_properties(&self) -> &EquivalenceProperties { self.properties().equivalence_properties() } + + fn node_id(&self) -> Option<usize> { + self.properties().node_id() + } } /// Represents whether a stream of data **generated** by an operator is bounded (finite) @@ -980,6 +1001,8 @@ pub struct PlanProperties { pub scheduling_type: SchedulingType, /// See [ExecutionPlanProperties::output_ordering] output_ordering: Option<LexOrdering>, + /// See [ExecutionPlanProperties::node_id] + node_id: Option<usize>, } impl PlanProperties { @@ -1000,6 +1023,7 @@ impl PlanProperties { evaluation_type: EvaluationType::Lazy, scheduling_type: SchedulingType::NonCooperative, output_ordering, + node_id: None, } } @@ -1018,6 +1042,12 @@ impl PlanProperties { self } + /// Overwrite node id with its new value. + pub fn with_node_id(mut self, node_id: usize) -> Self { + self.node_id = Some(node_id); + self + } + /// Overwrite boundedness with its new value. pub fn with_boundedness(mut self, boundedness: Boundedness) -> Self { self.boundedness = boundedness; @@ -1064,6 +1094,10 @@ impl PlanProperties { self.output_ordering.as_ref() } + pub fn node_id(&self) -> Option<usize> { + self.node_id + } + /// Get schema of the node. pub(crate) fn schema(&self) -> &SchemaRef { self.eq_properties.schema() diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0a811a8826818..fb930e6a32d67 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -419,6 +419,18 @@ impl ExecutionPlan for FilterExec { CardinalityEffect::LowerEqual } + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = + FilterExec::try_new(Arc::clone(&self.predicate), Arc::clone(self.input()))? + .with_projection(self.projection.clone())?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to swap `projection` with its input (`filter`). If possible, performs /// the swap and returns [`FilterExec`] as the top plan. Otherwise, returns `None`. fn try_swapping_with_projection( diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 864def5e916fa..71d2fcbaa39b4 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -366,6 +366,16 @@ impl ExecutionPlan for CrossJoinExec { Ok(stats_cartesian_product(left_stats, right_stats)) } + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = + CrossJoinExec::new(Arc::clone(&self.left), Arc::clone(&self.right)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done, /// it returns the new swapped version having the [`CrossJoinExec`] as the top plan. /// Otherwise, it returns None.
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index b184bd341306e..8b6767e8583d7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -1048,6 +1048,25 @@ impl ExecutionPlan for HashJoinExec { Ok(stats.project(self.projection.as_ref())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = HashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.projection.clone(), + *self.partition_mode(), + self.null_equality, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to push `projection` down through `hash_join`. If possible, performs the /// pushdown and returns a new [`HashJoinExec`] as the top plan which has projections /// as its children. Otherwise, returns `None`. diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index 7f5cf01fd42f6..287689db247ba 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -524,6 +524,24 @@ impl ExecutionPlan for SortMergeJoinExec { ) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = SortMergeJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.sort_options.clone(), + self.null_equality, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + /// Tries to swap the projection with its input [`SortMergeJoinExec`]. If it can be done, /// it returns the new swapped version having the [`SortMergeJoinExec`] as the top plan. /// Otherwise, it returns None. 
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index aedeb971866f9..1713677c97962 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -473,6 +473,26 @@ impl ExecutionPlan for SymmetricHashJoinExec { Ok(Statistics::new_unknown(&self.schema())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = SymmetricHashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + self.join_type(), + self.null_equality, + self.left_sort_exprs.clone(), + self.right_sort_exprs.clone(), + self.mode, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index afe61541fc456..fd74ea4d60ea8 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -76,6 +76,7 @@ pub mod joins; pub mod limit; pub mod memory; pub mod metrics; +pub mod node_id; pub mod placeholder_row; pub mod projection; pub mod recursive_query; diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 2224f85cc1226..c1410677330a5 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -209,6 +209,17 @@ impl ExecutionPlan for GlobalLimitExec { fn supports_limit_pushdown(&self) -> bool { true } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + GlobalLimitExec::new(Arc::clone(self.input()), self.skip, self.fetch); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// LocalLimitExec applies a limit to a single partition diff --git a/datafusion/physical-plan/src/node_id.rs b/datafusion/physical-plan/src/node_id.rs new file mode 100644 index 0000000000000..7b8d0281eb73b --- /dev/null +++ b/datafusion/physical-plan/src/node_id.rs @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::ExecutionPlan; + +use datafusion_common::DataFusionError; + +// Util for traversing ExecutionPlan tree and annotating node_id +pub struct NodeIdAnnotator { + next_id: usize, +} + +impl NodeIdAnnotator { + pub fn new() -> Self { + NodeIdAnnotator { next_id: 0 } + } + + fn annotate_execution_plan_with_node_id( + &mut self, + plan: Arc, + ) -> Result, DataFusionError> { + let plan_with_id = Arc::clone(&plan) + .with_node_id(self.next_id)? 
+ .unwrap_or(plan); + self.next_id += 1; + Ok(plan_with_id) + } +} + +impl Default for NodeIdAnnotator { + fn default() -> Self { + Self::new() + } +} + +pub fn annotate_node_id_for_execution_plan( + plan: &Arc<dyn ExecutionPlan>, + annotator: &mut NodeIdAnnotator, +) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { + let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![]; + for child in plan.children() { + let new_child: Arc<dyn ExecutionPlan> = + annotate_node_id_for_execution_plan(child, annotator)?; + new_children.push(new_child); + } + let new_plan = Arc::clone(plan).with_new_children(new_children)?; + let new_plan_with_id = annotator.annotate_execution_plan_with_node_id(new_plan)?; + Ok(new_plan_with_id) +} diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index e7df79f867d70..922f3e3d2d667 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -187,6 +187,16 @@ impl ExecutionPlan for PlaceholderRowExec { None, )) } + + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = PlaceholderRowExec::new(Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 6eea70e1176d3..ded952457f3e9 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -352,6 +352,16 @@ impl ExecutionPlan for ProjectionExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = + ProjectionExec::try_new(self.expr.clone(), Arc::clone(self.input()))?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn try_swapping_with_projection( &self, diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 700a9076fecf0..bbbdb9e89ccdb 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -206,6 +206,21 @@ impl ExecutionPlan for RecursiveQueryExec { fn statistics(&self) -> Result<Statistics> { Ok(Statistics::new_unknown(&self.schema())) } + + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = RecursiveQueryExec::try_new( + self.name.clone(), + Arc::clone(&self.static_term), + Arc::clone(&self.recursive_term), + self.is_distinct, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } impl DisplayAs for RecursiveQueryExec { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 3cd6ee6c1af30..d8aab7c3b2321 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -800,6 +800,21 @@ impl ExecutionPlan for RepartitionExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + fn with_node_id( + self: Arc<Self>, + node_id: usize, + ) -> Result<Option<Arc<dyn ExecutionPlan>>> { + let mut new_plan = RepartitionExec { + input: Arc::clone(&self.input), + state: Arc::clone(&self.state), + metrics: self.metrics.clone(), + preserve_order: self.preserve_order, + cache: self.cache.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(node_id); +
new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } fn try_swapping_with_projection( &self, diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 513081e627e1a..ac79e5de091a7 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -323,6 +323,24 @@ impl ExecutionPlan for PartialSortExec { fn partition_statistics(&self, partition: Option) -> Result { self.input.partition_statistics(partition) } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = PartialSortExec { + expr: self.expr.clone(), + input: Arc::clone(&self.input), + common_prefix_length: self.common_prefix_length, + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + fetch: self.fetch, + cache: self.cache.clone(), + }; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } struct PartialSortStream { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index c026e430971ae..e1a78e9369d92 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1321,6 +1321,22 @@ impl ExecutionPlan for SortExec { CardinalityEffect::LowerEqual } } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let new_plan = SortExec { + input: Arc::clone(self.input()), + expr: self.expr.clone(), + fetch: self.fetch, + metrics_set: self.metrics_set.clone(), + preserve_partitioning: self.preserve_partitioning, + cache: self.cache.clone().with_node_id(node_id), + common_sort_prefix: self.common_sort_prefix.clone(), + filter: self.filter.clone(), + }; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`SortExec`]. If it can be done, /// it returns the new swapped version having the [`SortExec`] as the top plan. diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 09ad71974e6c9..6a6fafb9ff078 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -362,6 +362,17 @@ impl ExecutionPlan for SortPreservingMergeExec { true } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(self.input())) + .with_fetch(self.fetch()); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } /// Tries to swap the projection with its input [`SortPreservingMergeExec`]. /// If this is possible, it returns the new [`SortPreservingMergeExec`] whose /// child is a projection. Otherwise, it returns None. 
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index f9a7feb9e726e..5da512f86f106 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -339,6 +339,25 @@ impl ExecutionPlan for StreamingTableExec {
             metrics: self.metrics.clone(),
         }))
     }
+
+    fn with_node_id(
+        self: Arc<Self>,
+        node_id: usize,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let mut new_plan = StreamingTableExec {
+            partitions: self.partitions.clone(),
+            projection: self.projection.clone(),
+            projected_schema: Arc::clone(&self.projected_schema),
+            projected_output_ordering: self.projected_output_ordering.clone(),
+            infinite: self.infinite,
+            limit: self.limit,
+            cache: self.cache.clone(),
+            metrics: self.metrics.clone(),
+        };
+        let new_props = new_plan.cache.clone().with_node_id(node_id);
+        new_plan.cache = new_props;
+        Ok(Some(Arc::new(new_plan)))
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index aca03c57b1b48..96b64d98615ec 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -301,6 +301,15 @@ impl ExecutionPlan for UnionExec {
         true
     }
+    fn with_node_id(
+        self: Arc<Self>,
+        node_id: usize,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let mut new_plan = UnionExec::new(self.inputs.clone());
+        let new_props = new_plan.cache.clone().with_node_id(node_id);
+        new_plan.cache = new_props;
+        Ok(Some(Arc::new(new_plan)))
+    }
 
     /// Tries to push `projection` down through `union`. If possible, performs the
     /// pushdown and returns a new [`UnionExec`] as the top plan which has projections
     /// as its children. Otherwise, returns `None`.
diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs
index e36cd2b6c2429..3e183e9417fbb 100644
--- a/datafusion/physical-plan/src/unnest.rs
+++ b/datafusion/physical-plan/src/unnest.rs
@@ -201,6 +201,22 @@ impl ExecutionPlan for UnnestExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
+
+    fn with_node_id(
+        self: Arc<Self>,
+        node_id: usize,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let mut new_plan = UnnestExec::new(
+            Arc::clone(self.input()),
+            self.list_column_indices.clone(),
+            self.struct_column_indices.clone(),
+            Arc::clone(&self.schema),
+            self.options.clone(),
+        );
+        let new_props = new_plan.cache.clone().with_node_id(node_id);
+        new_plan.cache = new_props;
+        Ok(Some(Arc::new(new_plan)))
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/values.rs b/datafusion/physical-plan/src/values.rs
new file mode 100644
index 0000000000000..e5dc3b1162338
--- /dev/null
+++ b/datafusion/physical-plan/src/values.rs
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//!
Values execution plan + +use std::any::Any; +use std::sync::Arc; + +use crate::execution_plan::{Boundedness, EmissionType}; +use crate::memory::MemoryStream; +use crate::{common, DisplayAs, PlanProperties, SendableRecordBatchStream, Statistics}; +use crate::{ + ColumnarValue, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, +}; +use arrow::datatypes::{Schema, SchemaRef}; +use arrow::record_batch::{RecordBatch, RecordBatchOptions}; +use datafusion_common::{internal_err, plan_err, Result, ScalarValue}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::EquivalenceProperties; + +/// Execution plan for values list based relation (produces constant rows) +#[deprecated( + since = "45.0.0", + note = "Use `MemorySourceConfig::try_new_as_values` instead" +)] +#[derive(Debug, Clone)] +pub struct ValuesExec { + /// The schema + schema: SchemaRef, + /// The data + data: Vec, + /// Cache holding plan properties like equivalences, output partitioning etc. + cache: PlanProperties, +} + +#[allow(deprecated)] +impl ValuesExec { + /// Create a new values exec from data as expr + #[deprecated(since = "45.0.0", note = "Use `MemoryExec::try_new` instead")] + pub fn try_new( + schema: SchemaRef, + data: Vec>>, + ) -> Result { + if data.is_empty() { + return plan_err!("Values list cannot be empty"); + } + let n_row = data.len(); + let n_col = schema.fields().len(); + // We have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row + let batch = RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + )?; + + let arr = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let r = data[i][j].evaluate(&batch); + + match r { + Ok(ColumnarValue::Scalar(scalar)) => Ok(scalar), + Ok(ColumnarValue::Array(a)) if a.len() == 1 => { + ScalarValue::try_from_array(&a, 0) + } + Ok(ColumnarValue::Array(a)) => { + plan_err!( + "Cannot have array values {a:?} in a values list" + ) + } + Err(err) => Err(err), + } + }) + .collect::>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::>>()?; + let batch = RecordBatch::try_new_with_options( + Arc::clone(&schema), + arr, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + let data: Vec = vec![batch]; + Self::try_new_from_batches(schema, data) + } + + /// Create a new plan using the provided schema and batches. + /// + /// Errors if any of the batches don't match the provided schema, or if no + /// batches are provided. + #[deprecated( + since = "45.0.0", + note = "Use `MemoryExec::try_new_from_batches` instead" + )] + pub fn try_new_from_batches( + schema: SchemaRef, + batches: Vec, + ) -> Result { + if batches.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + for batch in &batches { + let batch_schema = batch.schema(); + if batch_schema != schema { + return plan_err!( + "Batch has invalid schema. Expected: {schema}, got: {batch_schema}" + ); + } + } + + let cache = Self::compute_properties(Arc::clone(&schema)); + #[allow(deprecated)] + Ok(ValuesExec { + schema, + data: batches, + cache, + }) + } + + /// Provides the data + pub fn data(&self) -> Vec { + #[allow(deprecated)] + self.data.clone() + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
+ fn compute_properties(schema: SchemaRef) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +#[allow(deprecated)] +impl DisplayAs for ValuesExec { + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "ValuesExec") + } + DisplayFormatType::TreeRender => { + // TODO: collect info + write!(f, "") + } + } + } +} + +#[allow(deprecated)] +impl ExecutionPlan for ValuesExec { + fn name(&self) -> &'static str { + "ValuesExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + #[allow(deprecated)] + &self.cache + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + #[allow(deprecated)] + ValuesExec::try_new_from_batches(Arc::clone(&self.schema), self.data.clone()) + .map(|e| Arc::new(e) as _) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + // ValuesExec has a single output partition + if 0 != partition { + return internal_err!( + "ValuesExec invalid partition {partition} (expected 0)" + ); + } + + Ok(Box::pin(MemoryStream::try_new( + self.data(), + #[allow(deprecated)] + Arc::clone(&self.schema), + None, + )?)) + } + + fn statistics(&self) -> Result { + let batch = self.data(); + Ok(common::compute_record_batch_statistics( + &[batch], + #[allow(deprecated)] + &self.schema, + None, + )) + } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = ValuesExec::try_new_from_batches( + Arc::clone(&self.schema), + self.data.clone(), + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::expressions::lit; + use crate::test::{self, make_partition}; + + use arrow::datatypes::{DataType, Field}; + use datafusion_common::stats::{ColumnStatistics, Precision}; + + #[tokio::test] + async fn values_empty_case() -> Result<()> { + let schema = test::aggr_test_schema(); + #[allow(deprecated)] + let empty = ValuesExec::try_new(schema, vec![]); + assert!(empty.is_err()); + Ok(()) + } + + #[test] + fn new_exec_with_batches() { + let batch = make_partition(7); + let schema = batch.schema(); + let batches = vec![batch.clone(), batch]; + #[allow(deprecated)] + let _exec = ValuesExec::try_new_from_batches(schema, batches).unwrap(); + } + + #[test] + fn new_exec_with_batches_empty() { + let batch = make_partition(7); + let schema = batch.schema(); + #[allow(deprecated)] + let _ = ValuesExec::try_new_from_batches(schema, Vec::new()).unwrap_err(); + } + + #[test] + fn new_exec_with_batches_invalid_schema() { + let batch = make_partition(7); + let batches = vec![batch.clone(), batch]; + + let invalid_schema = Arc::new(Schema::new(vec![ + Field::new("col0", DataType::UInt32, false), + Field::new("col1", DataType::Utf8, false), + ])); + #[allow(deprecated)] + let _ = ValuesExec::try_new_from_batches(invalid_schema, batches).unwrap_err(); + } + + // Test issue: https://github.com/apache/datafusion/issues/8763 + #[test] + fn new_exec_with_non_nullable_schema() { + let schema = Arc::new(Schema::new(vec![Field::new( + "col0", + DataType::UInt32, + false, + )])); + 
#[allow(deprecated)] + let _ = ValuesExec::try_new(Arc::clone(&schema), vec![vec![lit(1u32)]]).unwrap(); + // Test that a null value is rejected + #[allow(deprecated)] + let _ = ValuesExec::try_new(schema, vec![vec![lit(ScalarValue::UInt32(None))]]) + .unwrap_err(); + } + + #[test] + fn values_stats_with_nulls_only() -> Result<()> { + let data = vec![ + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + vec![lit(ScalarValue::Null)], + ]; + let rows = data.len(); + #[allow(deprecated)] + let values = ValuesExec::try_new( + Arc::new(Schema::new(vec![Field::new("col0", DataType::Null, true)])), + data, + )?; + + #[allow(deprecated)] + let stats = values.statistics()?; + assert_eq!( + stats, + Statistics { + num_rows: Precision::Exact(rows), + total_byte_size: Precision::Exact(8), // not important + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(rows), // there are only nulls + distinct_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + },], + } + ); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 1b7cb9bb76e1b..511d8e69fa167 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -301,6 +301,20 @@ impl ExecutionPlan for WindowAggExec { Ok(Statistics::new_unknown(&self.schema())) } } + + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = WindowAggExec::try_new( + self.window_expr.clone(), + Arc::clone(self.input()), + self.can_repartition, + )?; + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } } /// Compute the window aggregate columns diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index 40a22f94b81f6..dfdbc37daca27 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -213,6 +213,17 @@ impl ExecutionPlan for WorkTableExec { Ok(Statistics::new_unknown(&self.schema())) } + fn with_node_id( + self: Arc, + node_id: usize, + ) -> Result>> { + let mut new_plan = + WorkTableExec::new(self.name.clone(), Arc::clone(&self.schema)); + let new_props = new_plan.cache.clone().with_node_id(node_id); + new_plan.cache = new_props; + Ok(Some(Arc::new(new_plan))) + } + fn partition_statistics(&self, _partition: Option) -> Result { Ok(Statistics::new_unknown(&self.schema())) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index f5c79cf3d9a43..d89f73269c3d7 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -460,6 +460,7 @@ message CsvOptions { bytes double_quote = 15; // Indicates if quotes are doubled bytes newlines_in_values = 16; // Indicates if newlines are supported in values bytes terminator = 17; // Optional terminator character as a byte + bytes truncated_rows = 18; // Indicates if truncated rows are allowed } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c5242d0176e62..bbfd0dfd2ad2e 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -917,6 +917,7 @@ impl 
TryFrom<&protobuf::CsvOptions> for CsvOptions { null_regex: (!proto_opts.null_regex.is_empty()) .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), + truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 48782ff1d93af..0c6e115e3ec8f 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1663,6 +1663,9 @@ impl serde::Serialize for CsvOptions { if !self.terminator.is_empty() { len += 1; } + if !self.truncated_rows.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1735,6 +1738,11 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("terminator", pbjson::private::base64::encode(&self.terminator).as_str())?; } + if !self.truncated_rows.is_empty() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("truncatedRows", pbjson::private::base64::encode(&self.truncated_rows).as_str())?; + } struct_ser.end() } } @@ -1773,6 +1781,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "newlines_in_values", "newlinesInValues", "terminator", + "truncated_rows", + "truncatedRows", ]; #[allow(clippy::enum_variant_names)] @@ -1794,6 +1804,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { DoubleQuote, NewlinesInValues, Terminator, + TruncatedRows, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1832,6 +1843,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "terminator" => Ok(GeneratedField::Terminator), + "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1868,6 +1880,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut double_quote__ = None; let mut newlines_in_values__ = None; let mut terminator__ = None; + let mut truncated_rows__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::HasHeader => { @@ -1990,6 +2003,15 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::TruncatedRows => { + if truncated_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncatedRows")); + } + + truncated_rows__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -2010,6 +2032,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), terminator: terminator__.unwrap_or_default(), + truncated_rows: truncated_rows__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index aa23cea57470c..f09eef67867bb 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -646,6 +646,9 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index c064270657332..2902a9ce54df3 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -934,6 +934,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { null_value: opts.null_value.clone().unwrap_or_default(), null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), + truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]), }) } } diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index bb985e6ea0265..4f411a4a93323 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1032,6 +1032,7 @@ message CsvScanExecNode { string comment = 6; } bool newlines_in_values = 7; + bool truncate_rows = 8; } message JsonScanExecNode { diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index aa23cea57470c..f09eef67867bb 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -646,6 +646,9 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 2ddf063ee8782..ff7519aa5df29 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3771,6 +3771,9 @@ impl serde::Serialize for CsvScanExecNode { if self.newlines_in_values { len += 1; } + if self.truncate_rows { + len += 1; + } if self.optional_escape.is_some() { len += 1; } @@ -3793,6 +3796,9 @@ impl serde::Serialize for CsvScanExecNode { if 
self.newlines_in_values { struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?; } + if self.truncate_rows { + struct_ser.serialize_field("truncateRows", &self.truncate_rows)?; + } if let Some(v) = self.optional_escape.as_ref() { match v { csv_scan_exec_node::OptionalEscape::Escape(v) => { @@ -3825,6 +3831,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "quote", "newlines_in_values", "newlinesInValues", + "truncate_rows", + "truncateRows", "escape", "comment", ]; @@ -3836,6 +3844,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { Delimiter, Quote, NewlinesInValues, + TruncateRows, Escape, Comment, } @@ -3864,6 +3873,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), + "truncateRows" | "truncate_rows" => Ok(GeneratedField::TruncateRows), "escape" => Ok(GeneratedField::Escape), "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3890,6 +3900,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut delimiter__ = None; let mut quote__ = None; let mut newlines_in_values__ = None; + let mut truncate_rows__ = None; let mut optional_escape__ = None; let mut optional_comment__ = None; while let Some(k) = map_.next_key()? { @@ -3924,6 +3935,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } newlines_in_values__ = Some(map_.next_value()?); } + GeneratedField::TruncateRows => { + if truncate_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncateRows")); + } + truncate_rows__ = Some(map_.next_value()?); + } GeneratedField::Escape => { if optional_escape__.is_some() { return Err(serde::de::Error::duplicate_field("escape")); @@ -3944,6 +3961,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), + truncate_rows: truncate_rows__.unwrap_or_default(), optional_escape: optional_escape__, optional_comment: optional_comment__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 69f7542e48c93..ffb73086650f3 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1564,6 +1564,8 @@ pub struct CsvScanExecNode { pub quote: ::prost::alloc::string::String, #[prost(bool, tag = "7")] pub newlines_in_values: bool, + #[prost(bool, tag = "8")] + pub truncate_rows: bool, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index b4d72aa1b6cb3..db790d27d13cb 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -97,29 +97,6 @@ //! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); //! # Ok(()) //! # } -//! ``` -//! # Example: Serializing [`ExecutionPlan`]s -//! -//! ``` -//! # use datafusion::prelude::*; -//! # use datafusion_common::Result; -//! # use datafusion_proto::bytes::{physical_plan_from_bytes,physical_plan_to_bytes}; -//! # #[tokio::main] -//! # async fn main() -> Result<()>{ -//! // Create a plan that scans table 't' -//! let ctx = SessionContext::new(); -//! 
ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()).await?; -//! let physical_plan = ctx.table("t1").await?.create_physical_plan().await?; -//! -//! // Convert the plan into bytes (for sending over the network, etc.) -//! let bytes = physical_plan_to_bytes(physical_plan.clone())?; -//! -//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan -//! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?; -//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip)); -//! # Ok(()) -//! # } -//! ``` pub mod bytes; pub mod common; pub mod generated; diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 654607bd733da..492795855cf6e 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -72,6 +72,7 @@ impl CsvOptionsProto { newlines_in_values: options .newlines_in_values .map_or(vec![], |v| vec![v as u8]), + truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]), } } else { CsvOptionsProto::default() @@ -157,6 +158,11 @@ impl From<&CsvOptionsProto> for CsvOptions { } else { Some(proto.newlines_in_values[0] != 0) }, + truncated_rows: if proto.truncated_rows.is_empty() { + None + } else { + Some(proto.truncated_rows[0] != 0) + }, } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 8e38b0d1bf5b4..e577de5b1d0e0 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2619,6 +2619,7 @@ impl protobuf::PhysicalPlanNode { None }, newlines_in_values: maybe_csv.newlines_in_values(), + truncate_rows: csv_config.truncate_rows(), }, )), })); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index a5357a132eef2..b7620dc88ff69 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -32,6 +32,9 @@ use arrow::csv::WriterBuilder; use arrow::datatypes::{Fields, TimeUnit}; use datafusion::physical_expr::aggregate::AggregateExprBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::node_id::{ + annotate_node_id_for_execution_plan, NodeIdAnnotator, +}; use datafusion_expr::dml::InsertOp; use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf; use datafusion_functions_aggregate::array_agg::array_agg_udaf; @@ -135,14 +138,22 @@ fn roundtrip_test_and_return( ctx: &SessionContext, codec: &dyn PhysicalExtensionCodec, ) -> Result> { + let mut annotator = NodeIdAnnotator::new(); + let exec_plan = annotate_node_id_for_execution_plan(&exec_plan, &mut annotator)?; let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), codec) .expect("to proto"); let runtime = ctx.runtime_env(); - let result_exec_plan: Arc = proto + let mut result_exec_plan: Arc = proto .try_into_physical_plan(ctx, runtime.deref(), codec) .expect("from proto"); + // Qi: workaround for NodeId not being serialized/deserialized, + // otherwise the assert_eq! 
below will fail + let mut annotator2 = NodeIdAnnotator::new(); + result_exec_plan = + annotate_node_id_for_execution_plan(&result_exec_plan, &mut annotator2)?; + pretty_assertions::assert_eq!( format!("{exec_plan:?}"), format!("{result_exec_plan:?}") diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index 96ab84ab90954..c3524a0d16539 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5852,7 +5852,7 @@ NULL # array_has([], 1) -> 'false' (empty array should return false) # array_has(null, 1) -> 'null' (null array should return null) query ?T -SELECT column1, COALESCE(CAST(array_has(column1, column2) AS VARCHAR), 'null') +SELECT column1, COALESCE(CAST(array_has(column1, column2) AS VARCHAR), 'null') from array_has_table_empty; ---- [1, 3, 5] true @@ -6113,7 +6113,7 @@ true false false true false true false false NULL NULL false false false false NULL false -false false false NULL +false false false NULL query BBBB select array_has_all(make_array(1,2,3), []), @@ -7978,7 +7978,7 @@ select array_reverse(arrow_cast(make_array(1, 2, 3), 'LargeList(Int64)')), array [3, 2, 1] [1] query ???? -select array_reverse(arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)')), +select array_reverse(arrow_cast(make_array(1, 2, 3), 'FixedSizeList(3, Int64)')), array_reverse(arrow_cast(make_array(1), 'FixedSizeList(1, Int64)')), array_reverse(arrow_cast(make_array(1, NULL, 3), 'FixedSizeList(3, Int64)')), array_reverse(arrow_cast(make_array(NULL, NULL, NULL), 'FixedSizeList(3, Int64)')); diff --git a/datafusion/sqllogictest/test_files/describe.slt b/datafusion/sqllogictest/test_files/describe.slt index de5208b5483aa..70163b663d0be 100644 --- a/datafusion/sqllogictest/test_files/describe.slt +++ b/datafusion/sqllogictest/test_files/describe.slt @@ -81,8 +81,8 @@ int_col Int32 YES bigint_col Int64 YES float_col Float32 YES double_col Float64 YES -date_string_col Utf8View YES -string_col Utf8View YES +date_string_col Utf8 YES +string_col Utf8 YES timestamp_col Timestamp(Nanosecond, None) YES year Int32 YES month Int32 YES diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 06965ebef0f7d..4a1a2a39529be 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -301,8 +301,8 @@ initial_physical_plan 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, 
bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] @@ -329,7 +329,7 @@ physical_plan after EnsureCooperative SAME TEXT AS ABOVE physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -346,8 +346,8 @@ initial_physical_plan_with_stats 01)GlobalLimitExec: 
skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] initial_physical_plan_with_schema -01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +01)GlobalLimitExec: skip=0, fetch=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] physical_plan after OutputRequirements 01)OutputRequirementExec: order_by=[], dist_by=Unspecified 02)--GlobalLimitExec: skip=0, fetch=10 @@ -375,7 +375,7 @@ physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] -physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, 
double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] +physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:Binary;N, string_col:Binary;N, timestamp_col:Timestamp(Nanosecond, None);N] statement ok @@ -429,7 +429,7 @@ query TT explain select a from t1 where exists (select count(*) from t2); ---- logical_plan -01)LeftSemi Join: +01)LeftSemi Join: 02)--TableScan: t1 projection=[a] 03)--SubqueryAlias: __correlated_sq_1 04)----EmptyRelation: rows=1 diff --git a/datafusion/sqllogictest/test_files/explain_tree.slt b/datafusion/sqllogictest/test_files/explain_tree.slt index 0df361a75bae2..bce2d0d198e0b 100644 --- a/datafusion/sqllogictest/test_files/explain_tree.slt +++ b/datafusion/sqllogictest/test_files/explain_tree.slt @@ -818,7 +818,7 @@ physical_plan 40)└───────────────────────────┘ query TT -explain select +explain select count(*) over(), row_number() over () from table1 @@ -931,7 +931,7 @@ physical_plan 27)└───────────────────────────┘ query TT -explain select +explain select rank() over(ORDER BY int_col DESC), row_number() over (ORDER BY int_col ASC) from table1 @@ -1470,8 +1470,8 @@ drop table t2; # prepare table statement ok CREATE UNBOUNDED EXTERNAL TABLE data ( - "date" DATE, - "ticker" VARCHAR, + "date" DATE, + "ticker" VARCHAR, "time" TIMESTAMP, ) STORED AS CSV WITH ORDER ("date", "ticker", "time") @@ -1480,8 +1480,8 @@ LOCATION './a.parquet'; # query query TT -explain SELECT * FROM data -WHERE ticker = 'A' +explain SELECT * FROM data +WHERE ticker = 'A' ORDER BY "date", "time"; ---- physical_plan @@ -1643,7 +1643,7 @@ physical_plan # same thing but order by time, date query TT -explain SELECT * FROM data +explain SELECT * FROM data WHERE ticker = 'A' AND CAST(time AS DATE) = date ORDER BY "time", "date"; ---- @@ -1688,8 +1688,8 @@ physical_plan # query query TT -explain SELECT * FROM data -WHERE date = '2006-01-02' +explain SELECT * FROM data +WHERE date = '2006-01-02' ORDER BY "ticker", "time"; ---- physical_plan diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index fb2c89020112d..23b8e16a31159 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -248,7 +248,7 @@ datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false -datafusion.execution.parquet.schema_force_view_types true +datafusion.execution.parquet.schema_force_view_types false datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page @@ -362,7 +362,7 @@ datafusion.execution.parquet.metadata_size_hint NULL (reading) 
If specified, the datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query -datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.schema_force_view_types false (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. 
If NULL, uses default parquet writer setting diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index d6b75591111ea..b37db2fd24bfd 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -733,7 +733,7 @@ explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLi ---- logical_plan 01)Limit: skip=0, fetch=10 -02)--Cross Join: +02)--Cross Join: 03)----SubqueryAlias: t1 04)------Limit: skip=0, fetch=10 05)--------TableScan: testsubquerylimit projection=[a, b], fetch=10 @@ -758,7 +758,7 @@ explain select * from testSubQueryLimit as t1 join (select * from testSubQueryLi ---- logical_plan 01)Limit: skip=0, fetch=2 -02)--Cross Join: +02)--Cross Join: 03)----SubqueryAlias: t1 04)------Limit: skip=0, fetch=2 05)--------TableScan: testsubquerylimit projection=[a, b], fetch=2 diff --git a/datafusion/sqllogictest/test_files/listing_table_statistics.slt b/datafusion/sqllogictest/test_files/listing_table_statistics.slt index 37daf551c2c39..0b3b6106bbdc9 100644 --- a/datafusion/sqllogictest/test_files/listing_table_statistics.slt +++ b/datafusion/sqllogictest/test_files/listing_table_statistics.slt @@ -35,7 +35,7 @@ query TT explain format indent select * from t; ---- logical_plan TableScan: t projection=[int_col, str_col] -physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(212), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8View("a")) Max=Exact(Utf8View("d")) Null=Exact(0))]] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/listing_table_statistics/2.parquet]]}, projection=[int_col, str_col], file_type=parquet, statistics=[Rows=Exact(4), Bytes=Exact(212), [(Col[0]: Min=Exact(Int64(-1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8("a")) Max=Exact(Utf8("d")) Null=Exact(0))]] statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/map.slt b/datafusion/sqllogictest/test_files/map.slt index 4f1e5ef39a00d..75e23999c5b08 100644 --- a/datafusion/sqllogictest/test_files/map.slt +++ b/datafusion/sqllogictest/test_files/map.slt @@ -45,7 +45,7 @@ describe data; ---- ints Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO strings Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false) NO -timestamp Utf8View NO +timestamp Utf8 NO query ??T SELECT * FROM data ORDER by ints['bytes'] DESC LIMIT 10; diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 
1050b59613612..1f9e4203a5fbd 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1024,10 +1024,10 @@ ORDER BY SUM(column1) # ORDER BY with a GROUP BY clause query I -SELECT SUM(column1) - FROM foo -GROUP BY column2 -ORDER BY SUM(column1) +SELECT SUM(column1) + FROM foo +GROUP BY column2 +ORDER BY SUM(column1) ---- 0 2 @@ -1039,12 +1039,12 @@ ORDER BY SUM(column1) # ORDER BY with a GROUP BY clause and a HAVING clause query I -SELECT - SUM(column1) -FROM foo -GROUP BY column2 -HAVING SUM(column1) < 3 -ORDER BY SUM(column1) +SELECT + SUM(column1) +FROM foo +GROUP BY column2 +HAVING SUM(column1) < 3 +ORDER BY SUM(column1) ---- 0 2 @@ -1179,7 +1179,7 @@ physical_plan 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok -drop table ordered_table; +drop table ordered_table; # ABS(x) breaks the ordering if x's range contains both negative and positive values. @@ -1215,7 +1215,7 @@ physical_plan 05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok -drop table ordered_table; +drop table ordered_table; # ABS(x) preserves the ordering if x's range falls into positive values. # Since x is defined as INT UNSIGNED, its range is assumed to be from 0 to INF. diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index e722005bf0f0d..211d08650be8a 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -430,15 +430,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_default; ---- -BinaryView 616161 BinaryView 616161 BinaryView 616161 -BinaryView 626262 BinaryView 626262 BinaryView 626262 -BinaryView 636363 BinaryView 636363 BinaryView 636363 -BinaryView 646464 BinaryView 646464 BinaryView 646464 -BinaryView 656565 BinaryView 656565 BinaryView 656565 -BinaryView 666666 BinaryView 666666 BinaryView 666666 -BinaryView 676767 BinaryView 676767 BinaryView 676767 -BinaryView 686868 BinaryView 686868 BinaryView 686868 -BinaryView 696969 BinaryView 696969 BinaryView 696969 +Binary 616161 LargeBinary 616161 BinaryView 616161 +Binary 626262 LargeBinary 626262 BinaryView 626262 +Binary 636363 LargeBinary 636363 BinaryView 636363 +Binary 646464 LargeBinary 646464 BinaryView 646464 +Binary 656565 LargeBinary 656565 BinaryView 656565 +Binary 666666 LargeBinary 666666 BinaryView 666666 +Binary 676767 LargeBinary 676767 BinaryView 676767 +Binary 686868 LargeBinary 686868 BinaryView 686868 +Binary 696969 LargeBinary 696969 BinaryView 696969 # Run an explain plan to show the cast happens in the plan (a CAST is needed for the predicates) query TT @@ -451,13 +451,13 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") -02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE 
Utf8View("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] +01)Filter: CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%") AND CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%") +02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8) LIKE Utf8("%a%"), CAST(binary_as_string_default.largebinary_col AS LargeUtf8) LIKE LargeUtf8("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 -02)--FilterExec: CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +02)--FilterExec: CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% +04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8) LIKE %a% AND CAST(largebinary_col@1 AS LargeUtf8) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a% statement ok @@ -478,15 +478,15 @@ select arrow_typeof(binaryview_col), binaryview_col FROM binary_as_string_option; ---- -Utf8View aaa Utf8View aaa Utf8View aaa -Utf8View bbb Utf8View bbb Utf8View bbb -Utf8View ccc Utf8View ccc Utf8View ccc -Utf8View ddd Utf8View ddd Utf8View ddd -Utf8View eee Utf8View eee Utf8View eee -Utf8View fff Utf8View fff Utf8View fff -Utf8View ggg Utf8View ggg Utf8View ggg -Utf8View hhh Utf8View hhh Utf8View hhh -Utf8View iii Utf8View iii Utf8View iii +Utf8 aaa LargeUtf8 aaa Utf8View aaa +Utf8 bbb LargeUtf8 bbb Utf8View bbb +Utf8 ccc LargeUtf8 ccc Utf8View ccc +Utf8 ddd LargeUtf8 ddd Utf8View ddd +Utf8 eee LargeUtf8 eee Utf8View eee +Utf8 fff LargeUtf8 fff Utf8View fff +Utf8 ggg LargeUtf8 ggg Utf8View ggg +Utf8 hhh LargeUtf8 hhh Utf8View hhh +Utf8 iii LargeUtf8 iii Utf8View iii # Run an explain plan to show the cast happens in the plan (there should be no casts) query TT @@ -499,8 +499,8 @@ EXPLAIN binaryview_col LIKE '%a%'; ---- logical_plan -01)Filter: binary_as_string_option.binary_col LIKE Utf8View("%a%") AND binary_as_string_option.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%") -02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8View("%a%"), binary_as_string_option.largebinary_col LIKE Utf8View("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] +01)Filter: binary_as_string_option.binary_col LIKE Utf8("%a%") AND binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%") 
+02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8("%a%"), binary_as_string_option.largebinary_col LIKE LargeUtf8("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a% @@ -665,8 +665,8 @@ query TT explain select * from foo where starts_with(column1, 'f'); ---- logical_plan -01)Filter: foo.column1 LIKE Utf8View("f%") -02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8View("f%")] +01)Filter: foo.column1 LIKE Utf8("f%") +02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8("f%")] physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 LIKE f% diff --git a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part index 4cfbdc18ca508..8a1c4592641ca 100644 --- a/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/plans/q19.slt.part @@ -59,7 +59,7 @@ logical_plan 03)----Projection: lineitem.l_extendedprice, lineitem.l_discount 04)------Inner Join: lineitem.l_partkey = part.p_partkey Filter: part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2) AND part.p_size <= Int32(15) 05)--------Projection: lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount -06)----------Filter: (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) AND lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") +06)----------Filter: lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON") AND (lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)) AND (lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode = Utf8View("AIR REG")) 07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], partial_filters=[lineitem.l_shipmode = Utf8View("AIR") OR lineitem.l_shipmode 
= Utf8View("AIR REG"), lineitem.l_shipinstruct = Utf8View("DELIVER IN PERSON"), lineitem.l_quantity >= Decimal128(Some(100),15,2) AND lineitem.l_quantity <= Decimal128(Some(1100),15,2) OR lineitem.l_quantity >= Decimal128(Some(1000),15,2) AND lineitem.l_quantity <= Decimal128(Some(2000),15,2) OR lineitem.l_quantity >= Decimal128(Some(2000),15,2) AND lineitem.l_quantity <= Decimal128(Some(3000),15,2)] 08)--------Filter: (part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)) AND part.p_size >= Int32(1) 09)----------TableScan: part projection=[p_partkey, p_brand, p_size, p_container], partial_filters=[part.p_size >= Int32(1), part.p_brand = Utf8View("Brand#12") AND part.p_container IN ([Utf8View("SM CASE"), Utf8View("SM BOX"), Utf8View("SM PACK"), Utf8View("SM PKG")]) AND part.p_size <= Int32(5) OR part.p_brand = Utf8View("Brand#23") AND part.p_container IN ([Utf8View("MED BAG"), Utf8View("MED BOX"), Utf8View("MED PKG"), Utf8View("MED PACK")]) AND part.p_size <= Int32(10) OR part.p_brand = Utf8View("Brand#34") AND part.p_container IN ([Utf8View("LG CASE"), Utf8View("LG BOX"), Utf8View("LG PACK"), Utf8View("LG PKG")]) AND part.p_size <= Int32(15)] @@ -73,7 +73,7 @@ physical_plan 07)------------CoalesceBatchesExec: target_batch_size=8192 08)--------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 09)----------------CoalesceBatchesExec: target_batch_size=8192 -10)------------------FilterExec: (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG) AND l_shipinstruct@4 = DELIVER IN PERSON, projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] +10)------------------FilterExec: l_shipinstruct@4 = DELIVER IN PERSON AND (l_quantity@1 >= Some(100),15,2 AND l_quantity@1 <= Some(1100),15,2 OR l_quantity@1 >= Some(1000),15,2 AND l_quantity@1 <= Some(2000),15,2 OR l_quantity@1 >= Some(2000),15,2 AND l_quantity@1 <= Some(3000),15,2) AND (l_shipmode@5 = AIR OR l_shipmode@5 = AIR REG), projection=[l_partkey@0, l_quantity@1, l_extendedprice@2, l_discount@3] 11)--------------------DataSourceExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity, l_extendedprice, l_discount, l_shipinstruct, l_shipmode], file_type=csv, has_header=false 12)------------CoalesceBatchesExec: target_batch_size=8192 13)--------------RepartitionExec: partitioning=Hash([p_partkey@0], 4), input_partitions=4 diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 996ba0d70a63e..e7c7375197ce3 
100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -21,11 +21,11 @@ statement ok CREATE TABLE t1( - id INT, + id INT, name TEXT ) as VALUES - (1, 'Alex'), - (2, 'Bob'), + (1, 'Alex'), + (2, 'Bob'), (3, 'Alice') ; @@ -34,20 +34,20 @@ CREATE TABLE t2( id TINYINT, name TEXT ) as VALUES - (1, 'Alex'), - (2, 'Bob'), + (1, 'Alex'), + (2, 'Bob'), (3, 'John') ; # union with EXCEPT(JOIN) query T rowsort -( +( SELECT name FROM t1 EXCEPT SELECT name FROM t2 -) +) UNION ALL -( +( SELECT name FROM t2 EXCEPT SELECT name FROM t1 @@ -58,13 +58,13 @@ John # union with type coercion query IT rowsort -( +( SELECT * FROM t1 EXCEPT SELECT * FROM t2 -) +) UNION ALL -( +( SELECT * FROM t2 EXCEPT SELECT * FROM t1 @@ -584,11 +584,11 @@ OPTIONS ('format.has_header' 'true'); query TT explain SELECT c1 FROM( -( +( SELECT c1 FROM t1 -) +) UNION ALL -( +( SELECT c1a FROM t2 )) ORDER BY c1 @@ -764,8 +764,8 @@ DROP TABLE t4; # Test issue: https://github.com/apache/datafusion/issues/11742 query R rowsort -WITH - tt(v1) AS (VALUES (1::INT),(NULL::INT)) +WITH + tt(v1) AS (VALUES (1::INT),(NULL::INT)) SELECT NVL(v1, 0.5) FROM tt UNION ALL SELECT NULL WHERE FALSE; diff --git a/dev/changelog/44.0.0.md b/dev/changelog/44.0.0.md index 233e302e50e69..b3f10f6794b53 100644 --- a/dev/changelog/44.0.0.md +++ b/dev/changelog/44.0.0.md @@ -19,7 +19,7 @@ under the License. # Apache DataFusion 44.0.0 Changelog -This release consists of 332 commits from 94 contributors. See credits at the end of this changelog for more information. +This release consists of 329 commits from 94 contributors. See credits at the end of this changelog for more information. **Breaking changes:** @@ -110,6 +110,7 @@ This release consists of 332 commits from 94 contributors. See credits at the en - Support unicode character for `initcap` function [#13752](https://github.com/apache/datafusion/pull/13752) (tlm365) - [minor] make recursive package dependency optional [#13778](https://github.com/apache/datafusion/pull/13778) (buraksenn) - Fix `recursive-protection` feature flag [#13887](https://github.com/apache/datafusion/pull/13887) (alamb) +- Prepare for 44.0.0 release: version and changelog [#13882](https://github.com/apache/datafusion/pull/13882) (alamb) **Other:** @@ -362,13 +363,15 @@ This release consists of 332 commits from 94 contributors. See credits at the en - Minor: change the sort merge join emission as incremental [#13894](https://github.com/apache/datafusion/pull/13894) (berkaysynnada) - Minor: change visibility of hash join utils [#13893](https://github.com/apache/datafusion/pull/13893) (berkaysynnada) - Fix visibility of `swap_hash_join` to be `pub` [#13899](https://github.com/apache/datafusion/pull/13899) (alamb) +- Minor: Avoid emitting empty batches in partial sort [#13895](https://github.com/apache/datafusion/pull/13895) (berkaysynnada) +- BACKPORT: Correct return type for initcap scalar function with utf8view (#13909) [#13934](https://github.com/apache/datafusion/pull/13934) (alamb) ## Credits Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. ``` - 59 Andrew Lamb + 55 Andrew Lamb 35 Piotr Findeisen 16 Jonathan Chen 14 Jonah Gao @@ -383,13 +386,13 @@ Thank you to everyone who contributed to this release. 
Here is a breakdown of co 5 Dmitrii Blaginin 5 Qianqian 4 Adrian Garcia Badaracco + 4 Berkay Şahin 4 Marco Neumann 4 Tai Le Manh 4 Tim Saucer 4 zhuliquan 3 Andy Grove 3 Arttu - 3 Berkay Şahin 3 Burak Şen 3 Onur Satici 3 Qi Zhu diff --git a/dev/changelog/48.0.0.md b/dev/changelog/48.0.0.md index 9cf6c03b7acf0..42f128bcb7b51 100644 --- a/dev/changelog/48.0.0.md +++ b/dev/changelog/48.0.0.md @@ -19,7 +19,7 @@ under the License. # Apache DataFusion 48.0.0 Changelog -This release consists of 267 commits from 89 contributors. See credits at the end of this changelog for more information. +This release consists of 269 commits from 89 contributors. See credits at the end of this changelog for more information. **Breaking changes:** @@ -94,6 +94,7 @@ This release consists of 267 commits from 89 contributors. See credits at the en - fix: metadata of join schema [#16221](https://github.com/apache/datafusion/pull/16221) (chenkovsky) - fix: add missing row count limits to TPC-H queries [#16230](https://github.com/apache/datafusion/pull/16230) (0ax1) - fix: NaN semantics in GROUP BY [#16256](https://github.com/apache/datafusion/pull/16256) (chenkovsky) +- fix: [branch-48] Revert "Improve performance of constant aggregate window expression" [#16307](https://github.com/apache/datafusion/pull/16307) (andygrove) **Documentation updates:** @@ -305,6 +306,7 @@ This release consists of 267 commits from 89 contributors. See credits at the en - Handle dicts for distinct count [#15871](https://github.com/apache/datafusion/pull/15871) (blaginin) - Add `--substrait-round-trip` option in sqllogictests [#16183](https://github.com/apache/datafusion/pull/16183) (gabotechs) - Minor: fix upgrade papercut `pub use PruningStatistics` [#16264](https://github.com/apache/datafusion/pull/16264) (alamb) +- chore: update DF48 changelog [#16269](https://github.com/apache/datafusion/pull/16269) (xudong963) ## Credits @@ -313,7 +315,7 @@ Thank you to everyone who contributed to this release. Here is a breakdown of co ``` 30 dependabot[bot] 29 Andrew Lamb - 16 xudong.w + 17 xudong.w 14 Adrian Garcia Badaracco 10 Chen Chongchen 8 Gabriel @@ -328,13 +330,13 @@ Thank you to everyone who contributed to this release. Here is a breakdown of co 4 Nuno Faria 4 Yongting You 4 logan-keede + 3 Andy Grove 3 Christian 3 Daniël Heres 3 Liam Bao 3 Phillip LeBlanc 3 Piotr Findeisen 3 ding-young - 2 Andy Grove 2 Atahan Yorgancı 2 Brayan Jules 2 Georgi Krastev diff --git a/dev/changelog/48.0.1.md b/dev/changelog/48.0.1.md new file mode 100644 index 0000000000000..dcd4cc9c15479 --- /dev/null +++ b/dev/changelog/48.0.1.md @@ -0,0 +1,41 @@ + + +# Apache DataFusion 48.0.1 Changelog + +This release consists of 3 commits from 2 contributors. See credits at the end of this changelog for more information. + +See the [upgrade guide](https://datafusion.apache.org/library-user-guide/upgrading.html) for information on how to upgrade from previous versions. + +**Bug Fixes:** + +- [branch-48] Set the default value of datafusion.execution.collect_statistics to true #16447 [#16659](https://github.com/apache/datafusion/pull/16659) (blaginin) +- [branch-48] Fix parquet filter_pushdown: respect parquet filter pushdown config i… [#16656](https://github.com/apache/datafusion/pull/16656) (alamb) +- [branch-48] fix: column indices in FFI partition evaluator (#16480) [#16657](https://github.com/apache/datafusion/pull/16657) (alamb) + +## Credits + +Thank you to everyone who contributed to this release. Here is a breakdown of commits (PRs merged) per contributor. 
+ +``` + 2 Andrew Lamb + 1 Dmitrii Blaginin +``` + +Thank you also to everyone who contributed in other ways such as filing issues, reviewing PRs, and providing feedback on this release. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 3d4730958fb30..d9fb2b850fb5f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -84,7 +84,7 @@ The following configuration settings are available: | datafusion.execution.parquet.metadata_size_hint | NULL | (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | | datafusion.execution.parquet.pushdown_filters | false | (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". | | datafusion.execution.parquet.reorder_filters | false | (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.parquet.schema_force_view_types | true | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | +| datafusion.execution.parquet.schema_force_view_types | false | (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. | | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. | | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. | | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
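Not part of the patch: since this diff flips the default of `datafusion.execution.parquet.schema_force_view_types` back to `false` (which is also why the sqllogictest plans above now show `Utf8` instead of `Utf8View`), the sketch below shows one way a session could opt back into the view-type behavior. It is a minimal example, assuming the `datafusion` and `tokio` crates; the config key is the one documented in the table above, and `SessionConfig::set_bool` / the SQL `SET` statement are the usual override mechanisms, but check the API against the DataFusion version you build with.

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Opt back into reading parquet Utf8/Binary columns as Utf8View/BinaryView
    // for this session; the patch only changes the *default*, not the option.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.schema_force_view_types", true);
    let ctx = SessionContext::new_with_config(config);

    // The same override can be applied per-session with SQL instead:
    ctx.sql("SET datafusion.execution.parquet.schema_force_view_types = true")
        .await?;

    Ok(())
}
```

Either form only changes how parquet string/binary columns are mapped to Arrow types at read time; it does not alter the data itself, which is why the plan-level differences in this patch are limited to the literal types shown in the filters.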