From 95aadb9b9935b7aff147efa4ea56ae2d34550236 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Wed, 3 Sep 2025 11:38:41 +0800 Subject: [PATCH 01/10] fix clippy --- datafusion/physical-optimizer/src/enforce_distribution.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 8f8c7274cf78e..40998783cce4e 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -989,6 +989,7 @@ fn add_spm_on_top( /// ```text /// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] fn remove_dist_changing_operators( mut distribution_context: DistributionContext, ) -> Result<(DistributionContext, Option)> { @@ -1033,6 +1034,7 @@ fn remove_dist_changing_operators( /// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet", /// ``` +#[allow(clippy::type_complexity)] pub fn replace_order_preserving_variants( mut context: DistributionContext, ordering_satisfied: bool, From b571c3b5c5dcef7885785b636b0dd9aba4a4a898 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 11:50:54 +0800 Subject: [PATCH 02/10] Support csv truncate for datafusion --- datafusion/common/src/config.rs | 13 ++ .../core/src/datasource/file_format/csv.rs | 141 +++++++++++++++++- datafusion/datasource-csv/src/file_format.rs | 3 +- datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/prost.rs | 2 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 2 + .../proto/src/logical_plan/file_formats.rs | 6 + 8 files changed, 167 insertions(+), 2 deletions(-) diff --git 
a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0d34815a248f7..2ae3aa27e3201 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1946,6 +1946,10 @@ config_namespace! { // The input regex for Nulls when loading CSVs. pub null_regex: Option, default = None pub comment: Option, default = None + // Whether to allow truncated rows when parsing. + // By default this is set to false and will error if the CSV rows have different lengths. + // When set to true then it will allow records with less than the expected number of columns + pub truncated_rows: Option, default = None } } @@ -2038,6 +2042,15 @@ impl CsvOptions { self } + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + pub fn with_truncated_rows(mut self, allow: bool) -> Self { + self.truncated_rows = Some(allow); + self + } + /// The delimiter character. 
pub fn delimiter(&self) -> u8 { self.delimiter diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index efec07abbca05..7eec55d70ffa0 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -47,7 +47,7 @@ mod tests { use datafusion_physical_plan::{collect, ExecutionPlan}; use arrow::array::{ - BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, + Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, }; use arrow::compute::concat_batches; use arrow::csv::ReaderBuilder; @@ -55,6 +55,7 @@ mod tests { use async_trait::async_trait; use bytes::Bytes; use chrono::DateTime; + use datafusion_common::config::CsvOptions; use futures::stream::BoxStream; use futures::StreamExt; use insta::assert_snapshot; @@ -1174,4 +1175,142 @@ mod tests { .build_decoder(); DecoderDeserializer::new(CsvDecoder::new(decoder)) } + + fn csv_deserializer_with_truncated( + batch_size: usize, + schema: &Arc, + ) -> impl BatchDeserializer { + // using Arrow's ReaderBuilder and enabling truncated_rows + let decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(batch_size) + .with_truncated_rows(true) // <- enable runtime truncated_rows + .build_decoder(); + DecoderDeserializer::new(CsvDecoder::new(decoder)) + } + + #[tokio::test] + async fn infer_schema_with_truncated_rows_true() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + // CSV: header has 3 columns, but first data row has only 2 columns, second row has 3 + let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n"); + let variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // Construct CsvFormat and enable truncated_rows via CsvOptions + let csv_options = 
CsvOptions::default().with_truncated_rows(true); + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_options(csv_options) + .with_schema_infer_max_rec(10); + + let inferred_schema = csv_format + .infer_schema( + &state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await?; + + // header has 3 columns; inferred schema should also have 3 + assert_eq!(inferred_schema.fields().len(), 3); + + // inferred columns should be nullable + for f in inferred_schema.fields() { + assert!(f.is_nullable()); + } + + Ok(()) + } + #[test] + fn test_decoder_truncated_rows_runtime() -> Result<()> { + // Synchronous test: Decoder API used here is synchronous + let schema = csv_schema(); // helper already defined in file + + // Construct a decoder that enables truncated_rows at runtime + let mut deserializer = csv_deserializer_with_truncated(10, &schema); + + // Provide two rows: first row complete, second row missing last column + let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n"); + deserializer.digest(input); + + // Finish and collect output + deserializer.finish(); + + let output = deserializer.next()?; + match output { + DeserializerOutput::RecordBatch(batch) => { + // ensure at least two rows present + assert!(batch.num_rows() >= 2); + // column 4 (index 3) should be a StringArray where second row is NULL + let col4 = batch + .column(3) + .as_any() + .downcast_ref::() + .expect("column 4 should be StringArray"); + + // first row present, second row should be null + assert!(!col4.is_null(0)); + assert!(col4.is_null(1)); + } + other => panic!("expected RecordBatch but got {:?}", other), + } + Ok(()) + } + + #[tokio::test] + async fn infer_schema_truncated_rows_false_error() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + // CSV: header has 4 cols, first data row has 3 cols -> truncated at end + let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n"); + let 
variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // CsvFormat without enabling truncated_rows (default behavior = false) + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_schema_infer_max_rec(10); + + let res = csv_format + .infer_schema( + &state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await; + + // Expect an error due to unequal lengths / incorrect number of fields + assert!( + res.is_err(), + "expected infer_schema to error on truncated rows when disabled" + ); + + // Optional: check message contains indicative text (two known possibilities) + if let Err(err) = res { + let msg = format!("{err}"); + assert!( + msg.contains("Encountered unequal lengths") + || msg.contains("incorrect number of fields"), + "unexpected error message: {}", + msg + ); + } + + Ok(()) + } } diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index c9cd09bf676b7..fe86b48dc13bc 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -505,7 +505,8 @@ impl CsvFormat { .unwrap_or_else(|| state.config_options().catalog.has_header), ) .with_delimiter(self.options.delimiter) - .with_quote(self.options.quote); + .with_quote(self.options.quote) + .with_truncated_rows(self.options.truncated_rows.unwrap_or(false)); if let Some(null_regex) = &self.options.null_regex { let regex = Regex::new(null_regex.as_str()) diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index bd969db316872..39cf65070554e 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -900,6 +900,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { null_regex: 
(!proto_opts.null_regex.is_empty()) .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), + truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a55714f190c57..16c045d9a8cfe 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -604,6 +604,8 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows : ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index b6cbe5759cfcc..1a973982270b7 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -934,6 +934,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { null_value: opts.null_value.clone().unwrap_or_default(), null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), + truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]), }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a55714f190c57..16c045d9a8cfe 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -604,6 +604,8 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows : ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, 
::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d3f6511ec98fa..55ef8bd79f382 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -72,6 +72,7 @@ impl CsvOptionsProto { newlines_in_values: options .newlines_in_values .map_or(vec![], |v| vec![v as u8]), + truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]), } } else { CsvOptionsProto::default() @@ -157,6 +158,11 @@ impl From<&CsvOptionsProto> for CsvOptions { } else { Some(proto.newlines_in_values[0] != 0) }, + truncated_rows: if proto.truncated_rows.is_empty() { + None + } else { + Some(proto.truncated_rows[0] != 0) + }, } } } From 63c54eaf17018951bca836d6c5a25db3bddcfe53 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 20:35:23 +0800 Subject: [PATCH 03/10] add generated field to proto --- datafusion/proto-common/src/generated/prost.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16c045d9a8cfe..a55714f190c57 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -604,8 +604,6 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, - #[prost(bytes = "vec", tag = "18")] - pub truncated_rows : ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] From b7f98284fedd0ff5d0fba86ad54b61b81ff04dd1 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 20:42:47 +0800 Subject: [PATCH 04/10] generate proto --- datafusion/proto-common/src/generated/prost.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 
a55714f190c57..323f73923272b 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -604,6 +604,8 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] From d0b757beffd1355df98ee1a532ff6377ad71b167 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 20:51:37 +0800 Subject: [PATCH 05/10] add proto message and generated. --- .../proto/datafusion_common.proto | 1 + .../proto-common/src/generated/pbjson.rs | 22 +++++++++++++++++++ .../proto-common/src/generated/prost.rs | 1 + 3 files changed, 24 insertions(+) diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 35f41155fa050..c8d49ce882212 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -424,6 +424,7 @@ message CsvOptions { bytes double_quote = 15; // Indicates if quotes are doubled bytes newlines_in_values = 16; // Indicates if newlines are supported in values bytes terminator = 17; // Optional terminator character as a byte + bytes truncated_rows = 18; // Indicates if truncated rows are allowed } // Options controlling CSV format diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 1ac35742c73a4..14233b634fd2f 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1566,6 +1566,9 @@ impl serde::Serialize for CsvOptions { if !self.terminator.is_empty() { len += 1; } + if !self.truncated_rows.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; 
if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1638,6 +1641,11 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("terminator", pbjson::private::base64::encode(&self.terminator).as_str())?; } + if !self.truncated_rows.is_empty() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("truncatedRows", pbjson::private::base64::encode(&self.truncated_rows).as_str())?; + } struct_ser.end() } } @@ -1676,6 +1684,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "newlines_in_values", "newlinesInValues", "terminator", + "truncated_rows", + "truncatedRows", ]; #[allow(clippy::enum_variant_names)] @@ -1697,6 +1707,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { DoubleQuote, NewlinesInValues, Terminator, + TruncatedRows, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1735,6 +1746,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "terminator" => Ok(GeneratedField::Terminator), + "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1771,6 +1783,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut double_quote__ = None; let mut newlines_in_values__ = None; let mut terminator__ = None; + let mut truncated_rows__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::HasHeader => { @@ -1893,6 +1906,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::TruncatedRows => { + if truncated_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncatedRows")); + } + truncated_rows__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -1913,6 +1934,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), terminator: terminator__.unwrap_or_default(), + truncated_rows: truncated_rows__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 323f73923272b..96dadba835fa2 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -604,6 +604,7 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed #[prost(bytes = "vec", tag = "18")] pub truncated_rows: ::prost::alloc::vec::Vec, } From 5b9219d77537f2ae45724557899d53741aa6cd14 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 20:54:59 +0800 Subject: [PATCH 06/10] fix --- datafusion/proto/src/generated/datafusion_proto_common.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16c045d9a8cfe..96dadba835fa2 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -604,8 +604,9 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub 
terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed #[prost(bytes = "vec", tag = "18")] - pub truncated_rows : ::prost::alloc::vec::Vec, + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] From 5aa43e51c77374f2cbedc20f2892981a130dde82 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Thu, 4 Sep 2025 21:11:35 +0800 Subject: [PATCH 07/10] fix clippy --- datafusion/core/src/datasource/file_format/csv.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 7eec55d70ffa0..444d4e6e0cd82 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1260,7 +1260,7 @@ mod tests { assert!(!col4.is_null(0)); assert!(col4.is_null(1)); } - other => panic!("expected RecordBatch but got {:?}", other), + other => panic!("expected RecordBatch but got {other:?}"), } Ok(()) } @@ -1306,8 +1306,7 @@ mod tests { assert!( msg.contains("Encountered unequal lengths") || msg.contains("incorrect number of fields"), - "unexpected error message: {}", - msg + "unexpected error message: {msg}", ); } From cae4095e02f7c0acf2e2ba1f326767ca135123e8 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Fri, 5 Sep 2025 11:13:44 +0800 Subject: [PATCH 08/10] X-1035 Part-2: support csv scan to read truncated rows --- .../core/src/datasource/file_format/csv.rs | 40 +++++++++++++++++++ .../src/datasource/file_format/options.rs | 19 ++++++++- datafusion/datasource-csv/src/file_format.rs | 9 ++++- datafusion/datasource-csv/src/source.rs | 17 +++++++- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 18 +++++++++ datafusion/proto/src/generated/prost.rs | 2 + datafusion/proto/src/physical_plan/mod.rs | 1 + 8 files changed, 104 insertions(+), 3 deletions(-) diff --git 
a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 444d4e6e0cd82..e119cea5f4f6b 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1312,4 +1312,44 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> { + use std::io::Write; + + // create a SessionContext + let ctx = SessionContext::new(); + + // Create a temp file with a .csv suffix so the reader accepts it + let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv + // CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete. + write!(tmp, "a,b,c\n1,2\n3,4,5\n")?; + let path = tmp.path().to_str().unwrap().to_string(); + + // Build CsvReadOptions: header present, enable truncated_rows. + // With `truncated_rows(true)` the reader accepts rows that have fewer fields than + // the header and fills the missing trailing columns with NULL. + let options = CsvReadOptions::default().truncated_rows(true); + + println!("options: {}, path: {path}", options.truncated_rows); + + // Call the API under test + let df = ctx.read_csv(&path, options).await?; + + // Collect the results and combine batches so we can inspect columns + let batches = df.collect().await?; + let combined = concat_batches(&batches[0].schema(), &batches)?; + + // Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL. 
+ let col_c = combined.column(2); + assert!( + col_c.is_null(0), + "expected first row column 'c' to be NULL due to truncated row" + ); + + // Also ensure both data rows were read + assert!(combined.num_rows() >= 2); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 9aaf1cf598113..a197366130db3 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> { pub file_sort_order: Vec>, /// Optional regex to match null values pub null_regex: Option, + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + pub truncated_rows: bool, } impl Default for CsvReadOptions<'_> { @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> { file_sort_order: vec![], comment: None, null_regex: None, + truncated_rows: false, } } @@ -223,6 +229,16 @@ impl<'a> CsvReadOptions<'a> { self.null_regex = null_regex; self } + + /// Configure whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + /// See https://docs.rs/arrow/latest/arrow/csv/struct.ReaderBuilder.html#method.with_allow_truncated_rows + pub fn truncated_rows(mut self, truncated_rows: bool) -> Self { + self.truncated_rows = truncated_rows; + self + } } /// Options that control the reading of Parquet files. 
@@ -546,7 +562,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()) - .with_null_regex(self.null_regex.clone()); + .with_null_regex(self.null_regex.clone()) + .with_truncated_rows(self.truncated_rows); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index fe86b48dc13bc..f4ae99bb964bf 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -222,6 +222,11 @@ impl CsvFormat { self } + pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self { + self.options.truncated_rows = Some(truncated_rows); + self + } + /// Set the regex to use for null values in the CSV reader. /// - default to treat empty values as null. pub fn with_null_regex(mut self, null_regex: Option) -> Self { @@ -422,11 +427,13 @@ impl FileFormat for CsvFormat { .with_file_compression_type(self.options.compression.into()) .with_newlines_in_values(newlines_in_values); + let truncated_rows = self.options.truncated_rows.unwrap_or(false); let source = Arc::new( CsvSource::new(has_header, self.options.delimiter, self.options.quote) .with_escape(self.options.escape) .with_terminator(self.options.terminator) - .with_comment(self.options.comment), + .with_comment(self.options.comment) + .with_truncate_rows(truncated_rows), ); let config = conf_builder.with_source(source).build(); diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 3af1f2b345ba8..8b494689fb7e1 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -93,6 +93,7 @@ pub struct CsvSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: 
Option>, + truncate_rows: bool, } impl CsvSource { @@ -110,6 +111,11 @@ impl CsvSource { pub fn has_header(&self) -> bool { self.has_header } + + /// Whether truncated rows (rows with fewer fields than the schema) are allowed when reading + pub fn truncate_rows(&self) -> bool { + self.truncate_rows + } /// A column delimiter pub fn delimiter(&self) -> u8 { self.delimiter @@ -155,6 +161,13 @@ impl CsvSource { conf.comment = comment; conf } + + /// Set whether truncated rows (rows with fewer fields than the schema) are allowed when reading a CSV file + pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self { + let mut conf = self.clone(); + conf.truncate_rows = truncate_rows; + conf + } } impl CsvSource { @@ -174,7 +187,8 @@ impl CsvSource { .expect("Batch size must be set before initializing builder"), ) .with_header(self.has_header) - .with_quote(self.quote); + .with_quote(self.quote) + .with_truncated_rows(self.truncate_rows); if let Some(terminator) = self.terminator { builder = builder.with_terminator(terminator); } @@ -335,6 +349,7 @@ impl FileOpener for CsvOpener { let config = CsvSource { has_header: csv_has_header, + truncate_rows: self.config.truncate_rows, ..(*self.config).clone() }; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 4c8b6c588d949..591e54ab49fd3 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1023,6 +1023,7 @@ message CsvScanExecNode { string comment = 6; } bool newlines_in_values = 7; + bool truncate_rows = 8; } message JsonScanExecNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 932422944508d..5cf096244ef60 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3680,6 +3680,9 @@ impl serde::Serialize for CsvScanExecNode { if self.newlines_in_values { len += 1; } + if self.truncate_rows { + len += 1; + } if self.optional_escape.is_some() { len += 1; } @@ -3702,6 +3705,9 @@ impl serde::Serialize for CsvScanExecNode { if 
self.newlines_in_values { struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?; } + if self.truncate_rows { + struct_ser.serialize_field("truncateRows", &self.truncate_rows)?; + } if let Some(v) = self.optional_escape.as_ref() { match v { csv_scan_exec_node::OptionalEscape::Escape(v) => { @@ -3734,6 +3740,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "quote", "newlines_in_values", "newlinesInValues", + "truncate_rows", + "truncateRows", "escape", "comment", ]; @@ -3745,6 +3753,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { Delimiter, Quote, NewlinesInValues, + TruncateRows, Escape, Comment, } @@ -3773,6 +3782,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), + "truncateRows" | "truncate_rows" => Ok(GeneratedField::TruncateRows), "escape" => Ok(GeneratedField::Escape), "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3799,6 +3809,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut delimiter__ = None; let mut quote__ = None; let mut newlines_in_values__ = None; + let mut truncate_rows__ = None; let mut optional_escape__ = None; let mut optional_comment__ = None; while let Some(k) = map_.next_key()? 
{ @@ -3833,6 +3844,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } newlines_in_values__ = Some(map_.next_value()?); } + GeneratedField::TruncateRows => { + if truncate_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncateRows")); + } + truncate_rows__ = Some(map_.next_value()?); + } GeneratedField::Escape => { if optional_escape__.is_some() { return Err(serde::de::Error::duplicate_field("escape")); @@ -3853,6 +3870,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), + truncate_rows: truncate_rows__.unwrap_or_default(), optional_escape: optional_escape__, optional_comment: optional_comment__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index c2f4e93cef6ae..9c3ac34c437da 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1543,6 +1543,8 @@ pub struct CsvScanExecNode { pub quote: ::prost::alloc::string::String, #[prost(bool, tag = "7")] pub newlines_in_values: bool, + #[prost(bool, tag = "8")] + pub truncate_rows: bool, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 7a85a2a8efbd0..7a2c296bae7b2 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2286,6 +2286,7 @@ impl protobuf::PhysicalPlanNode { None }, newlines_in_values: maybe_csv.newlines_in_values(), + truncate_rows: csv_config.truncate_rows(), }, )), })); From 86c8754b23fc0bdd982863961a0342c8335f3867 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Fri, 5 Sep 2025 11:29:47 +0800 Subject: [PATCH 09/10] fix CI --- 
datafusion/core/src/datasource/file_format/options.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index a197366130db3..6b30b5bcc954a 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -234,7 +234,6 @@ impl<'a> CsvReadOptions<'a> { /// By default this is set to false and will error if the CSV rows have different lengths /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. /// If the record’s schema is not nullable, then it will still return an error. - /// See https://docs.rs/arrow/latest/arrow/csv/struct.ReaderBuilder.html#method.with_allow_truncated_rows pub fn truncated_rows(mut self, truncated_rows: bool) -> Self { self.truncated_rows = truncated_rows; self From 253e49ce8ae192f2abbe8012bb5828834ecf2815 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Fri, 5 Sep 2025 11:35:55 +0800 Subject: [PATCH 10/10] add csvfmt with --- datafusion/datasource-csv/src/file_format.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index f4ae99bb964bf..e6ce981f7ada4 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -296,6 +296,13 @@ impl CsvFormat { self } + /// Set whether truncated rows (rows with fewer fields than the schema) are allowed + /// - defaults to false + pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self { + self.options.truncated_rows = Some(truncate_rows); + self + } + /// The delimiter character. pub fn delimiter(&self) -> u8 { self.options.delimiter