Skip to content

Commit 75750af

Browse files
andygroveclaude
andcommitted
feat: Native batch passthrough for native_iceberg_compat V1 scans
Eliminate the JVM data round trip for data columns in native_iceberg_compat scans. Data columns are read directly from the native BatchContext via zero-copy Arc::clone, while only partition columns cross the JVM boundary via Arrow FFI. Previously, data made a wasteful round trip: Rust ParquetSource → per-column JNI export to JVM → JVM wraps as CometVector → JVM exports ALL cols back to Rust via Arrow FFI → Rust ScanExec deep-copies every column Now in passthrough mode: Rust ParquetSource → batch stays in native BatchContext → Rust ScanExec reads data cols directly (zero-copy) → Only partition cols imported from JVM FFI (small, constant) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 02e8ca4 commit 75750af

8 files changed

Lines changed: 224 additions & 10 deletions

File tree

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ public URI pathUri() throws URISyntaxException {
154154
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
155155
private NativeUtil nativeUtil = new NativeUtil();
156156

157+
/**
158+
* Thread-local holding the native BatchContext handle of the current reader. Set during
159+
* nextBatch() in passthrough mode so that CometBatchIterator.advancePassthrough() can retrieve
160+
* it.
161+
*/
162+
public static final ThreadLocal<Long> CURRENT_READER_HANDLE = ThreadLocal.withInitial(() -> 0L);
163+
157164
protected Configuration conf;
158165
protected int capacity;
159166
protected boolean isCaseSensitive;
@@ -888,6 +895,10 @@ private boolean containsPath(Type parquetType, String[] path, int depth) {
888895
return false;
889896
}
890897

898+
public long getHandle() {
899+
return this.handle;
900+
}
901+
891902
public void setSparkSchema(StructType schema) {
892903
this.sparkSchema = schema;
893904
}
@@ -956,6 +967,11 @@ public boolean nextBatch() throws IOException {
956967

957968
if (batchSize == 0) return false;
958969

970+
// Set the thread-local handle so CometBatchIterator.advancePassthrough() can retrieve it.
971+
// This is always set after a successful loadNextBatch() regardless of whether passthrough
972+
// mode will be used — the Rust ScanExec decides whether to use it.
973+
CURRENT_READER_HANDLE.set(this.handle);
974+
959975
long totalDecodeTime = 0, totalLoadTime = 0;
960976
for (int i = 0; i < columnReaders.length; i++) {
961977
AbstractColumnReader reader = columnReaders[i];

native/core/src/execution/operators/scan.rs

Lines changed: 117 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
// under the License.
1717

1818
use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode};
19+
use crate::parquet::get_batch_context;
1920
use crate::{
2021
errors::CometError,
2122
execution::{
@@ -79,6 +80,11 @@ pub struct ScanExec {
7980
baseline_metrics: BaselineMetrics,
8081
/// Whether native code can assume ownership of batches that it receives
8182
arrow_ffi_safe: bool,
83+
/// When true, data columns are read directly from the native reader's
84+
/// BatchContext instead of through JVM FFI (zero-copy).
85+
native_batch_passthrough: bool,
86+
/// Number of data columns from native reader. Remaining are partition columns.
87+
num_data_columns: usize,
8288
}
8389

8490
impl ScanExec {
@@ -88,6 +94,8 @@ impl ScanExec {
8894
input_source_description: &str,
8995
data_types: Vec<DataType>,
9096
arrow_ffi_safe: bool,
97+
native_batch_passthrough: bool,
98+
num_data_columns: usize,
9199
) -> Result<Self, CometError> {
92100
let metrics_set = ExecutionPlanMetricsSet::default();
93101
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -115,6 +123,8 @@ impl ScanExec {
115123
baseline_metrics,
116124
schema,
117125
arrow_ffi_safe,
126+
native_batch_passthrough,
127+
num_data_columns,
118128
})
119129
}
120130

@@ -143,12 +153,21 @@ impl ScanExec {
143153

144154
let mut current_batch = self.batch.try_lock().unwrap();
145155
if current_batch.is_none() {
146-
let next_batch = ScanExec::get_next(
147-
self.exec_context_id,
148-
self.input_source.as_ref().unwrap().as_obj(),
149-
self.data_types.len(),
150-
self.arrow_ffi_safe,
151-
)?;
156+
let next_batch = if self.native_batch_passthrough {
157+
ScanExec::get_next_passthrough(
158+
self.exec_context_id,
159+
self.input_source.as_ref().unwrap().as_obj(),
160+
self.num_data_columns,
161+
self.data_types.len(),
162+
)?
163+
} else {
164+
ScanExec::get_next(
165+
self.exec_context_id,
166+
self.input_source.as_ref().unwrap().as_obj(),
167+
self.data_types.len(),
168+
self.arrow_ffi_safe,
169+
)?
170+
};
152171
*current_batch = Some(next_batch);
153172
}
154173

@@ -259,6 +278,98 @@ impl ScanExec {
259278
Ok(InputBatch::new(inputs, Some(actual_num_rows)))
260279
}
261280

281+
/// Passthrough mode: data columns are read directly from native BatchContext
282+
/// (zero-copy Arc::clone). Only partition columns are imported from JVM via FFI.
283+
fn get_next_passthrough(
284+
exec_context_id: i64,
285+
iter: &JObject,
286+
num_data_cols: usize,
287+
num_total_cols: usize,
288+
) -> Result<InputBatch, CometError> {
289+
if exec_context_id == TEST_EXEC_CONTEXT_ID {
290+
return Ok(InputBatch::EOF);
291+
}
292+
293+
if iter.is_null() {
294+
return Err(CometError::from(ExecutionError::GeneralError(format!(
295+
"Null batch iterator object. Plan id: {exec_context_id}"
296+
))));
297+
}
298+
299+
let mut env = JVMClasses::get_env()?;
300+
301+
// 1. Advance reader; get native batch handle (data stays in Rust)
302+
let handle: i64 = unsafe {
303+
jni_call!(&mut env,
304+
comet_batch_iterator(iter).advance_passthrough() -> i64)?
305+
};
306+
if handle == 0 {
307+
return Ok(InputBatch::EOF);
308+
}
309+
310+
// 2. Get data columns from native BatchContext (zero-copy)
311+
let context = get_batch_context(handle)?;
312+
let batch = context.current_batch.as_ref().ok_or_else(|| {
313+
CometError::from(ExecutionError::GeneralError(
314+
"No current batch in BatchContext".to_string(),
315+
))
316+
})?;
317+
318+
let num_rows = batch.num_rows();
319+
let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_total_cols);
320+
321+
for i in 0..num_data_cols {
322+
// Zero-copy: just increment the Arc reference count
323+
inputs.push(Arc::clone(batch.column(i)));
324+
}
325+
326+
// 3. Import partition columns from JVM FFI (if any)
327+
let num_partition_cols = num_total_cols - num_data_cols;
328+
if num_partition_cols > 0 {
329+
let mut array_addrs = Vec::with_capacity(num_partition_cols);
330+
let mut schema_addrs = Vec::with_capacity(num_partition_cols);
331+
332+
for _ in 0..num_partition_cols {
333+
let arrow_array = Rc::new(FFI_ArrowArray::empty());
334+
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
335+
array_addrs.push(Rc::into_raw(arrow_array) as i64);
336+
schema_addrs.push(Rc::into_raw(arrow_schema) as i64);
337+
}
338+
339+
let long_array_addrs = env.new_long_array(num_partition_cols as jsize)?;
340+
let long_schema_addrs = env.new_long_array(num_partition_cols as jsize)?;
341+
env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?;
342+
env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?;
343+
344+
let array_obj = JObject::from(long_array_addrs);
345+
let schema_obj = JObject::from(long_schema_addrs);
346+
let num_data_cols_jint = num_data_cols as i32;
347+
348+
let _part_rows: i32 = unsafe {
349+
jni_call!(&mut env,
350+
comet_batch_iterator(iter).next_partition_columns_only(
351+
JValueGen::Object(array_obj.as_ref()),
352+
JValueGen::Object(schema_obj.as_ref()),
353+
JValueGen::Int(num_data_cols_jint)
354+
) -> i32)?
355+
};
356+
357+
for i in 0..num_partition_cols {
358+
let array_data = ArrayData::from_spark((array_addrs[i], schema_addrs[i]))?;
359+
let array = make_array(array_data);
360+
// Partition columns come from JVM mutable buffers, must copy
361+
inputs.push(copy_array(&array));
362+
363+
unsafe {
364+
Rc::from_raw(array_addrs[i] as *const FFI_ArrowArray);
365+
Rc::from_raw(schema_addrs[i] as *const FFI_ArrowSchema);
366+
}
367+
}
368+
}
369+
370+
Ok(InputBatch::new(inputs, Some(num_rows)))
371+
}
372+
262373
/// Allocates Arrow FFI structures and calls JNI to get the next batch data.
263374
/// Returns the number of rows and the allocated array/schema addresses.
264375
fn allocate_and_fetch_batch(

native/core/src/execution/planner.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,6 +1124,8 @@ impl PhysicalPlanner {
11241124
&scan.source,
11251125
data_types,
11261126
scan.arrow_ffi_safe,
1127+
scan.native_batch_passthrough,
1128+
scan.num_data_columns as usize,
11271129
)?;
11281130

11291131
Ok((
@@ -3473,6 +3475,8 @@ mod tests {
34733475
}],
34743476
source: "".to_string(),
34753477
arrow_ffi_safe: false,
3478+
native_batch_passthrough: false,
3479+
num_data_columns: 0,
34763480
})),
34773481
};
34783482

@@ -3547,6 +3551,8 @@ mod tests {
35473551
}],
35483552
source: "".to_string(),
35493553
arrow_ffi_safe: false,
3554+
native_batch_passthrough: false,
3555+
num_data_columns: 0,
35503556
})),
35513557
};
35523558

