Commit 0724036

g-talbot and claude authored
feat: page-bounded Arrow decoder per data page (PR-6a.2) (#6407)
* feat: page-stream → RecordBatch decoder (PR-6a)

Bridges PR-4's ColumnPageStream (raw compressed pages in storage order) to arrow's standard ParquetRecordBatchReaderBuilder (decoded arrays). PR-6's streaming merge engine drains each input row-group through this to keep per-RG memory bounded — only one input RG worth of bytes is materialised at a time, rather than the whole file.

Approach: reconstruct one row group's column-chunk byte layout in a buffer (column chunks placed at their original offsets, gaps zero-padded), wrap the buffer in `Bytes`, and feed it to `ParquetRecordBatchReaderBuilder::new_with_metadata` with `with_row_groups([rg_idx])`.

Byte-exact reconstruction by carrying each page's original Thrift-compact `header_bytes` through PR-4's streaming reader — no re-encoding, so encoder-version drift inside the compactor cannot silently corrupt outputs. Adds `header_bytes: Bytes` to `Page` and captures the drained header bytes inside `parse_page_header_streaming`.

New `StreamDecoder` borrows the stream and exposes `next_rg()` returning one `RecordBatch` per input row group, idempotent at EOF.

Tests (9, all passing): single-RG and multi-RG drains, multi-page columns, dict columns, null preservation, compression codec roundtrip (uncompressed/snappy/zstd — LZ4 not enabled in our parquet feature set), idempotent EOF, byte-exact reconstruction proof, and I/O failure surfacing as PageDecodeError::PageStream rather than masked as decode.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: nightly fmt fixup

CI nightly rustfmt (newer than my local at the time of the original push) wraps `write_parquet(...)` onto multiple lines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: page-bounded Arrow decoder per data page (PR-6a.2)

Replaces PR-6a's per-RG fat-buffer approach.
The previous implementation reconstructed a whole row group's column-chunk bytes into a single buffer and fed it to ParquetRecordBatchReaderBuilder — peak memory was RG-size (tens to hundreds of MB per call). This rewrite is page-bounded.

API change: `StreamDecoder::next_rg() -> Option<RecordBatch>` is replaced by `decode_next_page() -> Option<DecodedPage>`. Each call returns one input data page's worth of decoded rows as an `ArrayRef`, plus `(rg_idx, col_idx, page_idx_in_col, row_start)` indexing so PR-6b's merge engine can slice take indices per page. Dictionary pages are absorbed silently (fed to the column reader for subsequent data-page decoding); INDEX_PAGE is skipped.

Memory at any time:
- One in-flight page (compressed + decompressed bytes)
- One cached dictionary page per (rg, col) when dict-encoded
- One column reader per (rg, col) with small bookkeeping (level decoders, value decoder)

Does NOT buffer the row group, a column chunk, or a materialised RecordBatch.

Implementation: wraps parquet-rs's public `GenericColumnReader` over a per-(rg, col) PageQueue we feed one page at a time. Page → ColumnPage conversion handles decompression (via `compression::create_codec`, which required enabling parquet's `experimental` feature on our Cargo.toml — the API has been stable across recent parquet-rs versions, just not yet de-experimentalised), `format::Encoding` (Thrift wrapper) → `basic::Encoding` translation, and DataPageV2's unencrypted-levels-then-compressed-values layout.

Array builders cover the production schema: Boolean, Int8/16/32/64, UInt8/16/32/64, Float32/64, Utf8/LargeUtf8/Binary/LargeBinary, and `List<non-nullable primitive>` (DDSketch `keys` / `counts`). Dict columns decode to their value type (Utf8/Binary); the merge engine's union schema normalises strings to Utf8 anyway, and the output writer re-applies dict encoding based on observed cardinality.
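
The per-page contract described above (dictionary pages absorbed, index pages skipped, one decoded page per call with its indexing tuple, idempotent EOF) can be illustrated with a minimal std-only sketch. Everything here is hypothetical and simplified for illustration: `RawPage`, `Payload`, `ColState` and `SketchDecoder` are invented names, a `Vec<i64>` stands in for the Arrow `ArrayRef`, and no decompression or level decoding is modelled. It is not the code in `page_decoder.rs`.

```rust
use std::collections::HashMap;
use std::iter::Fuse;

/// Hypothetical payload of a data page: plain values or dictionary indices.
#[derive(Debug, Clone)]
enum Payload {
    Plain(Vec<i64>),
    DictIndices(Vec<usize>),
}

/// Hypothetical stand-in for a raw page pulled from the page stream.
#[derive(Debug, Clone)]
enum RawPage {
    Dictionary { rg: usize, col: usize, values: Vec<i64> },
    Data { rg: usize, col: usize, payload: Payload },
    Index,
}

/// One decoded data page plus the indexing tuple from the commit message.
#[derive(Debug, PartialEq)]
struct DecodedPage {
    rg_idx: usize,
    col_idx: usize,
    page_idx_in_col: usize,
    row_start: usize,
    values: Vec<i64>, // assumption: stands in for an Arrow `ArrayRef`
}

/// Per-(rg, col) bookkeeping: cached dictionary and running counters.
#[derive(Default)]
struct ColState {
    dict: Option<Vec<i64>>,
    next_page_idx: usize,
    next_row_start: usize,
}

struct SketchDecoder<I: Iterator<Item = RawPage>> {
    pages: Fuse<I>, // fused so repeated calls after EOF keep returning None
    states: HashMap<(usize, usize), ColState>,
}

impl<I: Iterator<Item = RawPage>> SketchDecoder<I> {
    fn new(pages: I) -> Self {
        Self { pages: pages.fuse(), states: HashMap::new() }
    }

    /// Returns the next decoded data page, or `None` once the stream is drained.
    fn decode_next_page(&mut self) -> Option<DecodedPage> {
        loop {
            match self.pages.next()? {
                // Index pages carry no rows: skip them.
                RawPage::Index => continue,
                // Dictionary pages are absorbed and cached for later data pages.
                RawPage::Dictionary { rg, col, values } => {
                    self.states.entry((rg, col)).or_default().dict = Some(values);
                }
                RawPage::Data { rg, col, payload } => {
                    let state = self.states.entry((rg, col)).or_default();
                    let values: Vec<i64> = match payload {
                        Payload::Plain(v) => v,
                        Payload::DictIndices(idx) => {
                            let dict = state
                                .dict
                                .as_ref()
                                .expect("dictionary page must precede dict-encoded data");
                            idx.iter().map(|&i| dict[i]).collect()
                        }
                    };
                    let page = DecodedPage {
                        rg_idx: rg,
                        col_idx: col,
                        page_idx_in_col: state.next_page_idx,
                        row_start: state.next_row_start,
                        values,
                    };
                    state.next_page_idx += 1;
                    state.next_row_start += page.values.len();
                    return Some(page);
                }
            }
        }
    }
}
```

In this sketch the only state that outlives a call is the per-(rg, col) dictionary and two counters, which mirrors the memory list above in spirit: nothing the size of a row group or column chunk is ever held.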
Tests (9, all passing):
- single-RG and multi-RG round-trip (per-column comparison vs. canonical arrow reader)
- per-page indexing (`row_start`, `page_idx_in_col` monotonic per-(rg, col))
- idempotent EOF
- nullable column (`service` with nulls every 5th row)
- compression codecs (uncompressed, snappy, zstd)
- I/O failures surface as `PageDecodeError::PageStream`
- `List<UInt64>` (DDSketch `counts`) with variable list lengths including empty list and `u64::MAX`
- structural page-bounded contract: PageQueue depth ≤ 2 (one queued dictionary plus the current data page) across a long stream

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* style: drop trailing blank line in page_decoder.rs

CI's `cargo +nightly fmt --check` flags a single trailing blank line at end of file. No functional change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): preserve List<Float64/32> and LargeList<T> shapes

Addresses two Codex review comments on PR-6407.

## 1. List<Float64> (and List<Float32>) flattened to flat array

`build_float64_array` and `build_float32_array` ignored `field.data_type()` and unconditionally constructed a flat `Float64Array` / `Float32Array`, even when the input column was declared as `List<Float64>` / `List<Float32>` (which the streaming writer accepts and writes). The decoded page's Arrow type / row shape didn't match the schema — downstream `RecordBatch` construction or merge writing would fail with a schema mismatch, or treat list elements as rows.

The int32/int64 builders already branched to `build_list_i32_array` / `build_list_i64_array` when the outer type was a list. The float builders now follow the same pattern via new `build_list_f32_array` and `build_list_f64_array` helpers. Call sites pass `reps` (was discarded as `_reps`).
Coverage symmetry vs `streaming_writer.rs`'s list path:
- Int8/16/32 + UInt8/16/32 (Int32-physical) ✓ — already covered
- Int64 + UInt64 (Int64-physical) ✓ — already covered
- Float32 (Float-physical) ✓ — added
- Float64 (Double-physical) ✓ — added
- Bool, Utf8, Binary — the writer rejects these as list inners ("only flat numeric primitive inners are supported"); the decoder matches.

## 2. LargeList<T> outer constructed wrong array width

`build_list_i32_array` and `build_list_i64_array` accepted both `DataType::List(_)` and `DataType::LargeList(_)` outer types but always constructed `arrow::array::ListArray` (i32 offsets). For a `LargeList<UInt64>` column emitted by the streaming writer, the decoder produced `ListArray<UInt64>` — type mismatch.

Factored the outer-list construction into a new `wrap_inner_in_list` helper that picks `ListArray` (i32 offsets) or `LargeListArray` (i64 offsets) based on `field.data_type()`. All four list builders (i32, i64, f32, f64) now route through it. `list_offsets_from_levels` now returns `Vec<i64>` so the same offset buffer can back either array width — the helper truncates to i32 in the `List` path.

## Round-trip caveat (documented in test)

In our pipeline `init_column_state` derives fields from `parquet_to_arrow_schema(_, None)`, and parquet's native schema doesn't distinguish list offset widths — so the derived field is always `List<>`, never `LargeList<>` (the ARROW:schema KV is deliberately bypassed to avoid Dictionary type mismatches; see the existing comment in `init_column_state`). Round-trip from a `LargeList<>`-typed parquet input through the decoder produces `List<>` — that's a pre-existing limitation, not introduced here. The `LargeList` branch of `wrap_inner_in_list` is therefore only reachable when callers construct fields directly.
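
To make the offset handling in section 2 concrete, here is a minimal std-only sketch of deriving `i64` list offsets from repetition and definition levels and then narrowing them for the `List` path. The function bodies are assumptions written for illustration: only the name `list_offsets_from_levels` and its `Vec<i64>` return type come from the message above, while `OuterKind`, `ListOffsets` and `wrap_offsets` are hypothetical. The sketch ignores list-level nulls and uses a checked `i32::try_from` rather than a raw truncation.

```rust
use std::num::TryFromIntError;

/// Sketch: derive list offsets from Parquet levels for a list of
/// non-nullable primitives. `rep == 0` starts a new record; an entry with
/// `def == max_def` contributes one element, anything lower contributes none
/// (for example an empty list).
fn list_offsets_from_levels(reps: &[i16], defs: &[i16], max_def: i16) -> Vec<i64> {
    assert_eq!(reps.len(), defs.len(), "one rep level per def level");
    let mut offsets = Vec::with_capacity(reps.len() + 1);
    let mut n_values: i64 = 0;
    for (&rep, &def) in reps.iter().zip(defs) {
        if rep == 0 {
            // A new record begins at the current element count.
            offsets.push(n_values);
        }
        if def == max_def {
            n_values += 1;
        }
    }
    // Closing offset for the last record (or the lone 0 for empty input).
    offsets.push(n_values);
    offsets
}

/// Hypothetical tag for the outer list flavour declared by the field.
enum OuterKind {
    List,
    LargeList,
}

/// Hypothetical result type: the offset buffer at the width the outer type needs.
#[derive(Debug, PartialEq)]
enum ListOffsets {
    List(Vec<i32>),
    LargeList(Vec<i64>),
}

/// Sketch of the dispatch: keep i64 offsets for `LargeList`, narrow to i32
/// for `List`. The checked conversion is this sketch's choice.
fn wrap_offsets(kind: OuterKind, offsets: Vec<i64>) -> Result<ListOffsets, TryFromIntError> {
    match kind {
        OuterKind::LargeList => Ok(ListOffsets::LargeList(offsets)),
        OuterKind::List => offsets
            .into_iter()
            .map(i32::try_from)
            .collect::<Result<Vec<i32>, _>>()
            .map(ListOffsets::List),
    }
}
```

For the three records `[1, 2]`, `[]`, `[3]` in a required list column (`max_def = 1`), the levels are `reps = [0, 1, 0, 0]` and `defs = [1, 1, 0, 1]`, and the sketch yields offsets `[0, 2, 2, 3]`.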
The new `test_wrap_inner_in_list_dispatches_on_outer_flavour` exercises that branch via direct calls; documenting the dispatch contract without requiring a parquet round-trip that the rest of the pipeline can't currently produce.

## Tests

- `test_list_float64_round_trip` — `List<Float64>` round-trips through the decoder as a `ListArray` with `Float64Array` inner, not a flat `Float64Array`.
- `test_wrap_inner_in_list_dispatches_on_outer_flavour` — direct builder call with `LargeList<UInt64>` field produces `LargeListArray` (i64 offsets); with `List<UInt64>` produces `ListArray` (i32 offsets).

11/11 page_decoder tests pass; 443/443 crate tests pass. Clippy clean, fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): pre-fetch one page so peek_next_page sees real metadata

Addresses Codex review comment on PR-6407: "Preserve next-page metadata for repeated pages".

## Problem

`PageQueueReader::peek_next_page` reads from the per-(rg, col) queue. Before this fix, `decode_next_page` queued exactly ONE page (the current data page) before calling `read_records`. parquet-rs's column reader uses `peek_next_page` (via `at_record_boundary`) to decide whether to flush partial repetition-level state when decoding V1 data pages that contain repetition (i.e. `List<>` columns).

With only the current page queued, peek returned `None`, which parquet-rs treats as "this is the last page" — it would flush partial rep-level state at every page boundary and could split a list record incorrectly when a single record's list spans multiple V1 pages.

Husky writes V1 pages by default (parquet-rs `DEFAULT_WRITER_VERSION = PARQUET_1_0`), and DDSketch-style `List<UInt64>` columns can exceed page-size limits when sketches are large or page sizes are small — making this a real correctness risk.

## Fix

`decode_next_page` now maintains a **one-page lookahead**:

1. After locating a state with a queued data page to decode, it pulls one more page from the underlying stream and routes it to its (rg, col) queue (which may be the same state's queue, in which case `peek_next_page` returns the real next-page metadata; or a different state's queue, in which case the current state's `peek_next_page` correctly returns `None` because the current column chunk is exhausted from this state's perspective).
2. THEN drives `read_records` on the current state. parquet-rs's column reader can now correctly check record boundaries.

The decode_next_page loop is restructured: it first looks for any state with a queued unconsumed data page (which covers both the "freshly pulled" case and the "lookahead from a previous call" case), and only pulls from the stream when no state has work to do. This naturally handles the case where a previous call's lookahead landed in a different state's queue.

Memory: one extra in-flight page (the lookahead) — still page-bounded; `test_page_bounded_queue_depth` continues to pass.

## Tests

- `test_list_record_spanning_pages_preserved` (new): writes a `List<UInt64>` column with `data_page_size_limit = 20` bytes, forcing a 50-element list record to span multiple V1 pages. Verifies all 50 values are preserved intact after decoding. Without the lookahead, this would fail (parquet-rs would split the record at page boundaries).
- `test_page_bounded_queue_depth` continues to pass (lookahead is bounded; queue depth stays small).
- All 11 prior page_decoder tests pass. 444/444 crate tests pass. Clippy clean, fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(page_decoder): only pre-fetch lookahead for repeated columns

Follow-up to the previous commit (9e74792) which added a one-page lookahead in `decode_next_page` to give parquet-rs's column reader accurate `peek_next_page` metadata for V1 record-spanning lists.
The unconditional pre-fetch broke callers that drop a `StreamDecoder` mid-traversal and create a fresh one over the same `ColumnPageStream` — the merge engine's phase 0 (sort col drain) followed by a phase 3 fresh body-col decoder is exactly this pattern. The pre-fetch advanced the stream past the current column chunk, and the new decoder didn't have the pre-fetched page in its queue, losing the page entirely.

Fix: only pre-fetch for columns where `max_rep_level > 0` (List<T> / LargeList<T>). Flat columns get one value per record, so V1 record continuation across pages doesn't apply — the lookahead's only benefit is the list-record-spanning correctness this PR was introducing. Sort cols (metric_name, timestamp_secs, sorted_series) are all flat → no lookahead → stream advances only over consumed pages → safe to drop the decoder and resume on a fresh one.

`test_list_record_spanning_pages_preserved` still passes (its column has `max_rep_level = 1`, so the lookahead activates). All 12 page_decoder tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor(page_decoder): make single-consumer invariant + parquet-rs lineage explicit

Addresses adamtobey's review on PR-6407.

- Document the single-consumer invariant on `StreamDecoder`: the `Arc<Mutex<_>>` on per-(rg, col) queues is for parquet-rs's `PageReader: Send` trait shape, not concurrency. All pushes and pops happen synchronously within a single `decode_next_page` call, so the lock/unlock/lock sequences cannot race.
- Collapse `find_state_with_queued_data_page` + `decode_state_head`'s num_values capture into `next_decodable_head`, returning `((rg, col), num_values)` from one lock pass. Removes the TOCTOU-looking lock-find-unlock-relock-refind shape.
- Add `build_primitive::<P>(...)` helper that mirrors parquet-rs's `PrimitiveArrayReader::consume_batch` + `coerce_i32`/`coerce_i64` coercion table. Cuts the int32/int64/float32/float64 builders from ~80 lines of repeated null-clone + cast + typed-constructor to one helper call per arm.
- Module comment cites `parquet::arrow::array_reader::PrimitiveArrayReader` and explains why we re-implement: that module is `#[doc(hidden)]` and gated by parquet-rs's `experimental` feature, which we don't enable.

No behaviour change. 444 lib tests pass; workspace clippy + nightly fmt + rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
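
The conditional lookahead from the two lookahead commits above can be sketched with plain standard-library types. This is an illustrative model only, with assumptions stated up front: `Page`, `LookaheadDecoder` and the single shared `queued` buffer are hypothetical (the message describes per-(rg, col) queues), a `VecDeque` stands in for the borrowed `ColumnPageStream`, and the returned boolean models whether a `peek_next_page`-style check would see a following page of the same column.

```rust
use std::collections::VecDeque;

/// Hypothetical page tag: which column it belongs to and a running id.
#[derive(Debug, Clone, PartialEq)]
struct Page {
    col: usize,
    id: u32,
}

/// Sketch of a decoder that borrows the stream and pre-fetches one page
/// only for repeated columns (`max_rep_level > 0`).
struct LookaheadDecoder<'a> {
    stream: &'a mut VecDeque<Page>, // stand-in for the borrowed page stream
    queued: VecDeque<Page>,         // pages pulled from the stream, not yet decoded
    max_rep_level: Vec<i16>,        // indexed by column
}

impl<'a> LookaheadDecoder<'a> {
    fn new(stream: &'a mut VecDeque<Page>, max_rep_level: Vec<i16>) -> Self {
        Self { stream, queued: VecDeque::new(), max_rep_level }
    }

    /// Returns the page to decode and whether a following page of the same
    /// column is visible, which is what a peek would report.
    fn decode_next_page(&mut self) -> Option<(Page, bool)> {
        // Prefer a page left over from an earlier lookahead; otherwise pull.
        let current = match self.queued.pop_front() {
            Some(page) => page,
            None => self.stream.pop_front()?,
        };
        // Only repeated columns need to know whether a record continues on
        // the next page, so only they advance the stream ahead of consumption.
        if self.max_rep_level[current.col] > 0 && self.queued.is_empty() {
            if let Some(next) = self.stream.pop_front() {
                self.queued.push_back(next);
            }
        }
        let sees_next = self.queued.front().map_or(false, |p| p.col == current.col);
        Some((current, sees_next))
    }
}
```

With a stream holding two pages of a flat column followed by two pages of a list column, a first decoder can consume the two flat pages and be dropped with exactly two pages left in the stream, so a fresh decoder resumes on the list column without losing a page. On the list column the same sketch pre-fetches, which is why its first page reports a visible successor.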
1 parent 4609bdb commit 0724036

4 files changed

Lines changed: 2156 additions & 12 deletions

quickwit/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -176,7 +176,7 @@ opentelemetry-appender-tracing = "0.31"
 opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
 opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic", "http-json"] }
 ouroboros = "0.18"
-parquet = { version = "58", default-features = false, features = ["arrow", "snap", "variant_experimental", "zstd"] }
+parquet = { version = "58", default-features = false, features = ["arrow", "experimental", "snap", "variant_experimental", "zstd"] }
 percent-encoding = "2.3"
 pin-project = "1.1"
 pnet = { version = "0.35", features = ["std"] }

quickwit/quickwit-parquet-engine/src/storage/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,7 @@
 mod config;
 pub mod inspect;
 pub(crate) mod legacy_adapter;
+pub(crate) mod page_decoder;
 pub(crate) mod split_writer;
 pub(crate) mod streaming_reader;
 pub(crate) mod streaming_writer;
@@ -28,6 +29,7 @@ pub use inspect::{
     verify_partition_prefix,
 };
 pub use legacy_adapter::{LegacyAdapterError, LegacyInputAdapter};
+pub use page_decoder::{PageDecodeError, StreamDecoder};
 pub use split_writer::ParquetSplitWriter;
 pub use streaming_reader::{
     ColumnPageStream, Page, ParquetReadError, RemoteByteSource, StreamingParquetReader,
