Skip to content

Commit 3f872b5

Browse files
committed
Don't finish the queue before closing
1 parent 69b77dd commit 3f872b5

1 file changed

Lines changed: 11 additions & 15 deletions

File tree

crates/core/src/util/jobs.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
use std::pin::pin;
21
use std::sync::{Arc, Mutex, Weak};
32

43
use core_affinity::CoreId;
5-
use futures::FutureExt;
64
use indexmap::IndexMap;
75
use smallvec::SmallVec;
86
use spacetimedb_data_structures::map::HashMap;
@@ -197,22 +195,20 @@ impl JobCore {
197195
};
198196

199197
let job_loop = async {
200-
let mut closed = pin!(closed.notified().fuse());
201-
loop {
202-
tokio::select! {
203-
job = rx.recv() => {
204-
let Some(job) = job else { break };
205-
// blocking in place means that other futures on the same task
206-
// won't get polled - in this case, that's just the repin loop,
207-
// which is fine because it can just run before the next job.
208-
tokio::task::block_in_place(|| job(data))
209-
}
210-
() = &mut closed => rx.close(),
211-
}
198+
while let Some(job) = rx.recv().await {
199+
// blocking in place means that other futures on the same task
200+
// won't get polled - in this case, that's just the repin loop,
201+
// which is fine because it can just run before the next job.
202+
tokio::task::block_in_place(|| job(data))
212203
}
213204
};
214205

215-
super::also_poll(job_loop, repin_loop).await
206+
tokio::select! {
207+
() = super::also_poll(job_loop, repin_loop) => {}
208+
// when we receive a close notification, we immediately drop all
209+
// remaining jobs in the queue.
210+
() = closed.notified() => {}
211+
}
216212
}
217213
}
218214

0 commit comments

Comments
 (0)