@@ -3754,6 +3760,8 @@ mod tests {
37543760
fields: vec![create_proto_datatype()],
37553761
source: "".to_string(),
37563762
arrow_ffi_safe: false,
3763+
native_batch_passthrough: false,
3764+
num_data_columns: 0,
37573765
})),
37583766
}
37593767
}
@@ -3797,6 +3805,8 @@ mod tests {
37973805
],
37983806
source: "".to_string(),
37993807
arrow_ffi_safe: false,
3808+
native_batch_passthrough: false,
3809+
num_data_columns: 0,
38003810
})),
38013811
};
38023812

@@ -3913,6 +3923,8 @@ mod tests {
39133923
],
39143924
source: "".to_string(),
39153925
arrow_ffi_safe: false,
3926+
native_batch_passthrough: false,
3927+
num_data_columns: 0,
39163928
})),
39173929
};
39183930

native/core/src/jvm_bridge/batch_iterator.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ pub struct CometBatchIterator<'a> {
3535
pub method_has_selection_vectors_ret: ReturnType,
3636
pub method_export_selection_indices: JMethodID,
3737
pub method_export_selection_indices_ret: ReturnType,
38+
pub method_advance_passthrough: JMethodID,
39+
pub method_advance_passthrough_ret: ReturnType,
40+
pub method_next_partition_columns_only: JMethodID,
41+
pub method_next_partition_columns_only_ret: ReturnType,
3842
}
3943

