Skip to content

Commit 4c8ce71

Browse files
authored
Add some tests of procedure concurrency (#4955)
# Description of Changes Add several new tests of concurrency behavior re: procedures. The new tests are in the SDK tests, 'cause I thought the easiest way to observe this behavior was by connecting a client to a database and calling some functions in it. This is yet another mild misuse of the SDK test suite, as the behavior in question is not in the SDK, it's in the host. The tests also have a new module/client pair added, as we don't (yet?) expose `procedure_sleep_until` to any module languages other than Rust, so we can't implement the same test in any other languages. ### `procedure_reducer_interleaving` Verifies that a procedure and a reducer can run concurrently, with the procedure cooperatively yielding using `ctx.sleep_until`. Uses two separate connections due to #4954 . ### `procedure_reducer_same_client_interleaved` Same as previous, but with only a single connection. Now that #4954 is closed, this has the same semantics as previous. ### `procedure_concurrent_with_scheduled_reducer` Verifies that a non-scheduled procedure can schedule a reducer and then sleep, and the reducer will execute before the procedure wakes back up. ### `scheduled_procedure_scheduled_reducer_not_interleaved` Schedules a procedure and a reducer, which you might expect to execute concurrently, but don't in a way similar to `procedure_reducer_same_client_not_interleaved`. This is the behavior that kicked off this whole thing. The scheduler subsystem behaves like a single client, in the sense that it waits for a single function to terminate before scheduling the next function. # API and ABI breaking changes N/a # Expected complexity level and risk 1 - tests # Testing - [x] Ran the tests! I didn't bother intentionally breaking the host to verify that the tests would fail. It feels pretty apparent to me based on just the test code that we won't see false negatives.
1 parent 29b21cc commit 4c8ce71

25 files changed

Lines changed: 2669 additions & 1 deletion

Cargo.lock

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ members = [
4848
"modules/sdk-test",
4949
"modules/sdk-test-connect-disconnect",
5050
"modules/sdk-test-procedure",
51+
"modules/sdk-test-procedure-concurrency",
5152
"modules/sdk-test-view",
5253
"modules/sdk-test-case-conversion",
5354
"modules/sdk-test-view-pk",
@@ -57,6 +58,7 @@ members = [
5758
"sdks/rust/tests/test-counter",
5859
"sdks/rust/tests/connect_disconnect_client",
5960
"sdks/rust/tests/procedure-client",
61+
"sdks/rust/tests/procedure-concurrency-client",
6062
"sdks/rust/tests/view-client",
6163
"sdks/rust/tests/case-conversion-client",
6264
"sdks/rust/tests/view-pk-client",
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[package]
2+
name = "sdk-test-procedure-concurrency-module"
3+
version = "0.1.0"
4+
edition.workspace = true
5+
license-file = "LICENSE"
6+
7+
[lib]
8+
crate-type = ["cdylib"]
9+
10+
[dependencies]
11+
log.workspace = true
12+
anyhow.workspace = true
13+
paste.workspace = true
14+
15+
[dependencies.spacetimedb]
16+
workspace = true
17+
features = ["unstable"]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# `sdk-test-procedure-concurrency` *Rust* test
2+
3+
This module isolates procedure concurrency behavior that currently only has
4+
Rust module coverage.
5+
6+
It is separate from [`sdk-test-procedure`](../sdk-test-procedure) so the shared
7+
procedure test suite can continue targeting other module languages without also
8+
requiring `ctx.sleep_until` support.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
use spacetimedb::{
2+
procedure, reducer, table, DbContext, ProcedureContext, ReducerContext, ScheduleAt, Table, TxContext,
3+
};
4+
use std::time::Duration;
5+
6+
#[table(public, accessor = procedure_concurrency_row)]
7+
struct ProcedureConcurrencyRow {
8+
#[auto_inc]
9+
insertion_order: u32,
10+
insertion_context: String,
11+
}
12+
13+
fn insert_procedure_concurrency_row(ctx: &TxContext, insertion_context: &str) {
14+
ctx.db.procedure_concurrency_row().insert(ProcedureConcurrencyRow {
15+
insertion_order: 0,
16+
insertion_context: insertion_context.into(),
17+
});
18+
}
19+
20+
#[reducer]
21+
fn insert_reducer_row(ctx: &ReducerContext) {
22+
ctx.db().procedure_concurrency_row().insert(ProcedureConcurrencyRow {
23+
insertion_order: 0,
24+
insertion_context: "reducer".into(),
25+
});
26+
}
27+
28+
#[derive(Copy, Clone, Debug)]
29+
struct PollOptions {
30+
timeout: Duration,
31+
poll_interval: Duration,
32+
}
33+
34+
impl Default for PollOptions {
35+
fn default() -> Self {
36+
Self {
37+
timeout: Duration::from_secs(10),
38+
poll_interval: Duration::from_millis(100),
39+
}
40+
}
41+
}
42+
43+
fn poll_until_tx_true(ctx: &mut ProcedureContext, pred: impl Fn(&TxContext) -> bool, options: PollOptions) {
44+
let deadline = ctx.timestamp + options.timeout;
45+
log::info!("poll_until_tx_true: will give up at {deadline}");
46+
while ctx.timestamp < deadline {
47+
let try_again = ctx.timestamp + options.poll_interval;
48+
log::info!("poll_until_tx_true: sleeping until {try_again}");
49+
ctx.sleep_until(try_again);
50+
if ctx.with_tx(&pred) {
51+
log::info!("poll_until_tx_true: succeeded, returning now");
52+
return;
53+
}
54+
log::info!("poll_until_tx_true: false");
55+
}
56+
panic!("poll_until_tx_true: exceeded timeout {:?}", options.timeout)
57+
}
58+
59+
#[procedure]
60+
fn procedure_sleep_between_inserts(ctx: &mut ProcedureContext) {
61+
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_before"));
62+
poll_until_tx_true(
63+
ctx,
64+
|tx| {
65+
tx.db
66+
.procedure_concurrency_row()
67+
.iter()
68+
.any(|row| row.insertion_context != "procedure_before")
69+
},
70+
Default::default(),
71+
);
72+
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after"));
73+
}
74+
75+
#[table(accessor = scheduled_reducer_row, scheduled(insert_scheduled_reducer))]
76+
struct ScheduledReducerRow {
77+
#[primary_key]
78+
#[auto_inc]
79+
scheduled_id: u64,
80+
scheduled_at: ScheduleAt,
81+
}
82+
83+
#[reducer]
84+
fn insert_scheduled_reducer(ctx: &ReducerContext, _schedule: ScheduledReducerRow) {
85+
ctx.db().procedure_concurrency_row().insert(ProcedureConcurrencyRow {
86+
insertion_order: 0,
87+
insertion_context: "scheduled_reducer".into(),
88+
});
89+
}
90+
91+
#[procedure]
92+
fn procedure_schedule_reducer_between_inserts(ctx: &mut ProcedureContext) {
93+
ctx.with_tx(|ctx| {
94+
insert_procedure_concurrency_row(ctx, "procedure_before");
95+
ctx.db.scheduled_reducer_row().insert(ScheduledReducerRow {
96+
scheduled_id: 0,
97+
scheduled_at: ctx.timestamp.into(),
98+
});
99+
});
100+
poll_until_tx_true(
101+
ctx,
102+
|tx| {
103+
tx.db
104+
.procedure_concurrency_row()
105+
.iter()
106+
.any(|row| row.insertion_context != "procedure_before")
107+
},
108+
Default::default(),
109+
);
110+
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "procedure_after"));
111+
}
112+
113+
#[table(accessor = scheduled_procedure_row, scheduled(scheduled_procedure_sleep_between_inserts))]
114+
struct ScheduledProcedureRow {
115+
#[primary_key]
116+
#[auto_inc]
117+
scheduled_id: u64,
118+
scheduled_at: ScheduleAt,
119+
}
120+
121+
#[procedure]
122+
fn scheduled_procedure_sleep_between_inserts(ctx: &mut ProcedureContext, _schedule: ScheduledProcedureRow) {
123+
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "scheduled_procedure_before"));
124+
// Unfortunately, we can't poll and wake on event here,
125+
// as (until https://github.com/clockworklabs/SpacetimeDB/pull/5224 is fixed)
126+
// the scheduled reducer actually won't run until after this procedure fully completes.
127+
ctx.sleep_until(ctx.timestamp + Duration::from_secs(10));
128+
ctx.with_tx(|ctx| insert_procedure_concurrency_row(ctx, "scheduled_procedure_after"));
129+
}
130+
131+
#[reducer]
132+
fn schedule_procedure_then_reducer(ctx: &ReducerContext) {
133+
ctx.db().scheduled_procedure_row().insert(ScheduledProcedureRow {
134+
scheduled_id: 0,
135+
scheduled_at: ctx.timestamp.into(),
136+
});
137+
ctx.db().scheduled_reducer_row().insert(ScheduledReducerRow {
138+
scheduled_id: 0,
139+
scheduled_at: (ctx.timestamp + Duration::from_secs(2)).into(),
140+
});
141+
}

