Skip to content

Commit 0be66e3

Browse files
Shubham8287bfops
andauthored
Deterministic runtime crate (#5016)
# Description of Changes. Introduces deterministic runtime crate. Integrate it with RelationalDB. I think best steps to review: - Read the [README](https://github.com/clockworklabs/SpacetimeDB/blob/shub/sim/crates/runtime/README.md) of runtime crate. - Look at the integration with existing crates - `durability`, `core`, `snapshot`, etc. - Read runtime crate's code. Draft branch to Test code - #5019 # API and ABI breaking changes NA # Expected complexity level and risk Does not intend to change any production functionality, but it's big code. # Testing - new crate contains unit and integration tests. - Existing tests should work for production. --------- Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: Zeke Foppa <196249+bfops@users.noreply.github.com>
1 parent 10ebb2b commit 0be66e3

27 files changed

Lines changed: 2836 additions & 87 deletions

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ members = [
2525
"crates/physical-plan",
2626
"crates/primitives",
2727
"crates/query",
28+
"crates/runtime",
2829
"crates/sats",
2930
"crates/schema",
3031
"crates/smoketests",
@@ -149,6 +150,7 @@ spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.4.1" }
149150
spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.4.1" }
150151
spacetimedb-subscription = { path = "crates/subscription", version = "=2.4.1" }
151152
spacetimedb-query-builder = { path = "crates/query-builder", version = "=2.4.1" }
153+
spacetimedb-runtime = { path = "crates/runtime", version = "=2.4.1" }
152154

153155
# Prevent `ahash` from pulling in `getrandom` by disabling default features.
154156
# Modules use `getrandom02` and we need to prevent an incompatible version

crates/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ spacetimedb-primitives.workspace = true
2828
spacetimedb-paths.workspace = true
2929
spacetimedb-physical-plan.workspace = true
3030
spacetimedb-query.workspace = true
31+
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
3132
spacetimedb-sats = { workspace = true, features = ["serde"] }
3233
spacetimedb-schema.workspace = true
3334
spacetimedb-table.workspace = true

crates/core/src/db/durability.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
99
use spacetimedb_durability::Transaction;
1010
use spacetimedb_lib::Identity;
1111
use spacetimedb_sats::ProductValue;
12-
use tokio::{runtime, time::timeout};
1312

1413
use crate::db::persistence::Durability;
14+
use spacetimedb_runtime::Handle;
1515

1616
pub(super) fn request_durability(
1717
durability: &Durability,
@@ -32,11 +32,11 @@ pub(super) fn request_durability(
3232
}));
3333
}
3434

