Skip to content

Commit 0b1e511

Browse files
authored
[VL] Reduce Velox hash shuffle partition buffer memory by evicting large partitions after split (#12089)
1 parent faad82c commit 0b1e511

11 files changed

Lines changed: 67 additions & 7 deletions

File tree

backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
150150
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
151151
nativeBufferSize,
152152
GlutenConfig.get.columnarShuffleReallocThreshold,
153+
GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
153154
partitionWriterHandle
154155
)
155156
case SortShuffleWriterType =>

backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ protected void writeImpl(Iterator<Product2<K, V>> records) {
185185
columnarDep.nativePartitioning(), partitionId),
186186
nativeBufferSize,
187187
reallocThreshold,
188+
GlutenConfig.get().columnarShufflePartitionBufferEvictThreshold(),
188189
partitionWriterHandle);
189190
}
190191

backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V](
192192
taskContext.partitionId),
193193
nativeBufferSize,
194194
reallocThreshold,
195+
GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold,
195196
partitionWriterHandle
196197
)
197198
}

cpp/core/jni/JniWrapper.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
990990
jint startPartitionId,
991991
jint splitBufferSize,
992992
jdouble splitBufferReallocThreshold,
993+
jint partitionBufferEvictThreshold,
993994
jlong partitionWriterHandle) {
994995
JNI_METHOD_START
995996
const auto ctx = getRuntime(env, wrapper);
@@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
10041005
toPartitioning(jStringToCString(env, partitioningNameJstr)),
10051006
startPartitionId,
10061007
splitBufferSize,
1007-
splitBufferReallocThreshold);
1008+
splitBufferReallocThreshold,
1009+
partitionBufferEvictThreshold);
10081010

10091011
return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions));
10101012
JNI_METHOD_END(kInvalidObjectHandle)

cpp/core/shuffle/Options.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
namespace gluten {
2828

2929
static constexpr int16_t kDefaultBatchSize = 4096;
30+
static constexpr int32_t kDefaultPartitionBufferEvictThreshold = -1;
3031
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
3132
static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
3233
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
@@ -85,17 +86,20 @@ struct ShuffleWriterOptions {
8586
struct HashShuffleWriterOptions : ShuffleWriterOptions {
8687
int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
8788
double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold;
89+
int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold;
8890

8991
HashShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {}
9092

9193
HashShuffleWriterOptions(
9294
Partitioning partitioning,
9395
int32_t startPartitionId,
9496
int32_t partitionBufferSize,
95-
double partitionBufferReallocThreshold)
97+
double partitionBufferReallocThreshold,
98+
int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold)
9699
: ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId),
97100
splitBufferSize(partitionBufferSize),
98-
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
101+
splitBufferReallocThreshold(partitionBufferReallocThreshold),
102+
partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
99103

100104
protected:
101105
HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {}
@@ -105,10 +109,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
105109
Partitioning partitioning,
106110
int32_t startPartitionId,
107111
int32_t partitionBufferSize,
108-
double partitionBufferReallocThreshold)
112+
double partitionBufferReallocThreshold,
113+
int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold)
109114
: ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId),
110115
splitBufferSize(partitionBufferSize),
111-
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
116+
splitBufferReallocThreshold(partitionBufferReallocThreshold),
117+
partitionBufferEvictThreshold(partitionBufferEvictThreshold) {}
112118
};
113119

114120
struct SortShuffleWriterOptions : ShuffleWriterOptions {

cpp/core/shuffle/Payload.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ arrow::Result<uint8_t> readPayloadType(arrow::io::InputStream* is) {
6060
}
6161

6262
arrow::Result<int64_t> compressBuffer(
63-
const std::shared_ptr<arrow::Buffer>& buffer,
63+
const std::shared_ptr<arrow::Buffer> buffer,
6464
uint8_t* output,
6565
int64_t outputLength,
6666
arrow::util::Codec* codec) {

cpp/velox/shuffle/VeloxHashShuffleWriter.cc

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector&
441441
printPartitionBuffer();
442442

443443
setSplitState(SplitState::kInit);
444+
if (partitionBufferEvictThreshold_ > 0) {
445+
// After split, evict large partition buffers to free up memory for the next input RowVector.
446+
const auto partitionBytes = estimatePartitionBufferBytes();
447+
for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) {
448+
if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= partitionBufferEvictThreshold_) {
449+
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
450+
}
451+
}
452+
}
444453
return arrow::Status::OK();
445454
}
446455

456+
std::vector<int64_t> VeloxHashShuffleWriter::estimatePartitionBufferBytes() const {
457+
std::vector<int64_t> partitionBytes(numPartitions_, 0);
458+
459+
for (const auto& columnBuffers : partitionBuffers_) {
460+
for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) {
461+
for (const auto& buffer : columnBuffers[pid]) {
462+
if (buffer) {
463+
partitionBytes[pid] += buffer->capacity();
464+
}
465+
}
466+
}
467+
}
468+
469+
for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) {
470+
const auto& buffer = complexTypeFlushBuffer_[pid];
471+
if (buffer) {
472+
partitionBytes[pid] += buffer->capacity();
473+
}
474+
}
475+
476+
return partitionBytes;
477+
}
478+
447479
arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) {
448480
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);
449481

cpp/velox/shuffle/VeloxHashShuffleWriter.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
278278
MemoryManager* memoryManager)
279279
: VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager),
280280
splitBufferSize_(options->splitBufferSize),
281-
splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {
281+
splitBufferReallocThreshold_(options->splitBufferReallocThreshold),
282+
partitionBufferEvictThreshold_(options->partitionBufferEvictThreshold) {
282283
arenas_.resize(numPartitions);
283284
}
284285

@@ -287,6 +288,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
287288

288289
arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);
289290

291+
std::vector<int64_t> estimatePartitionBufferBytes() const;
292+
290293
arrow::Status splitRowVector(const facebook::velox::RowVector& rv);
291294

292295
arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
@@ -396,6 +399,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
396399
protected:
397400
int32_t splitBufferSize_;
398401
double splitBufferReallocThreshold_;
402+
int32_t partitionBufferEvictThreshold_;
399403

400404
std::shared_ptr<arrow::Schema> schema_;
401405

docs/Configuration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ nav_order: 15
9595
| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. |
9696
| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. |
9797
| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 |
98+
| spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold | -1 | For Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. Use non-positive value to disable this feature. |
9899
| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. |
99100
| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 |
100101
| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. |

gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public native long createHashShuffleWriter(
4343
int startPartitionId,
4444
int splitBufferSize,
4545
double splitBufferReallocThreshold,
46+
int partitionBufferEvictThreshold,
4647
long partitionWriterHandle);
4748

4849
public native long createSortShuffleWriter(

0 commit comments

Comments
 (0)