Skip to content

Commit 1f6512e

Browse files
g-talbot and claude
authored and committed
feat: add ParquetMergeExecutor and full downloader implementation (Phase 3c)
Phase 3 pipeline integration, third PR: - ParquetMergeSplitDownloader: downloads each input split's Parquet file from object storage to a local temp directory, forwards ParquetMergeScratch to the executor. Replaces the stub from PR 3b. - ParquetMergeExecutor: runs merge_sorted_parquet_files via run_cpu_intensive, builds output ParquetSplitMetadata via merge_parquet_split_metadata, renames output files to match generated split IDs, sends ParquetSplitBatch with replaced_split_ids to the uploader. - ParquetSplitBatch.checkpoint_delta -> checkpoint_delta_opt: now Option to support merge operations (no checkpoint delta for data reorganization). Ingest path passes Some(delta), merge path passes None. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3a91a31 commit 1f6512e

6 files changed

Lines changed: 349 additions & 30 deletions

File tree

quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
mod indexing_service_impl;
2626
mod parquet_doc_processor;
2727
mod parquet_indexer;
28+
mod parquet_merge_executor;
2829
pub(crate) mod parquet_merge_messages;
2930
mod parquet_merge_planner;
3031
mod parquet_merge_split_downloader;
@@ -47,6 +48,7 @@ pub use parquet_doc_processor::{
4748
ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
4849
};
4950
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
51+
pub use parquet_merge_executor::ParquetMergeExecutor;
5052
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
5153
pub use parquet_merge_planner::ParquetMergePlanner;
5254
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,19 @@ pub struct ParquetSplitBatch {
9595
/// The uploader uses this to locate and upload the actual file content.
9696
pub output_dir: PathBuf,
9797
/// Checkpoint delta covering all data in these splits.
98-
pub checkpoint_delta: IndexCheckpointDelta,
98+
/// `None` for merge operations (data was already checkpointed at ingest).
99+
pub checkpoint_delta_opt: Option<IndexCheckpointDelta>,
99100
/// Publish lock for coordinating with sources.
100101
pub publish_lock: PublishLock,
101102
/// Optional publish token.
102103
pub publish_token_opt: Option<PublishToken>,
103104
/// Split IDs being replaced by this batch (non-empty for merges).
104105
/// Empty for the ingest path.
105106
pub replaced_split_ids: Vec<String>,
107+
/// Holds the temp directory alive until the uploader finishes reading.
108+
/// `None` for the ingest path (packager manages its own temp dir).
109+
/// `Some` for the merge path (executor's scratch directory).
110+
pub _scratch_directory_opt: Option<quickwit_common::temp_dir::TempDirectory>,
106111
}
107112

