diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 444d4e6e0cd82..e119cea5f4f6b 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -1312,4 +1312,40 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
+        use std::io::Write;
+
+        // create a SessionContext
+        let ctx = SessionContext::new();
+
+        // Create a temp file with a .csv suffix so the reader accepts it
+        let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv
+        // CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete.
+        write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
+        let path = tmp.path().to_str().unwrap().to_string();
+
+        // Build CsvReadOptions: header present, enable truncated_rows.
+        let options = CsvReadOptions::default().truncated_rows(true);
+
+        // Call the API under test
+        let df = ctx.read_csv(&path, options).await?;
+
+        // Collect the results and combine batches so we can inspect columns
+        let batches = df.collect().await?;
+        let combined = concat_batches(&batches[0].schema(), &batches)?;
+
+        // Column 'c' is the 3rd column (index 2). The first data row was truncated -> should be NULL.
+        let col_c = combined.column(2);
+        assert!(
+            col_c.is_null(0),
+            "expected first row column 'c' to be NULL due to truncated row"
+        );
+
+        // Also ensure both data rows were read
+        assert!(combined.num_rows() >= 2);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 9aaf1cf598113..6b30b5bcc954a 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
     pub file_sort_order: Vec<Vec<SortExpr>>,
     /// Optional regex to match null values
     pub null_regex: Option<String>,
+    /// Whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
+    /// If the record's schema is not nullable, then it will still return an error.
+    pub truncated_rows: bool,
 }
 
 impl Default for CsvReadOptions<'_> {
@@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
             file_sort_order: vec![],
             comment: None,
             null_regex: None,
+            truncated_rows: false,
         }
     }
 
@@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
         self.null_regex = null_regex;
         self
     }
+
+    /// Configure whether to allow truncated rows when parsing.
+    /// By default this is set to false and will error if the CSV rows have different lengths.
+    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
+    /// If the record's schema is not nullable, then it will still return an error.
+    pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
+        self.truncated_rows = truncated_rows;
+        self
+    }
 }
 
 /// Options that control the reading of Parquet files.
@@ -546,7 +561,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_newlines_in_values(self.newlines_in_values)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
             .with_file_compression_type(self.file_compression_type.to_owned())
-            .with_null_regex(self.null_regex.clone());
+            .with_null_regex(self.null_regex.clone())
+            .with_truncated_rows(self.truncated_rows);
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index fe86b48dc13bc..e6ce981f7ada4 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -222,6 +222,13 @@ impl CsvFormat {
         self
     }
 
+    /// Set whether to allow truncated rows when parsing.
+    /// When enabled, records with fewer columns than the schema are padded with nulls.
+    pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
+        self.options.truncated_rows = Some(truncated_rows);
+        self
+    }
+
     /// Set the regex to use for null values in the CSV reader.
     /// - default to treat empty values as null.
     pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
