Skip to content

Commit 1436be7

Browse files
g-talbotclaude
andauthored
feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) (#6409)
* feat: streaming column-major merge engine with page-bounded body cols (PR-6b.2) Rebuilds PR-6b on top of PR-6a.2's per-page Arrow decoder. The streaming merge engine now keeps body-col memory bounded by output page size (not column-chunk size) while preserving caller-specified M:N output splitting at sorted_series boundaries. Architecture (Husky multi-input → multi-output sorted merge): Phase 0 (async) — drain sort cols from each input. With Husky column ordering, sort cols + sorted_series are the prefix of each row group's body bytes, so the decoder can stop after they are fully decoded; the remaining body col pages stay un-read in the input stream, ready for phase 3. Phase 1 — compute_merge_order over the per-input sort-col RecordBatches using the existing k-way (sorted_series, timestamp_secs) heap. Phase 2 — compute_output_boundaries with the caller's num_outputs, splitting at sorted_series transitions. Phase 3 (blocking + block_on bridges) — streaming write. All M output writers are alive for the duration. For each column in Husky order, every output's col K is written in turn: - Sort col / sorted_series: applied via arrow::interleave from the already-buffered phase-0 data. - Body col: each output page is assembled via arrow::interleave from input page slices, with decoders advanced page-by-page via handle.block_on from inside the sync iterator passed to write_next_column_arrays. Pages flush to the writer's sink as SerializedColumnWriter's page-size threshold trips — memory stays bounded by the in-flight output page plus a small number of in-flight input pages. After all M outputs' col K is done, every input decoder is at the start of col K+1 in its single row group. Move to col K+1. PR-6b.2 only handles single-row-group inputs (real or PR-5- adapter-presented). Multi-RG metric-aligned inputs are rejected with a clear error message; supporting them requires either consuming + discarding body cols of RG[i-1] from the stream to reach RG[i]'s sort cols, or a second body GET — both are larger scope changes that land in a follow-up. Page-bounded contract verified by test_body_col_streams_many_pages_per_column_chunk: with data_page_row_count_limit=1000 on an 8000-row merge, the output value column spans ≥ 2 pages, demonstrating that body col writes respect data_page_size and do not materialise whole column chunks. Tests (9, all passing): two-input merge, single-RG output for single-metric_name input, total-rows-preserved across M:N, sort-schema mismatch rejection, KV metadata propagation, all-empty-inputs no-output, output drainable by StreamDecoder, multi-RG input rejection, page-bounded body col streaming. Also exposes existing helpers in merge/writer.rs as pub(super) (apply_merge_permutation, build_merge_kv_metadata, build_sorting_columns, resolve_sort_field_names, verify_sort_order) so streaming.rs can reuse the same MC-3 / KV / sorting-columns construction the non-streaming engine uses. PR-7 will fold the non-streaming engine away. PR-6c.2 will add file-size monitoring on top: close the current output at the next sorted_series transition when an in-progress file approaches the size cap, producing additional splits beyond the caller's N. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: persist decoder + page cache across body-col passes Address two Codex review findings on PR-6b.2 (#6409): * P1 — Preserve decoder/page cache across output chunks. The merge engine was constructing a fresh `StreamDecoder` for every `advance_decoder_to_row` call, which reset the per-column `rows_decoded` counter so the second decoded page reported `row_start = 0` after the stream had already advanced. The page cache also lived on the per-output assembler, so pages whose row range straddled two outputs were dropped when the first output finished even though the stream couldn't be rewound. Both scenarios produced silently wrong rows or out-of-bounds panics on any input large enough to require multi-page advances per output or multi-output coverage of a single page. The decoder now lives on `InputDecoderState` (owned via the new `StreamDecoder::from_owned` constructor), and the per-input body- col page cache + cursor are reset only at the start of each body column. * P2 — Stream body pages instead of collecting `Vec<ArrayRef>`. The per-output body-col write now feeds `write_next_column_arrays` one page at a time via `StreamingBodyColIter`, which captures assembly errors in a side cell so memory stays bounded by output- page size rather than column-chunk size. Two regression tests cover the bug shapes — multi-page body col within one output (2500 rows × 50-row pages) and multi-output input where pages span output boundaries. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: guard body-col path against zero-row-group inputs Address Codex P1 (third comment) on PR-6b.2 (#6409): phase 0 explicitly accepts inputs with `num_row_groups() == 0` (returning a zero-row sort batch), but `write_body_col_for_all_outputs` unconditionally called `state.metadata.row_group(0)` for every input, panicking with "index out of bounds" before the first body column was written. Treat zero-RG inputs the same as inputs whose schema lacks the current column: push `None` into `input_col_indices` and skip them for this body col. Also drop the unused `input_target_rows` vec that was being built only for its row-group lookup side effect. Regression test `test_zero_row_input_mixed_with_non_empty` builds a 0-row + 50-row pair and merges them through the streaming engine; without this fix the merge blocking task panics inside parquet-rs's `row_group()` indexing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: drop all-null sort fields from per-output streaming schema Address Codex P2 (fourth comment) on PR-6b.2 (#6409): the schema derivation condition `sort_optimised.has(name) || full_union.has(name)` was tautologically true for every iterated field — every `field` came from `full_union_schema`, so the second disjunct was always satisfied and the intended "drop all-null sort fields" branch never fired. Pass the sort union schema in explicitly so we can tell sort fields apart from body fields. Sort field present in `sort_union_schema` → keep only if `optimize_output_batch` kept it (not all-null for this output's rows). Body field → keep unconditionally; tracking per-output body-col presence would require pre-reading every body column for every output, which is the column-chunk-bounded buffering the streaming path exists to avoid. Regression test `test_derive_output_schema_drops_all_null_sort_field` calls the helper directly with a synthetic union + sort-optimised pair and asserts an all-null sort field is dropped while a body field with the same union-schema position is preserved. Verified the test fails against the pre-fix logic with the expected `['metric_name', 'env', 'timestamp_secs', 'value']` vs `['metric_name', 'timestamp_secs', 'value']` mismatch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * chore: code-quality fixes + MC-2 type round-trip test on streaming merge Bundle three pieces: - **Husky → neutral phrasing.** Replaced the seven "Husky" mentions in the streaming engine's doc-comments and error messages with neutral "sort-cols-first storage ordering" / "column ordering" phrasing. Project is Quickwit; the internal column-ordering scheme didn't need a separate brand in user-visible error strings. - **One `.unwrap()` → `.expect()` in lib code.** The hashmap lookup in `drain_sort_cols_one_input` is guarded by a `contains_key` check; promote the implicit invariant to a documented panic message per CODE_STYLE.md. - **`align_inputs_to_union_schema` nullability fix.** The first-sight branch unconditionally marked new fields nullable; the existing comment claims "columns that don't appear in every input must be nullable" but the code applied that rule to every field. Replaced with a two-pass scheme: track `any_nullable` and `appears_in` per field across all inputs, then mark nullable iff some input had it nullable OR the field is missing from some input. This unblocks `List<Float64>` columns end-to-end (the writer rejects nullable List; the previous behaviour forced every list column nullable on first sight even when every input declared it non-null). - **MC-2 round-trip integration test.** New `test_mc2_all_types_round_trip_through_streaming_merge` builds two inputs covering every parquet physical type the decoder accepts — Int8/16/32/64, UInt8/16/32/64, Float32/64, Bool, Utf8, Dictionary<Int32, Utf8>, LargeBinary, and non-nullable `List<Float64>` — merges them through the streaming engine, and asserts every `(sorted_series_key → body-col tuple)` pair survives byte-equal. Storage-encoding transitions (Dict→Utf8, LargeBinary→ Binary) are normalised in the render helper because MC-2 promises value preservation, not internal representation preservation. This test caught two real bugs while being written: 1. Body cols must be declared in lexicographic order — the streaming engine assumes the storage convention and crashes mid-merge if they aren't. Fixture re-ordered accordingly. (Worth adding upfront validation in a follow-up; not in scope here.) 2. The schema-union nullability bug above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(MS-7): page-cache bounded-memory contract is observable + asserted Add a `#[cfg(test)] static AtomicUsize` PEAK_BODY_COL_PAGE_CACHE_LEN that records the maximum length any input's `body_col_page_cache` ever reached during the current merge, bumped on every page push in `advance_decoder_to_row`. Zero production overhead — the `record_*` helper compiles to a no-op outside test builds. New test `test_ms7_body_col_page_cache_bounded_regardless_of_input_size` runs the streaming merge over three input sizes (300 / 3 000 / 30 000 rows at 50-row pages) and asserts: 1. Peak resident pages stays below a fixed ceiling (24, for the ratio of OUTPUT_PAGE_ROWS=1024 to input page_rows=50, plus a few-page slack for decoder lookahead + transients). 2. Growth from 3 000 to 30 000 rows (10× more input pages) yields at most a 2-page increase in peak. The whole MS-7 claim is that peak does not scale with input size. Verified the test catches a deliberate regression: removing the per- output-page eviction loop in `assemble_one_output_page` pushed the 3 000-row peak to 60 (60 > 24) and the test failed with the expected message. Fixture support: `write_input_parquet_with_small_pages` now also sets `write_batch_size` and `data_page_size` proportional to the requested page row count. Without those, the arrow writer's defaults (64 KiB / 1 MiB) caused `data_page_row_count_limit` to be silently ignored and produced one giant page per column. Probed the output via `get_column_page_reader` — 30 000 rows now produces 600 pages per col as expected. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix: drive col loop from full union schema + collect service tags from sort col Address two new Codex P2 findings on PR-6b.2 (#6409): - **Use the full union schema when driving column writes.** The old `build_parent_union_schema` picked one per-output schema by field count and used it as the column-iteration driver. If two outputs drop *different* all-null sort fields and end up with the same field count, the first wins — and any column it dropped is never iterated, leaving the other output's writer missing a column or writing subsequent columns into the wrong slot. The doc comment already said "process the FULL union schema's cols in order"; the implementation diverged. Drive `write_all_columns` from `full_union_schema` directly and delete the broken heuristic. - **Collect service names from the sort-col path too.** If the sort schema places `service` in the sort key (`metric_name|service|...`), the streaming engine writes it via the sort-col path and the body-col `track_service` branch never runs. `MergeOutputFile.low_cardinality_tags[TAG_SERVICE]` came back empty even though every row had a service value. Extract service names from `static_meta.sort_optimised` at `finalize_output_writer` time so the TAG_SERVICE metadata is accurate regardless of which write path the column took. Two regression tests: - `test_heterogeneous_dropped_fields_drive_from_full_union_schema` builds two inputs whose per-output schemas drop different all-null sort fields with the same field count. Each kept tag must survive to its output. Verified the test fails (panic on missing column) against the pre-fix logic. - `test_service_as_sort_column_still_populates_low_cardinality_tags` uses a sort schema `metric_name|service|-timestamp_secs/V2` and asserts the output's `low_cardinality_tags[TAG_SERVICE]` covers every distinct service value. Verified the test fails against pre-fix `finalize_output_writer` with the expected "must contain TAG_SERVICE" message. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(streaming): rename to fill_page_cache_to_row + cross-input docstrings Addresses adamtobey's review on PR-6409. - Rename `advance_decoder_to_row` → `fill_page_cache_to_row`. The function's effect on the world is "add pages to the per-input cache" — it never advances a cursor or skips data. The old name primed reviewers to ask "are we skipping rows?" (which is exactly what Adam asked). - Use a `rows_for_current_output` register inside `compute_input_row_destinations` and write to `rows_per_output[out_idx]` once after the inner loop; saves the per-row indexed store. - Expand `body_col_page_cache` docstring with the horizontal-vs-vertical memory bound argument and a pointer to the MS-7 invariant test (`test_ms7_body_col_page_cache_bounded_regardless_of_input_size`). - Add context comments at the cross-file invariant sites Adam flagged: - Sort-cols-first storage-ordering contract on the sort-col drain. - Single-RG-input restriction with forward pointer to PR-6c.2 (#6424) which relaxes it. - `rg_partition_prefix_len` defaulting to 0 (with reference to the legacy-promotion `mixed_prefix_ok` escape in PR-6423). No behaviour change. 461 lib tests pass; workspace clippy + nightly fmt + rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(streaming): assert per-input body cols are in Husky order Adam's question on L194 asked whether body-col ordering was a hard cross-file requirement. My first answer said "no" — true for which array we read (we look up by name), but wrong for the body-col memory bound: Phase 3 iterates the union schema's body cols alphabetically and asks each input's decoder to advance to that col. Parquet emits column chunks in declared schema order, so the decoder reads pages in that input's storage order. If an input's body cols aren't in the same alphabetical-after-sort-cols order ("Husky order"), fill_page_cache_to_row has to drain every body col preceding the requested one on the wire — those pages land in body_col_page_caches[col_idx] until that col's turn in the union iteration. The cache grows to a full column-chunk's worth per misaligned col. Vertical, not horizontal. Defeats streaming. Catch this at merge entry instead of silently degrading to vertical caching: - `assert_inputs_in_husky_body_col_order` runs after `build_input_decoders_state` and before phase 0. Bails with a concrete error message naming the offending pair of column names. - New regression test `test_assert_inputs_in_husky_body_col_order_rejects_misaligned_input` builds an input with body cols `[value, metric_type]` (alphabetical would be `[metric_type, value]`) and asserts the merge errors out before phase 3. No production producer violates this today (streaming writer and legacy Husky writer both emit lexicographic body cols), so the assertion catches future producer drift, not current traffic. 462 lib tests pass (461 prior + 1 new); workspace clippy + nightly fmt + rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0724036 commit 1436be7

5 files changed

Lines changed: 3913 additions & 25 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod merge_order;
2424
pub mod metadata_aggregation;
2525
pub mod policy;
2626
mod schema;
27+
pub mod streaming;
2728
mod writer;
2829

2930
#[cfg(test)]

quickwit/quickwit-parquet-engine/src/merge/schema.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,45 +59,74 @@ pub fn align_inputs_to_union_schema(
5959
bail!("no inputs to align");
6060
}
6161

62-
// Collect all fields across all inputs, checking for type conflicts.
63-
// String-like types are normalized to Utf8 for internal alignment.
64-
let mut field_map: BTreeMap<String, Arc<Field>> = BTreeMap::new();
62+
// Track each field's normalized type, whether any input declared
63+
// it nullable, and how many of the input batches contain it. The
64+
// union field is nullable iff some input observed it as nullable
65+
// OR some input is missing the field entirely (a row from a
66+
// missing-the-field input will be null in the merged output).
67+
// The previous version always defaulted new fields to nullable on
68+
// first sight, which broke columns whose nullability must be
69+
// preserved (e.g. `List<Float64>` — the writer's non-nullable-
70+
// list contract requires the union field to stay non-nullable).
71+
struct FieldInfo {
72+
normalized_type: DataType,
73+
any_nullable: bool,
74+
appears_in: usize,
75+
}
76+
let mut field_map: BTreeMap<String, FieldInfo> = BTreeMap::new();
6577

6678
for (input_idx, batch) in inputs.iter().enumerate() {
6779
for field in batch.schema().fields() {
6880
let normalized_type = normalize_type(field.data_type());
6981

70-
match field_map.get(field.name().as_str()) {
82+
match field_map.get_mut(field.name().as_str()) {
7183
Some(existing) => {
72-
if *existing.data_type() != normalized_type {
84+
if existing.normalized_type != normalized_type {
7385
bail!(
7486
"type conflict for column '{}': input 0 has {:?}, input {} has {:?} \
7587
(normalized: {:?} vs {:?})",
7688
field.name(),
77-
existing.data_type(),
89+
existing.normalized_type,
7890
input_idx,
7991
field.data_type(),
80-
existing.data_type(),
92+
existing.normalized_type,
8193
normalized_type,
8294
);
8395
}
84-
// If either side is nullable, the union must be too.
85-
if field.is_nullable() && !existing.is_nullable() {
86-
let nullable_field =
87-
Arc::new(Field::new(field.name(), normalized_type, true));
88-
field_map.insert(field.name().clone(), nullable_field);
96+
if field.is_nullable() {
97+
existing.any_nullable = true;
8998
}
99+
existing.appears_in += 1;
90100
}
91101
None => {
92-
// Columns that don't appear in every input must be nullable.
93-
let nullable_field = Arc::new(Field::new(field.name(), normalized_type, true));
94-
field_map.insert(field.name().clone(), nullable_field);
102+
field_map.insert(
103+
field.name().clone(),
104+
FieldInfo {
105+
normalized_type,
106+
any_nullable: field.is_nullable(),
107+
appears_in: 1,
108+
},
109+
);
95110
}
96111
}
97112
}
98113
}
99114

100-
// Build the union schema in Husky column order.
115+
// Materialise `Arc<Field>` per the rule above. Keep
116+
// `BTreeMap<String, Arc<Field>>` so `build_husky_ordered_schema`
117+
// is unchanged.
118+
let total_inputs = inputs.len();
119+
let field_map: BTreeMap<String, Arc<Field>> = field_map
120+
.into_iter()
121+
.map(|(name, info)| {
122+
let nullable = info.any_nullable || info.appears_in < total_inputs;
123+
let field = Arc::new(Field::new(&name, info.normalized_type, nullable));
124+
(name, field)
125+
})
126+
.collect();
127+
128+
// Build the union schema in storage column order (sort cols first,
129+
// then body cols lexicographic).
101130
let union_schema = build_husky_ordered_schema(&field_map, sort_fields_str)?;
102131
let union_schema_ref = Arc::new(union_schema);
103132

0 commit comments

Comments
 (0)