108113
/// ParquetIndexer actor that accumulates RecordBatches and forwards them to ParquetPackager.
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Parquet merge executor actor.
16+
//!
17+
//! Calls the Phase 1 merge engine (`merge_sorted_parquet_files`) via
18+
//! `run_cpu_intensive()`, builds output split metadata using
19+
//! `merge_parquet_split_metadata()`, and sends the result to the uploader.
20+
21+
use anyhow::Context;
22+
use async_trait::async_trait;
23+
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
24+
use quickwit_common::thread_pool::run_cpu_intensive;
25+
use quickwit_parquet_engine::merge::metadata_aggregation::merge_parquet_split_metadata;
26+
use quickwit_parquet_engine::merge::{MergeConfig, MergeOutputFile, merge_sorted_parquet_files};
27+
use quickwit_parquet_engine::storage::ParquetWriterConfig;
28+
use quickwit_proto::types::IndexUid;
29+
use tracing::{info, instrument, warn};
30+
31+
use super::ParquetUploader;
32+
use super::parquet_indexer::ParquetSplitBatch;
33+
use super::parquet_merge_messages::ParquetMergeScratch;
34+
use crate::models::PublishLock;
35+
36+
/// Executes Parquet merge operations using the Phase 1 k-way merge engine.
37+
///
38+
/// Receives `ParquetMergeScratch` from the downloader, runs the merge as a
39+
/// CPU-intensive task, builds output metadata, and sends the result to the
40+
/// uploader for staging and upload.
41+
///
42+
/// No separate Packager step is needed — the merge engine produces
43+
/// ready-to-upload Parquet files with complete metadata.
44+
pub struct ParquetMergeExecutor {
45+
uploader_mailbox: Mailbox<ParquetUploader>,
46+
}
47+
48+
impl ParquetMergeExecutor {
49+
pub fn new(uploader_mailbox: Mailbox<ParquetUploader>) -> Self {
50+
Self { uploader_mailbox }
51+
}
52+
}
53+
54+
#[async_trait]
55+
impl Actor for ParquetMergeExecutor {
56+
type ObservableState = ();
57+
58+
fn observable_state(&self) {}
59+
60+
fn name(&self) -> String {
61+
"ParquetMergeExecutor".to_string()
62+
}
63+
64+
fn queue_capacity(&self) -> QueueCapacity {
65+
QueueCapacity::Bounded(1)
66+
}
67+
}
68+
69+
#[async_trait]
70+
impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
71+
type Reply = ();
72+
73+
#[instrument(name = "parquet_merge_executor", skip_all, fields(
74+
merge_split_id = %scratch.merge_operation.merge_split_id,
75+
num_inputs = scratch.merge_operation.splits.len(),
76+
))]
77+
async fn handle(
78+
&mut self,
79+
scratch: ParquetMergeScratch,
80+
ctx: &ActorContext<Self>,
81+
) -> Result<(), ActorExitStatus> {
82+
let merge_split_id = scratch.merge_operation.merge_split_id.to_string();
83+
let num_inputs = scratch.merge_operation.splits.len();
84+
85+
info!(
86+
merge_split_id = %merge_split_id,
87+
num_inputs,
88+
total_bytes = scratch.merge_operation.total_size_bytes(),
89+
"executing parquet merge"
90+
);
91+
92+
// Separate output subdirectory so the merge engine's temp files
93+
// don't collide with the downloaded inputs in scratch_directory.
94+
let output_dir = scratch.scratch_directory.path().join("merged_output");
95+
std::fs::create_dir_all(&output_dir)
96+
.context("failed to create merge output directory")
97+
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
98+
99+
// Run the CPU-intensive merge on the dedicated thread pool.
100+
let input_paths = scratch.downloaded_parquet_files.clone();
101+
let output_dir_clone = output_dir.clone();
102+
let merge_result = run_cpu_intensive(move || {
103+
let config = MergeConfig {
104+
num_outputs: 1,
105+
writer_config: ParquetWriterConfig::default(),
106+
};
107+
merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config)
108+
})
109+
.await;
110+
111+
// We return Ok(()) on merge failure rather than Err to keep the actor
112+
// alive — same strategy as Tantivy's MergeExecutor. This prevents a
113+
// single "split of death" from crash-looping the entire pipeline.
114+
// The trade-off: failed splits aren't retried until pipeline respawn.
115+
let outputs: Vec<MergeOutputFile> = match merge_result {
116+
Ok(Ok(outputs)) => outputs,
117+
Ok(Err(merge_err)) => {
118+
warn!(
119+
error = %merge_err,
120+
merge_split_id = %merge_split_id,
121+
"parquet merge failed"
122+
);
123+
// Input splits were drained from the planner by operations().
124+
// They remain published but won't be re-planned until respawn.
125+
return Ok(());
126+
}
127+
Err(panicked) => {
128+
warn!(
129+
error = %panicked,
130+
merge_split_id = %merge_split_id,
131+
"parquet merge panicked"
132+
);
133+
return Ok(());
134+
}
135+
};
136+
137+
// Empty output is valid (all input rows were empty). Nothing to publish.
138+
if outputs.is_empty() {
139+
info!(
140+
merge_split_id = %merge_split_id,
141+
"merge produced no output (all inputs empty)"
142+
);
143+
return Ok(());
144+
}
145+
146+
// Build metadata for each output file and rename to match split IDs.
147+
let input_splits = &scratch.merge_operation.splits;
148+
let index_uid: IndexUid = input_splits[0]
149+
.index_uid
150+
.parse()
151+
.context("invalid index_uid in merge input")
152+
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
153+
154+
let replaced_split_ids: Vec<String> = input_splits
155+
.iter()
156+
.map(|s| s.split_id.as_str().to_string())
157+
.collect();
158+
159+
let mut merged_splits = Vec::with_capacity(outputs.len());
160+
for output in &outputs {
161+
let metadata = merge_parquet_split_metadata(input_splits, output)
162+
.context("failed to build merge output metadata")
163+
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
164+
165+
// The merge engine writes to a temp filename (merge_output_*.parquet).
166+
// Rename to {split_id}.parquet so the uploader can find it at the
167+
// path derived from ParquetSplitMetadata::parquet_filename().
168+
let expected_path = output_dir.join(&metadata.parquet_file);
169+
if output.path != expected_path {
170+
std::fs::rename(&output.path, &expected_path)
171+
.with_context(|| {
172+
format!(
173+
"failed to rename merge output {} to {}",
174+
output.path.display(),
175+
expected_path.display()
176+
)
177+
})
178+
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
179+
}
180+
181+
info!(
182+
split_id = %metadata.split_id,
183+
num_rows = metadata.num_rows,
184+
size_bytes = metadata.size_bytes,
185+
"merge produced output split"
186+
);
187+
188+
merged_splits.push(metadata);
189+
}
190+
191+
// Send to uploader. Merges have no checkpoint delta, no publish lock,
192+
// and no publish token — they're just reorganizing existing data.
193+
// The scratch directory is passed along to keep it alive until the
194+
// uploader finishes reading the merged files.
195+
let batch = ParquetSplitBatch {
196+
index_uid,
197+
splits: merged_splits,
198+
output_dir,
199+
checkpoint_delta_opt: None,
200+
publish_lock: PublishLock::default(),
201+
publish_token_opt: None,
202+
replaced_split_ids,
203+
_scratch_directory_opt: Some(scratch.scratch_directory),
204+
};
205+
206+
ctx.send_message(&self.uploader_mailbox, batch).await?;
207+
208+
// The merge permit is dropped here when `scratch` goes out of scope,
209+
// releasing the semaphore slot for the next merge.
210+
info!(
211+
merge_split_id = %merge_split_id,
212+
"parquet merge complete, sent to uploader"
213+
);
214+
Ok(())
215+
}
216+
}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs

