| 1 | +<!-- |
| 2 | +Licensed to the Apache Software Foundation (ASF) under one |
| 3 | +or more contributor license agreements. See the NOTICE file |
| 4 | +distributed with this work for additional information |
| 5 | +regarding copyright ownership. The ASF licenses this file |
| 6 | +to you under the Apache License, Version 2.0 (the |
| 7 | +"License"); you may not use this file except in compliance |
| 8 | +with the License. You may obtain a copy of the License at |
| 9 | + |
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + |
| 12 | +Unless required by applicable law or agreed to in writing, |
| 13 | +software distributed under the License is distributed on an |
| 14 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | +KIND, either express or implied. See the License for the |
| 16 | +specific language governing permissions and limitations |
| 17 | +under the License. |
| 18 | +--> |
| 19 | + |
| 20 | +# JVM Shuffle |
| 21 | + |
| 22 | +This document describes Comet's JVM-based columnar shuffle implementation (`CometColumnarShuffle`), which |
| 23 | +writes shuffle data in Arrow IPC format using JVM code with native encoding. For the fully native |
| 24 | +alternative, see [Native Shuffle](native_shuffle.md). |
| 25 | + |
| 26 | +## Overview |
| 27 | + |
| 28 | +Comet provides two shuffle implementations: |
| 29 | + |
| 30 | +- **CometNativeShuffle** (`CometExchange`): Fully native shuffle using Rust. Takes columnar input directly |
| 31 | + from Comet native operators and performs partitioning in native code. |
| 32 | +- **CometColumnarShuffle** (`CometColumnarExchange`): JVM-based shuffle that operates on rows internally, |
| 33 | + buffers `UnsafeRow`s in memory pages, and uses native code (via JNI) to encode them to Arrow IPC format. |
| 34 | + Uses Spark's partitioner for partition assignment. Can accept either row-based or columnar input |
| 35 | + (columnar input is converted to rows via `ColumnarToRowExec`). |
| 36 | + |
| 37 | +The JVM shuffle is selected via `CometShuffleDependency.shuffleType`. |
| 38 | + |
| 39 | +## When JVM Shuffle is Used |
| 40 | + |
| 41 | +JVM shuffle (`CometColumnarExchange`) is used instead of native shuffle (`CometExchange`) in the following cases: |
| 42 | + |
| 43 | +1. **Shuffle mode is explicitly set to "jvm"**: When `spark.comet.exec.shuffle.mode` is set to `jvm`. |
| 44 | + |
| 45 | +2. **Child plan is not a Comet native operator**: When the child plan is a Spark row-based operator |
| 46 | + (not a `CometPlan`), JVM shuffle is the only option since native shuffle requires columnar input |
| 47 | + from Comet operators. |
| 48 | + |
| 49 | +3. **Unsupported partitioning type**: Native shuffle only supports `HashPartitioning`, `RangePartitioning`, |
| 50 | + and `SinglePartition`. JVM shuffle additionally supports `RoundRobinPartitioning`. |
| 51 | + |
| 52 | +4. **Unsupported partition key types**: For `HashPartitioning` and `RangePartitioning`, native shuffle |
| 53 | + only supports primitive types as partition keys. Complex types (struct, array, map) cannot be used |
| 54 | + as partition keys in native shuffle, though they are fully supported as data columns in both implementations. |
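
The four cases above can be read as a single decision function. The following is a minimal sketch of that reading, not Comet's actual planner code: `ExchangeInfo`, `choose`, and the local partitioning case objects are hypothetical names introduced only for illustration, and any mode string other than `jvm` is treated as "native shuffle allowed".

```scala
object ShuffleSelectionSketch {
  // Local stand-ins for Spark's partitioning classes (hypothetical, illustration only).
  sealed trait Partitioning
  case object HashPartitioning extends Partitioning
  case object RangePartitioning extends Partitioning
  case object SinglePartition extends Partitioning
  case object RoundRobinPartitioning extends Partitioning

  sealed trait ShuffleChoice
  case object JvmShuffle extends ShuffleChoice
  case object NativeShuffle extends ShuffleChoice

  // Hypothetical summary of the facts the decision depends on.
  final case class ExchangeInfo(
      shuffleMode: String, // value of spark.comet.exec.shuffle.mode
      childIsCometNative: Boolean, // true if the child plan is a CometPlan
      partitioning: Partitioning,
      partitionKeysArePrimitive: Boolean)

  def choose(info: ExchangeInfo): ShuffleChoice = {
    // Cases 3 and 4: partitioning scheme and key types that native shuffle can handle.
    val nativeCanPartition = info.partitioning match {
      case HashPartitioning | RangePartitioning => info.partitionKeysArePrimitive
      case SinglePartition => true
      case RoundRobinPartitioning => false
    }
    if (info.shuffleMode == "jvm") JvmShuffle // case 1
    else if (!info.childIsCometNative) JvmShuffle // case 2
    else if (!nativeCanPartition) JvmShuffle // cases 3 and 4
    else NativeShuffle
  }
}
```

For example, a hash-partitioned exchange over a Comet native child with a struct partition key falls through to `JvmShuffle` because of case 4.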
| 55 | + |
| 56 | +## Input Handling |
| 57 | + |
| 58 | +### Spark Row-Based Input |
| 59 | + |
| 60 | +When the child plan is a Spark row-based operator, `CometColumnarExchange` calls `child.execute()` which |
| 61 | +returns an `RDD[InternalRow]`. The rows flow directly to the JVM shuffle writers. |
| 62 | + |
| 63 | +### Comet Columnar Input |
| 64 | + |
| 65 | +When the child plan is a Comet native operator (e.g., `CometHashAggregate`) but JVM shuffle is selected |
| 66 | +(due to shuffle mode setting or unsupported partitioning), `CometColumnarExchange` still calls |
| 67 | +`child.execute()`. Comet operators implement `doExecute()` by wrapping themselves with `ColumnarToRowExec`: |
| 68 | + |
| 69 | +```scala |
| 70 | +// In CometExec base class |
| 71 | +override def doExecute(): RDD[InternalRow] = |
| 72 | + ColumnarToRowExec(this).doExecute() |
| 73 | +``` |
| 74 | + |
| 75 | +This means the data path becomes: |
| 76 | + |
| 77 | +``` |
| 78 | +Comet Native (columnar) → ColumnarToRowExec → rows → JVM Shuffle → Arrow IPC → columnar |
| 79 | +``` |
| 80 | + |
| 81 | +This is less efficient than native shuffle, which avoids the columnar-to-row conversion: |
| 82 | + |
| 83 | +``` |
| 84 | +Comet Native (columnar) → Native Shuffle → Arrow IPC → columnar |
| 85 | +``` |
| 86 | + |
| 87 | +### Why Use Spark's Partitioner? |
| 88 | + |
| 89 | +JVM shuffle uses row-based input so it can leverage Spark's existing partitioner infrastructure |
| 90 | +(`partitioner.getPartition(key)`). This allows Comet to support all of Spark's partitioning schemes |
| 91 | +without reimplementing them in Rust. Native shuffle, by contrast, serializes the partitioning scheme |
| 92 | +to protobuf and implements the partitioning logic in native code. |
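
To illustrate what the `getPartition(key)` contract looks like from the writer's point of view, here is a simplified, self-contained sketch. `SimplePartitioner` and `SimpleHashPartitioner` are hypothetical stand-ins, not Spark or Comet classes, and the non-negative modulo over `hashCode` is an assumption chosen for brevity rather than the hashing Spark applies to shuffle exchanges.

```scala
object PartitionerSketch {
  // Hypothetical, simplified version of a partitioner interface.
  trait SimplePartitioner {
    def numPartitions: Int
    def getPartition(key: Any): Int
  }

  // Maps a key to a partition with a non-negative modulo of its hash code.
  final class SimpleHashPartitioner(val numPartitions: Int) extends SimplePartitioner {
    require(numPartitions > 0, "numPartitions must be positive")

    def getPartition(key: Any): Int = key match {
      case null => 0
      case k =>
        val raw = k.hashCode % numPartitions
        // % can return a negative value for negative hash codes; shift into range.
        if (raw < 0) raw + numPartitions else raw
    }
  }

  // Tags each key with its target partition, as a row-based writer would per row.
  def assign(keys: Seq[Any], partitioner: SimplePartitioner): Seq[(Int, Any)] =
    keys.map(k => (partitioner.getPartition(k), k))
}
```

Because the writer only depends on the `getPartition` call, any partitioning scheme that satisfies this contract can be plugged in without changes to the shuffle writer itself.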
| 93 | + |
| 94 | +## Architecture |
| 95 | + |
| 96 | +``` |
| 97 | +┌─────────────────────────────────────────────────────────────────────────┐ |
| 98 | +│ CometShuffleManager │ |
| 99 | +│ - Extends Spark's ShuffleManager │ |
| 100 | +│ - Routes to appropriate writer/reader based on ShuffleHandle type │ |
| 101 | +└─────────────────────────────────────────────────────────────────────────┘ |
| 102 | + │ |
| 103 | + ┌────────────────────────┼────────────────────────┐ |
| 104 | + ▼ ▼ ▼ |
| 105 | +┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ |
| 106 | +│ CometBypassMerge- │ │ CometUnsafe- │ │ CometNative- │ |
| 107 | +│ SortShuffleWriter │ │ ShuffleWriter │ │ ShuffleWriter │ |
| 108 | +│ (hash-based) │ │ (sort-based) │ │ (fully native) │ |
| 109 | +└─────────────────────┘ └─────────────────────┘ └─────────────────────┘ |
| 110 | + │ │ |
| 111 | + ▼ ▼ |
| 112 | +┌─────────────────────┐ ┌─────────────────────┐ |
| 113 | +│ CometDiskBlock- │ │ CometShuffleExternal│ |
| 114 | +│ Writer │ │ Sorter │ |
| 115 | +└─────────────────────┘ └─────────────────────┘ |
| 116 | + │ │ |
| 117 | + └────────────┬───────────┘ |
| 118 | + ▼ |
| 119 | + ┌─────────────────────┐ |
| 120 | + │ SpillWriter │ |
| 121 | + │ (native encoding │ |
| 122 | + │ via JNI) │ |
| 123 | + └─────────────────────┘ |
| 124 | +``` |
| 125 | + |
| 126 | +## Key Classes |
| 127 | + |
| 128 | +### Shuffle Manager |
| 129 | + |
| 130 | +| Class | Location | Description | |
| 131 | +| ------------------------ | ------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------ | |
| 132 | +| `CometShuffleManager` | `.../shuffle/CometShuffleManager.scala` | Entry point. Extends Spark's `ShuffleManager`. Selects writer/reader based on handle type. Delegates non-Comet shuffles to `SortShuffleManager`. | |
| 133 | +| `CometShuffleDependency` | `.../shuffle/CometShuffleDependency.scala` | Extends `ShuffleDependency`. Contains `shuffleType` (`CometColumnarShuffle` or `CometNativeShuffle`) and schema info. | |
| 134 | + |
| 135 | +### Shuffle Handles |
| 136 | + |
| 137 | +| Handle | Writer Strategy | |
| 138 | +| ----------------------------------- | --------------------------------------------------------- | |
| 139 | +| `CometBypassMergeSortShuffleHandle` | Hash-based: one file per partition, merged at end | |
| 140 | +| `CometSerializedShuffleHandle` | Sort-based: records sorted by partition ID, single output | |
| 141 | +| `CometNativeShuffleHandle` | Fully native shuffle | |
| 142 | + |
| 143 | +Selection logic in `CometShuffleManager.shouldBypassMergeSort()`: |
| 144 | + |
| 145 | +- Uses the bypass (hash-based) handle if partitions < threshold AND partitions × cores ≤ max threads |
| 146 | +- Otherwise uses the sort-based handle to avoid OOM from many concurrent writers |
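
The two conditions can be written as a small predicate. This is a sketch of the rule as stated here, not the body of `CometShuffleManager.shouldBypassMergeSort()`: the parameter names are hypothetical, and where the real method obtains the threshold, core count, and thread limit is not covered by this document.

```scala
object BypassSelectionSketch {
  // All four parameters are hypothetical names for the quantities in the rule above.
  def shouldBypassMergeSort(
      numPartitions: Int,
      bypassThreshold: Int,
      executorCores: Int,
      maxThreads: Int): Boolean = {
    // Widen to Long so partitions × cores cannot overflow Int.
    val concurrentWriters = numPartitions.toLong * executorCores
    numPartitions < bypassThreshold && concurrentWriters <= maxThreads
  }
}
```

With 100 partitions, a threshold of 200, and a limit of 1000 threads, 4 cores selects the bypass handle (400 concurrent writers), while 16 cores selects the sort-based handle (1600 concurrent writers).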
| 147 | + |
| 148 | +### Writers |
| 149 | + |
| 150 | +| Class | Location | Description | |
| 151 | +| ----------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- | |
| 152 | +| `CometBypassMergeSortShuffleWriter` | `.../shuffle/CometBypassMergeSortShuffleWriter.java` | Hash-based writer. Creates one `CometDiskBlockWriter` per partition. Supports async writes. | |
| 153 | +| `CometUnsafeShuffleWriter` | `.../shuffle/CometUnsafeShuffleWriter.java` | Sort-based writer. Uses `CometShuffleExternalSorter` to buffer and sort records, then merges spill files. | |
| 154 | +| `CometDiskBlockWriter` | `.../shuffle/CometDiskBlockWriter.java` | Buffers rows in memory pages for a single partition. Spills to disk via native encoding. Used by bypass writer. | |
| 155 | +| `CometShuffleExternalSorter` | `.../shuffle/sort/CometShuffleExternalSorter.java` | Buffers records across all partitions, sorts by partition ID, spills sorted data. Used by unsafe writer. | |
| 156 | +| `SpillWriter` | `.../shuffle/SpillWriter.java` | Base class for spill logic. Manages memory pages and calls `Native.writeSortedFileNative()` for Arrow IPC encoding. | |
| 157 | + |
| 158 | +### Reader |
| 159 | + |
| 160 | +| Class | Location | Description | |
| 161 | +| ------------------------------ | ------------------------------------------------ | -------------------------------------------------------------------------------------------------- | |
| 162 | +| `CometBlockStoreShuffleReader` | `.../shuffle/CometBlockStoreShuffleReader.scala` | Fetches shuffle blocks via `ShuffleBlockFetcherIterator`. Decodes Arrow IPC to `ColumnarBatch`. | |
| 163 | +| `NativeBatchDecoderIterator` | `.../shuffle/NativeBatchDecoderIterator.scala` | Reads compressed Arrow IPC batches from input stream. Calls `Native.decodeShuffleBlock()` via JNI. | |
| 164 | + |
| 165 | +## Data Flow |
| 166 | + |
| 167 | +### Write Path |
| 168 | + |
| 169 | +1. `ShuffleWriteProcessor` calls `CometShuffleManager.getWriter()` |
| 170 | +2. Writer receives `Iterator[Product2[K, V]]` where V is `UnsafeRow` |
| 171 | +3. Rows are serialized and buffered in off-heap memory pages |
| 172 | +4. When memory threshold or batch size is reached, `SpillWriter.doSpilling()` is called |
| 173 | +5. Native code (`Native.writeSortedFileNative()`) converts rows to Arrow arrays and writes IPC format |
| 174 | +6. For the bypass writer: per-partition files are concatenated into the final output |
| 175 | +7. For the sort writer: spill files are merged into a single output |
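
The buffer, spill, and merge steps of the sort-based path can be modelled in a few lines. The sketch below is a deliberately simplified, in-memory model under stated assumptions: `Record`, `SortingBuffer`, and the row-count `spillThreshold` are hypothetical, spills are kept as in-memory vectors instead of files, and the native Arrow IPC encoding step is omitted entirely.

```scala
import scala.collection.mutable.ArrayBuffer

object SortWritePathSketch {
  // Hypothetical record: a row payload tagged with its target partition.
  final case class Record(partitionId: Int, payload: String)

  final class SortingBuffer(spillThreshold: Int) {
    require(spillThreshold > 0, "spillThreshold must be positive")
    private val buffer = ArrayBuffer.empty[Record]
    private val spills = ArrayBuffer.empty[Vector[Record]]

    // Steps 3 and 4: buffer the record, spill once the threshold is reached.
    def insert(record: Record): Unit = {
      buffer += record
      if (buffer.size >= spillThreshold) spill()
    }

    // Step 5 stand-in: sort the buffered records by partition ID and "write" them.
    private def spill(): Unit =
      if (buffer.nonEmpty) {
        spills += buffer.sortBy(_.partitionId).toVector
        buffer.clear()
      }

    def spillCount: Int = spills.size

    // Step 7: flush the remainder, then merge all spills per partition.
    def close(): Map[Int, Vector[String]] = {
      spill()
      spills.flatten
        .groupBy(_.partitionId)
        .map { case (pid, records) => pid -> records.map(_.payload).toVector }
    }
  }
}
```

Each spill is internally ordered by partition ID, so the final merge only has to stitch together the per-partition runs from every spill.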
| 176 | + |
| 177 | +### Read Path |
| 178 | + |
| 179 | +1. `CometBlockStoreShuffleReader.read()` creates `ShuffleBlockFetcherIterator` |
| 180 | +2. For each block, `NativeBatchDecoderIterator` reads the IPC stream |
| 181 | +3. Native code (`Native.decodeShuffleBlock()`) decompresses and decodes to Arrow arrays |
| 182 | +4. Arrow FFI imports arrays as `ColumnarBatch` |
| 183 | + |
| 184 | +## Memory Management |
| 185 | + |
| 186 | +- `CometShuffleMemoryAllocator`: Custom allocator for off-heap memory pages |
| 187 | +- Memory is allocated in pages; when allocation fails, writers spill to disk |
| 188 | +- `CometDiskBlockWriter` coordinates spilling across all partition writers (largest first) |
| 189 | +- Async spilling is supported via `ShuffleThreadPool` |
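
As an illustration of the "largest first" policy, the sketch below picks which partition buffers to spill when an allocation needs more memory. It is only a model: `PartitionBuffer` and `chooseSpillVictims` are hypothetical names, and stopping as soon as enough bytes have been freed is an assumption, since this document does not state when the coordinated spill ends.

```scala
object LargestFirstSpillSketch {
  // Hypothetical view of one partition writer's buffered memory.
  final case class PartitionBuffer(partitionId: Int, bytesUsed: Long)

  // Returns the partition IDs to spill, largest buffer first, until the
  // requested number of bytes would be freed (assumed stop condition).
  def chooseSpillVictims(buffers: Seq[PartitionBuffer], bytesNeeded: Long): Seq[Int] = {
    val largestFirst = buffers.filter(_.bytesUsed > 0).sortBy(b => -b.bytesUsed)
    // freedBefore(i) is the total freed by spilling the buffers ahead of index i.
    val freedBefore = largestFirst.scanLeft(0L)(_ + _.bytesUsed)
    largestFirst
      .zip(freedBefore)
      .takeWhile { case (_, freed) => freed < bytesNeeded }
      .map { case (buffer, _) => buffer.partitionId }
  }
}
```

Spilling the largest buffers first frees the requested memory with the fewest spill files, which keeps the later merge step cheaper.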
| 190 | + |
| 191 | +## Configuration |
| 192 | + |
| 193 | +| Config | Description | |
| 194 | +| ----------------------------------------------- | ----------------------------------- | |
| 195 | +| `spark.comet.columnar.shuffle.async.enabled` | Enable async spill writes | |
| 196 | +| `spark.comet.columnar.shuffle.async.thread.num` | Threads per writer for async writes | |
| 197 | +| `spark.comet.columnar.shuffle.batch.size` | Rows per Arrow batch | |
| 198 | +| `spark.comet.columnar.shuffle.spill.threshold` | Row count threshold for spill | |
| 199 | +| `spark.comet.exec.shuffle.compression.codec` | Compression codec (zstd, lz4, etc.) | |
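
As a usage illustration, the settings above can be collected into a map and rendered as `--conf key=value` arguments for `spark-submit`. The keys come from this document; every value except `jvm` and `zstd` is a placeholder chosen for the example, not a recommended or default setting, and `JvmShuffleConfSketch` is a hypothetical helper.

```scala
object JvmShuffleConfSketch {
  // Keys are taken from this document; numeric values are placeholders.
  val settings: Map[String, String] = Map(
    "spark.comet.exec.shuffle.mode" -> "jvm",
    "spark.comet.columnar.shuffle.async.enabled" -> "true",
    "spark.comet.columnar.shuffle.async.thread.num" -> "3",
    "spark.comet.columnar.shuffle.batch.size" -> "8192",
    "spark.comet.columnar.shuffle.spill.threshold" -> "100000",
    "spark.comet.exec.shuffle.compression.codec" -> "zstd")

  // Renders the settings as spark-submit arguments, sorted by key for stable output.
  def asSubmitArgs: Seq[String] =
    settings.toSeq.sortBy(_._1).flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
}
```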