@@ -20,12 +20,12 @@ use std::time::Duration;
2020use anyhow:: Context ;
2121use quickwit_common:: { Progress , is_sketches_index} ;
2222use quickwit_metastore:: {
23- ListParquetSplitsQuery , ListParquetSplitsRequestExt , ListParquetSplitsResponseExt ,
24- ParquetSplitRecord , SplitState ,
23+ ListParquetSplitsQuery , PARQUET_SPLITS_PAGE_SIZE , ParquetSplitRecord , SplitState ,
24+ list_parquet_splits_page , list_parquet_splits_paginated ,
2525} ;
26+ use quickwit_parquet_engine:: split:: ParquetSplitKind ;
2627use quickwit_proto:: metastore:: {
27- DeleteMetricsSplitsRequest , DeleteSketchSplitsRequest , ListMetricsSplitsRequest ,
28- ListSketchSplitsRequest , MarkMetricsSplitsForDeletionRequest ,
28+ DeleteMetricsSplitsRequest , DeleteSketchSplitsRequest , MarkMetricsSplitsForDeletionRequest ,
2929 MarkSketchSplitsForDeletionRequest , MetastoreService , MetastoreServiceClient ,
3030} ;
3131use quickwit_proto:: types:: IndexUid ;
@@ -69,9 +69,6 @@ impl ParquetSplitRemovalInfo {
6969 }
7070}
7171
72- /// Maximum number of parquet splits to process per paginated query.
73- const DELETE_PARQUET_SPLITS_BATCH_SIZE : usize = 10_000 ;
74-
7572/// Runs garbage collection for parquet splits.
7673#[ instrument( skip_all, fields( num_indexes=%indexes. len( ) ) ) ]
7774pub async fn run_parquet_garbage_collect (
@@ -177,22 +174,13 @@ async fn list_parquet_splits(
177174 let query = ListParquetSplitsQuery :: for_index ( index_uid. clone ( ) )
178175 . with_split_states ( states)
179176 . with_update_timestamp_lte ( cutoff) ;
180-
181- if is_sketches_index ( & index_uid. index_id ) {
182- let request = ListSketchSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query)
183- . context ( "failed to build list sketch splits request" ) ?;
184- protect_future ( progress_opt, metastore. list_sketch_splits ( request) )
185- . await ?
186- . deserialize_splits ( )
187- . context ( "failed to deserialize sketch splits" )
188- } else {
189- let request = ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query)
190- . context ( "failed to build list metrics splits request" ) ?;
191- protect_future ( progress_opt, metastore. list_metrics_splits ( request) )
192- . await ?
193- . deserialize_splits ( )
194- . context ( "failed to deserialize metrics splits" )
195- }
177+ let kind = parquet_split_kind_for_index ( index_uid) ;
178+ protect_future (
179+ progress_opt,
180+ list_parquet_splits_paginated ( metastore. clone ( ) , kind, query) ,
181+ )
182+ . await
183+ . context ( "failed to list parquet splits" )
196184}
197185
198186/// Marks the given splits for deletion in the metastore, grouped by index.
@@ -216,26 +204,32 @@ async fn mark_splits_for_deletion(
216204
217205 for ( index_uid_str, split_ids) in splits_by_index {
218206 let index_uid: IndexUid = index_uid_str. parse ( ) ?;
219- info ! ( index_uid=%index_uid, count=%split_ids. len( ) , "marking stale staged parquet splits for deletion" ) ;
220-
221- if is_sketches_index ( & index_uid. index_id ) {
222- protect_future (
223- progress_opt,
224- metastore. mark_sketch_splits_for_deletion ( MarkSketchSplitsForDeletionRequest {
225- index_uid : Some ( index_uid) ,
226- split_ids,
227- } ) ,
228- )
229- . await ?;
230- } else {
231- protect_future (
232- progress_opt,
233- metastore. mark_metrics_splits_for_deletion ( MarkMetricsSplitsForDeletionRequest {
234- index_uid : Some ( index_uid) ,
235- split_ids,
236- } ) ,
237- )
238- . await ?;
207+ let is_sketch = is_sketches_index ( & index_uid. index_id ) ;
208+ for split_ids_chunk in split_ids. chunks ( PARQUET_SPLITS_PAGE_SIZE ) {
209+ let split_ids = split_ids_chunk. to_vec ( ) ;
210+ info ! ( index_uid=%index_uid, count=%split_ids. len( ) , "marking stale staged parquet splits for deletion" ) ;
211+
212+ if is_sketch {
213+ protect_future (
214+ progress_opt,
215+ metastore. mark_sketch_splits_for_deletion ( MarkSketchSplitsForDeletionRequest {
216+ index_uid : Some ( index_uid. clone ( ) ) ,
217+ split_ids,
218+ } ) ,
219+ )
220+ . await ?;
221+ } else {
222+ protect_future (
223+ progress_opt,
224+ metastore. mark_metrics_splits_for_deletion (
225+ MarkMetricsSplitsForDeletionRequest {
226+ index_uid : Some ( index_uid. clone ( ) ) ,
227+ split_ids,
228+ } ,
229+ ) ,
230+ )
231+ . await ?;
232+ }
239233 }
240234 }
241235
@@ -255,75 +249,39 @@ async fn delete_marked_parquet_splits(
255249
256250 let mut query = ListParquetSplitsQuery :: for_index ( index_uid. clone ( ) )
257251 . with_split_states ( vec ! [ SplitState :: MarkedForDeletion ] )
258- . with_update_timestamp_lte ( deletion_cutoff)
259- . with_limit ( DELETE_PARQUET_SPLITS_BATCH_SIZE ) ;
252+ . with_update_timestamp_lte ( deletion_cutoff) ;
260253
261- let is_sketch = is_sketches_index ( & index_uid. index_id ) ;
254+ let kind = parquet_split_kind_for_index ( index_uid) ;
262255
263256 loop {
264257 let sleep_duration = if let Some ( max_rate) = get_maximum_split_deletion_rate_per_sec ( ) {
265- Duration :: from_secs ( DELETE_PARQUET_SPLITS_BATCH_SIZE . div_ceil ( max_rate) as u64 )
258+ Duration :: from_secs ( PARQUET_SPLITS_PAGE_SIZE . div_ceil ( max_rate) as u64 )
266259 } else {
267260 Duration :: default ( )
268261 } ;
269262 let sleep_future = tokio:: time:: sleep ( sleep_duration) ;
270263
271- let splits: Vec < ParquetSplitRecord > = if is_sketch {
272- let request = match ListSketchSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query) {
273- Ok ( req) => req,
274- Err ( err) => {
275- error ! ( index_uid=%index_uid, error=?err, "failed to build list sketch splits request" ) ;
276- break ;
277- }
278- } ;
279- match protect_future ( progress_opt, metastore. list_sketch_splits ( request) ) . await {
280- Ok ( resp) => match resp. deserialize_splits ( ) {
281- Ok ( splits) => splits,
282- Err ( err) => {
283- error ! ( index_uid=%index_uid, error=?err, "failed to deserialize sketch splits" ) ;
284- break ;
285- }
286- } ,
287- Err ( err) => {
288- error ! ( index_uid=%index_uid, error=?err, "failed to list sketch splits" ) ;
289- break ;
290- }
291- }
292- } else {
293- let request = match ListMetricsSplitsRequest :: try_from_query ( index_uid. clone ( ) , & query)
294- {
295- Ok ( req) => req,
296- Err ( err) => {
297- error ! ( index_uid=%index_uid, error=?err, "failed to build list metrics splits request" ) ;
298- break ;
299- }
300- } ;
301- match protect_future ( progress_opt, metastore. list_metrics_splits ( request) ) . await {
302- Ok ( resp) => match resp. deserialize_splits ( ) {
303- Ok ( splits) => splits,
304- Err ( err) => {
305- error ! ( index_uid=%index_uid, error=?err, "failed to deserialize metrics splits" ) ;
306- break ;
307- }
308- } ,
309- Err ( err) => {
310- error ! ( index_uid=%index_uid, error=?err, "failed to list metrics splits" ) ;
311- break ;
312- }
264+ let page = match protect_future (
265+ progress_opt,
266+ list_parquet_splits_page ( metastore, kind, & mut query) ,
267+ )
268+ . await
269+ {
270+ Ok ( page) => page,
271+ Err ( err) => {
272+ error ! ( index_uid=%index_uid, error=?err, "failed to list parquet splits" ) ;
273+ break ;
313274 }
314275 } ;
276+ let splits = page. splits ;
315277
316- // We page through the list of splits to delete using a limit and a `search_after` trick.
317- // To detect if this is the last page, we check if the number of splits is less than the
318- // limit.
319- assert ! ( splits. len( ) <= DELETE_PARQUET_SPLITS_BATCH_SIZE ) ;
320- let splits_to_delete_possibly_remaining = splits. len ( ) == DELETE_PARQUET_SPLITS_BATCH_SIZE ;
278+ // The metastore helper advanced the cursor when the page was full.
279+ assert ! ( splits. len( ) <= PARQUET_SPLITS_PAGE_SIZE ) ;
280+ let splits_to_delete_possibly_remaining = page. has_next_page ;
321281
322- // Set split after which to search for the next loop.
323- let Some ( last_split) = splits. last ( ) else {
282+ if splits. is_empty ( ) {
324283 break ;
325- } ;
326- query = query. with_after_split_id ( last_split. metadata . split_id . to_string ( ) ) ;
284+ }
327285
328286 let ( batch_succeeded, batch_failed) = delete_parquet_splits_from_storage_and_metastore (
329287 metastore,
@@ -342,14 +300,22 @@ async fn delete_marked_parquet_splits(
342300 sleep_future. await ;
343301 } else {
344302 // Stop the GC if this was the last batch.
345- // We are guaranteed to make progress due to .with_after_split_id() .
303+ // The paginator advanced the cursor before this batch was processed .
346304 break ;
347305 }
348306 }
349307
350308 Ok ( removal_info)
351309}
352310
/// Picks the parquet split kind to use for the given index.
///
/// Returns `ParquetSplitKind::Sketches` when `is_sketches_index` accepts the
/// index id carried by `index_uid`, and `ParquetSplitKind::Metrics` for every
/// other index.
311+ fn parquet_split_kind_for_index ( index_uid : & IndexUid ) -> ParquetSplitKind {
312+ if is_sketches_index ( & index_uid. index_id ) {
313+ ParquetSplitKind :: Sketches
314+ } else {
315+ ParquetSplitKind :: Metrics
316+ }
317+ }
318+
353319/// Deletes a single batch of parquet splits from storage and metastore.
354320/// Returns (succeeded, failed).
355321async fn delete_parquet_splits_from_storage_and_metastore (
0 commit comments