From a539a6273f3cc00890dd1ee0fac95bcfec3a7c34 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Tue, 3 Feb 2026 16:56:42 +0200 Subject: [PATCH 1/2] chore: Move spark unsafe classes into spark_unsafe --- native/core/benches/row_columnar.rs | 2 +- native/core/src/execution/jni_api.rs | 2 +- native/core/src/execution/shuffle/mod.rs | 4 +--- .../shuffle/{ => spark_unsafe}/list.rs | 2 +- .../shuffle/{ => spark_unsafe}/map.rs | 2 +- .../src/execution/shuffle/spark_unsafe/mod.rs | 20 +++++++++++++++++++ .../shuffle/{ => spark_unsafe}/row.rs | 10 ++++++---- 7 files changed, 31 insertions(+), 11 deletions(-) rename native/core/src/execution/shuffle/{ => spark_unsafe}/list.rs (99%) rename native/core/src/execution/shuffle/{ => spark_unsafe}/map.rs (98%) create mode 100644 native/core/src/execution/shuffle/spark_unsafe/mod.rs rename native/core/src/execution/shuffle/{ => spark_unsafe}/row.rs (99%) diff --git a/native/core/benches/row_columnar.rs b/native/core/benches/row_columnar.rs index a62574111b..a040b25eb0 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/core/benches/row_columnar.rs @@ -16,7 +16,7 @@ // under the License. use arrow::datatypes::DataType as ArrowDataType; -use comet::execution::shuffle::row::{ +use comet::execution::shuffle::spark_unsafe::row::{ process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, }; use comet::execution::shuffle::CompressionCodec; diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 2022aef75e..b9776afd3f 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -22,7 +22,7 @@ use crate::{ errors::{try_unwrap_or_throw, CometError, CometResult}, execution::{ metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype, - shuffle::row::process_sorted_row_partition, sort::RdxSort, + shuffle::spark_unsafe::row::process_sorted_row_partition, sort::RdxSort, }, jvm_bridge::{jni_new_global_ref, JVMClasses}, }; diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index e2798df63e..98953883fc 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -17,9 +17,7 @@ pub(crate) mod codec; mod comet_partitioning; -mod list; -mod map; -pub mod row; +pub mod spark_unsafe; mod shuffle_writer; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; diff --git a/native/core/src/execution/shuffle/list.rs b/native/core/src/execution/shuffle/spark_unsafe/list.rs similarity index 99% rename from native/core/src/execution/shuffle/list.rs rename to native/core/src/execution/shuffle/spark_unsafe/list.rs index c31244b87d..d8e39e8b09 100644 --- a/native/core/src/execution/shuffle/list.rs +++ b/native/core/src/execution/shuffle/spark_unsafe/list.rs @@ -17,7 +17,7 @@ use crate::{ errors::CometError, - execution::shuffle::{ + execution::shuffle::spark_unsafe::{ map::append_map_elements, row::{append_field, downcast_builder_ref, SparkUnsafeObject, SparkUnsafeRow}, }, diff --git a/native/core/src/execution/shuffle/map.rs b/native/core/src/execution/shuffle/spark_unsafe/map.rs similarity index 98% rename from native/core/src/execution/shuffle/map.rs rename to native/core/src/execution/shuffle/spark_unsafe/map.rs index 48b0b9a00a..de2b96146b 100644 --- a/native/core/src/execution/shuffle/map.rs +++ b/native/core/src/execution/shuffle/spark_unsafe/map.rs @@ -17,7 +17,7 @@ use crate::{ errors::CometError, - execution::shuffle::list::{append_to_builder, SparkUnsafeArray}, + execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray}, }; use arrow::array::builder::{ArrayBuilder, MapBuilder, MapFieldNames}; use arrow::datatypes::{DataType, FieldRef}; diff --git a/native/core/src/execution/shuffle/spark_unsafe/mod.rs b/native/core/src/execution/shuffle/spark_unsafe/mod.rs new file mode 100644 index 0000000000..b052df29b3 --- /dev/null +++ b/native/core/src/execution/shuffle/spark_unsafe/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod list; +mod map; +pub mod row; diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/spark_unsafe/row.rs similarity index 99% rename from native/core/src/execution/shuffle/row.rs rename to native/core/src/execution/shuffle/spark_unsafe/row.rs index 821607ddb9..1c121c506b 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/spark_unsafe/row.rs @@ -22,8 +22,10 @@ use crate::{ execution::{ shuffle::{ codec::{Checksum, ShuffleBlockWriter}, - list::{append_list_element, SparkUnsafeArray}, - map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap}, + spark_unsafe::{ + list::{append_list_element, SparkUnsafeArray}, + map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap}, + }, }, utils::bytes_to_i128, }, @@ -293,7 +295,7 @@ pub(crate) use downcast_builder_ref; /// `struct_builder.append` is called before/after calling this function to append the null buffer /// of the struct array. #[allow(clippy::redundant_closure_call)] -pub(crate) fn append_field( +pub(super) fn append_field( dt: &DataType, struct_builder: &mut StructBuilder, row: &SparkUnsafeRow, @@ -441,7 +443,7 @@ pub(crate) fn append_field( /// Appends column of top rows to the given array builder. #[allow(clippy::redundant_closure_call, clippy::too_many_arguments)] -pub(crate) fn append_columns( +fn append_columns( row_addresses_ptr: *mut jlong, row_sizes_ptr: *mut jint, row_start: usize, From dafe646814c316b1976d53326ab40c505db50cff Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Tue, 3 Feb 2026 18:32:25 +0200 Subject: [PATCH 2/2] fmt --- native/core/src/execution/shuffle/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index 98953883fc..2e9a08c435 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -17,8 +17,8 @@ pub(crate) mod codec; mod comet_partitioning; -pub mod spark_unsafe; mod shuffle_writer; +pub mod spark_unsafe; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; pub use comet_partitioning::CometPartitioning;