Skip to content

Commit 7c75c01

Browse files
committed
Merge remote-tracking branch 'apache/main' into scatter-kernel-pr
2 parents 7bae4d1 + 8bab4a5 commit 7c75c01

19 files changed

Lines changed: 1039 additions & 59 deletions

File tree

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -343,6 +343,17 @@ object CometConf extends ShimCometConf {
343343
.booleanConf
344344
.createWithDefault(true)
345345

346+
val COMET_SHUFFLE_DIRECT_READ_ENABLED: ConfigEntry[Boolean] =
347+
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.directRead.enabled")
348+
.category(CATEGORY_SHUFFLE)
349+
.doc(
350+
"When enabled, native operators that consume shuffle output will read " +
351+
"compressed shuffle blocks directly in native code, bypassing Arrow FFI. " +
352+
"Applies to both native shuffle and JVM columnar shuffle. " +
353+
"Requires spark.comet.exec.shuffle.enabled to be true.")
354+
.booleanConf
355+
.createWithDefault(true)
356+
346357
val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
347358
.category(CATEGORY_SHUFFLE)
348359
.doc(

native/core/src/execution/jni_api.rs

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,7 @@ use tokio::sync::mpsc;
8282
use crate::execution::memory_pools::{
8383
create_memory_pool, handle_task_shared_pool_release, parse_memory_pool_config, MemoryPoolConfig,
8484
};
85-
use crate::execution::operators::ScanExec;
85+
use crate::execution::operators::{ScanExec, ShuffleScanExec};
8686
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
8787
use crate::execution::spark_plan::SparkPlan;
8888

@@ -151,6 +151,8 @@ struct ExecutionContext {
151151
pub root_op: Option<Arc<SparkPlan>>,
152152
/// The input sources for the DataFusion plan
153153
pub scans: Vec<ScanExec>,
154+
/// The shuffle scan input sources for the DataFusion plan
155+
pub shuffle_scans: Vec<ShuffleScanExec>,
154156
/// The global reference of input sources for the DataFusion plan
155157
pub input_sources: Vec<Arc<GlobalRef>>,
156158
/// The record batch stream to pull results from
@@ -311,6 +313,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
311313
partition_count: partition_count as usize,
312314
root_op: None,
313315
scans: vec![],
316+
shuffle_scans: vec![],
314317
input_sources,
315318
stream: None,
316319
batch_receiver: None,
@@ -491,6 +494,10 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometEr
491494
exec_context.scans.iter_mut().try_for_each(|scan| {
492495
scan.get_next_batch()?;
493496
Ok::<(), CometError>(())
497+
})?;
498+
exec_context.shuffle_scans.iter_mut().try_for_each(|scan| {
499+
scan.get_next_batch()?;
500+
Ok::<(), CometError>(())
494501
})
495502
}
496503

@@ -539,7 +546,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
539546
let planner =
540547
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
541548
.with_exec_id(exec_context_id);
542-
let (scans, root_op) = planner.create_plan(
549+
let (scans, shuffle_scans, root_op) = planner.create_plan(
543550
&exec_context.spark_plan,
544551
&mut exec_context.input_sources.clone(),
545552
exec_context.partition_count,
@@ -548,6 +555,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
548555

549556
exec_context.plan_creation_time += physical_plan_time;
550557
exec_context.scans = scans;
558+
exec_context.shuffle_scans = shuffle_scans;
551559

552560
if exec_context.explain_native {
553561
let formatted_plan_str =
@@ -560,7 +568,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
560568
// so we should always execute partition 0.
561569
let stream = root_op.native_plan.execute(0, task_ctx)?;
562570

563-
if exec_context.scans.is_empty() {
571+
if exec_context.scans.is_empty() && exec_context.shuffle_scans.is_empty() {
564572
// No JVM data sources — spawn onto tokio so the executor
565573
// thread parks in blocking_recv instead of busy-polling.
566574
//

native/core/src/execution/operators/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -32,4 +32,6 @@ pub use parquet_writer::ParquetWriterExec;
3232
mod csv_scan;
3333
pub mod projection;
3434
mod scan;
35+
mod shuffle_scan;
3536
pub use csv_scan::init_csv_datasource_exec;
37+
pub use shuffle_scan::ShuffleScanExec;

native/core/src/execution/operators/projection.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -25,8 +25,7 @@ use jni::objects::GlobalRef;
2525

2626
use crate::{
2727
execution::{
28-
operators::{ExecutionError, ScanExec},
29-
planner::{operator_registry::OperatorBuilder, PhysicalPlanner},
28+
planner::{operator_registry::OperatorBuilder, PhysicalPlanner, PlanCreationResult},
3029
spark_plan::SparkPlan,
3130
},
3231
extract_op,
@@ -42,12 +41,13 @@ impl OperatorBuilder for ProjectionBuilder {
4241
inputs: &mut Vec<Arc<GlobalRef>>,
4342
partition_count: usize,
4443
planner: &PhysicalPlanner,
45-
) -> Result<(Vec<ScanExec>, Arc<SparkPlan>), ExecutionError> {
44+
) -> PlanCreationResult {
4645
let project = extract_op!(spark_plan, Projection);
4746
let children = &spark_plan.children;
4847

4948
assert_eq!(children.len(), 1);
50-
let (scans, child) = planner.create_plan(&children[0], inputs, partition_count)?;
49+
let (scans, shuffle_scans, child) =
50+
planner.create_plan(&children[0], inputs, partition_count)?;
5151

5252
// Create projection expressions
5353
let exprs: Result<Vec<_>, _> = project
@@ -68,6 +68,7 @@ impl OperatorBuilder for ProjectionBuilder {
6868

6969
Ok((
7070
scans,
71+
shuffle_scans,
7172
Arc::new(SparkPlan::new(spark_plan.plan_id, projection, vec![child])),
7273
))
7374
}

0 commit comments

Comments
 (0)