@@ -7,7 +7,7 @@ use std::{
77 AtomicI64 , AtomicU64 ,
88 Ordering :: { Acquire , Relaxed , Release } ,
99 } ,
10- Arc ,
10+ Arc , Weak ,
1111 } ,
1212 time:: Duration ,
1313} ;
@@ -116,7 +116,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
116116 ) ;
117117 rt. spawn (
118118 FlushAndSyncTask {
119- clog : clog . clone ( ) ,
119+ clog : Arc :: downgrade ( & clog ) ,
120120 period : opts. sync_interval ,
121121 offset : offset. clone ( ) ,
122122 abort : persister_task. abort_handle ( ) ,
@@ -254,7 +254,7 @@ fn flush_error(e: io::Error) {
254254}
255255
256256struct FlushAndSyncTask < T > {
257- clog : Arc < Commitlog < Txdata < T > > > ,
257+ clog : Weak < Commitlog < Txdata < T > > > ,
258258 period : Duration ,
259259 offset : Arc < AtomicI64 > ,
260260 /// Handle to abort the [`PersisterTask`] if fsync panics.
@@ -272,15 +272,17 @@ impl<T: Send + Sync + 'static> FlushAndSyncTask<T> {
272272 loop {
273273 interval. tick ( ) . await ;
274274
275+ let Some ( clog) = self . clog . upgrade ( ) else {
276+ break ;
277+ } ;
275278 // Skip if nothing changed.
276- if let Some ( committed) = self . clog . max_committed_offset ( ) {
279+ if let Some ( committed) = clog. max_committed_offset ( ) {
277280 let durable = self . offset . load ( Acquire ) ;
278281 if durable. is_positive ( ) && committed == durable as _ {
279282 continue ;
280283 }
281284 }
282285
283- let clog = self . clog . clone ( ) ;
284286 let task = spawn_blocking ( move || clog. flush_and_sync ( ) ) . await ;
285287 match task {
286288 Err ( e) => {
0 commit comments