Skip to content

Commit 5cd0700

Browse files
g-talbotclaude
andauthored
feat(merge-executor): YAML flag to route regular merges through streaming engine (default off, no-op rollout) (#6441)
* test(merge): engine-parity tests + share MS-7 serial lock Adds `merge::tests::parity` with two tests that run the same realistic input fixture through both `merge_sorted_parquet_files` (in-memory engine) and `execute_merge_operation` (streaming engine over the same `LocalFileByteSource` the executor uses in production), then asserts row-by-row equivalence on every visible column. These gate the upcoming YAML flag that flips regular merges to the streaming engine: parity must hold before the default is flipped in production. The streaming engine writes a process-global atomic (`PEAK_BODY_COL_PAGE_CACHE_LEN`) that the MS-7 tests reset-then-read. Any test that runs a streaming merge must serialise against MS-7 or inflate its readings. Move `ms7_serial_lock` from the streaming-tests submodule to module scope (still `#[cfg(test)] pub(crate)`) so the new parity tests acquire the same lock. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge-executor): YAML flag to route regular merges through the streaming engine Wires the streaming Parquet merge engine into the regular (non-promotion) merge path behind a node-level YAML flag, `indexer.parquet_merge_use_streaming_engine`, defaulted to false. When true, `ParquetMergeExecutor::handle` runs every merge through `execute_merge_operation` (the column-major, page-bounded streaming engine) instead of the in-memory `merge_sorted_parquet_files`. Promotion merges (`target_prefix_len_override.is_some()`) continue to take the streaming path unconditionally — the in-memory engine can't handle mixed `rg_partition_prefix_len` inputs. The in-memory engine stays in place as the runtime fallback. If the streaming engine hits a bug in production, an operator can flip the flag back to `false` via YAML without redeploying. Once the streaming path has soaked, the fallback branch and `merge_sorted_parquet_files` itself can be removed. The flag is plumbed `IndexerConfig` → `IndexingService` → `ParquetMergePipelineParams` → `ParquetMergeExecutor::new`, and exercised end-to-end by the engine parity tests in the previous commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(config): exercise parquet_merge_use_streaming_engine in YAML/JSON/TOML fixtures Adds the new flag to the YAML, JSON, and TOML node-config test fixtures and bumps the expected `IndexerConfig` in `node_config_parse_*` to `parquet_merge_use_streaming_engine: true`. Catches parse / serde regressions on the field — e.g., a rename or a default-fn typo would fail the test instead of silently parsing as `false`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(merge-pipeline): end-to-end streaming-engine flag verification Adds `test_merge_pipeline_end_to_end_with_streaming_engine_flag`, an integration test that runs the full actor chain (planner → downloader → executor → uploader → publisher) with `ParquetMergePipelineParams::use_streaming_engine = true`. Asserts: 1. Publish fired with the right replaced_split_ids (merge ran end-to-end through the executor). 2. `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` after the merge. The streaming engine increments this on every body-col page assembly; the in-memory engine never touches it. Non-zero is direct evidence the streaming path executed — not a silent fallback to in-memory. 3. The merge output row count and metric names are correct. To make assertion (2) work cross-crate, exposes `PEAK_BODY_COL_PAGE_CACHE_LEN` as `pub` under `#[cfg(any(test, feature = "testsuite"))]`. The visibility widening is test-only — production builds never see the symbol. This is the closest analog to the sesh-mode "production-path" rule that is feasible today: the metrics pipeline's OTLP gRPC ingest path is not yet wired into `quickwit-serve`, so the closest end-to-end test is the actor-chain integration test that this PR adds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(adr): record dual Parquet merge engines as deviation #1 Captures the intentional, time-bounded divergence from ADR-003 §4 introduced by the streaming-engine wire-in: two engines coexist in production behind `IndexerConfig::parquet_merge_use_streaming_engine`, with the in-memory engine retained as the runtime fallback. Documents: - The ADR-003 §4 quote the deviation diverges from (page-granular streaming, bounded memory). - The current dual-engine implementation and routing logic. - Why this exists (production safety, staged rollout, parity is strong-but-not-total). - Explicit exit criteria: default flipped to `true`, ≥ 2-week production soak with no merge-correctness incidents, no rollback. When met, a follow-up PR deletes the in-memory branch and engine, the flag, and the parity tests. This is the first deviation recorded under the EVOLUTION.md framework. Indexes the doc in `deviations/README.md`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(merge-pipeline): share full correctness contract between both engine tests Extracts the steps-5-through-8 assertions (replaced_split_ids, staged metadata, Parquet file content, Parquet KV headers) into `assert_cpu_mem_merge_outputs_correct` and calls it from both `test_merge_pipeline_end_to_end` (in-memory engine) and `test_merge_pipeline_end_to_end_with_streaming_engine_flag` (streaming engine). The streaming-engine test had been doing only a small subset of the checks — row count and metric names. It now runs the full contract: time_range, num_merge_ops, sort_fields, row_keys_proto, zonemap_regexes, low_cardinality_tags, all 100 timestamps, sorted_series monotonicity, cpu/mem sort-order semantics, and every `qh.*` Parquet KV header. By construction both engines must produce a file that satisfies the same contract — the helper is the executable parity between engines at the pipeline-integration level, complementing the column-level parity tests in `quickwit-parquet-engine::merge::tests::parity`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(merge-pipeline,merge-engine): multi-metric + multi-RG + m:n disjointness Expands test coverage along three axes the existing helpers didn't hit: 1. **Multi-input, multi-metric pipeline tests** (new file `parquet_merge_pipeline_multi_metric_test.rs`). Three inputs, each carrying three metrics with overlapping per-metric timeseries IDs and overlapping-but-distinct timestamps — the merge must row-by-row interleave across all three inputs. Output writer uses `row_group_size = 50` so the 180-row merge output breaks into four row groups, exercising the writer's multi-RG path in both engines. Both engine variants (in-memory + streaming) covered. Streaming-engine test asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` to confirm the flag routed through the streaming path. 2. **Engine-level multi-output contract** in `merge::tests::parity::assert_engine_parity`. Beyond the existing engine-vs-engine column equivalence, every parity test now also verifies on the in-memory engine's outputs (equivalent to the streaming engine's): sum of per-output row counts equals total input rows, each output internally monotonic on `sorted_series`, and across outputs the partition is disjoint (no two outputs share any `sorted_series` value). This is the m:n non-overlap contract. 3. **Multi-metric overlapping-input m:n** test `parity_multi_metric_overlapping_inputs_multi_output` exercises the strengthened contract with three inputs × three metrics where per-metric keyspaces overlap across inputs. n = 3 outputs target. Honest scope note in the new pipeline test module's doc: the actor pipeline today hardcodes `num_outputs = 1` in `ParquetMergeExecutor`, so n > 1 is not reachable end-to-end through the actor system. The new engine-level test covers the n > 1 correctness contract for now; when the executor is taught to accept `num_outputs > 1` from the merge policy, the pipeline tests can grow an n > 1 variant. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(merge-executor): compute num_outputs from target_split_size_bytes Replaces the hardcoded `MergeConfig { num_outputs: 1, ... }` in `ParquetMergeExecutor::handle` with a per-merge computation: num_outputs = max(1, ceil(total_input_bytes / target_split_size_bytes)) So a merge that ingests more than one target's worth of data spreads across multiple output files; merges that fit in one target keep producing a single output (preserving today's behavior for the common case). The engine clamps the request to the number of `sorted_series` boundaries actually available, so the value is an upper bound, not an exact count. Plumbing: `IndexerConfig` already carries `target_split_size_bytes` in `ParquetMergePolicyConfig`. Pass that through `ParquetMergePipelineParams.target_split_size_bytes` → `ParquetMergeExecutor::new`. Default for tests: `256 * 1024 * 1024` (matches the production default). Latent multi-output bug fixed at the same time: with n>1, the executor used to assign the planner-supplied `merge_split_id` to **every** output split, which would have collided on the rename to `{split_id}.parquet`. First output keeps the planner ID for observability continuity; subsequent outputs use the fresh IDs generated by `merge_parquet_split_metadata`. Also exposes `quickwit_parquet_engine::merge::streaming::ms7_serial_lock` as `pub` under the `testsuite` feature so cross-crate streaming tests (in `quickwit-indexing`) can serialise against the same global lock the in-crate MS-7 tests use. The streaming engine writes to a process-global atomic on every merge — without shared locking, the existing pipeline streaming-engine test races `store(0)` against other tests' merges. Adds the appropriate `#[allow(clippy::await_holding_lock)]` to the in-crate `test_merge_pipeline_end_to_end_with_streaming_engine_flag` to match. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(merge-pipeline): bonus — prefix_len=1 multi-RG inputs + m:n outputs Adds the bonus scenario: three multi-metric inputs each written with `rg_partition_prefix_len = 1` and one row group per distinct metric_name (via `row_group_size = ROWS_PER_METRIC_PER_INPUT` so the writer flushes at every metric boundary after sorting). Merged with a small `ParquetMergePipelineParams::target_split_size_bytes = 500` that forces the executor's `num_outputs` calculation to ask the engine for multiple outputs — exercising the m:n merge path now reachable through the actor pipeline (PR's earlier commit removed the `num_outputs = 1` hardcode). Both engines covered: - `test_prefix_aligned_multi_metric_three_input_multi_output_in_memory_engine` - `test_prefix_aligned_multi_metric_three_input_multi_output_streaming_engine` The streaming-engine variant also asserts `PEAK_BODY_COL_PAGE_CACHE_LEN > 0` (under `ms7_serial_lock`) so a silent fallback to the in-memory path would fail. The shared assertion helper `assert_three_input_three_metric_multi_output_correct` checks the m:n contract end-to-end at the pipeline level: - All three input splits replaced. - ≥ 2 output splits staged (proves splitting happened). - Sum of per-output row counts = total input rows. - Each output internally monotonic on `sorted_series`. - Across outputs, the `sorted_series` partition is disjoint — no two outputs share any key, which is the "non-overlapping output" contract the engine promises. - Union of metric_names / services across outputs = full input set. - Every output has `num_merge_ops = 1`, `row_keys_proto`, and a `metric_name` zonemap regex. To pin the test to exactly one merge (not a cascade of merges over the now-multiple staged outputs), `make_pipeline_params` now takes `max_merge_ops` and the bonus tests set it to `1`: outputs land at `num_merge_ops = 1`, equal to the policy ceiling, and the planner refuses to merge them again. The existing n=1 tests stay at 5 (headroom — they produce a single output that can't trigger another merge anyway, since `merge_factor = 3`). Updates the module doc to drop the now-stale scope note about m:n not being reachable through the pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * style: nightly rustfmt + drop useless borrows in assert! Reformats doc comments / format strings under nightly rustfmt (`wrap_comments`, `format_strings`), and removes two redundant `&` in `assert!` arguments flagged by clippy. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4597b7a commit 5cd0700

