Skip to content

Commit 4ab52a4

Browse files
g-talbotclaude
andcommitted
fix(streaming): roll over chunk-assignment before first chunk after split
Codex P1 on PR #6428: the previous "Recompute split budget after rolling over" fix (commit 56e773f, #6424) handled the split *decision* but not the split *assignment*. When the previous region fills the current output exactly and the next region enters the `needs_split` path, the chunk-assignment loop in `process_split_region_col_outer`'s setup initializes from the stale `current_output_idx` / `current_output_rows`. Its inner `needs_new_writer` check guards on `!chunk_assignments.is_empty()`, so the first iteration cannot roll over: the first sub-region is appended to the already-full output and only the second one advances. Output K ends up at 2× target while subsequent outputs are short or empty. Fix: initialize `active_output_idx` / `active_rows` from the `will_roll_over` case before the loop. The inner `needs_new_writer` check then works for both the first and subsequent iterations (on the first iteration `active_rows = 0 < target` so it correctly doesn't re-roll). The `can_reuse_current` check in the writer- materialization loop already handles "first chunk's output_idx doesn't match current_writer" by finalizing the current output (which is correct: it's full, close it) and opening a fresh writer at the next index. Regression test `test_split_chunk_assignment_rolls_over_before_first_chunk`: prefix_len=1, two metrics of 200 + 400 rows = 600 total, `num_outputs = 3` → `target_per_output = 200`. Region A fills output 0 exactly; region B needs splitting. Pre-fix the merge produced 2 outputs of 400 + 200 (output 0 overfilled, output 2 empty); post-fix it produces 3 outputs of ~200 rows each. 502 lib tests pass (+1); workspace clippy + nightly fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8ad02ce commit 4ab52a4

1 file changed

Lines changed: 80 additions & 2 deletions

File tree

quickwit/quickwit-parquet-engine/src/merge/streaming.rs

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,27 @@ pub async fn streaming_merge_sorted_parquet_files(
376376
// sequential RGs — but we coalesce them into one
377377
// RG to keep the col-outer loop simple).
378378
let mut chunk_assignments: Vec<(usize, Vec<&Region>)> = Vec::new();
379-
let mut active_output_idx = current_output_idx;
380-
let mut active_rows = current_output_rows;
379+
// Pre-rollover initialization. If `will_roll_over` is set
380+
// (the previous region already filled `current_output_idx`
381+
// and we still have outputs to fill), the first chunk must
382+
// land in the NEXT output — not append to the already-full
383+
// one. The inner `needs_new_writer` check below guards on
384+
// `!chunk_assignments.is_empty()`, so it only fires from
385+
// the second iteration on; the first chunk's destination
386+
// has to be decided here.
387+
//
388+
// Companion fix to the `effective_first_target` /
389+
// `effective_outputs_remaining` rollover handling above:
390+
// the split *decision* uses the rolled-over output's
391+
// budget, and the chunk *assignment* must too. Otherwise
392+
// the first sub-region would be glued onto the already-
393+
// full output, overfilling it by up to `target_per_output`
394+
// rows and shrinking the final output count.
395+
let (mut active_output_idx, mut active_rows) = if will_roll_over {
396+
(current_output_idx + 1, 0)
397+
} else {
398+
(current_output_idx, current_output_rows)
399+
};
381400
for sub_region in &sub_regions {
382401
let sub_rows = sub_region.total_rows();
383402
let needs_new_writer = !chunk_assignments.is_empty()
@@ -2821,6 +2840,65 @@ mod tests {
28212840
}
28222841
}
28232842

2843+
/// Regression for Codex P1 on PR #6428: companion to
2844+
/// `test_region_exactly_fills_output_does_not_split_next_aligned_region`.
2845+
/// The earlier fix handled the split *decision* (use the rolled-
2846+
/// over output's full budget) but missed the split *assignment*
2847+
/// loop in `process_split_region_col_outer`'s setup: it
2848+
/// initialized `active_output_idx` / `active_rows` from the stale
2849+
/// `current_output_idx` / `current_output_rows`. The inner
2850+
/// `needs_new_writer` check guards on `!chunk_assignments.is_empty()`,
2851+
/// so the first chunk never rolled over — it was appended to the
2852+
/// already-full output, doubling its rows and shrinking the total
2853+
/// output count.
2854+
///
2855+
/// Setup: prefix_len = 1, two metrics with very different sizes
2856+
/// (200 rows + 400 rows = 600 total). `num_outputs = 3` →
2857+
/// `target_per_output = 200`. Region A (metric_name = "alpha")
2858+
/// fills output 0 exactly. Region B (metric_name = "beta") needs
2859+
/// splitting: 400 rows > the rolled-over output's 200 budget.
2860+
/// Pre-fix the merge produced 2 outputs of 400 + 200 (output 0
2861+
/// overfilled, output 2 empty). Post-fix the merge produces 3
2862+
/// outputs of 200 + 200 + 200.
2863+
#[tokio::test]
2864+
async fn test_split_chunk_assignment_rolls_over_before_first_chunk() {
2865+
let bytes = make_prefix_len_one_input(&[("aaa.alpha", 200), ("bbb.beta", 400)]);
2866+
let inputs: Vec<Box<dyn ColumnPageStream>> = vec![open_stream(bytes).await];
2867+
2868+
let tmp = TempDir::new().expect("tmpdir");
2869+
let outputs = streaming_merge_sorted_parquet_files(inputs, tmp.path(), &merge_config(3))
2870+
.await
2871+
.expect("merge must succeed");
2872+
2873+
// Pre-fix: outputs.len() == 2 (output 0 = 400 rows, output 1 =
2874+
// 200 rows). Post-fix: 3 outputs of ~200 rows each.
2875+
assert_eq!(
2876+
outputs.len(),
2877+
3,
2878+
"expected 3 outputs (one per target), got {} — chunk assignment didn't roll over \
2879+
before the first chunk",
2880+
outputs.len(),
2881+
);
2882+
let total_rows: usize = outputs.iter().map(|o| o.num_rows).sum();
2883+
assert_eq!(total_rows, 600, "row conservation");
2884+
for (i, out) in outputs.iter().enumerate() {
2885+
// All-non-empty: pre-fix output 2 was empty (the loop
2886+
// assigned both region-B sub-chunks within outputs 0+1).
2887+
assert!(
2888+
out.num_rows > 0,
2889+
"output {i} should be non-empty post-fix; got num_rows = {}",
2890+
out.num_rows,
2891+
);
2892+
// No output exceeds the target by more than one full
2893+
// sub-region. Pre-fix output 0 was 400 rows (2× target).
2894+
assert!(
2895+
out.num_rows <= 250,
2896+
"output {i} should not exceed target by more than one sub-region; got {}",
2897+
out.num_rows,
2898+
);
2899+
}
2900+
}
2901+
28242902
/// Build a single-RG fixture with multiple metric_names sorted
28252903
/// by metric_name then timestamp. The file has
28262904
/// `rg_partition_prefix_len = 0` (legacy) so the streaming merge

0 commit comments

Comments
 (0)