Skip to content

Commit 5e8a2e9

Browse files
committed
Reapply "Procedures: fix scheduling (#3704)" (#3774)
This reverts commit d26f3a1.
1 parent 141048c commit 5e8a2e9

213 files changed

Lines changed: 1447 additions & 316 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

crates/codegen/src/rust.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ impl {func_name} for super::RemoteReducers {{
423423
{callback_id}(self.imp.on_reducer(
424424
{reducer_name:?},
425425
Box::new(move |ctx: &super::ReducerEventContext| {{
426+
#[allow(irrefutable_let_patterns)]
426427
let super::ReducerEventContext {{
427428
event: __sdk::ReducerEvent {{
428429
reducer: super::Reducer::{enum_variant_name} {{

crates/codegen/tests/snapshots/codegen__codegen_rust.snap

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ impl add_player for super::RemoteReducers {
7272
AddPlayerCallbackId(self.imp.on_reducer(
7373
"add_player",
7474
Box::new(move |ctx: &super::ReducerEventContext| {
75+
#[allow(irrefutable_let_patterns)]
7576
let super::ReducerEventContext {
7677
event: __sdk::ReducerEvent {
7778
reducer: super::Reducer::AddPlayer {
@@ -181,6 +182,7 @@ impl add_private for super::RemoteReducers {
181182
AddPrivateCallbackId(self.imp.on_reducer(
182183
"add_private",
183184
Box::new(move |ctx: &super::ReducerEventContext| {
185+
#[allow(irrefutable_let_patterns)]
184186
let super::ReducerEventContext {
185187
event: __sdk::ReducerEvent {
186188
reducer: super::Reducer::AddPrivate {
@@ -294,6 +296,7 @@ age: u8,
294296
AddCallbackId(self.imp.on_reducer(
295297
"add",
296298
Box::new(move |ctx: &super::ReducerEventContext| {
299+
#[allow(irrefutable_let_patterns)]
297300
let super::ReducerEventContext {
298301
event: __sdk::ReducerEvent {
299302
reducer: super::Reducer::Add {
@@ -398,6 +401,7 @@ impl assert_caller_identity_is_module_identity for super::RemoteReducers {
398401
AssertCallerIdentityIsModuleIdentityCallbackId(self.imp.on_reducer(
399402
"assert_caller_identity_is_module_identity",
400403
Box::new(move |ctx: &super::ReducerEventContext| {
404+
#[allow(irrefutable_let_patterns)]
401405
let super::ReducerEventContext {
402406
event: __sdk::ReducerEvent {
403407
reducer: super::Reducer::AssertCallerIdentityIsModuleIdentity {
@@ -527,6 +531,7 @@ impl client_connected for super::RemoteReducers {
527531
ClientConnectedCallbackId(self.imp.on_reducer(
528532
"client_connected",
529533
Box::new(move |ctx: &super::ReducerEventContext| {
534+
#[allow(irrefutable_let_patterns)]
530535
let super::ReducerEventContext {
531536
event: __sdk::ReducerEvent {
532537
reducer: super::Reducer::ClientConnected {
@@ -636,6 +641,7 @@ impl delete_player for super::RemoteReducers {
636641
DeletePlayerCallbackId(self.imp.on_reducer(
637642
"delete_player",
638643
Box::new(move |ctx: &super::ReducerEventContext| {
644+
#[allow(irrefutable_let_patterns)]
639645
let super::ReducerEventContext {
640646
event: __sdk::ReducerEvent {
641647
reducer: super::Reducer::DeletePlayer {
@@ -745,6 +751,7 @@ impl delete_players_by_name for super::RemoteReducers {
745751
DeletePlayersByNameCallbackId(self.imp.on_reducer(
746752
"delete_players_by_name",
747753
Box::new(move |ctx: &super::ReducerEventContext| {
754+
#[allow(irrefutable_let_patterns)]
748755
let super::ReducerEventContext {
749756
event: __sdk::ReducerEvent {
750757
reducer: super::Reducer::DeletePlayersByName {
@@ -1066,6 +1073,7 @@ impl list_over_age for super::RemoteReducers {
10661073
ListOverAgeCallbackId(self.imp.on_reducer(
10671074
"list_over_age",
10681075
Box::new(move |ctx: &super::ReducerEventContext| {
1076+
#[allow(irrefutable_let_patterns)]
10691077
let super::ReducerEventContext {
10701078
event: __sdk::ReducerEvent {
10711079
reducer: super::Reducer::ListOverAge {
@@ -1170,6 +1178,7 @@ impl log_module_identity for super::RemoteReducers {
11701178
LogModuleIdentityCallbackId(self.imp.on_reducer(
11711179
"log_module_identity",
11721180
Box::new(move |ctx: &super::ReducerEventContext| {
1181+
#[allow(irrefutable_let_patterns)]
11731182
let super::ReducerEventContext {
11741183
event: __sdk::ReducerEvent {
11751184
reducer: super::Reducer::LogModuleIdentity {
@@ -3567,6 +3576,7 @@ impl query_private for super::RemoteReducers {
35673576
QueryPrivateCallbackId(self.imp.on_reducer(
35683577
"query_private",
35693578
Box::new(move |ctx: &super::ReducerEventContext| {
3579+
#[allow(irrefutable_let_patterns)]
35703580
let super::ReducerEventContext {
35713581
event: __sdk::ReducerEvent {
35723582
reducer: super::Reducer::QueryPrivate {
@@ -3852,6 +3862,7 @@ impl repeating_test for super::RemoteReducers {
38523862
RepeatingTestCallbackId(self.imp.on_reducer(
38533863
"repeating_test",
38543864
Box::new(move |ctx: &super::ReducerEventContext| {
3865+
#[allow(irrefutable_let_patterns)]
38553866
let super::ReducerEventContext {
38563867
event: __sdk::ReducerEvent {
38573868
reducer: super::Reducer::RepeatingTest {
@@ -4015,6 +4026,7 @@ impl say_hello for super::RemoteReducers {
40154026
SayHelloCallbackId(self.imp.on_reducer(
40164027
"say_hello",
40174028
Box::new(move |ctx: &super::ReducerEventContext| {
4029+
#[allow(irrefutable_let_patterns)]
40184030
let super::ReducerEventContext {
40194031
event: __sdk::ReducerEvent {
40204032
reducer: super::Reducer::SayHello {
@@ -4325,6 +4337,7 @@ impl test_btree_index_args for super::RemoteReducers {
43254337
TestBtreeIndexArgsCallbackId(self.imp.on_reducer(
43264338
"test_btree_index_args",
43274339
Box::new(move |ctx: &super::ReducerEventContext| {
4340+
#[allow(irrefutable_let_patterns)]
43284341
let super::ReducerEventContext {
43294342
event: __sdk::ReducerEvent {
43304343
reducer: super::Reducer::TestBtreeIndexArgs {
@@ -4878,6 +4891,7 @@ arg_4: NamespaceTestF,
48784891
TestCallbackId(self.imp.on_reducer(
48794892
"test",
48804893
Box::new(move |ctx: &super::ReducerEventContext| {
4894+
#[allow(irrefutable_let_patterns)]
48814895
let super::ReducerEventContext {
48824896
event: __sdk::ReducerEvent {
48834897
reducer: super::Reducer::Test {

crates/core/src/host/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use spacetimedb_lib::bsatn;
88
use spacetimedb_lib::de::{serde::SeedWrapper, DeserializeSeed};
99
use spacetimedb_lib::ProductValue;
1010
use spacetimedb_schema::def::deserialize::{ArgsSeed, FunctionDef};
11+
use spacetimedb_schema::def::ModuleDef;
1112

1213
mod disk_storage;
1314
mod host_controller;
@@ -41,6 +42,14 @@ pub enum FunctionArgs {
4142
}
4243

4344
impl FunctionArgs {
45+
fn into_tuple_for_def<Def: FunctionDef>(
46+
self,
47+
module: &ModuleDef,
48+
def: &Def,
49+
) -> Result<ArgsTuple, InvalidFunctionArguments> {
50+
self.into_tuple(module.arg_seed_for(def))
51+
}
52+
4453
fn into_tuple<Def: FunctionDef>(self, seed: ArgsSeed<'_, Def>) -> Result<ArgsTuple, InvalidFunctionArguments> {
4554
self._into_tuple(seed).map_err(|err| InvalidFunctionArguments {
4655
err,

crates/core/src/host/module_host.rs

Lines changed: 65 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::error::DBError;
1111
use crate::estimation::estimate_rows_scanned;
1212
use crate::hash::Hash;
1313
use crate::host::host_controller::CallProcedureReturn;
14-
use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem};
1514
use crate::host::v8::JsInstance;
1615
use crate::host::wasmtime::ModuleInstance;
1716
use crate::host::{InvalidFunctionArguments, InvalidViewArguments};
@@ -41,7 +40,7 @@ use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOf
4140
use spacetimedb_data_structures::error_stream::ErrorStream;
4241
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
4342
use spacetimedb_datastore::error::DatastoreError;
44-
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
43+
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
4544
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo};
4645
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4746
use spacetimedb_durability::DurableOffset;
@@ -56,7 +55,6 @@ use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId};
5655
use spacetimedb_query::compile_subscription;
5756
use spacetimedb_sats::{AlgebraicTypeRef, ProductValue};
5857
use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy};
59-
use spacetimedb_schema::def::deserialize::ArgsSeed;
6058
use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef};
6159
use spacetimedb_schema::schema::{Schema, TableSchema};
6260
use spacetimedb_vm::relation::RelValue;
@@ -608,59 +606,6 @@ pub fn call_identity_connected(
608606
}
609607
}
610608

611-
// Only for logging purposes.
612-
const SCHEDULED_REDUCER: &str = "scheduled_reducer";
613-
614-
pub(crate) fn call_scheduled_reducer(
615-
module: &ModuleInfo,
616-
queue_item: QueueItem,
617-
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
618-
) -> (Result<(ReducerCallResult, Timestamp), ReducerCallError>, bool) {
619-
extract_trapped(call_scheduled_reducer_inner(module, queue_item, call_reducer))
620-
}
621-
622-
fn call_scheduled_reducer_inner(
623-
module: &ModuleInfo,
624-
item: QueueItem,
625-
call_reducer: impl FnOnce(Option<MutTxId>, CallReducerParams) -> (ReducerCallResult, bool),
626-
) -> Result<((ReducerCallResult, Timestamp), bool), ReducerCallError> {
627-
let db = &module.relational_db();
628-
let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
629-
630-
match handle_queued_call_reducer_params(&tx, module, db, item) {
631-
Ok(Some(params)) => {
632-
// Is necessary to patch the context with the actual calling reducer
633-
let reducer_def = module
634-
.module_def
635-
.get_reducer_by_id(params.reducer_id)
636-
.ok_or(ReducerCallError::ScheduleReducerNotFound)?;
637-
let reducer = &*reducer_def.name;
638-
639-
tx.ctx = ExecutionContext::with_workload(
640-
tx.ctx.database_identity(),
641-
Workload::Reducer(ReducerContext {
642-
name: reducer.into(),
643-
caller_identity: params.caller_identity,
644-
caller_connection_id: params.caller_connection_id,
645-
timestamp: Timestamp::now(),
646-
arg_bsatn: params.args.get_bsatn().clone(),
647-
}),
648-
);
649-
650-
let timestamp = params.timestamp;
651-
let (res, trapped) = call_reducer(Some(tx), params);
652-
Ok(((res, timestamp), trapped))
653-
}
654-
Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound),
655-
Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments(
656-
InvalidFunctionArguments {
657-
err,
658-
function_name: SCHEDULED_REDUCER.into(),
659-
},
660-
))),
661-
}
662-
}
663-
664609
pub struct CallReducerParams {
665610
pub timestamp: Timestamp,
666611
pub caller_identity: Identity,
@@ -671,9 +616,10 @@ pub struct CallReducerParams {
671616
pub reducer_id: ReducerId,
672617
pub args: ArgsTuple,
673618
}
619+
674620
impl CallReducerParams {
675-
/// Returns a set of parameters for a call that came from within
676-
/// and without a client/caller/request_id.
621+
/// Returns a set of parameters for an internal call
622+
/// without a client/caller/request_id.
677623
pub fn from_system(
678624
timestamp: Timestamp,
679625
caller_identity: Identity,
@@ -718,6 +664,26 @@ pub struct CallProcedureParams {
718664
pub args: ArgsTuple,
719665
}
720666

667+
impl CallProcedureParams {
668+
/// Returns a set of parameters for an internal call
669+
/// without a client/caller/request_id.
670+
pub fn from_system(
671+
timestamp: Timestamp,
672+
caller_identity: Identity,
673+
procedure_id: ProcedureId,
674+
args: ArgsTuple,
675+
) -> Self {
676+
Self {
677+
timestamp,
678+
caller_identity,
679+
caller_connection_id: ConnectionId::ZERO,
680+
timer: None,
681+
procedure_id,
682+
args,
683+
}
684+
}
685+
}
686+
721687
/// Holds a [`Module`] and a set of [`Instance`]s from it,
722688
/// and allocates the [`Instance`]s to be used for function calls.
723689
///
@@ -1442,8 +1408,9 @@ impl ModuleHost {
14421408
reducer_def: &ReducerDef,
14431409
args: FunctionArgs,
14441410
) -> Result<CallReducerParams, InvalidReducerArguments> {
1445-
let reducer_seed = ArgsSeed(module.module_def.typespace().with_type(reducer_def));
1446-
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
1411+
let args = args
1412+
.into_tuple_for_def(&module.module_def, reducer_def)
1413+
.map_err(InvalidReducerArguments)?;
14471414
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
14481415
Ok(CallReducerParams {
14491416
timestamp: Timestamp::now(),
@@ -1457,6 +1424,21 @@ impl ModuleHost {
14571424
})
14581425
}
14591426

1427+
pub async fn call_reducer_with_params(
1428+
&self,
1429+
reducer_name: &str,
1430+
tx: Option<MutTxId>,
1431+
params: CallReducerParams,
1432+
) -> Result<ReducerCallResult, NoSuchModule> {
1433+
self.call(
1434+
reducer_name,
1435+
(tx, params),
1436+
|(tx, p), inst| inst.call_reducer(tx, p),
1437+
|(tx, p), inst| inst.call_reducer(tx, p),
1438+
)
1439+
.await
1440+
}
1441+
14601442
async fn call_reducer_inner(
14611443
&self,
14621444
caller_identity: Identity,
@@ -1468,8 +1450,9 @@ impl ModuleHost {
14681450
reducer_def: &ReducerDef,
14691451
args: FunctionArgs,
14701452
) -> Result<ReducerCallResult, ReducerCallError> {
1471-
let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def));
1472-
let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?;
1453+
let args = args
1454+
.into_tuple_for_def(&self.info.module_def, reducer_def)
1455+
.map_err(InvalidReducerArguments)?;
14731456
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
14741457
let call_reducer_params = CallReducerParams {
14751458
timestamp: Timestamp::now(),
@@ -1483,12 +1466,7 @@ impl ModuleHost {
14831466
};
14841467

14851468
Ok(self
1486-
.call(
1487-
&reducer_def.name,
1488-
call_reducer_params,
1489-
|p, inst| inst.call_reducer(None, p),
1490-
|p, inst| inst.call_reducer(None, p),
1491-
)
1469+
.call_reducer_with_params(&reducer_def.name, None, call_reducer_params)
14921470
.await?)
14931471
}
14941472

@@ -1593,9 +1571,10 @@ impl ModuleHost {
15931571
procedure_def: &ProcedureDef,
15941572
args: FunctionArgs,
15951573
) -> Result<CallProcedureReturn, ProcedureCallError> {
1596-
let procedure_seed = ArgsSeed(self.info.module_def.typespace().with_type(procedure_def));
1574+
let args = args
1575+
.into_tuple_for_def(&self.info.module_def, procedure_def)
1576+
.map_err(InvalidProcedureArguments)?;
15971577
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);
1598-
let args = args.into_tuple(procedure_seed).map_err(InvalidProcedureArguments)?;
15991578

16001579
let params = CallProcedureParams {
16011580
timestamp: Timestamp::now(),
@@ -1605,31 +1584,26 @@ impl ModuleHost {
16051584
procedure_id,
16061585
args,
16071586
};
1608-
self.call_async_with_instance(&procedure_def.name, async move |inst| match inst {
1587+
self.call_procedure_with_params(&procedure_def.name, params)
1588+
.await
1589+
.map_err(Into::into)
1590+
}
1591+
1592+
// This is not reused in `call_procedure_inner`
1593+
// due to concerns re. `Timestamp::now`.
1594+
pub async fn call_procedure_with_params(
1595+
&self,
1596+
name: &str,
1597+
params: CallProcedureParams,
1598+
) -> Result<CallProcedureReturn, NoSuchModule> {
1599+
self.call_async_with_instance(name, async move |inst| match inst {
16091600
Instance::Wasm(mut inst) => (inst.call_procedure(params).await, Instance::Wasm(inst)),
16101601
Instance::Js(inst) => {
16111602
let (r, s) = inst.call_procedure(params).await;
16121603
(r, Instance::Js(s))
16131604
}
16141605
})
16151606
.await
1616-
.map_err(Into::into)
1617-
}
1618-
1619-
// Scheduled reducers require a different function here to call their reducer
1620-
// because their reducer arguments are stored in the database and need to be fetched
1621-
// within the same transaction as the reducer call.
1622-
pub(crate) async fn call_scheduled_reducer(
1623-
&self,
1624-
item: QueueItem,
1625-
) -> Result<(ReducerCallResult, Timestamp), ReducerCallError> {
1626-
self.call(
1627-
SCHEDULED_REDUCER,
1628-
item,
1629-
|item, inst| inst.call_scheduled_reducer(item),
1630-
|item, inst| inst.call_scheduled_reducer(item),
1631-
)
1632-
.await?
16331607
}
16341608

16351609
/// Materializes the views return by the `view_collector`, if not already materialized,
@@ -1722,9 +1696,9 @@ impl ModuleHost {
17221696
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
17231697
let fn_ptr = view_def.fn_ptr;
17241698
let row_type = view_def.product_type_ref;
1725-
let typespace = module_def.typespace().with_type(view_def);
1726-
let view_seed = ArgsSeed(typespace);
1727-
let args = args.into_tuple(view_seed).map_err(InvalidViewArguments)?;
1699+
let args = args
1700+
.into_tuple_for_def(module_def, view_def)
1701+
.map_err(InvalidViewArguments)?;
17281702

17291703
match self
17301704
.call_view_inner(tx, view_name, view_id, table_id, fn_ptr, caller, sender, args, row_type)

0 commit comments

Comments
 (0)