Skip to content

Commit 54b7d5a

Browse files
committed
Thread jobs.rs changes through all of core crate.
1 parent f26e20f commit 54b7d5a

9 files changed

Lines changed: 365 additions & 225 deletions

File tree

crates/core/src/host/host_controller.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::replica_context::ReplicaContext;
1414
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
1515
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
1616
use crate::util::asyncify;
17-
use crate::util::jobs::{JobCore, JobCores};
17+
use crate::util::jobs::{JobCores, SingleCoreExecutor};
1818
use crate::worker_metrics::WORKER_METRICS;
1919
use anyhow::{anyhow, ensure, Context};
2020
use async_trait::async_trait;
@@ -698,7 +698,7 @@ async fn make_module_host(
698698
program: Program,
699699
energy_monitor: Arc<dyn EnergyMonitor>,
700700
unregister: impl Fn() + Send + Sync + 'static,
701-
core: JobCore,
701+
executor: SingleCoreExecutor,
702702
) -> anyhow::Result<(Program, ModuleHost)> {
703703
// `make_actor` is blocking, as it needs to compile the wasm to native code,
704704
// which may be computationally expensive - sometimes up to 1s for a large module.
@@ -718,12 +718,12 @@ async fn make_module_host(
718718
HostType::Wasm => {
719719
let actor = runtimes.wasmtime.make_actor(mcc)?;
720720
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
721-
ModuleHost::new(actor, unregister, core)
721+
ModuleHost::new(actor, unregister, executor)
722722
}
723723
HostType::Js => {
724724
let actor = runtimes.v8.make_actor(mcc)?;
725725
trace!("v8::make_actor blocked for {:?}", start.elapsed());
726-
ModuleHost::new(actor, unregister, core)
726+
ModuleHost::new(actor, unregister, executor)
727727
}
728728
};
729729
Ok((program, module_host))
@@ -756,7 +756,7 @@ async fn launch_module(
756756
energy_monitor: Arc<dyn EnergyMonitor>,
757757
replica_dir: ReplicaDir,
758758
runtimes: Arc<HostRuntimes>,
759-
core: JobCore,
759+
executor: SingleCoreExecutor,
760760
) -> anyhow::Result<(Program, LaunchedModule)> {
761761
let db_identity = database.database_identity;
762762
let host_type = database.host_type;
@@ -773,7 +773,7 @@ async fn launch_module(
773773
program,
774774
energy_monitor.clone(),
775775
on_panic,
776-
core,
776+
executor,
777777
)
778778
.await?;
779779

@@ -991,7 +991,7 @@ impl Host {
991991
page_pool: PagePool,
992992
database: Database,
993993
program: Program,
994-
core: JobCore,
994+
executor: SingleCoreExecutor,
995995
) -> anyhow::Result<Arc<ModuleInfo>> {
996996
// Even in-memory databases acquire a lockfile.
997997
// Grab a tempdir to put that lockfile in.
@@ -1024,7 +1024,7 @@ impl Host {
10241024
Arc::new(NullEnergyMonitor),
10251025
phony_replica_dir,
10261026
runtimes.clone(),
1027-
core,
1027+
executor,
10281028
)
10291029
.await?;
10301030

@@ -1057,7 +1057,7 @@ impl Host {
10571057
policy: MigrationPolicy,
10581058
energy_monitor: Arc<dyn EnergyMonitor>,
10591059
on_panic: impl Fn() + Send + Sync + 'static,
1060-
core: JobCore,
1060+
executor: SingleCoreExecutor,
10611061
) -> anyhow::Result<UpdateDatabaseResult> {
10621062
let replica_ctx = &self.replica_ctx;
10631063
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
@@ -1070,7 +1070,7 @@ impl Host {
10701070
program,
10711071
energy_monitor,
10721072
on_panic,
1073-
core,
1073+
executor,
10741074
)
10751075
.await?;
10761076

@@ -1187,7 +1187,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an
11871187

11881188
let runtimes = HostRuntimes::new(None);
11891189
let page_pool = PagePool::new(None);
1190-
let core = JobCore::default();
1190+
let core = SingleCoreExecutor::in_current_tokio_runtime();
11911191
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
11921192
// this should always succeed, but sometimes it doesn't
11931193
let module_def = match Arc::try_unwrap(module_info) {

crates/core/src/host/module_common.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
44
use crate::{
55
energy::EnergyMonitor,
6-
host::{
7-
module_host::{DynModule, ModuleInfo},
8-
Scheduler,
9-
},
6+
host::{module_host::ModuleInfo, Scheduler},
107
module_host_context::ModuleCreationContext,
118
replica_context::ReplicaContext,
129
};
@@ -79,12 +76,12 @@ impl ModuleCommon {
7976
}
8077
}
8178

82-
impl DynModule for ModuleCommon {
83-
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
79+
impl ModuleCommon {
80+
pub fn replica_ctx(&self) -> &Arc<ReplicaContext> {
8481
&self.replica_context
8582
}
8683

87-
fn scheduler(&self) -> &Scheduler {
84+
pub fn scheduler(&self) -> &Scheduler {
8885
&self.scheduler
8986
}
9087
}

0 commit comments

Comments
 (0)