Skip to content

Commit d887555

Browse files
Merge branch 'apache:main' into main
2 parents c68c342 + 619f368 commit d887555

28 files changed

Lines changed: 1615 additions & 949 deletions

File tree

.github/workflows/benchmark-tpcds.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jobs:
5555
rust-version: ${{env.RUST_VERSION}}
5656
jdk-version: 11
5757
- name: Cache Maven dependencies
58-
uses: actions/cache@v4
58+
uses: actions/cache@v5
5959
with:
6060
path: |
6161
~/.m2/repository
@@ -67,7 +67,7 @@ jobs:
6767
run: make release
6868
- name: Cache TPC-DS generated data
6969
id: cache-tpcds-sf-1
70-
uses: actions/cache@v4
70+
uses: actions/cache@v5
7171
with:
7272
path: ./tpcds-sf-1
7373
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml') }}
@@ -107,7 +107,7 @@ jobs:
107107
rust-version: ${{env.RUST_VERSION}}
108108
jdk-version: 11
109109
- name: Cache Maven dependencies
110-
uses: actions/cache@v4
110+
uses: actions/cache@v5
111111
with:
112112
path: |
113113
~/.m2/repository
@@ -117,7 +117,7 @@ jobs:
117117
${{ runner.os }}-java-maven-
118118
- name: Restore TPC-DS generated data
119119
id: cache-tpcds-sf-1
120-
uses: actions/cache/restore@v4
120+
uses: actions/cache/restore@v5
121121
with:
122122
path: ./tpcds-sf-1
123123
key: tpcds-${{ hashFiles('.github/workflows/benchmark.yml') }}

.github/workflows/benchmark-tpch.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jobs:
5555
rust-version: ${{env.RUST_VERSION}}
5656
jdk-version: 11
5757
- name: Cache Maven dependencies
58-
uses: actions/cache@v4
58+
uses: actions/cache@v5
5959
with:
6060
path: |
6161
~/.m2/repository
@@ -65,7 +65,7 @@ jobs:
6565
${{ runner.os }}-java-maven-
6666
- name: Cache TPC-H generated data
6767
id: cache-tpch-sf-1
68-
uses: actions/cache@v4
68+
uses: actions/cache@v5
6969
with:
7070
path: ./tpch
7171
key: tpch-${{ hashFiles('.github/workflows/benchmark-tpch.yml') }}
@@ -91,7 +91,7 @@ jobs:
9191
rust-version: ${{env.RUST_VERSION}}
9292
jdk-version: 11
9393
- name: Cache Maven dependencies
94-
uses: actions/cache@v4
94+
uses: actions/cache@v5
9595
with:
9696
path: |
9797
~/.m2/repository
@@ -101,7 +101,7 @@ jobs:
101101
${{ runner.os }}-java-maven-
102102
- name: Restore TPC-H generated data
103103
id: cache-tpch-sf-1
104-
uses: actions/cache/restore@v4
104+
uses: actions/cache/restore@v5
105105
with:
106106
path: ./tpch
107107
key: tpch-${{ hashFiles('.github/workflows/benchmark-tpch.yml') }}

