Skip to content

Commit 7f5de6d

Browse files
authored
refactor list splits pagination, env var to skip initial seeding (#6385)
* refactor pagination to one place, env var to skip initial fetch of splits * linter * fixes
1 parent 9c5a2f2 commit 7f5de6d

14 files changed

Lines changed: 336 additions & 223 deletions

File tree

quickwit/quickwit-datafusion/src/sources/metrics/metastore_provider.rs

Lines changed: 10 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,15 @@ use std::ops::Bound;
1818

1919
use async_trait::async_trait;
2020
use datafusion::error::Result as DFResult;
21-
use quickwit_metastore::{
22-
ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt,
23-
};
21+
use quickwit_metastore::{ListParquetSplitsQuery, list_parquet_splits_paginated};
2422
use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata};
25-
use quickwit_proto::metastore::{
26-
ListMetricsSplitsRequest, ListSketchSplitsRequest, MetastoreService, MetastoreServiceClient,
27-
};
23+
use quickwit_proto::metastore::MetastoreServiceClient;
2824
use quickwit_proto::types::IndexUid;
2925
use tracing::{debug, instrument};
3026

3127
use super::predicate::MetricsSplitQuery;
3228
use super::table_provider::MetricsSplitProvider;
3329

34-
/// Per-page split count for metastore split pagination.
35-
///
36-
/// The list split RPCs are unary, so we need to page client side.
37-
/// TODO: Use streaming RPCs for listing Parquet splits.
38-
const SPLIT_PAGE_SIZE: usize = 200;
39-
4030
/// `MetricsSplitProvider` backed by the Quickwit metastore RPC.
4131
#[derive(Debug, Clone)]
4232
pub struct MetastoreSplitProvider {
@@ -73,74 +63,16 @@ impl MetricsSplitProvider for MetastoreSplitProvider {
7363
)
7464
)]
7565
async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult<Vec<ParquetSplitMetadata>> {
76-
let mut metastore_query = to_metastore_query(&self.index_uid, query);
77-
metastore_query.limit = Some(SPLIT_PAGE_SIZE);
78-
79-
let mut splits: Vec<ParquetSplitMetadata> = Vec::new();
80-
let mut num_pages: usize = 0;
81-
82-
loop {
83-
let records = match self.split_kind {
84-
ParquetSplitKind::Metrics => {
85-
let request = ListMetricsSplitsRequest::try_from_query(
86-
self.index_uid.clone(),
87-
&metastore_query,
88-
)
89-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
90-
91-
self.metastore
92-
.clone()
93-
.list_metrics_splits(request)
94-
.await
95-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
96-
.deserialize_splits()
97-
.map_err(|err| {
98-
datafusion::error::DataFusionError::External(Box::new(err))
99-
})?
100-
}
101-
ParquetSplitKind::Sketches => {
102-
let request = ListSketchSplitsRequest::try_from_query(
103-
self.index_uid.clone(),
104-
&metastore_query,
105-
)
106-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
107-
108-
self.metastore
109-
.clone()
110-
.list_sketch_splits(request)
111-
.await
112-
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
113-
.deserialize_splits()
114-
.map_err(|err| {
115-
datafusion::error::DataFusionError::External(Box::new(err))
116-
})?
117-
}
118-
};
119-
120-
num_pages += 1;
121-
let page_len = records.len();
122-
123-
// The metastore guarantees only Published splits are returned because
124-
// `to_metastore_query` sets `split_states = vec![Published]`. No
125-
// client-side re-filter is needed here.
126-
splits.extend(records.into_iter().map(|record| record.metadata));
127-
128-
// A short page (fewer rows than we asked for) means we've drained
129-
// the result set. The Postgres backend orders by `split_id ASC`
130-
// and applies `split_id > $after_split_id` for the cursor, so the
131-
// last metadata's split_id is the correct next cursor.
132-
if page_len < SPLIT_PAGE_SIZE {
133-
break;
134-
}
135-
let Some(last) = splits.last() else { break };
136-
metastore_query.after_split_id = Some(last.split_id.as_str().to_string());
137-
}
66+
let metastore_query = to_metastore_query(&self.index_uid, query);
67+
let records =
68+
list_parquet_splits_paginated(self.metastore.clone(), self.split_kind, metastore_query)
69+
.await
70+
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
71+
let splits: Vec<ParquetSplitMetadata> =
72+
records.into_iter().map(|record| record.metadata).collect();
13873

13974
tracing::Span::current().record("num_splits", splits.len());
140-
debug!(
141-
num_splits = splits.len(),
142-
num_pages, "metastore returned splits"
143-
);
75+
debug!(num_splits = splits.len(), "metastore returned splits");
14476

14577
Ok(splits)
14678
}

