Skip to content

Commit d51894c

Browse files
committed
parquet janitor
1 parent 1d0e5e8 commit d51894c

File tree

17 files changed

+1254
-174
lines changed

17 files changed

+1254
-174
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-cli/src/tool.rs

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -660,57 +660,80 @@ pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow:
660660
get_resolvers(&config.storage_configs, &config.metastore_configs);
661661
let metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
662662
let mut index_service = IndexService::new(metastore, storage_resolver);
663+
664+
if quickwit_common::is_metrics_index(&args.index_id) {
665+
let removal_info = index_service
666+
.garbage_collect_parquet_index(&args.index_id, args.grace_period, args.dry_run)
667+
.await?;
668+
let removed_files: Vec<String> = removal_info
669+
.removed_parquet_splits_entries
670+
.iter()
671+
.map(|s| format!("{}.parquet", s.split_id))
672+
.collect();
673+
let failed_ids: Vec<String> = removal_info
674+
.failed_parquet_splits
675+
.iter()
676+
.map(|s| s.split_id.clone())
677+
.collect();
678+
return print_gc_result(args.dry_run, removed_files, failed_ids, removal_info.removed_bytes());
679+
}
680+
663681
let removal_info = index_service
664682
.garbage_collect_index(&args.index_id, args.grace_period, args.dry_run)
665683
.await?;
666-
if removal_info.removed_split_entries.is_empty() && removal_info.failed_splits.is_empty() {
684+
let removed_files: Vec<String> = removal_info
685+
.removed_split_entries
686+
.iter()
687+
.map(|s| s.file_name.display().to_string())
688+
.collect();
689+
let failed_ids: Vec<String> = removal_info
690+
.failed_splits
691+
.iter()
692+
.map(|s| s.split_id.clone())
693+
.collect();
694+
let deleted_bytes: u64 = removal_info
695+
.removed_split_entries
696+
.iter()
697+
.map(|s| s.file_size_bytes.as_u64())
698+
.sum();
699+
print_gc_result(args.dry_run, removed_files, failed_ids, deleted_bytes)
700+
}
701+
702+
fn print_gc_result(
703+
dry_run: bool,
704+
removed_files: Vec<String>,
705+
failed_ids: Vec<String>,
706+
deleted_bytes: u64,
707+
) -> anyhow::Result<()> {
708+
if removed_files.is_empty() && failed_ids.is_empty() {
667709
println!("No dangling files to garbage collect.");
668710
return Ok(());
669711
}
670712

671-
if args.dry_run {
713+
if dry_run {
672714
println!("The following files will be garbage collected.");
673-
for split_info in removal_info.removed_split_entries {
674-
println!(" - {}", split_info.file_name.display());
715+
for file in &removed_files {
716+
println!(" - {file}");
675717
}
676718
return Ok(());
677719
}
678720

679-
if !removal_info.failed_splits.is_empty() {
721+
if !failed_ids.is_empty() {
680722
println!("The following splits were attempted to be removed, but failed.");
681-
for split_info in &removal_info.failed_splits {
682-
println!(" - {}", split_info.split_id);
723+
for split_id in &failed_ids {
724+
println!(" - {split_id}");
683725
}
684-
println!(
685-
"{} Splits were unable to be removed.",
686-
removal_info.failed_splits.len()
687-
);
726+
println!("{} Splits were unable to be removed.", failed_ids.len());
688727
}
689728

690-
let deleted_bytes: u64 = removal_info
691-
.removed_split_entries
692-
.iter()
693-
.map(|split_info| split_info.file_size_bytes.as_u64())
694-
.sum();
695-
println!(
696-
"{}MB of storage garbage collected.",
697-
deleted_bytes / 1_000_000
698-
);
729+
println!("{}MB of storage garbage collected.", deleted_bytes / 1_000_000);
699730

700-
if removal_info.failed_splits.is_empty() {
701-
println!(
702-
"{} Index successfully garbage collected.",
703-
"✔".color(GREEN_COLOR)
704-
);
705-
} else if removal_info.removed_split_entries.is_empty()
706-
&& !removal_info.failed_splits.is_empty()
707-
{
731+
if failed_ids.is_empty() {
732+
println!("{} Index successfully garbage collected.", "✔".color(GREEN_COLOR));
733+
} else if removed_files.is_empty() {
708734
println!("{} Failed to garbage collect index.", "✘".color(RED_COLOR));
709735
} else {
710-
println!(
711-
"{} Index partially garbage collected.",
712-
"✘".color(RED_COLOR)
713-
);
736+
println!("{} Index partially garbage collected.", "✘".color(RED_COLOR));
714737
}
715738

716739
Ok(())

quickwit/quickwit-index-management/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ quickwit-common = { workspace = true }
2424
quickwit-config = { workspace = true }
2525
quickwit-indexing = { workspace = true }
2626
quickwit-metastore = { workspace = true }
27+
quickwit-parquet-engine = { workspace = true }
2728
quickwit-proto = { workspace = true }
2829
quickwit-storage = { workspace = true }
2930

quickwit/quickwit-index-management/src/garbage_collection.rs

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct GcMetrics {
4646
pub failed_splits: IntCounter,
4747
}
4848

49-
trait RecordGcMetrics {
49+
pub(crate) trait RecordGcMetrics {
5050
fn record(&self, num_delete_splits: usize, num_deleted_bytes: u64, num_failed_splits: usize);
5151
}
5252

@@ -72,7 +72,7 @@ pub struct DeleteSplitsError {
7272
metastore_failures: Vec<SplitInfo>,
7373
}
7474

75-
async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
75+
pub(crate) async fn protect_future<Fut, T>(progress: Option<&Progress>, future: Fut) -> T
7676
where Fut: Future<Output = T> {
7777
match progress {
7878
None => future.await,
@@ -289,7 +289,7 @@ async fn list_splits_metadata(
289289

290290
/// In order to avoid hammering the load on the metastore, we can throttle the rate of split
291291
/// deletion by setting this environment variable.
292-
fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
292+
pub(crate) fn get_maximum_split_deletion_rate_per_sec() -> Option<usize> {
293293
static MAX_SPLIT_DELETION_RATE_PER_SEC: OnceLock<Option<usize>> = OnceLock::new();
294294
*MAX_SPLIT_DELETION_RATE_PER_SEC.get_or_init(|| {
295295
quickwit_common::get_from_env_opt::<usize>("QW_MAX_SPLIT_DELETION_RATE_PER_SEC", false)
@@ -408,6 +408,43 @@ async fn delete_splits_marked_for_deletion_several_indexes(
408408
split_removal_info
409409
}
410410

411+
/// A split normalized for storage deletion: just the id, path, and size.
412+
/// Used as the common currency between tantivy and parquet GC paths.
413+
pub(crate) struct SplitToDelete {
414+
pub split_id: String,
415+
pub path: PathBuf,
416+
pub size_bytes: u64,
417+
}
418+
419+
/// Deletes split files from storage and partitions into (succeeded, failed).
420+
///
421+
/// Returns the `BulkDeleteError` if there was a partial failure, so the caller
422+
/// can log it with index-specific context. Does NOT touch the metastore.
423+
pub(crate) async fn delete_split_files(
424+
storage: &dyn Storage,
425+
splits: Vec<SplitToDelete>,
426+
progress_opt: Option<&Progress>,
427+
) -> (Vec<SplitToDelete>, Vec<SplitToDelete>, Option<BulkDeleteError>) {
428+
if splits.is_empty() {
429+
return (Vec::new(), Vec::new(), None);
430+
}
431+
let paths: Vec<&Path> = splits.iter().map(|s| s.path.as_path()).collect();
432+
let result = protect_future(progress_opt, storage.bulk_delete(&paths)).await;
433+
434+
if let Some(progress) = progress_opt {
435+
progress.record_progress();
436+
}
437+
match result {
438+
Ok(()) => (splits, Vec::new(), None),
439+
Err(bulk_err) => {
440+
let success_paths: HashSet<&PathBuf> = bulk_err.successes.iter().collect();
441+
let (succeeded, failed) =
442+
splits.into_iter().partition(|s| success_paths.contains(&s.path));
443+
(succeeded, failed, Some(bulk_err))
444+
}
445+
}
446+
}
447+
411448
/// Delete a list of splits from the storage and the metastore.
412449
/// It should leave the index and the metastore in good state.
413450
///
@@ -424,49 +461,41 @@ pub async fn delete_splits_from_storage_and_metastore(
424461
progress_opt: Option<&Progress>,
425462
) -> Result<Vec<SplitInfo>, DeleteSplitsError> {
426463
let mut split_infos: HashMap<PathBuf, SplitInfo> = HashMap::with_capacity(splits.len());
427-
428464
for split in splits {
429465
let split_info = split.as_split_info();
430466
split_infos.insert(split_info.file_name.clone(), split_info);
431467
}
432-
let split_paths = split_infos
433-
.keys()
434-
.map(|split_path_buf| split_path_buf.as_path())
435-
.collect::<Vec<&Path>>();
436-
let delete_result = protect_future(progress_opt, storage.bulk_delete(&split_paths)).await;
437468

438-
if let Some(progress) = progress_opt {
439-
progress.record_progress();
440-
}
441-
let mut successes = Vec::with_capacity(split_infos.len());
469+
let splits_to_delete: Vec<SplitToDelete> = split_infos
470+
.values()
471+
.map(|info| SplitToDelete {
472+
split_id: info.split_id.clone(),
473+
path: info.file_name.clone(),
474+
size_bytes: info.file_size_bytes.as_u64(),
475+
})
476+
.collect();
477+
478+
let (succeeded_stds, failed_stds, storage_err) =
479+
delete_split_files(&*storage, splits_to_delete, progress_opt).await;
480+
481+
let successes: Vec<SplitInfo> = succeeded_stds.iter().map(|s| split_infos[&s.path].clone()).collect();
482+
let storage_failures: Vec<SplitInfo> = failed_stds.iter().map(|s| split_infos[&s.path].clone()).collect();
483+
442484
let mut storage_error: Option<BulkDeleteError> = None;
443-
let mut storage_failures = Vec::new();
444-
445-
match delete_result {
446-
Ok(_) => successes.extend(split_infos.into_values()),
447-
Err(bulk_delete_error) => {
448-
let success_split_paths: HashSet<&PathBuf> =
449-
bulk_delete_error.successes.iter().collect();
450-
for (split_path, split_info) in split_infos {
451-
if success_split_paths.contains(&split_path) {
452-
successes.push(split_info);
453-
} else {
454-
storage_failures.push(split_info);
455-
}
456-
}
457-
let failed_split_paths = storage_failures
458-
.iter()
459-
.map(|split_info| split_info.file_name.as_path())
460-
.collect::<Vec<_>>();
461-
error!(
462-
error=?bulk_delete_error.error,
463-
index_id=index_uid.index_id,
464-
"failed to delete split file(s) {:?} from storage",
465-
PrettySample::new(&failed_split_paths, 5),
466-
);
467-
storage_error = Some(bulk_delete_error);
468-
}
469-
};
485+
if let Some(bulk_delete_error) = storage_err {
486+
let failed_split_paths = storage_failures
487+
.iter()
488+
.map(|split_info| split_info.file_name.as_path())
489+
.collect::<Vec<_>>();
490+
error!(
491+
error=?bulk_delete_error.error,
492+
index_id=index_uid.index_id,
493+
"failed to delete split file(s) {:?} from storage",
494+
PrettySample::new(&failed_split_paths, 5),
495+
);
496+
storage_error = Some(bulk_delete_error);
497+
}
498+
470499
if !successes.is_empty() {
471500
let split_ids: Vec<SplitId> = successes
472501
.iter()
@@ -486,25 +515,23 @@ pub async fn delete_splits_from_storage_and_metastore(
486515
"failed to delete split(s) {:?} from metastore",
487516
PrettySample::new(&split_ids, 5),
488517
);
489-
let delete_splits_error = DeleteSplitsError {
518+
return Err(DeleteSplitsError {
490519
successes: Vec::new(),
491520
storage_error,
492521
storage_failures,
493522
metastore_error: Some(metastore_error),
494523
metastore_failures: successes,
495-
};
496-
return Err(delete_splits_error);
524+
});
497525
}
498526
}
499527
if !storage_failures.is_empty() {
500-
let delete_splits_error = DeleteSplitsError {
528+
return Err(DeleteSplitsError {
501529
successes,
502530
storage_error,
503531
storage_failures,
504532
metastore_error: None,
505533
metastore_failures: Vec::new(),
506-
};
507-
return Err(delete_splits_error);
534+
});
508535
}
509536
Ok(successes)
510537
}

quickwit/quickwit-index-management/src/index.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::garbage_collection::{
4545
DeleteSplitsError, SplitRemovalInfo, delete_splits_from_storage_and_metastore,
4646
run_garbage_collect,
4747
};
48+
use crate::parquet_garbage_collection::{ParquetSplitRemovalInfo, run_parquet_garbage_collect};
4849

4950
#[derive(Error, Debug)]
5051
pub enum IndexServiceError {
@@ -405,6 +406,45 @@ impl IndexService {
405406
Ok(deleted_entries)
406407
}
407408

409+
/// Detect all dangling parquet splits and associated files from a metrics index
410+
/// and removes them.
411+
///
412+
/// * `index_id` - The target metrics index Id.
413+
/// * `grace_period` - Threshold period after which a staged split can be garbage collected.
414+
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
415+
pub async fn garbage_collect_parquet_index(
416+
&mut self,
417+
index_id: &str,
418+
grace_period: Duration,
419+
dry_run: bool,
420+
) -> anyhow::Result<ParquetSplitRemovalInfo> {
421+
let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
422+
let index_metadata = self
423+
.metastore
424+
.index_metadata(index_metadata_request)
425+
.await?
426+
.deserialize_index_metadata()?;
427+
let index_uid = index_metadata.index_uid.clone();
428+
let index_config = index_metadata.into_index_config();
429+
let storage = self
430+
.storage_resolver
431+
.resolve(&index_config.index_uri)
432+
.await?;
433+
434+
let deleted_entries = run_parquet_garbage_collect(
435+
HashMap::from([(index_uid, storage)]),
436+
self.metastore.clone(),
437+
grace_period,
438+
Duration::ZERO,
439+
dry_run,
440+
None,
441+
None,
442+
)
443+
.await?;
444+
445+
Ok(deleted_entries)
446+
}
447+
408448
/// Clears the index by applying the following actions:
409449
/// - mark all splits for deletion in the metastore.
410450
/// - delete the files of all splits marked for deletion using garbage collection.

quickwit/quickwit-index-management/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414

1515
mod garbage_collection;
1616
mod index;
17+
mod parquet_garbage_collection;
1718

1819
pub use garbage_collection::{GcMetrics, run_garbage_collect};
1920
pub use index::{IndexService, IndexServiceError, clear_cache_directory, validate_storage_uri};
21+
pub use parquet_garbage_collection::{
22+
ParquetSplitInfo, ParquetSplitRemovalInfo, run_parquet_garbage_collect,
23+
};

0 commit comments

Comments
 (0)