Skip to content
Merged
13 changes: 13 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2402,6 +2402,10 @@ config_namespace! {
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
// Whether to allow truncated rows when parsing.
// By default this is set to false and will error if the CSV rows have different lengths.
// When set to true then it will allow records with less than the expected number of columns
pub truncated_rows: Option<bool>, default = None
}
}

Expand Down Expand Up @@ -2494,6 +2498,15 @@ impl CsvOptions {
self
}

/// Whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths.
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub fn with_truncated_rows(mut self, allow: bool) -> Self {
self.truncated_rows = Some(allow);
self
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
Expand Down
179 changes: 178 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ mod tests {
use datafusion_physical_plan::{collect, ExecutionPlan};

use arrow::array::{
BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
};
use arrow::compute::concat_batches;
use arrow::csv::ReaderBuilder;
Expand Down Expand Up @@ -1256,4 +1256,181 @@ mod tests {
.build_decoder();
DecoderDeserializer::new(CsvDecoder::new(decoder))
}

/// Build a `BatchDeserializer` whose underlying Arrow CSV decoder accepts
/// rows with fewer columns than the schema (`truncated_rows` enabled).
fn csv_deserializer_with_truncated(
    batch_size: usize,
    schema: &Arc<Schema>,
) -> impl BatchDeserializer<Bytes> {
    // Arrow's ReaderBuilder exposes truncated-row handling directly.
    let decoder = ReaderBuilder::new(Arc::clone(schema))
        .with_batch_size(batch_size)
        .with_truncated_rows(true)
        .build_decoder();
    DecoderDeserializer::new(CsvDecoder::new(decoder))
}

#[tokio::test]
async fn infer_schema_with_truncated_rows_true() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();

    // Header declares 3 columns; the first data row carries only 2 and the
    // second all 3. With truncated_rows enabled, inference must succeed.
    let csv_data = Bytes::from("a,b,c\n1,2\n3,4,5\n");
    let store = Arc::new(VariableStream::new(csv_data, 1));
    let meta = ObjectMeta {
        location: Path::parse("/")?,
        last_modified: DateTime::default(),
        size: u64::MAX,
        e_tag: None,
        version: None,
    };

    // Enable truncated-row support on the format via CsvOptions.
    let format = CsvFormat::default()
        .with_has_header(true)
        .with_options(CsvOptions::default().with_truncated_rows(true))
        .with_schema_infer_max_rec(10);

    let inferred_schema = format
        .infer_schema(&state, &(store.clone() as Arc<dyn ObjectStore>), &[meta])
        .await?;

    // All 3 header columns should be inferred, and each must be nullable
    // so truncated rows can be represented.
    assert_eq!(inferred_schema.fields().len(), 3);
    assert!(inferred_schema.fields().iter().all(|f| f.is_nullable()));

    Ok(())
}
#[test]
fn test_decoder_truncated_rows_runtime() -> Result<()> {
    // The Decoder API is synchronous, so no async runtime is needed.
    let schema = csv_schema(); // helper already defined in this file

    // Decoder with truncated_rows enabled at runtime.
    let mut deserializer = csv_deserializer_with_truncated(10, &schema);

    // Row 1 is complete; row 2 is missing the final (string) column.
    deserializer.digest(Bytes::from("0,0.0,true,0-string\n1,1.0,true\n"));
    deserializer.finish();

    match deserializer.next()? {
        DeserializerOutput::RecordBatch(batch) => {
            // Both rows must have survived decoding.
            assert!(batch.num_rows() >= 2);

            // Column 4 (index 3) is the string column that row 2 omitted.
            let string_col = batch
                .column(3)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("column 4 should be StringArray");

            // The omitted cell becomes NULL; the complete row keeps its value.
            assert!(string_col.is_null(1));
            assert!(!string_col.is_null(0));
        }
        other => panic!("expected RecordBatch but got {other:?}"),
    }
    Ok(())
}

#[tokio::test]
async fn infer_schema_truncated_rows_false_error() -> Result<()> {
    let session_ctx = SessionContext::new();
    let state = session_ctx.state();

    // Header has 4 columns; the first data row only 3 (truncated at the end).
    let data = Bytes::from("id,a,b,c\n1,foo,bar\n2,foo,bar,baz\n");
    let store = Arc::new(VariableStream::new(data, 1));
    let meta = ObjectMeta {
        location: Path::parse("/")?,
        last_modified: DateTime::default(),
        size: u64::MAX,
        e_tag: None,
        version: None,
    };

    // Default CsvFormat leaves truncated_rows disabled (= false).
    let format = CsvFormat::default()
        .with_has_header(true)
        .with_schema_infer_max_rec(10);

    let res = format
        .infer_schema(&state, &(store.clone() as Arc<dyn ObjectStore>), &[meta])
        .await;

    // Schema inference must fail on the short row when the option is off.
    let err = res
        .expect_err("expected infer_schema to error on truncated rows when disabled");

    // Two indicative messages are known, depending on where parsing fails.
    let msg = err.to_string();
    assert!(
        msg.contains("Encountered unequal lengths")
            || msg.contains("incorrect number of fields"),
        "unexpected error message: {msg}",
    );

    Ok(())
}

/// End-to-end check: `ctx.read_csv` with `truncated_rows(true)` fills the
/// missing trailing columns of a short row with NULL instead of erroring.
#[tokio::test]
async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
    use std::io::Write;

    let ctx = SessionContext::new();

    // The CSV reader keys off the file extension, so give the temp file
    // a `.csv` suffix.
    let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?;
    // Header "a,b,c". First data row is truncated (only "1,2"),
    // second row is complete.
    write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
    let path = tmp.path().to_str().unwrap().to_string();

    // Build CsvReadOptions with truncated-row handling enabled.
    let options = CsvReadOptions::default().truncated_rows(true);

    // Call the API under test.
    let df = ctx.read_csv(&path, options).await?;

    // Collect the results and combine batches so we can inspect columns.
    let batches = df.collect().await?;
    let combined = concat_batches(&batches[0].schema(), &batches)?;

    // Both data rows must have been read.
    assert_eq!(combined.num_rows(), 2);

    // Column 'c' (index 2): first row was truncated -> NULL; second row
    // carried a value and must be present.
    let col_c = combined.column(2);
    assert!(
        col_c.is_null(0),
        "expected first row column 'c' to be NULL due to truncated row"
    );
    assert!(
        !col_c.is_null(1),
        "expected second row column 'c' to be present"
    );

    Ok(())
}
}
18 changes: 17 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
/// Whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths.
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub truncated_rows: bool,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
file_sort_order: vec![],
comment: None,
null_regex: None,
truncated_rows: false,
}
}

