feat: add Parquet merge policy for compaction (Phase 2) #6351
g-talbot merged 7 commits into matthew.kim/metrics-partitioning
Conversation
Adds a constant write amplification merge policy for Parquet splits, adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric. This is Phase 2 of the Parquet compaction project — the decision layer that determines which splits to merge within each compaction scope.

Key components:
- ParquetMergePolicy trait mirroring the MergePolicy interface
- CompactionScope grouping by (index_uid, sort_fields, window_start)
- ConstWriteAmplificationParquetMergePolicy with bounded write amp
- finalize_operations() for cold window compaction
- 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
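The greedy, size-bounded accumulation this policy performs can be sketched roughly as follows. This is a hypothetical simplification with stand-in types, not the PR's actual code; the constants follow the defaults named later in the PR summary (merge_factor 10, target 256 MiB, max_merge_ops 4):

```rust
// Hypothetical sketch of per-level greedy accumulation; `Split` is a
// stand-in for the real ParquetSplitMetadata.
#[derive(Debug, Clone)]
struct Split {
    size_bytes: u64,
    num_merge_ops: usize,
}

const MERGE_FACTOR: usize = 10;
const TARGET_SPLIT_SIZE_BYTES: u64 = 256 * 1024 * 1024;
const MAX_MERGE_OPS: usize = 4;

/// Greedily batch immature splits of one level into merge operations,
/// closing a batch at MERGE_FACTOR inputs or the target byte size.
fn plan_level(mut splits: Vec<Split>) -> Vec<Vec<Split>> {
    // Mature splits (already rewritten MAX_MERGE_OPS times) never merge again,
    // which bounds write amplification.
    splits.retain(|s| s.num_merge_ops < MAX_MERGE_OPS);
    let mut ops = Vec::new();
    let mut current: Vec<Split> = Vec::new();
    let mut current_bytes = 0u64;
    for split in splits {
        current_bytes += split.size_bytes;
        current.push(split);
        if current.len() >= MERGE_FACTOR || current_bytes >= TARGET_SPLIT_SIZE_BYTES {
            ops.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
    }
    // Leftovers below the thresholds wait for the next planning cycle.
    ops
}
```

Calling the policy byte-size-based rather than doc-count-based only changes the second closing condition; the level bookkeeping is unchanged from the docs-based policy.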
Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340), include it in CompactionScope so splits with different partitions are never merged together. Adds 2 new scope tests for partition isolation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
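The scope-based grouping this commit extends can be sketched as below. Field layouts are made up for illustration (the real CompactionScope and ParquetSplitMetadata carry more fields); the point is that partition_id participates in the hash key, so splits from different partitions land in different groups:

```rust
use std::collections::HashMap;

// Hypothetical simplified shapes, not the PR's real types.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompactionScope {
    index_uid: String,
    partition_id: u64,
    sort_fields: Vec<String>,
    window_start_secs: i64,
}

#[derive(Debug, Clone)]
struct SplitMeta {
    scope: CompactionScope,
    size_bytes: u64,
}

/// Group splits so the merge policy only ever sees candidates that share
/// an index, partition, sort order, and time window.
fn group_by_compaction_scope(
    splits: Vec<SplitMeta>,
) -> HashMap<CompactionScope, Vec<SplitMeta>> {
    let mut groups: HashMap<CompactionScope, Vec<SplitMeta>> = HashMap::new();
    for split in splits {
        groups.entry(split.scope.clone()).or_default().push(split);
    }
    groups
}
```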
fix: rustfmt nightly comment wrapping and import ordering

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Force-pushed from c32b58d to 191723f
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 191723fd84
finalize_operations() was running single_merge_operation over all young splits without grouping by num_merge_ops level. This could merge level-0 and level-1 splits together, stamping the output with max(levels) + 1 and prematurely maturing lower-level data.

Fix: group by num_merge_ops in finalize just like operations() does, then apply the lower merge factor within each level independently.

Added test_finalize_respects_mc_level_invariant (unit) and proptest_finalize_respects_mc_level (property test) — both caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
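The shape of the fix can be sketched as follows, under the assumption of simplified stand-in types and an illustrative lowered merge factor (the real finalize_operations() and its factor live in the PR):

```rust
use std::collections::BTreeMap;

// Stand-in for ParquetSplitMetadata; only the field the fix cares about.
#[derive(Debug, Clone)]
struct Split {
    num_merge_ops: usize,
}

// Illustrative lowered factor for cold windows.
const FINALIZE_MERGE_FACTOR: usize = 2;

/// Plan final merges for a cold window, grouping by level first so that
/// level-0 and level-1 splits are never merged into one output.
fn finalize_operations(splits: Vec<Split>) -> Vec<Vec<Split>> {
    let mut by_level: BTreeMap<usize, Vec<Split>> = BTreeMap::new();
    for split in splits {
        by_level.entry(split.num_merge_ops).or_default().push(split);
    }
    let mut ops = Vec::new();
    for (_level, level_splits) in by_level {
        // Within one level, any batch of >= FINALIZE_MERGE_FACTOR splits
        // merges; singletons are left alone.
        if level_splits.len() >= FINALIZE_MERGE_FACTOR {
            ops.push(level_splits);
        }
    }
    ops
}
```

The buggy version skipped the by_level bucketing and fed all young splits to one operation, which is exactly what the MC-LEVEL invariant tests catch.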
CompactionScope only used window_start_secs, so splits with the same start but different durations (e.g. after a window config change) would be grouped together. The merge engine requires all inputs to agree on both window_start and window_duration, so merging across durations would fail validation.

Added test_different_window_duration which caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add three merge-policy invariants to the shared verification layer (quickwit-dst) with check_invariant! enforcement in ParquetMergeOperation::new():

- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the single source of truth, usable by Stateright models and production code. Debug builds panic on violation; all builds emit metrics via the invariant recorder.

Tests written first (4 should_panic tests failed before adding checks, pass after). Plus 1 positive test and 3 unit tests for shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
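MP-1 through MP-3 are simple pure predicates; a rough sketch over stand-in types (the real checks live in quickwit_dst::invariants::merge_policy and operate on ParquetSplitMetadata) might look like:

```rust
// Hypothetical simplified scope; the real one also carries index and
// partition identity.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Scope {
    sort_fields: Vec<String>,
    window_start_secs: i64,
    window_duration_secs: i64,
}

#[derive(Debug, Clone)]
struct Split {
    num_merge_ops: usize,
    scope: Scope,
}

/// MP-1: every input split sits at the same num_merge_ops level.
fn mp1_same_level(splits: &[Split]) -> bool {
    splits.windows(2).all(|w| w[0].num_merge_ops == w[1].num_merge_ops)
}

/// MP-2: a merge operation needs at least two inputs.
fn mp2_min_inputs(splits: &[Split]) -> bool {
    splits.len() >= 2
}

/// MP-3: every input split belongs to the same compaction scope.
fn mp3_same_scope(splits: &[Split]) -> bool {
    splits.windows(2).all(|w| w[0].scope == w[1].scope)
}
```

Keeping the predicates pure and side-effect free is what lets both the Stateright models and the production check_invariant! call sites share them.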
fix: nightly rustfmt — merge imports, unwrap short line, trailing newline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
discussed in person: we should make the target split size configurable. logs already has the ability to do this via a merge policy config section in the index config (https://quickwit.io/docs/configuration/index-config#merge-policies).
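For reference, the linked docs describe the existing merge-policy section of the index config roughly as below. This is a sketch from the public docs for the docs-based splits, not this PR; a Parquet byte-size target (say, a hypothetical target_split_size_bytes) would be a new, analogous knob:

```yaml
# Sketch of the documented index-config merge-policy section.
indexing_settings:
  merge_policy:
    type: "limit_merge_ops"
    merge_factor: 10
    max_merge_factor: 12
    max_merge_ops: 3
```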
Squash-merge commit (individual commit messages are quoted in full above):

* partitioning for metrics
* fix checkpointing
* lint
* address comment, fix max num partition handling
* move partitioning to indexer, not doc processor
* lint
* feat: add Parquet merge policy for compaction (Phase 2) (#6351)
  * feat: add Parquet merge policy for compaction (Phase 2)
  * fix: add partition_id to CompactionScope, rebase on #6340
  * fix: rustfmt nightly comment wrapping and import ordering
  * fix: finalize_operations must respect MC-LEVEL invariant
  * fix: include window_duration_secs in CompactionScope
  * feat: add MP-1/MP-2/MP-3 runtime invariant checks for merge operations
  * fix: nightly rustfmt — merge imports, unwrap short line, trailing newline

Co-authored-by: George Talbot <george.talbot@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric (Parquet rows vary wildly in size depending on tag density)
- Compaction scope is (index_uid, partition_id, sort_fields, window_start) — splits with different scopes are never merged together
- finalize_operations() with lowered merge factor for cold window compaction (Phase 3 planner will call this when a window stops receiving new data)

This is Phase 2 of the Parquet compaction project. Phase 1 (k-way sorted merge engine, #6335) provides the physical merge. This PR provides the decision layer — which splits to merge and when. Phase 3 (forthcoming) wires both into the actor pipeline.

Based on #6340 (Matt's partition_id PR) — partition_id is included in the compaction scope so splits from different partitions are never merged.

Design
Algorithm: Groups immature splits by num_merge_ops level, then greedily accumulates within each level until reaching merge_factor (10) or target_split_size_bytes (256 MiB). Bounds write amplification at max_merge_ops (4) — each byte is rewritten at most 4 times.

Integration with compaction service: The ParquetMergePolicy trait mirrors the shape of the existing MergePolicy trait but operates on ParquetSplitMetadata. When nadav/feature-split-merges adds Parquet support, the CompactionPlanner can call group_by_compaction_scope() then parquet_merge_policy.operations() per scope group.

New files
- merge/policy/mod.rs: ParquetMergePolicy trait, ParquetMergeOperation, ParquetSplitMaturity
- merge/policy/scope.rs: CompactionScope struct and group_by_compaction_scope()
- merge/policy/const_write_amplification.rs

Invariants verified
- num_merge_ops

Test plan
- cargo clippy -p quickwit-parquet-engine --all-features --tests: clean
- cargo doc -p quickwit-parquet-engine --no-deps: clean
- cargo machete: clean

🤖 Generated with Claude Code