Skip to content

Commit a659e39

Browse files
committed
remove tokio as dependency
1 parent 4f36a1c commit a659e39

14 files changed

Lines changed: 127 additions & 94 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ spacetimedb-primitives.workspace = true
2929
spacetimedb-paths.workspace = true
3030
spacetimedb-physical-plan.workspace = true
3131
spacetimedb-query.workspace = true
32-
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
32+
spacetimedb-runtime.workspace = true
3333
spacetimedb-sats = { workspace = true, features = ["serde"] }
3434
spacetimedb-schema.workspace = true
3535
spacetimedb-table.workspace = true

crates/core/src/host/host_controller.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use spacetimedb_datastore::traits::Program;
3838
use spacetimedb_durability::{self as durability};
3939
use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp};
4040
use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir};
41+
use spacetimedb_runtime::AbortHandle;
4142
use spacetimedb_sats::hash::Hash;
4243
use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle};
4344
use spacetimedb_schema::def::{ModuleDef, RawModuleDefVersion};
@@ -47,7 +48,6 @@ use std::ops::Deref;
4748
use std::sync::Arc;
4849
use std::time::Duration;
4950
use tokio::sync::{watch, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock as AsyncRwLock};
50-
use tokio::task::AbortHandle;
5151
use tokio::time::error::Elapsed;
5252
use tokio::time::{interval_at, timeout, Instant};
5353

@@ -889,7 +889,8 @@ impl Host {
889889
..
890890
} = host_controller;
891891
let replica_dir = data_dir.replica(replica_id);
892-
let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder();
892+
let runtime = spacetimedb_runtime::Handle::tokio_current();
893+
let (tx_metrics_queue, tx_metrics_recorder_task) = spawn_tx_metrics_recorder(&runtime);
893894

894895
let (db, connected_clients) = match config.storage {
895896
db::Storage::Memory => RelationalDB::open(
@@ -902,7 +903,6 @@ impl Host {
902903
)?,
903904
db::Storage::Disk => {
904905
// Replay from the local state.
905-
let runtime = spacetimedb_runtime::Handle::tokio_current();
906906
let history = relational_db::local_history(&replica_dir, &runtime).await?;
907907
let persistence = persistence.persistence(database.database_identity, replica_id).await?;
908908
// Loading a database from persistent storage involves heavy
@@ -1085,8 +1085,9 @@ impl Host {
10851085
module_host.clear_all_clients().await?;
10861086

10871087
scheduler_starter.start(&module_host)?;
1088-
let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
1089-
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone());
1088+
let disk_metrics_recorder_task: spacetimedb_runtime::AbortHandle =
1089+
tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle().into();
1090+
let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db().clone(), &runtime);
10901091

10911092
let module = watch::Sender::new(module_host);
10921093

crates/datastore/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ rust-version.workspace = true
1010
spacetimedb-data-structures.workspace = true
1111
spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] }
1212
spacetimedb-commitlog.workspace = true
13-
spacetimedb-durability.workspace = true
13+
spacetimedb-durability = { path = "../durability", default-features = false }
1414
spacetimedb-metrics.workspace = true
1515
spacetimedb-primitives.workspace = true
1616
spacetimedb-paths.workspace = true
1717
spacetimedb-sats = { workspace = true, features = ["serde"] }
1818
spacetimedb-schema.workspace = true
1919
spacetimedb-table.workspace = true
20-
spacetimedb-snapshot.workspace = true
20+
spacetimedb-snapshot = { path = "../snapshot", default-features = false }
2121
spacetimedb-execution.workspace = true
2222

2323
anyhow = { workspace = true, features = ["backtrace"] }
@@ -39,7 +39,9 @@ thin-vec.workspace = true
3939
[features]
4040
# Print a warning when doing an unindexed `iter_by_col_range` on a large table.
4141
unindexed_iter_by_col_range_warn = []
42-
default = ["unindexed_iter_by_col_range_warn"]
42+
default = ["unindexed_iter_by_col_range_warn", "tokio"]
43+
tokio = ["spacetimedb-durability/tokio", "spacetimedb-snapshot/tokio"]
44+
simulation = ["spacetimedb-durability/simulation", "spacetimedb-snapshot/simulation"]
4345
# Enable test helpers and utils
4446
test = ["spacetimedb-commitlog/test", "spacetimedb-schema/test"]
4547

crates/durability/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ license-file = "LICENSE"
88
description = "Traits and single-node implementation of durability for SpacetimeDB."
99

1010
[features]
11+
default = ["tokio"]
12+
tokio = ["spacetimedb-runtime/tokio"]
13+
simulation = ["spacetimedb-runtime/simulation"]
1114
test = []
1215
fallocate = ["spacetimedb-commitlog/fallocate"]
1316