18 files changed

Lines changed: 2134 additions & 70 deletions
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# Deviation 001: Dual Parquet merge engines during streaming-engine rollout
2+
3+
## Summary
4+
5+
Two Parquet merge engines coexist in production behind a runtime YAML flag.
6+
The streaming engine (`execute_merge_operation`) matches the intent of
7+
[ADR-003 §4](../003-time-windowed-sorted-compaction.md) (page-granular
8+
streaming, bounded memory). The in-memory engine
9+
(`merge_sorted_parquet_files`) is retained as the runtime fallback so an
10+
operator can flip back via configuration if the streaming engine hits a
11+
production bug. The dual-engine state is intentional and time-bounded —
12+
it ends when the streaming engine has soaked at the new default in
13+
production.
14+
15+
## Related ADR
16+
17+
- **ADR**: [ADR-003 Time-Windowed Sorted Compaction](../003-time-windowed-sorted-compaction.md)
18+
- **Section**: §4 Sorted Merge, Phase 2 (column streaming)
19+
20+
## ADR States
21+
22+
> Phase 2: Stream columns through the merge.
23+
>
24+
> Once the global sort order is determined, each column is read from the
25+
> input splits and written to the output in sorted order. Columns are
26+
> processed one at a time (or in small groups) for memory efficiency.
27+
>
28+
> For large columns, it may be advantageous to operate at **page
29+
> granularity** rather than loading an entire column from each input:
30+
> read individual Parquet pages from inputs as needed and write
31+
> individual pages to the output. This bounds memory usage for columns
32+
> with large values (e.g., high-cardinality string tags, large attribute
33+
> maps) and avoids materializing an entire column across all inputs
34+
> simultaneously.
35+
36+
ADR-003 §4 describes the merge as a streaming operation that bounds
37+
memory by reading and writing pages incrementally. The in-memory
38+
`merge_sorted_parquet_files` engine pre-materializes whole columns from
39+
all inputs simultaneously — directly contrary to the ADR's stated
40+
memory model.
41+
42+
## Current Implementation
43+
44+
Both engines live in `quickwit-parquet-engine/src/merge/`:
45+
46+
- **Streaming engine** (`execute_merge_operation`, in `merge/mod.rs`,
47+
backed by `merge/streaming.rs`). Column-major, page-bounded body cache,
48+
reads inputs through `RemoteByteSource`. This is the
49+
ADR-003-compliant implementation. It is the unconditional path for
50+
promotion merges (the in-memory path can't handle mixed
51+
`rg_partition_prefix_len`) and the opt-in path for regular merges.
52+
- **In-memory engine** (`merge_sorted_parquet_files`, in `merge/mod.rs`).
53+
Buffers all inputs through arrow-rs into memory, runs the merge under
54+
`run_cpu_intensive`. This is the original bootstrap implementation
55+
retained as the runtime fallback.
56+
57+
`ParquetMergeExecutor::handle` routes between them:
58+
59+
```rust
60+
let is_promotion = scratch.merge_operation.target_prefix_len_override.is_some();
61+
if is_promotion || self.use_streaming_engine {
62+
execute_merge_operation(&op, sources, &output_dir, &config).await
63+
} else {
64+
run_cpu_intensive(move || {
65+
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
66+
}).await
67+
}
68+
```
69+
70+
The `use_streaming_engine` boolean is sourced from the node-level
71+
`IndexerConfig::parquet_merge_use_streaming_engine` YAML field, default
72+
`false`.
73+
74+
Row-content equivalence between the two engines is enforced by parity
75+
tests in `quickwit-parquet-engine/src/merge/tests.rs::parity`. These
76+
must keep passing as long as both engines coexist.
77+
78+
## Signal Impact
79+
80+
Applies to **metrics** (the only signal currently using the Parquet
81+
pipeline). Will apply to **traces** and **logs** when those signals
82+
adopt Parquet splits. The deviation does not affect Tantivy-backed
83+
pipelines.
84+
85+
## Impact
86+
87+
| Aspect | ADR Target | Current Reality |
88+
|--------|------------|-----------------|
89+
| Engines in production | One streaming engine | Two (streaming + in-memory) |
90+
| Memory model | Page-bounded; ~constant per column | In-memory engine: O(total input column size) per merge |
91+
| Configuration surface | None — engine choice is internal | One YAML flag (`parquet_merge_use_streaming_engine`) |
92+
| Code surface to maintain | One engine | Two engines + parity tests + routing branch |
93+
| Operator rollback | Not applicable — only one path | Flip flag to `false`, no redeploy needed |
94+
95+
## Why This Exists
96+
97+
The streaming engine is new code. ADR-003 describes the target memory
98+
model but does not guarantee bug-free first-deployment behavior. Three
99+
forces produced the dual-engine state:
100+
101+
1. **Production safety.** The in-memory engine has been the live merge
102+
path during the metrics pipeline's bring-up. Replacing it wholesale
103+
on a single PR, without an in-place fallback, would mean any bug in
104+
the streaming engine requires a redeploy to recover. With the flag,
105+
recovery is `config edit + restart`.
106+
2. **Staged rollout.** Production confidence is built by enabling the
107+
streaming engine on a soak fleet, observing for some time, then
108+
flipping the default. The dual-engine state is the necessary
109+
infrastructure for that rollout.
110+
3. **Parity verifiable, not certain.** The parity tests in
111+
`merge::tests::parity` cover representative synthetic fixtures.
112+
Production data has shapes those fixtures don't cover. The fallback
113+
exists because parity is a strong-but-not-total guarantee.
114+
115+
## Priority Assessment
116+
117+
- **PoC / MVP**: acceptable — dual-engine is in fact the deliberate
118+
state during MVP rollout.
119+
- **Production (current)**: acceptable — flag defaults to `false`,
120+
rollout has not begun. The streaming engine is exercised only by
121+
promotion merges (whose execution will start once GAP-011 is closed).
122+
- **Production (post-soak)**: not acceptable. Once the streaming engine
123+
has soaked at default-`true` in production, the in-memory engine
124+
becomes dead code that complicates the merge-executor and obscures
125+
the ADR-003 memory contract. Resolve before merging additional
126+
significant work into `parquet_merge_executor.rs`.
127+
128+
## Exit Criteria
129+
130+
The deviation resolves when **all** of the following hold:
131+
132+
1. `IndexerConfig::default_parquet_merge_use_streaming_engine` defaults
133+
to `true` in `quickwit-config`.
134+
2. At least one production fleet has run with the flag set to `true` for
135+
a soak window of ≥ 2 weeks with no merge-correctness incidents (no
136+
data loss, no schema mismatch, no merge-output-rows-≠-input-rows
137+
alerts).
138+
3. No deviation-resolving rollback has been issued during the soak.
139+
140+
When those are met, the follow-up PR:
141+
142+
- Deletes `merge_sorted_parquet_files` from `quickwit-parquet-engine`.
143+
- Deletes the in-memory branch in `ParquetMergeExecutor::handle`.
144+
- Deletes the `use_streaming_engine` field on `ParquetMergeExecutor` and
145+
`ParquetMergePipelineParams`.
146+
- Deletes `IndexerConfig::parquet_merge_use_streaming_engine`.
147+
- Deletes `merge::tests::parity` (both engines no longer exist to
148+
compare).
149+
- Closes this deviation.
150+
151+
## Work Required to Match ADR
152+
153+
| Change | Difficulty | Description |
154+
|--------|------------|-------------|
155+
| Flip default to `true` | Trivial | One-line change in `IndexerConfig::default_parquet_merge_use_streaming_engine`. Lands after soak. |
156+
| Production soak | Operational | Run with `true` on at least one fleet for ≥ 2 weeks, monitor merge-correctness signals. |
157+
| Delete in-memory engine | Moderate | Remove `merge_sorted_parquet_files`, the fallback branch, the flag, and the parity tests. Mechanically straightforward but touches several call sites. |
158+
159+
## Recommendation
160+
161+
**Accept for now.** The dual-engine state is the deliberate output of a
162+
flag-with-fallback rollout pattern (see commit history of #6441 and
163+
related PRs). Resolution is a known follow-up, not technical debt that
164+
needs to be paid down ahead of schedule.
165+
166+
Track the exit criteria in this doc. When all three conditions hold,
167+
open the deletion PR and close this deviation.
168+
169+
## References
170+
171+
- [ADR-003 Time-Windowed Sorted Compaction](../003-time-windowed-sorted-compaction.md) §4
172+
- [GAP-011 No legacy promotion planner](../gaps/011-no-legacy-promotion-planner.md)
173+
- [GAP-012 Merge downloads instead of streaming](../gaps/012-merge-downloads-instead-of-streaming.md)
174+
- PR #6441 (wire-in of the YAML flag)
175+
176+
## Date
177+
178+
2026-05-18

