Skip to content

Commit dd1b66b

Browse files
Do not defer commitlog compression (#4987)
# Description of Changes This reverts commit ca1b45f since after #4981 we no longer hold the lock while compressing. # API and ABI breaking changes None # Expected complexity level and risk 1 # Testing N/A
1 parent 2173982 commit dd1b66b

1 file changed

Lines changed: 43 additions & 270 deletions

File tree

crates/core/src/db/relational_db.rs

Lines changed: 43 additions & 270 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,9 @@ use spacetimedb_table::page_pool::PagePool;
5757
use spacetimedb_table::table::{RowRef, TableScanIter};
5858
use spacetimedb_table::table_index::IndexKey;
5959
use std::borrow::Cow;
60-
use std::collections::VecDeque;
6160
use std::io;
6261
use std::ops::RangeBounds;
6362
use std::sync::Arc;
64-
use std::time::{Duration, Instant};
6563
use tokio::sync::watch;
6664

6765
pub use super::persistence::{DiskSizeFn, Durability, Persistence};
@@ -1637,11 +1635,6 @@ fn apply_history(
16371635
}
16381636

16391637
pub type LocalDurability = Arc<durability::Local<ProductValue>>;
1640-
1641-
const COMMITLOG_COMPRESSION_IDLE_WINDOW: Duration = Duration::from_millis(500);
1642-
const COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL: Duration = Duration::from_millis(100);
1643-
const COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG: usize = 8;
1644-
16451638
/// Initialize local durability with the default parameters.
16461639
///
16471640
/// Also returned is a [`DiskSizeFn`] as required by [`RelationalDB::open`].
@@ -1689,281 +1682,61 @@ pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result<impl History<
16891682
asyncify(move || Commitlog::open(commitlog_dir, <_>::default(), None)).await
16901683
}
16911684