@@ -21,7 +24,7 @@ scopeguard.workspace = true
2124
spacetimedb-commitlog.workspace = true
2225
spacetimedb-fs-utils.workspace = true
2326
spacetimedb-paths.workspace = true
24-
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
27+
spacetimedb-runtime = { path = "../runtime", default-features = false }
2528
spacetimedb-sats.workspace = true
2629
thiserror.workspace = true
2730
tokio.workspace = true

crates/durability/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
#[cfg(all(feature = "tokio", feature = "simulation"))]
2+
compile_error!("spacetimedb-durability requires exactly one runtime backend: enable either `tokio` or `simulation`, not both");
3+
4+
#[cfg(not(any(feature = "tokio", feature = "simulation")))]
5+
compile_error!("spacetimedb-durability requires exactly one runtime backend: enable either `tokio` or `simulation`");
6+
17
use std::{iter, marker::PhantomData, sync::Arc};
28

39
use futures::future::BoxFuture;

crates/engine/Cargo.toml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ rust-version.workspace = true
1010
workspace = true
1111

1212
[features]
13+
default = ["tokio"]
14+
tokio = ["spacetimedb-runtime/tokio", "spacetimedb-datastore/tokio", "spacetimedb-durability/tokio", "spacetimedb-snapshot/tokio"]
15+
simulation = ["spacetimedb-runtime/simulation", "spacetimedb-datastore/simulation", "spacetimedb-durability/simulation", "spacetimedb-snapshot/simulation"]
1316
test = ["spacetimedb-commitlog/test", "spacetimedb-datastore/test"]
1417

1518
[dependencies]
@@ -27,28 +30,27 @@ serde.workspace = true
2730
sled.workspace = true
2831
spacetimedb-commitlog.workspace = true
2932
spacetimedb-data-structures.workspace = true
30-
spacetimedb-datastore.workspace = true
31-
spacetimedb-durability.workspace = true
33+
spacetimedb-datastore = { path = "../datastore", default-features = false }
34+
spacetimedb-durability = { path = "../durability", default-features = false }
3235
spacetimedb-expr.workspace = true
3336
spacetimedb-fs-utils.workspace = true
3437
spacetimedb-lib = { workspace = true, features = ["serde", "metrics_impls"] }
3538
spacetimedb-metrics.workspace = true
3639
spacetimedb-paths.workspace = true
3740
spacetimedb-primitives.workspace = true
38-
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
41+
spacetimedb-runtime = { path = "../runtime", default-features = false }
3942
spacetimedb-sats = { workspace = true, features = ["serde"] }
4043
spacetimedb-schema.workspace = true
41-
spacetimedb-snapshot.workspace = true
44+
spacetimedb-snapshot = { path = "../snapshot", default-features = false }
4245
spacetimedb-table.workspace = true
4346
sqlparser.workspace = true
4447
tempfile.workspace = true
4548
thiserror.workspace = true
46-
tokio.workspace = true
47-
tokio-stream = { workspace = true, features = ["sync"] }
4849
tracing.workspace = true
4950

5051
[dev-dependencies]
5152
bytes.workspace = true
5253
env_logger.workspace = true
5354
fs_extra.workspace = true
5455
pretty_assertions.workspace = true
56+
tokio.workspace = true

crates/engine/src/db/mod.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ use std::sync::Arc;
22

33
use enum_map::EnumMap;
44
use spacetimedb_schema::reducer_name::ReducerName;
5-
use tokio::sync::mpsc;
6-
use tokio::time::MissedTickBehavior;
75

