Skip to content

Commit b334edb

Browse files
Revert "Add send channel to commitlog compressor to facilitate archival"
This reverts commit 40e798b.
1 parent 40e798b commit b334edb

2 files changed

Lines changed: 6 additions & 29 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,22 +1625,13 @@ pub async fn local_durability(commitlog_dir: CommitLogDir) -> io::Result<(LocalD
16251625
/// [DurabilityProvider]: crate::host::host_controller::DurabilityProvider
16261626
pub async fn snapshot_watching_commitlog_compressor(
16271627
mut snapshot_rx: watch::Receiver<u64>,
1628-
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1629-
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
16301628
durability: LocalDurability,
16311629
) {
16321630
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
16331631
while snapshot_rx.changed().await.is_ok() {
16341632
let snapshot_offset = *snapshot_rx.borrow_and_update();
16351633
let durability = durability.clone();
1636-
1637-
if let Some(snap_tx) = &mut snap_tx {
1638-
if let Err(err) = snap_tx.try_send(snapshot_offset) {
1639-
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1640-
}
1641-
}
1642-
1643-
let res: io::Result<_> = asyncify(move || {
1634+
let res = asyncify(move || {
16441635
let segment_offsets = durability.existing_segment_offsets()?;
16451636
let start_idx = segment_offsets
16461637
.binary_search(&prev_snapshot_offset)
@@ -1654,27 +1645,15 @@ pub async fn snapshot_watching_commitlog_compressor(
16541645
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
16551646
// which we don't want to compress, so an exclusive range is correct.
16561647
let segment_offsets = &segment_offsets[..end_idx];
1657-
durability.compress_segments(segment_offsets)?;
1658-
let n = segment_offsets.len();
1659-
let last_compressed_segment = segment_offsets[n - 1];
1660-
Ok(last_compressed_segment)
1648+
durability.compress_segments(segment_offsets)
16611649
})
16621650
.await;
16631651

1664-
let last_compressed_segment = match res {
1665-
Ok(offset) => offset,
1666-
Err(err) => {
1667-
tracing::warn!("failed to compress segments: {err}");
1668-
continue;
1669-
}
1670-
};
1671-
prev_snapshot_offset = snapshot_offset;
1672-
1673-
if let Some(clog_tx) = &mut clog_tx {
1674-
if let Err(err) = clog_tx.try_send(last_compressed_segment) {
1675-
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
1676-
}
1652+
if let Err(e) = res {
1653+
tracing::warn!("failed to compress segments: {e}");
1654+
continue;
16771655
}
1656+
prev_snapshot_offset = snapshot_offset;
16781657
}
16791658
}
16801659

crates/standalone/src/lib.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,6 @@ impl DurabilityProvider for StandaloneDurabilityProvider {
127127
|snapshot_rx| {
128128
tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
129129
snapshot_rx,
130-
None,
131-
None,
132130
durability,
133131
));
134132
}

0 commit comments

Comments
 (0)