Skip to content

Commit 3508fe9

Browse files
committed
Run http_request on a tokio task
1 parent 6e565f7 commit 3508fe9

2 files changed

Lines changed: 13 additions & 23 deletions

File tree

crates/core/src/host/instance_env.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::subscription::module_subscription_manager::{from_tx_offset, Transacti
1010
use crate::util::prometheus_handle::IntGaugeExt;
1111
use chrono::{DateTime, Utc};
1212
use core::mem;
13+
use futures::TryFutureExt;
1314
use parking_lot::{Mutex, MutexGuard};
1415
use smallvec::SmallVec;
1516
use spacetimedb_client_api_messages::energy::EnergyQuanta;
@@ -865,7 +866,8 @@ impl InstanceEnv {
865866
// TODO(perf): Stash a long-lived `Client` in the env somewhere, rather than building a new one for each call.
866867
let execute_fut = reqwest::Client::new().execute(reqwest);
867868

868-
let response_fut = async {
869+
// Run the future that does IO work on a tokio worker thread, where it's more efficent.
870+
let response_fut = tokio::spawn(async {
869871
// `reqwest::Error` may contain sensitive info, namely the full URL with query params.
870872
// We'll strip those with `strip_query_params_from_eqwest_error`
871873
// after `await`ing `response_fut` below.
@@ -880,7 +882,8 @@ impl InstanceEnv {
880882
let body = http_body_util::BodyExt::collect(body).await?.to_bytes();
881883

882884
Ok((response, body))
883-
};
885+
})
886+
.unwrap_or_else(|e| std::panic::resume_unwind(e.into_panic()));
884887

885888
let database_identity = *self.database_identity();
886889

crates/core/src/host/v8/mod.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -415,13 +415,10 @@ impl JsInstance {
415415
}
416416

417417
pub async fn call_procedure(&mut self, params: CallProcedureParams) -> CallProcedureReturn {
418-
// Get a handle to the current tokio runtime, and pass it to the worker
419-
// so that it can execute futures.
420-
let rt = tokio::runtime::Handle::current();
421418
*self
422419
.send_recv(
423420
JsWorkerReply::into_call_procedure,
424-
JsWorkerRequest::CallProcedure { params, rt },
421+
JsWorkerRequest::CallProcedure { params },
425422
)
426423
.await
427424
}
@@ -436,12 +433,9 @@ impl JsInstance {
436433
&mut self,
437434
params: ScheduledFunctionParams,
438435
) -> CallScheduledFunctionResult {
439-
// Get a handle to the current tokio runtime, and pass it to the worker
440-
// so that it can execute futures.
441-
let rt = tokio::runtime::Handle::current();
442436
self.send_recv(
443437
JsWorkerReply::into_call_scheduled_function,
444-
JsWorkerRequest::CallScheduledFunction(params, rt),
438+
JsWorkerRequest::CallScheduledFunction(params),
445439
)
446440
.await
447441
}
@@ -481,10 +475,7 @@ enum JsWorkerRequest {
481475
/// See [`JsInstance::call_view`].
482476
CallView { cmd: ViewCommand },
483477
/// See [`JsInstance::call_procedure`].
484-
CallProcedure {
485-
params: CallProcedureParams,
486-
rt: tokio::runtime::Handle,
487-
},
478+
CallProcedure { params: CallProcedureParams },
488479
/// See [`JsInstance::clear_all_clients`].
489480
ClearAllClients,
490481
/// See [`JsInstance::call_identity_connected`].
@@ -496,7 +487,7 @@ enum JsWorkerRequest {
496487
/// See [`JsInstance::init_database`].
497488
InitDatabase(Program),
498489
/// See [`JsInstance::call_scheduled_function`].
499-
CallScheduledFunction(ScheduledFunctionParams, tokio::runtime::Handle),
490+
CallScheduledFunction(ScheduledFunctionParams),
500491
}
501492

502493
/// Performs some of the startup work of [`spawn_instance_worker`].
@@ -568,6 +559,8 @@ async fn spawn_instance_worker(
568559
let _guard = load_balance_guard;
569560
core_pinner.pin_now();
570561

562+
let _entered = rt.enter();
563+
571564
// Create the isolate and scope.
572565
let mut isolate = new_isolate();
573566
scope_with_context!(let scope, &mut isolate, Context::new(scope, Default::default()));
@@ -651,11 +644,7 @@ async fn spawn_instance_worker(
651644
let (res, trapped) = instance_common.handle_cmd(cmd, &mut inst);
652645
reply("call_view", JsWorkerReply::CallView(res.into()), trapped);
653646
}
654-
JsWorkerRequest::CallProcedure { params, rt } => {
655-
// The callee passed us a handle to their tokio runtime - enter its
656-
// context so that we can execute futures.
657-
let _guard = rt.enter();
658-
647+
JsWorkerRequest::CallProcedure { params } => {
659648
let (res, trapped) = instance_common
660649
.call_procedure(params, &mut inst)
661650
.now_or_never()
@@ -699,9 +688,7 @@ async fn spawn_instance_worker(
699688
init_database(replica_ctx, &module_common.info().module_def, program, call_reducer);
700689
reply("init_database", InitDatabase(res), trapped);
701690
}
702-
JsWorkerRequest::CallScheduledFunction(params, rt) => {
703-
let _guard = rt.enter();
704-
691+
JsWorkerRequest::CallScheduledFunction(params) => {
705692
let (res, trapped) = instance_common
706693
.call_scheduled_function(params, &mut inst)
707694
.now_or_never()

0 commit comments

Comments
 (0)