docs/internals/adr/deviations/README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,7 @@ Deviation files use sequential numbering: `001-short-description.md`
9494

9595
| Deviation | Title | Related ADR | Priority |
9696
|-----------|-------|-------------|----------|
97-
98-
*No deviations recorded yet.*
97+
| [001](./001-dual-parquet-merge-engines.md) | Dual Parquet merge engines during streaming-engine rollout | [ADR-003](../003-time-windowed-sorted-compaction.md) | Accept until post-soak |
9998

10099
## Lifecycle
101100

quickwit/quickwit-config/resources/tests/node_config/quickwit.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
"split_store_max_num_splits": 10000,
5555
"max_concurrent_split_uploads": 8,
5656
"max_merge_write_throughput": "100mb",
57-
"merge_concurrency": 2
57+
"merge_concurrency": 2,
58+
"parquet_merge_use_streaming_engine": true
5859
},
5960
"ingest_api": {
6061
"replication_factor": 2

quickwit/quickwit-config/resources/tests/node_config/quickwit.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ split_store_max_num_splits = 10_000
4545
max_concurrent_split_uploads = 8
4646
max_merge_write_throughput = "100mb"
4747
merge_concurrency = 2
48+
parquet_merge_use_streaming_engine = true
4849

4950
[ingest_api]
5051
replication_factor = 2

quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ indexer:
4949
max_concurrent_split_uploads: 8
5050
max_merge_write_throughput: 100mb
5151
merge_concurrency: 2
52+
parquet_merge_use_streaming_engine: true
5253

5354
ingest_api:
5455
replication_factor: 2

quickwit/quickwit-config/src/node_config/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ pub struct IndexerConfig {
163163
pub enable_cooperative_indexing: bool,
164164
#[serde(default = "IndexerConfig::default_cpu_capacity")]
165165
pub cpu_capacity: CpuCapacity,
166+
/// If true, run Parquet merges through the streaming column-major engine
167+
/// (`execute_merge_operation`). If false (default), use the in-memory
168+
/// `merge_sorted_parquet_files` engine. The legacy in-memory engine is
169+
/// kept as the runtime fallback so production can flip back to it
170+
/// without redeploying if the streaming engine hits a bug. Promotion
171+
/// merges (those with `target_prefix_len_override`) always go through
172+
/// the streaming engine regardless of this flag — the in-memory path
173+
/// can't handle mixed prefix lengths.
174+
#[serde(default = "IndexerConfig::default_parquet_merge_use_streaming_engine")]
175+
pub parquet_merge_use_streaming_engine: bool,
166176
}
167177

168178
impl IndexerConfig {
@@ -201,6 +211,10 @@ impl IndexerConfig {
201211
CpuCapacity::one_cpu_thread() * (quickwit_common::num_cpus() as u32)
202212
}
203213

214+
fn default_parquet_merge_use_streaming_engine() -> bool {
215+
false
216+
}
217+
204218
#[cfg(any(test, feature = "testsuite"))]
205219
pub fn for_test() -> anyhow::Result<Self> {
206220
use quickwit_proto::indexing::PIPELINE_FULL_CAPACITY;
@@ -213,6 +227,7 @@ impl IndexerConfig {
213227
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
214228
max_merge_write_throughput: None,
215229
merge_concurrency: NonZeroUsize::new(3).unwrap(),
230+
parquet_merge_use_streaming_engine: Self::default_parquet_merge_use_streaming_engine(),
216231
};
217232
Ok(indexer_config)
218233
}
@@ -229,6 +244,7 @@ impl Default for IndexerConfig {
229244
cpu_capacity: Self::default_cpu_capacity(),
230245
merge_concurrency: Self::default_merge_concurrency(),
231246
max_merge_write_throughput: None,
247+
parquet_merge_use_streaming_engine: Self::default_parquet_merge_use_streaming_engine(),
232248
}
233249
}
234250
}

