diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 45b859ee8c1..0a6807c3884 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -8595,6 +8595,7 @@ dependencies = [ "mockall", "oneshot 0.2.1", "openssl", + "parquet", "percent-encoding", "proptest", "prost 0.14.3", diff --git a/quickwit/quickwit-indexing/Cargo.toml b/quickwit/quickwit-indexing/Cargo.toml index 797b59a5e97..2a0d581797d 100644 --- a/quickwit/quickwit-indexing/Cargo.toml +++ b/quickwit/quickwit-indexing/Cargo.toml @@ -114,6 +114,7 @@ ci-test = [] bytes = { workspace = true } criterion = { workspace = true, features = ["async_tokio"] } mockall = { workspace = true } +parquet = { workspace = true } proptest = { workspace = true } prost = { workspace = true } rand = { workspace = true } diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index 6f6fc60e467..70fe17c621b 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -267,6 +267,7 @@ impl MergeSchedulerService { } = PeekMut::pop(next_merge); // The permit is owned by the task and released via Drop when // the executor finishes, triggering PermitReleased back here. + // Drop-based release ensures the semaphore is freed even on panic. let parquet_merge_task = ParquetMergeTask { merge_operation, merge_permit, @@ -318,24 +319,30 @@ struct ScheduleMerge { split_downloader_mailbox: Mailbox, } -/// The higher, the sooner we will execute the merge operation. -/// A good merge operation -/// - strongly reduces the number splits -/// - is light. -fn score_merge_operation(merge_operation: &MergeOperation) -> u64 { - let total_num_bytes: u64 = merge_operation.total_num_bytes(); +/// Scores a merge operation for priority scheduling. +/// +/// Higher score = scheduled sooner. Prefers merges that strongly reduce +/// split count relative to their total byte cost. Used by both Tantivy +/// and Parquet merge scheduling. +fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 { if total_num_bytes == 0 { - // Silly corner case that should never happen. return u64::MAX; } - // We will remove splits.len() and add 1 merge splits. - let delta_num_splits = (merge_operation.splits.len() - 1) as u64; - // We use integer arithmetic to avoid `f64 are not ordered` silliness. + // We will remove num_splits and add 1 merged split. + let delta_num_splits = (num_splits - 1) as u64; + // Integer arithmetic to avoid `f64 are not ordered` silliness. 
(delta_num_splits << 48) .checked_div(total_num_bytes) .unwrap_or(1u64) } +fn score_merge_operation(merge_operation: &MergeOperation) -> u64 { + score_merge( + merge_operation.splits.len(), + merge_operation.total_num_bytes(), + ) +} + impl ScheduleMerge { pub fn new( merge_operation: TrackedObject, @@ -406,14 +413,10 @@ impl Handler for MergeSchedulerService { #[cfg(feature = "metrics")] fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 { - let total_num_bytes = merge_operation.total_size_bytes(); - if total_num_bytes == 0 { - return u64::MAX; - } - let delta_num_splits = (merge_operation.splits.len() - 1) as u64; - (delta_num_splits << 48) - .checked_div(total_num_bytes) - .unwrap_or(1u64) + score_merge( + merge_operation.splits.len(), + merge_operation.total_size_bytes(), + ) } #[cfg(feature = "metrics")] diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs index 9666c6eccb4..969b87f3b26 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs @@ -45,6 +45,10 @@ mod publisher_impl; )] mod parquet_e2e_test; +#[cfg(test)] +#[allow(clippy::disallowed_methods)] +mod parquet_merge_pipeline_test; + pub use parquet_doc_processor::{ ParquetDocProcessor, ParquetDocProcessorCounters, ParquetDocProcessorError, is_arrow_ipc, }; diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs index 536c7edcc34..bf1f5bddb4d 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_executor.rs @@ -43,11 +43,18 @@ use crate::models::PublishLock; /// ready-to-upload Parquet files with complete metadata. pub struct ParquetMergeExecutor { uploader_mailbox: Mailbox, + writer_config: ParquetWriterConfig, } impl ParquetMergeExecutor { - pub fn new(uploader_mailbox: Mailbox) -> Self { - Self { uploader_mailbox } + pub fn new( + uploader_mailbox: Mailbox, + writer_config: ParquetWriterConfig, + ) -> Self { + Self { + uploader_mailbox, + writer_config, + } } } @@ -99,36 +106,37 @@ impl Handler for ParquetMergeExecutor { // Run the CPU-intensive merge on the dedicated thread pool. let input_paths = scratch.downloaded_parquet_files.clone(); let output_dir_clone = output_dir.clone(); + let writer_config = self.writer_config.clone(); let merge_result = run_cpu_intensive(move || { let config = MergeConfig { num_outputs: 1, - writer_config: ParquetWriterConfig::default(), + writer_config, }; merge_sorted_parquet_files(&input_paths, &output_dir_clone, &config) }) .await; - // We return Ok(()) on merge failure rather than Err to keep the actor - // alive — same strategy as Tantivy's MergeExecutor. This prevents a - // single "split of death" from crash-looping the entire pipeline. - // The trade-off: failed splits aren't retried until pipeline respawn. let outputs: Vec = match merge_result { Ok(Ok(outputs)) => outputs, Ok(Err(merge_err)) => { warn!( error = %merge_err, merge_split_id = %merge_split_id, - "parquet merge failed" + "parquet merge failed — input splits will not be retried until \ + the pipeline restarts with metastore re-seeding" ); - // Input splits were drained from the planner by operations(). - // They remain published but won't be re-planned until respawn. 
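Aside on the new comment in merge_scheduler_service.rs ("Drop-based release ensures the semaphore is freed even on panic"): the guarantee is plain RAII, nothing Quickwit-specific. A minimal standalone sketch using tokio's Semaphore (the real MergePermit type is internal; this only illustrates the mechanism):

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(1));
    // The owned permit moves into the task, like merge_permit moving into
    // ParquetMergeTask above.
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let handle = tokio::spawn(async move {
        let _permit = permit; // held for the lifetime of the task
        panic!("simulated merge executor panic");
    });
    let _ = handle.await; // the panic surfaces as a JoinError and is contained
    // The permit was dropped during unwinding, so the slot is available again.
    assert_eq!(semaphore.available_permits(), 1);
}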
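The `<< 48` in score_merge is a fixed-point trick: scaling the split-count delta before the integer division preserves enough precision to rank merges without comparing f64 values (which are not Ord). A quick standalone check of the ordering it produces, with made-up byte sizes:

fn score_merge(num_splits: usize, total_num_bytes: u64) -> u64 {
    if total_num_bytes == 0 {
        return u64::MAX;
    }
    let delta_num_splits = (num_splits - 1) as u64;
    (delta_num_splits << 48)
        .checked_div(total_num_bytes)
        .unwrap_or(1u64)
}

fn main() {
    // Collapsing 10 splits outranks collapsing 2 splits of the same total size.
    assert!(score_merge(10, 100 << 20) > score_merge(2, 100 << 20));
    // For the same split count, the lighter merge wins.
    assert!(score_merge(4, 64 << 20) > score_merge(4, 1 << 30));
    // The degenerate zero-byte case sorts first.
    assert_eq!(score_merge(5, 0), u64::MAX);
}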
+ // The input splits were drained from the planner by operations(). + // They remain published in the metastore and will be re-seeded + // into the planner when the pipeline respawns (via + // fetch_immature_splits on the ParquetMergePipeline supervisor). return Ok(()); } Err(panicked) => { warn!( error = %panicked, merge_split_id = %merge_split_id, - "parquet merge panicked" + "parquet merge panicked — input splits will not be retried until \ + the pipeline restarts with metastore re-seeding" ); return Ok(()); } diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs index e20037c34d9..d1347f2d4bf 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs @@ -38,9 +38,11 @@ use quickwit_actors::{ use quickwit_common::KillSwitch; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; +use quickwit_metastore::{ListParquetSplitsRequestExt, ListParquetSplitsResponseExt}; use quickwit_parquet_engine::merge::policy::ParquetMergePolicy; use quickwit_parquet_engine::split::ParquetSplitMetadata; -use quickwit_proto::metastore::MetastoreServiceClient; +use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient}; +use quickwit_proto::types::IndexUid; use quickwit_storage::Storage; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; @@ -52,6 +54,7 @@ use super::{METRICS_PUBLISHER_NAME, ParquetUploader}; use crate::actors::pipeline_shared::wait_duration_before_retry; use crate::actors::publisher::DisconnectMergePlanner; use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType}; +use crate::models::MergeStatistics; /// Limits concurrent Parquet merge pipeline spawns to avoid overwhelming the /// metastore. This is a separate semaphore from the Tantivy merge pipeline's. @@ -106,13 +109,12 @@ pub struct ParquetMergePipeline { /// the new planner instance. merge_planner_mailbox: Mailbox, merge_planner_inbox: Inbox, + previous_generations_statistics: MergeStatistics, + statistics: MergeStatistics, handles_opt: Option, /// Child kill switch — killing this kills all actors in the pipeline /// without affecting the supervisor itself. kill_switch: KillSwitch, - /// Increments on each spawn. Used for log correlation. - generation: usize, - num_spawn_attempts: usize, /// Immature splits passed to the planner on first spawn. On subsequent /// spawns (after crash/respawn), the planner starts empty and picks up /// new splits from the feedback loop. 
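The double Ok(..) matching in parquet_merge_executor.rs keeps the actor alive through both failure layers (the merge returning an error, and the merge task panicking), which is what prevents a single bad split from crash-looping the pipeline. A stripped-down sketch of the same shape using plain tokio; spawn_blocking stands in for run_cpu_intensive here:

use tokio::task::JoinError;

async fn execute_merge_sketch() {
    // Outer Result: did the task run to completion or panic?
    // Inner Result: did the merge itself succeed?
    let merge_result: Result<Result<u64, String>, JoinError> =
        tokio::task::spawn_blocking(|| {
            // The CPU-intensive merge would run here.
            Err::<u64, String>("corrupt input split".to_string())
        })
        .await;

    match merge_result {
        Ok(Ok(num_rows)) => println!("merged {num_rows} rows"),
        // Merge failure: warn and return normally so the actor keeps running.
        Ok(Err(merge_err)) => eprintln!("parquet merge failed: {merge_err}"),
        // Panic inside the blocking task: same treatment.
        Err(join_err) => eprintln!("parquet merge panicked: {join_err}"),
    }
}

#[tokio::main]
async fn main() {
    execute_merge_sketch().await;
}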
@@ -122,9 +124,11 @@ pub struct ParquetMergePipeline { #[async_trait] impl Actor for ParquetMergePipeline { - type ObservableState = (); + type ObservableState = MergeStatistics; - fn observable_state(&self) {} + fn observable_state(&self) -> Self::ObservableState { + self.statistics.clone() + } fn name(&self) -> String { "ParquetMergePipeline".to_string() @@ -152,10 +156,10 @@ impl ParquetMergePipeline { ); Self { params, + previous_generations_statistics: MergeStatistics::default(), + statistics: MergeStatistics::default(), handles_opt: None, kill_switch: KillSwitch::default(), - generation: 0, - num_spawn_attempts: 0, merge_planner_inbox, merge_planner_mailbox, initial_immature_splits_opt, @@ -204,7 +208,7 @@ impl ParquetMergePipeline { } if !failure_or_unhealthy_actors.is_empty() { error!( - generation = self.generation, + generation = self.generation(), healthy_actors = ?healthy_actors, failed_or_unhealthy_actors = ?failure_or_unhealthy_actors, success_actors = ?success_actors, @@ -214,13 +218,13 @@ impl ParquetMergePipeline { } if healthy_actors.is_empty() { info!( - generation = self.generation, + generation = self.generation(), "parquet merge pipeline completed successfully" ); return Health::Success; } debug!( - generation = self.generation, + generation = self.generation(), healthy_actors = ?healthy_actors, success_actors = ?success_actors, "parquet merge pipeline is running and healthy" @@ -228,23 +232,27 @@ impl ParquetMergePipeline { Health::Healthy } - #[instrument(name="spawn_parquet_merge_pipeline", level="info", skip_all, fields(generation=self.generation))] + fn generation(&self) -> usize { + self.statistics.generation + } + + #[instrument(name="spawn_parquet_merge_pipeline", level="info", skip_all, fields(generation=self.generation()))] async fn spawn_pipeline(&mut self, ctx: &ActorContext) -> anyhow::Result<()> { let _spawn_permit = ctx .protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire()) .await .expect("semaphore should not be closed"); - self.num_spawn_attempts += 1; + self.statistics.num_spawn_attempts += 1; self.kill_switch = ctx.kill_switch().child(); info!( - generation = self.generation, + generation = self.generation(), root_dir = %self.params.indexing_directory.path().display(), "spawning parquet merge pipeline" ); - let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default(); + let immature_splits = self.fetch_immature_splits(ctx).await?; // Spawn actors bottom-up: each actor's constructor needs a mailbox // for the actor below it in the chain, so we start from the publisher @@ -288,7 +296,8 @@ impl ParquetMergePipeline { .spawn(merge_uploader); // 4. 
Merge executor - let merge_executor = ParquetMergeExecutor::new(merge_uploader_mailbox); + let merge_executor = + ParquetMergeExecutor::new(merge_uploader_mailbox, self.params.writer_config.clone()); let (merge_executor_mailbox, merge_executor_handle) = ctx .spawn_actor() .set_kill_switch(self.kill_switch.clone()) @@ -323,7 +332,8 @@ impl ParquetMergePipeline { ) .spawn(merge_planner); - self.generation += 1; + self.previous_generations_statistics = self.statistics.clone(); + self.statistics.generation += 1; self.handles_opt = Some(ParquetMergePipelineHandles { merge_planner: merge_planner_handle, merge_split_downloader: merge_split_downloader_handle, @@ -350,6 +360,28 @@ impl ParquetMergePipeline { } } + async fn perform_observe(&mut self) { + let Some(handles) = &self.handles_opt else { + return; + }; + handles.merge_planner.refresh_observe(); + handles.merge_uploader.refresh_observe(); + handles.merge_publisher.refresh_observe(); + let num_ongoing_merges = crate::metrics::INDEXER_METRICS + .ongoing_merge_operations + .get(); + self.statistics = self + .previous_generations_statistics + .clone() + .add_actor_counters( + &handles.merge_uploader.last_observation(), + &handles.merge_publisher.last_observation(), + ) + .set_generation(self.statistics.generation) + .set_num_spawn_attempts(self.statistics.num_spawn_attempts) + .set_ongoing_merges(usize::try_from(num_ongoing_merges).unwrap_or(0)); + } + async fn perform_health_check( &mut self, ctx: &ActorContext, @@ -372,6 +404,56 @@ impl ParquetMergePipeline { } Ok(()) } + + /// Fetch published Parquet splits from the metastore for merge planning. + /// + /// On first spawn, uses the initial splits provided by the IndexingService + /// (avoids per-pipeline metastore queries when many pipelines start). + /// On subsequent spawns (after crash/respawn), queries the metastore + /// directly to recover splits that were in-flight during the crash. + /// + /// The planner's `record_splits_if_necessary` filters out mature splits, + /// so we don't need to filter here. + async fn fetch_immature_splits( + &mut self, + ctx: &ActorContext, + ) -> anyhow::Result> { + // On first spawn, use the initial splits provided by the IndexingService. + if let Some(immature_splits) = self.initial_immature_splits_opt.take() { + return Ok(immature_splits); + } + // On subsequent spawns, query the metastore for published splits. + // Dispatch to the correct RPC based on whether this is a metrics or + // sketches index — they use separate Postgres tables. + let index_uid = self.params.index_uid.clone(); + let query = quickwit_metastore::ListParquetSplitsQuery::for_index(index_uid.clone()); + let is_sketch = quickwit_common::is_sketches_index(&index_uid.index_id); + let records = if is_sketch { + let list_request = quickwit_proto::metastore::ListSketchSplitsRequest::try_from_query( + index_uid.clone(), + &query, + )?; + let response = ctx + .protect_future(self.params.metastore.list_sketch_splits(list_request)) + .await?; + response.deserialize_splits()? + } else { + let list_request = quickwit_proto::metastore::ListMetricsSplitsRequest::try_from_query( + index_uid.clone(), + &query, + )?; + let response = ctx + .protect_future(self.params.metastore.list_metrics_splits(list_request)) + .await?; + response.deserialize_splits()? 
+ }; + let splits: Vec = records.into_iter().map(|r| r.metadata).collect(); + info!( + num_splits = splits.len(), + "fetched published parquet splits for merge planning on respawn" + ); + Ok(splits) + } } #[async_trait] @@ -383,6 +465,7 @@ impl Handler for ParquetMergePipeline { supervise_loop_token: SuperviseLoop, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { + self.perform_observe().await; self.perform_health_check(ctx).await?; ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, supervise_loop_token); Ok(()) @@ -475,6 +558,8 @@ impl Handler for ParquetMergePipeline { /// All actors in the pipeline share these resources via `Arc`/`Clone`. #[derive(Clone)] pub struct ParquetMergePipelineParams { + /// Index UID for metastore queries when re-seeding on respawn. + pub index_uid: IndexUid, /// Root temp directory for scratch files (downloads, merge output). pub indexing_directory: TempDirectory, /// Metastore client for staging/publishing merged splits and for @@ -490,6 +575,10 @@ pub struct ParquetMergePipelineParams { pub merge_scheduler_service: Mailbox, pub max_concurrent_split_uploads: usize, pub event_broker: EventBroker, + /// Parquet writer config for merge output (compression, page size, etc.). + /// Should match the ingest pipeline's writer config so merged files have + /// consistent compression. + pub writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig, } #[cfg(test)] @@ -507,7 +596,13 @@ mod tests { use super::*; fn make_pipeline_params(universe: &Universe) -> ParquetMergePipelineParams { - let mock_metastore = MockMetastoreService::new(); + let mut mock_metastore = MockMetastoreService::new(); + // Allow list_metrics_splits for respawn seeding (returns empty). + mock_metastore.expect_list_metrics_splits().returning(|_| { + Ok(quickwit_proto::metastore::ListMetricsSplitsResponse { + splits_serialized_json: Vec::new(), + }) + }); let storage = Arc::new(quickwit_storage::RamStorage::default()); let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new( ParquetMergePolicyConfig { @@ -520,6 +615,7 @@ mod tests { }, )); ParquetMergePipelineParams { + index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0), indexing_directory: TempDirectory::for_test(), metastore: MetastoreServiceClient::from_mock(mock_metastore), storage, @@ -527,6 +623,7 @@ mod tests { merge_scheduler_service: universe.get_or_spawn_one(), max_concurrent_split_uploads: 4, event_broker: EventBroker::default(), + writer_config: quickwit_parquet_engine::storage::ParquetWriterConfig::default(), } } diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs new file mode 100644 index 00000000000..89fb53f5cea --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline_test.rs @@ -0,0 +1,645 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Integration test for the Parquet merge pipeline. +//! +//! Tests the full actor chain: seeds splits → planner plans merge → +//! downloader downloads from storage → executor merges → uploader uploads → +//! publisher publishes with replaced_split_ids. +//! +//! Verifies: +//! - Staged metadata (num_rows, time_range, metric_names, tags, merge_ops, row_keys, +//! zonemap_regexes) +//! - Merged Parquet file contents (row count, column values, sort order) +//! - Parquet KV headers (qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes) +//! - Cross-validation: metadata agrees with file contents + +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use arrow::array::{ + Array, ArrayRef, DictionaryArray, Float64Array, Int32Array, Int64Array, StringArray, + UInt8Array, UInt64Array, +}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema as ArrowSchema}; +use arrow::record_batch::RecordBatch; +use quickwit_actors::Universe; +use quickwit_common::pubsub::EventBroker; +use quickwit_common::temp_dir::TempDirectory; +use quickwit_common::test_utils::wait_until_predicate; +use quickwit_metastore::StageParquetSplitsRequestExt; +use quickwit_parquet_engine::merge::policy::{ + ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig, +}; +use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN; +use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange}; +use quickwit_parquet_engine::storage::{ParquetWriter, ParquetWriterConfig}; +use quickwit_parquet_engine::table_config::TableConfig; +use quickwit_parquet_engine::test_helpers::create_nullable_dict_array; +use quickwit_proto::metastore::{ + EmptyResponse, MetastoreServiceClient, MockMetastoreService, StageMetricsSplitsRequest, +}; +use quickwit_storage::{RamStorage, Storage}; + +use super::parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams}; + +// --------------------------------------------------------------------------- +// Test data helpers +// --------------------------------------------------------------------------- + +/// Creates a RecordBatch with configurable metric name, timestamp range, and +/// tag values. This allows building diverse inputs to verify the merge engine +/// handles heterogeneous data correctly. 
+fn create_custom_test_batch( + metric_name: &str, + ts_start: u64, + num_rows: usize, + service_val: &str, + host_val: &str, +) -> RecordBatch { + let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + let fields = vec![ + Field::new("metric_name", dict_type.clone(), false), + Field::new("metric_type", DataType::UInt8, false), + Field::new("timestamp_secs", DataType::UInt64, false), + Field::new("value", DataType::Float64, false), + Field::new("timeseries_id", DataType::Int64, false), + Field::new("service", dict_type.clone(), true), + Field::new("host", dict_type.clone(), true), + ]; + let schema = Arc::new(ArrowSchema::new(fields)); + + let metric_names: Vec<&str> = vec![metric_name; num_rows]; + let mn_keys: Vec = (0..num_rows).map(|i| i as i32).collect(); + let mn_values = StringArray::from(metric_names); + let metric_name_arr: ArrayRef = Arc::new( + DictionaryArray::::try_new(Int32Array::from(mn_keys), Arc::new(mn_values)) + .unwrap(), + ); + + let metric_type: ArrayRef = Arc::new(UInt8Array::from(vec![0u8; num_rows])); + + let timestamps: Vec = (0..num_rows).map(|i| ts_start + i as u64).collect(); + let timestamp_secs: ArrayRef = Arc::new(UInt64Array::from(timestamps)); + + let values: Vec = (0..num_rows).map(|i| 42.0 + i as f64).collect(); + let value: ArrayRef = Arc::new(Float64Array::from(values)); + + // Each row gets a unique timeseries_id based on the metric name hash + row + // index. This ensures split-a and split-b have non-overlapping IDs. + let ts_id_base: i64 = metric_name.bytes().map(|b| b as i64).sum::() * 1000; + let timeseries_ids: Vec = (0..num_rows).map(|i| ts_id_base + i as i64).collect(); + let timeseries_id: ArrayRef = Arc::new(Int64Array::from(timeseries_ids)); + + let service_vals: Vec> = vec![Some(service_val); num_rows]; + let service: ArrayRef = create_nullable_dict_array(&service_vals); + + let host_vals: Vec> = vec![Some(host_val); num_rows]; + let host: ArrayRef = create_nullable_dict_array(&host_vals); + + RecordBatch::try_new( + schema, + vec![ + metric_name_arr, + metric_type, + timestamp_secs, + value, + timeseries_id, + service, + host, + ], + ) + .unwrap() +} + +/// Write a sorted Parquet file to the given directory using the standard +/// writer (which computes sorted_series, row_keys, zonemaps, and KV metadata). +fn write_test_parquet_file( + dir: &Path, + filename: &str, + batch: &RecordBatch, + split_metadata: &ParquetSplitMetadata, +) -> u64 { + let table_config = TableConfig::default(); + let writer = ParquetWriter::new(ParquetWriterConfig::default(), &table_config) + .expect("failed to create ParquetWriter"); + + let path = dir.join(filename); + let (file_size, _write_metadata) = writer + .write_to_file_with_metadata(batch, &path, Some(split_metadata)) + .expect("failed to write test Parquet file"); + file_size +} + +/// Create a ParquetSplitMetadata consistent with the test Parquet writer. 
+fn make_test_split_metadata( + split_id: &str, + num_rows: u64, + size_bytes: u64, + ts_start: u64, + metric_name: &str, +) -> ParquetSplitMetadata { + let table_config = TableConfig::default(); + ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new(split_id)) + .index_uid("test-merge-index:00000000000000000000000001") + .partition_id(0) + .time_range(TimeRange::new(ts_start, ts_start + num_rows)) + .num_rows(num_rows) + .size_bytes(size_bytes) + .sort_fields(table_config.effective_sort_fields()) + .window_start_secs(0) + .window_duration_secs(900) + .add_metric_name(metric_name) + .build() +} + +// --------------------------------------------------------------------------- +// Parquet reading helpers (for verifying merged output) +// --------------------------------------------------------------------------- + +/// Read a Parquet file from raw bytes into a single concatenated RecordBatch. +fn read_parquet_from_bytes(data: &[u8]) -> RecordBatch { + let bytes = bytes::Bytes::copy_from_slice(data); + let builder = + parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap(); + let reader = builder.build().unwrap(); + + let batches: Vec = reader.collect::, _>>().unwrap(); + assert!(!batches.is_empty(), "expected at least one batch"); + + let schema = batches[0].schema(); + arrow::compute::concat_batches(&schema, &batches).unwrap() +} + +/// Extract the Parquet file-level KV metadata from raw bytes. +fn extract_parquet_kv_metadata(data: &[u8]) -> HashMap { + let bytes = bytes::Bytes::copy_from_slice(data); + let builder = + parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap(); + let kv = builder + .metadata() + .file_metadata() + .key_value_metadata() + .expect("kv metadata must be present"); + kv.iter() + .filter_map(|entry| entry.value.as_ref().map(|v| (entry.key.clone(), v.clone()))) + .collect() +} + +/// Extract string values from a column that may be Dictionary-encoded or plain +/// Utf8. Handles all representations uniformly. +fn extract_string_column(batch: &RecordBatch, name: &str) -> Vec { + let idx = batch.schema().index_of(name).unwrap(); + let col = batch.column(idx); + + if let Some(dict) = col.as_any().downcast_ref::>() { + let values = dict + .values() + .as_any() + .downcast_ref::() + .unwrap(); + return (0..dict.len()) + .map(|i| { + let key = dict.keys().value(i) as usize; + values.value(key).to_string() + }) + .collect(); + } + + if let Some(str_arr) = col.as_any().downcast_ref::() { + return (0..str_arr.len()) + .map(|i| str_arr.value(i).to_string()) + .collect(); + } + + panic!( + "column '{}' is neither Dict nor Utf8, got {:?}", + name, + col.data_type() + ); +} + +fn extract_u64_column(batch: &RecordBatch, name: &str) -> Vec { + let idx = batch.schema().index_of(name).unwrap(); + batch + .column(idx) + .as_any() + .downcast_ref::() + .expect("expected UInt64") + .values() + .to_vec() +} + +fn extract_binary_column(batch: &RecordBatch, name: &str) -> Vec> { + let idx = batch.schema().index_of(name).unwrap(); + let col = batch + .column(idx) + .as_any() + .downcast_ref::() + .expect("expected Binary"); + (0..col.len()).map(|i| col.value(i).to_vec()).collect() +} + +// --------------------------------------------------------------------------- +// Integration test +// --------------------------------------------------------------------------- + +/// Full integration test: seed splits → merge → verify everything. 
+/// +/// Creates 2 real sorted Parquet files with DIFFERENT data: +/// - split-a: 50 rows, metric "cpu.usage", timestamps 100–149, service="web", host="host-1" +/// - split-b: 50 rows, metric "mem.usage", timestamps 200–249, service="api", host="host-2" +/// +/// Then verifies the merge pipeline: +/// 1. Plans a merge (merge_factor=2) +/// 2. Downloads files from storage +/// 3. Executes the merge via the k-way merge engine +/// 4. Uploads the merged output +/// 5. Publishes with replaced_split_ids matching the input splits +/// +/// And asserts: +/// - Staged metadata correctness (num_rows, time_range, metric_names, tags, num_merge_ops, +/// row_keys, zonemaps) +/// - Merged Parquet file data (row count, column values, sort order) +/// - Parquet KV headers (qh.sort_fields, qh.num_merge_ops, qh.row_keys, qh.zonemap_regexes) +/// - Cross-validation: metadata matches file contents +#[tokio::test] +async fn test_merge_pipeline_end_to_end() { + quickwit_common::setup_logging_for_tests(); + + let universe = Universe::with_accelerated_time(); + let temp_dir = tempfile::tempdir().unwrap(); + let ram_storage: Arc = Arc::new(RamStorage::default()); + + // --- Step 1: Create diverse Parquet files and upload to storage --- + + let batch_a = create_custom_test_batch("cpu.usage", 100, 50, "web", "host-1"); + let meta_a = make_test_split_metadata("split-a", 50, 0, 100, "cpu.usage"); + let size_a = write_test_parquet_file(temp_dir.path(), "split-a.parquet", &batch_a, &meta_a); + let meta_a = { + let mut m = meta_a; + m.size_bytes = size_a; + m.parquet_file = "split-a.parquet".to_string(); + m + }; + + let batch_b = create_custom_test_batch("mem.usage", 200, 50, "api", "host-2"); + let meta_b = make_test_split_metadata("split-b", 50, 0, 200, "mem.usage"); + let size_b = write_test_parquet_file(temp_dir.path(), "split-b.parquet", &batch_b, &meta_b); + let meta_b = { + let mut m = meta_b; + m.size_bytes = size_b; + m.parquet_file = "split-b.parquet".to_string(); + m + }; + + // Upload files to RamStorage. + let content_a = std::fs::read(temp_dir.path().join("split-a.parquet")).unwrap(); + ram_storage + .put(Path::new("split-a.parquet"), Box::new(content_a)) + .await + .unwrap(); + let content_b = std::fs::read(temp_dir.path().join("split-b.parquet")).unwrap(); + ram_storage + .put(Path::new("split-b.parquet"), Box::new(content_b)) + .await + .unwrap(); + + // --- Step 2: Set up mock metastore (capture staged metadata) --- + + let mut mock_metastore = MockMetastoreService::new(); + + // Capture the staged split metadata for verification. + let staged_metadata: Arc>> = + Arc::new(std::sync::Mutex::new(Vec::new())); + let staged_metadata_clone = staged_metadata.clone(); + + mock_metastore.expect_stage_metrics_splits().returning( + move |request: StageMetricsSplitsRequest| { + let splits = request + .deserialize_splits_metadata() + .expect("failed to deserialize staged metadata"); + staged_metadata_clone.lock().unwrap().extend(splits); + Ok(EmptyResponse {}) + }, + ); + + // Capture the publish request to verify replaced_split_ids. 
+ let publish_called = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let publish_called_clone = publish_called.clone(); + let replaced_ids = Arc::new(std::sync::Mutex::new(Vec::::new())); + let replaced_ids_clone = replaced_ids.clone(); + + mock_metastore + .expect_publish_metrics_splits() + .returning(move |request| { + replaced_ids_clone + .lock() + .unwrap() + .extend(request.replaced_split_ids.clone()); + publish_called_clone.store(true, Ordering::SeqCst); + Ok(EmptyResponse {}) + }); + + let metastore = MetastoreServiceClient::from_mock(mock_metastore); + + // --- Step 3: Spawn the merge pipeline --- + + let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new( + ParquetMergePolicyConfig { + merge_factor: 2, + max_merge_factor: 2, + max_merge_ops: 5, + target_split_size_bytes: 256 * 1024 * 1024, + maturation_period: Duration::from_secs(3600), + max_finalize_merge_operations: 3, + }, + )); + + let params = ParquetMergePipelineParams { + index_uid: quickwit_proto::types::IndexUid::for_test("test-merge-index", 0), + indexing_directory: TempDirectory::for_test(), + metastore, + storage: ram_storage.clone(), + merge_policy, + merge_scheduler_service: universe.get_or_spawn_one(), + max_concurrent_split_uploads: 4, + event_broker: EventBroker::default(), + writer_config: ParquetWriterConfig::default(), + }; + + let initial_splits = vec![meta_a, meta_b]; + let pipeline = ParquetMergePipeline::new(params, Some(initial_splits), universe.spawn_ctx()); + let (_pipeline_mailbox, _pipeline_handle) = universe.spawn_builder().spawn(pipeline); + + // --- Step 4: Wait for publish with replaced_split_ids --- + + wait_until_predicate( + || { + let publish_called = publish_called.clone(); + async move { publish_called.load(Ordering::SeqCst) } + }, + Duration::from_secs(30), + Duration::from_millis(100), + ) + .await + .expect("timed out waiting for merge publish"); + + // --- Step 5: Verify replaced_split_ids --- + + let mut replaced_sorted: Vec = replaced_ids.lock().unwrap().clone(); + replaced_sorted.sort(); + assert_eq!( + replaced_sorted, + vec!["split-a".to_string(), "split-b".to_string()], + "publish should replace both input splits" + ); + + // --- Step 6: Verify staged metadata --- + + let staged = staged_metadata.lock().unwrap().clone(); + assert_eq!(staged.len(), 1, "exactly one merged split should be staged"); + let merged_meta = &staged[0]; + + // MC-1: total row count preserved. + assert_eq!(merged_meta.num_rows, 100, "merged split must have 100 rows"); + + // Time range covers both inputs: min(100) to max(249+1=250). + assert_eq!( + merged_meta.time_range.start_secs, 100, + "time_range.start should be min of inputs" + ); + assert_eq!( + merged_meta.time_range.end_secs, 250, + "time_range.end should be max timestamp + 1" + ); + + // Metric names from both inputs. + let expected_metrics: HashSet = ["cpu.usage", "mem.usage"] + .iter() + .map(|s| s.to_string()) + .collect(); + assert_eq!( + merged_meta.metric_names, expected_metrics, + "merged split must contain both metric names" + ); + + // num_merge_ops: max(0, 0) + 1 = 1. + assert_eq!( + merged_meta.num_merge_ops, 1, + "first merge should set num_merge_ops to 1" + ); + + // Sort fields should be preserved from inputs. + let table_config = TableConfig::default(); + assert_eq!( + merged_meta.sort_fields, + table_config.effective_sort_fields(), + "sort_fields must be preserved through merge" + ); + + // Row keys should be present (the writer always computes them). 
+ assert!( + merged_meta.row_keys_proto.is_some(), + "merged split must have row_keys_proto" + ); + let row_keys_bytes = merged_meta.row_keys_proto.as_ref().unwrap(); + assert!( + !row_keys_bytes.is_empty(), + "row_keys_proto must not be empty" + ); + + // Zonemap regexes should be present for string sort columns. + assert!( + !merged_meta.zonemap_regexes.is_empty(), + "merged split must have zonemap regexes" + ); + // metric_name is a string sort column, so it must have a zonemap regex. + assert!( + merged_meta.zonemap_regexes.contains_key("metric_name"), + "zonemap_regexes must include metric_name; got keys: {:?}", + merged_meta.zonemap_regexes.keys().collect::>() + ); + + // low_cardinality_tags: only "service" is extracted in both the ingest + // and merge paths (see extract_service_names in split_writer.rs and + // merge/writer.rs). Other tag columns (host, env, etc.) are fully + // covered by zonemap regexes and row keys for pruning — this field + // is a secondary optimization for exact-match Postgres queries. + assert!( + merged_meta.low_cardinality_tags.contains_key("service"), + "tags must include 'service'; got: {:?}", + merged_meta.low_cardinality_tags.keys().collect::>() + ); + let service_tags = &merged_meta.low_cardinality_tags["service"]; + let expected_services: HashSet = ["web", "api"].iter().map(|s| s.to_string()).collect(); + assert_eq!( + *service_tags, expected_services, + "service tag values must include both inputs" + ); + + // --- Step 7: Read back merged Parquet file and verify contents --- + + let merged_parquet_path = &merged_meta.parquet_file; + let merged_bytes = ram_storage + .get_all(Path::new(merged_parquet_path)) + .await + .expect("merged parquet file must exist in storage"); + + let merged_batch = read_parquet_from_bytes(&merged_bytes); + + // MC-1: row count matches metadata. + assert_eq!( + merged_batch.num_rows(), + 100, + "Parquet file row count must match metadata" + ); + assert_eq!( + merged_batch.num_rows() as u64, + merged_meta.num_rows, + "Parquet row count must equal staged metadata num_rows" + ); + + // Verify all expected metric names are present. + let metric_names_in_file: HashSet = extract_string_column(&merged_batch, "metric_name") + .into_iter() + .collect(); + assert_eq!( + metric_names_in_file, expected_metrics, + "Parquet file must contain both metric names" + ); + + // Verify all expected timestamps are present. + let timestamps_in_file = extract_u64_column(&merged_batch, "timestamp_secs"); + assert_eq!( + timestamps_in_file.len(), + 100, + "must have 100 timestamp values" + ); + let ts_min = *timestamps_in_file.iter().min().unwrap(); + let ts_max = *timestamps_in_file.iter().max().unwrap(); + assert_eq!(ts_min, 100, "min timestamp must be 100"); + assert_eq!(ts_max, 249, "max timestamp must be 249"); + + // Cross-validate: metadata time_range matches actual timestamps. + assert_eq!( + merged_meta.time_range.start_secs, ts_min, + "metadata time_range.start must match actual min timestamp" + ); + assert_eq!( + merged_meta.time_range.end_secs, + ts_max + 1, + "metadata time_range.end must be actual max timestamp + 1" + ); + + // Verify all tag values from both inputs survive in the Parquet data + // (even if the metadata doesn't track all tag columns yet). 
+ let services_in_file: HashSet = extract_string_column(&merged_batch, "service") + .into_iter() + .collect(); + assert_eq!( + services_in_file, expected_services, + "Parquet file must contain both service values" + ); + let expected_hosts: HashSet = + ["host-1", "host-2"].iter().map(|s| s.to_string()).collect(); + let hosts_in_file: HashSet = extract_string_column(&merged_batch, "host") + .into_iter() + .collect(); + assert_eq!( + hosts_in_file, expected_hosts, + "Parquet file must contain both host values" + ); + + // Verify sort order: sorted_series column must be monotonically + // non-decreasing (the fundamental invariant of the Parquet writer). + let sorted_series = extract_binary_column(&merged_batch, SORTED_SERIES_COLUMN); + for i in 1..sorted_series.len() { + assert!( + sorted_series[i] >= sorted_series[i - 1], + "sorted_series must be monotonically non-decreasing at row {}: {:?} < {:?}", + i, + &sorted_series[i], + &sorted_series[i - 1] + ); + } + + // Verify sort order semantics: cpu.usage rows should come before + // mem.usage rows (metric_name is the first sort column, ascending). + let metric_name_vec = extract_string_column(&merged_batch, "metric_name"); + let first_mem_idx = metric_name_vec + .iter() + .position(|n| n == "mem.usage") + .expect("must have mem.usage rows"); + let last_cpu_idx = metric_name_vec + .iter() + .rposition(|n| n == "cpu.usage") + .expect("must have cpu.usage rows"); + assert!( + last_cpu_idx < first_mem_idx, + "all cpu.usage rows (last at {}) must precede all mem.usage rows (first at {})", + last_cpu_idx, + first_mem_idx + ); + + // --- Step 8: Verify Parquet KV metadata headers --- + + let kv_metadata = extract_parquet_kv_metadata(&merged_bytes); + + // qh.sort_fields must match the table config. + let qh_sort_fields = kv_metadata + .get("qh.sort_fields") + .expect("qh.sort_fields must be present"); + assert_eq!( + qh_sort_fields, + &table_config.effective_sort_fields(), + "qh.sort_fields must match table config" + ); + + // qh.num_merge_ops must be "1" (first merge). + let qh_merge_ops = kv_metadata + .get("qh.num_merge_ops") + .expect("qh.num_merge_ops must be present"); + assert_eq!(qh_merge_ops, "1", "qh.num_merge_ops must be 1"); + + // qh.row_keys must be present and non-empty. + let qh_row_keys = kv_metadata + .get("qh.row_keys") + .expect("qh.row_keys must be present"); + assert!(!qh_row_keys.is_empty(), "qh.row_keys must not be empty"); + + // qh.zonemap_regexes must be present and contain metric_name. + let qh_zonemaps = kv_metadata + .get("qh.zonemap_regexes") + .expect("qh.zonemap_regexes must be present"); + let zonemaps_parsed: HashMap = + serde_json::from_str(qh_zonemaps).expect("qh.zonemap_regexes must be valid JSON"); + assert!( + zonemaps_parsed.contains_key("metric_name"), + "qh.zonemap_regexes must include metric_name" + ); + + // Cross-validate: metadata zonemap_regexes should match Parquet header. 
+ assert_eq!( + merged_meta.zonemap_regexes, zonemaps_parsed, + "metadata zonemap_regexes must match Parquet qh.zonemap_regexes" + ); + + universe.assert_quit().await; +} diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs index 6d6b43e86da..9bb1c31dd65 100644 --- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/pipeline.rs @@ -346,9 +346,14 @@ impl MetricsPipeline { .set_kill_switch(self.kill_switch.clone()) .spawn(uploader); - // ParquetPackager + // ParquetPackager — read sort schema and window duration from index config. let writer_config = quickwit_parquet_engine::storage::ParquetWriterConfig::default(); - let table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + let parquet_indexing_config = self.params.indexing_settings.parquet_indexing(); + let mut table_config = quickwit_parquet_engine::table_config::TableConfig::default(); + if let Some(ref sort_fields) = parquet_indexing_config.sort_fields { + table_config.sort_fields = Some(sort_fields.clone()); + } + table_config.window_duration_secs = parquet_indexing_config.window_duration_secs; let split_kind = if self.params.use_sketch_processors { quickwit_parquet_engine::split::ParquetSplitKind::Sketches } else { diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 9319f8d8498..e94e70d3370 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -192,6 +192,27 @@ pub fn default_merge_policy() -> Arc { merge_policy_from_settings(&indexing_settings) } +/// Creates a Parquet merge policy from the index's `ParquetMergePolicyConfig`. +#[cfg(feature = "metrics")] +pub fn parquet_merge_policy_from_settings( + settings: &IndexingSettings, +) -> Arc { + let config = settings.parquet_merge_policy(); + let engine_config = quickwit_parquet_engine::merge::policy::ParquetMergePolicyConfig { + merge_factor: config.merge_factor, + max_merge_factor: config.max_merge_factor, + max_merge_ops: config.max_merge_ops, + target_split_size_bytes: config.target_split_size_bytes, + maturation_period: config.maturation_period, + max_finalize_merge_operations: config.max_finalize_merge_operations, + }; + Arc::new( + quickwit_parquet_engine::merge::policy::ConstWriteAmplificationParquetMergePolicy::new( + engine_config, + ), + ) +} + pub fn nop_merge_policy() -> Arc { Arc::new(NopMergePolicy) }
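A note on fetch_immature_splits in parquet_merge_pipeline.rs: stripped of the metrics/sketches RPC dispatch, the first-spawn versus respawn behavior is an Option::take on the initial seed with a metastore fallback. A self-contained sketch of that shape (SplitMeta and the list_published closure are placeholders, not Quickwit types):

struct SplitMeta {
    split_id: String,
}

struct MergePipeline {
    // Provided by the indexing service before the first spawn, then taken.
    initial_immature_splits_opt: Option<Vec<SplitMeta>>,
}

impl MergePipeline {
    fn fetch_immature_splits(
        &mut self,
        list_published: impl FnOnce() -> Vec<SplitMeta>,
    ) -> Vec<SplitMeta> {
        // First spawn: use the seed, avoiding a per-pipeline metastore query.
        if let Some(seed) = self.initial_immature_splits_opt.take() {
            return seed;
        }
        // Respawn after a crash: re-query the source of truth so splits that
        // were in flight during the crash get planned again.
        list_published()
    }
}

fn main() {
    let mut pipeline = MergePipeline {
        initial_immature_splits_opt: Some(vec![SplitMeta { split_id: "seed-split".into() }]),
    };
    let from_metastore = || vec![SplitMeta { split_id: "published-split".into() }];
    // The first call consumes the seed; the second falls back to the "metastore".
    assert_eq!(pipeline.fetch_immature_splits(from_metastore)[0].split_id, "seed-split");
    assert_eq!(pipeline.fetch_immature_splits(from_metastore)[0].split_id, "published-split");
}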
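The previous_generations_statistics / statistics pair added to ParquetMergePipeline follows the usual supervisor convention: when a new generation of actors is spawned, the totals observed so far are frozen into a baseline, and perform_observe rebuilds the public counters as baseline plus whatever the live actors currently report, so totals never go backwards across respawns. A reduced sketch of that bookkeeping (field names mirror the diff; add_actor_counters is collapsed into a single addition):

#[derive(Clone, Default)]
struct MergeStatisticsSketch {
    generation: usize,
    num_published_splits: u64,
}

struct Supervisor {
    previous_generations_statistics: MergeStatisticsSketch,
    statistics: MergeStatisticsSketch,
    // Counter owned by the currently running generation of actors.
    current_generation_published: u64,
}

impl Supervisor {
    // Called from the supervise loop: rebuild the observable state from the
    // frozen baseline plus the live actors' current counters.
    fn perform_observe(&mut self) {
        let mut stats = self.previous_generations_statistics.clone();
        stats.num_published_splits += self.current_generation_published;
        stats.generation = self.statistics.generation;
        self.statistics = stats;
    }

    // Called on respawn: freeze the totals so far and bump the generation.
    fn spawn_new_generation(&mut self) {
        self.previous_generations_statistics = self.statistics.clone();
        self.statistics.generation += 1;
        self.current_generation_published = 0;
    }
}

fn main() {
    let mut sup = Supervisor {
        previous_generations_statistics: MergeStatisticsSketch::default(),
        statistics: MergeStatisticsSketch::default(),
        current_generation_published: 0,
    };
    sup.spawn_new_generation();
    sup.current_generation_published = 3;
    sup.perform_observe();
    sup.spawn_new_generation(); // simulate a crash followed by a respawn
    sup.current_generation_published = 2;
    sup.perform_observe();
    // Totals keep accumulating across generations instead of resetting to 2.
    assert_eq!(sup.statistics.num_published_splits, 5);
    assert_eq!(sup.statistics.generation, 2);
}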