Skip to content

Commit 03721b4

Browse files
kimjoshua-spacetime
authored andcommitted
durability: Fix task leak (#2875)
1 parent 84a265f commit 03721b4

1 file changed

Lines changed: 7 additions & 5 deletions

File tree

crates/durability/src/imp/local.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
AtomicI64, AtomicU64,
88
Ordering::{Acquire, Relaxed, Release},
99
},
10-
Arc,
10+
Arc, Weak,
1111
},
1212
time::Duration,
1313
};
@@ -116,7 +116,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
116116
);
117117
rt.spawn(
118118
FlushAndSyncTask {
119-
clog: clog.clone(),
119+
clog: Arc::downgrade(&clog),
120120
period: opts.sync_interval,
121121
offset: offset.clone(),
122122
abort: persister_task.abort_handle(),
@@ -254,7 +254,7 @@ fn flush_error(e: io::Error) {
254254
}
255255

256256
struct FlushAndSyncTask<T> {
257-
clog: Arc<Commitlog<Txdata<T>>>,
257+
clog: Weak<Commitlog<Txdata<T>>>,
258258
period: Duration,
259259
offset: Arc<AtomicI64>,
260260
/// Handle to abort the [`PersisterTask`] if fsync panics.
@@ -272,15 +272,17 @@ impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
272272
loop {
273273
interval.tick().await;
274274

275+
let Some(clog) = self.clog.upgrade() else {
276+
break;
277+
};
275278
// Skip if nothing changed.
276-
if let Some(committed) = self.clog.max_committed_offset() {
279+
if let Some(committed) = clog.max_committed_offset() {
277280
let durable = self.offset.load(Acquire);
278281
if durable.is_positive() && committed == durable as _ {
279282
continue;
280283
}
281284
}
282285

283-
let clog = self.clog.clone();
284286
let task = spawn_blocking(move || clog.flush_and_sync()).await;
285287
match task {
286288
Err(e) => {

0 commit comments

Comments
 (0)