quickwit/quickwit-index-management/src/parquet_garbage_collection.rs

Lines changed: 65 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ use std::time::Duration;
2020
use anyhow::Context;
2121
use quickwit_common::{Progress, is_sketches_index};
2222
use quickwit_metastore::{
23-
ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt,
24-
ParquetSplitRecord, SplitState,
23+
ListParquetSplitsQuery, PARQUET_SPLITS_PAGE_SIZE, ParquetSplitRecord, SplitState,
24+
list_parquet_splits_page, list_parquet_splits_paginated,
2525
};
26+
use quickwit_parquet_engine::split::ParquetSplitKind;
2627
use quickwit_proto::metastore::{
27-
DeleteMetricsSplitsRequest, DeleteSketchSplitsRequest, ListMetricsSplitsRequest,
28-
ListSketchSplitsRequest, MarkMetricsSplitsForDeletionRequest,
28+
DeleteMetricsSplitsRequest, DeleteSketchSplitsRequest, MarkMetricsSplitsForDeletionRequest,
2929
MarkSketchSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient,
3030
};
3131
use quickwit_proto::types::IndexUid;
@@ -69,9 +69,6 @@ impl ParquetSplitRemovalInfo {
6969
}
7070
}
7171

72-
/// Maximum number of parquet splits to process per paginated query.
73-
const DELETE_PARQUET_SPLITS_BATCH_SIZE: usize = 10_000;
74-
7572
/// Runs garbage collection for parquet splits.
7673
#[instrument(skip_all, fields(num_indexes=%indexes.len()))]
7774
pub async fn run_parquet_garbage_collect(
@@ -177,22 +174,13 @@ async fn list_parquet_splits(
177174
let query = ListParquetSplitsQuery::for_index(index_uid.clone())
178175
.with_split_states(states)
179176
.with_update_timestamp_lte(cutoff);
180-
181-
if is_sketches_index(&index_uid.index_id) {
182-
let request = ListSketchSplitsRequest::try_from_query(index_uid.clone(), &query)
183-
.context("failed to build list sketch splits request")?;
184-
protect_future(progress_opt, metastore.list_sketch_splits(request))
185-
.await?
186-
.deserialize_splits()
187-
.context("failed to deserialize sketch splits")
188-
} else {
189-
let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
190-
.context("failed to build list metrics splits request")?;
191-
protect_future(progress_opt, metastore.list_metrics_splits(request))
192-
.await?
193-
.deserialize_splits()
194-
.context("failed to deserialize metrics splits")
195-
}
177+
let kind = parquet_split_kind_for_index(index_uid);
178+
protect_future(
179+
progress_opt,
180+
list_parquet_splits_paginated(metastore.clone(), kind, query),
181+
)
182+
.await
183+
.context("failed to list parquet splits")
196184
}
197185

198186
/// Marks the given splits for deletion in the metastore, grouped by index.
@@ -216,26 +204,32 @@ async fn mark_splits_for_deletion(
216204

217205
for (index_uid_str, split_ids) in splits_by_index {
218206
let index_uid: IndexUid = index_uid_str.parse()?;
219-
info!(index_uid=%index_uid, count=%split_ids.len(), "marking stale staged parquet splits for deletion");
220-
221-
if is_sketches_index(&index_uid.index_id) {
222-
protect_future(
223-
progress_opt,
224-
metastore.mark_sketch_splits_for_deletion(MarkSketchSplitsForDeletionRequest {
225-
index_uid: Some(index_uid),
226-
split_ids,
227-
}),
228-
)
229-
.await?;
230-
} else {
231-
protect_future(
232-
progress_opt,
233-
metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest {
234-
index_uid: Some(index_uid),
235-
split_ids,
236-
}),
237-
)
238-
.await?;
207+
let is_sketch = is_sketches_index(&index_uid.index_id);
208+
for split_ids_chunk in split_ids.chunks(PARQUET_SPLITS_PAGE_SIZE) {
209+
let split_ids = split_ids_chunk.to_vec();
210+
info!(index_uid=%index_uid, count=%split_ids.len(), "marking stale staged parquet splits for deletion");
211+
212+
if is_sketch {
213+
protect_future(
214+
progress_opt,
215+
metastore.mark_sketch_splits_for_deletion(MarkSketchSplitsForDeletionRequest {
216+
index_uid: Some(index_uid.clone()),
217+
split_ids,
218+
}),
219+
)
220+
.await?;
221+
} else {
222+
protect_future(
223+
progress_opt,
224+
metastore.mark_metrics_splits_for_deletion(
225+
MarkMetricsSplitsForDeletionRequest {
226+
index_uid: Some(index_uid.clone()),
227+
split_ids,
228+
},
229+
),
230+
)
231+
.await?;
232+
}
239233
}
240234
}
241235

@@ -255,75 +249,39 @@ async fn delete_marked_parquet_splits(
255249

256250
let mut query = ListParquetSplitsQuery::for_index(index_uid.clone())
257251
.with_split_states(vec![SplitState::MarkedForDeletion])
258-
.with_update_timestamp_lte(deletion_cutoff)
259-
.with_limit(DELETE_PARQUET_SPLITS_BATCH_SIZE);
252+
.with_update_timestamp_lte(deletion_cutoff);
260253

261-
let is_sketch = is_sketches_index(&index_uid.index_id);
254+
let kind = parquet_split_kind_for_index(index_uid);
262255

263256
loop {
264257
let sleep_duration = if let Some(max_rate) = get_maximum_split_deletion_rate_per_sec() {
265-
Duration::from_secs(DELETE_PARQUET_SPLITS_BATCH_SIZE.div_ceil(max_rate) as u64)
258+
Duration::from_secs(PARQUET_SPLITS_PAGE_SIZE.div_ceil(max_rate) as u64)
266259
} else {
267260
Duration::default()
268261
};
269262
let sleep_future = tokio::time::sleep(sleep_duration);
270263

271-
let splits: Vec<ParquetSplitRecord> = if is_sketch {
272-
let request = match ListSketchSplitsRequest::try_from_query(index_uid.clone(), &query) {
273-
Ok(req) => req,
274-
Err(err) => {
275-
error!(index_uid=%index_uid, error=?err, "failed to build list sketch splits request");
276-
break;
277-
}
278-
};
279-
match protect_future(progress_opt, metastore.list_sketch_splits(request)).await {
280-
Ok(resp) => match resp.deserialize_splits() {
281-
Ok(splits) => splits,
282-
Err(err) => {
283-
error!(index_uid=%index_uid, error=?err, "failed to deserialize sketch splits");
284-
break;
285-
}
286-
},
287-
Err(err) => {
288-
error!(index_uid=%index_uid, error=?err, "failed to list sketch splits");
289-
break;
290-
}
291-
}
292-
} else {
293-
let request = match ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
294-
{
295-
Ok(req) => req,
296-
Err(err) => {
297-
error!(index_uid=%index_uid, error=?err, "failed to build list metrics splits request");
298-
break;
299-
}
300-
};
301-
match protect_future(progress_opt, metastore.list_metrics_splits(request)).await {
302-
Ok(resp) => match resp.deserialize_splits() {
303-
Ok(splits) => splits,
304-
Err(err) => {
305-
error!(index_uid=%index_uid, error=?err, "failed to deserialize metrics splits");
306-
break;
307-
}
308-
},
309-
Err(err) => {
310-
error!(index_uid=%index_uid, error=?err, "failed to list metrics splits");
311-
break;
312-
}
264+
let page = match protect_future(
265+
progress_opt,
266+
list_parquet_splits_page(metastore, kind, &mut query),
267+
)
268+
.await
269+
{
270+
Ok(page) => page,
271+
Err(err) => {
272+
error!(index_uid=%index_uid, error=?err, "failed to list parquet splits");
273+
break;
313274
}
314275
};
276+
let splits = page.splits;
315277

316-
// We page through the list of splits to delete using a limit and a `search_after` trick.
317-
// To detect if this is the last page, we check if the number of splits is less than the
318-
// limit.
319-
assert!(splits.len() <= DELETE_PARQUET_SPLITS_BATCH_SIZE);
320-
let splits_to_delete_possibly_remaining = splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE;
278+
// The metastore helper advanced the cursor when the page was full.
279+
assert!(splits.len() <= PARQUET_SPLITS_PAGE_SIZE);
280+
let splits_to_delete_possibly_remaining = page.has_next_page;
321281

322-
// Set split after which to search for the next loop.
323-
let Some(last_split) = splits.last() else {
282+
if splits.is_empty() {
324283
break;
325-
};
326-
query = query.with_after_split_id(last_split.metadata.split_id.to_string());
284+
}
327285

328286
let (batch_succeeded, batch_failed) = delete_parquet_splits_from_storage_and_metastore(
329287
metastore,
@@ -342,14 +300,22 @@ async fn delete_marked_parquet_splits(
342300
sleep_future.await;
343301
} else {
344302
// Stop the GC if this was the last batch.
345-
// We are guaranteed to make progress due to .with_after_split_id().
303+
// The paginator advanced the cursor before this batch was processed.
346304
break;
347305
}
348306
}
349307

350308
Ok(removal_info)
351309
}
352310

311+
fn parquet_split_kind_for_index(index_uid: &IndexUid) -> ParquetSplitKind {
312+
if is_sketches_index(&index_uid.index_id) {
313+
ParquetSplitKind::Sketches
314+
} else {
315+
ParquetSplitKind::Metrics
316+
}
317+
}
318+
353319
/// Deletes a single batch of parquet splits from storage and metastore.
354320
/// Returns (succeeded, failed).
355321
async fn delete_parquet_splits_from_storage_and_metastore(

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,10 @@ impl IndexingService {
730730
merge_scheduler_service: self.merge_scheduler_service.clone(),
731731
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
732732
event_broker: self.event_broker.clone(),
733+
skip_initial_seed: quickwit_common::get_bool_from_env(
734+
super::parquet_pipeline::PARQUET_MERGE_SKIP_INITIAL_SEED_ENV_KEY,
735+
false,
736+
),
733737
writer_config,
734738
use_streaming_engine: self.parquet_merge_use_streaming_engine,
735739
target_split_size_bytes: cfg.target_split_size_bytes,

quickwit/quickwit-indexing/src/actors/parquet_pipeline/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ pub use parquet_doc_processor::{
7171
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
7272
pub use parquet_merge_executor::ParquetMergeExecutor;
7373
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
74-
pub use parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
74+
pub use parquet_merge_pipeline::{
75+
PARQUET_MERGE_SKIP_INITIAL_SEED_ENV_KEY, ParquetMergePipeline, ParquetMergePipelineParams,
76+
};
7577
pub use parquet_merge_planner::ParquetMergePlanner;
7678
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
7779
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};

quickwit/quickwit-indexing/src/actors/parquet_pipeline/parquet_indexer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ impl Actor for ParquetIndexer {
584584
}
585585

586586
fn queue_capacity(&self) -> QueueCapacity {
587-
QueueCapacity::Bounded(10)
587+
QueueCapacity::Bounded(5)
588588
}
589589

590590
fn name(&self) -> String {

0 commit comments

Comments
 (0)