
Commit e220505

g-talbot and claude committed
feat: add end-to-end integration test for Parquet merge pipeline (Phase 3f)
Integration test that exercises the full merge actor chain:

1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series,
   sort schema KV metadata, and window metadata)
2. Uploads them to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts that publish_metrics_splits is called with the correct
   replaced_split_ids

Also fixes a TempDirectory lifetime bug: adds _scratch_directory_opt to
ParquetSplitBatch so that the merge executor's scratch directory stays alive
until the uploader finishes reading the merged files. Without this, the temp
directory was cleaned up between the executor handler returning and the
uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3227b37 commit e220505

6 files changed

Lines changed: 233 additions & 0 deletions
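
The TempDirectory lifetime fix is easiest to see in miniature. Below is a minimal sketch of the bug and the fix, using `tempfile::TempDir` (which deletes its directory on drop) as a stand-in for quickwit's `TempDirectory`; `SplitBatch`, `build_batch_buggy`, and `build_batch_fixed` are illustrative names, not the pipeline's actual types.

use std::path::PathBuf;

// Illustrative stand-in for ParquetSplitBatch.
struct SplitBatch {
    file_path: PathBuf,
    // The fix: carry the RAII guard inside the message so the scratch
    // directory outlives the handler that created it.
    _scratch_dir_guard: Option<tempfile::TempDir>,
}

fn build_batch_buggy() -> SplitBatch {
    let scratch = tempfile::tempdir().unwrap();
    let file_path = scratch.path().join("merged.parquet");
    std::fs::write(&file_path, b"merged data").unwrap();
    SplitBatch { file_path, _scratch_dir_guard: None }
    // BUG: `scratch` drops here and deletes the directory, so the
    // uploader's async task later finds `file_path` missing.
}

fn build_batch_fixed() -> SplitBatch {
    let scratch = tempfile::tempdir().unwrap();
    let file_path = scratch.path().join("merged.parquet");
    std::fs::write(&file_path, b"merged data").unwrap();
    // Cleanup is deferred until whoever receives the batch drops it,
    // i.e. after the uploader has read the file.
    SplitBatch { file_path, _scratch_dir_guard: Some(scratch) }
}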


quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -45,6 +45,10 @@ mod publisher_impl;
 )]
 mod parquet_e2e_test;
 
+#[cfg(test)]
+#[allow(clippy::disallowed_methods)]
+mod parquet_merge_pipeline_test;
+
 pub use parquet_doc_processor::{
     ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc,
 };

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_indexer.rs

Lines changed: 4 additions & 0 deletions
@@ -104,6 +104,10 @@ pub struct ParquetSplitBatch {
     /// Split IDs being replaced by this batch (non-empty for merges).
     /// Empty for the ingest path.
     pub replaced_split_ids: Vec<String>,
+    /// Holds the temp directory alive until the uploader finishes reading.
+    /// `None` for the ingest path (packager manages its own temp dir).
+    /// `Some` for the merge path (executor's scratch directory).
+    pub _scratch_directory_opt: Option<quickwit_common::temp_dir::TempDirectory>,
 }
 
 /// ParquetIndexer actor that accumulates RecordBatches and forwards them to ParquetPackager.

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs

Lines changed: 3 additions & 0 deletions
@@ -182,6 +182,8 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
 
         // Send to uploader. Merges have no checkpoint delta, no publish lock,
         // and no publish token — they're just reorganizing existing data.
+        // The scratch directory is passed along to keep it alive until the
+        // uploader finishes reading the merged files.
         let batch = ParquetSplitBatch {
             index_uid,
             splits: merged_splits,
@@ -190,6 +192,7 @@ impl Handler<ParquetMergeScratch> for ParquetMergeExecutor {
             publish_lock: PublishLock::default(),
             publish_token_opt: None,
             replaced_split_ids,
+            _scratch_directory_opt: Some(scratch.scratch_directory),
         };
 
         ctx.send_message(&self.uploader_mailbox, batch).await?;
quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs

Lines changed: 211 additions & 0 deletions
@@ -0,0 +1,211 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Integration test for the Parquet merge pipeline.
+//!
+//! Tests the full actor chain: seeds splits → planner plans merge →
+//! downloader downloads from storage → executor merges → uploader uploads →
+//! publisher publishes with replaced_split_ids.
+
+use std::path::Path;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+use std::time::Duration;
+
+use quickwit_actors::Universe;
+use quickwit_common::pubsub::EventBroker;
+use quickwit_common::temp_dir::TempDirectory;
+use quickwit_common::test_utils::wait_until_predicate;
+use quickwit_parquet_engine::merge::policy::{
+    ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
+};
+use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
+use quickwit_parquet_engine::storage::{ParquetWriter, ParquetWriterConfig};
+use quickwit_parquet_engine::table_config::TableConfig;
+use quickwit_parquet_engine::test_helpers::create_test_batch_with_tags;
+use quickwit_proto::metastore::{EmptyResponse, MetastoreServiceClient, MockMetastoreService};
+use quickwit_storage::{RamStorage, Storage};
+
+use super::parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
+
+/// Write a sorted Parquet file to the given directory using the standard
+/// writer (which computes sorted_series, row_keys, zonemaps, and KV metadata).
+fn write_test_parquet_file(
+    dir: &Path,
+    filename: &str,
+    num_rows: usize,
+    split_metadata: &ParquetSplitMetadata,
+) -> u64 {
+    let table_config = TableConfig::default();
+    let writer = ParquetWriter::new(ParquetWriterConfig::default(), &table_config)
+        .expect("failed to create ParquetWriter");
+
+    let batch = create_test_batch_with_tags(num_rows, &["service", "host"]);
+    let path = dir.join(filename);
+    let (file_size, _write_metadata) = writer
+        .write_to_file_with_metadata(&batch, &path, Some(split_metadata))
+        .expect("failed to write test Parquet file");
+    file_size
+}
+
+/// Create a ParquetSplitMetadata consistent with the test Parquet writer.
+fn make_test_split_metadata(
+    split_id: &str,
+    num_rows: u64,
+    size_bytes: u64,
+) -> ParquetSplitMetadata {
+    let table_config = TableConfig::default();
+    ParquetSplitMetadata::metrics_builder()
+        .split_id(ParquetSplitId::new(split_id))
+        .index_uid("test-merge-index:00000000000000000000000001")
+        .partition_id(0)
+        .time_range(TimeRange::new(100, 100 + num_rows))
+        .num_rows(num_rows)
+        .size_bytes(size_bytes)
+        .sort_fields(table_config.effective_sort_fields())
+        .window_start_secs(0)
+        .window_duration_secs(900)
+        .add_metric_name("cpu.usage")
+        .build()
+}
+
+/// Full integration test: seed splits → merge → verify replace publish.
+///
+/// Creates 2 real sorted Parquet files in RamStorage, seeds the merge
+/// pipeline with their metadata, and verifies the pipeline:
+/// 1. Plans a merge (merge_factor=2)
+/// 2. Downloads files from storage
+/// 3. Executes the merge via the k-way merge engine
+/// 4. Uploads the merged output
+/// 5. Publishes with replaced_split_ids matching the input splits
+#[tokio::test]
+async fn test_merge_pipeline_end_to_end() {
+    quickwit_common::setup_logging_for_tests();
+
+    let universe = Universe::with_accelerated_time();
+    let temp_dir = tempfile::tempdir().unwrap();
+    let ram_storage: Arc<dyn Storage> = Arc::new(RamStorage::default());
+
+    // --- Step 1: Create real sorted Parquet files and upload to storage ---
+
+    let meta_a = make_test_split_metadata("split-a", 50, 0);
+    let size_a = write_test_parquet_file(temp_dir.path(), "split-a.parquet", 50, &meta_a);
+    let meta_a = {
+        let mut m = meta_a;
+        m.size_bytes = size_a;
+        m.parquet_file = "split-a.parquet".to_string();
+        m
+    };
+
+    let meta_b = make_test_split_metadata("split-b", 50, 0);
+    let size_b = write_test_parquet_file(temp_dir.path(), "split-b.parquet", 50, &meta_b);
+    let meta_b = {
+        let mut m = meta_b;
+        m.size_bytes = size_b;
+        m.parquet_file = "split-b.parquet".to_string();
+        m
+    };
+
+    // Upload files to RamStorage.
+    let content_a = std::fs::read(temp_dir.path().join("split-a.parquet")).unwrap();
+    ram_storage
+        .put(Path::new("split-a.parquet"), Box::new(content_a))
+        .await
+        .unwrap();
+    let content_b = std::fs::read(temp_dir.path().join("split-b.parquet")).unwrap();
+    ram_storage
+        .put(Path::new("split-b.parquet"), Box::new(content_b))
+        .await
+        .unwrap();
+
+    // --- Step 2: Set up mock metastore ---
+
+    let mut mock_metastore = MockMetastoreService::new();
+
+    // Expect staging of the merged output split.
+    mock_metastore
+        .expect_stage_metrics_splits()
+        .returning(|_| Ok(EmptyResponse {}));
+
+    // Capture the publish request to verify replaced_split_ids.
+    let publish_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let publish_called_clone = publish_called.clone();
+    let replaced_ids = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
+    let replaced_ids_clone = replaced_ids.clone();
+
+    mock_metastore
+        .expect_publish_metrics_splits()
+        .returning(move |request| {
+            replaced_ids_clone
+                .lock()
+                .unwrap()
+                .extend(request.replaced_split_ids.clone());
+            publish_called_clone.store(true, Ordering::SeqCst);
+            Ok(EmptyResponse {})
+        });
+
+    let metastore = MetastoreServiceClient::from_mock(mock_metastore);
+
+    // --- Step 3: Spawn the merge pipeline ---
+
+    let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new(
+        ParquetMergePolicyConfig {
+            merge_factor: 2,
+            max_merge_factor: 2,
+            max_merge_ops: 5,
+            target_split_size_bytes: 256 * 1024 * 1024,
+            maturation_period: Duration::from_secs(3600),
+            max_finalize_merge_operations: 3,
+        },
+    ));
+
+    let params = ParquetMergePipelineParams {
+        indexing_directory: TempDirectory::for_test(),
+        metastore,
+        storage: ram_storage.clone(),
+        merge_policy,
+        merge_scheduler_service: universe.get_or_spawn_one(),
+        max_concurrent_split_uploads: 4,
+        event_broker: EventBroker::default(),
+    };
+
+    let initial_splits = vec![meta_a, meta_b];
+    let pipeline = ParquetMergePipeline::new(params, Some(initial_splits), universe.spawn_ctx());
+    let (_pipeline_mailbox, _pipeline_handle) = universe.spawn_builder().spawn(pipeline);
+
+    // --- Step 4: Wait for publish with replaced_split_ids ---
+
+    wait_until_predicate(
+        || {
+            let publish_called = publish_called.clone();
+            async move { publish_called.load(Ordering::SeqCst) }
+        },
+        Duration::from_secs(30),
+        Duration::from_millis(100),
+    )
+    .await
+    .expect("timed out waiting for merge publish");
+
+    // --- Step 5: Verify ---
+
+    let mut replaced_sorted: Vec<String> = replaced_ids.lock().unwrap().clone();
+    replaced_sorted.sort();
+    assert_eq!(
+        replaced_sorted,
+        vec!["split-a".to_string(), "split-b".to_string()],
+        "publish should replace both input splits"
+    );
+
+    universe.assert_quit().await;
+}

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_packager.rs

Lines changed: 1 addition & 0 deletions
@@ -237,6 +237,7 @@ impl Handler<ParquetBatchForPackager> for ParquetPackager {
             publish_lock,
             publish_token_opt,
             replaced_split_ids: Vec::new(),
+            _scratch_directory_opt: None,
         };
 
         ctx.send_message(&self.uploader_mailbox, split_batch)

quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_uploader.rs

Lines changed: 10 additions & 0 deletions
@@ -222,6 +222,10 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
         let publish_token_opt = batch.publish_token_opt;
         let splits = batch.splits;
         let replaced_split_ids = batch.replaced_split_ids;
+        // Hold the scratch directory alive until the upload task completes.
+        // For the merge path, this prevents the TempDirectory from being
+        // cleaned up before the upload task reads the merged files.
+        let _scratch_directory_guard = batch._scratch_directory_opt;
         debug!(
             index_uid = %index_uid,
             num_splits = splits.len(),
@@ -335,6 +339,8 @@ impl Handler<ParquetSplitBatch> for ParquetUploader {
 
                 // Drop permit to allow next upload
                 drop(permit_guard);
+                // Drop scratch directory guard after upload completes.
+                drop(_scratch_directory_guard);
             }
             .instrument(Span::current()),
             "metrics_upload_task",
@@ -429,6 +435,7 @@ mod tests {
             publish_lock: PublishLock::default(),
             publish_token_opt: None,
             replaced_split_ids: Vec::new(),
+            _scratch_directory_opt: None,
        };
 
        uploader_mailbox.send_message(batch).await.unwrap();
@@ -523,6 +530,7 @@ mod tests {
            publish_lock: PublishLock::default(),
            publish_token_opt: None,
            replaced_split_ids: Vec::new(),
+           _scratch_directory_opt: None,
        };
 
        uploader_mailbox.send_message(batch).await.unwrap();
@@ -598,6 +606,7 @@ mod tests {
            publish_lock: PublishLock::default(),
            publish_token_opt: None,
            replaced_split_ids: Vec::new(),
+           _scratch_directory_opt: None,
        };
 
        uploader_mailbox.send_message(batch).await.unwrap();
@@ -669,6 +678,7 @@ mod tests {
            publish_lock: PublishLock::default(),
            publish_token_opt: None,
            replaced_split_ids: Vec::new(),
+           _scratch_directory_opt: None,
        };
        uploader_mailbox.send_message(batch).await.unwrap();
    }
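
On the uploader side, the same guard has to travel into the spawned upload future. A minimal sketch of that ordering, assuming a tokio runtime; `upload_file`, `spawn_upload`, and the parameter names are hypothetical stand-ins for the real uploader internals:

use std::path::PathBuf;

// Hypothetical stand-in for the real upload routine.
async fn upload_file(_path: &PathBuf) { /* read + upload the file */ }

fn spawn_upload(file_path: PathBuf, scratch_guard: Option<tempfile::TempDir>) {
    tokio::spawn(async move {
        // `scratch_guard` is moved into this future, so the scratch
        // directory survives the await even though the handler that
        // created it has already returned.
        upload_file(&file_path).await;
        // Dropping the guard only after the upload has read the file
        // makes the cleanup point explicit.
        drop(scratch_guard);
    });
}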
