Skip to content

Commit 1a47f5e

Browse files
committed
fix(gasoline): skip corrupt active worker indexes
1 parent db4224a commit 1a47f5e

1 file changed

Lines changed: 27 additions & 13 deletions

File tree

  • engine/packages/gasoline/src/db/kv

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,19 +1107,33 @@ impl Database for DatabaseKv {
11071107
// Pull all available wake conditions from all registered wf names
11081108
let (mut active_worker_ids, wake_keys) = tokio::try_join!(
11091109
// Check
1110-
tx.get_ranges_keyvalues(
1111-
universaldb::RangeOption {
1112-
mode: StreamingMode::WantAll,
1113-
..(active_worker_subspace_start, active_worker_subspace_end).into()
1114-
},
1115-
// This is Snapshot to reduce contention and exact timestamps are not important
1116-
Snapshot,
1117-
)
1118-
.map(|res| {
1119-
let key = tx.unpack::<keys::worker::ActiveWorkerIdxKey>(res?.key())?;
1120-
Ok(key.worker_id)
1121-
})
1122-
.try_collect::<Vec<_>>(),
1110+
async {
1111+
let entries = tx
1112+
.get_ranges_keyvalues(
1113+
universaldb::RangeOption {
1114+
mode: StreamingMode::WantAll,
1115+
..(active_worker_subspace_start, active_worker_subspace_end)
1116+
.into()
1117+
},
1118+
// This is Snapshot to reduce contention and exact timestamps are not important
1119+
Snapshot,
1120+
)
1121+
.try_collect::<Vec<_>>()
1122+
.await?;
1123+
let mut worker_ids = Vec::with_capacity(entries.len());
1124+
for entry in entries {
1125+
match tx.unpack::<keys::worker::ActiveWorkerIdxKey>(entry.key()) {
1126+
Ok(key) => worker_ids.push(key.worker_id),
1127+
Err(err) => {
1128+
tracing::warn!(
1129+
?err,
1130+
"skipping corrupt active worker index key"
1131+
);
1132+
}
1133+
}
1134+
}
1135+
Ok::<Vec<_>, anyhow::Error>(worker_ids)
1136+
},
11231137
async {
11241138
let start = Instant::now();
11251139
let mut buffer = Vec::new();

0 commit comments

Comments
 (0)