Skip to content

Commit f05fa60

Browse files
committed
Integrate parquet GC and retention policy into janitor actors
1 parent 5569aef commit f05fa60

File tree

5 files changed

+394
-87
lines changed

5 files changed

+394
-87
lines changed

quickwit/quickwit-janitor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ quickwit-doc-mapper = { workspace = true }
3131
quickwit-index-management = { workspace = true }
3232
quickwit-indexing = { workspace = true }
3333
quickwit-metastore = { workspace = true }
34+
quickwit-parquet-engine = { workspace = true }
3435
quickwit-proto = { workspace = true }
3536
quickwit-query = { workspace = true }
3637
quickwit-search = { workspace = true }

quickwit/quickwit-janitor/src/actors/garbage_collector.rs

Lines changed: 234 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::{HashMap, HashSet};
16-
use std::path::Path;
15+
use std::collections::HashMap;
1716
use std::sync::Arc;
1817
use std::time::{Duration, Instant};
1918

2019
use async_trait::async_trait;
2120
use futures::{StreamExt, stream};
2221
use quickwit_actors::{Actor, ActorContext, Handler};
22+
use quickwit_common::is_metrics_index;
2323
use quickwit_common::shared_consts::split_deletion_grace_period;
24-
use quickwit_index_management::{GcMetrics, run_garbage_collect};
24+
use quickwit_index_management::{GcMetrics, run_garbage_collect, run_parquet_garbage_collect};
2525
use quickwit_metastore::ListIndexesMetadataResponseExt;
2626
use quickwit_proto::metastore::{
2727
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
@@ -35,6 +35,42 @@ use crate::metrics::JANITOR_METRICS;
3535

3636
const RUN_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
3737

38+
/// Result of a GC run (tantivy or parquet).
39+
struct GcRunResult {
40+
num_deleted_splits: usize,
41+
num_deleted_bytes: usize,
42+
num_failed: usize,
43+
sample_deleted_files: Vec<String>,
44+
}
45+
46+
impl GcRunResult {
47+
fn failed() -> Self {
48+
Self {
49+
num_deleted_splits: 0,
50+
num_deleted_bytes: 0,
51+
num_failed: 0,
52+
sample_deleted_files: Vec::new(),
53+
}
54+
}
55+
}
56+
57+
fn gc_metrics(split_type: &str) -> GcMetrics {
58+
GcMetrics {
59+
deleted_splits: JANITOR_METRICS
60+
.gc_deleted_splits
61+
.with_label_values(["success", split_type])
62+
.clone(),
63+
deleted_bytes: JANITOR_METRICS
64+
.gc_deleted_bytes
65+
.with_label_values([split_type])
66+
.clone(),
67+
failed_splits: JANITOR_METRICS
68+
.gc_deleted_splits
69+
.with_label_values(["error", split_type])
70+
.clone(),
71+
}
72+
}
73+
3874
/// Staged files need to be deleted if there was a failure.
3975
/// TODO ideally we want to clean up all staged splits every time we restart the indexing pipeline, but
4076
/// the grace period strategy should do the job for the moment.
@@ -77,14 +113,24 @@ impl GarbageCollector {
77113
}
78114
}
79115

