Skip to content

Commit 392ef4e

Browse files
committed
Merge branch 'master' into jsdt/store-client-creds
2 parents c80ac15 + 2f9554c commit 392ef4e

2 files changed

Lines changed: 29 additions & 6 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1643,13 +1643,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16431643
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16441644
pub async fn snapshot_watching_commitlog_compressor(
16451645
mut snapshot_rx: watch::Receiver<u64>,
1646+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1647+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16461648
durability: LocalDurability,
16471649
) {
16481650
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16491651
while snapshot_rx.changed().await.is_ok() {
16501652
let snapshot_offset = *snapshot_rx.borrow_and_update();
16511653
let durability = durability.clone();
1652-
let res = asyncify(move || {
1654+
1655+
if let Some(snap_tx) = &mut snap_tx {
1656+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1657+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1658+
}
1659+
}
1660+
1661+
let res: io::Result<_> = asyncify(move || {
16531662
let segment_offsets = durability.existing_segment_offsets()?;
16541663
let start_idx = segment_offsets
16551664
.binary_search(&prev_snapshot_offset)
@@ -1663,15 +1672,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16631672
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16641673
// which we don't want to compress, so an exclusive range is correct.
16651674
let segment_offsets = &segment_offsets[..end_idx];
1666-
durability.compress_segments(segment_offsets)
1675+
durability.compress_segments(segment_offsets)?;
1676+
let n = segment_offsets.len();
1677+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1678+
Ok(last_compressed_segment)
16671679
})
16681680
.await;
16691681

1670-
if let Err(e) = res {
1671-
tracing::warn!("failed to compress segments: {e}");
1672-
continue;
1673-
}
1682+
let last_compressed_segment = match res {
1683+
Ok(opt_offset) => opt_offset,
1684+
Err(err) => {
1685+
tracing::warn!("failed to compress segments: {err}");
1686+
continue;
1687+
}
1688+
};
16741689
prev_snapshot_offset = snapshot_offset;
1690+
1691+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) {
1692+
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1693+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1694+
}
1695+
}
16751696
}
16761697
}
16771698

crates/standalone/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
127127
|snapshot_rx| {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130+
None,
131+
None,
130132
durability,
131133
));
132134
}

0 commit comments

Comments
 (0)