@@ -422,11 +429,13 @@ impl FileFormat for CsvFormat {
             .with_file_compression_type(self.options.compression.into())
             .with_newlines_in_values(newlines_in_values);
 
+        let truncated_rows = self.options.truncated_rows.unwrap_or(false);
         let source = Arc::new(
             CsvSource::new(has_header, self.options.delimiter, self.options.quote)
                 .with_escape(self.options.escape)
                 .with_terminator(self.options.terminator)
-                .with_comment(self.options.comment),
+                .with_comment(self.options.comment)
+                .with_truncate_rows(truncated_rows),
         );
 
         let config = conf_builder.with_source(source).build();
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index 3af1f2b345ba8..8b494689fb7e1 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -93,6 +93,7 @@ pub struct CsvSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    truncate_rows: bool,
 }
 
 impl CsvSource {
@@ -110,6 +111,11 @@ impl CsvSource {
     pub fn has_header(&self) -> bool {
         self.has_header
     }
+
+    /// Returns true if truncated rows are allowed when reading CSV files.
+    pub fn truncate_rows(&self) -> bool {
+        self.truncate_rows
+    }
     /// A column delimiter
     pub fn delimiter(&self) -> u8 {
         self.delimiter
@@ -155,6 +161,13 @@ impl CsvSource {
         conf.comment = comment;
         conf
     }
+
+    /// Set whether to allow truncated rows when reading CSV files.
+    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
+        let mut conf = self.clone();
+        conf.truncate_rows = truncate_rows;
+        conf
+    }
 }
 
 impl CsvSource {
@@ -174,7 +187,8 @@ impl CsvSource {
                 .expect("Batch size must be set before initializing builder"),
         )
         .with_header(self.has_header)
-        .with_quote(self.quote);
+        .with_quote(self.quote)
+        .with_truncated_rows(self.truncate_rows);
         if let Some(terminator) = self.terminator {
             builder = builder.with_terminator(terminator);
         }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 4c8b6c588d949..591e54ab49fd3 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1023,6 +1023,7 @@ message CsvScanExecNode {
     string comment = 6;
   }
   bool newlines_in_values = 7;
+  bool truncate_rows = 8;
 }
 
 message JsonScanExecNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 932422944508d..5cf096244ef60 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3680,6 +3680,9 @@ impl serde::Serialize for CsvScanExecNode {
         if self.newlines_in_values {
             len += 1;
         }
+        if self.truncate_rows {
+            len += 1;
+        }
         if self.optional_escape.is_some() {
             len += 1;
         }
@@ -3702,6 +3705,9 @@ impl serde::Serialize for CsvScanExecNode {
         if self.newlines_in_values {
             struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?;
         }
+        if self.truncate_rows {
+            struct_ser.serialize_field("truncateRows", &self.truncate_rows)?;
+        }
         if let Some(v) = self.optional_escape.as_ref() {
             match v {
                 csv_scan_exec_node::OptionalEscape::Escape(v) => {
@@ -3734,6 +3740,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             "quote",
             "newlines_in_values",
             "newlinesInValues",
+            "truncate_rows",
+            "truncateRows",
             "escape",
             "comment",
         ];
@@ -3745,6 +3753,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             Delimiter,
             Quote,
             NewlinesInValues,
+            TruncateRows,
             Escape,
             Comment,
         }
@@ -3773,6 +3782,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             "delimiter" => Ok(GeneratedField::Delimiter),
                             "quote" => Ok(GeneratedField::Quote),
                             "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues),
+                            "truncateRows" | "truncate_rows" => Ok(GeneratedField::TruncateRows),
                             "escape" => Ok(GeneratedField::Escape),
                             "comment" => Ok(GeneratedField::Comment),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
@@ -3799,6 +3809,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                 let mut delimiter__ = None;
                 let mut quote__ = None;
                 let mut newlines_in_values__ = None;
+                let mut truncate_rows__ = None;
                 let mut optional_escape__ = None;
                 let mut optional_comment__ = None;
                 while let Some(k) = map_.next_key()? {
@@ -3833,6 +3844,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                             }
                             newlines_in_values__ = Some(map_.next_value()?);
                         }
+                        GeneratedField::TruncateRows => {
+                            if truncate_rows__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("truncateRows"));
+                            }
+                            truncate_rows__ = Some(map_.next_value()?);
+                        }
                         GeneratedField::Escape => {
                             if optional_escape__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("escape"));
@@ -3853,6 +3870,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                     delimiter: delimiter__.unwrap_or_default(),
                     quote: quote__.unwrap_or_default(),
                     newlines_in_values: newlines_in_values__.unwrap_or_default(),
+                    truncate_rows: truncate_rows__.unwrap_or_default(),
                     optional_escape: optional_escape__,
                     optional_comment: optional_comment__,
                 })
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index c2f4e93cef6ae..9c3ac34c437da 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1543,6 +1543,8 @@ pub struct CsvScanExecNode {
     pub quote: ::prost::alloc::string::String,
     #[prost(bool, tag = "7")]
     pub newlines_in_values: bool,
+    #[prost(bool, tag = "8")]
+    pub truncate_rows: bool,
     #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
     pub optional_escape: ::core::option::Option<csv_scan_exec_node::OptionalEscape>,
     #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 7a85a2a8efbd0..7a2c296bae7b2 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -2286,6 +2286,7 @@ impl protobuf::PhysicalPlanNode {
                         None
                     },
                     newlines_in_values: maybe_csv.newlines_in_values(),
+                    truncate_rows: csv_config.truncate_rows(),
                 },
             )),
         }));