sdks/rust/tests/procedure-client/src/module_bindings/mod.rs

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
[package]
2+
name = "procedure-concurrency-client"
3+
version.workspace = true
4+
edition.workspace = true
5+
license-file = "LICENSE"
6+
7+
[lib]
8+
crate-type = ["cdylib", "rlib"]
9+
10+
[features]
11+
default = ["native"]
12+
13+
native = [
14+
"dep:env_logger",
15+
"dep:tokio",
16+
]
17+
18+
browser = [
19+
"spacetimedb-sdk/browser",
20+
"dep:wasm-bindgen",
21+
"dep:wasm-bindgen-futures",
22+
"dep:console_error_panic_hook",
23+
"dep:futures",
24+
]
25+
26+
[[bin]]
27+
name = "procedure-concurrency-client"
28+
path = "src/main.rs"
29+
required-features = ["native"]
30+
31+
[dependencies]
32+
spacetimedb-sdk = { path = "../.." }
33+
test-counter = { path = "../test-counter" }
34+
anyhow.workspace = true
35+
env_logger = { workspace = true, optional = true }
36+
tokio = { workspace = true, optional = true }
37+
futures = { workspace = true, optional = true }
38+
39+
wasm-bindgen = { version = "0.2.100", optional = true }
40+
wasm-bindgen-futures = { version = "0.4.45", optional = true }
41+
console_error_panic_hook = { version = "0.1.7", optional = true }
42+
43+
[lints]
44+
workspace = true
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
This test client is used with the module:
2+
3+
- [`sdk-test-procedure-concurrency`](/modules/sdk-test-procedure-concurrency)
4+
5+
The goal of the test is to exercise the current reducer/procedure concurrency
6+
behavior of the Rust module ABI and the Rust SDK.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#![allow(clippy::disallowed_macros)]
2+
3+
mod module_bindings;
4+
pub mod test_handlers;
5+
6+
#[cfg(all(target_arch = "wasm32", feature = "browser"))]
7+
use wasm_bindgen::prelude::wasm_bindgen;
8+
9+
#[cfg(all(target_arch = "wasm32", feature = "browser"))]
10+
#[wasm_bindgen]
11+
pub async fn run(test_name: String, db_name: String) {
12+
console_error_panic_hook::set_once();
13+
test_handlers::dispatch(&test_name, &db_name).await;
14+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use procedure_concurrency_client::test_handlers;
2+
3+
fn exit_on_panic() {
4+
let default_hook = std::panic::take_hook();
5+
std::panic::set_hook(Box::new(move |panic_info| {
6+
default_hook(panic_info);
7+
std::process::exit(1);
8+
}));
9+
}
10+
11+
fn main() {
12+
env_logger::init();
13+
exit_on_panic();
14+
15+
let test = std::env::args()
16+
.nth(1)
17+
.expect("Pass a test name as a command-line argument to the test client");
18+
let db_name = std::env::var("SPACETIME_SDK_TEST_DB_NAME").expect("Failed to read db name from env");
19+
20+
tokio::runtime::Runtime::new()
21+
.unwrap()
22+
.block_on(test_handlers::dispatch(&test, &db_name));
23+
}

0 commit comments

Comments
 (0)