Commit d2e2cda

Authored by jonasdedden, etseidl, and alamb
Fix skip_records over-counting when partial record precedes num_rows page skip (#9374)
# Which issue does this PR close?

- Closes #9370.

# Rationale for this change

The bug occurs when using RowSelection with nested types (like List<String>) when:

1. A column has multiple pages in a row group
2. The selected rows span across page boundaries
3. The first page is entirely consumed during skip operations

The issue was in `arrow-rs/parquet/src/column/reader.rs:287-382` (`skip_records` function).

**Root cause:** When `skip_records` completed successfully after crossing page boundaries, the `has_partial` state in the `RepetitionLevelDecoder` could incorrectly remain true. This happened when:

- The skip operation exhausted a page where has_record_delimiter was false
- The skip found the remaining records on the next page by counting a delimiter at index 0
- When a subsequent read_records(1) was called, the stale has_partial=true state caused count_records to incorrectly interpret the first repetition level (0) at index 0 as ending a "phantom" partial record, returning (1 record, 0 levels, 0 values) instead of properly reading the actual record data.

For a more descriptive explanation, look here: #9370 (comment)

# What changes are included in this PR?

Added code at the end of skip_records to reset the partial record state when all requested records have been successfully skipped. This ensures that after skip_records completes, we're at a clean record boundary with no lingering partial record state, fixing the array length mismatch in StructArrayReader.

# Are these changes tested?

Commit 365bd9a introduces a test showcasing this issue with v2 data pages only on a unit-test level. PR #9399 could be used to showcase the issue in an end-to-end way.

Previously wrong assumption that thought it had to do with mixing v1 and v2 data pages:

```
In b52e043 I added a test that I validated to fail whenever I remove my fix.

Bug Mechanism

The bug requires three ingredients:

1. Page 1 (DataPage v1): Contains a nested column (with rep levels). During skip_records, all levels on this page are consumed. count_records sees no following rep=0 delimiter, so it sets has_partial=true. Since has_record_delimiter is false (the default InMemoryPageReader returns false when more pages exist), flush_partial is not called.
2. Page 2 (DataPage v2): Has num_rows available in its metadata. When num_rows <= remaining_records, the entire page is skipped via skip_next_page(); this does not touch the rep level decoder at all, so has_partial remains stale true from page 1.
3. Page 3 (DataPage v1): When read_records loads this page, the stale has_partial=true causes the rep=0 at position 0 to be misinterpreted as completing a "phantom" partial record. This produces (1 record, 0 levels, 0 values) instead of reading the actual record data.

Test Verification

- With fix (flush_partial at end of skip_records): read_records(1) correctly returns (1, 2, 2) with values [70, 80]
- Without fix: read_records(1) returns (1, 0, 0), a phantom record with no data, which is what causes the "Not all children array length are the same!" error when different sibling columns in a struct produce different record counts
```

---------

Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
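The phantom-record mechanism described above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyRepDecoder` is a hypothetical type written for this note, not the arrow-rs `RepetitionLevelDecoder`, and it keeps just the `has_partial` flag and the "a rep level of 0 ends the previous record" rule from the description.

```rust
/// Toy model of record counting over repetition levels.
/// Hypothetical type for illustration; not the arrow-rs decoder.
#[derive(Debug, Default)]
struct ToyRepDecoder {
    /// True when levels of a record have been consumed but the end of
    /// that record (the next rep level 0) has not been seen yet.
    has_partial: bool,
}

impl ToyRepDecoder {
    /// Scans `levels` and returns `(records_completed, levels_consumed)`,
    /// stopping once `max_records` records are complete.
    /// Assumes `max_records >= 1`.
    fn count_records(&mut self, levels: &[i16], max_records: usize) -> (usize, usize) {
        let mut records = 0;
        for (idx, &level) in levels.iter().enumerate() {
            // A 0 ends the previous record, unless it is the very first
            // level scanned and no record is currently open.
            if level == 0 && (idx != 0 || self.has_partial) {
                records += 1;
                if records == max_records {
                    // Stopped on a delimiter: no record is open.
                    self.has_partial = false;
                    return (records, idx);
                }
            }
        }
        // Ran out of levels: the consumed tail belongs to an open record.
        self.has_partial |= !levels.is_empty();
        (records, levels.len())
    }

    /// Closes the open record, if any, and reports whether one was closed.
    fn flush_partial(&mut self) -> bool {
        std::mem::take(&mut self.has_partial)
    }
}
```

In this model, consuming the levels `[0, 1]` leaves `has_partial` set. If the next page with levels `[0, 1]` is then counted without a flush, the leading 0 completes a record after zero levels, which mirrors the `(1 record, 0 levels, 0 values)` symptom. Calling `flush_partial` first makes the same call consume both levels instead.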
1 parent 0b04483 commit d2e2cda

4 files changed

Lines changed: 145 additions & 4 deletions

parquet/src/column/page.rs

Lines changed: 8 additions & 1 deletion
@@ -406,7 +406,14 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// [(#4327)]: https://github.com/apache/arrow-rs/pull/4327
     /// [(#4943)]: https://github.com/apache/arrow-rs/pull/4943
     fn at_record_boundary(&mut self) -> Result<bool> {
-        Ok(self.peek_next_page()?.is_none())
+        match self.peek_next_page()? {
+            // Last page in the column chunk - always a record boundary
+            None => Ok(true),
+            // A V2 data page is required by the parquet spec to start at a
+            // record boundary, so the current page ends at one. V2 pages
+            // are identified by having `num_rows` set in their header.
+            Some(metadata) => Ok(metadata.num_rows.is_some()),
+        }
     }
 }
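The boundary rule in this hunk can be read in isolation as a small pure function. The following is a minimal sketch, assuming a hypothetical `PeekedPage` struct that stands in for whatever `peek_next_page` returns. Only the `num_rows` field is modeled, and the `usize` type is an assumption.

```rust
/// Hypothetical, simplified stand-in for the metadata of a peeked page.
struct PeekedPage {
    /// Row count from the page header; per the comment in the diff,
    /// only V2 data pages have this set.
    num_rows: Option<usize>,
}

/// Returns true when the current page is known to end on a record boundary.
fn at_record_boundary(next_page: Option<&PeekedPage>) -> bool {
    match next_page {
        // No further page: the column chunk ends here, so the record does too.
        None => true,
        // A following page that reports `num_rows` is treated as V2, which
        // must start at a record boundary, so the current page ends at one.
        // Without `num_rows` nothing can be concluded, so answer false.
        Some(page) => page.num_rows.is_some(),
    }
}
```

The sketch answers false when `num_rows` is absent, which means "unknown" rather than "definitely mid-record". The caller then waits for the next rep level of 0 before closing the record.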

parquet/src/column/reader.rs

Lines changed: 131 additions & 0 deletions
@@ -1361,4 +1361,135 @@ mod tests {
             );
         }
     }
+
+    /// Regression test for <https://github.com/apache/arrow-rs/issues/9370>
+    ///
+    /// Reproduces the production scenario: all DataPage v2 pages for a
+    /// list column (rep_level=1) read without an offset index (i.e.
+    /// `at_record_boundary` returns false for non-last pages).
+    ///
+    /// When a prior operation (here `skip_records(1)`) loads a v2 page,
+    /// and a subsequent `skip_records` exhausts the remaining levels on
+    /// that page, the rep level decoder is left with `has_partial=true`.
+    /// Because `has_record_delimiter` is false, the partial is not
+    /// flushed during level-based processing. When the next v2 page is
+    /// then peeked with `num_rows` available, the whole-page-skip
+    /// shortcut must flush the pending partial first. Otherwise:
+    ///
+    /// 1. The skip over-counts (skips N+1 records instead of N), and
+    /// 2. The stale `has_partial` causes a subsequent `read_records` to
+    /// produce a "phantom" record with 0 values.
+    #[test]
+    fn test_skip_records_v2_page_skip_accounts_for_partial() {
+        use crate::encodings::levels::LevelEncoder;
+
+        let max_rep_level: i16 = 1;
+        let max_def_level: i16 = 1;
+
+        // Column descriptor for a list element column (rep=1, def=1)
+        let primitive_type = SchemaType::primitive_type_builder("element", PhysicalType::INT32)
+            .with_repetition(Repetition::REQUIRED)
+            .build()
+            .unwrap();
+        let desc = Arc::new(ColumnDescriptor::new(
+            Arc::new(primitive_type),
+            max_def_level,
+            max_rep_level,
+            ColumnPath::new(vec!["list".to_string(), "element".to_string()]),
+        ));
+
+        // Helper: build a DataPage v2 for this list column.
+        let make_v2_page =
+            |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
+                let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
+                rep_enc.put(rep_levels);
+                let rep_bytes = rep_enc.consume();
+
+                let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
+                def_enc.put(def_levels);
+                let def_bytes = def_enc.consume();
+
+                let val_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
+
+                let mut buf = Vec::new();
+                buf.extend_from_slice(&rep_bytes);
+                buf.extend_from_slice(&def_bytes);
+                buf.extend_from_slice(&val_bytes);
+
+                Page::DataPageV2 {
+                    buf: Bytes::from(buf),
+                    num_values: rep_levels.len() as u32,
+                    encoding: Encoding::PLAIN,
+                    num_nulls: 0,
+                    num_rows,
+                    def_levels_byte_len: def_bytes.len() as u32,
+                    rep_levels_byte_len: rep_bytes.len() as u32,
+                    is_compressed: false,
+                    statistics: None,
+                }
+            };
+
+        // All pages are DataPage v2 (matching the production scenario where
+        // parquet-rs writes only v2 data pages and no offset index is loaded,
+        // so at_record_boundary() returns false for non-last pages).
+
+        // Page 1 (v2): 2 records × 2 elements = [10,20], [30,40]
+        let page1 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[10, 20, 30, 40], 2);
+
+        // Page 2 (v2): 2 records × 2 elements = [50,60], [70,80]
+        let page2 = make_v2_page(&[0, 1, 0, 1], &[1, 1, 1, 1], &[50, 60, 70, 80], 2);
+
+        // Page 3 (v2): 1 record × 2 elements = [90,100]
+        let page3 = make_v2_page(&[0, 1], &[1, 1], &[90, 100], 1);
+
+        // 5 records total: [10,20], [30,40], [50,60], [70,80], [90,100]
+        let pages = VecDeque::from(vec![page1, page2, page3]);
+        let page_reader = InMemoryPageReader::new(pages);
+        let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader));
+        let mut typed_reader = get_typed_column_reader::<Int32Type>(column_reader);
+
+        // Step 1 — skip 1 record:
+        // Peek page 1: num_rows=2, remaining=1 → rows(2) > remaining(1),
+        // so the page is LOADED (not whole-page-skipped).
+        // Level-based skip consumes rep levels [0,1] for record [10,20],
+        // stopping at the 0 that starts record [30,40].
+        let skipped = typed_reader.skip_records(1).unwrap();
+        assert_eq!(skipped, 1);
+
+        // Step 2 — skip 2 more records ([30,40] and [50,60]):
+        // Mid-page in page 1 with 2 remaining levels [0,1] for [30,40].
+        // skip_rep_levels(2, 2): the leading 0 does NOT act as a record
+        // delimiter (has_partial=false, idx==0), so count_records returns
+        // (true, 0, 2) — all levels consumed, has_partial=true, 0 records.
+        //
+        // has_record_delimiter is false → no flush at page boundary.
+        // Page 1 exhausted → peek page 2 (v2, num_rows=2).
+        //
+        // With fix: flush_partial → remaining 2→1, page 2 NOT skipped
+        // (rows=2 > remaining=1). Load page 2, skip 1 record [50,60].
+        //
+        // Without fix: rows(2) <= remaining(2) → page 2 whole-page-skipped,
+        // over-counting by 1. has_partial stays true (stale from page 1).
+        let skipped = typed_reader.skip_records(2).unwrap();
+        assert_eq!(skipped, 2);
+
+        // Step 3 — read 1 record:
+        let mut values = Vec::new();
+        let mut def_levels = Vec::new();
+        let mut rep_levels = Vec::new();
+
+        let (records, values_read, levels_read) = typed_reader
+            .read_records(1, Some(&mut def_levels), Some(&mut rep_levels), &mut values)
+            .unwrap();
+
+        // Without the fix: (1, 0, 0) — phantom record from stale has_partial;
+        // the rep=0 on page 3 "completes" the phantom, yielding 0 values.
+        // With the fix: (1, 2, 2) — correctly reads record [70, 80].
+        assert_eq!(records, 1, "should read exactly 1 record");
+        assert_eq!(levels_read, 2, "should read 2 levels for the record");
+        assert_eq!(values_read, 2, "should read 2 non-null values");
+        assert_eq!(values, vec![70, 80], "should contain 4th record's values");
+        assert_eq!(rep_levels, vec![0, 1], "rep levels for a 2-element list");
+        assert_eq!(def_levels, vec![1, 1], "def levels (all non-null)");
+    }
 }
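The accounting in the Step 2 comments can be reduced to a small decision function. This is a hedged sketch of that arithmetic only: `plan_next_page` and `Next` are hypothetical names invented for illustration, and the real `skip_records` loop in arrow-rs is structured differently.

```rust
/// What to do with the next page during a skip (hypothetical enum).
#[derive(Debug, PartialEq)]
enum Next {
    /// Nothing left to skip.
    Done,
    /// The page holds no more rows than are left to skip: drop it unread.
    SkipWholePage,
    /// The page holds more rows than are left to skip: decode its levels.
    LoadPage,
}

/// Decides how to treat the next V2 page, given the records still to skip,
/// whether a record is still open from the previous page, and the row
/// count of the next page. Returns the updated remaining count and action.
fn plan_next_page(remaining: usize, has_partial: bool, page_rows: usize) -> (usize, Next) {
    // A V2 page starts a new record, so an open record ends at the page
    // boundary and counts as skipped before the page is considered.
    let remaining = if has_partial {
        remaining.saturating_sub(1)
    } else {
        remaining
    };
    let next = if remaining == 0 {
        Next::Done
    } else if page_rows <= remaining {
        Next::SkipWholePage
    } else {
        Next::LoadPage
    };
    (remaining, next)
}
```

With the values from the test (`remaining = 2`, `page_rows = 2`), flushing the open record first gives `(1, LoadPage)`. Ignoring it gives `(2, SkipWholePage)`, which is the over-skip by one record that the test guards against.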

parquet/src/file/serialized_reader.rs

Lines changed: 6 additions & 1 deletion
@@ -1158,7 +1158,12 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
     fn at_record_boundary(&mut self) -> Result<bool> {
         match &mut self.state {
-            SerializedPageReaderState::Values { .. } => Ok(self.peek_next_page()?.is_none()),
+            SerializedPageReaderState::Values { .. } => match self.peek_next_page()? {
+                None => Ok(true),
+                // V2 data pages must start at record boundaries per the parquet
+                // spec, so the current page ends at one.
+                Some(metadata) => Ok(metadata.num_rows.is_some()),
+            },
             SerializedPageReaderState::Pages { .. } => Ok(true),
         }
     }

parquet/tests/arrow_reader/row_filter/sync.rs

Lines changed: 0 additions & 2 deletions
@@ -206,7 +206,6 @@ fn test_row_filter_full_page_skip_is_handled() {
 /// Without the fix, the list column over-skips by one record, causing
 /// struct children to disagree on record counts.
 #[test]
-#[should_panic(expected = "StructArrayReader out of sync in read_records, expected 1 read, got 0")]
 fn test_row_selection_list_column_v2_page_boundary_skip() {
     use arrow_array::builder::{Int32Builder, ListBuilder};
 
@@ -327,7 +326,6 @@ fn test_row_selection_list_column_v2_page_boundary_skip() {
 /// bug causes one leaf to over-skip by one record while the other stays
 /// correct.
 #[test]
-#[should_panic(expected = "Not all children array length are the same!")]
 fn test_list_struct_page_boundary_desync_produces_length_mismatch() {
     use arrow_array::Array;
     use arrow_array::builder::{Int32Builder, ListBuilder, StringBuilder, StructBuilder};
