Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
62d85d8
Pin db threads to cores
coolreader18 May 28, 2025
3391f0d
Move the module instance inside the database core.
jsdt May 28, 2025
42fe57b
Fix errors
coolreader18 May 28, 2025
48f1a3d
Allow 50 messages at a time on the queue.
jsdt May 28, 2025
31f1fe8
More robust job system
coolreader18 May 28, 2025
ece5074
Merge remote-tracking branch 'origin/jsdt/remove-pool' into noa/pin-t…
coolreader18 May 28, 2025
f46b4e9
Try pinning rayon
coolreader18 May 28, 2025
ea9901a
fixup! More robust job system
coolreader18 May 28, 2025
31e7cc6
fix lints
joshua-spacetime May 28, 2025
a46d9f0
Merge branch 'master' into noa/pin-threads
joshua-spacetime May 28, 2025
511a6a1
Bring back queue length stats.
jsdt May 28, 2025
72e4e56
Use sched_setaffinity on tokio blocking threads
coolreader18 Jun 2, 2025
d3a0323
Remove unnecessary utility function
coolreader18 Jun 3, 2025
54c9fb8
Remove outdated comment
coolreader18 Jun 3, 2025
655b48f
Clarify and and add docs for core::util::jobs
coolreader18 Jun 3, 2025
dcf94a9
Use a slightly better data structure for JobCores
coolreader18 Jun 3, 2025
909ee0b
More docs
coolreader18 Jun 3, 2025
759a165
Allow closing a JobThread
coolreader18 Jun 4, 2025
72623f8
Tiny fix
coolreader18 Jun 4, 2025
69b77dd
Tiny fix 2
coolreader18 Jun 4, 2025
3f872b5
Don't finish the queue before closing
coolreader18 Jun 4, 2025
cc2b42b
Bump JOB_CHANNEL_LENGTH to 64
coolreader18 Jun 4, 2025
7e7e462
Update crates/core/src/startup.rs
coolreader18 Jun 6, 2025
c4e7166
Address review
coolreader18 Jun 6, 2025
75cf78b
Remove lending_pool.rs
coolreader18 Jun 6, 2025
ba56187
More comments
coolreader18 Jun 6, 2025
08d30b2
Add TODO
coolreader18 Jun 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ log = "0.4.17"
memchr = "2"
mimalloc = "0.1.39"
nohash-hasher = "0.2"
nix = "0.30"
once_cell = "1.16"
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
parse-size = "1.1.0"
Expand Down
5 changes: 5 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ bytes.workspace = true
bytestring.workspace = true
chrono.workspace = true
crossbeam-channel.workspace = true
crossbeam-queue.workspace = true
derive_more.workspace = true
dirs.workspace = true
enum-as-inner.workspace = true
Expand Down Expand Up @@ -113,11 +114,15 @@ wasmtime.workspace = true
jwks.workspace = true
async_cache = "0.3.1"
faststr = "0.2.23"
core_affinity = "0.8"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = {workspace = true}
tikv-jemalloc-ctl = {workspace = true}

[target.'cfg(target_os = "linux")'.dependencies]
nix = { workspace = true, features = ["sched"] }

[features]
# Print a warning when doing an unindexed `iter_by_col_range` on a large table.
unindexed_iter_by_col_range_warn = []
Expand Down
52 changes: 45 additions & 7 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::SubscriptionManager;
use crate::util::{asyncify, spawn_rayon};
use crate::util::asyncify;
use crate::util::jobs::{JobCore, JobCores};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, ensure, Context};
use async_trait::async_trait;
Expand Down Expand Up @@ -95,6 +96,8 @@ pub struct HostController {
pub page_pool: PagePool,
/// The runtimes for running our modules.
runtimes: Arc<HostRuntimes>,
/// The CPU cores that are reserved for ModuleHost operations to run on.
db_cores: JobCores,
}