quickwit/quickwit-config/src/node_config/serialize.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ mod tests {
657657
cpu_capacity: IndexerConfig::default_cpu_capacity(),
658658
enable_cooperative_indexing: false,
659659
max_merge_write_throughput: Some(ByteSize::mb(100)),
660+
parquet_merge_use_streaming_engine: true,
660661
}
661662
);
662663
assert_eq!(

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ pub struct IndexingService {
113113
counters: IndexingServiceCounters,
114114
local_split_store: Arc<IndexingSplitCache>,
115115
pub(crate) max_concurrent_split_uploads: usize,
116+
/// Cached from `IndexerConfig`. Selects whether new Parquet merge
117+
/// pipelines route regular merges through the streaming engine or
118+
/// the in-memory fallback. Promotion merges always use the
119+
/// streaming engine regardless of this flag.
120+
#[cfg(feature = "metrics")]
121+
pub(crate) parquet_merge_use_streaming_engine: bool,
116122
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
117123
#[cfg(feature = "metrics")]
118124
parquet_merge_pipeline_handles: HashMap<IndexUid, ParquetMergePipelineHandle>,
@@ -178,6 +184,8 @@ impl IndexingService {
178184
indexing_pipelines: Default::default(),
179185
counters: Default::default(),
180186
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
187+
#[cfg(feature = "metrics")]
188+
parquet_merge_use_streaming_engine: indexer_config.parquet_merge_use_streaming_engine,
181189
merge_pipeline_handles: HashMap::new(),
182190
#[cfg(feature = "metrics")]
183191
parquet_merge_pipeline_handles: HashMap::new(),
@@ -723,6 +731,8 @@ impl IndexingService {
723731
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
724732
event_broker: self.event_broker.clone(),
725733
writer_config,
734+
use_streaming_engine: self.parquet_merge_use_streaming_engine,
735+
target_split_size_bytes: cfg.target_split_size_bytes,
726736
};
727737

728738
let pipeline = super::parquet_pipeline::ParquetMergePipeline::new(

quickwit/quickwit-indexing/src/actors/parquet_pipeline/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ mod parquet_merge_pipeline_trace_conformance_test;
6161
#[allow(clippy::disallowed_methods)]
6262
mod parquet_merge_pipeline_sketch_test;
6363

64+
#[cfg(test)]
65+
#[allow(clippy::disallowed_methods)]
66+
mod parquet_merge_pipeline_multi_metric_test;
67+
6468
pub use parquet_doc_processor::{
6569
ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
6670
};

0 commit comments

Comments
 (0)