11use std:: {
22 io,
3+ num:: NonZeroUsize ,
34 path:: PathBuf ,
45 sync:: {
56 atomic:: { AtomicU64 , Ordering :: Relaxed } ,
67 Arc ,
78 } ,
8- time:: Duration ,
99} ;
1010
1111use futures:: { FutureExt as _, TryFutureExt as _} ;
@@ -19,7 +19,6 @@ use thiserror::Error;
1919use tokio:: {
2020 sync:: { futures:: OwnedNotified , mpsc, oneshot, watch, Notify } ,
2121 task:: { spawn_blocking, AbortHandle } ,
22- time:: { interval, MissedTickBehavior } ,
2322} ;
2423use tracing:: { instrument, Span } ;
2524
@@ -30,23 +29,29 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
3029/// [`Local`] configuration.
3130#[ derive( Clone , Copy , Debug ) ]
3231pub struct Options {
33- /// Periodically flush and sync the log this often .
32+ /// The number of elements to reserve for batching transactions .
3433 ///
35- /// Default: 50ms
36- pub sync_interval : Duration ,
37- /// If `true`, flush (but not sync) each transaction.
34+ /// This puts an upper bound on the buffer capacity, while not preventing
35+ /// reallocations when the number of queued transactions exceeds it.
3836 ///
39- /// Default: false
40- pub flush_each_tx : bool ,
37+ /// In other words, the durability actor will attempt to receive all
38+ /// transactions that are currently in the queue, but shrink the buffer to
39+ /// `batch_capacity` if it had to make additional space during a burst.
40+ ///
41+ /// Default: 4096
42+ pub batch_capacity : NonZeroUsize ,
4143 /// [`Commitlog`] configuration.
4244 pub commitlog : spacetimedb_commitlog:: Options ,
4345}
4446
47+ impl Options {
48+ pub const DEFAULT_BATCH_CAPACITY : NonZeroUsize = NonZeroUsize :: new ( 4096 ) . unwrap ( ) ;
49+ }
50+
4551impl Default for Options {
4652 fn default ( ) -> Self {
4753 Self {
48- sync_interval : Duration :: from_millis ( 50 ) ,
49- flush_each_tx : false ,
54+ batch_capacity : Self :: DEFAULT_BATCH_CAPACITY ,
5055 commitlog : Default :: default ( ) ,
5156 }
5257 }
@@ -134,8 +139,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
134139 durable_offset : durable_tx,
135140 queue_depth : queue_depth. clone ( ) ,
136141
137- sync_interval : opts. sync_interval ,
138- flush_each_tx : opts. flush_each_tx ,
142+ batch_capacity : opts. batch_capacity ,
139143
140144 lock,
141145 }
@@ -193,8 +197,7 @@ struct Actor<T> {
193197 durable_offset : watch:: Sender < Option < TxOffset > > ,
194198 queue_depth : Arc < AtomicU64 > ,
195199
196- sync_interval : Duration ,
197- flush_each_tx : bool ,
200+ batch_capacity : NonZeroUsize ,
198201
199202 #[ allow( unused) ]
200203 lock : Lock ,
@@ -209,8 +212,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
209212 ) {
210213 info ! ( "starting durability actor" ) ;
211214
212- let mut sync_interval = interval ( self . sync_interval ) ;
213- sync_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
215+ let mut tx_buf = Vec :: with_capacity ( self . batch_capacity . get ( ) ) ;
214216 // `flush_and_sync` when the loop exits without panicking,
215217 // or `flush_and_sync` inside the loop failed.
216218 let mut sync_on_exit = true ;
@@ -220,42 +222,38 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
220222 // Biased towards the shutdown channel,
221223 // so that we stop accepting new data promptly after
222224 // `Durability::close` was called.
223- //
224- // Note that periodic `flush_and_sync` needs to be polled before
225- // the txdata channel, so that we don't delay `fsync(2)` under
226- // high transaction throughput.
227225 biased;
228226
229227 Some ( reply) = shutdown_rx. recv( ) => {
230228 transactions_rx. close( ) ;
231229 let _ = reply. send( self . lock. notified( ) ) ;
232230 } ,
233231
234- _ = sync_interval. tick( ) => {
235- if self . flush_and_sync( ) . await . is_err( ) {
236- sync_on_exit = false ;
232+ // Pop as many elements from the channel as possible,
233+ // potentially requiring the `tx_buf` to allocate additional
234+ // capacity.
235+ // We'll reclaim capacity in excess of `self.batch_size` below.
236+ n = transactions_rx. recv_many( & mut tx_buf, usize :: MAX ) => {
237+ if n == 0 {
237238 break ;
238239 }
239- } ,
240-
241- tx = transactions_rx. recv( ) => {
242- let Some ( tx) = tx else {
243- break ;
244- } ;
245- self . queue_depth. fetch_sub( 1 , Relaxed ) ;
240+ self . queue_depth. fetch_sub( n as u64 , Relaxed ) ;
246241 let clog = self . clog. clone( ) ;
247- let flush = self . flush_each_tx;
248- spawn_blocking( move || -> io:: Result <( ) > {
249- clog. commit( [ tx] ) ?;
250- if flush {
251- clog. flush( ) ?;
242+ tx_buf = spawn_blocking( move || -> io:: Result <Vec <Transaction <Txdata <T >>>> {
243+ for tx in tx_buf. drain( ..) {
244+ clog. commit( [ tx] ) ?;
252245 }
253-
254- Ok ( ( ) )
246+ Ok ( tx_buf)
255247 } )
256248 . await
257249 . expect( "commitlog write panicked" )
258250 . expect( "commitlog write failed" ) ;
251+ if self . flush_and_sync( ) . await . is_err( ) {
252+ sync_on_exit = false ;
253+ break ;
254+ }
255+ // Reclaim burst capacity.
256+ tx_buf. shrink_to( self . batch_capacity. get( ) ) ;
259257 } ,
260258 }
261259 }
0 commit comments