86
use crate::metrics::ExecutionCounters;
97
use spacetimedb_datastore::execution_context::WorkloadType;
@@ -56,7 +54,7 @@ pub struct MetricsMessage {
5654
/// The handle used to send work to the tx metrics recorder.
5755
#[derive(Clone)]
5856
pub struct MetricsRecorderQueue {
59-
tx: mpsc::UnboundedSender<MetricsMessage>,
57+
tx: spacetimedb_runtime::sync::mpsc::UnboundedSender<MetricsMessage>,
6058
}
6159

6260
impl MetricsRecorderQueue {
@@ -127,19 +125,20 @@ const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::
127125

128126
/// Spawns a task for recording transaction metrics.
129127
/// Returns the handle for pushing metrics to the recorder.
130-
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
131-
let (tx, mut rx) = mpsc::unbounded_channel();
132-
let abort_handle = tokio::spawn(async move {
133-
let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL);
134-
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
135-
136-
loop {
137-
interval.tick().await;
138-
while let Ok(metrics) = rx.try_recv() {
139-
record_metrics(metrics);
128+
pub fn spawn_tx_metrics_recorder(
129+
handle: &spacetimedb_runtime::Handle,
130+
) -> (MetricsRecorderQueue, spacetimedb_runtime::AbortHandle) {
131+
let handle_clone = handle.clone();
132+
let (tx, mut rx) = spacetimedb_runtime::sync::mpsc::unbounded_channel();
133+
let abort_handle = handle
134+
.spawn(async move {
135+
loop {
136+
handle_clone.sleep(TX_METRICS_RECORDING_INTERVAL).await;
137+
while let Ok(metrics) = rx.try_recv() {
138+
record_metrics(metrics);
139+
}
140140
}
141-
}
142-
})
143-
.abort_handle();
141+
})
142+
.abort_handle();
144143
(MetricsRecorderQueue { tx }, abort_handle)
145144
}

crates/engine/src/db/relational_db.rs

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use spacetimedb_lib::ConnectionId;
4343
use spacetimedb_lib::Identity;
4444
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
4545
use spacetimedb_primitives::*;
46+
use spacetimedb_runtime::sync::watch;
4647
use spacetimedb_runtime::Handle;
4748
use spacetimedb_sats::memory_usage::MemoryUsage;
4849
use spacetimedb_sats::raw_identifier::RawIdentifier;
@@ -62,7 +63,6 @@ use std::borrow::Cow;
6263
use std::io;
6364
use std::ops::RangeBounds;
6465
use std::sync::Arc;
65-
use tokio::sync::watch;
6666

6767
pub use super::persistence::{DiskSizeFn, Durability, Persistence};
6868
pub use super::snapshot::SnapshotWorker;
@@ -1079,7 +1079,10 @@ const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10
10791079
const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10);
10801080

10811081
/// Spawn a background task that periodically cleans up expired views
1082-
pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandle {
1082+
pub fn spawn_view_cleanup_loop(
1083+
db: Arc<RelationalDB>,
1084+
handle: &spacetimedb_runtime::Handle,
1085+
) -> spacetimedb_runtime::AbortHandle {
10831086
fn run_view_cleanup(db: &RelationalDB) {
10841087
match db.with_auto_commit(Workload::Internal, |tx| {
10851088
tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET)
@@ -1106,23 +1109,19 @@ pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandl
11061109
}
11071110
}
11081111

1109-
tokio::spawn(async move {
1110-
loop {
1111-
// Offload actual cleanup to blocking thread pool, as `VIEW_CLEANUP_BUDGET` is defined
1112-
// in milliseconds, which may be too long for async tasks.
1113-
let db = db.clone();
1114-
let db_identity = db.database_identity();
1115-
tokio::task::spawn_blocking(move || run_view_cleanup(&db))
1116-
.await
1117-
.inspect_err(|e| {
1118-
log::error!("[{}] DATABASE: failed to run view cleanup task: {}", db_identity, e);
1119-
})
1120-
.ok();
1112+
let handle_clone = handle.clone();
1113+
handle
1114+
.spawn(async move {
1115+
loop {
1116+
// Offload actual cleanup to blocking thread pool, as `VIEW_CLEANUP_BUDGET` is defined
1117+
// in milliseconds, which may be too long for async tasks.
1118+
let db = db.clone();
1119+
handle_clone.spawn_blocking(move || run_view_cleanup(&db)).await;
11211120

1122-
tokio::time::sleep(VIEWS_EXPIRATION).await;
1123-
}
1124-
})
1125-
.abort_handle()
1121+
handle_clone.sleep(VIEWS_EXPIRATION).await;
1122+
}
1123+
})
1124+
.abort_handle()
11261125
}
11271126
impl RelationalDB {
11281127
pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
@@ -1753,8 +1752,8 @@ pub async fn local_history(
17531752
/// Suitable **only** for non-replicated databases.
17541753
pub async fn snapshot_watching_commitlog_compressor(
17551754
mut snapshot_rx: watch::Receiver<u64>,
1756-
mut clog_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1757-
mut snap_tx: Option<tokio::sync::mpsc::Sender<u64>>,
1755+
mut clog_tx: Option<spacetimedb_runtime::sync::mpsc::Sender<u64>>,
1756+
mut snap_tx: Option<spacetimedb_runtime::sync::mpsc::Sender<u64>>,
17581757
durability: LocalDurability,
17591758
runtime: Handle,
17601759
) {

crates/engine/src/db/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use prometheus::{Histogram, IntGauge};
1414
use spacetimedb_datastore::locking_tx_datastore::{committed_state::CommittedState, datastore::Locking};
1515
use spacetimedb_durability::TxOffset;
1616
use spacetimedb_lib::Identity;
17+
use spacetimedb_runtime::sync::watch;
1718
use spacetimedb_snapshot::{CompressionStats, DynSnapshotRepo};
18-
use tokio::sync::watch;
1919

2020
use crate::metrics::ENGINE_METRICS;
2121
use spacetimedb_runtime::Handle;

0 commit comments

Comments
 (0)