@@ -1625,13 +1625,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16251625/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16261626pub async fn snapshot_watching_commitlog_compressor (
16271627 mut snapshot_rx : watch:: Receiver < u64 > ,
1628+ mut clog_tx : Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
1629+ mut snap_tx : Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
16281630 durability : LocalDurability ,
16291631) {
16301632 let mut prev_snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
16311633 while snapshot_rx. changed ( ) . await . is_ok ( ) {
16321634 let snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
16331635 let durability = durability. clone ( ) ;
1634- let res = asyncify ( move || {
1636+
1637+ if let Some ( snap_tx) = & mut snap_tx {
1638+ if let Err ( err) = snap_tx. try_send ( snapshot_offset) {
1639+ tracing:: warn!( "failed to send offset {snapshot_offset} after snapshot creation: {err}" ) ;
1640+ }
1641+ }
1642+
1643+ let res: io:: Result < _ > = asyncify ( move || {
16351644 let segment_offsets = durability. existing_segment_offsets ( ) ?;
16361645 let start_idx = segment_offsets
16371646 . binary_search ( & prev_snapshot_offset)
@@ -1645,15 +1654,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16451654 // in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16461655 // which we don't want to compress, so an exclusive range is correct.
16471656 let segment_offsets = & segment_offsets[ ..end_idx] ;
1648- durability. compress_segments ( segment_offsets)
1657+ durability. compress_segments ( segment_offsets) ?;
1658+ let n = segment_offsets. len ( ) ;
1659+ let last_compressed_segment = segment_offsets[ n - 1 ] ;
1660+ Ok ( last_compressed_segment)
16491661 } )
16501662 . await ;
16511663
1652- if let Err ( e) = res {
1653- tracing:: warn!( "failed to compress segments: {e}" ) ;
1654- continue ;
1655- }
1664+ let last_compressed_segment = match res {
1665+ Ok ( offset) => offset,
1666+ Err ( err) => {
1667+ tracing:: warn!( "failed to compress segments: {err}" ) ;
1668+ continue ;
1669+ }
1670+ } ;
16561671 prev_snapshot_offset = snapshot_offset;
1672+
1673+ if let Some ( clog_tx) = & mut clog_tx {
1674+ if let Err ( err) = clog_tx. try_send ( last_compressed_segment) {
1675+ tracing:: warn!( "failed to send offset {last_compressed_segment} after compression: {err}" ) ;
1676+ }
1677+ }
16571678 }
16581679}
16591680
0 commit comments