From 3a91a31bd43eebb6180a8c789cc817aa5c0c7cb2 Mon Sep 17 00:00:00 2001
From: George Talbot
Date: Tue, 28 Apr 2026 18:02:25 -0400
Subject: [PATCH] feat: add ParquetMergePlanner actor, scheduler extension, and
 downloader stub (Phase 3b)

Phase 3 pipeline integration, second PR:

- ParquetMergePlanner: receives ParquetNewSplits, groups by CompactionScope,
  invokes ParquetMergePolicy::operations(), and dispatches to the scheduler.
  Handles startup seeding of immature splits, deduplication via
  known_split_ids, and RunFinalizeMergePolicyAndQuit for cold-window
  finalization. 6 tests.
- MergeSchedulerService extension: ScheduleParquetMerge message with a shared
  merge_semaphore for global concurrency control across Tantivy and Parquet
  merges. Feature-gated behind cfg(feature = "metrics"). Existing scheduler
  tests are unaffected.
- ParquetMergeSplitDownloader stub: minimal Actor + Handler for the scheduler
  and planner to reference. The full download implementation comes in PR 3c.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../src/actors/merge_scheduler_service.rs     | 185 ++++
 .../src/actors/metrics_pipeline/mod.rs        |   4 +
 .../metrics_pipeline/parquet_merge_planner.rs | 627 ++++++++++++++++++
 .../parquet_merge_split_downloader.rs         |  68 ++
 quickwit/quickwit-indexing/src/actors/mod.rs  |   2 +
 5 files changed, 886 insertions(+)
 create mode 100644 quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs
 create mode 100644 quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs

diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
index bbe5267d514..6f6fc60e467 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs
@@ -20,11 +20,15 @@ use std::sync::Arc;
 use anyhow::Context;
 use async_trait::async_trait;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox};
+#[cfg(feature = "metrics")]
+use quickwit_parquet_engine::merge::policy::ParquetMergeOperation;
 use tantivy::TrackedObject;
 use tokio::sync::{OwnedSemaphorePermit, Semaphore};
 use tracing::error;
 
 use super::MergeSplitDownloader;
+#[cfg(feature = "metrics")]
+use super::metrics_pipeline::{ParquetMergeSplitDownloader, ParquetMergeTask};
 use crate::merge_policy::{MergeOperation, MergeTask};
 
 pub struct MergePermit {
@@ -70,6 +74,20 @@ pub async fn schedule_merge(
     Ok(())
 }
 
+#[cfg(feature = "metrics")]
+pub async fn schedule_parquet_merge(
+    merge_scheduler_service: &Mailbox<MergeSchedulerService>,
+    merge_operation: TrackedObject<ParquetMergeOperation>,
+    merge_split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
+) -> anyhow::Result<()> {
+    let schedule_merge = ScheduleParquetMerge::new(merge_operation, merge_split_downloader_mailbox);
+    merge_scheduler_service
+        .ask(schedule_merge)
+        .await
+        .context("failed to schedule parquet merge")?;
+    Ok(())
+}
+
 struct ScheduledMerge {
     score: u64,
     id: u64, //< just for total ordering.
@@ -103,6 +121,45 @@ impl Ord for ScheduledMerge {
     }
 }
 
+#[cfg(feature = "metrics")]
+struct ScheduledParquetMerge {
+    score: u64,
+    id: u64,
+    merge_operation: TrackedObject<ParquetMergeOperation>,
+    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
+}
+
+#[cfg(feature = "metrics")]
+impl ScheduledParquetMerge {
+    fn order_key(&self) -> (u64, Reverse<u64>) {
+        (self.score, Reverse(self.id))
+    }
+}
+
+#[cfg(feature = "metrics")]
+impl Eq for ScheduledParquetMerge {}
+
+#[cfg(feature = "metrics")]
+impl PartialEq for ScheduledParquetMerge {
+    fn eq(&self, other: &Self) -> bool {
+        self.cmp(other).is_eq()
+    }
+}
+
+#[cfg(feature = "metrics")]
+impl PartialOrd for ScheduledParquetMerge {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+#[cfg(feature = "metrics")]
+impl Ord for ScheduledParquetMerge {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.order_key().cmp(&other.order_key())
+    }
+}
+
 /// The merge scheduler service is in charge of keeping track of all scheduled merge operations,
 /// and schedule them in the best possible order, respecting the `merge_concurrency` limit.
 ///
@@ -116,6 +173,8 @@ pub struct MergeSchedulerService {
     merge_semaphore: Arc<Semaphore>,
     merge_concurrency: usize,
     pending_merge_queue: BinaryHeap<ScheduledMerge>,
+    #[cfg(feature = "metrics")]
+    pending_parquet_merge_queue: BinaryHeap<ScheduledParquetMerge>,
     next_merge_id: u64,
     pending_merge_bytes: u64,
 }
@@ -133,6 +192,8 @@ impl MergeSchedulerService {
             merge_semaphore,
             merge_concurrency,
             pending_merge_queue: BinaryHeap::default(),
+            #[cfg(feature = "metrics")]
+            pending_parquet_merge_queue: BinaryHeap::default(),
             next_merge_id: 0,
             pending_merge_bytes: 0,
         }
@@ -183,6 +244,54 @@ impl MergeSchedulerService {
                 }
             }
         }
+        // Dispatch pending Parquet merges. Shares the same semaphore as
+        // Tantivy merges so the node doesn't exceed its merge concurrency
+        // limit regardless of how many pipelines of each type are running.
+        #[cfg(feature = "metrics")]
+        loop {
+            let merge_semaphore = self.merge_semaphore.clone();
+            let Some(next_merge) = self.pending_parquet_merge_queue.peek_mut() else {
+                break;
+            };
+            let Ok(semaphore_permit) = Semaphore::try_acquire_owned(merge_semaphore) else {
+                break;
+            };
+            let merge_permit = MergePermit {
+                _semaphore_permit: Some(semaphore_permit),
+                merge_scheduler_mailbox: Some(ctx.mailbox().clone()),
+            };
+            let ScheduledParquetMerge {
+                merge_operation,
+                split_downloader_mailbox,
+                ..
+            } = PeekMut::pop(next_merge);
+            // The permit is owned by the task and released via Drop when
+            // the executor finishes, triggering PermitReleased back here.
+            let parquet_merge_task = ParquetMergeTask {
+                merge_operation,
+                merge_permit,
+            };
+            self.pending_merge_bytes -= parquet_merge_task.merge_operation.total_size_bytes();
+            crate::metrics::INDEXER_METRICS
+                .pending_merge_operations
+                .set(
+                    self.pending_merge_queue.len() as i64
+                        + self.pending_parquet_merge_queue.len() as i64,
+                );
+            crate::metrics::INDEXER_METRICS
+                .pending_merge_bytes
+                .set(self.pending_merge_bytes as i64);
+            match split_downloader_mailbox.try_send_message(parquet_merge_task) {
+                Ok(_) => {}
+                Err(quickwit_actors::TrySendError::Full(_)) => {
+                    error!("parquet split downloader queue is full: please report");
+                }
+                Err(quickwit_actors::TrySendError::Disconnected) => {
+                    // The downloader is dead — pipeline probably restarted.
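+                    // Nothing else needs to happen here: dropping the task also
+                    // drops its MergePermit, so the semaphore slot is returned
+                    // and the next pending merge can be dispatched.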
+                }
+            }
+        }
+
         let num_merges =
             self.merge_concurrency as i64 - self.merge_semaphore.available_permits() as i64;
         crate::metrics::INDEXER_METRICS
@@ -293,6 +402,82 @@ impl Handler for MergeSchedulerService {
     }
 }
 
+// --- Parquet merge scheduling (feature-gated) ---
+
+#[cfg(feature = "metrics")]
+fn score_parquet_merge_operation(merge_operation: &ParquetMergeOperation) -> u64 {
+    let total_num_bytes = merge_operation.total_size_bytes();
+    if total_num_bytes == 0 {
+        return u64::MAX;
+    }
+    let delta_num_splits = (merge_operation.splits.len() - 1) as u64;
+    (delta_num_splits << 48)
+        .checked_div(total_num_bytes)
+        .unwrap_or(1u64)
+}
+
+#[cfg(feature = "metrics")]
+#[derive(Debug)]
+struct ScheduleParquetMerge {
+    score: u64,
+    merge_operation: TrackedObject<ParquetMergeOperation>,
+    split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
+}
+
+#[cfg(feature = "metrics")]
+impl ScheduleParquetMerge {
+    pub fn new(
+        merge_operation: TrackedObject<ParquetMergeOperation>,
+        split_downloader_mailbox: Mailbox<ParquetMergeSplitDownloader>,
+    ) -> Self {
+        let score = score_parquet_merge_operation(&merge_operation);
+        Self {
+            score,
+            merge_operation,
+            split_downloader_mailbox,
+        }
+    }
+}
+
+#[cfg(feature = "metrics")]
+#[async_trait]
+impl Handler<ScheduleParquetMerge> for MergeSchedulerService {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        schedule_merge: ScheduleParquetMerge,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        let ScheduleParquetMerge {
+            score,
+            merge_operation,
+            split_downloader_mailbox,
+        } = schedule_merge;
+        let merge_id = self.next_merge_id;
+        self.next_merge_id += 1;
+        let scheduled = ScheduledParquetMerge {
+            score,
+            id: merge_id,
+            merge_operation,
+            split_downloader_mailbox,
+        };
+        self.pending_merge_bytes += scheduled.merge_operation.total_size_bytes();
+        self.pending_parquet_merge_queue.push(scheduled);
+        crate::metrics::INDEXER_METRICS
+            .pending_merge_operations
+            .set(
+                self.pending_merge_queue.len() as i64
+                    + self.pending_parquet_merge_queue.len() as i64,
+            );
+        crate::metrics::INDEXER_METRICS
+            .pending_merge_bytes
+            .set(self.pending_merge_bytes as i64);
+        self.schedule_pending_merges(ctx);
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
index 413c3f3bb6c..ec73c03b1d2 100644
--- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
@@ -26,6 +26,8 @@ mod indexing_service_impl;
 mod parquet_doc_processor;
 mod parquet_indexer;
 pub(crate) mod parquet_merge_messages;
+mod parquet_merge_planner;
+mod parquet_merge_split_downloader;
 mod parquet_packager;
 mod parquet_splits_update;
 mod parquet_uploader;
@@ -46,6 +48,8 @@ pub use parquet_doc_processor::{
 };
 pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
 pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
+pub use parquet_merge_planner::ParquetMergePlanner;
+pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
 pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};
 pub use parquet_splits_update::ParquetSplitsUpdate;
 pub use parquet_uploader::ParquetUploader;
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs
new file mode 100644
index 00000000000..12cd66f3e0b
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_planner.rs
@@ -0,0 +1,627 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet merge planner actor.
+//!
+//! Receives [`ParquetNewSplits`] notifications, groups splits by
+//! [`CompactionScope`], invokes [`ParquetMergePolicy::operations()`], and
+//! dispatches merge tasks to the [`MergeSchedulerService`].
+//!
+//! Follows the same pattern as the Tantivy [`MergePlanner`] but uses
+//! Parquet-specific types:
+//!
+//! - `CompactionScope` instead of `MergePartition`
+//! - `ParquetMergePolicy` instead of `MergePolicy`
+//! - `ParquetMergeOperation` instead of `MergeOperation`
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use std::time::Instant;
+
+use async_trait::async_trait;
+use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
+use quickwit_parquet_engine::merge::policy::{
+    CompactionScope, ParquetMergeOperation, ParquetMergePolicy,
+};
+use quickwit_parquet_engine::split::ParquetSplitMetadata;
+use tantivy::Inventory;
+use tracing::{info, warn};
+
+use super::{ParquetMergeSplitDownloader, ParquetNewSplits};
+use crate::actors::MergeSchedulerService;
+use crate::actors::merge_scheduler_service::schedule_parquet_merge;
+
+/// Signal the planner to run `finalize_operations()` for cold windows and
+/// then exit.
+///
+/// Sent by `ParquetMergePipeline` during graceful shutdown to ensure
+/// cold-window splits get a final merge before the pipeline stops.
+#[derive(Debug)]
+pub struct RunFinalizeMergePolicyAndQuit;
+
+/// Internal message to trigger merge planning after initialization.
+#[derive(Debug)]
+struct PlanParquetMerge {
+    incarnation_started_at: Instant,
+}
+
+/// Parquet merge planner: decides when to start merge operations.
+///
+/// Receives split notifications, groups them by [`CompactionScope`], applies
+/// the merge policy, and dispatches merge operations to the scheduler.
+///
+/// Grouping by `CompactionScope` ensures splits from different time windows,
+/// partitions, or sort schemas are never merged together — doing so would
+/// violate temporal pruning (TW-3) or sort order (MC-3) guarantees.
+///
+/// Follows the same pattern as the Tantivy [`MergePlanner`] but with
+/// `CompactionScope` instead of `MergePartition` and Parquet-specific types.
+pub struct ParquetMergePlanner {
+    /// Splits grouped by compaction scope that are eligible for merging.
+    scoped_young_splits: HashMap<CompactionScope, Vec<ParquetSplitMetadata>>,
+
+    /// Tracks known split IDs to deduplicate `ParquetNewSplits` messages.
+    ///
+    /// The publisher's feedback loop can re-send splits the planner already
+    /// knows about (e.g., after a mailbox recycle during pipeline restart).
+    /// Without deduplication, the same split would be counted twice and
+    /// could trigger incorrect merge planning. Periodically rebuilt from
+    /// `scoped_young_splits` + `ongoing_merge_operations_inventory` to
+    /// avoid unbounded growth.
+ known_split_ids: HashSet, + known_split_ids_recompute_attempt_id: usize, + + merge_policy: Arc, + + merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, + + /// Inventory of ongoing merge operations. Tracked objects are dropped + /// after the merged split is published, which lets us distinguish + /// "in-flight" splits from stale IDs during known_split_ids rebuild. + ongoing_merge_operations_inventory: Inventory, + + /// Incarnation timestamp — ignores stale `PlanParquetMerge` messages + /// from a previous planner instance when the mailbox is recycled during + /// pipeline restart. Without this, old messages would trigger premature + /// merge planning before the new planner has a consolidated view of + /// published splits. See Tantivy MergePlanner #3847. + incarnation_started_at: Instant, +} + +#[async_trait] +impl Actor for ParquetMergePlanner { + type ObservableState = (); + + fn observable_state(&self) {} + + fn name(&self) -> String { + "ParquetMergePlanner".to_string() + } + + fn queue_capacity(&self) -> QueueCapacity { + // Same capacity as Tantivy planner — low to avoid stale backlogs. + QueueCapacity::Bounded(1) + } + + async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { + // Queue a PlanParquetMerge to consolidate any splits from a recycled + // mailbox before planning. See Tantivy MergePlanner #3847. + let _ = ctx.try_send_self_message(PlanParquetMerge { + incarnation_started_at: self.incarnation_started_at, + }); + Ok(()) + } +} + +#[async_trait] +impl Handler for ParquetMergePlanner { + type Reply = (); + + async fn handle( + &mut self, + new_splits: ParquetNewSplits, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + self.record_splits_if_necessary(new_splits.new_splits); + self.send_merge_ops(false, ctx).await?; + self.recompute_known_splits_if_necessary(); + Ok(()) + } +} + +#[async_trait] +impl Handler for ParquetMergePlanner { + type Reply = (); + + async fn handle( + &mut self, + _msg: RunFinalizeMergePolicyAndQuit, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + self.send_merge_ops(true, ctx).await?; + Err(ActorExitStatus::Success) + } +} + +#[async_trait] +impl Handler for ParquetMergePlanner { + type Reply = (); + + async fn handle( + &mut self, + plan_merge: PlanParquetMerge, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + if plan_merge.incarnation_started_at == self.incarnation_started_at { + self.send_merge_ops(false, ctx).await?; + } + self.recompute_known_splits_if_necessary(); + Ok(()) + } +} + +impl ParquetMergePlanner { + pub fn new( + immature_splits: Vec, + merge_policy: Arc, + merge_split_downloader_mailbox: Mailbox, + merge_scheduler_service: Mailbox, + ) -> Self { + let mut planner = Self { + scoped_young_splits: HashMap::new(), + known_split_ids: HashSet::new(), + known_split_ids_recompute_attempt_id: 0, + merge_policy, + merge_split_downloader_mailbox, + merge_scheduler_service, + ongoing_merge_operations_inventory: Inventory::default(), + incarnation_started_at: Instant::now(), + }; + planner.record_splits_if_necessary(immature_splits); + planner + } + + /// Filters and records incoming splits, skipping: + /// - Mature splits (already at max merge ops or target size) + /// - Splits we've already seen (dedup via `known_split_ids`) + /// - Pre-Phase-31 splits without a window (can't participate in compaction) + fn record_splits_if_necessary(&mut self, splits: Vec) { + for split in splits { + if let 
quickwit_parquet_engine::merge::policy::ParquetSplitMaturity::Mature = self + .merge_policy + .split_maturity(split.size_bytes, split.num_merge_ops) + { + continue; + } + if !self.acknowledge_split(split.split_id.as_str()) { + continue; + } + self.record_split(split); + } + } + + /// Returns `true` if this split ID was not previously known, and records + /// it. Returns `false` (and does nothing) if we've already seen it. + fn acknowledge_split(&mut self, split_id: &str) -> bool { + if self.known_split_ids.contains(split_id) { + return false; + } + self.known_split_ids.insert(split_id.to_string()); + true + } + + /// Places a split into the appropriate compaction scope bucket. + fn record_split(&mut self, split: ParquetSplitMetadata) { + let Some(scope) = CompactionScope::from_split(&split) else { + // Pre-Phase-31 splits have no window — can't compact them. + return; + }; + self.scoped_young_splits + .entry(scope) + .or_default() + .push(split); + } + + /// Amortized GC for `known_split_ids`. + /// + /// The set grows monotonically (we add IDs but never remove inline). + /// Every 100 calls, we rebuild from only the splits that still matter + /// (young splits + in-flight merges), shrinking the set back down. + /// Same pattern as Tantivy's `MergePlanner`. + fn recompute_known_splits_if_necessary(&mut self) { + self.known_split_ids_recompute_attempt_id += 1; + if self + .known_split_ids_recompute_attempt_id + .is_multiple_of(100) + { + self.known_split_ids = self.rebuild_known_split_ids(); + self.known_split_ids_recompute_attempt_id = 0; + } + } + + /// Rebuilds `known_split_ids` from current state: young splits still + /// waiting for merge + splits currently in-flight in merge operations. + fn rebuild_known_split_ids(&self) -> HashSet { + let mut known = HashSet::new(); + for splits in self.scoped_young_splits.values() { + for split in splits { + known.insert(split.split_id.as_str().to_string()); + } + } + for op in self.ongoing_merge_operations_inventory.list() { + for split in &op.splits { + known.insert(split.split_id.as_str().to_string()); + } + } + // If rebuild didn't shrink the set by at least half, something may be + // leaking IDs (splits not being dropped from the inventory). + if known.len() * 2 >= self.known_split_ids.len() { + warn!( + known_split_ids_len_after = known.len(), + known_split_ids_len_before = self.known_split_ids.len(), + "rebuilding known_split_ids did not halve the set — potential leak" + ); + } + known + } + + /// Asks the merge policy which splits should be merged, per scope. + /// + /// `operations()` drains participating splits from each scope's vec; + /// splits not selected remain for future rounds. `finalize_operations()` + /// uses a lower merge factor for cold-window cleanup at shutdown. + async fn compute_merge_ops( + &mut self, + is_finalize: bool, + ctx: &ActorContext, + ) -> Result, ActorExitStatus> { + let mut merge_operations = Vec::new(); + for young_splits in self.scoped_young_splits.values_mut() { + if !young_splits.is_empty() { + let operations = if is_finalize { + self.merge_policy.finalize_operations(young_splits) + } else { + self.merge_policy.operations(young_splits) + }; + merge_operations.extend(operations); + } + ctx.record_progress(); + ctx.yield_now().await; + } + self.scoped_young_splits + .retain(|_, splits| !splits.is_empty()); + Ok(merge_operations) + } + + /// Computes merge operations and dispatches each to the scheduler. 
+ /// + /// Each operation is tracked in the inventory so we know which splits are + /// in-flight (for known_split_ids rebuild and for the pipeline to observe). + async fn send_merge_ops( + &mut self, + is_finalize: bool, + ctx: &ActorContext, + ) -> Result<(), ActorExitStatus> { + let merge_ops = self.compute_merge_ops(is_finalize, ctx).await?; + for merge_operation in merge_ops { + info!( + merge_split_id = %merge_operation.merge_split_id, + num_inputs = merge_operation.splits.len(), + total_bytes = merge_operation.total_size_bytes(), + "scheduling parquet merge operation" + ); + let tracked = self + .ongoing_merge_operations_inventory + .track(merge_operation); + schedule_parquet_merge( + &self.merge_scheduler_service, + tracked, + self.merge_split_downloader_mailbox.clone(), + ) + .await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use quickwit_actors::Universe; + use quickwit_parquet_engine::merge::policy::{ + ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig, + }; + use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange}; + + use super::*; + use crate::actors::metrics_pipeline::ParquetMergeTask; + + fn make_split(split_id: &str, size_bytes: u64, num_merge_ops: u32) -> ParquetSplitMetadata { + ParquetSplitMetadata::metrics_builder() + .split_id(ParquetSplitId::new(split_id)) + .index_uid("test-index:00000000000000000000000001") + .partition_id(0) + .time_range(TimeRange::new(1000, 2000)) + .num_rows(100) + .size_bytes(size_bytes) + .sort_fields("metric_name|host|timestamp_secs/V2") + .window_start_secs(0) + .window_duration_secs(3600) + .num_merge_ops(num_merge_ops) + .build() + } + + fn make_policy(merge_factor: usize) -> Arc { + Arc::new(ConstWriteAmplificationParquetMergePolicy::new( + ParquetMergePolicyConfig { + merge_factor, + max_merge_factor: merge_factor, + max_merge_ops: 5, + target_split_size_bytes: 256 * 1024 * 1024, + maturation_period: Duration::from_secs(3600), + max_finalize_merge_operations: 3, + }, + )) + } + + #[tokio::test] + async fn test_planner_plans_merge_when_enough_splits() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe.create_test_mailbox(); + let policy = make_policy(3); + + let planner = ParquetMergePlanner::new( + Vec::new(), + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (planner_mailbox, planner_handle) = universe.spawn_builder().spawn(planner); + + // Send 2 splits — not enough for merge_factor=3. + planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![ + make_split("s0", 1_000_000, 0), + make_split("s1", 1_000_000, 0), + ], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert!( + tasks.is_empty(), + "should not merge with < merge_factor splits" + ); + + // Send 1 more — now we have 3 splits at level 0, should trigger merge. 
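+        // (Assumption: the const-write-amplification policy mirrors the
+        // Tantivy ConstWriteAmplificationMergePolicy, i.e. once merge_factor
+        // splits share the same num_merge_ops, they are merged into a single
+        // split with num_merge_ops + 1.)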
+ planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![make_split("s2", 1_000_000, 0)], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].merge_operation.splits.len(), 3); + + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_planner_deduplicates_known_splits() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe.create_test_mailbox(); + let policy = make_policy(2); + + let planner = ParquetMergePlanner::new( + Vec::new(), + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (planner_mailbox, planner_handle) = universe.spawn_builder().spawn(planner); + + // Send 2 splits — should trigger a merge. + planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![ + make_split("s0", 1_000_000, 0), + make_split("s1", 1_000_000, 0), + ], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert_eq!(tasks.len(), 1); + + // Re-send the same splits — should be deduped, no new merge. + planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![ + make_split("s0", 1_000_000, 0), + make_split("s1", 1_000_000, 0), + ], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert!(tasks.is_empty(), "duplicate splits should be deduped"); + + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_planner_seeds_immature_splits_on_start() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe + .spawn_ctx() + .create_mailbox("downloader", QueueCapacity::Bounded(2)); + let policy = make_policy(2); + + // Seed with 2 immature splits — should trigger merge on startup. + let immature = vec![ + make_split("seed-0", 1_000_000, 0), + make_split("seed-1", 1_000_000, 0), + ]; + let planner = ParquetMergePlanner::new( + immature, + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (_planner_mailbox, _planner_handle) = universe.spawn_builder().spawn(planner); + + // Wait for the initial PlanParquetMerge to be processed. + let task_res = downloader_inbox + .recv_typed_message::() + .await; + assert!(task_res.is_ok(), "should merge seeded splits on startup"); + assert_eq!(task_res.unwrap().merge_operation.splits.len(), 2); + + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_planner_skips_mature_splits() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe.create_test_mailbox(); + // Policy with max_merge_ops=2: splits with num_merge_ops >= 2 are mature. + let policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new( + ParquetMergePolicyConfig { + merge_factor: 2, + max_merge_factor: 2, + max_merge_ops: 2, + target_split_size_bytes: 256 * 1024 * 1024, + maturation_period: Duration::from_secs(3600), + max_finalize_merge_operations: 3, + }, + )); + + let planner = ParquetMergePlanner::new( + Vec::new(), + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (planner_mailbox, planner_handle) = universe.spawn_builder().spawn(planner); + + // Send 2 splits at num_merge_ops=2 — both mature, should not merge. 
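+        // record_splits_if_necessary() filters mature splits out before they
+        // are recorded, so they never accumulate in scoped_young_splits.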
+ planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![ + make_split("mature-0", 1_000_000, 2), + make_split("mature-1", 1_000_000, 2), + ], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert!(tasks.is_empty(), "mature splits should not be merged"); + + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_planner_groups_by_scope() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe.create_test_mailbox(); + let policy = make_policy(2); + + let planner = ParquetMergePlanner::new( + Vec::new(), + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (planner_mailbox, planner_handle) = universe.spawn_builder().spawn(planner); + + // Send 2 splits in window 0 and 1 split in window 3600. + // Only window 0 should trigger a merge. + let mut w0_s0 = make_split("w0-s0", 1_000_000, 0); + w0_s0.window = Some(0..3600); + let mut w0_s1 = make_split("w0-s1", 1_000_000, 0); + w0_s1.window = Some(0..3600); + let mut w1_s0 = make_split("w1-s0", 1_000_000, 0); + w1_s0.window = Some(3600..7200); + + planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![w0_s0, w0_s1, w1_s0], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert_eq!(tasks.len(), 1, "only window 0 has enough splits"); + assert_eq!(tasks[0].merge_operation.splits.len(), 2); + + universe.assert_quit().await; + } + + #[tokio::test] + async fn test_planner_finalize_and_quit() { + let universe = Universe::with_accelerated_time(); + let (downloader_mailbox, downloader_inbox) = universe.create_test_mailbox(); + // Finalize should merge even with fewer than merge_factor splits. + let policy = make_policy(10); // High threshold — operations() won't fire. + + let planner = ParquetMergePlanner::new( + Vec::new(), + policy, + downloader_mailbox, + universe.get_or_spawn_one(), + ); + let (planner_mailbox, planner_handle) = universe.spawn_builder().spawn(planner); + + // Send 3 splits — not enough for merge_factor=10. + planner_mailbox + .send_message(ParquetNewSplits { + new_splits: vec![ + make_split("f0", 1_000_000, 0), + make_split("f1", 1_000_000, 0), + make_split("f2", 1_000_000, 0), + ], + }) + .await + .unwrap(); + planner_handle.process_pending_and_observe().await; + let tasks = downloader_inbox.drain_for_test_typed::(); + assert!(tasks.is_empty(), "normal operations should not fire"); + + // Send RunFinalizeMergePolicyAndQuit — should trigger finalize. + planner_mailbox + .send_message(RunFinalizeMergePolicyAndQuit) + .await + .unwrap(); + let (exit_status, _) = planner_handle.join().await; + assert!( + matches!(exit_status, ActorExitStatus::Success), + "planner should exit with Success after finalize" + ); + + let tasks = downloader_inbox.drain_for_test_typed::(); + assert_eq!(tasks.len(), 1, "finalize should merge the 3 splits"); + + universe.assert_quit().await; + } +} diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs new file mode 100644 index 00000000000..9376ba71fed --- /dev/null +++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_split_downloader.rs @@ -0,0 +1,68 @@ +// Copyright 2021-Present Datadog, Inc. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Stub actor for downloading Parquet files prior to merge.
+//!
+//! The full implementation (PR 3c) will download each input split's Parquet
+//! file from object storage to a local temp directory, then forward a
+//! `ParquetMergeScratch` to the `ParquetMergeExecutor`.
+//!
+//! This stub exists so that `MergeSchedulerService` can reference the
+//! `Mailbox<ParquetMergeSplitDownloader>` type and the `ParquetMergePlanner`
+//! can be tested end-to-end through the scheduler.
+
+use async_trait::async_trait;
+use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, QueueCapacity};
+use tracing::debug;
+
+use super::ParquetMergeTask;
+
+/// Downloads Parquet split files from object storage for merge.
+///
+/// Stub implementation — accepts `ParquetMergeTask` messages but does not
+/// perform real downloads. The full implementation comes in PR 3c.
+pub struct ParquetMergeSplitDownloader;
+
+#[async_trait]
+impl Actor for ParquetMergeSplitDownloader {
+    type ObservableState = ();
+
+    fn observable_state(&self) {}
+
+    fn name(&self) -> String {
+        "ParquetMergeSplitDownloader".to_string()
+    }
+
+    fn queue_capacity(&self) -> QueueCapacity {
+        QueueCapacity::Unbounded
+    }
+}
+
+#[async_trait]
+impl Handler<ParquetMergeTask> for ParquetMergeSplitDownloader {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        task: ParquetMergeTask,
+        _ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        debug!(
+            merge_split_id = %task.merge_operation.merge_split_id,
+            num_inputs = task.merge_operation.splits.len(),
+            "received parquet merge task (stub — real download in PR 3c)"
+        );
+        Ok(())
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/actors/mod.rs b/quickwit/quickwit-indexing/src/actors/mod.rs
index 8914c5c4c2c..88b2d414e5e 100644
--- a/quickwit/quickwit-indexing/src/actors/mod.rs
+++ b/quickwit/quickwit-indexing/src/actors/mod.rs
@@ -49,6 +49,8 @@ pub use merge_pipeline::{
 pub(crate) use merge_planner::MergePlanner;
 #[cfg(test)]
 pub(crate) use merge_planner::RunFinalizeMergePolicyAndQuit;
+#[cfg(feature = "metrics")]
+pub use merge_scheduler_service::schedule_parquet_merge;
 pub use merge_scheduler_service::{MergePermit, MergeSchedulerService, schedule_merge};
 pub use merge_split_downloader::MergeSplitDownloader;
 #[cfg(feature = "metrics")]