Skip to content

Commit fef0576

Browse files
committed
Revert "feat: Native batch passthrough for native_iceberg_compat V1 scans"
This reverts commit 75750af.
1 parent 75750af commit fef0576

8 files changed

Lines changed: 10 additions & 224 deletions

File tree

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,6 @@ public URI pathUri() throws URISyntaxException {
154154
protected static final BufferAllocator ALLOCATOR = new RootAllocator();
155155
private NativeUtil nativeUtil = new NativeUtil();
156156

157-
/**
158-
* Thread-local holding the native BatchContext handle of the current reader. Set during
159-
* nextBatch() in passthrough mode so that CometBatchIterator.advancePassthrough() can retrieve
160-
* it.
161-
*/
162-
public static final ThreadLocal<Long> CURRENT_READER_HANDLE = ThreadLocal.withInitial(() -> 0L);
163-
164157
protected Configuration conf;
165158
protected int capacity;
166159
protected boolean isCaseSensitive;
@@ -895,10 +888,6 @@ private boolean containsPath(Type parquetType, String[] path, int depth) {
895888
return false;
896889
}
897890

898-
public long getHandle() {
899-
return this.handle;
900-
}
901-
902891
public void setSparkSchema(StructType schema) {
903892
this.sparkSchema = schema;
904893
}
@@ -967,11 +956,6 @@ public boolean nextBatch() throws IOException {
967956

968957
if (batchSize == 0) return false;
969958

970-
// Set the thread-local handle so CometBatchIterator.advancePassthrough() can retrieve it.
971-
// This is always set after a successful loadNextBatch() regardless of whether passthrough
972-
// mode will be used — the Rust ScanExec decides whether to use it.
973-
CURRENT_READER_HANDLE.set(this.handle);
974-
975959
long totalDecodeTime = 0, totalLoadTime = 0;
976960
for (int i = 0; i < columnReaders.length; i++) {
977961
AbstractColumnReader reader = columnReaders[i];

native/core/src/execution/operators/scan.rs

Lines changed: 6 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
// under the License.
1717

1818
use crate::execution::operators::{copy_array, copy_or_unpack_array, CopyMode};
19-
use crate::parquet::get_batch_context;
2019
use crate::{
2120
errors::CometError,
2221
execution::{
@@ -80,11 +79,6 @@ pub struct ScanExec {
8079
baseline_metrics: BaselineMetrics,
8180
/// Whether native code can assume ownership of batches that it receives
8281
arrow_ffi_safe: bool,
83-
/// When true, data columns are read directly from the native reader's
84-
/// BatchContext instead of through JVM FFI (zero-copy).
85-
native_batch_passthrough: bool,
86-
/// Number of data columns from native reader. Remaining are partition columns.
87-
num_data_columns: usize,
8882
}
8983

9084
impl ScanExec {
@@ -94,8 +88,6 @@ impl ScanExec {
9488
input_source_description: &str,
9589
data_types: Vec<DataType>,
9690
arrow_ffi_safe: bool,
97-
native_batch_passthrough: bool,
98-
num_data_columns: usize,
9991
) -> Result<Self, CometError> {
10092
let metrics_set = ExecutionPlanMetricsSet::default();
10193
let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
@@ -123,8 +115,6 @@ impl ScanExec {
123115
baseline_metrics,
124116
schema,
125117
arrow_ffi_safe,
126-
native_batch_passthrough,
127-
num_data_columns,
128118
})
129119
}
130120

@@ -153,21 +143,12 @@ impl ScanExec {
153143

154144
let mut current_batch = self.batch.try_lock().unwrap();
155145
if current_batch.is_none() {
156-
let next_batch = if self.native_batch_passthrough {
157-
ScanExec::get_next_passthrough(
158-
self.exec_context_id,
159-
self.input_source.as_ref().unwrap().as_obj(),
160-
self.num_data_columns,
161-
self.data_types.len(),
162-
)?
163-
} else {
164-
ScanExec::get_next(
165-
self.exec_context_id,
166-
self.input_source.as_ref().unwrap().as_obj(),
167-
self.data_types.len(),
168-
self.arrow_ffi_safe,
169-
)?
170-
};
146+
let next_batch = ScanExec::get_next(
147+
self.exec_context_id,
148+
self.input_source.as_ref().unwrap().as_obj(),
149+
self.data_types.len(),
150+
self.arrow_ffi_safe,
151+
)?;
171152
*current_batch = Some(next_batch);
172153
}
173154

@@ -278,98 +259,6 @@ impl ScanExec {
278259
Ok(InputBatch::new(inputs, Some(actual_num_rows)))
279260
}
280261

281-
/// Passthrough mode: data columns are read directly from native BatchContext
282-
/// (zero-copy Arc::clone). Only partition columns are imported from JVM via FFI.
283-
fn get_next_passthrough(
284-
exec_context_id: i64,
285-
iter: &JObject,
286-
num_data_cols: usize,
287-
num_total_cols: usize,
288-
) -> Result<InputBatch, CometError> {
289-
if exec_context_id == TEST_EXEC_CONTEXT_ID {
290-
return Ok(InputBatch::EOF);
291-
}
292-
293-
if iter.is_null() {
294-
return Err(CometError::from(ExecutionError::GeneralError(format!(
295-
"Null batch iterator object. Plan id: {exec_context_id}"
296-
))));
297-
}
298-
299-
let mut env = JVMClasses::get_env()?;
300-
301-
// 1. Advance reader; get native batch handle (data stays in Rust)
302-
let handle: i64 = unsafe {
303-
jni_call!(&mut env,
304-
comet_batch_iterator(iter).advance_passthrough() -> i64)?
305-
};
306-
if handle == 0 {
307-
return Ok(InputBatch::EOF);
308-
}
309-
310-
// 2. Get data columns from native BatchContext (zero-copy)
311-
let context = get_batch_context(handle)?;
312-
let batch = context.current_batch.as_ref().ok_or_else(|| {
313-
CometError::from(ExecutionError::GeneralError(
314-
"No current batch in BatchContext".to_string(),
315-
))
316-
})?;
317-
318-
let num_rows = batch.num_rows();
319-
let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_total_cols);
320-
321-
for i in 0..num_data_cols {
322-
// Zero-copy: just increment the Arc reference count
323-
inputs.push(Arc::clone(batch.column(i)));
324-
}
325-
326-
// 3. Import partition columns from JVM FFI (if any)
327-
let num_partition_cols = num_total_cols - num_data_cols;
328-
if num_partition_cols > 0 {
329-
let mut array_addrs = Vec::with_capacity(num_partition_cols);
330-
let mut schema_addrs = Vec::with_capacity(num_partition_cols);
331-
332-
for _ in 0..num_partition_cols {
333-
let arrow_array = Rc::new(FFI_ArrowArray::empty());
334-
let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
335-
array_addrs.push(Rc::into_raw(arrow_array) as i64);
336-
schema_addrs.push(Rc::into_raw(arrow_schema) as i64);
337-
}
338-
339-
let long_array_addrs = env.new_long_array(num_partition_cols as jsize)?;
340-
let long_schema_addrs = env.new_long_array(num_partition_cols as jsize)?;
341-
env.set_long_array_region(&long_array_addrs, 0, &array_addrs)?;
342-
env.set_long_array_region(&long_schema_addrs, 0, &schema_addrs)?;
343-
344-
let array_obj = JObject::from(long_array_addrs);
345-
let schema_obj = JObject::from(long_schema_addrs);
346-
let num_data_cols_jint = num_data_cols as i32;
347-
348-
let _part_rows: i32 = unsafe {
349-
jni_call!(&mut env,
350-
comet_batch_iterator(iter).next_partition_columns_only(
351-
JValueGen::Object(array_obj.as_ref()),
352-
JValueGen::Object(schema_obj.as_ref()),
353-
JValueGen::Int(num_data_cols_jint)
354-
) -> i32)?
355-
};
356-
357-
for i in 0..num_partition_cols {
358-
let array_data = ArrayData::from_spark((array_addrs[i], schema_addrs[i]))?;
359-
let array = make_array(array_data);
360-
// Partition columns come from JVM mutable buffers, must copy
361-
inputs.push(copy_array(&array));
362-
363-
unsafe {
364-
Rc::from_raw(array_addrs[i] as *const FFI_ArrowArray);
365-
Rc::from_raw(schema_addrs[i] as *const FFI_ArrowSchema);
366-
}
367-
}
368-
}
369-
370-
Ok(InputBatch::new(inputs, Some(num_rows)))
371-
}
372-
373262
/// Allocates Arrow FFI structures and calls JNI to get the next batch data.
374263
/// Returns the number of rows and the allocated array/schema addresses.
375264
fn allocate_and_fetch_batch(

native/core/src/execution/planner.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,8 +1124,6 @@ impl PhysicalPlanner {
11241124
&scan.source,
11251125
data_types,
11261126
scan.arrow_ffi_safe,
1127-
scan.native_batch_passthrough,
1128-
scan.num_data_columns as usize,
11291127
)?;
11301128

11311129
Ok((
@@ -3475,8 +3473,6 @@ mod tests {
34753473
}],
34763474
source: "".to_string(),
34773475
arrow_ffi_safe: false,
3478-
native_batch_passthrough: false,
3479-
num_data_columns: 0,
34803476
})),
34813477
};
34823478

@@ -3551,8 +3547,6 @@ mod tests {
35513547
}],
35523548
source: "".to_string(),
35533549
arrow_ffi_safe: false,
3554-
native_batch_passthrough: false,
3555-
num_data_columns: 0,
35563550
})),
35573551
};
35583552