4044
impl<'a> CometBatchIterator<'a> {
@@ -61,6 +65,18 @@ impl<'a> CometBatchIterator<'a> {
6165
"([J[J)I",
6266
)?,
6367
method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int),
68+
method_advance_passthrough: env.get_method_id(
69+
Self::JVM_CLASS,
70+
"advancePassthrough",
71+
"()J",
72+
)?,
73+
method_advance_passthrough_ret: ReturnType::Primitive(Primitive::Long),
74+
method_next_partition_columns_only: env.get_method_id(
75+
Self::JVM_CLASS,
76+
"nextPartitionColumnsOnly",
77+
"([J[JI)I",
78+
)?,
79+
method_next_partition_columns_only_ret: ReturnType::Primitive(Primitive::Int),
6480
})
6581
}
6682
}

native/core/src/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,16 +601,16 @@ enum ParquetReaderState {
601601
Complete,
602602
}
603603
/// Parquet read context maintained across multiple JNI calls.
604-
struct BatchContext {
604+
pub struct BatchContext {
605605
native_plan: Arc<SparkPlan>,
606606
metrics_node: Arc<GlobalRef>,
607607
batch_stream: Option<SendableRecordBatchStream>,
608-
current_batch: Option<RecordBatch>,
608+
pub current_batch: Option<RecordBatch>,
609609
reader_state: ParquetReaderState,
610610
}
611611

612612
#[inline]
613-
fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> {
613+
pub fn get_batch_context<'a>(handle: i64) -> Result<&'a mut BatchContext, CometError> {
614614
unsafe {
615615
(handle as *mut BatchContext)
616616
.as_mut()

native/proto/src/proto/operator.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ message Scan {
8383
string source = 2;
8484
// Whether native code can assume ownership of batches that it receives
8585
bool arrow_ffi_safe = 3;
86+
// When true, data columns are read directly from the native reader's
87+
// BatchContext instead of through JVM FFI. Only partition columns
88+
// cross the JVM boundary.
89+
bool native_batch_passthrough = 4;
90+
// Number of data columns (from native reader). Remaining columns are partition cols.
91+
int32 num_data_columns = 5;
8692
}
8793

8894
message NativeScan {

spark/src/main/java/org/apache/comet/CometBatchIterator.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323

2424
import org.apache.spark.sql.vectorized.ColumnarBatch;
2525

26+
import org.apache.comet.parquet.NativeBatchReader;
2627
import org.apache.comet.vector.CometSelectionVector;
28+
import org.apache.comet.vector.CometVector;
2729
import org.apache.comet.vector.NativeUtil;
2830

2931
/**
@@ -111,6 +113,46 @@ public boolean hasSelectionVectors() {
111113
return true;
112114
}
113115

116+
/**
117+
* Advance to next batch in passthrough mode. Data columns stay in native BatchContext; only
118+
* partition columns are exported via FFI.
119+
*
120+
* @return native reader handle, or 0 for EOF
121+
*/
122+
public long advancePassthrough() {
123+
previousBatch = null;
124+
125+
if (currentBatch == null) {
126+
if (input.hasNext()) {
127+
currentBatch = input.next();
128+
}
129+
}
130+
if (currentBatch == null) {
131+
return 0; // EOF
132+
}
133+
long handle = NativeBatchReader.CURRENT_READER_HANDLE.get();
134+
previousBatch = currentBatch;
135+
currentBatch = null;
136+
return handle;
137+
}
138+
139+
/**
140+
* Export only partition columns (columns at indices >= numDataCols).
141+
*
142+
* @param arrayAddrs The addresses of the ArrowArray structures for partition columns
143+
* @param schemaAddrs The addresses of the ArrowSchema structures for partition columns
144+
* @param numDataCols Number of data columns to skip
145+
* @return the number of rows, or -1 if no batch
146+
*/
147+
public int nextPartitionColumnsOnly(long[] arrayAddrs, long[] schemaAddrs, int numDataCols) {
148+
if (previousBatch == null) return -1;
149+
for (int i = numDataCols; i < previousBatch.numCols(); i++) {
150+
CometVector vec = (CometVector) previousBatch.column(i);
151+
nativeUtil.exportSingleVector(vec, arrayAddrs[i - numDataCols], schemaAddrs[i - numDataCols]);
152+
}
153+
return previousBatch.numRows();
154+
}
155+
114156
/**
115157
* Export selection indices for all columns when they are selection vectors.
116158
*

0 commit comments

Comments
 (0)