Skip to content

Commit 10f40a2

Browse files
authored
CLI to merge mature splits (#29)
* Simple plan considering splits within days * Improve logging * Add index filter * Serve metrics * Fix clippy * Address all review comments * Hang when done * Ensure doc mapping changes don't break merges * Add integration test * Include secondary time in merge oportunity evaluation
1 parent 17a1ba1 commit 10f40a2

8 files changed

Lines changed: 1587 additions & 22 deletions

File tree

quickwit/quickwit-cli/src/tool.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use quickwit_config::{
3939
use quickwit_index_management::{IndexService, clear_cache_directory};
4040
use quickwit_indexing::IndexingPipeline;
4141
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
42+
use quickwit_indexing::mature_merge::{MatureMergeConfig, merge_mature_all_indexes};
4243
use quickwit_indexing::models::{
4344
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
4445
};
@@ -163,6 +164,56 @@ pub fn build_tool_command() -> Command {
163164
.required(true),
164165
])
165166
)
167+
.subcommand(
168+
Command::new("merge-mature")
169+
.display_order(10)
170+
.about("Merges mature splits across all indexes and nodes.")
171+
.long_about(
172+
"Scans indexes for merge opportunities in mature Published splits. Considers \
173+
opportunities across all origin nodes and sources. Runs once and exits."
174+
)
175+
.args(&[
176+
arg!(--"dry-run"
177+
"Prints the planned merge operations without executing them.")
178+
.required(false),
179+
arg!(--"max-concurrent-merges" <MAX_CONCURRENT_MERGES>
180+
"Maximum number of merges to run concurrently (default: 10).")
181+
.display_order(1)
182+
.required(false),
183+
arg!(--"retention-safety-buffer-days" <RETENTION_SAFETY_BUFFER_DAYS>
184+
"Splits within this many days of the retention cutoff are excluded (default: 5).")
185+
.display_order(2)
186+
.required(false),
187+
arg!(--"min-merge-group-size" <MIN_MERGE_GROUP_SIZE>
188+
"Minimum number of splits in a group to trigger a merge (default: 5).")
189+
.display_order(3)
190+
.required(false),
191+
arg!(--"input-split-max-num-docs" <INPUT_SPLIT_MAX_NUM_DOCS>
192+
"Maximum number of docs in a split for it to be eligible (default: 10_000).")
193+
.display_order(4)
194+
.required(false),
195+
arg!(--"max-merge-group-size" <MAX_MERGE_GROUP_SIZE>
196+
"Maximum number of splits per merge operation (default: 100).")
197+
.display_order(5)
198+
.required(false),
199+
arg!(--"split-target-num-docs" <SPLIT_TARGET_NUM_DOCS>
200+
"Maximum total docs per merge operation (default: 5_000_000).")
201+
.display_order(6)
202+
.required(false),
203+
arg!(--"index-parallelism" <INDEX_PARALLELISM>
204+
"Number of indexes processed concurrently (default: 50).")
205+
.display_order(7)
206+
.required(false),
207+
arg!(--"index-id-patterns" <INDEX_ID_PATTERNS>
208+
"Comma-separated list of index ID patterns to include (default: '*').")
209+
.display_order(8)
210+
.required(false),
211+
arg!(--"metrics"
212+
"Expose Prometheus metrics on the REST listen address during the run.")
213+
.display_order(9)
214+
.required(false),
215+
])
216+
)
166217
.arg_required_else_help(true)
167218
}
168219

@@ -207,6 +258,13 @@ pub struct MergeArgs {
207258
pub source_id: SourceId,
208259
}
209260