struct HostRuntimes {
Expand Down Expand Up @@ -169,6 +172,7 @@ impl HostController {
program_storage: ProgramStorage,
energy_monitor: Arc<impl EnergyMonitor>,
durability: Arc<dyn DurabilityProvider>,
db_cores: JobCores,
) -> Self {
Self {
hosts: <_>::default(),
Expand All @@ -179,6 +183,7 @@ impl HostController {
runtimes: HostRuntimes::new(Some(&data_dir)),
data_dir,
page_pool: PagePool::new(default_config.page_pool_max_size),
db_cores,
}
}

Expand Down Expand Up @@ -267,7 +272,19 @@ impl HostController {
/// This is not necessary during hotswap publishes,
/// as the automigration planner and executor accomplish the same validity checks.
pub async fn check_module_validity(&self, database: Database, program: Program) -> anyhow::Result<Arc<ModuleInfo>> {
Host::try_init_in_memory_to_check(&self.runtimes, self.page_pool.clone(), database, program).await
Host::try_init_in_memory_to_check(
&self.runtimes,
self.page_pool.clone(),
database,
program,
// This takes a db core to check validity, and we will later take
// another core to actually run the module. Due to the round-robin
// algorithm that JobCores uses, that will likely just be the same
// core - there's not a concern that we'll only end up using 1/2
// of the actual cores.
self.db_cores.take(),
)
.await
}

/// Run a computation on the [`RelationalDB`] of a [`ModuleHost`] managed by
Expand Down Expand Up @@ -338,6 +355,7 @@ impl HostController {
program,
self.energy_monitor.clone(),
self.unregister_fn(replica_id),
self.db_cores.take(),
)
.await?;

Expand Down Expand Up @@ -415,6 +433,7 @@ impl HostController {
program,
self.energy_monitor.clone(),
self.unregister_fn(replica_id),
self.db_cores.take(),
)
.await?;
match update_result {
Expand Down Expand Up @@ -556,6 +575,7 @@ async fn make_replica_ctx(

/// Initialize a module host for the given program.
/// The passed replica_ctx may not be configured for this version of the program's database schema yet.
#[allow(clippy::too_many_arguments)]
async fn make_module_host(
runtimes: Arc<HostRuntimes>,
host_type: HostType,
Expand All @@ -564,8 +584,14 @@ async fn make_module_host(
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
unregister: impl Fn() + Send + Sync + 'static,
core: JobCore,
) -> anyhow::Result<(Program, ModuleHost)> {
spawn_rayon(move || {
// `make_actor` is blocking, as it needs to compile the wasm to native code,
// which may be computationally expensive - sometimes up to 1s for a large module.
// TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
// threads, but those aren't for computation. Also, wasmtime uses rayon
// to run compilation in parallel, so it'll need to run stuff in rayon anyway.
asyncify(move || {
Comment thread
coolreader18 marked this conversation as resolved.
let module_host = match host_type {
HostType::Wasm => {
let mcc = ModuleCreationContext {
Expand All @@ -577,7 +603,7 @@ async fn make_module_host(
let start = Instant::now();
let actor = runtimes.wasmtime.make_actor(mcc)?;
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, unregister)
ModuleHost::new(actor, unregister, core)
}
};
Ok((program, module_host))
Expand Down Expand Up @@ -610,6 +636,7 @@ async fn launch_module(
energy_monitor: Arc<dyn EnergyMonitor>,
replica_dir: ReplicaDir,
runtimes: Arc<HostRuntimes>,
core: JobCore,
) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = database.database_identity;
let host_type = database.host_type;
Expand All @@ -626,6 +653,7 @@ async fn launch_module(
program,
energy_monitor.clone(),
on_panic,
core,
)
.await?;

Expand Down Expand Up @@ -776,6 +804,7 @@ impl Host {
energy_monitor.clone(),
replica_dir,
runtimes.clone(),
host_controller.db_cores.take(),
)
.await?;

Expand Down Expand Up @@ -834,6 +863,7 @@ impl Host {
page_pool: PagePool,
database: Database,
program: Program,
core: JobCore,
) -> anyhow::Result<Arc<ModuleInfo>> {
// Even in-memory databases acquire a lockfile.
// Grab a tempdir to put that lockfile in.
Expand Down Expand Up @@ -865,6 +895,7 @@ impl Host {
Arc::new(NullEnergyMonitor),
phony_replica_dir,
runtimes.clone(),
core,
)
.await?;

Expand Down Expand Up @@ -895,6 +926,7 @@ impl Host {
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
on_panic: impl Fn() + Send + Sync + 'static,
core: JobCore,
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = &self.replica_ctx;
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
Expand All @@ -907,6 +939,7 @@ impl Host {
program,
energy_monitor,
on_panic,
core,
)
.await?;

Expand Down Expand Up @@ -981,10 +1014,15 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an

let runtimes = HostRuntimes::new(None);
let page_pool = PagePool::new(None);
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program).await?;
let module_info = Arc::into_inner(module_info).unwrap();
let core = JobCore::default();
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
// this should always succeed, but sometimes it doesn't
let module_def = match Arc::try_unwrap(module_info) {
Ok(info) => info.module_def,
Err(info) => info.module_def.clone(),
};

Ok(module_info.module_def)
Ok(module_def)
}

// Remove all gauges associated with a database.
Expand Down
Loading
Loading