Expand Down Expand Up @@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
self.null_regex = null_regex;
self
}

/// Configure whether to allow truncated rows when parsing.
///
/// By default this is set to `false` and parsing will error if the CSV rows
/// have different lengths. When set to `true`, records with fewer than the
/// expected number of columns are accepted and the missing columns are
/// filled with nulls. If the record's schema is not nullable, an error is
/// still returned.
pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
    self.truncated_rows = truncated_rows;
    self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -558,7 +573,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());
.with_null_regex(self.null_regex.clone())
.with_truncated_rows(self.truncated_rows);

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
Expand Down
19 changes: 17 additions & 2 deletions datafusion/datasource-csv/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl CsvFormat {
self
}

/// Set whether to allow truncated rows when parsing.
/// - defaults to `false`: rows shorter than the schema produce an error.
/// - when `true`: missing trailing columns are filled with nulls (fields
///   must be nullable for this to succeed).
pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
    self.options.truncated_rows = Some(truncated_rows);
    self
}

/// Set the regex to use for null values in the CSV reader.
/// - default to treat empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
Expand Down Expand Up @@ -291,6 +296,13 @@ impl CsvFormat {
self
}

/// Set whether to allow rows with fewer columns than the schema
/// (missing trailing columns are filled with nulls).
/// - defaults to `false`.
///
/// NOTE(review): this duplicates `with_truncated_rows` on this type; it is
/// kept for backward compatibility, but new code should call
/// `with_truncated_rows` instead.
#[deprecated(note = "use `with_truncated_rows` instead")]
pub fn with_truncate_rows(mut self, truncate_rows: bool) -> Self {
    self.options.truncated_rows = Some(truncate_rows);
    self
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.options.delimiter
Expand Down Expand Up @@ -426,11 +438,13 @@ impl FileFormat for CsvFormat {
.with_file_compression_type(self.options.compression.into())
.with_newlines_in_values(newlines_in_values);

let truncated_rows = self.options.truncated_rows.unwrap_or(false);
let source = Arc::new(
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
.with_escape(self.options.escape)
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
.with_comment(self.options.comment)
.with_truncate_rows(truncated_rows),
);

let config = conf_builder.with_source(source).build();
Expand Down Expand Up @@ -509,7 +523,8 @@ impl CsvFormat {
.unwrap_or_else(|| state.config_options().catalog.has_header),
)
.with_delimiter(self.options.delimiter)
.with_quote(self.options.quote);
.with_quote(self.options.quote)
.with_truncated_rows(self.options.truncated_rows.unwrap_or(false));

if let Some(null_regex) = &self.options.null_regex {
let regex = Regex::new(null_regex.as_str())
Expand Down
17 changes: 16 additions & 1 deletion datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub struct CsvSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
truncate_rows: bool,
}

impl CsvSource {
Expand All @@ -111,6 +112,11 @@ impl CsvSource {
/// Whether the first line of each file is treated as a header row.
pub fn has_header(&self) -> bool {
    self.has_header
}

/// Returns `true` when rows shorter than the schema are permitted
/// (missing trailing columns are read as nulls).
pub fn truncate_rows(&self) -> bool {
    self.truncate_rows
}
/// A column delimiter
pub fn delimiter(&self) -> u8 {
self.delimiter
Expand Down Expand Up @@ -156,6 +162,13 @@ impl CsvSource {
conf.comment = comment;
conf
}

/// Whether to support truncate rows when read csv file
pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
let mut conf = self.clone();
conf.truncate_rows = truncate_rows;
conf
}
}

impl CsvSource {
Expand All @@ -175,7 +188,8 @@ impl CsvSource {
.expect("Batch size must be set before initializing builder"),
)
.with_header(self.has_header)
.with_quote(self.quote);
.with_quote(self.quote)
.with_truncated_rows(self.truncate_rows);
if let Some(terminator) = self.terminator {
builder = builder.with_terminator(terminator);
}
Expand Down Expand Up @@ -340,6 +354,7 @@ impl FileOpener for CsvOpener {

let config = CsvSource {
has_header: csv_has_header,
truncate_rows: self.config.truncate_rows,
..(*self.config).clone()
};

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ message CsvOptions {
bytes double_quote = 15; // Indicates if quotes are doubled
bytes newlines_in_values = 16; // Indicates if newlines are supported in values
bytes terminator = 17; // Optional terminator character as a byte
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
}

// Options controlling CSV format
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
null_regex: (!proto_opts.null_regex.is_empty())
.then(|| proto_opts.null_regex.clone()),
comment: proto_opts.comment.first().copied(),
truncated_rows: proto_opts.truncated_rows.first().map(|h| *h != 0),
})
}
}
Expand Down
Loading
Loading