116+
fn record_gc_result(&mut self, result: &GcRunResult, split_type: &str) {
117+
self.counters.num_failed_splits += result.num_failed;
118+
if result.num_deleted_splits > 0 {
119+
info!(
120+
"Janitor deleted {:?} and {} other {} splits.",
121+
result.sample_deleted_files, result.num_deleted_splits, split_type,
122+
);
123+
self.counters.num_deleted_files += result.num_deleted_splits;
124+
self.counters.num_deleted_bytes += result.num_deleted_bytes;
125+
}
126+
}
127+
80128
/// Gc Loop handler logic.
81129
/// Should not return an error to prevent the actor from crashing.
82130
async fn handle_inner(&mut self, ctx: &ActorContext<Self>) {
83131
debug!("loading indexes from the metastore");
84132
self.counters.num_passes += 1;
85133

86-
let start = Instant::now();
87-
88134
let response = match self
89135
.metastore
90136
.list_indexes_metadata(ListIndexesMetadataRequest::all())
@@ -106,84 +152,147 @@ impl GarbageCollector {
106152
info!("loaded {} indexes from the metastore", indexes.len());
107153

108154
let expected_count = indexes.len();
109-
let index_storages: HashMap<IndexUid, Arc<dyn Storage>> = stream::iter(indexes).filter_map(|index| {
155+
156+
// Resolve storages and split into tantivy vs parquet indexes.
157+
let mut tantivy_storages: HashMap<IndexUid, Arc<dyn Storage>> = HashMap::new();
158+
let mut parquet_storages: HashMap<IndexUid, Arc<dyn Storage>> = HashMap::new();
159+
160+
let resolved: Vec<_> = stream::iter(indexes).filter_map(|index| {
110161
let storage_resolver = self.storage_resolver.clone();
111162
async move {
112163
let index_uid = index.index_uid.clone();
113164
let index_uri = index.index_uri();
114165
let storage = match storage_resolver.resolve(index_uri).await {
115166
Ok(storage) => storage,
116167
Err(error) => {
117-
error!(index=%index.index_id(), error=?error, "failed to resolve the index storage Uri");
168+
error!(index=%index_uid.index_id, error=?error, "failed to resolve the index storage Uri");
118169
return None;
119170
}
120171
};
121172
Some((index_uid, storage))
122173
}}).collect()
123174
.await;
124175

125-
let storage_got_count = index_storages.len();
126-
self.counters.num_failed_storage_resolution += expected_count - storage_got_count;
176+
self.counters.num_failed_storage_resolution += expected_count - resolved.len();
177+
178+
for (index_uid, storage) in resolved {
179+
if is_metrics_index(&index_uid.index_id) {
180+
parquet_storages.insert(index_uid, storage);
181+
} else {
182+
tantivy_storages.insert(index_uid, storage);
183+
}
184+
}
127185

128-
if index_storages.is_empty() {
186+
if tantivy_storages.is_empty() && parquet_storages.is_empty() {
129187
return;
130188
}
131189

132-
let gc_res = run_garbage_collect(
133-
index_storages,
134-
self.metastore.clone(),
135-
STAGED_GRACE_PERIOD,
136-
split_deletion_grace_period(),
137-
false,
138-
Some(ctx.progress()),
139-
Some(GcMetrics {
140-
deleted_splits: JANITOR_METRICS
141-
.gc_deleted_splits
142-
.with_label_values(["success"])
143-
.clone(),
144-
deleted_bytes: JANITOR_METRICS.gc_deleted_bytes.clone(),
145-
failed_splits: JANITOR_METRICS
146-
.gc_deleted_splits
147-
.with_label_values(["error"])
148-
.clone(),
149-
}),
150-
)
151-
.await;
190+
// Run Tantivy GC
191+
if !tantivy_storages.is_empty() {
192+
let tantivy_start = Instant::now();
193+
let gc_res = run_garbage_collect(
194+
tantivy_storages,
195+
self.metastore.clone(),
196+
STAGED_GRACE_PERIOD,
197+
split_deletion_grace_period(),
198+
false,
199+
Some(ctx.progress()),
200+
Some(gc_metrics("tantivy")),
201+
)
202+
.await;
152203

153-
let run_duration = start.elapsed().as_secs();
154-
JANITOR_METRICS.gc_seconds_total.inc_by(run_duration);
204+
let tantivy_run_duration = tantivy_start.elapsed().as_secs();
205+
JANITOR_METRICS
206+
.gc_seconds_total
207+
.with_label_values(["tantivy"])
208+
.inc_by(tantivy_run_duration);
209+
210+
let result = match gc_res {
211+
Ok(removal_info) => {
212+
self.counters.num_successful_gc_run += 1;
213+
JANITOR_METRICS
214+
.gc_runs
215+
.with_label_values(["success", "tantivy"])
216+
.inc();
217+
GcRunResult {
218+
num_deleted_splits: removal_info.removed_split_entries.len(),
219+
num_deleted_bytes: removal_info
220+
.removed_split_entries
221+
.iter()
222+
.map(|e| e.file_size_bytes.as_u64() as usize)
223+
.sum(),
224+
num_failed: removal_info.failed_splits.len(),
225+
sample_deleted_files: removal_info
226+
.removed_split_entries
227+
.iter()
228+
.take(5)
229+
.map(|e| e.file_name.display().to_string())
230+
.collect(),
231+
}
232+
}
233+
Err(error) => {
234+
self.counters.num_failed_gc_run += 1;
235+
JANITOR_METRICS
236+
.gc_runs
237+
.with_label_values(["error", "tantivy"])
238+
.inc();
239+
error!(error=?error, "failed to run garbage collection");
240+
GcRunResult::failed()
241+
}
242+
};
243+
self.record_gc_result(&result, "tantivy");
244+
}
155245

156-
let deleted_file_entries = match gc_res {
157-
Ok(removal_info) => {
158-
self.counters.num_successful_gc_run += 1;
159-
JANITOR_METRICS.gc_runs.with_label_values(["success"]).inc();
160-
self.counters.num_failed_splits += removal_info.failed_splits.len();
161-
removal_info.removed_split_entries
162-
}
163-
Err(error) => {
164-
self.counters.num_failed_gc_run += 1;
165-
JANITOR_METRICS.gc_runs.with_label_values(["error"]).inc();
166-
error!(error=?error, "failed to run garbage collection");
167-
return;
168-
}
169-
};
170-
if !deleted_file_entries.is_empty() {
171-
let num_deleted_splits = deleted_file_entries.len();
172-
let num_deleted_bytes = deleted_file_entries
173-
.iter()
174-
.map(|entry| entry.file_size_bytes.as_u64() as usize)
175-
.sum::<usize>();
176-
let deleted_files: HashSet<&Path> = deleted_file_entries
177-
.iter()
178-
.map(|deleted_entry| deleted_entry.file_name.as_path())
179-
.take(5)
180-
.collect();
181-
info!(
182-
num_deleted_splits = num_deleted_splits,
183-
"Janitor deleted {:?} and {} other splits.", deleted_files, num_deleted_splits,
184-
);
185-
self.counters.num_deleted_files += num_deleted_splits;
186-
self.counters.num_deleted_bytes += num_deleted_bytes;
246+
// Run Parquet GC
247+
if !parquet_storages.is_empty() {
248+
let parquet_start = Instant::now();
249+
let gc_res = run_parquet_garbage_collect(
250+
parquet_storages,
251+
self.metastore.clone(),
252+
STAGED_GRACE_PERIOD,
253+
split_deletion_grace_period(),
254+
false,
255+
Some(ctx.progress()),
256+
Some(gc_metrics("parquet")),
257+
)
258+
.await;
259+
260+
let parquet_run_duration = parquet_start.elapsed().as_secs();
261+
JANITOR_METRICS
262+
.gc_seconds_total
263+
.with_label_values(["parquet"])
264+
.inc_by(parquet_run_duration);
265+
266+
let result = match gc_res {
267+
Ok(removal_info) => {
268+
self.counters.num_successful_gc_run += 1;
269+
JANITOR_METRICS
270+
.gc_runs
271+
.with_label_values(["success", "parquet"])
272+
.inc();
273+
GcRunResult {
274+
num_deleted_splits: removal_info.removed_split_count(),
275+
num_deleted_bytes: removal_info.removed_bytes() as usize,
276+
num_failed: removal_info.failed_split_count(),
277+
sample_deleted_files: removal_info
278+
.removed_parquet_splits_entries
279+
.iter()
280+
.take(5)
281+
.map(|e| format!("{}.parquet", e.split_id))
282+
.collect(),
283+
}
284+
}
285+
Err(error) => {
286+
self.counters.num_failed_gc_run += 1;
287+
JANITOR_METRICS
288+
.gc_runs
289+
.with_label_values(["error", "parquet"])
290+
.inc();
291+
error!(error=?error, "failed to run parquet garbage collection");
292+
GcRunResult::failed()
293+
}
294+
};
295+
self.record_gc_result(&result, "parquet");
187296
}
188297
}
189298
}
@@ -226,6 +335,7 @@ impl Handler<Loop> for GarbageCollector {
226335

227336
#[cfg(test)]
228337
mod tests {
338+
use std::collections::HashSet;
229339
use std::ops::Bound;
230340
use std::path::Path;
231341
use std::sync::Arc;
@@ -756,4 +866,65 @@ mod tests {
756866
assert_eq!(counters.num_failed_splits, 2000);
757867
universe.assert_quit().await;
758868
}
869+
870+
#[tokio::test]
871+
async fn test_garbage_collect_parquet_index() {
872+
use quickwit_metastore::ListMetricsSplitsResponseExt;
873+
use quickwit_parquet_engine::split::{
874+
MetricsSplitMetadata, MetricsSplitRecord, MetricsSplitState, SplitId, TimeRange,
875+
};
876+
use quickwit_proto::metastore::ListMetricsSplitsResponse;
877+
878+
let storage_resolver = StorageResolver::unconfigured();
879+
let mut mock = MockMetastoreService::new();
880+
881+
mock.expect_list_indexes_metadata().times(1).returning(|_| {
882+
let indexes = vec![IndexMetadata::for_test(
883+
"otel-metrics-v0_1",
884+
"ram://indexes/otel-metrics-v0_1",
885+
)];
886+
Ok(ListIndexesMetadataResponse::for_test(indexes))
887+
});
888+
889+
let marked_split = MetricsSplitRecord {
890+
state: MetricsSplitState::MarkedForDeletion,
891+
update_timestamp: 0,
892+
metadata: MetricsSplitMetadata::builder()
893+
.split_id(SplitId::new("metrics_aaa"))
894+
.index_uid("otel-metrics-v0_1:00000000000000000000000000")
895+
.time_range(TimeRange::new(1000, 2000))
896+
.num_rows(10)
897+
.size_bytes(512)
898+
.build(),
899+
};
900+
901+
// Phase 1 (staged): empty
902+
mock.expect_list_metrics_splits()
903+
.times(1)
904+
.returning(|_| Ok(ListMetricsSplitsResponse::empty()));
905+
// Phase 2 (marked): one split to delete
906+
let marked_resp = ListMetricsSplitsResponse::try_from_splits(&[marked_split]).unwrap();
907+
mock.expect_list_metrics_splits()
908+
.times(1)
909+
.returning(move |_| Ok(marked_resp.clone()));
910+
mock.expect_delete_metrics_splits()
911+
.times(1)
912+
.returning(|req| {
913+
assert_eq!(req.split_ids, ["metrics_aaa"]);
914+
Ok(EmptyResponse {})
915+
});
916+
917+
let garbage_collect_actor =
918+
GarbageCollector::new(MetastoreServiceClient::from_mock(mock), storage_resolver);
919+
let universe = Universe::with_accelerated_time();
920+
let (_mailbox, handle) = universe.spawn_builder().spawn(garbage_collect_actor);
921+
922+
let counters = handle.process_pending_and_observe().await.state;
923+
assert_eq!(counters.num_passes, 1);
924+
assert_eq!(counters.num_successful_gc_run, 1);
925+
assert_eq!(counters.num_failed_gc_run, 0);
926+
assert_eq!(counters.num_deleted_files, 1);
927+
assert_eq!(counters.num_failed_splits, 0);
928+
universe.assert_quit().await;
929+
}
759930
}

0 commit comments

Comments
 (0)