@@ -3760,8 +3754,6 @@ mod tests {
37603754
fields: vec![create_proto_datatype()],
37613755
source: "".to_string(),
37623756
arrow_ffi_safe: false,
3763-
native_batch_passthrough: false,
3764-
num_data_columns: 0,
37653757
})),
37663758
}
37673759
}
@@ -3805,8 +3797,6 @@ mod tests {
38053797
],
38063798
source: "".to_string(),
38073799
arrow_ffi_safe: false,
3808-
native_batch_passthrough: false,
3809-
num_data_columns: 0,
38103800
})),
38113801
};
38123802

@@ -3923,8 +3913,6 @@ mod tests {
39233913
],
39243914
source: "".to_string(),
39253915
arrow_ffi_safe: false,
3926-
native_batch_passthrough: false,
3927-
num_data_columns: 0,
39283916
})),
39293917
};
39303918

native/core/src/jvm_bridge/batch_iterator.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,6 @@ pub struct CometBatchIterator<'a> {
3535
pub method_has_selection_vectors_ret: ReturnType,
3636
pub method_export_selection_indices: JMethodID,
3737
pub method_export_selection_indices_ret: ReturnType,
38-
pub method_advance_passthrough: JMethodID,
39-
pub method_advance_passthrough_ret: ReturnType,
40-
pub method_next_partition_columns_only: JMethodID,
41-
pub method_next_partition_columns_only_ret: ReturnType,
4238
}
4339

