Skip to content

Commit d5d7c55

Browse files
Add send channel to commitlog compressor to facilitate archival
1 parent 321e430 commit d5d7c55

2 files changed

Lines changed: 29 additions & 6 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,13 +1625,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16251625
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16261626
pub async fn snapshot_watching_commitlog_compressor(
16271627
mut snapshot_rx: watch::Receiver<u64>,
1628+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1629+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16281630
durability: LocalDurability,
16291631
) {
16301632
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16311633
while snapshot_rx.changed().await.is_ok() {
16321634
let snapshot_offset = *snapshot_rx.borrow_and_update();
16331635
let durability = durability.clone();
1634-
let res = asyncify(move || {
1636+
1637+
if let Some(snap_tx) = &mut snap_tx {
1638+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1639+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1640+
}
1641+
}
1642+
1643+
let res: io::Result<_> = asyncify(move || {
16351644
let segment_offsets = durability.existing_segment_offsets()?;
16361645
let start_idx = segment_offsets
16371646
.binary_search(&prev_snapshot_offset)
@@ -1645,15 +1654,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16451654
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16461655
// which we don't want to compress, so an exclusive range is correct.
16471656
let segment_offsets = &segment_offsets[..end_idx];
1648-
durability.compress_segments(segment_offsets)
1657+
durability.compress_segments(segment_offsets)?;
1658+
let n = segment_offsets.len();
1659+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1660+
Ok(last_compressed_segment)
16491661
})
16501662
.await;
16511663

1652-
if let Err(e) = res {
1653-
tracing::warn!("failed to compress segments: {e}");
1654-
continue;
1655-
}
1664+
let last_compressed_segment = match res {
1665+
Ok(opt_offset) => opt_offset,
1666+
Err(err) => {
1667+
tracing::warn!("failed to compress segments: {err}");
1668+
continue;
1669+
}
1670+
};
16561671
prev_snapshot_offset = snapshot_offset;
1672+
1673+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) {
1674+
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1675+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1676+
}
1677+
}
16571678
}
16581679
}
16591680

crates/standalone/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
127127
|snapshot_rx| {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130+
None,
131+
None,
130132
durability,
131133
));
132134
}

0 commit comments

Comments
 (0)