35-
pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &runtime::Handle, database_identity: Identity) {
36-
let rt = runtime.clone();
37-
rt.spawn(async move {
38-
let label = format!("[{database_identity}]");
39-
match timeout(Duration::from_secs(10), durability.close()).await {
35+
pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &Handle, database_identity: Identity) {
36+
let label = format!("[{database_identity}]");
37+
let runtime = runtime.clone();
38+
runtime.clone().spawn(async move {
39+
match runtime.timeout(Duration::from_secs(10), durability.close()).await {
4040
Err(_elapsed) => {
4141
error!("{label} timeout waiting for durability shutdown");
4242
}

crates/core/src/db/persistence.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use spacetimedb_paths::server::ServerDataDir;
1111
use spacetimedb_snapshot::DynSnapshotRepo;
1212

1313
use crate::{messages::control_db::Database, util::asyncify};
14+
use spacetimedb_runtime::Handle;
1415

1516
use super::{
1617
relational_db::{self, Txdata},
@@ -98,8 +99,8 @@ pub struct Persistence {
9899
/// persistent (as opposed to in-memory) databases. This is enforced by
99100
/// this type.
100101
pub snapshots: Option<SnapshotWorker>,
101-
/// The tokio runtime onto which durability-related tasks shall be spawned.
102-
pub runtime: tokio::runtime::Handle,
102+
/// Runtime onto which durability-related tasks shall be spawned.
103+
pub runtime: Handle,
103104
}
104105

105106
impl Persistence {
@@ -109,6 +110,15 @@ impl Persistence {
109110
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
110111
snapshots: Option<SnapshotWorker>,
111112
runtime: tokio::runtime::Handle,
113+
) -> Self {
114+
Self::new_with_runtime(durability, disk_size, snapshots, Handle::tokio(runtime))
115+
}
116+
117+
pub fn new_with_runtime(
118+
durability: impl spacetimedb_durability::Durability<TxData = Txdata> + 'static,
119+
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
120+
snapshots: Option<SnapshotWorker>,
121+
runtime: Handle,
112122
) -> Self {
113123
Self {
114124
durability: Arc::new(durability),
@@ -148,7 +158,7 @@ impl Persistence {
148158
Option<Arc<Durability>>,
149159
Option<DiskSizeFn>,
150160
Option<SnapshotWorker>,
151-
Option<tokio::runtime::Handle>,
161+
Option<Handle>,
152162
) {
153163
this.map(
154164
|Self {
@@ -207,14 +217,16 @@ impl PersistenceProvider for LocalPersistenceProvider {
207217
async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence> {
208218
let replica_dir = self.data_dir.replica(replica_id);
209219
let snapshot_dir = replica_dir.snapshots();
220+
let runtime = Handle::tokio_current();
210221

211222
let database_identity = database.database_identity;
212223
let snapshot_worker =
213224
asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
214225
.await
215-
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
226+
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?;
216227
let (durability, disk_size) = relational_db::local_durability_with_options(
217228
replica_dir,
229+
runtime.clone(),
218230
Some(&snapshot_worker),
219231
self.durability.into_options(),
220232
)
@@ -231,7 +243,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
231243
durability,
232244
disk_size,
233245
snapshots: Some(snapshot_worker),
234-
runtime: tokio::runtime::Handle::current(),
246+
runtime,
235247
})
236248
}
237249
}

crates/core/src/db/relational_db.rs

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use spacetimedb_lib::ConnectionId;
4343
use spacetimedb_lib::Identity;
4444
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
4545
use spacetimedb_primitives::*;
46+
use spacetimedb_runtime::Handle;
4647
use spacetimedb_sats::memory_usage::MemoryUsage;
4748
use spacetimedb_sats::raw_identifier::RawIdentifier;
4849
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
@@ -100,7 +101,7 @@ pub struct RelationalDB {
100101

101102
inner: Locking,
102103
durability: Option<Arc<Durability>>,
103-
durability_runtime: Option<tokio::runtime::Handle>,
104+
durability_runtime: Option<Handle>,
104105
snapshot_worker: Option<SnapshotWorker>,
105106

106107
row_count_fn: RowCountFn,
@@ -1688,9 +1689,10 @@ pub type LocalDurability = Arc<durability::Local<ProductValue>>;
16881689
/// of the commitlog.
16891690
pub async fn local_durability(
16901691
replica_dir: ReplicaDir,
1692+
runtime: Handle,
16911693
snapshot_worker: Option<&SnapshotWorker>,
16921694
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
1693-
local_durability_with_options(replica_dir, snapshot_worker, <_>::default()).await
1695+
local_durability_with_options(replica_dir, runtime, snapshot_worker, <_>::default()).await
16941696
}
16951697

16961698
/// Initialize local durability with explicit parameters.
@@ -1701,10 +1703,10 @@ pub async fn local_durability(
17011703
/// of the commitlog.
17021704
pub async fn local_durability_with_options(
17031705
replica_dir: ReplicaDir,
1706+
runtime: Handle,
17041707
snapshot_worker: Option<&SnapshotWorker>,
17051708
opts: durability::local::Options,
17061709
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
1707-
let rt = tokio::runtime::Handle::current();
17081710
let on_new_segment = snapshot_worker.map(|snapshot_worker| {
17091711
let snapshot_worker = snapshot_worker.clone();
17101712
Arc::new(move || {
@@ -1716,7 +1718,7 @@ pub async fn local_durability_with_options(
17161718
let local = asyncify(move || {
17171719
durability::Local::open(
17181720
replica_dir.clone(),
1719-
rt,
1721+
runtime,
17201722
opts,
17211723
// Give the durability a handle to request a new snapshot run,
17221724
// which it will send down whenever we rotate commitlog segments.
@@ -1982,19 +1984,22 @@ pub mod tests_utils {
19821984
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
19831985
let snapshots = want_snapshot_repo
19841986
.then(|| {
1985-
open_snapshot_repo(root.snapshots(), db_identity, replica_id)
1986-
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
1987+
open_snapshot_repo(root.snapshots(), db_identity, replica_id).map(|repo| {
1988+
SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone()))
1989+
})
19871990
})
19881991
.transpose()?;
19891992

1990-
let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
1993+
let runtime = Handle::tokio(rt.clone());
1994+
let (local, disk_size_fn) =
1995+
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
19911996
let history = local.as_history();
19921997

19931998
let persistence = Persistence {
19941999
durability: local.clone(),
19952000
disk_size: disk_size_fn,
19962001
snapshots,
1997-
runtime: rt,
2002+
runtime,
19982003
};
19992004

20002005
let (db, _) = RelationalDB::open(
@@ -2107,17 +2112,20 @@ pub mod tests_utils {
21072112
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
21082113
let snapshots = want_snapshot_repo
21092114
.then(|| {
2110-
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)
2111-
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
2115+
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0).map(|repo| {
2116+
SnapshotWorker::new(repo, snapshot::Compression::Disabled, Handle::tokio(rt.clone()))
2117+
})
21122118
})
21132119
.transpose()?;
2114-
let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
2120+
let runtime = Handle::tokio(rt.clone());
2121+
let (local, disk_size_fn) =
2122+
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
21152123
let history = local.as_history();
21162124
let persistence = Persistence {
21172125
durability: local.clone(),
21182126
disk_size: disk_size_fn,
21192127
snapshots,
2120-
runtime: rt,
2128+
runtime,
21212129
};
21222130
let db = Self::open_db(history, Some(persistence), None, 0)?;
21232131

0 commit comments

Comments
 (0)