11use std:: {
22 io,
3+ num:: NonZeroUsize ,
34 path:: PathBuf ,
45 sync:: {
56 atomic:: { AtomicU64 , Ordering :: Relaxed } ,
67 Arc ,
78 } ,
8- time:: Duration ,
99} ;
1010
1111use futures:: { FutureExt as _, TryFutureExt as _} ;
@@ -19,7 +19,6 @@ use thiserror::Error;
1919use tokio:: {
2020 sync:: { futures:: OwnedNotified , mpsc, oneshot, watch, Notify } ,
2121 task:: { spawn_blocking, AbortHandle } ,
22- time:: { interval, MissedTickBehavior } ,
2322} ;
2423use tracing:: { instrument, Span } ;
2524
@@ -30,23 +29,20 @@ pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk};
3029/// [`Local`] configuration.
3130#[ derive( Clone , Copy , Debug ) ]
3231pub struct Options {
33- /// Periodically flush and sync the log this often .
32+ /// Pull up to this many transactions from the queue .
3433 ///
35- /// Default: 50ms
36- pub sync_interval : Duration ,
37- /// If `true`, flush (but not sync) each transaction.
34+ /// The commitlog is flushed and synced after each batch.
3835 ///
39- /// Default: false
40- pub flush_each_tx : bool ,
36+ /// Defaults: 32
37+ pub batch_size : NonZeroUsize ,
4138 /// [`Commitlog`] configuration.
4239 pub commitlog : spacetimedb_commitlog:: Options ,
4340}
4441
4542impl Default for Options {
4643 fn default ( ) -> Self {
4744 Self {
48- sync_interval : Duration :: from_millis ( 50 ) ,
49- flush_each_tx : false ,
45+ batch_size : NonZeroUsize :: new ( 32 ) . unwrap ( ) ,
5046 commitlog : Default :: default ( ) ,
5147 }
5248 }
@@ -134,8 +130,7 @@ impl<T: Encode + Send + Sync + 'static> Local<T> {
134130 durable_offset : durable_tx,
135131 queue_depth : queue_depth. clone ( ) ,
136132
137- sync_interval : opts. sync_interval ,
138- flush_each_tx : opts. flush_each_tx ,
133+ batch_size : opts. batch_size ,
139134
140135 lock,
141136 }
@@ -193,8 +188,7 @@ struct Actor<T> {
193188 durable_offset : watch:: Sender < Option < TxOffset > > ,
194189 queue_depth : Arc < AtomicU64 > ,
195190
196- sync_interval : Duration ,
197- flush_each_tx : bool ,
191+ batch_size : NonZeroUsize ,
198192
199193 #[ allow( unused) ]
200194 lock : Lock ,
@@ -209,8 +203,7 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
209203 ) {
210204 info ! ( "starting durability actor" ) ;
211205
212- let mut sync_interval = interval ( self . sync_interval ) ;
213- sync_interval. set_missed_tick_behavior ( MissedTickBehavior :: Delay ) ;
206+ let mut tx_buf = Vec :: with_capacity ( self . batch_size . get ( ) ) ;
214207 // `flush_and_sync` when the loop exits without panicking,
215208 // or `flush_and_sync` inside the loop failed.
216209 let mut sync_on_exit = true ;
@@ -220,42 +213,32 @@ impl<T: Encode + Send + Sync + 'static> Actor<T> {
220213 // Biased towards the shutdown channel,
221214 // so that we stop accepting new data promptly after
222215 // `Durability::close` was called.
223- //
224- // Note that periodic `flush_and_sync` needs to be polled before
225- // the txdata channel, so that we don't delay `fsync(2)` under
226- // high transaction throughput.
227216 biased;
228217
229218 Some ( reply) = shutdown_rx. recv( ) => {
230219 transactions_rx. close( ) ;
231220 let _ = reply. send( self . lock. notified( ) ) ;
232221 } ,
233222
234- _ = sync_interval. tick( ) => {
235- if self . flush_and_sync( ) . await . is_err( ) {
236- sync_on_exit = false ;
223+ n = transactions_rx. recv_many( & mut tx_buf, self . batch_size. get( ) ) => {
224+ if n == 0 {
237225 break ;
238226 }
239- } ,
240-
241- tx = transactions_rx. recv( ) => {
242- let Some ( tx) = tx else {
243- break ;
244- } ;
245- self . queue_depth. fetch_sub( 1 , Relaxed ) ;
227+ self . queue_depth. fetch_sub( n as u64 , Relaxed ) ;
246228 let clog = self . clog. clone( ) ;
247- let flush = self . flush_each_tx;
248- spawn_blocking( move || -> io:: Result <( ) > {
249- clog. commit( [ tx] ) ?;
250- if flush {
251- clog. flush( ) ?;
229+ tx_buf = spawn_blocking( move || -> io:: Result <Vec <Transaction <Txdata <T >>>> {
230+ for tx in tx_buf. drain( ..) {
231+ clog. commit( [ tx] ) ?;
252232 }
253-
254- Ok ( ( ) )
233+ Ok ( tx_buf)
255234 } )
256235 . await
257236 . expect( "commitlog write panicked" )
258237 . expect( "commitlog write failed" ) ;
238+ if self . flush_and_sync( ) . await . is_err( ) {
239+ sync_on_exit = false ;
240+ break ;
241+ }
259242 } ,
260243 }
261244 }
0 commit comments