Skip to content

Commit f9837d4

Browse files
another channel
1 parent 7aa1159 commit f9837d4

2 files changed

Lines changed: 11 additions & 2 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,13 +1625,21 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16251625
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16261626
pub async fn snapshot_watching_commitlog_compressor(
16271627
mut snapshot_rx: watch::Receiver<u64>,
1628-
mut snapshot_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1628+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1629+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16291630
durability: LocalDurability,
16301631
) {
16311632
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16321633
while snapshot_rx.changed().await.is_ok() {
16331634
let snapshot_offset = *snapshot_rx.borrow_and_update();
16341635
let durability = durability.clone();
1636+
1637+
if let Some(snap_tx) = &mut snap_tx {
1638+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1639+
tracing::warn!("failed to send offset {snapshot_offset} after creation event: {err}");
1640+
}
1641+
}
1642+
16351643
let res: io::Result<_> = asyncify(move || {
16361644
let segment_offsets = durability.existing_segment_offsets()?;
16371645
let start_idx = segment_offsets
@@ -1662,7 +1670,7 @@ pub async fn snapshot_watching_commitlog_compressor(
16621670
};
16631671
prev_snapshot_offset = snapshot_offset;
16641672

1665-
if let Some(snapshot_tx) = &mut snapshot_tx {
1673+
if let Some(snapshot_tx) = &mut clog_tx {
16661674
if let Err(err) = snapshot_tx.try_send(last_compressed_segment) {
16671675
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
16681676
}

crates/standalone/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130130
None,
131+
None,
131132
durability,
132133
));
133134
}

0 commit comments

Comments
 (0)