Skip to content

Commit fe7d98b

Browse files
committed
move work of do_durability into a DurabilityWorker
1 parent 907d67e commit fe7d98b

7 files changed

Lines changed: 177 additions & 93 deletions

File tree

crates/core/src/db/durability.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use crate::db::persistence::Durability;
2+
use futures::{channel::mpsc, StreamExt};
3+
use spacetimedb_commitlog::payload::{
4+
txdata::{Mutations, Ops},
5+
Txdata,
6+
};
7+
use spacetimedb_data_structures::map::IntSet;
8+
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
9+
use spacetimedb_durability::DurableOffset;
10+
use spacetimedb_primitives::TableId;
11+
use std::sync::Arc;
12+
13+
/// A request to persist a transaction.
14+
pub struct DurabilityRequest {
15+
reducer_context: Option<ReducerContext>,
16+
tx_data: Arc<TxData>,
17+
}
18+
19+
/// Represents a handle to a background task that persists transactions
20+
/// according to the [`Durability`] policy provided.
21+
///
22+
/// This exists so that callers avoid doing some preparatory work themselves
23+
/// before sending over to the `Durability` layer.
24+
#[derive(Clone)]
25+
pub struct DurabilityWorker {
26+
request_tx: mpsc::UnboundedSender<DurabilityRequest>,
27+
durability: Arc<Durability>,
28+
}
29+
30+
impl DurabilityWorker {
31+
/// Create a new [`DurabilityWorker`] using the given `durability` policy.
32+
pub fn new(durability: Arc<Durability>) -> Self {
33+
let (request_tx, request_rx) = mpsc::unbounded();
34+
35+
let actor = DurabilityWorkerActor {
36+
request_rx,
37+
durability: durability.clone(),
38+
};
39+
tokio::spawn(actor.run());
40+
41+
Self { request_tx, durability }
42+
}
43+
44+
/// Request that a transaction be made durable.
45+
/// That is, if `(tx_data, reducer_context)` should be appended to the commitlog, do so.
46+
///
47+
/// Note that by this stage,
48+
/// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
49+
/// has already decided based on the reducer and operations whether the transaction should be appended;
50+
/// this method is responsible only for reading its decision out of the `tx_data`
51+
/// and calling `durability.append_tx`.
52+
///
53+
/// This method does not block,
54+
/// and sends the work to an actor that collects data and calls `durability.append_tx`.
55+
///
56+
/// Panics if the durability worker has closed the receive end of its queue(s),
57+
/// which is likely due to it having panicked.
58+
pub fn request_durability(&self, reducer_context: Option<ReducerContext>, tx_data: &Arc<TxData>) {
59+
self.request_tx
60+
.unbounded_send(DurabilityRequest {
61+
reducer_context,
62+
tx_data: tx_data.clone(),
63+
})
64+
.expect("durability worker panicked");
65+
}
66+
67+
/// Get the [`DurableOffset`] of this database.
68+
pub fn durable_tx_offset(&self) -> DurableOffset {
69+
self.durability.durable_tx_offset()
70+
}
71+
}
72+
73+
pub struct DurabilityWorkerActor {
74+
request_rx: mpsc::UnboundedReceiver<DurabilityRequest>,
75+
durability: Arc<Durability>,
76+
}
77+
78+
impl DurabilityWorkerActor {
79+
/// Processes requests to do durability.
80+
async fn run(mut self) {
81+
while let Some(DurabilityRequest {
82+
reducer_context,
83+
tx_data,
84+
}) = self.request_rx.next().await
85+
{
86+
Self::do_durability(&*self.durability, reducer_context, &tx_data);
87+
}
88+
}
89+
90+
pub fn do_durability(durability: &Durability, reducer_context: Option<ReducerContext>, tx_data: &TxData) {
91+
if tx_data.tx_offset().is_none() {
92+
let name = reducer_context.as_ref().map(|rcx| &*rcx.name);
93+
debug_assert!(
94+
!tx_data.has_rows_or_connect_disconnect(name),
95+
"tx_data has no rows but has connect/disconnect: `{name:?}`"
96+
);
97+
return;
98+
}
99+
100+
let is_not_ephemeral_table = |table_id: &TableId| -> bool {
101+
tx_data
102+
.ephemeral_tables()
103+
.map(|etables| !etables.contains(table_id))
104+
.unwrap_or(true)
105+
};
106+
107+
let inserts: Box<_> = tx_data
108+
.inserts()
109+
// Skip ephemeral tables
110+
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
111+
.map(|(table_id, rowdata)| Ops {
112+
table_id: *table_id,
113+
rowdata: rowdata.clone(),
114+
})
115+
.collect();
116+
117+
let truncates: IntSet<TableId> = tx_data.truncates().collect();
118+
119+
let deletes: Box<_> = tx_data
120+
.deletes()
121+
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
122+
.map(|(table_id, rowdata)| Ops {
123+
table_id: *table_id,
124+
rowdata: rowdata.clone(),
125+
})
126+
// filter out deletes for tables that are truncated in the same transaction.
127+
.filter(|ops| !truncates.contains(&ops.table_id))
128+
.collect();
129+
130+
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
131+
132+
let inputs = reducer_context.map(|rcx| rcx.into());
133+
134+
let txdata = Txdata {
135+
inputs,
136+
outputs: None,
137+
mutations: Some(Mutations {
138+
inserts,
139+
deletes,
140+
truncates,
141+
}),
142+
};
143+
144+
// TODO: Should measure queuing time + actual write
145+
// This does not block, as per trait docs.
146+
durability.append_tx(txdata);
147+
}
148+
}

