Skip to content

Commit 6694211

Browse files
code polish for #3938 (#3952)
# Description of Changes Exntention of #3938 to polish some of the code, which was not done in original PR due to hurry. - Adds some missing commentary. - code cleaning mostly limited to changed functions signature. # API and ABI breaking changes NA # Expected complexity level and risk 0 # Testing Exisiting tests should be enough. --------- Signed-off-by: Shubham Mishra <shivam828787@gmail.com> Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
1 parent 8fb0bcf commit 6694211

4 files changed

Lines changed: 116 additions & 89 deletions

File tree

crates/core/src/host/module_host.rs

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -649,28 +649,24 @@ impl CallReducerParams {
649649

650650
pub enum ViewCommand {
651651
AddSingleSubscription {
652-
info: Arc<ModuleInfo>,
653652
sender: Arc<ClientConnectionSender>,
654653
auth: AuthCtx,
655654
request: SubscribeSingle,
656655
timer: Instant,
657656
},
658657
AddMultiSubscription {
659-
info: Arc<ModuleInfo>,
660658
sender: Arc<ClientConnectionSender>,
661659
auth: AuthCtx,
662660
request: SubscribeMulti,
663661
timer: Instant,
664662
},
665663
AddLegacySubscription {
666-
info: Arc<ModuleInfo>,
667664
sender: Arc<ClientConnectionSender>,
668665
auth: AuthCtx,
669666
subscribe: Subscribe,
670667
timer: Instant,
671668
},
672669
Sql {
673-
info: Arc<ModuleInfo>,
674670
db: Arc<RelationalDB>,
675671
sql_text: String,
676672
auth: AuthCtx,
@@ -1562,14 +1558,12 @@ impl ModuleHost {
15621558

15631559
pub async fn call_view_add_single_subscription(
15641560
&self,
1565-
info: Arc<ModuleInfo>,
15661561
sender: Arc<ClientConnectionSender>,
15671562
auth: AuthCtx,
15681563
request: SubscribeSingle,
15691564
timer: Instant,
15701565
) -> Result<Option<ExecutionMetrics>, DBError> {
15711566
let cmd = ViewCommand::AddSingleSubscription {
1572-
info,
15731567
sender,
15741568
auth,
15751569
request,
@@ -1597,14 +1591,12 @@ impl ModuleHost {
15971591

15981592
pub async fn call_view_add_multi_subscription(
15991593
&self,
1600-
info: Arc<ModuleInfo>,
16011594
sender: Arc<ClientConnectionSender>,
16021595
auth: AuthCtx,
16031596
request: SubscribeMulti,
16041597
timer: Instant,
16051598
) -> Result<Option<ExecutionMetrics>, DBError> {
16061599
let cmd = ViewCommand::AddMultiSubscription {
1607-
info,
16081600
sender,
16091601
auth,
16101602
request,
@@ -1632,14 +1624,12 @@ impl ModuleHost {
16321624

16331625
pub async fn call_view_add_legacy_subscription(
16341626
&self,
1635-
info: Arc<ModuleInfo>,
16361627
sender: Arc<ClientConnectionSender>,
16371628
auth: AuthCtx,
16381629
subscribe: spacetimedb_client_api_messages::websocket::Subscribe,
16391630
timer: Instant,
16401631
) -> Result<Option<ExecutionMetrics>, DBError> {
16411632
let cmd = ViewCommand::AddLegacySubscription {
1642-
info,
16431633
sender,
16441634
auth,
16451635
subscribe,
@@ -1667,15 +1657,13 @@ impl ModuleHost {
16671657

16681658
pub async fn call_view_sql(
16691659
&self,
1670-
info: Arc<ModuleInfo>,
16711660
db: Arc<RelationalDB>,
16721661
sql_text: String,
16731662
auth: AuthCtx,
16741663
subs: Option<ModuleSubscriptions>,
16751664
head: &mut Vec<(Box<str>, AlgebraicType)>,
16761665
) -> Result<SqlResult, DBError> {
16771666
let cmd = ViewCommand::Sql {
1678-
info,
16791667
db,
16801668
sql_text,
16811669
auth,
@@ -1838,7 +1826,6 @@ impl ModuleHost {
18381826
pub fn materialize_views<I: WasmInstance>(
18391827
mut tx: MutTxId,
18401828
instance: &mut RefInstance<'_, I>,
1841-
module_def: &ModuleDef,
18421829
view_collector: &impl CollectViews,
18431830
caller: Identity,
18441831
workload: Workload,
@@ -1854,9 +1841,8 @@ impl ModuleHost {
18541841
let is_anonymous = st_view_row.is_anonymous;
18551842
let sender = if is_anonymous { None } else { Some(caller) };
18561843
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? {
1857-
let (res, trapped) = Self::call_view(
1858-
instance, module_def, tx, &view_name, view_id, table_id, Nullary, caller, sender,
1859-
)?;
1844+
let (res, trapped) =
1845+
Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?;
18601846
tx = res.tx;
18611847
if trapped {
18621848
return Ok((tx, true));
@@ -1877,10 +1863,10 @@ impl ModuleHost {
18771863
pub fn call_views_with_tx<I: WasmInstance>(
18781864
tx: MutTxId,
18791865
instance: &mut RefInstance<'_, I>,
1880-
module_def: &ModuleDef,
18811866
caller: Identity,
18821867
) -> Result<(ViewCallResult, bool), ViewCallError> {
18831868
let mut out = ViewCallResult::default(tx);
1869+
let module_def = &instance.common.info().module_def;
18841870
let mut trapped = false;
18851871
use FunctionArgs::Nullary;
18861872
for ViewCallInfo {
@@ -1896,7 +1882,6 @@ impl ModuleHost {
18961882

18971883
let (result, trap) = Self::call_view(
18981884
instance,
1899-
module_def,
19001885
out.tx,
19011886
&view_def.name,
19021887
view_id,
@@ -1924,7 +1909,6 @@ impl ModuleHost {
19241909

19251910
fn call_view<I: WasmInstance>(
19261911
instance: &mut RefInstance<'_, I>,
1927-
module_def: &ModuleDef,
19281912
tx: MutTxId,
19291913
view_name: &str,
19301914
view_id: ViewId,
@@ -1933,6 +1917,7 @@ impl ModuleHost {
19331917
caller: Identity,
19341918
sender: Option<Identity>,
19351919
) -> Result<(ViewCallResult, bool), ViewCallError> {
1920+
let module_def = &instance.common.info().module_def;
19361921
let view_def = module_def.view(view_name).ok_or(ViewCallError::NoSuchView)?;
19371922
let fn_ptr = view_def.fn_ptr;
19381923
let row_type = view_def.product_type_ref;

crates/core/src/host/wasm_common/module_host_actor.rs

Lines changed: 16 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::messages::control_db::HostType;
2222
use crate::module_host_context::ModuleCreationContextLimited;
2323
use crate::replica_context::ReplicaContext;
2424
use crate::sql::ast::SchemaViewer;
25-
use crate::sql::execute::run_from_module;
25+
use crate::sql::execute::run_with_instance;
2626
use crate::subscription::module_subscription_actor::commit_and_broadcast_event;
2727
use crate::subscription::module_subscription_manager::TransactionOffset;
2828
use crate::util::prometheus_handle::{HistogramExt, TimerGuard};
@@ -507,6 +507,10 @@ impl InstanceCommon {
507507
}
508508
}
509509

510+
pub(crate) fn info(&self) -> Arc<ModuleInfo> {
511+
self.info.clone()
512+
}
513+
510514
#[tracing::instrument(level = "trace", skip_all)]
511515
pub(crate) fn update_database<I: WasmInstance>(
512516
&mut self,
@@ -977,47 +981,36 @@ impl InstanceCommon {
977981
}
978982

979983
pub(crate) fn handle_cmd<I: WasmInstance>(&mut self, cmds: ViewCommand, inst: &mut I) -> (ViewCommandResult, bool) {
984+
let info = self.info.clone();
980985
let mut inst = RefInstance {
981986
instance: inst,
982987
common: self,
983988
};
984989
match cmds {
985990
ViewCommand::AddSingleSubscription {
986-
info,
987991
sender,
988992
auth,
989993
request,
990994
timer,
991995
} => {
992-
let res = info.subscriptions.add_single_subscription_from_module(
993-
Some((&mut inst, &info.module_def)),
994-
sender,
995-
auth,
996-
request,
997-
timer,
998-
None,
999-
);
996+
let res = info
997+
.subscriptions
998+
.add_single_subscription_with_instance(&mut inst, sender, auth, request, timer, None);
1000999

10011000
match res {
10021001
Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped),
10031002
Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
10041003
}
10051004
}
10061005
ViewCommand::AddLegacySubscription {
1007-
info,
10081006
sender,
10091007
auth,
10101008
subscribe,
10111009
timer,
10121010
} => {
1013-
let res = info.subscriptions.add_legacy_subscriber_from_module(
1014-
Some((&mut inst, &info.module_def)),
1015-
sender,
1016-
auth,
1017-
subscribe,
1018-
timer,
1019-
None,
1020-
);
1011+
let res = info
1012+
.subscriptions
1013+
.add_legacy_subscriber_with_instance(&mut inst, sender, auth, subscribe, timer, None);
10211014

10221015
match res {
10231016
Ok((metrics, trapped)) => (
@@ -1030,20 +1023,14 @@ impl InstanceCommon {
10301023
}
10311024
}
10321025
ViewCommand::AddMultiSubscription {
1033-
info,
10341026
sender,
10351027
auth,
10361028
request,
10371029
timer,
10381030
} => {
1039-
let res = info.subscriptions.add_multi_subscription_from_module(
1040-
Some((&mut inst, &info.module_def)),
1041-
sender,
1042-
auth,
1043-
request,
1044-
timer,
1045-
None,
1046-
);
1031+
let res = info
1032+
.subscriptions
1033+
.add_multi_subscription_with_instance(&mut inst, sender, auth, request, timer, None);
10471034

10481035
match res {
10491036
Ok((metrics, trapped)) => (ViewCommandResult::Subscription { result: Ok(metrics) }, trapped),
@@ -1052,14 +1039,13 @@ impl InstanceCommon {
10521039
}
10531040

10541041
ViewCommand::Sql {
1055-
info,
10561042
db,
10571043
sql_text,
10581044
auth,
10591045
subs,
10601046
} => {
10611047
let mut head = vec![];
1062-
let res = run_from_module(Some((&mut inst, &info.module_def)), db, sql_text, auth, subs, &mut head);
1048+
let res = run_with_instance(&mut inst, db, sql_text, auth, subs, &mut head);
10631049

10641050
match res {
10651051
Ok((result, trapped)) => (

crates/core/src/sql/execute.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics;
2626
use spacetimedb_lib::Timestamp;
2727
use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue};
2828
use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt};
29-
use spacetimedb_schema::def::ModuleDef;
3029
use spacetimedb_schema::relation::FieldName;
3130
use spacetimedb_vm::eval::run_ast;
3231
use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr};
@@ -189,6 +188,10 @@ pub struct SqlResult {
189188
}
190189

191190
/// Run the `SQL` string using the `auth` credentials
191+
///
192+
/// If a `ModuleHost` is provided, the SQL query is executed via the module host,
193+
/// meaning the module’s core is used to run the statement.
194+
/// If no module host is provided, the SQL query is executed on the current thread.
192195
pub async fn run(
193196
db: Arc<RelationalDB>,
194197
sql_text: String,
@@ -198,17 +201,27 @@ pub async fn run(
198201
head: &mut Vec<(Box<str>, AlgebraicType)>,
199202
) -> Result<SqlResult, DBError> {
200203
match module {
201-
Some(module) => {
202-
let info = module.info.clone();
203-
module.call_view_sql(info, db, sql_text, auth, subs, head).await
204-
}
205-
None => run_from_module::<crate::host::wasmtime::WasmtimeInstance>(None, db, sql_text, auth, subs, head)
206-
.map(|x| x.0),
204+
Some(module) => module.call_view_sql(db, sql_text, auth, subs, head).await,
205+
None => run_inner::<crate::host::wasmtime::WasmtimeInstance>(None, db, sql_text, auth, subs, head).map(|x| x.0),
207206
}
208207
}
209208

210-
pub fn run_from_module<I: WasmInstance>(
211-
instance: Option<(&mut RefInstance<I>, &ModuleDef)>,
209+
/// Run the `SQL` string using the provided `WasmInstance` and `ModuleDef`
210+
///
211+
/// The query will always be executed on the module's thread.
212+
pub(crate) fn run_with_instance<I: WasmInstance>(
213+
instance: &mut RefInstance<I>,
214+
db: Arc<RelationalDB>,
215+
sql_text: String,
216+
auth: AuthCtx,
217+
subs: Option<ModuleSubscriptions>,
218+
head: &mut Vec<(Box<str>, AlgebraicType)>,
219+
) -> Result<(SqlResult, bool), DBError> {
220+
run_inner::<I>(Some(instance), db, sql_text, auth, subs, head)
221+
}
222+
223+
fn run_inner<I: WasmInstance>(
224+
instance: Option<&mut RefInstance<I>>,
212225
db: Arc<RelationalDB>,
213226
sql_text: String,
214227
auth: AuthCtx,
@@ -227,9 +240,7 @@ pub fn run_from_module<I: WasmInstance>(
227240
Statement::Select(stmt) => {
228241
// Materialize views and downgrade to a read-only transaction
229242
let (tx, trapped) = match instance {
230-
Some(instance) => {
231-
ModuleHost::materialize_views(tx, instance.0, instance.1, &stmt, auth.caller(), Workload::Sql)?
232-
}
243+
Some(instance) => ModuleHost::materialize_views(tx, instance, &stmt, auth.caller(), Workload::Sql)?,
233244
None => (tx, false),
234245
};
235246

@@ -287,7 +298,7 @@ pub fn run_from_module<I: WasmInstance>(
287298

288299
// Update views
289300
let (result, trapped) = match instance {
290-
Some(instance) => ModuleHost::call_views_with_tx(tx, instance.0, instance.1, auth.caller())?,
301+
Some(instance) => ModuleHost::call_views_with_tx(tx, instance, auth.caller())?,
291302
None => (ViewCallResult::default(tx), false),
292303
};
293304

0 commit comments

Comments
 (0)