Skip to content

Commit 2f9554c

Browse files
Add channel params to commitlog compressor (#3254)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> In order to facilitate commitlog and snapshot archival, this patch adds channel params to `snapshot_watching_commitlog_compressor` # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> 1 # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! -->
1 parent 321e430 commit 2f9554c

2 files changed

Lines changed: 29 additions & 6 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,13 +1625,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16251625
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16261626
pub async fn snapshot_watching_commitlog_compressor(
16271627
mut snapshot_rx: watch::Receiver<u64>,
1628+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1629+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16281630
durability: LocalDurability,
16291631
) {
16301632
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16311633
while snapshot_rx.changed().await.is_ok() {
16321634
let snapshot_offset = *snapshot_rx.borrow_and_update();
16331635
let durability = durability.clone();
1634-
let res = asyncify(move || {
1636+
1637+
if let Some(snap_tx) = &mut snap_tx {
1638+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1639+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1640+
}
1641+
}
1642+
1643+
let res: io::Result<_> = asyncify(move || {
16351644
let segment_offsets = durability.existing_segment_offsets()?;
16361645
let start_idx = segment_offsets
16371646
.binary_search(&prev_snapshot_offset)
@@ -1645,15 +1654,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16451654
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16461655
// which we don't want to compress, so an exclusive range is correct.
16471656
let segment_offsets = &segment_offsets[..end_idx];
1648-
durability.compress_segments(segment_offsets)
1657+
durability.compress_segments(segment_offsets)?;
1658+
let n = segment_offsets.len();
1659+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1660+
Ok(last_compressed_segment)
16491661
})
16501662
.await;
16511663

1652-
if let Err(e) = res {
1653-
tracing::warn!("failed to compress segments: {e}");
1654-
continue;
1655-
}
1664+
let last_compressed_segment = match res {
1665+
Ok(opt_offset) => opt_offset,
1666+
Err(err) => {
1667+
tracing::warn!("failed to compress segments: {err}");
1668+
continue;
1669+
}
1670+
};
16561671
prev_snapshot_offset = snapshot_offset;
1672+
1673+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) {
1674+
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1675+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1676+
}
1677+
}
16571678
}
16581679
}
16591680

crates/standalone/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
127127
|snapshot_rx| {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130+
None,
131+
None,
130132
durability,
131133
));
132134
}

0 commit comments

Comments
 (0)