Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions arrow-avro/src/reader/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,38 @@ mod tests {
assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
}

#[tokio::test]
async fn test_alltypes_with_empty_schema_large_batch() {
    // An empty reader schema should still surface the row count even though
    // no columns are decoded.
    let path = arrow_test_data("avro/alltypes_plain.avro");
    let empty_schema = Arc::new(Schema::new(Vec::<Field>::new()));
    let batches = read_async_file(&path, 1024, None, Some(empty_schema), None)
        .await
        .unwrap();
    // All 8 rows fit into a single 1024-row batch.
    assert_eq!(batches.len(), 1);
    let first = &batches[0];
    assert_eq!(first.num_rows(), 8);
    assert_eq!(first.num_columns(), 0);
}

#[tokio::test]
async fn test_alltypes_with_empty_schema_small_batch() {
    // An empty reader schema should still surface row counts even though no
    // columns are decoded; a batch size of 5 splits the 8 rows into 5 + 3.
    let path = arrow_test_data("avro/alltypes_plain.avro");
    let empty_schema = Arc::new(Schema::new(Vec::<Field>::new()));
    let batches = read_async_file(&path, 5, None, Some(empty_schema), None)
        .await
        .unwrap();

    assert_eq!(batches.len(), 2);

    let expected_rows = [5, 3];
    for (batch, rows) in batches.iter().zip(expected_rows) {
        assert_eq!(batch.num_rows(), rows);
        assert_eq!(batch.num_columns(), 0);
    }
}

#[tokio::test]
async fn test_nested_no_schema_no_projection() {
// No reader schema, no projection
Expand Down Expand Up @@ -1597,6 +1629,31 @@ mod tests {
assert_eq!(batch.schema().field(2).name(), "f1");
}

#[tokio::test]
async fn test_nested_with_empty_schema() {
    // An empty reader schema (backed by an explicit empty Avro record in the
    // schema metadata) should count rows but produce no columns.
    let file = arrow_test_data("avro/nested_records.avro");
    let schema = Arc::new(
        Schema::new(Vec::<Field>::new()).with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
"type": "record",
"namespace": "ns1",
"name": "record1",
"fields": []
}"#
            .to_owned(),
        )])),
    );
    let batches = read_async_file(&file, 1024, None, Some(schema), None)
        .await
        .unwrap();
    // Both rows fit in one 1024-row batch. Assert the batch count before
    // indexing (as the sibling alltypes test does) so a regression fails with
    // a clear assertion message instead of an index panic.
    assert_eq!(batches.len(), 1);
    let batch = &batches[0];

    assert_eq!(batch.num_rows(), 2);
    assert_eq!(batch.num_columns(), 0);
}

#[tokio::test]
async fn test_projection_error_out_of_bounds() {
let file = arrow_test_data("avro/alltypes_plain.avro");
Expand Down
8 changes: 7 additions & 1 deletion arrow-avro/src/reader/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ pub(crate) struct RecordDecoder {
schema: SchemaRef,
fields: Vec<Decoder>,
projector: Option<Projector>,
row_count: usize,
}

impl RecordDecoder {
Expand Down Expand Up @@ -136,6 +137,7 @@ impl RecordDecoder {
schema: Arc::new(ArrowSchema::new(arrow_fields)),
fields: encodings,
projector,
row_count: 0,
})
}
other => Err(AvroError::ParseError(format!(
Expand Down Expand Up @@ -166,6 +168,7 @@ impl RecordDecoder {
}
}
}
self.row_count += count;
Ok(cursor.position())
}

Expand All @@ -176,7 +179,10 @@ impl RecordDecoder {
.iter_mut()
.map(|x| x.flush(None))
.collect::<Result<Vec<_>, _>>()?;
RecordBatch::try_new(self.schema.clone(), arrays).map_err(Into::into)
let batch_options = RecordBatchOptions::new().with_row_count(Some(self.row_count));
self.row_count = 0;
RecordBatch::try_new_with_options(self.schema.clone(), arrays, &batch_options)
.map_err(Into::into)
}
}

Expand Down
Loading