Lines changed: 101 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,53 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
//! Stub actor for downloading Parquet files prior to merge.
15+
//! Actor that downloads Parquet files from object storage for merge.
1616
//!
17-
//! The full implementation (PR 3c) will download each input split's Parquet
18-
//! file from object storage to a local temp directory, then forward a
19-
//! `ParquetMergeScratch` to the `ParquetMergeExecutor`.
20-
//!
21-
//! This stub exists so that `MergeSchedulerService` can reference the
22-
//! `Mailbox<ParquetMergeSplitDownloader>` type and the `ParquetMergePlanner`
23-
//! can be tested end-to-end through the scheduler.
17+
//! For each `ParquetMergeTask`, downloads all input split files to a local
18+
//! temp directory, then forwards a `ParquetMergeScratch` to the
19+
//! `ParquetMergeExecutor`.
20+
21+
use std::path::Path;
22+
use std::sync::Arc;
2423

24+
use anyhow::Context;
2525
use async_trait::async_trait;
26-
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, QueueCapacity};
27-
use tracing::debug;
26+
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
27+
use quickwit_common::temp_dir::TempDirectory;
28+
use quickwit_storage::Storage;
29+
use tracing::{debug, info, warn};
2830

29-
use super::ParquetMergeTask;
31+
use super::parquet_merge_executor::ParquetMergeExecutor;
32+
use super::parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask};
3033

