Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion quickwit/quickwit-compaction/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,21 @@ authors.workspace = true
license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-common = { workspace = true }
quickwit-config = { workspace = true }
quickwit-doc-mapper = { workspace = true }
quickwit-indexing = { workspace = true }
quickwit-proto = { workspace = true }
quickwit-storage = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true, features = ["testsuite"] }
quickwit-storage = { workspace = true, features = ["testsuite"] }
206 changes: 183 additions & 23 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use quickwit_actors::{ActorHandle, Health, Supervisable};
use std::sync::Arc;

use quickwit_actors::{ActorContext, ActorHandle, Health, Supervisable};
use quickwit_common::KillSwitch;
use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_config::RetentionPolicy;
use quickwit_doc_mapper::DocMapper;
use quickwit_indexing::actors::{
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader,
MergeExecutor, MergeSplitDownloader, Packager, Publisher, Uploader, UploaderType,
};
use tracing::{debug, error};
use quickwit_indexing::merge_policy::MergeOperation;
use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox};
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::MetastoreServiceClient;
use tracing::{debug, error, info};

use crate::CompactorSupervisor;

pub struct CompactionPipelineHandles {
pub merge_split_downloader: ActorHandle<MergeSplitDownloader>,
Expand All @@ -35,22 +47,58 @@ pub struct CompactionPipelineHandles {
/// `check_actor_health()` and acts on the result (retry, reap, etc.).
pub struct CompactionPipeline {
/// Identifier of the compaction task; attached to this pipeline's log events.
pub task_id: String,
/// Identifiers of the splits handed to the constructor.
pub split_ids: Vec<String>,
/// Number of restarts so far: starts at 0 and is incremented by `restart()`.
pub retry_count: usize,
/// Kill switch attached to every actor of the chain. `spawn_pipeline()`
/// replaces it with a fresh child of the supervisor context's kill switch.
pub kill_switch: KillSwitch,
/// Scratch directory handed to the `MergeSplitDownloader`.
pub scratch_directory: TempDirectory,
/// Handles of the spawned actors. `None` at construction; set by
/// `spawn_pipeline()` once the whole chain has been spawned.
pub handles: Option<CompactionPipelineHandles>,

// Per-task parameters.
/// Merge operation sent to the downloader's mailbox to start the run.
pub merge_operation: MergeOperation,
/// Passed to the `MergeExecutor` and logged when the chain is spawned.
pub pipeline_id: MergePipelineId,
/// Supplies the tag fields for the `Packager` and is shared with the
/// `MergeExecutor`.
pub doc_mapper: Arc<DocMapper>,
/// Forwarded to the `Uploader`.
pub merge_policy: Arc<dyn quickwit_indexing::merge_policy::MergePolicy>,
/// Forwarded to the `Uploader`.
pub retention_policy: Option<RetentionPolicy>,

// Shared resources (cloned from CompactorSupervisor).
/// Metastore client cloned into the `Publisher`, `Uploader` and
/// `MergeExecutor`.
pub metastore: MetastoreServiceClient,
/// Split store cloned into the `Uploader` and the `MergeSplitDownloader`.
pub split_store: IndexingSplitStore,
/// Optional throughput limiter installed on the `IoControls` used by the
/// downloader and the merge executor.
pub io_throughput_limiter: Option<Limiter>,
/// Forwarded to the `Uploader`.
pub max_concurrent_split_uploads: usize,
/// Forwarded to the `Uploader`.
pub event_broker: EventBroker,
}

impl CompactionPipeline {
/// Creates a pipeline that has not been spawned yet.
///
/// Every argument is stored unchanged in the field of the same name. The
/// pipeline starts with `retry_count == 0`, no actor handles, and a default
/// `KillSwitch`.
pub fn new(task_id: String, split_ids: Vec<String>, scratch_directory: TempDirectory) -> Self {
// One argument per stored field, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub fn new(
task_id: String,
scratch_directory: TempDirectory,
merge_operation: MergeOperation,
pipeline_id: MergePipelineId,
doc_mapper: Arc<DocMapper>,
merge_policy: Arc<dyn quickwit_indexing::merge_policy::MergePolicy>,
retention_policy: Option<RetentionPolicy>,
metastore: MetastoreServiceClient,
split_store: IndexingSplitStore,
io_throughput_limiter: Option<Limiter>,
max_concurrent_split_uploads: usize,
event_broker: EventBroker,
) -> Self {
CompactionPipeline {
task_id,
split_ids,
// No restart has happened yet.
retry_count: 0,
// Default switch for now; `spawn_pipeline()` assigns a new one.
kill_switch: KillSwitch::default(),
scratch_directory,
// No actor exists until `spawn_pipeline()` runs.
handles: None,
merge_operation,
pipeline_id,
doc_mapper,
merge_policy,
retention_policy,
metastore,
split_store,
io_throughput_limiter,
max_concurrent_split_uploads,
event_broker,
}
}

Expand Down Expand Up @@ -132,32 +180,153 @@ impl CompactionPipeline {

/// Terminates the current actor chain, increments retry count, and
/// re-spawns. Downloaded splits remain on disk in the scratch directory.
///
/// A failure to re-spawn is logged together with the task id and is not
/// returned to the caller.
pub async fn restart(&mut self) {
pub async fn restart(&mut self, ctx: &ActorContext<CompactorSupervisor>) {
self.terminate().await;
// Counted even if the re-spawn below fails.
self.retry_count += 1;
self.spawn_pipeline();
// NOTE(review): `spawn_pipeline` only assigns `self.handles` on success, so
// after a failure here no newly spawned actors are tracked — confirm the
// supervisor's health check does not keep reporting this pipeline as healthy.
if let Err(err) = self.spawn_pipeline(ctx) {
error!(task_id=%self.task_id, error=?err, "failed to respawn compaction pipeline");
}
}

/// Spawns the actor chain. Currently a no-op stub — actor chain
/// construction will be implemented in a later PR.
fn spawn_pipeline(&mut self) {
// TODO: construct MergeSplitDownloader → MergeExecutor → Packager →
// Uploader → Publisher actor chain and set self.handles.
/// Spawns the 5-actor merge execution chain and sends the `MergeOperation`
/// to the downloader to kick off execution.
fn spawn_pipeline(&mut self, ctx: &ActorContext<CompactorSupervisor>) -> anyhow::Result<()> {
self.kill_switch = ctx.kill_switch().child();

info!(
task_id=%self.task_id,
pipeline_id=%self.pipeline_id,
"spawning compaction pipeline"
);

// Publisher (no merge planner feedback, no source)
let merge_publisher = Publisher::new(
PublisherType::MergePublisher,
self.metastore.clone(),
None,
None,
);
let (merge_publisher_mailbox, merge_publisher_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_publisher);

// Uploader
let merge_uploader = Uploader::new(
UploaderType::MergeUploader,
self.metastore.clone(),
self.merge_policy.clone(),
self.retention_policy.clone(),
self.split_store.clone(),
SplitsUpdateMailbox::from(merge_publisher_mailbox),
self.max_concurrent_split_uploads,
self.event_broker.clone(),
);
let (merge_uploader_mailbox, merge_uploader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_uploader);

// Packager
let tag_fields = self.doc_mapper.tag_named_fields()?;
let merge_packager = Packager::new("MergePackager", tag_fields, merge_uploader_mailbox);
let (merge_packager_mailbox, merge_packager_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);

// MergeExecutor
let split_downloader_io_controls = IoControls::default()
.set_throughput_limiter_opt(self.io_throughput_limiter.clone())
.set_component("split_downloader_merge");
let merge_executor_io_controls =
split_downloader_io_controls.clone().set_component("merger");
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: throughput throttling


let merge_executor = MergeExecutor::new(
self.pipeline_id.clone(),
self.metastore.clone(),
self.doc_mapper.clone(),
merge_executor_io_controls,
merge_packager_mailbox,
);
let (merge_executor_mailbox, merge_executor_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_executor);

// MergeSplitDownloader
let merge_split_downloader = MergeSplitDownloader {
scratch_directory: self.scratch_directory.clone(),
split_store: self.split_store.clone(),
executor_mailbox: merge_executor_mailbox,
io_controls: split_downloader_io_controls,
};
let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
.spawn_actor()
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_split_downloader);

// Kick off the pipeline.
merge_split_downloader_mailbox
.try_send_message(self.merge_operation.clone())
.map_err(|err| {
anyhow::anyhow!("failed to send merge operation to downloader: {err:?}")
})?;

self.handles = Some(CompactionPipelineHandles {
merge_split_downloader: merge_split_downloader_handle,
merge_executor: merge_executor_handle,
merge_packager: merge_packager_handle,
merge_uploader: merge_uploader_handle,
merge_publisher: merge_publisher_handle,
});

Ok(())
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use quickwit_actors::Health;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_doc_mapper::default_doc_mapper_for_test;
use quickwit_indexing::IndexingSplitStore;
use quickwit_indexing::merge_policy::{MergeOperation, default_merge_policy};
use quickwit_metastore::SplitMetadata;
use quickwit_proto::indexing::MergePipelineId;
use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
use quickwit_proto::types::{IndexUid, NodeId};
use quickwit_storage::RamStorage;

use super::CompactionPipeline;

/// Builds a `CompactionPipeline` for tests, backed by RAM storage and a mock
/// metastore. It only calls the constructor; no actor is spawned.
fn test_pipeline() -> CompactionPipeline {
let storage = Arc::new(RamStorage::default());
let split_store = IndexingSplitStore::create_without_local_store_for_test(storage);
let metastore = MetastoreServiceClient::from_mock(MockMetastoreService::new());
// A merge operation over a single test split.
let splits = vec![SplitMetadata::for_test("split-1".to_string())];
let merge_operation = MergeOperation::new_merge_operation(splits);
let pipeline_id = MergePipelineId {
node_id: NodeId::from("test-node"),
index_uid: IndexUid::for_test("test-index", 0),
source_id: "test-source".to_string(),
};
CompactionPipeline::new(
"test-task".to_string(),
vec!["split-1".to_string(), "split-2".to_string()],
TempDirectory::for_test(),
merge_operation,
pipeline_id,
Arc::new(default_doc_mapper_for_test()),
default_merge_policy(),
// retention_policy
None,
metastore,
split_store,
// io_throughput_limiter
None,
// max_concurrent_split_uploads
2,
EventBroker::default(),
)
}

Expand All @@ -171,16 +340,7 @@ mod tests {
#[tokio::test]
async fn test_pipeline_terminate_without_handles() {
// `test_pipeline()` only constructs the pipeline, so it holds no actor handles.
let mut pipeline = test_pipeline();
// Should not panic when there are no handles.
pipeline.terminate().await;
assert!(pipeline.handles.is_none());
}

#[tokio::test]
async fn test_pipeline_restart_increments_retry_count() {
let mut pipeline = test_pipeline();
assert_eq!(pipeline.retry_count, 0);
pipeline.restart().await;
assert_eq!(pipeline.retry_count, 1);
}
}
27 changes: 11 additions & 16 deletions quickwit/quickwit-compaction/src/compactor_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl CompactorSupervisor {
}
}

async fn supervise(&mut self) {
async fn supervise(&mut self, ctx: &ActorContext<Self>) {
for slot in &mut self.pipelines {
let Some(pipeline) = slot else {
continue;
Expand All @@ -114,7 +114,7 @@ impl CompactorSupervisor {
retry_count=%pipeline.retry_count,
"retrying compaction pipeline"
);
pipeline.restart().await;
pipeline.restart(ctx).await;
} else {
error!(
task_id=%pipeline.task_id,
Expand Down Expand Up @@ -160,7 +160,7 @@ impl Handler<SuperviseLoop> for CompactorSupervisor {
_msg: SuperviseLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.supervise().await;
self.supervise(ctx).await;
ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, SuperviseLoop);
Ok(())
}
Expand All @@ -176,7 +176,6 @@ mod tests {
use quickwit_storage::{RamStorage, StorageResolver};

use super::*;
use crate::compaction_pipeline::CompactionPipeline;

fn test_supervisor(num_slots: usize) -> CompactorSupervisor {
let storage = Arc::new(RamStorage::default());
Expand Down Expand Up @@ -208,17 +207,13 @@ mod tests {
#[tokio::test]
async fn test_supervisor_supervise_reaps_no_handle_pipelines() {
// A pipeline with no handles returns Healthy, so it stays in its slot.
let mut supervisor = test_supervisor(2);
let pipeline = CompactionPipeline::new(
"task-1".to_string(),
vec!["split-1".to_string()],
TempDirectory::for_test(),
);
supervisor.pipelines[0] = Some(pipeline);
supervisor.supervise().await;
// Pipeline has no handles → Healthy → not reaped.
assert!(supervisor.pipelines[0].is_some());
assert_eq!(supervisor.num_completed_tasks, 0);
assert_eq!(supervisor.num_failed_tasks, 0);
// NOTE(review): the actor-based body below never places a pipeline in a slot,
// so the "no handles" scenario named by this test is not exercised by it —
// confirm the test name and the leading comment still match the intent.
// We spawn the supervisor as an actor so we can get a context for supervise().
let universe = Universe::with_accelerated_time();
let supervisor = test_supervisor(2);
let (_mailbox, handle) = universe.spawn_builder().spawn(supervisor);
// Let the supervisor run one supervision loop (it schedules SuperviseLoop on init).
let obs = handle.process_pending_and_observe().await;
assert_eq!(obs.obs_type, quickwit_actors::ObservationType::Alive);
universe.assert_quit().await;
}
}
5 changes: 3 additions & 2 deletions quickwit/quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,10 @@ async fn test_merge_executor_controlled_directory_kill_switch() -> anyhow::Resul
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap());
}
let merge_operation = MergeOperation::new_merge_operation(split_metadatas);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation);
let merge_task = MergeTask::from_merge_operation_for_test(merge_operation.clone());
let merge_scratch = MergeScratch {
merge_task,
merge_operation,
merge_task: Some(merge_task),
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs,
Expand Down
Loading
Loading