Skip to content

Commit c3ef76c

Browse files
andygroveclaude
andauthored
perf: Initialize tokio runtime worker threads from spark.executor.cores (apache#3555)
* perf: Initialize tokio runtime worker threads from spark.executor.cores The tokio runtime was previously a process-wide lazy static initialized on first access with the default thread count (all CPU cores). This wastes resources when spark.executor.cores is less than the total CPU count. This change initializes the runtime dynamically using the resolved value of spark.executor.cores as the default worker thread count. The priority chain is: 1. COMET_WORKER_THREADS env var (explicit override, unchanged) 2. spark.executor.cores (resolved by numDriverOrExecutorCores) 3. Tokio default (all CPU cores, only if get_runtime() called before createPlan) The Scala side resolves spark.executor.cores via numDriverOrExecutorCores() which handles local, local[N], local[*], and cluster modes, then injects the resolved value into the config map sent to native code. On the Rust side, Lazy<Runtime> is replaced with OnceLock<Runtime> so the runtime can be initialized with the executor cores value from createPlan(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fmt --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f697d27 commit c3ef76c

3 files changed

Lines changed: 39 additions & 7 deletions

File tree

native/core/src/execution/jni_api.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,19 +87,31 @@ use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_t
8787
use crate::execution::memory_pools::logging_pool::LoggingMemoryPool;
8888
use crate::execution::spark_config::{
8989
SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
90-
COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
90+
COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED, SPARK_EXECUTOR_CORES,
9191
};
9292
use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
9393
use datafusion_comet_proto::spark_operator::operator::OpStruct;
9494
use log::info;
95-
use once_cell::sync::Lazy;
95+
use std::sync::OnceLock;
9696
#[cfg(feature = "jemalloc")]
9797
use tikv_jemalloc_ctl::{epoch, stats};
9898

99-
static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
99+
static TOKIO_RUNTIME: OnceLock<Runtime> = OnceLock::new();
100+
101+
fn parse_usize_env_var(name: &str) -> Option<usize> {
102+
std::env::var_os(name).and_then(|n| n.to_str().and_then(|s| s.parse::<usize>().ok()))
103+
}
104+
105+
fn build_runtime(default_worker_threads: Option<usize>) -> Runtime {
100106
let mut builder = tokio::runtime::Builder::new_multi_thread();
101107
if let Some(n) = parse_usize_env_var("COMET_WORKER_THREADS") {
108+
info!("Comet tokio runtime: using COMET_WORKER_THREADS={n}");
109+
builder.worker_threads(n);
110+
} else if let Some(n) = default_worker_threads {
111+
info!("Comet tokio runtime: using spark.executor.cores={n} worker threads");
102112
builder.worker_threads(n);
113+
} else {
114+
info!("Comet tokio runtime: using default thread count");
103115
}
104116
if let Some(n) = parse_usize_env_var("COMET_MAX_BLOCKING_THREADS") {
105117
builder.max_blocking_threads(n);
@@ -108,15 +120,17 @@ static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
108120
.enable_all()
109121
.build()
110122
.expect("Failed to create Tokio runtime")
111-
});
123+
}
112124

113-
fn parse_usize_env_var(name: &str) -> Option<usize> {
114-
std::env::var_os(name).and_then(|n| n.to_str().and_then(|s| s.parse::<usize>().ok()))
125+
/// Initialize the global Tokio runtime with the given default worker thread count.
126+
/// If the runtime is already initialized, this is a no-op.
127+
pub fn init_runtime(default_worker_threads: usize) {
128+
TOKIO_RUNTIME.get_or_init(|| build_runtime(Some(default_worker_threads)));
115129
}
116130

117131
/// Function to get a handle to the global Tokio runtime
118132
pub fn get_runtime() -> &'static Runtime {
119-
&TOKIO_RUNTIME
133+
TOKIO_RUNTIME.get_or_init(|| build_runtime(None))
120134
}
121135

122136
/// Comet native execution context. Kept alive across JNI calls.
@@ -192,6 +206,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
192206
let spark_configs = serde::deserialize_config(bytes.as_slice())?;
193207
let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();
194208

209+
// Initialize the tokio runtime with spark.executor.cores as the default
210+
// worker thread count, falling back to 1 if not set.
211+
let executor_cores = spark_config.get_usize(SPARK_EXECUTOR_CORES, 1);
212+
init_runtime(executor_cores);
213+
195214
// Access Comet configs
196215
let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
197216
let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);

native/core/src/execution/spark_config.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
2222
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
2323
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
2424
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
25+
pub(crate) const SPARK_EXECUTOR_CORES: &str = "spark.executor.cores";
2526

2627
pub(crate) trait SparkConfig {
2728
fn get_bool(&self, name: &str) -> bool;
2829
fn get_u64(&self, name: &str, default_value: u64) -> u64;
30+
fn get_usize(&self, name: &str, default_value: usize) -> usize;
2931
}
3032

3133
impl SparkConfig for HashMap<String, String> {
@@ -40,4 +42,10 @@ impl SparkConfig for HashMap<String, String> {
4042
.and_then(|str_val| str_val.parse::<u64>().ok())
4143
.unwrap_or(default_value)
4244
}
45+
46+
fn get_usize(&self, name: &str, default_value: usize) -> usize {
47+
self.get(name)
48+
.and_then(|str_val| str_val.parse::<usize>().ok())
49+
.unwrap_or(default_value)
50+
}
4351
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,11 @@ object CometExecIterator extends Logging {
278278
builder.putEntries(k, v)
279279
}
280280
}
281+
// Inject the resolved executor cores so the native side can use it
282+
// for tokio runtime thread count
283+
val executorCores = numDriverOrExecutorCores(SparkEnv.get.conf)
284+
builder.putEntries("spark.executor.cores", executorCores.toString)
285+
281286
builder.build().toByteArray
282287
}
283288

0 commit comments

Comments
 (0)