1+ #[ cfg( all( feature = "tokio" , feature = "simulation" ) ) ]
2+ compile_error ! (
3+ "spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`, not both"
4+ ) ;
5+
6+ #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
7+ compile_error ! ( "spacetimedb-runtime requires exactly one runtime backend: enable either `tokio` or `simulation`" ) ;
8+
19#[ cfg( feature = "simulation" ) ]
210extern crate alloc;
311
@@ -15,8 +23,39 @@ pub mod sim;
1523#[ cfg( feature = "simulation" ) ]
1624pub mod sim_std;
1725
18- #[ cfg( feature = "tokio" ) ]
1926pub type TokioHandle = tokio:: runtime:: Handle ;
27+ pub type TokioRuntime = tokio:: runtime:: Runtime ;
28+ pub type TokioRuntimeBuilder = tokio:: runtime:: Builder ;
29+
30+ // We intentionally expose a small subset of `tokio::sync` under the simulation
31+ // backend. Tokio's async synchronization primitives are runtime-agnostic: they
32+ // can be polled by this executor instead of a Tokio runtime.
33+ //
34+ // Runtime-agnostic does not translate to deterministic by itself. For
35+ // deterministic simulation, `Waker`s must be invoked by a task running on the
36+ // deterministic executor. For the exports below, that means sends, receives,
37+ // closes, drops of senders/receivers, and watch updates must be driven by
38+ // simulated tasks.
39+ //
40+ // Anything outside the simulated runtime that invokes a stored `Waker`
41+ // bypasses the deterministic executor. This includes Tokio timers,
42+ // OS/kernel readiness routed through another runtime, and blocking threads.
43+ //
44+ // Tokio documents `*_timeout` methods as non-runtime-agnostic because they
45+ // require Tokio's timer; in this subset, that includes
46+ // `mpsc::Sender::send_timeout`.
47+ //
48+ // Also avoid blocking methods. The blocking methods currently reachable from
49+ // this subset are `mpsc::Sender::blocking_send`,
50+ // `mpsc::Receiver::blocking_recv`, `mpsc::Receiver::blocking_recv_many`,
51+ // `mpsc::UnboundedReceiver::blocking_recv`, and
52+ // `mpsc::UnboundedReceiver::blocking_recv_many`. These block or park the
53+ // calling OS thread, which is outside the simulation runtime.
54+ pub mod sync {
55+ // TODO: Remove unbounded channels as resources should be bounded.
56+ pub use tokio:: sync:: mpsc;
57+ pub use tokio:: sync:: watch;
58+ }
2059
2160#[ derive( Clone ) ]
2261pub enum Handle {
@@ -74,15 +113,22 @@ enum JoinErrorInner {
74113 Simulation ( sim:: JoinError ) ,
75114}
76115
116+ #[ cfg( feature = "tokio" ) ]
117+ impl From < tokio:: task:: AbortHandle > for AbortHandle {
118+ fn from ( handle : tokio:: task:: AbortHandle ) -> Self {
119+ Self {
120+ inner : AbortHandleInner :: Tokio ( handle) ,
121+ }
122+ }
123+ }
124+
77125impl AbortHandle {
78126 pub fn abort ( & self ) {
79127 match & self . inner {
80128 #[ cfg( feature = "tokio" ) ]
81129 AbortHandleInner :: Tokio ( handle) => handle. abort ( ) ,
82130 #[ cfg( feature = "simulation" ) ]
83131 AbortHandleInner :: Simulation ( handle) => handle. abort ( ) ,
84- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
85- _ => unreachable ! ( "runtime abort handle has no enabled backend" ) ,
86132 }
87133 }
88134}
@@ -100,16 +146,10 @@ impl JoinErrorInner {
100146
101147impl fmt:: Display for JoinError {
102148 fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
103- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
104- let _ = f;
105- #[ cfg( any( feature = "tokio" , feature = "simulation" ) ) ]
106- return self . inner . fmt ( f) ;
107- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
108- unreachable ! ( "runtime join error has no enabled backend" )
149+ self . inner . fmt ( f)
109150 }
110151}
111152
112- #[ cfg( any( feature = "tokio" , feature = "simulation" ) ) ]
113153impl std:: error:: Error for JoinError { }
114154
115155impl < T > JoinHandleInner < T > {
@@ -160,8 +200,6 @@ impl<T> Future for JoinHandle<T> {
160200 type Output = Result < T , JoinError > ;
161201
162202 fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
163- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
164- let _ = cx;
165203 match self . inner . poll_result ( cx) {
166204 Poll :: Ready ( Ok ( output) ) => {
167205 self . inner = JoinHandleInner :: Detached ( PhantomData ) ;
@@ -197,17 +235,30 @@ impl fmt::Display for RuntimeTimeout {
197235 }
198236}
199237
200- #[ cfg( any( feature = "tokio" , feature = "simulation" ) ) ]
201238impl std:: error:: Error for RuntimeTimeout { }
202239
203- #[ cfg( feature = "tokio" ) ]
204240impl Handle {
205241 pub fn tokio ( handle : TokioHandle ) -> Self {
206- Self :: Tokio ( handle)
242+ #[ cfg( feature = "tokio" ) ]
243+ {
244+ Self :: Tokio ( handle)
245+ }
246+ #[ cfg( not( feature = "tokio" ) ) ]
247+ {
248+ let _ = handle;
249+ panic ! ( "spacetimedb-runtime tokio handle requested without the `tokio` backend enabled" )
250+ }
207251 }
208252
209253 pub fn tokio_current ( ) -> Self {
210- Self :: tokio ( TokioHandle :: current ( ) )
254+ #[ cfg( feature = "tokio" ) ]
255+ {
256+ Self :: tokio ( TokioHandle :: current ( ) )
257+ }
258+ #[ cfg( not( feature = "tokio" ) ) ]
259+ {
260+ panic ! ( "spacetimedb-runtime current tokio handle requested without the `tokio` backend enabled" )
261+ }
211262 }
212263}
213264
@@ -220,8 +271,6 @@ impl Handle {
220271
221272impl Handle {
222273 pub fn spawn < T : Send + ' static > ( & self , future : impl Future < Output = T > + Send + ' static ) -> JoinHandle < T > {
223- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
224- let _ = future;
225274 match self {
226275 #[ cfg( feature = "tokio" ) ]
227276 Self :: Tokio ( handle) => JoinHandle {
@@ -231,8 +280,6 @@ impl Handle {
231280 Self :: Simulation ( handle) => JoinHandle {
232281 inner : JoinHandleInner :: Simulation ( handle. spawn_on ( sim:: NodeId :: MAIN , future) ) ,
233282 } ,
234- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
235- _ => unreachable ! ( "runtime dispatch has no enabled backend" ) ,
236283 }
237284 }
238285
@@ -241,8 +288,6 @@ impl Handle {
241288 F : FnOnce ( ) -> R + Send + ' static ,
242289 R : Send + ' static ,
243290 {
244- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
245- let _ = & f;
246291 match self {
247292 #[ cfg( feature = "tokio" ) ]
248293 Self :: Tokio ( _) => tokio:: task:: spawn_blocking ( f)
@@ -261,8 +306,6 @@ impl Handle {
261306 . spawn_on ( sim:: NodeId :: MAIN , async move { f ( ) } )
262307 . await
263308 . expect ( "simulation spawn_blocking task should not be cancelled" ) ,
264- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
265- _ => unreachable ! ( "runtime dispatch has no enabled backend" ) ,
266309 }
267310 }
268311
@@ -271,17 +314,22 @@ impl Handle {
271314 timeout_after : Duration ,
272315 future : impl Future < Output = T > ,
273316 ) -> Result < T , RuntimeTimeout > {
274- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
275- let _ = ( timeout_after, future) ;
276317 match self {
277318 #[ cfg( feature = "tokio" ) ]
278319 Self :: Tokio ( _) => tokio:: time:: timeout ( timeout_after, future)
279320 . await
280321 . map_err ( |_| RuntimeTimeout ) ,
281322 #[ cfg( feature = "simulation" ) ]
282323 Self :: Simulation ( handle) => handle. timeout ( timeout_after, future) . await . map_err ( |_| RuntimeTimeout ) ,
283- #[ cfg( not( any( feature = "tokio" , feature = "simulation" ) ) ) ]
284- _ => unreachable ! ( "runtime dispatch has no enabled backend" ) ,
324+ }
325+ }
326+
327+ pub async fn sleep ( & self , duration : Duration ) {
328+ match self {
329+ #[ cfg( feature = "tokio" ) ]
330+ Self :: Tokio ( _) => tokio:: time:: sleep ( duration) . await ,
331+ #[ cfg( feature = "simulation" ) ]
332+ Self :: Simulation ( handle) => handle. sleep ( duration) . await ,
285333 }
286334 }
287335}
0 commit comments