Skip to content

Commit 55f51d0

Browse files
Add channel params to commitlog compressor (#3254)
# Description of Changes <!-- Please describe your change, mention any related tickets, and so on here. --> In order to facilitate commitlog and snapshot archival, this patch adds channel params to `snapshot_watching_commitlog_compressor` # API and ABI breaking changes <!-- If this is an API or ABI breaking change, please apply the corresponding GitHub label. --> None # Expected complexity level and risk <!-- How complicated do you think these changes are? Grade on a scale from 1 to 5, where 1 is a trivial change, and 5 is a deep-reaching and complex change. This complexity rating applies not only to the complexity apparent in the diff, but also to its interactions with existing and future code. If you answered more than a 2, explain what is complex about the PR, and what other components it interacts with in potentially concerning ways. --> 1 # Testing <!-- Describe any testing you've done, and any testing you'd like your reviewers to do, so that you're confident that all the changes work as expected! -->
1 parent 71f3a87 commit 55f51d0

2 files changed

Lines changed: 29 additions & 6 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,13 +1613,22 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16131613
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16141614
pub async fn snapshot_watching_commitlog_compressor(
16151615
mut snapshot_rx: watch::Receiver<u64>,
1616+
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1617+
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16161618
durability: LocalDurability,
16171619
) {
16181620
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16191621
while snapshot_rx.changed().await.is_ok() {
16201622
let snapshot_offset = *snapshot_rx.borrow_and_update();
16211623
let durability = durability.clone();
1622-
let res = asyncify(move || {
1624+
1625+
if let Some(snap_tx) = &mut snap_tx {
1626+
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1627+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1628+
}
1629+
}
1630+
1631+
let res: io::Result<_> = asyncify(move || {
16231632
let segment_offsets = durability.existing_segment_offsets()?;
16241633
let start_idx = segment_offsets
16251634
.binary_search(&prev_snapshot_offset)
@@ -1633,15 +1642,27 @@ pub async fn snapshot_watching_commitlog_compressor(
16331642
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16341643
// which we don't want to compress, so an exclusive range is correct.
16351644
let segment_offsets = &segment_offsets[..end_idx];
1636-
durability.compress_segments(segment_offsets)
1645+
durability.compress_segments(segment_offsets)?;
1646+
let n = segment_offsets.len();
1647+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1648+
Ok(last_compressed_segment)
16371649
})
16381650
.await;
16391651

1640-
if let Err(e) = res {
1641-
tracing::warn!("failed to compress segments: {e}");
1642-
continue;
1643-
}
1652+
let last_compressed_segment = match res {
1653+
Ok(opt_offset) => opt_offset,
1654+
Err(err) => {
1655+
tracing::warn!("failed to compress segments: {err}");
1656+
continue;
1657+
}
1658+
};
16441659
prev_snapshot_offset = snapshot_offset;
1660+
1661+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment) {
1662+
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1663+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1664+
}
1665+
}
16451666
}
16461667
}
16471668

crates/standalone/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
125125
|snapshot_rx| {
126126
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
127127
snapshot_rx,
128+
None,
129+
None,
128130
durability,
129131
));
130132
}

0 commit comments

Comments
 (0)