Skip to content

feat: add Parquet merge policy for compaction (Phase 2)#6351

Merged
g-talbot merged 7 commits intomatthew.kim/metrics-partitioningfrom
gtt/parquet-merge-policy
Apr 29, 2026
Merged

feat: add Parquet merge policy for compaction (Phase 2)#6351
g-talbot merged 7 commits intomatthew.kim/metrics-partitioningfrom
gtt/parquet-merge-policy

Conversation

@g-talbot
Copy link
Copy Markdown
Contributor

@g-talbot g-talbot commented Apr 28, 2026

Summary

  • Adds a constant write amplification merge policy for Parquet splits, adapted from the existing ConstWriteAmplificationMergePolicy but using byte size instead of document count as the primary size metric (Parquet rows vary wildly in size depending on tag density)
  • Implements compaction scope grouping by (index_uid, partition_id, sort_fields, window_start) — splits with different scopes are never merged together
  • Provides finalize_operations() with lowered merge factor for cold window compaction (Phase 3 planner will call this when a window stops receiving new data)

This is Phase 2 of the Parquet compaction project. Phase 1 (k-way sorted merge engine, #6335) provides the physical merge. This PR provides the decision layer — which splits to merge and when. Phase 3 (forthcoming) wires both into the actor pipeline.

Based on #6340 (Matt's partition_id PR) — partition_id is included in the compaction scope so splits from different partitions are never merged.

Design

Algorithm: Groups immature splits by num_merge_ops level, then greedily accumulates within each level until reaching merge_factor (10) or target_split_size_bytes (256 MiB). Bounds write amplification at max_merge_ops (4) — each byte is rewritten at most 4 times.

Integration with compaction service: The ParquetMergePolicy trait mirrors the shape of the existing MergePolicy trait but operates on ParquetSplitMetadata. When nadav/feature-split-merges adds Parquet support, the CompactionPlanner can call group_by_compaction_scope() then parquet_merge_policy.operations() per scope group.

New files

File Purpose
merge/policy/mod.rs ParquetMergePolicy trait, ParquetMergeOperation, ParquetSplitMaturity
merge/policy/scope.rs CompactionScope struct and group_by_compaction_scope()
merge/policy/const_write_amplification.rs Policy implementation with config

Invariants verified

ID Invariant How tested
MC-CONSERVE Total bytes in ops + remaining = input proptest
MC-LEVEL All splits in an op share num_merge_ops proptest + unit
MC-WA Mature splits never in merge ops proptest + unit
MC-IDEMPOTENT Order-independent (shuffle invariance) proptest
MC-SCOPE Different scopes never merged scope unit tests
MC-CONVERGE Repeated application converges to target size simulation test

Test plan

  • 11 scope grouping tests (empty, single, same scope, different fields, different partition_id, no window, mixed)
  • 16 policy unit tests (empty, single, mature, merge_factor, max_merge_factor, mixed levels, size-triggered, finalize, ordering)
  • 1 proptest with 4 invariant checks across random inputs
  • 1 simulation test (200 splits through multi-round compaction)
  • cargo clippy -p quickwit-parquet-engine --all-features --tests clean
  • cargo doc -p quickwit-parquet-engine --no-deps clean
  • cargo machete clean
  • All 334 crate tests pass

🤖 Generated with Claude Code

@g-talbot g-talbot changed the base branch from main to matthew.kim/metrics-partitioning April 28, 2026 15:37
g-talbot and others added 3 commits April 28, 2026 11:47
Adds a constant write amplification merge policy for Parquet splits,
adapted from the existing ConstWriteAmplificationMergePolicy but using
byte size instead of document count as the primary size metric. This is
Phase 2 of the Parquet compaction project — the decision layer that
determines which splits to merge within each compaction scope.

Key components:
- ParquetMergePolicy trait mirroring the MergePolicy interface
- CompactionScope grouping by (index_uid, sort_fields, window_start)
- ConstWriteAmplificationParquetMergePolicy with bounded write amp
- finalize_operations() for cold window compaction
- 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340),
include it in CompactionScope so splits with different partitions are
never merged together. Adds 2 new scope tests for partition isolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@g-talbot g-talbot force-pushed the gtt/parquet-merge-policy branch from c32b58d to 191723f Compare April 28, 2026 16:08
@g-talbot
Copy link
Copy Markdown
Contributor Author

@codex review

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 191723fd84

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread quickwit/quickwit-parquet-engine/src/merge/policy/const_write_amplification.rs Outdated
Comment thread quickwit/quickwit-parquet-engine/src/merge/policy/scope.rs
g-talbot and others added 4 commits April 28, 2026 12:56
finalize_operations() was running single_merge_operation over all young
splits without grouping by num_merge_ops level. This could merge level-0
and level-1 splits together, stamping the output with max(levels) + 1
and prematurely maturing lower-level data.

Fix: group by num_merge_ops in finalize just like operations() does,
then apply the lower merge factor within each level independently.

