Skip to content

Commit 524eaa1

Browse files
authored
feat: Native columnar to row conversion (Phase 1) (#3221)
1 parent ec5df97 commit 524eaa1

16 files changed

Lines changed: 4209 additions & 4 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ jobs:
202202
value: |
203203
org.apache.comet.exec.CometShuffleSuite
204204
org.apache.comet.exec.CometShuffle4_0Suite
205+
org.apache.comet.exec.CometNativeColumnarToRowSuite
205206
org.apache.comet.exec.CometNativeShuffleSuite
206207
org.apache.comet.exec.CometShuffleEncryptionSuite
207208
org.apache.comet.exec.CometShuffleManagerSuite

.github/workflows/pr_build_macos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ jobs:
145145
value: |
146146
org.apache.comet.exec.CometShuffleSuite
147147
org.apache.comet.exec.CometShuffle4_0Suite
148+
org.apache.comet.exec.CometNativeColumnarToRowSuite
148149
org.apache.comet.exec.CometNativeShuffleSuite
149150
org.apache.comet.exec.CometShuffleEncryptionSuite
150151
org.apache.comet.exec.CometShuffleManagerSuite

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
CLAUDE.md
12
target
23
.idea
34
*.iml

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,17 @@ object CometConf extends ShimCometConf {
296296
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
297297
createExecEnabledConfig("localTableScan", defaultValue = false)
298298

299+
val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
300+
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
301+
.category(CATEGORY_EXEC)
302+
.doc(
303+
"Whether to enable native columnar to row conversion. When enabled, Comet will use " +
304+
"native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
305+
"of the JVM implementation. This can improve performance for queries that need to " +
306+
"convert between columnar and row formats. This is an experimental feature.")
307+
.booleanConf
308+
.createWithDefault(false)
309+
299310
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
300311
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
301312
.category(CATEGORY_ENABLE_EXEC)

common/src/main/scala/org/apache/comet/vector/NativeUtil.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,26 @@ class NativeUtil {
7878
(arrays, schemas)
7979
}
8080

81+
/**
82+
* Exports a ColumnarBatch to Arrow FFI and returns the memory addresses.
83+
*
84+
* This is a convenience method that allocates Arrow structs, exports the batch, and returns
85+
* just the memory addresses (without exposing the Arrow types).
86+
*
87+
* @param batch
88+
* the columnar batch to export
89+
* @return
90+
* a tuple of (array addresses, schema addresses, number of rows)
91+
*/
92+
def exportBatchToAddresses(batch: ColumnarBatch): (Array[Long], Array[Long], Int) = {
93+
val numCols = batch.numCols()
94+
val (arrays, schemas) = allocateArrowStructs(numCols)
95+
val arrayAddrs = arrays.map(_.memoryAddress())
96+
val schemaAddrs = schemas.map(_.memoryAddress())
97+
val numRows = exportBatch(arrayAddrs, schemaAddrs, batch)
98+
(arrayAddrs, schemaAddrs, numRows)
99+
}
100+
81101
/**
82102
* Exports a Comet `ColumnarBatch` into a list of memory addresses that can be consumed by the
83103
* native execution.

0 commit comments

Comments
 (0)