diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 9767bafa18..ed38f20f8c 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -1107,19 +1107,33 @@ impl Database for DatabaseKv { // Pull all available wake conditions from all registered wf names let (mut active_worker_ids, wake_keys) = tokio::try_join!( // Check - tx.get_ranges_keyvalues( - universaldb::RangeOption { - mode: StreamingMode::WantAll, - ..(active_worker_subspace_start, active_worker_subspace_end).into() - }, - // This is Snapshot to reduce contention and exact timestamps are not important - Snapshot, - ) - .map(|res| { - let key = tx.unpack::(res?.key())?; - Ok(key.worker_id) - }) - .try_collect::>(), + async { + let entries = tx + .get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(active_worker_subspace_start, active_worker_subspace_end) + .into() + }, + // This is Snapshot to reduce contention and exact timestamps are not important + Snapshot, + ) + .try_collect::>() + .await?; + let mut worker_ids = Vec::with_capacity(entries.len()); + for entry in entries { + match tx.unpack::(entry.key()) { + Ok(key) => worker_ids.push(key.worker_id), + Err(err) => { + tracing::warn!( + ?err, + "skipping corrupt active worker index key" + ); + } + } + } + Ok::, anyhow::Error>(worker_ids) + }, async { let start = Instant::now(); let mut buffer = Vec::new();