1- use crate :: db:: durability:: DurabilityWorker ;
1+ use crate :: db:: durability:: { request_durability , spawn_close as spawn_durability_close } ;
22use crate :: db:: MetricsRecorderQueue ;
33use crate :: error:: { DBError , RestoreSnapshotError } ;
44use crate :: subscription:: ExecutionCounters ;
@@ -12,7 +12,7 @@ use spacetimedb_commitlog::{self as commitlog, Commitlog, SizeOnDisk};
1212use spacetimedb_data_structures:: map:: HashSet ;
1313use spacetimedb_datastore:: db_metrics:: DB_METRICS ;
1414use spacetimedb_datastore:: error:: { DatastoreError , TableError , ViewError } ;
15- use spacetimedb_datastore:: execution_context:: { Workload , WorkloadType } ;
15+ use spacetimedb_datastore:: execution_context:: { ReducerContext , Workload , WorkloadType } ;
1616use spacetimedb_datastore:: locking_tx_datastore:: datastore:: TxMetrics ;
1717use spacetimedb_datastore:: locking_tx_datastore:: state_view:: {
1818 IterByColEqMutTx , IterByColRangeMutTx , IterMutTx , StateView ,
@@ -98,7 +98,8 @@ pub struct RelationalDB {
9898 owner_identity : Identity ,
9999
100100 inner : Locking ,
101- durability : Option < DurabilityWorker > ,
101+ durability : Option < Arc < Durability > > ,
102+ durability_runtime : Option < tokio:: runtime:: Handle > ,
102103 snapshot_worker : Option < SnapshotWorker > ,
103104
104105 row_count_fn : RowCountFn ,
@@ -133,8 +134,8 @@ impl std::fmt::Debug for RelationalDB {
133134impl Drop for RelationalDB {
134135 fn drop ( & mut self ) {
135136 // Attempt to flush the outstanding transactions.
136- if let Some ( worker ) = self . durability . take ( ) {
137- worker . spawn_close ( self . database_identity ) ;
137+ if let ( Some ( durability ) , Some ( runtime ) ) = ( self . durability . take ( ) , self . durability_runtime . take ( ) ) {
138+ spawn_durability_close ( durability , & runtime , self . database_identity ) ;
138139 }
139140 }
140141}
@@ -150,18 +151,12 @@ impl RelationalDB {
150151 let workload_type_to_exec_counters =
151152 Arc :: new ( EnumMap :: from_fn ( |ty| ExecutionCounters :: new ( & ty, & database_identity) ) ) ;
152153
153- let ( durability, local_durability, disk_size_fn, snapshot_worker, rt) = Persistence :: unzip ( persistence) ;
154- let durability = match ( local_durability, durability, rt) {
155- ( Some ( local_durability) , _, Some ( rt) ) => {
156- Some ( DurabilityWorker :: new_local ( database_identity, local_durability, rt) )
157- }
158- ( None , Some ( durability) , Some ( rt) ) => Some ( DurabilityWorker :: new ( database_identity, durability, rt) ) ,
159- _ => None ,
160- } ;
154+ let ( durability, disk_size_fn, snapshot_worker, durability_runtime) = Persistence :: unzip ( persistence) ;
161155
162156 Self {
163157 inner,
164158 durability,
159+ durability_runtime,
165160 snapshot_worker,
166161
167162 database_identity,
@@ -811,9 +806,7 @@ impl RelationalDB {
811806 let reducer_context = tx. ctx . reducer_context ( ) . cloned ( ) ;
812807 // TODO: Never returns `None` -- should it?
813808 let Some ( ( tx_offset, tx_data, tx_metrics, reducer) ) = self . inner . commit_mut_tx_and_then ( tx, |tx_data| {
814- if let Some ( durability) = & self . durability {
815- durability. request_durability ( reducer_context, tx_data) ;
816- }
809+ self . request_durability ( reducer_context, tx_data) ;
817810 } ) ?
818811 else {
819812 return Ok ( None ) ;
@@ -830,9 +823,7 @@ impl RelationalDB {
830823
831824 let reducer_context = tx. ctx . reducer_context ( ) . cloned ( ) ;
832825 let ( tx_data, tx_metrics, tx) = self . inner . commit_mut_tx_downgrade_and_then ( tx, workload, |tx_data| {
833- if let Some ( durability) = & self . durability {
834- durability. request_durability ( reducer_context, tx_data) ;
835- }
826+ self . request_durability ( reducer_context, tx_data) ;
836827 } ) ;
837828
838829 self . maybe_do_snapshot ( & tx_data) ;
@@ -848,6 +839,12 @@ impl RelationalDB {
848839 . map ( |durability| durability. durable_tx_offset ( ) )
849840 }
850841
842+ fn request_durability ( & self , reducer_context : Option < ReducerContext > , tx_data : & Arc < TxData > ) {
843+ if let Some ( durability) = & self . durability {
844+ request_durability ( durability. as_ref ( ) , reducer_context, tx_data) ;
845+ }
846+ }
847+
851848 /// Decide based on the `committed_state.next_tx_offset`
852849 /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database.
853850 ///
@@ -1970,7 +1967,6 @@ pub mod tests_utils {
19701967
19711968 let persistence = Persistence {
19721969 durability : local. clone ( ) ,
1973- local_durability : Some ( local. clone ( ) ) ,
19741970 disk_size : disk_size_fn,
19751971 snapshots,
19761972 runtime : rt,
@@ -2092,7 +2088,6 @@ pub mod tests_utils {
20922088 let history = local. as_history ( ) ;
20932089 let persistence = Persistence {
20942090 durability : local. clone ( ) ,
2095- local_durability : Some ( local. clone ( ) ) ,
20962091 disk_size : disk_size_fn,
20972092 snapshots,
20982093 runtime : rt,
0 commit comments