@@ -57,9 +57,11 @@ use spacetimedb_table::page_pool::PagePool;
5757use spacetimedb_table:: table:: { RowRef , TableScanIter } ;
5858use spacetimedb_table:: table_index:: IndexKey ;
5959use std:: borrow:: Cow ;
60+ use std:: collections:: VecDeque ;
6061use std:: io;
6162use std:: ops:: RangeBounds ;
6263use std:: sync:: Arc ;
64+ use std:: time:: { Duration , Instant } ;
6365use tokio:: sync:: watch;
6466
6567pub use super :: persistence:: { DiskSizeFn , Durability , Persistence } ;
@@ -1635,6 +1637,11 @@ fn apply_history(
16351637}
16361638
16371639pub type LocalDurability = Arc < durability:: Local < ProductValue > > ;
1640+
1641+ const COMMITLOG_COMPRESSION_IDLE_WINDOW : Duration = Duration :: from_millis ( 500 ) ;
1642+ const COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL : Duration = Duration :: from_millis ( 100 ) ;
1643+ const COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG : usize = 8 ;
1644+
16381645/// Initialize local durability with the default parameters.
16391646///
16401647/// Also returned is a [`DiskSizeFn`] as required by [`RelationalDB::open`].
@@ -1682,61 +1689,281 @@ pub async fn local_history(replica_dir: &ReplicaDir) -> io::Result<impl History<
16821689 asyncify ( move || Commitlog :: open ( commitlog_dir, <_ >:: default ( ) , None ) ) . await
16831690}
16841691
1685- /// Watches snapshot creation events and compresses all commitlog segments older
1686- /// than the snapshot.
1692+ async fn commitlog_segments_to_compress (
1693+ durability : LocalDurability ,
1694+ prev_snapshot_offset : TxOffset ,
1695+ snapshot_offset : TxOffset ,
1696+ ) -> io:: Result < Vec < TxOffset > > {
1697+ // Return segment start offsets in `[prev_snapshot_offset, snapshot_offset)`.
1698+ // If either offset falls inside a segment, round down to the containing
1699+ // segment. The segment containing `snapshot_offset` must stay uncompressed,
1700+ // because it can contain transactions newer than the snapshot.
1701+ asyncify ( move || {
1702+ let segment_offsets = durability. existing_segment_offsets ( ) ?;
1703+ let start_idx = segment_offsets
1704+ . binary_search ( & prev_snapshot_offset)
1705+ // if the snapshot is in the middle of a segment, we want to round down.
1706+ // [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1707+ . unwrap_or_else ( |i| i. saturating_sub ( 1 ) ) ;
1708+ let segment_offsets = & segment_offsets[ start_idx..] ;
1709+ let end_idx = segment_offsets
1710+ . binary_search ( & snapshot_offset)
1711+ . unwrap_or_else ( |i| i. saturating_sub ( 1 ) ) ;
1712+ // in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1713+ // which we don't want to compress, so an exclusive range is correct.
1714+ Ok ( segment_offsets[ ..end_idx] . to_vec ( ) )
1715+ } )
1716+ . await
1717+ }
1718+
1719+ #[ derive( Default ) ]
1720+ struct CommitlogCompressionState {
1721+ // Latest snapshot offset whose older segments have all been processed.
1722+ compressed_snapshot_offset : TxOffset ,
1723+ // Newest snapshot offset represented by `pending_segments`. Once the queue is
1724+ // drained, this is promoted into `compressed_snapshot_offset`.
1725+ pending_snapshot_offset : Option < TxOffset > ,
1726+ // Segment start offsets waiting for compression, processed oldest first.
1727+ pending_segments : VecDeque < TxOffset > ,
1728+ // Time at which write load first appeared idle during the current idle window.
1729+ idle_since : Option < Instant > ,
1730+ }
1731+
1732+ impl CommitlogCompressionState {
1733+ async fn enqueue_snapshot ( & mut self , durability : LocalDurability , snapshot_offset : TxOffset ) -> io:: Result < ( ) > {
1734+ // Coalesce snapshot events while compression is behind. If work is already
1735+ // pending, only enqueue the segment offsets between the previous pending
1736+ // snapshot and the new one.
1737+ let prev_snapshot_offset = self . pending_snapshot_offset . unwrap_or ( self . compressed_snapshot_offset ) ;
1738+ self . pending_segments
1739+ . extend ( commitlog_segments_to_compress ( durability, prev_snapshot_offset, snapshot_offset) . await ?) ;
1740+ self . pending_snapshot_offset = Some ( snapshot_offset) ;
1741+ Ok ( ( ) )
1742+ }
1743+
1744+ fn mark_caught_up ( & mut self ) {
1745+ // Only advance the checkpoint after every segment for the pending snapshot
1746+ // has been attempted successfully.
1747+ if self . pending_segments . is_empty ( )
1748+ && let Some ( snapshot_offset) = self . pending_snapshot_offset . take ( )
1749+ {
1750+ self . compressed_snapshot_offset = snapshot_offset;
1751+ }
1752+ }
1753+
1754+ fn has_pending_segments ( & self ) -> bool {
1755+ !self . pending_segments . is_empty ( )
1756+ }
1757+
1758+ fn pending_segment_count ( & self ) -> usize {
1759+ self . pending_segments . len ( )
1760+ }
1761+
1762+ fn reset_idle ( & mut self ) {
1763+ self . idle_since = None ;
1764+ }
1765+
1766+ fn idle_window_elapsed ( & mut self ) -> bool {
1767+ // The first idle poll starts the timer; later idle polls measure against
1768+ // that same instant until new writes are queued or compression work is done.
1769+ let now = Instant :: now ( ) ;
1770+ self . idle_since . get_or_insert ( now) . elapsed ( ) >= COMMITLOG_COMPRESSION_IDLE_WINDOW
1771+ }
1772+
1773+ async fn compress_next_segment (
1774+ & mut self ,
1775+ durability : LocalDurability ,
1776+ clog_tx : & mut Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
1777+ ) -> bool {
1778+ // Return `false` on compression failure so the caller can back off before
1779+ // retrying the same segment.
1780+ let Some ( segment_offset) = self . pending_segments . front ( ) . copied ( ) else {
1781+ return true ;
1782+ } ;
1783+
1784+ if let Err ( err) = asyncify ( move || durability. compress_segments ( & [ segment_offset] ) ) . await {
1785+ tracing:: warn!( "failed to compress commitlog segment {segment_offset}: {err}" ) ;
1786+ return false ;
1787+ }
1788+
1789+ self . pending_segments . pop_front ( ) ;
1790+ self . mark_caught_up ( ) ;
1791+
1792+ if let Some ( clog_tx) = clog_tx
1793+ && let Err ( err) = clog_tx. try_send ( segment_offset)
1794+ {
1795+ tracing:: warn!( "failed to send offset {segment_offset} after compression: {err}" ) ;
1796+ }
1797+
1798+ true
1799+ }
1800+
1801+ async fn compress_segments_while_idle (
1802+ & mut self ,
1803+ durability : LocalDurability ,
1804+ clog_tx : & mut Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
1805+ ) -> bool {
1806+ while self . has_pending_segments ( ) {
1807+ if durability. queue_depth ( ) != 0 {
1808+ return true ;
1809+ }
1810+ if !self . compress_next_segment ( durability. clone ( ) , clog_tx) . await {
1811+ return false ;
1812+ }
1813+ tokio:: task:: yield_now ( ) . await ;
1814+ }
1815+
1816+ true
1817+ }
1818+ }
1819+
1820+ async fn handle_commitlog_snapshot_event (
1821+ state : & mut CommitlogCompressionState ,
1822+ durability : LocalDurability ,
1823+ snap_tx : & mut Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
1824+ snapshot_offset : TxOffset ,
1825+ ) {
1826+ // Keep the test hooks in the same place as the state transition so callers
1827+ // can't forget to reset the idle window after new work is enqueued.
1828+ if let Some ( snap_tx) = snap_tx
1829+ && let Err ( err) = snap_tx. try_send ( snapshot_offset)
1830+ {
1831+ tracing:: warn!( "failed to send offset {snapshot_offset} after snapshot creation: {err}" ) ;
1832+ }
1833+
1834+ if let Err ( err) = state. enqueue_snapshot ( durability, snapshot_offset) . await {
1835+ tracing:: warn!( "failed to get commitlog segments to compress: {err}" ) ;
1836+ }
1837+ state. reset_idle ( ) ;
1838+ }
1839+
1840+ /// Watches snapshot creation events and compresses commitlog segments older
1841+ /// than the snapshot once write load appears idle.
16871842///
16881843/// Suitable **only** for non-replicated databases.
1844+ ///
1845+ /// Commitlog compression state machine:
1846+ ///
1847+ /// ```text
1848+ /// startup
1849+ /// |
1850+ /// | enqueue segments in [0, latest snapshot)
1851+ /// v
1852+ /// +------------------+ no work / queue drained +------------------+
1853+ /// | pending segments | ----------------------------------> | no pending work |
1854+ /// | in memory | | wait for snapshot|
1855+ /// +--------+---------+ <---------------------------------- +---------+--------+
1856+ /// | snapshot created
1857+ /// | enqueue older segments
1858+ /// v
1859+ /// backlog >= threshold?
1860+ /// / \
1861+ /// yes no -----------------------+
1862+ /// | |
1863+ /// v v
1864+ /// +------------------------+ +------------------------+
1865+ /// | compress one segment | | wait until durability |
1866+ /// | immediately | | queue is empty for |
1867+ /// +------------+-----------+ | IDLE_WINDOW |
1868+ /// | +-----------+------------+
1869+ /// | |
1870+ /// | v
1871+ /// | +------------------------+
1872+ /// | | compress next only |
1873+ /// | | while queue is empty |
1874+ /// | +-----------+------------+
1875+ /// | |
1876+ /// +---------------------------+
1877+ /// |
1878+ /// v
1879+ /// re-check pending segments
1880+ ///
1881+ /// snapshot observed while pending:
1882+ /// enqueue new older segments and reset the idle timer
1883+ /// ```
1884+ ///
1885+ /// Compression is treated as write-load idle maintenance unless the uncompressed
1886+ /// segment backlog grows large enough to force bounded progress under load.
1887+ /// Startup is handled as an initial catch-up pass because pending compression
1888+ /// work is not persisted across restarts; recompressing an already-compressed
1889+ /// segment is a no-op in the commitlog storage layer.
16891890pub async fn snapshot_watching_commitlog_compressor (
16901891 mut snapshot_rx : watch:: Receiver < u64 > ,
16911892 mut clog_tx : Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
16921893 mut snap_tx : Option < tokio:: sync:: mpsc:: Sender < u64 > > ,
16931894 durability : LocalDurability ,
16941895) {
1695- let mut prev_snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
1696- while snapshot_rx. changed ( ) . await . is_ok ( ) {
1697- let snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
1698- let durability = durability. clone ( ) ;
1896+ let initial_snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
1897+ let mut state = CommitlogCompressionState :: default ( ) ;
1898+
1899+ // `snapshot_rx` starts at the latest snapshot already on disk. Treat that as
1900+ // an initial catch-up target rather than a completed compression checkpoint,
1901+ // because pending compression work is not persisted across restarts.
1902+ if initial_snapshot_offset > 0
1903+ && let Err ( err) = state
1904+ . enqueue_snapshot ( durability. clone ( ) , initial_snapshot_offset)
1905+ . await
1906+ {
1907+ tracing:: warn!( "failed to get initial commitlog segments to compress: {err}" ) ;
1908+ }
16991909
1700- if let Some ( snap_tx) = & mut snap_tx
1701- && let Err ( err) = snap_tx. try_send ( snapshot_offset)
1702- {
1703- tracing:: warn!( "failed to send offset {snapshot_offset} after snapshot creation: {err}" ) ;
1704- }
1705-
1706- let res: io:: Result < _ > = asyncify ( move || {
1707- let segment_offsets = durability. existing_segment_offsets ( ) ?;
1708- let start_idx = segment_offsets
1709- . binary_search ( & prev_snapshot_offset)
1710- // if the snapshot is in the middle of a segment, we want to round down.
1711- // [0, 2].binary_search(1) will return Err(1), so we subtract 1.
1712- . unwrap_or_else ( |i| i. saturating_sub ( 1 ) ) ;
1713- let segment_offsets = & segment_offsets[ start_idx..] ;
1714- let end_idx = segment_offsets
1715- . binary_search ( & snapshot_offset)
1716- . unwrap_or_else ( |i| i. saturating_sub ( 1 ) ) ;
1717- // in this case, segment_offsets[end_idx] is the segment that contains the snapshot,
1718- // which we don't want to compress, so an exclusive range is correct.
1719- let segment_offsets = & segment_offsets[ ..end_idx] ;
1720- durability. compress_segments ( segment_offsets) ?;
1721- let n = segment_offsets. len ( ) ;
1722- let last_compressed_segment = if n > 0 { Some ( segment_offsets[ n - 1 ] ) } else { None } ;
1723- Ok ( last_compressed_segment)
1724- } )
1725- . await ;
1910+ loop {
1911+ state. mark_caught_up ( ) ;
17261912
1727- let last_compressed_segment = match res {
1728- Ok ( opt_offset) => opt_offset,
1729- Err ( err) => {
1730- tracing:: warn!( "failed to compress segments: {err}" ) ;
1731- continue ;
1913+ if !state. has_pending_segments ( ) {
1914+ // With no backlog, block until a new snapshot creates more work.
1915+ if snapshot_rx. changed ( ) . await . is_err ( ) {
1916+ break ;
17321917 }
1733- } ;
1734- prev_snapshot_offset = snapshot_offset;
1918+ let snapshot_offset = * snapshot_rx. borrow_and_update ( ) ;
1919+ handle_commitlog_snapshot_event ( & mut state, durability. clone ( ) , & mut snap_tx, snapshot_offset) . await ;
1920+ continue ;
1921+ }
17351922
1736- if let Some ( ( clog_tx, last_compressed_segment) ) = clog_tx. as_mut ( ) . zip ( last_compressed_segment)
1737- && let Err ( err) = clog_tx. try_send ( last_compressed_segment)
1738- {
1739- tracing:: warn!( "failed to send offset {last_compressed_segment} after compression: {err}" ) ;
1923+ if state. pending_segment_count ( ) >= COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG {
1924+ // Under sustained write load we still need bounded progress so old
1925+ // uncompressed segments do not accumulate forever.
1926+ tracing:: debug!(
1927+ pending_segments = state. pending_segment_count( ) ,
1928+ "forcing commitlog compression; segment backlog exceeded threshold"
1929+ ) ;
1930+ if !state. compress_next_segment ( durability. clone ( ) , & mut clog_tx) . await {
1931+ tokio:: time:: sleep ( COMMITLOG_COMPRESSION_IDLE_WINDOW ) . await ;
1932+ }
1933+ state. reset_idle ( ) ;
1934+ tokio:: task:: yield_now ( ) . await ;
1935+ continue ;
1936+ }
1937+
1938+ tokio:: select! {
1939+ res = snapshot_rx. changed( ) => {
1940+ // New snapshots extend the pending range and restart the idle window.
1941+ if res. is_err( ) {
1942+ break ;
1943+ }
1944+ let snapshot_offset = * snapshot_rx. borrow_and_update( ) ;
1945+ handle_commitlog_snapshot_event(
1946+ & mut state,
1947+ durability. clone( ) ,
1948+ & mut snap_tx,
1949+ snapshot_offset,
1950+ )
1951+ . await ;
1952+ }
1953+ _ = tokio:: time:: sleep( COMMITLOG_COMPRESSION_IDLE_POLL_INTERVAL ) => {
1954+ if durability. queue_depth( ) == 0 {
1955+ // Once no writes have been queued for long enough, drain as
1956+ // many segments as possible, but stop if write load resumes.
1957+ if state. idle_window_elapsed( ) {
1958+ if !state. compress_segments_while_idle( durability. clone( ) , & mut clog_tx) . await {
1959+ tokio:: time:: sleep( COMMITLOG_COMPRESSION_IDLE_WINDOW ) . await ;
1960+ }
1961+ state. reset_idle( ) ;
1962+ }
1963+ } else {
1964+ state. reset_idle( ) ;
1965+ }
1966+ }
17401967 }
17411968 }
17421969}
0 commit comments