Added test_finalize_respects_mc_level_invariant (unit) and
proptest_finalize_respects_mc_level (property test) — both caught the
bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CompactionScope only used window_start_secs, so splits with the same
start but different durations (e.g. after a window config change) would
be grouped together. The merge engine requires all inputs to agree on
both window_start and window_duration, so merging across durations would
fail validation.

Added test_different_window_duration which caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Add three merge-policy invariants to the shared verification layer
(quickwit-dst) with check_invariant! enforcement in
ParquetMergeOperation::new():

- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the
single source of truth, usable by Stateright models and production code.
Debug builds panic on violation; all builds emit metrics via the
invariant recorder.

Tests written first (4 should_panic tests failed before adding checks,
pass after). Plus 1 positive test and 3 unit tests for shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@mattmkim
Copy link
Copy Markdown
Contributor

discussed in person:

we should make the target split size configurable. logs already has the ability to do this via a merge policy config section in the index config. https://quickwit.io/docs/configuration/index-config#merge-policies

looks like this for logs

version: 0.7
index_id: "hdfs"
# ...
indexing_settings:
  merge_policy:
    type: "stable_log"
    min_level_num_docs: 100000
    merge_factor: 10
    max_merge_factor: 12
    maturation_period: 48h

@g-talbot
Copy link
Copy Markdown
Contributor Author

discussed in person:

we should make the target split size configurable. logs already has the ability to do this via a merge policy config section in the index config. https://quickwit.io/docs/configuration/index-config#merge-policies

looks like this for logs

version: 0.7
index_id: "hdfs"
# ...
indexing_settings:
  merge_policy:
    type: "stable_log"
    min_level_num_docs: 100000
    merge_factor: 10
    max_merge_factor: 12
    maturation_period: 48h

@mattmkim Done in a subsequent PR: #6362

@g-talbot g-talbot merged commit 7190d9b into matthew.kim/metrics-partitioning Apr 29, 2026
5 checks passed
@g-talbot g-talbot deleted the gtt/parquet-merge-policy branch April 29, 2026 21:04
mattmkim added a commit that referenced this pull request Apr 30, 2026
* partitioning for metrics

* fix checkpointing

* lint

* address comment, fix max num partition handling

* move partitioning to indexer, not doc processor

* lint

* feat: add Parquet merge policy for compaction (Phase 2) (#6351)

* feat: add Parquet merge policy for compaction (Phase 2)

Adds a constant write amplification merge policy for Parquet splits,
adapted from the existing ConstWriteAmplificationMergePolicy but using
byte size instead of document count as the primary size metric. This is
Phase 2 of the Parquet compaction project — the decision layer that
determines which splits to merge within each compaction scope.

Key components:
- ParquetMergePolicy trait mirroring the MergePolicy interface
- CompactionScope grouping by (index_uid, sort_fields, window_start)
- ConstWriteAmplificationParquetMergePolicy with bounded write amp
- finalize_operations() for cold window compaction
- 33 tests: unit, proptest (MC-CONSERVE/LEVEL/WA/IDEMPOTENT), simulation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: add partition_id to CompactionScope, rebase on #6340

Now that ParquetSplitMetadata has partition_id (from Matt's PR #6340),
include it in CompactionScope so splits with different partitions are
never merged together. Adds 2 new scope tests for partition isolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: rustfmt nightly comment wrapping and import ordering

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: finalize_operations must respect MC-LEVEL invariant

finalize_operations() was running single_merge_operation over all young
splits without grouping by num_merge_ops level. This could merge level-0
and level-1 splits together, stamping the output with max(levels) + 1
and prematurely maturing lower-level data.

Fix: group by num_merge_ops in finalize just like operations() does,
then apply the lower merge factor within each level independently.

Added test_finalize_respects_mc_level_invariant (unit) and
proptest_finalize_respects_mc_level (property test) — both caught the
bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: include window_duration_secs in CompactionScope

CompactionScope only used window_start_secs, so splits with the same
start but different durations (e.g. after a window config change) would
be grouped together. The merge engine requires all inputs to agree on
both window_start and window_duration, so merging across durations would
fail validation.

Added test_different_window_duration which caught the bug before the fix.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add MP-1/MP-2/MP-3 runtime invariant checks for merge operations

Add three merge-policy invariants to the shared verification layer
(quickwit-dst) with check_invariant! enforcement in
ParquetMergeOperation::new():

- MP-1: all splits share the same num_merge_ops level
- MP-2: merge op has at least 2 input splits
- MP-3: all splits share the same compaction scope (sort_fields + window)

Shared pure functions in quickwit_dst::invariants::merge_policy are the
single source of truth, usable by Stateright models and production code.
Debug builds panic on violation; all builds emit metrics via the
invariant recorder.

Tests written first (4 should_panic tests failed before adding checks,
pass after). Plus 1 positive test and 3 unit tests for shared functions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: nightly rustfmt — merge imports, unwrap short line, trailing newline

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: George Talbot <george.talbot@datadoghq.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants