Skip to content

Commit 63886de

Browse files
committed
Add CLI support for parquet GC, IndexService method, and OTEL metrics retention config
1 parent f05fa60 commit 63886de

File tree

3 files changed

+135
-8
lines changed

3 files changed

+135
-8
lines changed

quickwit/quickwit-cli/src/tool.rs

Lines changed: 73 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -660,26 +660,41 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
660660
get_resolvers(&config.storage_configs, &config.metastore_configs);
661661
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
662662
let mut index_service = IndexService::new(metastore, storage_resolver);
663+
664+
if quickwit_common::is_metrics_index(&args.index_id) {
665+
let removal_info = index_service
666+
.garbage_collect_parquet_index(&args.index_id, args.grace_period, args.dry_run)
667+
.await?;
668+
return print_parquet_gc_result(args.dry_run, removal_info);
669+
}
670+
663671
let removal_info = index_service
664672
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
665673
.await?;
674+
print_tantivy_gc_result(args.dry_run, removal_info)
675+
}
676+
677+
fn print_tantivy_gc_result(
678+
dry_run: bool,
679+
removal_info: quickwit_index_management::SplitRemovalInfo,
680+
) -> anyhow::Result<()> {
666681
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
667682
println!("No dangling files to garbage collect.");
668683
return Ok(());
669684
}
670685