4440
impl<'a> CometBatchIterator<'a> {
@@ -65,18 +61,6 @@ impl<'a> CometBatchIterator<'a> {
6561
"([J[J)I",
6662
)?,
6763
method_export_selection_indices_ret: ReturnType::Primitive(Primitive::Int),
68-
method_advance_passthrough: env.get_method_id(
69-
Self::JVM_CLASS,
70-
"advancePassthrough",
71-
"()J",
72-
)?,
73-
method_advance_passthrough_ret: ReturnType::Primitive(Primitive::Long),
74-
method_next_partition_columns_only: env.get_method_id(
75-
Self::JVM_CLASS,
76-
"nextPartitionColumnsOnly",
77-
"([J[JI)I",
78-
)?,
79-
method_next_partition_columns_only_ret: ReturnType::Primitive(Primitive::Int),
8064
})
8165
}
8266
}

native/core/src/parquet/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,16 +601,16 @@ enum ParquetReaderState {
601601
Complete,
602602
}
603603
/// Parquet read context maintained across multiple JNI calls.
604-
pub struct BatchContext {
604+
struct BatchContext {
605605
native_plan: Arc<SparkPlan>,
606606
metrics_node: Arc<GlobalRef>,
607607
batch_stream: Option<SendableRecordBatchStream>,
608-
pub current_batch: Option<RecordBatch>,
608+
current_batch: Option<RecordBatch>,
609609
reader_state: ParquetReaderState,
610610
}
611611

612612
#[inline]
613-
pub fn get_batch_context<'a>(handle: i64) -> Result<&'a mut BatchContext, CometError> {
613+
fn get_batch_context<'a>(handle: jlong) -> Result<&'a mut BatchContext, CometError> {
614614
unsafe {
615615
(handle as *mut BatchContext)
616616
.as_mut()

native/proto/src/proto/operator.proto

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,6 @@ message Scan {
8383
string source = 2;
8484
// Whether native code can assume ownership of batches that it receives
8585
bool arrow_ffi_safe = 3;
86-
// When true, data columns are read directly from the native reader's
87-
// BatchContext instead of through JVM FFI. Only partition columns
88-
// cross the JVM boundary.
89-
bool native_batch_passthrough = 4;
90-
// Number of data columns (from native reader). Remaining columns are partition cols.
91-
int32 num_data_columns = 5;
9286
}
9387

9488
message NativeScan {

spark/src/main/java/org/apache/comet/CometBatchIterator.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323

2424
import org.apache.spark.sql.vectorized.ColumnarBatch;
2525

26-
import org.apache.comet.parquet.NativeBatchReader;
2726
import org.apache.comet.vector.CometSelectionVector;
28-
import org.apache.comet.vector.CometVector;
2927
import org.apache.comet.vector.NativeUtil;
3028

3129
/**
@@ -113,46 +111,6 @@ public boolean hasSelectionVectors() {
113111
return true;
114112
}
115113

116-
/**
117-
* Advance to next batch in passthrough mode. Data columns stay in native BatchContext; only
118-
* partition columns are exported via FFI.
119-
*
120-
* @return native reader handle, or 0 for EOF
121-
*/
122-
public long advancePassthrough() {
123-
previousBatch = null;
124-
125-
if (currentBatch == null) {
126-
if (input.hasNext()) {
127-
currentBatch = input.next();
128-
}
129-
}
130-
if (currentBatch == null) {
131-
return 0; // EOF
132-
}
133-
long handle = NativeBatchReader.CURRENT_READER_HANDLE.get();
134-
previousBatch = currentBatch;
135-
currentBatch = null;
136-
return handle;
137-
}
138-
139-
/**
140-
* Export only partition columns (columns at indices >= numDataCols).
141-
*
142-
* @param arrayAddrs The addresses of the ArrowArray structures for partition columns
143-
* @param schemaAddrs The addresses of the ArrowSchema structures for partition columns
144-
* @param numDataCols Number of data columns to skip
145-
* @return the number of rows, or -1 if no batch
146-
*/
147-
public int nextPartitionColumnsOnly(long[] arrayAddrs, long[] schemaAddrs, int numDataCols) {
148-
if (previousBatch == null) return -1;
149-
for (int i = numDataCols; i < previousBatch.numCols(); i++) {
150-
CometVector vec = (CometVector) previousBatch.column(i);
151-
nativeUtil.exportSingleVector(vec, arrayAddrs[i - numDataCols], schemaAddrs[i - numDataCols]);
152-
}
153-
return previousBatch.numRows();
154-
}
155-
156114
/**
157115
* Export selection indices for all columns when they are selection vectors.
158116
*

0 commit comments

Comments
 (0)