Skip to content

Commit 69b2260

Browse files
authored
perf: set DataFusion session context’s target_partitions to match Spark's spark.task.cpus. (#3062)
1 parent c7aad67 commit 69b2260

3 files changed

Lines changed: 10 additions & 0 deletions

File tree

native/core/src/execution/jni_api.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
174174
memory_limit: jlong,
175175
memory_limit_per_task: jlong,
176176
task_attempt_id: jlong,
177+
task_cpus: jlong,
177178
key_unwrapper_obj: JObject,
178179
) -> jlong {
179180
try_unwrap_or_throw(&e, |mut env| {
@@ -241,6 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
241242
memory_pool,
242243
local_dirs_vec,
243244
max_temp_directory_size,
245+
task_cpus as usize,
244246
)?;
245247

246248
let plan_creation_time = start.elapsed();
@@ -294,6 +296,7 @@ fn prepare_datafusion_session_context(
294296
memory_pool: Arc<dyn MemoryPool>,
295297
local_dirs: Vec<String>,
296298
max_temp_directory_size: u64,
299+
task_cpus: usize,
297300
) -> CometResult<SessionContext> {
298301
let paths = local_dirs.into_iter().map(PathBuf::from).collect();
299302
let disk_manager = DiskManagerBuilder::default()
@@ -306,6 +309,10 @@ fn prepare_datafusion_session_context(
306309
// can be configured in Comet Spark JVM using Spark --conf parameters
307310
// e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true
308311
let session_config = SessionConfig::new()
312+
.with_target_partitions(task_cpus)
313+
// This DataFusion context is within the scope of an executing Spark Task. We want to set
314+
// its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be
315+
// modified by changing spark.task.cpus in the Spark config.
309316
.with_batch_size(batch_size)
310317
// DataFusion partial aggregates can emit duplicate rows so we disable the
311318
// skip partial aggregation feature because this is not compatible with Spark's

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class CometExecIterator(
7878
private val nativeLib = new Native()
7979
private val nativeUtil = new NativeUtil()
8080
private val taskAttemptId = TaskContext.get().taskAttemptId
81+
private val taskCPUs = TaskContext.get().cpus()
8182
private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
8283
private val cometBatchIterators = inputs.map { iterator =>
8384
new CometBatchIterator(iterator, nativeUtil)
@@ -121,6 +122,7 @@ class CometExecIterator(
121122
memoryConfig.memoryLimit,
122123
memoryConfig.memoryLimitPerTask,
123124
taskAttemptId,
125+
taskCPUs,
124126
keyUnwrapper)
125127
}
126128

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class Native extends NativeBase {
6868
memoryLimit: Long,
6969
memoryLimitPerTask: Long,
7070
taskAttemptId: Long,
71+
taskCPUs: Long,
7172
keyUnwrapper: CometFileKeyUnwrapper): Long
7273
// scalastyle:on
7374

0 commit comments

Comments
 (0)