.github/workflows/spark_sql_test.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ jobs:
8989
LC_ALL: "C.UTF-8"
9090
- name: Upload fallback log
9191
if: ${{ github.event.inputs.collect-fallback-logs == 'true' }}
92-
uses: actions/upload-artifact@v5
92+
uses: actions/upload-artifact@v6
9393
with:
9494
name: fallback-log-spark-sql-${{ matrix.module.name }}-${{ matrix.os }}-spark-${{ matrix.spark-version.full }}-java-${{ matrix.spark-version.java }}
9595
path: "**/fallback.log"
@@ -138,7 +138,7 @@ jobs:
138138
LC_ALL: "C.UTF-8"
139139
- name: Upload fallback log
140140
if: ${{ github.event.inputs.collect-fallback-logs == 'true' }}
141-
uses: actions/upload-artifact@v5
141+
uses: actions/upload-artifact@v6
142142
with:
143143
name: fallback-log-spark-sql-native-comet-${{ matrix.module.name }}-${{ matrix.os }}-spark-${{ matrix.spark-version.full }}-java-${{ matrix.java-version }}
144144
path: "**/fallback.log"
@@ -187,7 +187,7 @@ jobs:
187187
LC_ALL: "C.UTF-8"
188188
- name: Upload fallback log
189189
if: ${{ github.event.inputs.collect-fallback-logs == 'true' }}
190-
uses: actions/upload-artifact@v5
190+
uses: actions/upload-artifact@v6
191191
with:
192192
name: fallback-log-spark-sql-iceberg-compat-${{ matrix.module.name }}-${{ matrix.os }}-spark-${{ matrix.spark-version.full }}-java-${{ matrix.java-version }}
193193
path: "**/fallback.log"
@@ -199,14 +199,14 @@ jobs:
199199
runs-on: ubuntu-24.04
200200
steps:
201201
- name: Download fallback log artifacts
202-
uses: actions/download-artifact@v6
202+
uses: actions/download-artifact@v7
203203
with:
204204
path: fallback-logs/
205205
- name: Merge fallback logs
206206
run: |
207207
find ./fallback-logs/ -type f -name "fallback.log" -print0 | xargs -0 cat | sort -u > all_fallback.log
208208
- name: Upload merged fallback log
209-
uses: actions/upload-artifact@v5
209+
uses: actions/upload-artifact@v6
210210
with:
211211
name: all-fallback-log
212212
path: all_fallback.log

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,17 @@ object CometConf extends ShimCometConf {
441441
.intConf
442442
.createWithDefault(8192)
443443

444+
val COMET_SHUFFLE_WRITE_BUFFER_SIZE: ConfigEntry[Long] =
445+
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.writeBufferSize")
446+
.category(CATEGORY_SHUFFLE)
447+
.doc("Size of the write buffer in bytes used by the native shuffle writer when writing " +
448+
"shuffle data to disk. Larger values may improve write performance by reducing " +
449+
"the number of system calls, but will use more memory. " +
450+
"The default is 1MB which provides a good balance between performance and memory usage.")
451+
.bytesConf(ByteUnit.MiB)
452+
.checkValue(v => v > 0, "Write buffer size must be positive")
453+
.createWithDefault(1)
454+
444455
val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf(
445456
"spark.comet.shuffle.preferDictionary.ratio")
446457
.category(CATEGORY_SHUFFLE)

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ These settings can be used to determine which parts of the plan are accelerated
107107
| `spark.comet.exec.shuffle.compression.codec` | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
108108
| `spark.comet.exec.shuffle.compression.zstd.level` | The compression level to use when compressing shuffle files with zstd. | 1 |
109109
| `spark.comet.exec.shuffle.enabled` | Whether to enable Comet native shuffle. Note that this requires setting `spark.shuffle.manager` to `org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager`. `spark.shuffle.manager` must be set before starting the Spark application and cannot be changed during the application. | true |
110+
| `spark.comet.exec.shuffle.writeBufferSize` | Size of the write buffer in bytes used by the native shuffle writer when writing shuffle data to disk. Larger values may improve write performance by reducing the number of system calls, but will use more memory. The default is 1MB which provides a good balance between performance and memory usage. | 1048576b |
110111
| `spark.comet.native.shuffle.partitioning.hash.enabled` | Whether to enable hash partitioning for Comet native shuffle. | true |
111112
| `spark.comet.native.shuffle.partitioning.range.enabled` | Whether to enable range partitioning for Comet native shuffle. | true |
112113
| `spark.comet.shuffle.preferDictionary.ratio` | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |

native/core/benches/shuffle_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ fn create_shuffle_writer_exec(
152152
"/tmp/data.out".to_string(),
153153
"/tmp/index.out".to_string(),
154154
false,
155+
1024 * 1024,
155156
)
156157
.unwrap()
157158
}

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use datafusion_spark::function::datetime::date_sub::SparkDateSub;
4747
use datafusion_spark::function::hash::sha1::SparkSha1;
4848
use datafusion_spark::function::hash::sha2::SparkSha2;
4949
use datafusion_spark::function::math::expm1::SparkExpm1;
50+
use datafusion_spark::function::math::hex::SparkHex;
5051
use datafusion_spark::function::string::char::CharFunc;
5152
use datafusion_spark::function::string::concat::SparkConcat;
5253
use futures::poll;
@@ -337,6 +338,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
337338
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
338339
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default()));
339340
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitwiseNot::default()));
341+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkHex::default()));
340342
}
341343

342344
/// Prepares arrow arrays for output.

native/core/src/execution/planner.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,13 +1244,15 @@ impl PhysicalPlanner {
12441244
))),
12451245
}?;
12461246

