Skip to content

Commit 5a5ee74

Browse files
g-talbot and claude committed
feat: add merge metadata aggregation, message types, and replaced_split_ids (Phase 3a)
Phase 3 pipeline integration, first PR:

- merge_parquet_split_metadata(): aggregates input split metadata with MergeOutputFile physical metadata to produce complete ParquetSplitMetadata for merged output. Validates invariant fields, unions metric_names and tags, finalizes tag cardinality after merge. 17 tests.
- ParquetNewSplits, ParquetMergeTask, ParquetMergeScratch message types for the merge actor chain (planner → scheduler → downloader → executor).
- Add replaced_split_ids to ParquetSplitBatch and propagate through ParquetUploader (was hardcoded Vec::new()). Enables merge executor to specify which splits are being replaced.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3e61af4 commit 5a5ee74

7 files changed

Lines changed: 490 additions & 1 deletion

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
mod indexing_service_impl;
2626
mod parquet_doc_processor;
2727
mod parquet_indexer;
28+
pub(crate) mod parquet_merge_messages;
2829
mod parquet_packager;
2930
mod parquet_splits_update;
3031
mod parquet_uploader;
@@ -44,6 +45,7 @@ pub use parquet_doc_processor::{
4445
ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
4546
};
4647
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
48+
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
4749
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};
4850
pub use parquet_splits_update::ParquetSplitsUpdate;
4951
pub use parquet_uploader::ParquetUploader;

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ pub struct ParquetSplitBatch {
100100
pub publish_lock: PublishLock,
101101
/// Optional publish token.
102102
pub publish_token_opt: Option<PublishToken>,
103+
/// Split IDs being replaced by this batch (non-empty for merges).
104+
/// Empty for the ingest path.
105+
pub replaced_split_ids: Vec<String>,
103106
}
104107

105108
/// ParquetIndexer actor that accumulates RecordBatches and forwards them to ParquetPackager.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Message types for the Parquet merge pipeline.
16+
//!
17+
//! These messages flow through the actor chain:
18+
//!
19+
//! ```text
20+
//! ParquetMergePlanner ──► MergeSchedulerService ──► ParquetMergeSplitDownloader
21+
//!                                                                │
22+
//!                                                                ▼ (ParquetMergeScratch)
23+
//!                                                      ParquetMergeExecutor
24+
//! ```
25+
26+
use std::fmt;
27+
use std::path::PathBuf;
28+
29+
use quickwit_common::temp_dir::TempDirectory;
30+
use quickwit_parquet_engine::merge::policy::ParquetMergeOperation;
31+
use quickwit_parquet_engine::split::ParquetSplitMetadata;
32+
use tantivy::TrackedObject;
33+
34+
use crate::actors::MergePermit;
35+
36+
/// Notification of newly created Parquet splits.
37+
///
38+
/// Sent to `ParquetMergePlanner` from:
39+
/// - The publisher (feedback loop after a merge completes)
40+
/// - The indexing service (initial seeding of immature splits on start)
41+
#[derive(Debug)]
42+
pub struct ParquetNewSplits {
43+
pub new_splits: Vec<ParquetSplitMetadata>,
44+
}
45+
46+
/// A merge task dispatched by `MergeSchedulerService` to `ParquetMergeSplitDownloader`.
47+
///
48+
/// Carries the merge operation (tracked by the planner's inventory) and a
49+
/// concurrency permit from the global merge semaphore.
50+
pub struct ParquetMergeTask {
51+
pub merge_operation: TrackedObject<ParquetMergeOperation>,
52+
pub merge_permit: MergePermit,
53+
}
54+
55+
impl fmt::Debug for ParquetMergeTask {
56+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57+
f.debug_struct("ParquetMergeTask")
58+
.field("merge_split_id", &self.merge_operation.merge_split_id)
59+
.field("num_inputs", &self.merge_operation.splits.len())
60+
.finish()
61+
}
62+
}
63+
64+
/// Downloaded Parquet files ready for merge execution.
65+
///
66+
/// Sent from `ParquetMergeSplitDownloader` to `ParquetMergeExecutor` after
67+
/// all input files have been downloaded to local storage.
68+
pub struct ParquetMergeScratch {
69+
/// The merge operation describing what to merge.
70+
pub merge_operation: TrackedObject<ParquetMergeOperation>,
71+
72+
/// Local paths to the downloaded Parquet files, one per input split,
73+
/// in the same order as `merge_operation.splits`.
74+
pub downloaded_parquet_files: Vec<PathBuf>,
75+
76+
/// Temp directory containing the downloaded files. Held to prevent cleanup
77+
/// until the merge executor is done with the files.
78+
pub scratch_directory: TempDirectory,
79+
80+
/// Concurrency permit — held until the merge completes (including upload).
81+
pub merge_permit: MergePermit,
82+
}
83+
84+
impl fmt::Debug for ParquetMergeScratch {
85+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86+
f.debug_struct("ParquetMergeScratch")
87+
.field("merge_split_id", &self.merge_operation.merge_split_id)
88+
.field("num_files", &self.downloaded_parquet_files.len())
89+
.finish()
90+
}
91+
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ impl Handler<ParquetBatchForPackager> for ParquetPackager {
236236
checkpoint_delta,
237237
publish_lock,
238238
publish_token_opt,
239+
replaced_split_ids: Vec::new(),
239240
};
240241

241242
ctx.send_message(&self.uploader_mailbox, split_batch)

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
221221
let publish_lock = batch.publish_lock;
222222
let publish_token_opt = batch.publish_token_opt;
223223
let splits = batch.splits;
224+
let replaced_split_ids = batch.replaced_split_ids;
224225
debug!(
225226
index_uid = %index_uid,
226227
num_splits = splits.len(),
@@ -321,7 +322,7 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
321322
let update = ParquetSplitsUpdate {
322323
index_uid,
323324
new_splits: splits,
324-
replaced_split_ids: Vec::new(), // No merging yet
325+
replaced_split_ids,
325326
checkpoint_delta_opt: Some(checkpoint_delta),
326327
publish_lock,
327328
publish_token_opt,
@@ -427,6 +428,7 @@ mod tests {
427428
checkpoint_delta,
428429
publish_lock: PublishLock::default(),
429430
publish_token_opt: None,
431+
replaced_split_ids: Vec::new(),
430432
};
431433

432434
uploader_mailbox.send_message(batch).await.unwrap();
@@ -520,6 +522,7 @@ mod tests {
520522
checkpoint_delta,
521523
publish_lock: PublishLock::default(),
522524
publish_token_opt: None,
525+
replaced_split_ids: Vec::new(),
523526
};
524527

525528
uploader_mailbox.send_message(batch).await.unwrap();
@@ -594,6 +597,7 @@ mod tests {
594597
checkpoint_delta,
595598
publish_lock: PublishLock::default(),
596599
publish_token_opt: None,
600+
replaced_split_ids: Vec::new(),
597601
};
598602

599603
uploader_mailbox.send_message(batch).await.unwrap();
@@ -664,6 +668,7 @@ mod tests {
664668
checkpoint_delta,
665669
publish_lock: PublishLock::default(),
666670
publish_token_opt: None,
671+
replaced_split_ids: Vec::new(),
667672
};
668673
uploader_mailbox.send_message(batch).await.unwrap();
669674
}

0 commit comments

Comments
 (0)