-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathdurability.rs
More file actions
89 lines (79 loc) · 2.72 KB
/
durability.rs
File metadata and controls
89 lines (79 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::{sync::Arc, time::Duration};
use log::{error, info};
use spacetimedb_commitlog::payload::{
txdata::{Mutations, Ops},
Txdata,
};
use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
use spacetimedb_durability::Transaction;
use spacetimedb_lib::Identity;
use spacetimedb_sats::ProductValue;
use crate::db::persistence::Durability;
use spacetimedb_runtime::Handle;
/// Submits the transaction described by `tx_data` to `durability`.
///
/// If `tx_data` carries no transaction offset, nothing is appended and the
/// function returns early. In debug builds that early-return path also asserts
/// that `has_rows_or_connect_disconnect` reports `false` for the reducer name
/// (if any).
///
/// Otherwise the shared `tx_data` handle is cloned and moved, together with
/// `reducer_context`, into a boxed closure handed to `append_tx`. The closure
/// builds the durable representation via [`prepare_tx_data_for_durability`]
/// when it is run (presumably later, by the durability layer -- confirm
/// against `Durability::append_tx`).
pub(super) fn request_durability(
    durability: &Durability,
    reducer_context: Option<ReducerContext>,
    tx_data: &Arc<TxData>,
) {
    let tx_offset = match tx_data.tx_offset() {
        Some(offset) => offset,
        None => {
            // No offset: there is nothing to append. Sanity-check in debug builds.
            let name = reducer_context.as_ref().map(|rcx| &rcx.name);
            debug_assert!(
                !tx_data.has_rows_or_connect_disconnect(name),
                "tx_data has no rows but has connect/disconnect: `{name:?}`"
            );
            return;
        }
    };

    // The closure must own everything it touches, so take another handle to
    // the shared transaction data rather than borrowing the caller's.
    let shared_tx_data = Arc::clone(tx_data);
    let prepare = move || prepare_tx_data_for_durability(tx_offset, reducer_context, &shared_tx_data);
    durability.append_tx(Box::new(prepare));
}
/// Spawns a task on `runtime` that closes `durability` and logs the outcome.
///
/// The task awaits `durability.close()` under a 10 second timeout obtained
/// from the runtime handle. On timeout an error is logged; otherwise the
/// value returned by `close()` is logged as the shutdown tx offset. Log lines
/// are prefixed with `database_identity` in square brackets.
///
/// The value returned by `spawn` is discarded, so this function does not wait
/// for the shutdown to finish.
pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &Handle, database_identity: Identity) {
    /// Upper bound on how long the spawned task waits for `close()`.
    const CLOSE_TIMEOUT: Duration = Duration::from_secs(10);

    let label = format!("[{database_identity}]");
    // One owned handle is moved into the task (for `timeout`); a second,
    // temporary clone is used as the receiver of `spawn`.
    let runtime = runtime.clone();
    runtime.clone().spawn(async move {
        let outcome = runtime.timeout(CLOSE_TIMEOUT, durability.close()).await;
        match outcome {
            Ok(offset) => {
                info!("{label} durability shut down at tx offset: {offset:?}");
            }
            Err(_elapsed) => {
                error!("{label} timeout waiting for durability shutdown");
            }
        }
    });
}
/// Builds the [`Transaction`] to be appended for `tx_data` at `tx_offset`.
///
/// Collects the persistent inserts, deletes and truncates from `tx_data`,
/// each sorted by table id, and converts `reducer_context` (if present) into
/// the transaction's `inputs`. `outputs` is always `None`, and `mutations` is
/// always `Some`, even when all three collections are empty.
///
/// In debug builds, asserts that the transaction is not completely empty,
/// i.e. that there is at least one insert, delete, truncate, or an input.
fn prepare_tx_data_for_durability(
    tx_offset: u64,
    reducer_context: Option<ReducerContext>,
    tx_data: &TxData,
) -> Transaction<Txdata<ProductValue>> {
    // Unstable sorting is used throughout; NOTE(review): this presumably
    // relies on at most one entry per table id -- confirm against `TxData`.
    let mut inserted: Box<[_]> = tx_data
        .persistent_inserts()
        .map(|(id, rows)| Ops {
            table_id: id,
            rowdata: rows,
        })
        .collect();
    inserted.sort_unstable_by_key(|entry| entry.table_id);

    let mut deleted: Box<[_]> = tx_data
        .persistent_deletes()
        .map(|(id, rows)| Ops {
            table_id: id,
            rowdata: rows,
        })
        .collect();
    deleted.sort_unstable_by_key(|entry| entry.table_id);

    // Truncates are bare table ids, so they can be sorted directly.
    let mut truncated: Box<[_]> = tx_data.persistent_truncates().collect();
    truncated.sort_unstable();

    let inputs = reducer_context.map(Into::into);

    debug_assert!(
        inputs.is_some() || !inserted.is_empty() || !truncated.is_empty() || !deleted.is_empty(),
        "empty transaction"
    );

    let mutations = Mutations {
        inserts: inserted,
        deletes: deleted,
        truncates: truncated,
    };
    let txdata = Txdata {
        inputs,
        outputs: None,
        mutations: Some(mutations),
    };

    Transaction {
        offset: tx_offset,
        txdata,
    }
}