1692-
async fn commitlog_segments_to_compress(
1693-
durability: LocalDurability,
1694-
prev_snapshot_offset: TxOffset,
1695-
snapshot_offset: TxOffset,
1696-
) -> io::Result<Vec<TxOffset>> {
1697-
// Return segment start offsets in `[prev_snapshot_offset, snapshot_offset)`.
1698-
// If either offset falls inside a segment, round down to the containing
1699-
// segment. The segment containing `snapshot_offset` must stay uncompressed,
1700-
// because it can contain transactions newer than the snapshot.
1701-
asyncify(move || {
1702-
let segment_offsets = durability.existing_segment_offsets()?;
1703-
let start_idx = segment_offsets
1704-
.binary_search(&prev_snapshot_offset)
1705-
// if the snapshot is in the middle of a segment, we want to round down.
1706-
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1707-
.unwrap_or_else(|i| i.saturating_sub(1));
1708-
let segment_offsets = &segment_offsets[start_idx..];
1709-
let end_idx = segment_offsets
1710-
.binary_search(&snapshot_offset)
1711-
.unwrap_or_else(|i| i.saturating_sub(1));
1712-
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1713-
// which we don't want to compress, so an exclusive range is correct.
1714-
Ok(segment_offsets[..end_idx].to_vec())
1715-
})
1716-
.await
1717-
}
1718-
1719-
#[derive(Default)]
1720-
struct CommitlogCompressionState {
1721-
// Latest snapshot offset whose older segments have all been processed.
1722-
compressed_snapshot_offset: TxOffset,
1723-
// Newest snapshot offset represented by `pending_segments`. Once the queue is
1724-
// drained, this is promoted into `compressed_snapshot_offset`.
1725-
pending_snapshot_offset: Option<TxOffset>,
1726-
// Segment start offsets waiting for compression, processed oldest first.
1727-
pending_segments: VecDeque<TxOffset>,
1728-
// Time at which write load first appeared idle during the current idle window.
1729-
idle_since: Option<Instant>,
1730-
}
1731-
1732-
impl CommitlogCompressionState {
1733-
async fn enqueue_snapshot(&mut self, durability: LocalDurability, snapshot_offset: TxOffset) -> io::Result<()> {
1734-
// Coalesce snapshot events while compression is behind. If work is already
1735-
// pending, only enqueue the segment offsets between the previous pending
1736-
// snapshot and the new one.
1737-
let prev_snapshot_offset = self.pending_snapshot_offset.unwrap_or(self.compressed_snapshot_offset);
1738-
self.pending_segments
1739-
.extend(commitlog_segments_to_compress(durability, prev_snapshot_offset, snapshot_offset).await?);
1740-
self.pending_snapshot_offset = Some(snapshot_offset);
1741-
Ok(())
1742-
}
1743-
1744-
fn mark_caught_up(&mut self) {
1745-
// Only advance the checkpoint after every segment for the pending snapshot
1746-
// has been attempted successfully.
1747-
if self.pending_segments.is_empty()
1748-
&& let Some(snapshot_offset) = self.pending_snapshot_offset.take()
1749-
{
1750-
self.compressed_snapshot_offset = snapshot_offset;
1751-
}
1752-
}
1753-
1754-
fn has_pending_segments(&self) -> bool {
1755-
!self.pending_segments.is_empty()
1756-
}
1757-
1758-
fn pending_segment_count(&self) -> usize {
1759-
self.pending_segments.len()
1760-
}
1761-
1762-
fn reset_idle(&mut self) {
1763-
self.idle_since = None;
1764-
}
1765-
1766-
fn idle_window_elapsed(&mut self) -> bool {
1767-
// The first idle poll starts the timer; later idle polls measure against
1768-
// that same instant until new writes are queued or compression work is done.
1769-
let now = Instant::now();
1770-
self.idle_since.get_or_insert(now).elapsed() >= COMMITLOG_COMPRESSION_IDLE_WINDOW
1771-
}
1772-
1773-
async fn compress_next_segment(
1774-
&mut self,
1775-
durability: LocalDurability,
1776-
clog_tx: &mut Option<tokio::sync::mpsc::Sender<u64>>,
1777-
) -> bool {
1778-
// Return `false` on compression failure so the caller can back off before
1779-
// retrying the same segment.
1780-
let Some(segment_offset) = self.pending_segments.front().copied() else {
1781-
return true;
1782-
};
1783-
1784-
if let Err(err) = asyncify(move || durability.compress_segments(&[segment_offset])).await {
1785-
tracing::warn!("failed to compress commitlog segment {segment_offset}: {err}");
1786-
return false;
1787-
}
1788-
1789-
self.pending_segments.pop_front();
1790-
self.mark_caught_up();
1791-
1792-
if let Some(clog_tx) = clog_tx
1793-
&& let Err(err) = clog_tx.try_send(segment_offset)
1794-
{
1795-
tracing::warn!("failed to send offset {segment_offset} after compression: {err}");
1796-
}
1797-
1798-
true
1799-
}
1800-
1801-
async fn compress_segments_while_idle(
1802-
&mut self,
1803-
durability: LocalDurability,
1804-
clog_tx: &mut Option<tokio::sync::mpsc::Sender<u64>>,
1805-
) -> bool {
1806-
while self.has_pending_segments() {
1807-
if durability.queue_depth() != 0 {
1808-
return true;
1809-
}
1810-
if !self.compress_next_segment(durability.clone(), clog_tx).await {
1811-
return false;
1812-
}
1813-
tokio::task::yield_now().await;
1814-
}
1815-
1816-
true
1817-
}
1818-
}
1819-
1820-
async fn handle_commitlog_snapshot_event(
1821-
state: &mut CommitlogCompressionState,
1822-
durability: LocalDurability,
1823-
snap_tx: &mut Option<tokio::sync::mpsc::Sender<u64>>,
1824-
snapshot_offset: TxOffset,
1825-
) {
1826-
// Keep the test hooks in the same place as the state transition so callers
1827-
// can't forget to reset the idle window after new work is enqueued.
1828-
if let Some(snap_tx) = snap_tx
1829-
&& let Err(err) = snap_tx.try_send(snapshot_offset)
1830-
{
1831-
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1832-
}
1833-
1834-
if let Err(err) = state.enqueue_snapshot(durability, snapshot_offset).await {
1835-
tracing::warn!("failed to get commitlog segments to compress: {err}");
1836-
}
1837-
state.reset_idle();
1838-
}
1839-
1840-
/// Watches snapshot creation events and compresses commitlog segments older
1841-
/// than the snapshot once write load appears idle.
1685+
/// Watches snapshot creation events and compresses all commitlog segments older
1686+
/// than the snapshot.
18421687
///
18431688
/// Suitable **only** for non-replicated databases.
1844-
///
1845-
/// Commitlog compression state machine:
1846-
///
1847-
/// ```text
1848-
/// startup
1849-
/// |
1850-
/// | enqueue segments in [0, latest snapshot)
1851-
/// v
1852-
/// +------------------+ no work / queue drained +------------------+
1853-
/// | pending segments | ----------------------------------> | no pending work |
1854-
/// | in memory | | wait for snapshot|
1855-
/// +--------+---------+ <---------------------------------- +---------+--------+
1856-
/// | snapshot created
1857-
/// | enqueue older segments
1858-
/// v
1859-
/// backlog >= threshold?
1860-
/// / \
1861-
/// yes no -----------------------+
1862-
/// | |
1863-
/// v v
1864-
/// +------------------------+ +------------------------+
1865-
/// | compress one segment | | wait until durability |
1866-
/// | immediately | | queue is empty for |
1867-
/// +------------+-----------+ | IDLE_WINDOW |
1868-
/// | +-----------+------------+
1869-
/// | |
1870-
/// | v
1871-
/// | +------------------------+
1872-
/// | | compress next only |
1873-
/// | | while queue is empty |
1874-
/// | +-----------+------------+
1875-
/// | |
1876-
/// +---------------------------+
1877-
/// |
1878-
/// v
1879-
/// re-check pending segments
1880-
///
1881-
/// snapshot observed while pending:
1882-
/// enqueue new older segments and reset the idle timer
1883-
/// ```
1884-
///
1885-
/// Compression is treated as write-load idle maintenance unless the uncompressed
1886-
/// segment backlog grows large enough to force bounded progress under load.
1887-
/// Startup is handled as an initial catch-up pass because pending compression
1888-
/// work is not persisted across restarts; recompressing an already-compressed
1889-
/// segment is a no-op in the commitlog storage layer.
18901689
pub async fn snapshot_watching_commitlog_compressor(
18911690
mut snapshot_rx: watch::Receiver<u64>,
18921691
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
18931692
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
18941693
durability: LocalDurability,
18951694
) {
1896-
let initial_snapshot_offset = *snapshot_rx.borrow_and_update();
1897-
let mut state = CommitlogCompressionState::default();
1898-
1899-
// `snapshot_rx` starts at the latest snapshot already on disk. Treat that as
1900-
// an initial catch-up target rather than a completed compression checkpoint,
1901-
// because pending compression work is not persisted across restarts.
1902-
if initial_snapshot_offset > 0
1903-
&& let Err(err) = state
1904-
.enqueue_snapshot(durability.clone(), initial_snapshot_offset)
1905-
.await
1906-
{
1907-
tracing::warn!("failed to get initial commitlog segments to compress: {err}");
1908-
}
1695+
let mut prev_snapshot_offset = *snapshot_rx.borrow_and_update();
1696+
while snapshot_rx.changed().await.is_ok() {
1697+
let snapshot_offset = *snapshot_rx.borrow_and_update();
1698+
let durability = durability.clone();
19091699

1910-
loop {
1911-
state.mark_caught_up();
1700+
if let Some(snap_tx) = &mut snap_tx
1701+
&& let Err(err) = snap_tx.try_send(snapshot_offset)
1702+
{
1703+
tracing::warn!("failed to send offset {snapshot_offset} after snapshot creation: {err}");
1704+
}
1705+
1706+
let res: io::Result<_> = asyncify(move || {
1707+
let segment_offsets = durability.existing_segment_offsets()?;
1708+
let start_idx = segment_offsets
1709+
.binary_search(&prev_snapshot_offset)
1710+
// if the snapshot is in the middle of a segment, we want to round down.
1711+
// [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1712+
.unwrap_or_else(|i| i.saturating_sub(1));
1713+
let segment_offsets = &segment_offsets[start_idx..];
1714+
let end_idx = segment_offsets
1715+
.binary_search(&snapshot_offset)
1716+
.unwrap_or_else(|i| i.saturating_sub(1));
1717+
// in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1718+
// which we don't want to compress, so an exclusive range is correct.
1719+
let segment_offsets = &segment_offsets[..end_idx];
1720+
durability.compress_segments(segment_offsets)?;
1721+
let n = segment_offsets.len();
1722+
let last_compressed_segment = if n > 0 { Some(segment_offsets[n - 1]) } else { None };
1723+
Ok(last_compressed_segment)
1724+
})
1725+
.await;
19121726

1913-
if !state.has_pending_segments() {
1914-
// With no backlog, block until a new snapshot creates more work.
1915-
if snapshot_rx.changed().await.is_err() {
1916-
break;
1727+
let last_compressed_segment = match res {
1728+
Ok(opt_offset) => opt_offset,
1729+
Err(err) => {
1730+
tracing::warn!("failed to compress segments: {err}");
1731+
continue;
19171732
}
1918-
let snapshot_offset = *snapshot_rx.borrow_and_update();
1919-
handle_commitlog_snapshot_event(&mut state, durability.clone(), &mut snap_tx, snapshot_offset).await;
1920-
continue;
1921-
}
1922-
1923-
if state.pending_segment_count() >= COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG {
1924-
// Under sustained write load we still need bounded progress so old
1925-
// uncompressed segments do not accumulate forever.
1926-
tracing::debug!(
1927-
pending_segments = state.pending_segment_count(),
1928-
"forcing commitlog compression; segment backlog exceeded threshold"
1929-
);
1930-
if !state.compress_next_segment(durability.clone(), &mut clog_tx).await {
1931-
tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_WINDOW).await;
1932-
}
1933-
state.reset_idle();
1934-
tokio::task::yield_now().await;
1935-
continue;
1936-
}
1733+
};
1734+
prev_snapshot_offset = snapshot_offset;
19371735

1938-
tokio::select! {
1939-
res = snapshot_rx.changed() => {
1940-
// New snapshots extend the pending range and restart the idle window.
1941-
if res.is_err() {
1942-
break;
1943-
}
1944-
let snapshot_offset = *snapshot_rx.borrow_and_update();
1945-
handle_commitlog_snapshot_event(
1946-
&mut state,
1947-
durability.clone(),
1948-
&mut snap_tx,
1949-
snapshot_offset,
1950-
)
1951-
.await;
1952-
}
1953-
_ = tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL) => {
1954-
if durability.queue_depth() == 0 {
1955-
// Once no writes have been queued for long enough, drain as
1956-
// many segments as possible, but stop if write load resumes.
1957-
if state.idle_window_elapsed() {
1958-
if !state.compress_segments_while_idle(durability.clone(), &mut clog_tx).await {
1959-
tokio::time::sleep(COMMITLOG_COMPRESSION_IDLE_WINDOW).await;
1960-
}
1961-
state.reset_idle();
1962-
}
1963-
} else {
1964-
state.reset_idle();
1965-
}
1966-
}
1736+
if let Some((clog_tx, last_compressed_segment)) = clog_tx.as_mut().zip(last_compressed_segment)
1737+
&& let Err(err) = clog_tx.try_send(last_compressed_segment)
1738+
{
1739+
tracing::warn!("failed to send offset {last_compressed_segment} after compression: {err}");
19671740
}
19681741
}
19691742
}

0 commit comments

Comments
 (0)