crates/core/src/db/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::subscription::ExecutionCounters;
77
use spacetimedb_datastore::execution_context::WorkloadType;
88
use spacetimedb_datastore::{locking_tx_datastore::datastore::TxMetrics, traits::TxData};
99

10+
mod durability;
1011
pub mod persistence;
1112
pub mod relational_db;
1213
pub mod snapshot;

crates/core/src/db/relational_db.rs

Lines changed: 14 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::db::durability::DurabilityWorker;
12
use crate::db::MetricsRecorderQueue;
23
use crate::error::{DBError, DatabaseError, RestoreSnapshotError};
34
use crate::messages::control_db::HostType;
@@ -10,10 +11,9 @@ use fs2::FileExt;
1011
use log::info;
1112
use spacetimedb_commitlog::repo::OnNewSegmentFn;
1213
use spacetimedb_commitlog::{self as commitlog, SizeOnDisk};
13-
use spacetimedb_data_structures::map::IntSet;
1414
use spacetimedb_datastore::db_metrics::DB_METRICS;
1515
use spacetimedb_datastore::error::{DatastoreError, TableError, ViewError};
16-
use spacetimedb_datastore::execution_context::{ReducerContext, Workload, WorkloadType};
16+
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
1717
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
1818
use spacetimedb_datastore::locking_tx_datastore::state_view::{
1919
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView,
@@ -103,7 +103,7 @@ pub struct RelationalDB {
103103
owner_identity: Identity,
104104

105105
inner: Locking,
106-
durability: Option<Arc<Durability>>,
106+
durability: Option<DurabilityWorker>,
107107
snapshot_worker: Option<SnapshotWorker>,
108108

109109
row_count_fn: RowCountFn,
@@ -154,6 +154,7 @@ impl RelationalDB {
154154
Arc::new(EnumMap::from_fn(|ty| ExecutionCounters::new(&ty, &database_identity)));
155155

156156
let (durability, disk_size_fn, snapshot_worker) = Persistence::unzip(persistence);
157+
let durability = durability.map(DurabilityWorker::new);
157158

158159
Self {
159160
inner,
@@ -766,19 +767,21 @@ impl RelationalDB {
766767
}
767768

768769
#[tracing::instrument(level = "trace", skip_all)]
769-
pub fn commit_tx(&self, tx: MutTx) -> Result<Option<(TxOffset, TxData, TxMetrics, String)>, DBError> {
770+
#[allow(clippy::type_complexity)]
771+
pub fn commit_tx(&self, tx: MutTx) -> Result<Option<(TxOffset, Arc<TxData>, TxMetrics, String)>, DBError> {
770772
log::trace!("COMMIT MUT TX");
771773

772-
// TODO: Never returns `None` -- should it?
773774
let reducer_context = tx.ctx.reducer_context().cloned();
775+
// TODO: Never returns `None` -- should it?
774776
let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else {
775777
return Ok(None);
776778
};
777779

778780
self.maybe_do_snapshot(&tx_data);
779781

782+
let tx_data = Arc::new(tx_data);
780783
if let Some(durability) = &self.durability {
781-
Self::do_durability(&**durability, reducer_context.as_ref(), &tx_data)
784+
durability.request_durability(reducer_context, &tx_data);
782785
}
783786

784787
Ok(Some((tx_offset, tx_data, tx_metrics, reducer)))
@@ -789,7 +792,7 @@ impl RelationalDB {
789792
&self,
790793
tx: MutTx,
791794
workload: Workload,
792-
) -> Result<Option<(TxData, TxMetrics, Tx)>, DBError> {
795+
) -> Result<Option<(Arc<TxData>, TxMetrics, Tx)>, DBError> {
793796
log::trace!("COMMIT MUT TX");
794797

795798
let Some((tx_data, tx_metrics, tx)) = self.inner.commit_mut_tx_downgrade(tx, workload)? else {
@@ -798,82 +801,14 @@ impl RelationalDB {
798801

799802
self.maybe_do_snapshot(&tx_data);
800803

804+
let tx_data = Arc::new(tx_data);
801805
if let Some(durability) = &self.durability {
802-
Self::do_durability(&**durability, tx.ctx.reducer_context(), &tx_data)
806+
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
803807
}
804808

805809
Ok(Some((tx_data, tx_metrics, tx)))
806810
}
807811

808-
/// If `(tx_data, ctx)` should be appended to the commitlog, do so.
809-
///
810-
/// Note that by this stage,
811-
/// [`spacetimedb_datastore::locking_tx_datastore::committed_state::tx_consumes_offset`]
812-
/// has already decided based on the reducer and operations whether the transaction should be appended;
813-
/// this method is responsible only for reading its decision out of the `tx_data`
814-
/// and calling `durability.append_tx`.
815-
fn do_durability(durability: &Durability, reducer_context: Option<&ReducerContext>, tx_data: &TxData) {
816-
use commitlog::payload::{
817-
txdata::{Mutations, Ops},
818-
Txdata,
819-
};
820-
821-
let is_not_ephemeral_table = |table_id: &TableId| -> bool {
822-
tx_data
823-
.ephemeral_tables()
824-
.map(|etables| !etables.contains(table_id))
825-
.unwrap_or(true)
826-
};
827-
828-
if tx_data.tx_offset().is_some() {
829-
let inserts: Box<_> = tx_data
830-
.inserts()
831-
// Skip ephemeral tables
832-
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
833-
.map(|(table_id, rowdata)| Ops {
834-
table_id: *table_id,
835-
rowdata: rowdata.clone(),
836-
})
837-
.collect();
838-
839-
let truncates: IntSet<TableId> = tx_data.truncates().collect();
840-
841-
let deletes: Box<_> = tx_data
842-
.deletes()
843-
.filter(|(table_id, _)| is_not_ephemeral_table(table_id))
844-
.map(|(table_id, rowdata)| Ops {
845-
table_id: *table_id,
846-
rowdata: rowdata.clone(),
847-
})
848-
// filter out deletes for tables that are truncated in the same transaction.
849-
.filter(|ops| !truncates.contains(&ops.table_id))
850-
.collect();
851-
852-
let truncates = truncates.into_iter().filter(is_not_ephemeral_table).collect();
853-
854-
let inputs = reducer_context.map(|rcx| rcx.into());
855-
856-
let txdata = Txdata {
857-
inputs,
858-
outputs: None,
859-
mutations: Some(Mutations {
860-
inserts,
861-
deletes,
862-
truncates,
863-
}),
864-
};
865-
866-
// TODO: Should measure queuing time + actual write
867-
durability.append_tx(txdata);
868-
} else {
869-
debug_assert!(
870-
!tx_data.has_rows_or_connect_disconnect(reducer_context),
871-
"tx_data has no rows but has connect/disconnect: `{:?}`",
872-
reducer_context.map(|rcx| &rcx.name),
873-
);
874-
}
875-
}
876-
877812
/// Get the [`DurableOffset`] of this database, or `None` if this is an
878813
/// in-memory instance.
879814
pub fn durable_tx_offset(&self) -> Option<DurableOffset> {
@@ -1511,8 +1446,8 @@ impl RelationalDB {
15111446
}
15121447

15131448
/// Reports the metrics for `reducer`, using counters provided by `db`.
1514-
pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<TxData>) {
1515-
self.report_tx_metrics(reducer, tx_data.map(Arc::new), Some(metrics), None);
1449+
pub fn report_mut_tx_metrics(&self, reducer: String, metrics: TxMetrics, tx_data: Option<Arc<TxData>>) {
1450+
self.report_tx_metrics(reducer, tx_data, Some(metrics), None);
15161451
}
15171452

15181453
/// Reports subscription metrics for `reducer`, using counters provided by `db`.

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ impl ModuleSubscriptions {
10311031
return Ok(Err(WriteConflict));
10321032
};
10331033
*db_update = DatabaseUpdate::from_writes(&tx_data);
1034-
(read_tx, Arc::new(tx_data), tx_metrics)
1034+
(read_tx, tx_data, tx_metrics)
10351035
}
10361036
EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
10371037
// If the transaction failed, we need to rollback the mutable tx.
@@ -1198,7 +1198,7 @@ impl ModuleSubscriptions {
11981198
let _ = extra.send(tx_offset);
11991199
}
12001200
self.relational_db
1201-
.report_tx_metrics(reducer, Some(Arc::new(tx_data)), Some(tx_metrics_mut), None);
1201+
.report_tx_metrics(reducer, Some(tx_data), Some(tx_metrics_mut), None);
12021202
}
12031203
});
12041204
(guard, offset_rx)

crates/datastore/src/execution_context.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,17 @@ pub struct ReducerContext {
3939
pub arg_bsatn: Bytes,
4040
}
4141

42-
impl From<&ReducerContext> for txdata::Inputs {
42+
impl From<ReducerContext> for txdata::Inputs {
4343
fn from(
4444
ReducerContext {
4545
name,
4646
caller_identity,
4747
caller_connection_id,
4848
timestamp,
4949
arg_bsatn,
50-
}: &ReducerContext,
50+
}: ReducerContext,
5151
) -> Self {
52-
let reducer_name = Arc::new(Varchar::from_str_truncate(name));
52+
let reducer_name = Arc::new(Varchar::from_string_truncate(name));
5353
let cap = arg_bsatn.len()
5454
/* caller_identity */
5555
+ 32
@@ -58,10 +58,10 @@ impl From<&ReducerContext> for txdata::Inputs {
5858
/* timestamp */
5959
+ 8;
6060
let mut buf = Vec::with_capacity(cap);
61-
bsatn::to_writer(&mut buf, caller_identity).unwrap();
62-
bsatn::to_writer(&mut buf, caller_connection_id).unwrap();
63-
bsatn::to_writer(&mut buf, timestamp).unwrap();
64-
buf.extend_from_slice(arg_bsatn);
61+
bsatn::to_writer(&mut buf, &caller_identity).unwrap();
62+
bsatn::to_writer(&mut buf, &caller_connection_id).unwrap();
63+
bsatn::to_writer(&mut buf, &timestamp).unwrap();
64+
buf.extend_from_slice(&arg_bsatn);
6565

6666
txdata::Inputs {
6767
reducer_name,

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ impl CommittedState {
675675
// Note that this may change in the future: some analytics and/or
676676
// timetravel queries may benefit from seeing all inputs, even if
677677
// the database state did not change.
678-
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
678+
tx_data.has_rows_or_connect_disconnect(ctx.reducer_context().map(|rcx| &*rcx.name))
679679
}
680680

681681
pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option<Identity>) {

0 commit comments

Comments
 (0)