261+
#[derive(Debug, Eq, PartialEq)]
262+
pub struct MatureMergeArgs {
263+
pub config_uri: Uri,
264+
pub merge_config: MatureMergeConfig,
265+
pub serve_metrics: bool,
266+
}
267+
210268
#[derive(Debug, Eq, PartialEq)]
211269
pub struct ExtractSplitArgs {
212270
pub config_uri: Uri,
@@ -221,6 +279,7 @@ pub enum ToolCliCommand {
221279
LocalIngest(LocalIngestDocsArgs),
222280
LocalSearch(LocalSearchArgs),
223281
Merge(MergeArgs),
282+
MatureMerge(MatureMergeArgs),
224283
ExtractSplit(ExtractSplitArgs),
225284
}
226285

@@ -234,6 +293,7 @@ impl ToolCliCommand {
234293
"local-ingest" => Self::parse_local_ingest_args(submatches),
235294
"local-search" => Self::parse_local_search_args(submatches),
236295
"merge" => Self::parse_merge_args(submatches),
296+
"merge-mature" => Self::parse_mature_merge_args(submatches),
237297
"extract-split" => Self::parse_extract_split_args(submatches),
238298
_ => bail!("unknown tool subcommand `{subcommand}`"),
239299
}
@@ -385,12 +445,84 @@ impl ToolCliCommand {
385445
}))
386446
}
387447

448+
fn parse_mature_merge_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
449+
let config_uri = matches
450+
.remove_one::<String>("config")
451+
.map(|uri_str| Uri::from_str(&uri_str))
452+
.expect("`config` should be a required arg.")?;
453+
let defaults = MatureMergeConfig::default();
454+
let dry_run = matches.get_flag("dry-run");
455+
let max_concurrent_merges = matches
456+
.remove_one::<String>("max-concurrent-merges")
457+
.map(|s| s.parse::<usize>())
458+
.transpose()?
459+
.unwrap_or(defaults.max_concurrent_merges);
460+
let retention_safety_buffer_days = matches
461+
.remove_one::<String>("retention-safety-buffer-days")
462+
.map(|s| s.parse::<u64>())
463+
.transpose()?
464+
.unwrap_or(defaults.retention_safety_buffer_days);
465+
let min_merge_group_size = matches
466+
.remove_one::<String>("min-merge-group-size")
467+
.map(|s| s.parse::<usize>())
468+
.transpose()?
469+
.unwrap_or(defaults.min_merge_group_size);
470+
let input_split_max_num_docs = matches
471+
.remove_one::<String>("input-split-max-num-docs")
472+
.map(|s| s.parse::<usize>())
473+
.transpose()?
474+
.unwrap_or(defaults.input_split_max_num_docs);
475+
let max_merge_group_size = matches
476+
.remove_one::<String>("max-merge-group-size")
477+
.map(|s| s.parse::<usize>())
478+
.transpose()?
479+
.unwrap_or(defaults.max_merge_group_size);
480+
let split_target_num_docs = matches
481+
.remove_one::<String>("split-target-num-docs")
482+
.map(|s| s.parse::<usize>())
483+
.transpose()?
484+
.unwrap_or(defaults.split_target_num_docs);
485+
let index_parallelism = matches
486+
.remove_one::<String>("index-parallelism")
487+
.map(|s| s.parse::<usize>())
488+
.transpose()?
489+
.unwrap_or(defaults.index_parallelism);
490+
let index_id_patterns = matches
491+
.remove_one::<String>("index-id-patterns")
492+
.map(|s| s.split(',').map(|p| p.trim().to_string()).collect())
493+
.unwrap_or(defaults.index_id_patterns);
494+
let serve_metrics = matches.get_flag("metrics");
495+
496+
if max_concurrent_merges == 0 {
497+
bail!("`max-concurrent-merges` must be greater than or equal to 1.");
498+
}
499+
if index_parallelism == 0 {
500+
bail!("`index-parallelism` must be greater than or equal to 1.");
501+
}
502+
Ok(Self::MatureMerge(MatureMergeArgs {
503+
config_uri,
504+
serve_metrics,
505+
merge_config: MatureMergeConfig {
506+
dry_run,
507+
max_concurrent_merges,
508+
retention_safety_buffer_days,
509+
min_merge_group_size,
510+
input_split_max_num_docs,
511+
max_merge_group_size,
512+
split_target_num_docs,
513+
index_parallelism,
514+
index_id_patterns,
515+
},
516+
}))
517+
}
518+
388519
pub async fn execute(self) -> anyhow::Result<()> {
389520
match self {
390521
Self::GarbageCollect(args) => garbage_collect_index_cli(args).await,
391522
Self::LocalIngest(args) => local_ingest_docs_cli(args).await,
392523
Self::LocalSearch(args) => local_search_cli(args).await,
393524
Self::Merge(args) => merge_cli(args).await,
525+
Self::MatureMerge(args) => merge_mature_cli(args).await,
394526
Self::ExtractSplit(args) => extract_split_cli(args).await,
395527
}
396528
}
@@ -651,6 +783,43 @@ pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
651783
Ok(())
652784
}
653785

