Skip to content

Commit 98b4484

Browse files
authored
feat: Enable native c2r by default, add debug asserts (#3649)
1 parent 1772097 commit 98b4484

753 files changed

Lines changed: 6172 additions & 6144 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,9 @@ object CometConf extends ShimCometConf {
312312
"Whether to enable native columnar to row conversion. When enabled, Comet will use " +
313313
"native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
314314
"of the JVM implementation. This can improve performance for queries that need to " +
315-
"convert between columnar and row formats. This is an experimental feature.")
315+
"convert between columnar and row formats.")
316316
.booleanConf
317-
.createWithDefault(false)
317+
.createWithDefault(true)
318318

319319
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
320320
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")

native/core/src/execution/columnar_to_row.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,16 @@ impl<'a> TypedElements<'a> {
624624
let values_slice = $arr.values();
625625
let byte_len = num_elements * $elem_size;
626626
let src_start = start_idx * $elem_size;
627+
debug_assert!(
628+
src_start + byte_len <= values_slice.len() * $elem_size,
629+
"bulk_copy_range: source slice out of bounds: src_start={}, byte_len={}, values_len={}, elem_size={}",
630+
src_start, byte_len, values_slice.len() * $elem_size, $elem_size
631+
);
632+
debug_assert!(
633+
elements_start + byte_len <= buffer.len(),
634+
"bulk_copy_range: destination slice out of bounds: elements_start={}, byte_len={}, buffer_len={}",
635+
elements_start, byte_len, buffer.len()
636+
);
627637
let src_bytes = unsafe {
628638
std::slice::from_raw_parts(
629639
(values_slice.as_ptr() as *const u8).add(src_start),

native/core/src/execution/jni_api.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
972972
) -> jni::sys::jobject {
973973
try_unwrap_or_throw(&e, |mut env| {
974974
// Get the context
975+
debug_assert!(c2r_handle != 0, "columnarToRowConvert: c2r_handle is null");
975976
let ctx = (c2r_handle as *mut ColumnarToRowContext)
976977
.as_mut()
977978
.ok_or_else(|| CometError::Internal("Null columnar to row context".to_string()))?;
@@ -989,6 +990,17 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
989990
let array_ptr = array_addrs_elements[i] as *mut FFI_ArrowArray;
990991
let schema_ptr = schema_addrs_elements[i] as *mut FFI_ArrowSchema;
991992

993+
debug_assert!(
994+
!array_ptr.is_null(),
995+
"columnarToRowConvert: null array pointer at index {}",
996+
i
997+
);
998+
debug_assert!(
999+
!schema_ptr.is_null(),
1000+
"columnarToRowConvert: null schema pointer at index {}",
1001+
i
1002+
);
1003+
9921004
// Take ownership of the FFI structures
9931005
let ffi_array = std::ptr::read(array_ptr);
9941006
let ffi_schema = std::ptr::read(schema_ptr);
@@ -1001,6 +1013,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowConvert(
10011013
}
10021014

10031015
// Convert columnar to row
1016+
debug_assert!(
1017+
num_rows >= 0,
1018+
"columnarToRowConvert: num_rows is negative: {}",
1019+
num_rows
1020+
);
10041021
let (buffer_ptr, offsets, lengths) = ctx.convert(&arrays, num_rows as usize)?;
10051022

10061023
// Create Java int arrays for offsets and lengths
@@ -1037,6 +1054,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_columnarToRowClose(
10371054
c2r_handle: jlong,
10381055
) {
10391056
try_unwrap_or_throw(&e, |_env| {
1057+
debug_assert!(c2r_handle != 0, "columnarToRowClose: c2r_handle is null");
10401058
if c2r_handle != 0 {
10411059
let _ctx: Box<ColumnarToRowContext> =
10421060
Box::from_raw(c2r_handle as *mut ColumnarToRowContext);

spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ import org.apache.comet.{CometConf, NativeColumnarToRowConverter}
4545
* Native implementation of ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow
4646
* format using Rust.
4747
*
48-
* This is an experimental feature that can be enabled by setting
49-
* `spark.comet.columnarToRow.native.enabled=true`.
48+
* This feature is enabled by default and can be disabled by setting
49+
* `spark.comet.exec.columnarToRow.native.enabled=false`.
5050
*
5151
* Benefits over the JVM implementation:
5252
* - Zero-copy for variable-length types (strings, binary)

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q1.native_datafusion/extended.txt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ TakeOrderedAndProject
77
: : +- BroadcastHashJoin
88
: : :- Filter
99
: : : +- HashAggregate
10-
: : : +- CometColumnarToRow
10+
: : : +- CometNativeColumnarToRow
1111
: : : +- CometColumnarExchange
1212
: : : +- HashAggregate
1313
: : : +- Project
@@ -17,23 +17,23 @@ TakeOrderedAndProject
1717
: : : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
1818
: : : : +- SubqueryBroadcast
1919
: : : : +- BroadcastExchange
20-
: : : : +- CometColumnarToRow
20+
: : : : +- CometNativeColumnarToRow
2121
: : : : +- CometProject
2222
: : : : +- CometFilter
2323
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
2424
: : : +- BroadcastExchange
25-
: : : +- CometColumnarToRow
25+
: : : +- CometNativeColumnarToRow
2626
: : : +- CometProject
2727
: : : +- CometFilter
2828
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
2929
: : +- BroadcastExchange
3030
: : +- Filter
3131
: : +- HashAggregate
32-
: : +- CometColumnarToRow
32+
: : +- CometNativeColumnarToRow
3333
: : +- CometColumnarExchange
3434
: : +- HashAggregate
3535
: : +- HashAggregate
36-
: : +- CometColumnarToRow
36+
: : +- CometNativeColumnarToRow
3737
: : +- CometColumnarExchange
3838
: : +- HashAggregate
3939
: : +- Project
@@ -43,17 +43,17 @@ TakeOrderedAndProject
4343
: : : +- Scan parquet spark_catalog.default.store_returns [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
4444
: : : +- ReusedSubquery
4545
: : +- BroadcastExchange
46-
: : +- CometColumnarToRow
46+
: : +- CometNativeColumnarToRow
4747
: : +- CometProject
4848
: : +- CometFilter
4949
: : +- CometNativeScan parquet spark_catalog.default.date_dim
5050
: +- BroadcastExchange
51-
: +- CometColumnarToRow
51+
: +- CometNativeColumnarToRow
5252
: +- CometProject
5353
: +- CometFilter
5454
: +- CometNativeScan parquet spark_catalog.default.store
5555
+- BroadcastExchange
56-
+- CometColumnarToRow
56+
+- CometNativeColumnarToRow
5757
+- CometProject
5858
+- CometFilter
5959
+- CometNativeScan parquet spark_catalog.default.customer

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q1.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ CometColumnarToRow
1616
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_returns
1717
: : : : +- SubqueryBroadcast
1818
: : : : +- BroadcastExchange
19-
: : : : +- CometColumnarToRow
19+
: : : : +- CometNativeColumnarToRow
2020
: : : : +- CometProject
2121
: : : : +- CometFilter
2222
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_datafusion/extended.txt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
TakeOrderedAndProject
22
+- HashAggregate
3-
+- CometColumnarToRow
3+
+- CometNativeColumnarToRow
44
+- CometColumnarExchange
55
+- HashAggregate
66
+- Project
@@ -12,7 +12,7 @@ TakeOrderedAndProject
1212
: : +- BroadcastHashJoin
1313
: : :- BroadcastHashJoin
1414
: : : :- BroadcastHashJoin
15-
: : : : :- CometColumnarToRow
15+
: : : : :- CometNativeColumnarToRow
1616
: : : : : +- CometFilter
1717
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
1818
: : : : +- BroadcastExchange
@@ -22,12 +22,12 @@ TakeOrderedAndProject
2222
: : : : : +- Scan parquet spark_catalog.default.store_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
2323
: : : : : +- SubqueryBroadcast
2424
: : : : : +- BroadcastExchange
25-
: : : : : +- CometColumnarToRow
25+
: : : : : +- CometNativeColumnarToRow
2626
: : : : : +- CometProject
2727
: : : : : +- CometFilter
2828
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
2929
: : : : +- BroadcastExchange
30-
: : : : +- CometColumnarToRow
30+
: : : : +- CometNativeColumnarToRow
3131
: : : : +- CometProject
3232
: : : : +- CometFilter
3333
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
@@ -38,7 +38,7 @@ TakeOrderedAndProject
3838
: : : : +- Scan parquet spark_catalog.default.web_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
3939
: : : : +- ReusedSubquery
4040
: : : +- BroadcastExchange
41-
: : : +- CometColumnarToRow
41+
: : : +- CometNativeColumnarToRow
4242
: : : +- CometProject
4343
: : : +- CometFilter
4444
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
@@ -49,17 +49,17 @@ TakeOrderedAndProject
4949
: : : +- Scan parquet spark_catalog.default.catalog_sales [COMET: Native DataFusion scan does not support subqueries/dynamic pruning]
5050
: : : +- ReusedSubquery
5151
: : +- BroadcastExchange
52-
: : +- CometColumnarToRow
52+
: : +- CometNativeColumnarToRow
5353
: : +- CometProject
5454
: : +- CometFilter
5555
: : +- CometNativeScan parquet spark_catalog.default.date_dim
5656
: +- BroadcastExchange
57-
: +- CometColumnarToRow
57+
: +- CometNativeColumnarToRow
5858
: +- CometProject
5959
: +- CometFilter
6060
: +- CometNativeScan parquet spark_catalog.default.customer_address
6161
+- BroadcastExchange
62-
+- CometColumnarToRow
62+
+- CometNativeColumnarToRow
6363
+- CometProject
6464
+- CometFilter
6565
+- CometNativeScan parquet spark_catalog.default.customer_demographics

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ TakeOrderedAndProject
2121
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales
2222
: : : : : +- SubqueryBroadcast
2323
: : : : : +- BroadcastExchange
24-
: : : : : +- CometColumnarToRow
24+
: : : : : +- CometNativeColumnarToRow
2525
: : : : : +- CometProject
2626
: : : : : +- CometFilter
2727
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
@@ -50,12 +50,12 @@ TakeOrderedAndProject
5050
: : +- CometFilter
5151
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
5252
: +- BroadcastExchange
53-
: +- CometColumnarToRow
53+
: +- CometNativeColumnarToRow
5454
: +- CometProject
5555
: +- CometFilter
5656
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
5757
+- BroadcastExchange
58-
+- CometColumnarToRow
58+
+- CometNativeColumnarToRow
5959
+- CometProject
6060
+- CometFilter
6161
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics

0 commit comments

Comments
 (0)