31-
/// Downloads Parquet split files from object storage for merge.
34+
/// Downloads Parquet split files from object storage for merge execution.
3235
///
33-
/// Stub implementation — accepts `ParquetMergeTask` messages but does not
34-
/// perform real downloads. The full implementation comes in PR 3c.
35-
pub struct ParquetMergeSplitDownloader;
36+
/// Downloads are isolated in a separate actor so that I/O latency doesn't
37+
/// block the CPU-intensive merge executor. Much simpler than the Tantivy
38+
/// `MergeSplitDownloader`: Parquet splits are single files (not bundles),
39+
/// so downloading is a straightforward `storage.copy_to_file()` per split.
40+
pub struct ParquetMergeSplitDownloader {
41+
/// Parent directory for creating per-merge temp directories.
42+
scratch_directory: TempDirectory,
43+
/// Object storage for downloading split files.
44+
storage: Arc<dyn Storage>,
45+
/// Downstream executor to forward downloaded files to.
46+
executor_mailbox: Mailbox<ParquetMergeExecutor>,
47+
}
48+
49+
impl ParquetMergeSplitDownloader {
50+
pub fn new(
51+
scratch_directory: TempDirectory,
52+
storage: Arc<dyn Storage>,
53+
executor_mailbox: Mailbox<ParquetMergeExecutor>,
54+
) -> Self {
55+
Self {
56+
scratch_directory,
57+
storage,
58+
executor_mailbox,
59+
}
60+
}
61+
}
3662

3763
#[async_trait]
3864
impl Actor for ParquetMergeSplitDownloader {
@@ -56,13 +82,68 @@ impl Handler<ParquetMergeTask> for ParquetMergeSplitDownloader {
5682
async fn handle(
5783
&mut self,
5884
task: ParquetMergeTask,
59-
_ctx: &ActorContext<Self>,
85+
ctx: &ActorContext<Self>,
6086
) -> Result<(), ActorExitStatus> {
61-
debug!(
62-
merge_split_id = %task.merge_operation.merge_split_id,
63-
num_inputs = task.merge_operation.splits.len(),
64-
"received parquet merge task (stub — real download in PR 3c)"
87+
let merge_split_id = task.merge_operation.merge_split_id.to_string();
88+
let num_inputs = task.merge_operation.splits.len();
89+
90+
info!(
91+
merge_split_id = %merge_split_id,
92+
num_inputs,
93+
"downloading parquet files for merge"
94+
);
95+
96+
// Each merge gets its own temp directory so partial downloads from a
97+
// failed merge don't interfere with other merges. TempDirectory's Drop
98+
// impl cleans up automatically on error paths.
99+
let download_dir = self
100+
.scratch_directory
101+
.named_temp_child("parquet-merge-")
102+
.context("failed to create merge download directory")
103+
.map_err(|e| ActorExitStatus::from(anyhow::anyhow!(e)))?;
104+
105+
// Download each input split's Parquet file.
106+
let mut downloaded_paths = Vec::with_capacity(num_inputs);
107+
for split in &task.merge_operation.splits {
108+
let parquet_filename = split.parquet_filename();
109+
let local_path = download_dir.path().join(&parquet_filename);
110+
111+
debug!(
112+
split_id = %split.split_id,
113+
parquet_file = %parquet_filename,
114+
"downloading parquet file"
115+
);
116+
117+
self.storage
118+
.copy_to_file(Path::new(&parquet_filename), &local_path)
119+
.await
120+
.map_err(|e| {
121+
warn!(
122+
error = %e,
123+
split_id = %split.split_id,
124+
"failed to download parquet file for merge"
125+
);
126+
ActorExitStatus::from(anyhow::anyhow!(e))
127+
})?;
128+
129+
downloaded_paths.push(local_path);
130+
ctx.record_progress();
131+
}
132+
133+
info!(
134+
merge_split_id = %merge_split_id,
135+
num_files = downloaded_paths.len(),
136+
"all parquet files downloaded for merge"
65137
);
138+
139+
let scratch = ParquetMergeScratch {
140+
merge_operation: task.merge_operation,
141+
downloaded_parquet_files: downloaded_paths,
142+
scratch_directory: download_dir,
143+
merge_permit: task.merge_permit,
144+
};
145+
146+
ctx.send_message(&self.executor_mailbox, scratch).await?;
66147
Ok(())
67148
}
68149
}

0 commit comments

Comments (0)