Skip to content

Commit 9da74d9

Browse files
committed
Run http_request on a tokio task
1 parent 9785d54 commit 9da74d9

2 files changed

Lines changed: 13 additions & 23 deletions

File tree

crates/core/src/host/instance_env.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::subscription::module_subscription_manager::{from_tx_offset, Transacti
1010
use crate::util::prometheus_handle::IntGaugeExt;
1111
use chrono::{DateTime, Utc};
1212
use core::mem;
13+
use futures::TryFutureExt;
1314
use parking_lot::{Mutex, MutexGuard};
1415
use smallvec::SmallVec;
1516
use spacetimedb_client_api_messages::energy::EnergyQuanta;
@@ -865,7 +866,8 @@ impl InstanceEnv {
865866
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
866867
let execute_fut = reqwest::Client::new().execute(reqwest);
867868

868-
let response_fut = async {
869+
// Run the future that does IO work on a tokio worker thread, where it's more efficent.
870+
let response_fut = tokio::spawn(async {
869871
// `reqwest::Error` may contain sensitive info, namely the full URL with query params.
870872
// We'll strip those with `strip_query_params_from_eqwest_error`
871873
// after `await`ing `response_fut` below.
@@ -880,7 +882,8 @@ impl InstanceEnv {
880882
let body = http_body_util::BodyExt::collect(body).await?.to_bytes();
881883

882884
Ok((response, body))
883-
};
885+
})
886+
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()));
884887

885888
let database_identity = *self.database_identity();
886889

crates/core/src/host/v8/mod.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -421,13 +421,10 @@ impl JsInstance {
421421
}
422422

423423
pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
424-
// Get a handle to the current tokio runtime, and pass it to the worker
425-
// so that it can execute futures.
426-
let rt = tokio::runtime::Handle::current();
427424
*self
428425
.send_recv(
429426
JsWorkerReply::into_call_procedure,
430-
JsWorkerRequest::CallProcedure { params, rt },
427+
JsWorkerRequest::CallProcedure { params },
431428
)
432429
.await
433430
}
@@ -442,12 +439,9 @@ impl JsInstance {
442439
&mut self,
443440
params: ScheduledFunctionParams,
444441
) -> CallScheduledFunctionResult {
445-
// Get a handle to the current tokio runtime, and pass it to the worker
446-
// so that it can execute futures.
447-
let rt = tokio::runtime::Handle::current();
448442
self.send_recv(
449443
JsWorkerReply::into_call_scheduled_function,
450-
JsWorkerRequest::CallScheduledFunction(params, rt),
444+
JsWorkerRequest::CallScheduledFunction(params),
451445
)
452446
.await
453447
}
@@ -489,10 +483,7 @@ enum JsWorkerRequest {
489483
/// See [`JsInstance::call_view`].
490484
CallView { cmd: ViewCommand },
491485
/// See [`JsInstance::call_procedure`].
492-
CallProcedure {
493-
params: CallProcedureParams,
494-
rt: tokio::runtime::Handle,
495-
},
486+
CallProcedure { params: CallProcedureParams },
496487
/// See [`JsInstance::clear_all_clients`].
497488
ClearAllClients,
498489
/// See [`JsInstance::call_identity_connected`].
@@ -504,7 +495,7 @@ enum JsWorkerRequest {
504495
/// See [`JsInstance::init_database`].
505496
InitDatabase(Program),
506497
/// See [`JsInstance::call_scheduled_function`].
507-
CallScheduledFunction(ScheduledFunctionParams, tokio::runtime::Handle),
498+
CallScheduledFunction(ScheduledFunctionParams),
508499
}
509500

510501
/// Performs some of the startup work of [`spawn_instance_worker`].
@@ -576,6 +567,8 @@ async fn spawn_instance_worker(
576567
let _guard = load_balance_guard;
577568
core_pinner.pin_now();
578569

570+
let _entered = rt.enter();
571+
579572
// Create the isolate and scope.
580573
let mut isolate = new_isolate();
581574
scope_with_context!(let scope, &mut isolate, Context::new(scope, Default::default()));
@@ -659,11 +652,7 @@ async fn spawn_instance_worker(
659652
let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
660653
reply("call_view", JsWorkerReply::CallView(res.into()), trapped);
661654
}
662-
JsWorkerRequest::CallProcedure { params, rt } => {
663-
// The callee passed us a handle to their tokio runtime - enter its
664-
// context so that we can execute futures.
665-
let _guard = rt.enter();
666-
655+
JsWorkerRequest::CallProcedure { params } => {
667656
let (res, trapped) = instance_common
668657
.call_procedure(params, &mut inst)
669658
.now_or_never()
@@ -707,9 +696,7 @@ async fn spawn_instance_worker(
707696
init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
708697
reply("init_database", InitDatabase(Box::new(res)), trapped);
709698
}
710-
JsWorkerRequest::CallScheduledFunction(params, rt) => {
711-
let _guard = rt.enter();
712-
699+
JsWorkerRequest::CallScheduledFunction(params) => {
713700
let (res, trapped) = instance_common
714701
.call_scheduled_function(params, &mut inst)
715702
.now_or_never()

0 commit comments

Comments
 (0)