671-
if args.dry_run {
686+
if dry_run {
672687
println!("The following files will be garbage collected.");
673-
for split_info in removal_info.removed_split_entries {
674-
println!(" - {}", split_info.file_name.display());
688+
for entry in &removal_info.removed_split_entries {
689+
println!(" - {}", entry.file_name.display());
675690
}
676691
return Ok(());
677692
}
678693

679694
if !removal_info.failed_splits.is_empty() {
680695
println!("The following splits were attempted to be removed, but failed.");
681-
for split_info in &removal_info.failed_splits {
682-
println!(" - {}", split_info.split_id);
696+
for split in &removal_info.failed_splits {
697+
println!(" - {}", split.split_id);
683698
}
684699
println!(
685700
"{} Splits were unable to be removed.",
@@ -690,7 +705,7 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
690705
let deleted_bytes: u64 = removal_info
691706
.removed_split_entries
692707
.iter()
693-
.map(|split_info| split_info.file_size_bytes.as_u64())
708+
.map(|s| s.file_size_bytes.as_u64())
694709
.sum();
695710
println!(
696711
"{}MB of storage garbage collected.",
@@ -702,9 +717,59 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
702717
"{} Index successfully garbage collected.",
703718
"✔".color(GREEN_COLOR)
704719
);
705-
} else if removal_info.removed_split_entries.is_empty()
706-
&& !removal_info.failed_splits.is_empty()
720+
} else if removal_info.removed_split_entries.is_empty() {
721+
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
722+
} else {
723+
println!(
724+
"{} Index partially garbage collected.",
725+
"✘".color(RED_COLOR)
726+
);
727+
}
728+
729+
Ok(())
730+
}
731+
732+
fn print_parquet_gc_result(
733+
dry_run: bool,
734+
removal_info: quickwit_index_management::ParquetSplitRemovalInfo,
735+
) -> anyhow::Result<()> {
736+
if removal_info.removed_parquet_splits_entries.is_empty()
737+
&& removal_info.failed_parquet_splits.is_empty()
707738
{
739+
println!("No dangling files to garbage collect.");
740+
return Ok(());
741+
}
742+
743+
if dry_run {
744+
println!("The following files will be garbage collected.");
745+
for entry in &removal_info.removed_parquet_splits_entries {
746+
println!(" - {}.parquet", entry.split_id);
747+
}
748+
return Ok(());
749+
}
750+
751+
if !removal_info.failed_parquet_splits.is_empty() {
752+
println!("The following splits were attempted to be removed, but failed.");
753+
for split in &removal_info.failed_parquet_splits {
754+
println!(" - {}", split.split_id);
755+
}
756+
println!(
757+
"{} Splits were unable to be removed.",
758+
removal_info.failed_parquet_splits.len()
759+
);
760+
}
761+
762+
println!(
763+
"{}MB of storage garbage collected.",
764+
removal_info.removed_bytes() / 1_000_000
765+
);
766+
767+
if removal_info.failed_parquet_splits.is_empty() {
768+
println!(
769+
"{} Index successfully garbage collected.",
770+
"✔".color(GREEN_COLOR)
771+
);
772+
} else if removal_info.removed_parquet_splits_entries.is_empty() {
708773
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
709774
} else {
710775
println!(

quickwit/quickwit-index-management/src/index.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::garbage_collection::{
4545
DeleteSplitsError, SplitRemovalInfo, delete_splits_from_storage_and_metastore,
4646
run_garbage_collect,
4747
};
48+
use crate::parquet_garbage_collection::{ParquetSplitRemovalInfo, run_parquet_garbage_collect};
4849

4950
#[derive(Error, Debug)]
5051
pub enum IndexServiceError {
@@ -405,6 +406,47 @@ impl IndexService {
405406
Ok(deleted_entries)
406407
}
407408

409+
/// Detect all dangling parquet splits and associated files from a metrics index
410+
/// and removes them.
411+
///
412+
/// * `index_id` - The target metrics index Id.
413+
/// * `grace_period` - Threshold period after which a staged split can be garbage collected.
414+
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
415+
pub async fn garbage_collect_parquet_index(
416+
&mut self,
417+
index_id: &str,
418+
grace_period: Duration,
419+
dry_run: bool,
420+
) -> anyhow::Result<ParquetSplitRemovalInfo> {
421+
let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
422+
let index_metadata = self
423+
.metastore
424+
.index_metadata(index_metadata_request)
425+
.await?
426+
.deserialize_index_metadata()?;
427+
let index_uid = index_metadata.index_uid.clone();
428+
let index_config = index_metadata.into_index_config();
429+
let storage = self
430+
.storage_resolver
431+
.resolve(&index_config.index_uri)
432+
.await?;
433+
434+
let deleted_entries = run_parquet_garbage_collect(
435+
HashMap::from([(index_uid, storage)]),
436+
self.metastore.clone(),
437+
grace_period,
438+
// deletion_grace_period of zero, so that a cli call directly deletes splits after
439+
// marking to be deleted.
440+
Duration::ZERO,
441+
dry_run,
442+
None,
443+
None,
444+
)
445+
.await?;
446+
447+
Ok(deleted_entries)
448+
}
449+
408450
/// Clears the index by applying the following actions:
409451
/// - mark all splits for deletion in the metastore.
410452
/// - delete the files of all splits marked for deletion using garbage collection.

quickwit/quickwit-opentelemetry/src/otlp/otel_metrics.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,29 @@ pub const OTEL_METRICS_INDEX_ID: &str = "otel-metrics-v0_9";
4848
/// directly and queries via DataFusion. The doc mapping is unused; this config
4949
/// exists only so the index can be registered in the metastore for source
5050
/// assignment and lifecycle management.
51+
///
52+
/// TODO: As a temporary hack, we are including a timestamp_field, so that
53+
/// we can pass the retention policy validation.
5154
const OTEL_METRICS_INDEX_CONFIG: &str = r#"
5255
version: 0.8
5356
5457
index_id: ${INDEX_ID}
5558
5659
doc_mapping:
5760
mode: dynamic
61+
field_mappings:
62+
- name: timestamp
63+
type: datetime
64+
fast: true
65+
timestamp_field: timestamp
5866
5967
indexing_settings:
6068
commit_timeout_secs: 15
6169
70+
retention:
71+
period: 30 days
72+
schedule: hourly
73+
6274
search_settings:
6375
default_search_fields: []
6476
"#;
@@ -579,6 +591,14 @@ mod tests {
579591
let index_config =
580592
OtlpGrpcMetricsService::index_config(&Uri::for_test("ram:///indexes")).unwrap();
581593
assert_eq!(index_config.index_id, OTEL_METRICS_INDEX_ID);
594+
let retention = index_config
595+
.retention_policy_opt
596+
.expect("retention policy should be set");
597+
assert_eq!(
598+
retention.retention_period().unwrap(),
599+
std::time::Duration::from_secs(30 * 24 * 3600)
600+
);
601+
assert_eq!(retention.evaluation_schedule, "hourly");
582602
}
583603

584604
#[tokio::test]

0 commit comments

Comments
 (0)