1247+
let write_buffer_size = writer.write_buffer_size as usize;
12471248
let shuffle_writer = Arc::new(ShuffleWriterExec::try_new(
12481249
Arc::clone(&child.native_plan),
12491250
partitioning,
12501251
codec,
12511252
writer.output_data_file.clone(),
12521253
writer.output_index_file.clone(),
12531254
writer.tracing_enabled,
1255+
write_buffer_size,
12541256
)?);
12551257

12561258
Ok((

native/core/src/execution/shuffle/shuffle_writer.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,21 @@ pub struct ShuffleWriterExec {
7878
/// The compression codec to use when compressing shuffle blocks
7979
codec: CompressionCodec,
8080
tracing_enabled: bool,
81+
/// Size of the write buffer in bytes
82+
write_buffer_size: usize,
8183
}
8284

8385
impl ShuffleWriterExec {
8486
/// Create a new ShuffleWriterExec
87+
#[allow(clippy::too_many_arguments)]
8588
pub fn try_new(
8689
input: Arc<dyn ExecutionPlan>,
8790
partitioning: CometPartitioning,
8891
codec: CompressionCodec,
8992
output_data_file: String,
9093
output_index_file: String,
9194
tracing_enabled: bool,
95+
write_buffer_size: usize,
9296
) -> Result<Self> {
9397
let cache = PlanProperties::new(
9498
EquivalenceProperties::new(Arc::clone(&input.schema())),
@@ -106,6 +110,7 @@ impl ShuffleWriterExec {
106110
cache,
107111
codec,
108112
tracing_enabled,
113+
write_buffer_size,
109114
})
110115
}
111116
}
@@ -169,6 +174,7 @@ impl ExecutionPlan for ShuffleWriterExec {
169174
self.output_data_file.clone(),
170175
self.output_index_file.clone(),
171176
self.tracing_enabled,
177+
self.write_buffer_size,
172178
)?)),
173179
_ => panic!("ShuffleWriterExec wrong number of children"),
174180
}
@@ -195,6 +201,7 @@ impl ExecutionPlan for ShuffleWriterExec {
195201
context,
196202
self.codec.clone(),
197203
self.tracing_enabled,
204+
self.write_buffer_size,
198205
)
199206
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
200207
)
@@ -214,6 +221,7 @@ async fn external_shuffle(
214221
context: Arc<TaskContext>,
215222
codec: CompressionCodec,
216223
tracing_enabled: bool,
224+
write_buffer_size: usize,
217225
) -> Result<SendableRecordBatchStream> {
218226
with_trace_async("external_shuffle", tracing_enabled, || async {
219227
let schema = input.schema();
@@ -227,6 +235,7 @@ async fn external_shuffle(
227235
metrics,
228236
context.session_config().batch_size(),
229237
codec,
238+
write_buffer_size,
230239
)?)
231240
}
232241
_ => Box::new(MultiPartitionShuffleRepartitioner::try_new(
@@ -240,6 +249,7 @@ async fn external_shuffle(
240249
context.session_config().batch_size(),
241250
codec,
242251
tracing_enabled,
252+
write_buffer_size,
243253
)?),
244254
};
245255

@@ -331,6 +341,8 @@ struct MultiPartitionShuffleRepartitioner {
331341
/// Reservation for repartitioning
332342
reservation: MemoryReservation,
333343
tracing_enabled: bool,
344+
/// Size of the write buffer in bytes
345+
write_buffer_size: usize,
334346
}
335347

336348
#[derive(Default)]
@@ -362,6 +374,7 @@ impl MultiPartitionShuffleRepartitioner {
362374
batch_size: usize,
363375
codec: CompressionCodec,
364376
tracing_enabled: bool,
377+
write_buffer_size: usize,
365378
) -> Result<Self> {
366379
let num_output_partitions = partitioning.partition_count();
367380
assert_ne!(
@@ -407,6 +420,7 @@ impl MultiPartitionShuffleRepartitioner {
407420
batch_size,
408421
reservation,
409422
tracing_enabled,
423+
write_buffer_size,
410424
})
411425
}
412426

@@ -654,8 +668,10 @@ impl MultiPartitionShuffleRepartitioner {
654668
output_data: &mut BufWriter<File>,
655669
encode_time: &Time,
656670
write_time: &Time,
671+
write_buffer_size: usize,
657672
) -> Result<()> {
658-
let mut buf_batch_writer = BufBatchWriter::new(shuffle_block_writer, output_data);
673+
let mut buf_batch_writer =
674+
BufBatchWriter::new(shuffle_block_writer, output_data, write_buffer_size);
659675
for batch in partition_iter {
660676
let batch = batch?;
661677
buf_batch_writer.write(&batch, encode_time, write_time)?;
@@ -714,7 +730,12 @@ impl MultiPartitionShuffleRepartitioner {
714730
for partition_id in 0..num_output_partitions {
715731
let partition_writer = &mut self.partition_writers[partition_id];
716732
let mut iter = partitioned_batches.produce(partition_id);
717-
spilled_bytes += partition_writer.spill(&mut iter, &self.runtime, &self.metrics)?;
733+
spilled_bytes += partition_writer.spill(
734+
&mut iter,
735+
&self.runtime,
736+
&self.metrics,
737+
self.write_buffer_size,
738+
)?;
718739
}
719740

720741
let mut timer = self.metrics.mempool_time.timer();
@@ -795,6 +816,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
795816
&mut output_data,
796817
&self.metrics.encode_time,
797818
&self.metrics.write_time,
819+
self.write_buffer_size,
798820
)?;
799821
}
800822

@@ -862,6 +884,7 @@ impl SinglePartitionShufflePartitioner {
862884
metrics: ShuffleRepartitionerMetrics,
863885
batch_size: usize,
864886
codec: CompressionCodec,
887+
write_buffer_size: usize,
865888
) -> Result<Self> {
866889
let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?;
867890

@@ -872,7 +895,8 @@ impl SinglePartitionShufflePartitioner {
872895
.open(output_data_path)
873896
.map_err(to_df_err)?;
874897

875-
let output_data_writer = BufBatchWriter::new(shuffle_block_writer, output_data_file);
898+
let output_data_writer =
899+
BufBatchWriter::new(shuffle_block_writer, output_data_file, write_buffer_size);
876900

877901
Ok(Self {
878902
output_data_writer,
@@ -1131,6 +1155,7 @@ impl PartitionWriter {
11311155
iter: &mut PartitionedBatchIterator,
11321156
runtime: &RuntimeEnv,
11331157
metrics: &ShuffleRepartitionerMetrics,
1158+
write_buffer_size: usize,
11341159
) -> Result<usize> {
11351160
if let Some(batch) = iter.next() {
11361161
self.ensure_spill_file_created(runtime)?;
@@ -1139,6 +1164,7 @@ impl PartitionWriter {
11391164
let mut buf_batch_writer = BufBatchWriter::new(
11401165
&mut self.shuffle_block_writer,
11411166
&mut self.spill_file.as_mut().unwrap().file,
1167+
write_buffer_size,
11421168
);
11431169
let mut bytes_written =
11441170
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
@@ -1194,10 +1220,7 @@ struct BufBatchWriter<S: Borrow<ShuffleBlockWriter>, W: Write> {
11941220
}
11951221

11961222
impl<S: Borrow<ShuffleBlockWriter>, W: Write> BufBatchWriter<S, W> {
1197-
fn new(shuffle_block_writer: S, writer: W) -> Self {
1198-
// 1MB should be good enough to avoid frequent system calls,
1199-
// and also won't cause too much memory usage
1200-
let buffer_max_size = 1024 * 1024;
1223+
fn new(shuffle_block_writer: S, writer: W, buffer_max_size: usize) -> Self {
12011224
Self {
12021225
shuffle_block_writer,
12031226
writer,
@@ -1343,6 +1366,7 @@ mod test {
13431366
1024,
13441367
CompressionCodec::Lz4Frame,
13451368
false,
1369+
1024 * 1024, // write_buffer_size: 1MB default
13461370
)
13471371
.unwrap();
13481372

@@ -1439,6 +1463,7 @@ mod test {
14391463
"/tmp/data.out".to_string(),
14401464
"/tmp/index.out".to_string(),
14411465
false,
1466+
1024 * 1024, // write_buffer_size: 1MB default
14421467
)
14431468
.unwrap();
14441469

native/proto/src/proto/operator.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ message ShuffleWriter {
236236
CompressionCodec codec = 5;
237237
int32 compression_level = 6;
238238
bool tracing_enabled = 7;
239+
// Size of the write buffer in bytes used when writing shuffle data to disk.
240+
// Larger values may improve write performance but use more memory.
241+
int32 write_buffer_size = 8;
239242
}
240243

241244
message ParquetWriter {

0 commit comments

Comments
 (0)