Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1312,4 +1312,44 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_read_csv_truncated_rows_via_tempfile() -> Result<()> {
    use std::io::Write;

    let ctx = SessionContext::new();

    // The reader dispatches on file extension, so the temp file must end in `.csv`.
    let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?;
    // Header "a,b,c"; first data row is truncated (only "1,2"), second row is complete.
    write!(tmp, "a,b,c\n1,2\n3,4,5\n")?;
    let path = tmp.path().to_str().unwrap().to_string();

    // Enable truncated-row handling: short rows are padded with NULLs
    // instead of producing an error.
    let options = CsvReadOptions::default().truncated_rows(true);

    // Call the API under test.
    let df = ctx.read_csv(&path, options).await?;

    // Combine all batches so rows can be inspected by absolute index.
    let batches = df.collect().await?;
    assert!(!batches.is_empty(), "expected at least one record batch");
    let combined = concat_batches(&batches[0].schema(), &batches)?;

    // Exactly two data rows were written, so exactly two must be read back.
    assert_eq!(combined.num_rows(), 2);

    // Column 'c' is the 3rd column (index 2). The first data row was
    // truncated, so its value must be NULL; the second row is complete.
    let col_c = combined.column(2);
    assert!(
        col_c.is_null(0),
        "expected first row column 'c' to be NULL due to truncated row"
    );
    assert!(
        !col_c.is_null(1),
        "expected second (complete) row column 'c' to be non-NULL"
    );

    Ok(())
}
}
18 changes: 17 additions & 1 deletion datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub struct CsvReadOptions<'a> {
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Optional regex to match null values
pub null_regex: Option<String>,
/// Whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths.
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub truncated_rows: bool,
}

impl Default for CsvReadOptions<'_> {
Expand All @@ -117,6 +122,7 @@ impl<'a> CsvReadOptions<'a> {
file_sort_order: vec![],
comment: None,
null_regex: None,
truncated_rows: false,
}
}

Expand Down Expand Up @@ -223,6 +229,15 @@ impl<'a> CsvReadOptions<'a> {
self.null_regex = null_regex;
self
}

/// Configure whether to allow truncated rows when parsing.
/// By default this is set to false and will error if the CSV rows have different lengths
/// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
/// If the record’s schema is not nullable, then it will still return an error.
pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
self.truncated_rows = truncated_rows;
self
}
}

/// Options that control the reading of Parquet files.
Expand Down Expand Up @@ -546,7 +561,8 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned())
.with_null_regex(self.null_regex.clone());
.with_null_regex(self.null_regex.clone())
.with_truncated_rows(self.truncated_rows);

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
Expand Down
16 changes: 15 additions & 1 deletion datafusion/datasource-csv/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ impl CsvFormat {
self
}

/// Set whether truncated rows are allowed when parsing CSV files.
/// - defaults to `false` (rows with missing columns produce an error)
///
/// When enabled, rows with fewer columns than the schema are padded with
/// nulls (an error is still raised if the missing columns are not nullable).
// NOTE(review): `with_truncate_rows` in this impl sets the same option —
// consider consolidating the two near-identical setters.
pub fn with_truncated_rows(mut self, truncated_rows: bool) -> Self {
    self.options.truncated_rows = Some(truncated_rows);
    self
}

/// Set the regex to use for null values in the CSV reader.
/// - default to treat empty values as null.
pub fn with_null_regex(mut self, null_regex: Option<String>) -> Self {
Expand Down Expand Up @@ -291,6 +296,13 @@ impl CsvFormat {
self
}

/// Set whether rows with fewer columns than the schema are accepted and
/// padded with nulls ("truncated" rows).
/// - defaults to false
///
/// This is a duplicate of [`CsvFormat::with_truncated_rows`]; it delegates
/// to that method so the two setters cannot drift apart.
pub fn with_truncate_rows(self, truncate_rows: bool) -> Self {
    self.with_truncated_rows(truncate_rows)
}

/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.options.delimiter
Expand Down Expand Up @@ -422,11 +434,13 @@ impl FileFormat for CsvFormat {
.with_file_compression_type(self.options.compression.into())
.with_newlines_in_values(newlines_in_values);

let truncated_rows = self.options.truncated_rows.unwrap_or(false);
let source = Arc::new(
CsvSource::new(has_header, self.options.delimiter, self.options.quote)
.with_escape(self.options.escape)
.with_terminator(self.options.terminator)
.with_comment(self.options.comment),
.with_comment(self.options.comment)
.with_truncate_rows(truncated_rows),
);

let config = conf_builder.with_source(source).build();
Expand Down
17 changes: 16 additions & 1 deletion datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ pub struct CsvSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
truncate_rows: bool,
}

impl CsvSource {
Expand All @@ -110,6 +111,11 @@ impl CsvSource {
pub fn has_header(&self) -> bool {
self.has_header
}

/// Returns `true` when rows shorter than the schema are tolerated and
/// padded with nulls ("truncated" rows) instead of producing an error.
pub fn truncate_rows(&self) -> bool {
    self.truncate_rows
}
/// A column delimiter
pub fn delimiter(&self) -> u8 {
self.delimiter
Expand Down Expand Up @@ -155,6 +161,13 @@ impl CsvSource {
conf.comment = comment;
conf
}

/// Whether to support truncate rows when read csv file
pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
let mut conf = self.clone();
conf.truncate_rows = truncate_rows;
conf
}
}

impl CsvSource {
Expand All @@ -174,7 +187,8 @@ impl CsvSource {
.expect("Batch size must be set before initializing builder"),
)
.with_header(self.has_header)
.with_quote(self.quote);
.with_quote(self.quote)
.with_truncated_rows(self.truncate_rows);
if let Some(terminator) = self.terminator {
builder = builder.with_terminator(terminator);
}
Expand Down Expand Up @@ -335,6 +349,7 @@ impl FileOpener for CsvOpener {

let config = CsvSource {
has_header: csv_has_header,
truncate_rows: self.config.truncate_rows,
..(*self.config).clone()
};

Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ message CsvScanExecNode {
string comment = 6;
}
bool newlines_in_values = 7;
bool truncate_rows = 8;
}

message JsonScanExecNode {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2286,6 +2286,7 @@ impl protobuf::PhysicalPlanNode {
None
},
newlines_in_values: maybe_csv.newlines_in_values(),
truncate_rows: csv_config.truncate_rows(),
},
)),
}));
Expand Down