diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 5ba8d44c4683e..3a5b80ae259db 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2402,6 +2402,10 @@ config_namespace! { // The input regex for Nulls when loading CSVs. pub null_regex: Option<String>, default = None pub comment: Option<u8>, default = None + // Whether to allow truncated rows when parsing. + // By default this is set to false and will error if the CSV rows have different lengths. + // When set to true then it will allow records with less than the expected number of columns + pub truncated_rows: Option<bool>, default = None } } @@ -2494,6 +2498,15 @@ impl CsvOptions { self } + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record's schema is not nullable, then it will still return an error. + pub fn with_truncated_rows(mut self, allow: bool) -> Self { + self.truncated_rows = Some(allow); + self + } + /// The delimiter character. 
pub fn delimiter(&self) -> u8 { self.delimiter diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 23ba9e6ec8736..6c4897f711c5c 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -48,7 +48,7 @@ mod tests { use datafusion_physical_plan::{collect, ExecutionPlan}; use arrow::array::{ - BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, + Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, }; use arrow::compute::concat_batches; use arrow::csv::ReaderBuilder; @@ -1256,4 +1256,181 @@ mod tests { .build_decoder(); DecoderDeserializer::new(CsvDecoder::new(decoder)) } + + fn csv_deserializer_with_truncated( + batch_size: usize, + schema: &Arc, + ) -> impl BatchDeserializer { + // using Arrow's ReaderBuilder and enabling truncated_rows + let decoder = ReaderBuilder::new(schema.clone()) + .with_batch_size(batch_size) + .with_truncated_rows(true) // <- enable runtime truncated_rows + .build_decoder(); + DecoderDeserializer::new(CsvDecoder::new(decoder)) + } + + #[tokio::test] + async fn infer_schema_with_truncated_rows_true() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + // CSV: header has 3 columns, but first data row has only 2 columns, second row has 3 + let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n"); + let variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // Construct CsvFormat and enable truncated_rows via CsvOptions + let csv_options = CsvOptions::default().with_truncated_rows(true); + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_options(csv_options) + .with_schema_infer_max_rec(10); + + let inferred_schema = csv_format + .infer_schema( + 
&state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await?; + + // header has 3 columns; inferred schema should also have 3 + assert_eq!(inferred_schema.fields().len(), 3); + + // inferred columns should be nullable + for f in inferred_schema.fields() { + assert!(f.is_nullable()); + } + + Ok(()) + } + #[test] + fn test_decoder_truncated_rows_runtime() -> Result<()> { + // Synchronous test: Decoder API used here is synchronous + let schema = csv_schema(); // helper already defined in file + + // Construct a decoder that enables truncated_rows at runtime + let mut deserializer = csv_deserializer_with_truncated(10, &schema); + + // Provide two rows: first row complete, second row missing last column + let input = Bytes::from("0,0.0,true,0-string\n1,1.0,true\n"); + deserializer.digest(input); + + // Finish and collect output + deserializer.finish(); + + let output = deserializer.next()?; + match output { + DeserializerOutput::RecordBatch(batch) => { + // ensure at least two rows present + assert!(batch.num_rows() >= 2); + // column 4 (index 3) should be a StringArray where second row is NULL + let col4 = batch + .column(3) + .as_any() + .downcast_ref::() + .expect("column 4 should be StringArray"); + + // first row present, second row should be null + assert!(!col4.is_null(0)); + assert!(col4.is_null(1)); + } + other => panic!("expected RecordBatch but got {other:?}"), + } + Ok(()) + } + + #[tokio::test] + async fn infer_schema_truncated_rows_false_error() -> Result<()> { + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + + // CSV: header has 4 cols, first data row has 3 cols -> truncated at end + let csv_data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n"); + let variable_object_store = Arc::new(VariableStream::new(csv_data, 1)); + let object_meta = ObjectMeta { + location: Path::parse("/")?, + last_modified: DateTime::default(), + size: u64::MAX, + e_tag: None, + version: None, + }; + + // CsvFormat without 
enabling truncated_rows (default behavior = false) + let csv_format = CsvFormat::default() + .with_has_header(true) + .with_schema_infer_max_rec(10); + + let res = csv_format + .infer_schema( + &state, + &(variable_object_store.clone() as Arc), + &[object_meta], + ) + .await; + + // Expect an error due to unequal lengths / incorrect number of fields + assert!( + res.is_err(), + "expected infer_schema to error on truncated rows when disabled" + ); + + // Optional: check message contains indicative text (two known possibilities) + if let Err(err) = res { + let msg = format!("{err}"); + assert!( + msg.contains("Encountered unequal lengths") + || msg.contains("incorrect number of fields"), + "unexpected error message: {msg}", + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> { + use std::io::Write; + + // create a SessionContext + let ctx = SessionContext::new(); + + // Create a temp file with a .csv suffix so the reader accepts it + let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; // ensures path ends with .csv + // CSV has header "a,b,c". First data row is truncated (only "1,2"), second row is complete. + write!(tmp, "a,b,c\n1,2\n3,4,5\n")?; + let path = tmp.path().to_str().unwrap().to_string(); + + // Build CsvReadOptions: header present, enable truncated_rows. + // (Use the exact builder method your crate exposes: `truncated_rows(true)` here, + // if the method name differs in your codebase use the appropriate one.) + let options = CsvReadOptions::default().truncated_rows(true); + + println!("options: {}, path: {path}", options.truncated_rows); + + // Call the API under test + let df = ctx.read_csv(&path, options).await?; + + // Collect the results and combine batches so we can inspect columns + let batches = df.collect().await?; + let combined = concat_batches(&batches[0].schema(), &batches)?; + + // Column 'c' is the 3rd column (index 2). 
The first data row was truncated -> should be NULL. + let col_c = combined.column(2); + assert!( + col_c.is_null(0), + "expected first row column 'c' to be NULL due to truncated row" + ); + + // Also ensure we read at least one row + assert!(combined.num_rows() >= 2); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 02b792823a827..8c1bb02ef0737 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> { pub file_sort_order: Vec>, /// Optional regex to match null values pub null_regex: Option, + /// Whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths. + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + pub truncated_rows: bool, } impl Default for CsvReadOptions<'_> { @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> { file_sort_order: vec![], comment: None, null_regex: None, + truncated_rows: false, } } @@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> { self.null_regex = null_regex; self } + + /// Configure whether to allow truncated rows when parsing. + /// By default this is set to false and will error if the CSV rows have different lengths + /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls. + /// If the record’s schema is not nullable, then it will still return an error. + pub fn truncated_rows(mut self, truncated_rows: bool) -> Self { + self.truncated_rows = truncated_rows; + self + } } /// Options that control the reading of Parquet files. 
@@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()) - .with_null_regex(self.null_regex.clone()); + .with_null_regex(self.null_regex.clone()) + .with_truncated_rows(self.truncated_rows); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 4eeb431584ba7..e09ac3af7c661 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -222,6 +222,11 @@ impl CsvFormat { self } + pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self { + self.options.truncated_rows = Some(truncated_rows); + self + } + /// Set the regex to use for null values in the CSV reader. /// - default to treat empty values as null. pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self { @@ -291,6 +296,13 @@ impl CsvFormat { self } + /// Set whether to allow truncated rows (records with fewer than the expected number of columns) when parsing + /// - defaults to false + pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self { + self.options.truncated_rows = Some(truncate_rows); + self + } + /// The delimiter character. 
pub fn delimiter(&self) -> u8 { self.options.delimiter @@ -426,11 +438,13 @@ impl FileFormat for CsvFormat { .with_file_compression_type(self.options.compression.into()) .with_newlines_in_values(newlines_in_values); + let truncated_rows = self.options.truncated_rows.unwrap_or(false); let source = Arc::new( CsvSource::new(has_header, self.options.delimiter, self.options.quote) .with_escape(self.options.escape) .with_terminator(self.options.terminator) - .with_comment(self.options.comment), + .with_comment(self.options.comment) + .with_truncate_rows(truncated_rows), ); let config = conf_builder.with_source(source).build(); @@ -509,7 +523,8 @@ impl CsvFormat { .unwrap_or_else(|| state.config_options().catalog.has_header), ) .with_delimiter(self.options.delimiter) - .with_quote(self.options.quote); + .with_quote(self.options.quote) + .with_truncated_rows(self.options.truncated_rows.unwrap_or(false)); if let Some(null_regex) = &self.options.null_regex { let regex = Regex::new(null_regex.as_str()) diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 6c994af940d1d..9305602db7b5b 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -94,6 +94,7 @@ pub struct CsvSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option<Statistics>, schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>, + truncate_rows: bool, } impl CsvSource { @@ -111,6 +112,11 @@ pub fn has_header(&self) -> bool { self.has_header } + + /// Returns true if truncated rows are allowed when parsing + pub fn truncate_rows(&self) -> bool { + self.truncate_rows + } /// A column delimiter pub fn delimiter(&self) -> u8 { self.delimiter @@ -156,6 +162,13 @@ impl CsvSource { conf.comment = comment; conf } + + /// Set whether to allow truncated rows when reading CSV files + pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self { + let mut conf = self.clone(); + conf.truncate_rows = truncate_rows; + conf + } } impl CsvSource { @@ -175,7 +188,8 @@ 
impl CsvSource { .expect("Batch size must be set before initializing builder"), ) .with_header(self.has_header) - .with_quote(self.quote); + .with_quote(self.quote) + .with_truncated_rows(self.truncate_rows); if let Some(terminator) = self.terminator { builder = builder.with_terminator(terminator); } @@ -340,6 +354,7 @@ impl FileOpener for CsvOpener { let config = CsvSource { has_header: csv_has_header, + truncate_rows: self.config.truncate_rows, ..(*self.config).clone() }; diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 81fc9cceb777d..c06bfda9c2846 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -430,6 +430,7 @@ message CsvOptions { bytes double_quote = 15; // Indicates if quotes are doubled bytes newlines_in_values = 16; // Indicates if newlines are supported in values bytes terminator = 17; // Optional terminator character as a byte + bytes truncated_rows = 18; // Indicates if truncated rows are allowed } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 0823e150268de..0b868fe560a00 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -900,6 +900,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { null_regex: (!proto_opts.null_regex.is_empty()) .then(|| proto_opts.null_regex.clone()), comment: proto_opts.comment.first().copied(), + truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0), }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index c3b6686df0054..e44d11e8e8272 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1566,6 +1566,9 @@ impl serde::Serialize for CsvOptions { if 
!self.terminator.is_empty() { len += 1; } + if !self.truncated_rows.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1638,6 +1641,11 @@ impl serde::Serialize for CsvOptions { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("terminator", pbjson::private::base64::encode(&self.terminator).as_str())?; } + if !self.truncated_rows.is_empty() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("truncatedRows", pbjson::private::base64::encode(&self.truncated_rows).as_str())?; + } struct_ser.end() } } @@ -1676,6 +1684,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "newlines_in_values", "newlinesInValues", "terminator", + "truncated_rows", + "truncatedRows", ]; #[allow(clippy::enum_variant_names)] @@ -1697,6 +1707,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { DoubleQuote, NewlinesInValues, Terminator, + TruncatedRows, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1735,6 +1746,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "doubleQuote" | "double_quote" => Ok(GeneratedField::DoubleQuote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), "terminator" => Ok(GeneratedField::Terminator), + "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1771,6 +1783,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut double_quote__ = None; let mut newlines_in_values__ = None; let mut terminator__ = None; + let mut truncated_rows__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::HasHeader => { @@ -1893,6 +1906,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) ; } + GeneratedField::TruncatedRows => { + if truncated_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncatedRows")); + } + truncated_rows__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -1913,6 +1934,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { double_quote: double_quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), terminator: terminator__.unwrap_or_default(), + truncated_rows: truncated_rows__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 411d72af4c624..7e2fc2af30af1 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -604,6 +604,9 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index b6cbe5759cfcc..1a973982270b7 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -934,6 +934,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { null_value: opts.null_value.clone().unwrap_or_default(), null_regex: opts.null_regex.clone().unwrap_or_default(), comment: opts.comment.map_or_else(Vec::new, |h| vec![h]), + truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h| vec![h as u8]), }) } } diff --git 
a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 64789f5de0d22..bae28b828f31e 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1024,6 +1024,7 @@ message CsvScanExecNode { string comment = 6; } bool newlines_in_values = 7; + bool truncate_rows = 8; } message JsonScanExecNode { diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 411d72af4c624..7e2fc2af30af1 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -604,6 +604,9 @@ pub struct CsvOptions { /// Optional terminator character as a byte #[prost(bytes = "vec", tag = "17")] pub terminator: ::prost::alloc::vec::Vec, + /// Indicates if truncated rows are allowed + #[prost(bytes = "vec", tag = "18")] + pub truncated_rows: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 92309ea6a5cbf..78a3e3b6e6146 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3771,6 +3771,9 @@ impl serde::Serialize for CsvScanExecNode { if self.newlines_in_values { len += 1; } + if self.truncate_rows { + len += 1; + } if self.optional_escape.is_some() { len += 1; } @@ -3793,6 +3796,9 @@ impl serde::Serialize for CsvScanExecNode { if self.newlines_in_values { struct_ser.serialize_field("newlinesInValues", &self.newlines_in_values)?; } + if self.truncate_rows { + struct_ser.serialize_field("truncateRows", &self.truncate_rows)?; + } if let Some(v) = self.optional_escape.as_ref() { match v { csv_scan_exec_node::OptionalEscape::Escape(v) => { @@ -3825,6 +3831,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "quote", "newlines_in_values", "newlinesInValues", 
+ "truncate_rows", + "truncateRows", "escape", "comment", ]; @@ -3836,6 +3844,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { Delimiter, Quote, NewlinesInValues, + TruncateRows, Escape, Comment, } @@ -3864,6 +3873,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), "newlinesInValues" | "newlines_in_values" => Ok(GeneratedField::NewlinesInValues), + "truncateRows" | "truncate_rows" => Ok(GeneratedField::TruncateRows), "escape" => Ok(GeneratedField::Escape), "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), @@ -3890,6 +3900,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut delimiter__ = None; let mut quote__ = None; let mut newlines_in_values__ = None; + let mut truncate_rows__ = None; let mut optional_escape__ = None; let mut optional_comment__ = None; while let Some(k) = map_.next_key()? { @@ -3924,6 +3935,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } newlines_in_values__ = Some(map_.next_value()?); } + GeneratedField::TruncateRows => { + if truncate_rows__.is_some() { + return Err(serde::de::Error::duplicate_field("truncateRows")); + } + truncate_rows__ = Some(map_.next_value()?); + } GeneratedField::Escape => { if optional_escape__.is_some() { return Err(serde::de::Error::duplicate_field("escape")); @@ -3944,6 +3961,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), newlines_in_values: newlines_in_values__.unwrap_or_default(), + truncate_rows: truncate_rows__.unwrap_or_default(), optional_escape: optional_escape__, optional_comment: optional_comment__, }) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index b0fc0ce60436d..eb26776b94827 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ 
-1545,6 +1545,8 @@ pub struct CsvScanExecNode { pub quote: ::prost::alloc::string::String, #[prost(bool, tag = "7")] pub newlines_in_values: bool, + #[prost(bool, tag = "8")] + pub truncate_rows: bool, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 620442c79e72c..ee8e322150be7 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -72,6 +72,7 @@ impl CsvOptionsProto { newlines_in_values: options .newlines_in_values .map_or(vec![], |v| vec![v as u8]), + truncated_rows: options.truncated_rows.map_or(vec![], |v| vec![v as u8]), } } else { CsvOptionsProto::default() @@ -157,6 +158,11 @@ impl From<&CsvOptionsProto> for CsvOptions { } else { Some(proto.newlines_in_values[0] != 0) }, + truncated_rows: if proto.truncated_rows.is_empty() { + None + } else { + Some(proto.truncated_rows[0] != 0) + }, } } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 6e7546737d72c..72f772fcb28e3 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -2346,6 +2346,7 @@ impl protobuf::PhysicalPlanNode { None }, newlines_in_values: maybe_csv.newlines_in_values(), + truncate_rows: csv_config.truncate_rows(), }, )), }));