Skip to content

Commit 894fad0

Browse files
committed
tweaks
1 parent c912960 commit 894fad0

2 files changed

Lines changed: 28 additions & 2 deletions

File tree

quickwit/quickwit-indexing/src/actors/indexing_service.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -789,8 +789,7 @@ impl IndexingService {
789789
plan_request: ApplyIndexingPlanRequest,
790790
ctx: &ActorContext<Self>,
791791
) -> Result<(), IndexingError> {
792-
let tasks = &plan_request.indexing_tasks;
793-
let pipeline_diff = self.compute_pipeline_diff(tasks);
792+
let pipeline_diff = self.compute_pipeline_diff(&plan_request.indexing_tasks);
794793

795794
if !pipeline_diff.pipelines_to_shutdown.is_empty() {
796795
self.shutdown_pipelines(&pipeline_diff.pipelines_to_shutdown)

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,6 +1436,11 @@ impl MetastoreService for PostgresqlMetastore {
14361436
.bind(&request.publish_token)
14371437
.fetch_all(&self.connection_pool)
14381438
.await?;
1439+
1440+
if pg_shards.len() != request.shard_ids.len() {
1441+
warn_on_unacquired_shards(&request, &pg_shards);
1442+
}
1443+
14391444
let acquired_shards = pg_shards
14401445
.into_iter()
14411446
.map(|pg_shard| pg_shard.into())
@@ -2953,6 +2958,28 @@ impl PostgresqlMetastore {
29532958
}
29542959
}
29552960

2961+
/// Best-effort diagnostics for the acquire error path: logs the shards from `request` that were not
2962+
/// acquired — those absent from `acquired_pg_shards` because a more recent publish token owns them,
2963+
/// or because they no longer exist. Does not touch the database.
2964+
fn warn_on_unacquired_shards(request: &AcquireShardsRequest, acquired_pg_shards: &[PgShard]) {
2965+
let not_acquired_shard_ids: Vec<&ShardId> = request
2966+
.shard_ids
2967+
.iter()
2968+
.filter(|shard_id| {
2969+
!acquired_pg_shards
2970+
.iter()
2971+
.any(|pg_shard| &pg_shard.shard_id == *shard_id)
2972+
})
2973+
.collect();
2974+
warn!(
2975+
index_uid=%request.index_uid(),
2976+
source_id=%request.source_id,
2977+
shard_ids=?not_acquired_shard_ids,
2978+
publish_token=%request.publish_token,
2979+
"could not acquire shards: held by a more recent publish token, or no longer present"
2980+
);
2981+
}
2982+
29562983
async fn open_or_fetch_shard<'e>(
29572984
executor: impl Executor<'e, Database = Postgres> + Clone,
29582985
subrequest: &OpenShardSubrequest,

0 commit comments

Comments
 (0)