786+
pub async fn merge_mature_cli(args: MatureMergeArgs) -> anyhow::Result<()> {
787+
debug!(args=?args, "merge-mature");
788+
info!(merge_config=?args.merge_config, "merge-mature configuration");
789+
println!("❯ Scanning all indexes for mature merge opportunities...");
790+
let config = load_node_config(&args.config_uri).await?;
791+
let (storage_resolver, metastore_resolver) =
792+
get_resolvers(&config.storage_configs, &config.metastore_configs);
793+
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
794+
795+
let runtimes_config = RuntimesConfig::default();
796+
start_actor_runtimes(
797+
runtimes_config,
798+
&HashSet::from_iter([QuickwitService::Indexer]),
799+
)?;
800+
801+
if args.serve_metrics {
802+
let metrics_addr = config.rest_config.listen_addr;
803+
tokio::spawn(serve_metrics(metrics_addr));
804+
}
805+
806+
merge_mature_all_indexes(
807+
metastore,
808+
storage_resolver,
809+
&config.data_dir_path,
810+
args.merge_config.clone(),
811+
config.node_id,
812+
)
813+
.await?;
814+
815+
if !args.merge_config.dry_run {
816+
info!("mature splits successfully merged, waiting for explicit termination signal");
817+
tokio::time::sleep(Duration::MAX).await;
818+
}
819+
820+
Ok(())
821+
}
822+
654823
pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> {
655824
debug!(args=?args, "garbage-collect-index");
656825
println!("❯ Garbage collecting index...");
@@ -955,3 +1124,48 @@ async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
9551124

9561125
Ok(cluster)
9571126
}
1127+
1128+
/// A shortcut to expose the metrics without loading the whole quickwit_serve
1129+
/// machinery.
1130+
async fn serve_metrics(addr: std::net::SocketAddr) {
1131+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
1132+
let listener = match tokio::net::TcpListener::bind(addr).await {
1133+
Ok(l) => l,
1134+
Err(err) => {
1135+
tracing::warn!("metrics server could not bind to {addr}: {err}");
1136+
return;
1137+
}
1138+
};
1139+
tracing::info!("metrics server listening on http://{addr}/metrics");
1140+
loop {
1141+
let Ok((mut stream, _peer)) = listener.accept().await else {
1142+
continue;
1143+
};
1144+
tokio::spawn(async move {
1145+
let mut buf = [0u8; 4096];
1146+
let n = match stream.read(&mut buf).await {
1147+
Ok(n) => n,
1148+
Err(_) => return,
1149+
};
1150+
let request = std::str::from_utf8(&buf[..n]).unwrap_or("");
1151+
let is_metrics = request.starts_with("GET /metrics");
1152+
let (status, body) = if is_metrics {
1153+
match quickwit_common::metrics::metrics_text_payload() {
1154+
Ok(payload) => ("200 OK", payload),
1155+
Err(e) => {
1156+
tracing::error!("failed to encode prometheus metrics: {e}");
1157+
("500 Internal Server Error", String::new())
1158+
}
1159+
}
1160+
} else {
1161+
("404 Not Found", String::new())
1162+
};
1163+
let response = format!(
1164+
"HTTP/1.1 {status}\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: \
1165+
{}\r\nConnection: close\r\n\r\n{body}",
1166+
body.len()
1167+
);
1168+
let _ = stream.write_all(response.as_bytes()).await;
1169+
});
1170+
}
1171+
}

0 commit comments

Comments
 (0)