Skip to content

Commit 8dfeca3

Browse files
Merge branch 'apache:main' into main
2 parents ea92e4b + 1009e98 commit 8dfeca3

30 files changed

Lines changed: 2864 additions & 702 deletions

File tree

.github/workflows/pr_build_linux.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ jobs:
122122
org.apache.comet.exec.CometAsyncShuffleSuite
123123
org.apache.comet.exec.DisableAQECometShuffleSuite
124124
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
125+
org.apache.spark.shuffle.sort.SpillSorterSuite
125126
- name: "parquet"
126127
value: |
127128
org.apache.comet.parquet.CometParquetWriterSuite
@@ -160,6 +161,7 @@ jobs:
160161
value: |
161162
org.apache.comet.CometExpressionSuite
162163
org.apache.comet.CometExpressionCoverageSuite
164+
org.apache.comet.CometHashExpressionSuite
163165
org.apache.comet.CometTemporalExpressionSuite
164166
org.apache.comet.CometArrayExpressionSuite
165167
org.apache.comet.CometCastSuite

.github/workflows/pr_build_macos.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ jobs:
8585
org.apache.comet.exec.CometAsyncShuffleSuite
8686
org.apache.comet.exec.DisableAQECometShuffleSuite
8787
org.apache.comet.exec.DisableAQECometAsyncShuffleSuite
88+
org.apache.spark.shuffle.sort.SpillSorterSuite
8889
- name: "parquet"
8990
value: |
9091
org.apache.comet.parquet.CometParquetWriterSuite
@@ -123,6 +124,7 @@ jobs:
123124
value: |
124125
org.apache.comet.CometExpressionSuite
125126
org.apache.comet.CometExpressionCoverageSuite
127+
org.apache.comet.CometHashExpressionSuite
126128
org.apache.comet.CometTemporalExpressionSuite
127129
org.apache.comet.CometArrayExpressionSuite
128130
org.apache.comet.CometCastSuite

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ under the License.
2121

2222
[![Apache licensed][license-badge]][license-url]
2323
[![Discord chat][discord-badge]][discord-url]
24+
[![Pending PRs][pending-pr-badge]][pending-pr-url]
2425

2526
[license-badge]: https://img.shields.io/badge/license-Apache%20v2-blue.svg
2627
[license-url]: https://github.com/apache/datafusion-comet/blob/main/LICENSE.txt
2728
[discord-badge]: https://img.shields.io/discord/885562378132000778.svg?logo=discord&style=flat-square
2829
[discord-url]: https://discord.gg/3EAr4ZX6JK
30+
[pending-pr-badge]: https://img.shields.io/github/issues-search/apache/datafusion-comet?query=is%3Apr+is%3Aopen+draft%3Afalse+review%3Arequired+status%3Asuccess&label=Pending%20PRs&logo=github
31+
[pending-pr-url]: https://github.com/apache/datafusion-comet/pulls?q=is%3Apr+is%3Aopen+draft%3Afalse+review%3Arequired+status%3Asuccess+sort%3Aupdated-desc
2932

3033
<img src="docs/source/_static/images/DataFusionComet-Logo-Light.png" width="512" alt="logo"/>
3134

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ object CometConf extends ShimCometConf {
111111
.booleanConf
112112
.createWithDefault(false)
113113

114+
// Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
115+
// and does not support complex types. Use native_iceberg_compat or auto instead.
116+
// See: https://github.com/apache/datafusion-comet/issues/2186
117+
@deprecated("Use SCAN_AUTO instead", "0.9.0")
114118
val SCAN_NATIVE_COMET = "native_comet"
115119
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
116120
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
@@ -121,11 +125,14 @@ object CometConf extends ShimCometConf {
121125
.doc(
122126
s"The implementation of Comet Native Scan to use. Available modes are `$SCAN_NATIVE_COMET`," +
123127
s"`$SCAN_NATIVE_DATAFUSION`, and `$SCAN_NATIVE_ICEBERG_COMPAT`. " +
124-
s"`$SCAN_NATIVE_COMET` is for the original Comet native scan which uses a jvm based " +
125-
"parquet file reader and native column decoding. Supports simple types only " +
126-
s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on DataFusion" +
127-
s"`$SCAN_NATIVE_ICEBERG_COMPAT` is a native implementation that exposes apis to read " +
128-
s"parquet columns natively. `$SCAN_AUTO` chooses the best scan.")
128+
s"`$SCAN_NATIVE_COMET` (DEPRECATED) is for the original Comet native scan which " +
129+
"uses a jvm based parquet file reader and native column decoding. " +
130+
"Supports simple types only. " +
131+
s"`$SCAN_NATIVE_DATAFUSION` is a fully native implementation of scan based on " +
132+
"DataFusion. " +
133+
s"`$SCAN_NATIVE_ICEBERG_COMPAT` is the recommended native implementation that " +
134+
"exposes apis to read parquet columns natively and supports complex types. " +
135+
s"`$SCAN_AUTO` (default) chooses the best scan.")
129136
.internal()
130137
.stringConf
131138
.transform(_.toLowerCase(Locale.ROOT))

dev/ensure-jars-have-correct-contents.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ allowed_expr+="|^org/apache/spark/shuffle/$"
8686
allowed_expr+="|^org/apache/spark/shuffle/sort/$"
8787
allowed_expr+="|^org/apache/spark/shuffle/sort/CometShuffleExternalSorter.*$"
8888
allowed_expr+="|^org/apache/spark/shuffle/sort/RowPartition.class$"
89+
allowed_expr+="|^org/apache/spark/shuffle/sort/SpillSorter.*$"
8990
allowed_expr+="|^org/apache/spark/shuffle/comet/.*$"
9091
allowed_expr+="|^org/apache/spark/sql/$"
9192
# allow ExplainPlanGenerator trait since it may not be available in older Spark versions

docs/source/contributor-guide/adding_a_new_expression.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,29 @@ test("unhex") {
236236
}
237237
```
238238

239+
#### Testing with Literal Values
240+
241+
When writing tests that use literal values (e.g., `SELECT my_func('literal')`), Spark's constant folding optimizer may evaluate the expression at planning time rather than execution time. This means your Comet implementation might not actually be exercised during the test.
242+
243+
To ensure literal expressions are executed by Comet, exclude Spark's `ConstantFolding` optimizer rule for the duration of the test:
244+
245+
```scala
246+
test("my_func with literals") {
247+
withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
248+
"org.apache.spark.sql.catalyst.optimizer.ConstantFolding") {
249+
checkSparkAnswerAndOperator("SELECT my_func('literal_value')")
250+
}
251+
}
252+
```
253+
254+
This is particularly important for:
255+
256+
- Edge case tests using specific literal values (e.g., null handling, overflow conditions)
257+
- Tests verifying behavior with special input values
258+
- Any test where the expression inputs are entirely literal
259+
260+
When possible, prefer testing with column references from tables (as shown in the `unhex` example above), which naturally avoids the constant folding issue.
261+
239262
### Adding the Expression To the Protobuf Definition
240263

241264
Once you have the expression implemented in Scala, you might need to update the protobuf definition to include the new expression. You may not need to do this if the expression is already covered by the existing protobuf definition (e.g. you're adding a new scalar function that uses the `ScalarFunc` message).

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ These settings can be used to determine which parts of the plan are accelerated
333333
| `spark.comet.expression.TruncTimestamp.enabled` | Enable Comet acceleration for `TruncTimestamp` | true |
334334
| `spark.comet.expression.UnaryMinus.enabled` | Enable Comet acceleration for `UnaryMinus` | true |
335335
| `spark.comet.expression.Unhex.enabled` | Enable Comet acceleration for `Unhex` | true |
336+
| `spark.comet.expression.UnixDate.enabled` | Enable Comet acceleration for `UnixDate` | true |
336337
| `spark.comet.expression.UnscaledValue.enabled` | Enable Comet acceleration for `UnscaledValue` | true |
337338
| `spark.comet.expression.Upper.enabled` | Enable Comet acceleration for `Upper` | true |
338339
| `spark.comet.expression.WeekDay.enabled` | Enable Comet acceleration for `WeekDay` | true |

native/core/src/execution/operators/parquet_writer.rs

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919
2020
use std::{
2121
any::Any,
22+
collections::HashMap,
2223
fmt,
2324
fmt::{Debug, Formatter},
2425
fs::File,
2526
io::Cursor,
2627
sync::Arc,
2728
};
2829

29-
use opendal::{services::Hdfs, Operator};
30-
use url::Url;
30+
use opendal::Operator;
3131

32+
use crate::execution::shuffle::CompressionCodec;
33+
use crate::parquet::parquet_support::{
34+
create_hdfs_operator, is_hdfs_scheme, prepare_object_store_with_configs,
35+
};
3236
use arrow::datatypes::{Schema, SchemaRef};
3337
use arrow::record_batch::RecordBatch;
3438
use async_trait::async_trait;
@@ -50,8 +54,7 @@ use parquet::{
5054
basic::{Compression, ZstdLevel},
5155
file::properties::WriterProperties,
5256
};
53-
54-
use crate::execution::shuffle::CompressionCodec;
57+
use url::Url;
5558

5659
/// Enum representing different types of Arrow writers based on storage backend
5760
enum ParquetWriter {
@@ -200,6 +203,8 @@ pub struct ParquetWriterExec {
200203
partition_id: i32,
201204
/// Column names to use in the output Parquet file
202205
column_names: Vec<String>,
206+
/// Object store configuration options
207+
object_store_options: HashMap<String, String>,
203208
/// Metrics
204209
metrics: ExecutionPlanMetricsSet,
205210
/// Cache for plan properties
@@ -218,6 +223,7 @@ impl ParquetWriterExec {
218223
compression: CompressionCodec,
219224
partition_id: i32,
220225
column_names: Vec<String>,
226+
object_store_options: HashMap<String, String>,
221227
) -> Result<Self> {
222228
// Preserve the input's partitioning so each partition writes its own file
223229
let input_partitioning = input.output_partitioning().clone();
@@ -238,6 +244,7 @@ impl ParquetWriterExec {
238244
compression,
239245
partition_id,
240246
column_names,
247+
object_store_options,
241248
metrics: ExecutionPlanMetricsSet::new(),
242249
cache,
243250
})
@@ -255,10 +262,11 @@ impl ParquetWriterExec {
255262
/// Create an Arrow writer based on the storage scheme
256263
///
257264
/// # Arguments
258-
/// * `storage_scheme` - The storage backend ("hdfs", "s3", or "local")
259265
/// * `output_file_path` - The full path to the output file
260266
/// * `schema` - The Arrow schema for the Parquet file
261267
/// * `props` - Writer properties including compression
268+
/// * `runtime_env` - Runtime environment for object store registration
269+
/// * `object_store_options` - Configuration options for object store
262270
///
263271
/// # Returns
264272
/// * `Ok(ParquetWriter)` - A writer appropriate for the storage scheme
@@ -267,71 +275,61 @@ impl ParquetWriterExec {
267275
output_file_path: &str,
268276
schema: SchemaRef,
269277
props: WriterProperties,
278+
runtime_env: Arc<datafusion::execution::runtime_env::RuntimeEnv>,
279+
object_store_options: &HashMap<String, String>,
270280
) -> Result<ParquetWriter> {
271-
// Determine storage scheme from output_file_path
272-
let storage_scheme = if output_file_path.starts_with("hdfs://") {
273-
"hdfs"
274-
} else if output_file_path.starts_with("s3://") || output_file_path.starts_with("s3a://") {
275-
"s3"
276-
} else {
277-
"local"
278-
};
281+
// Parse the output path as a URL, then branch on its storage scheme
282+
let url = Url::parse(output_file_path).map_err(|e| {
283+
DataFusionError::Execution(format!("Failed to parse URL '{}': {}", output_file_path, e))
284+
})?;
279285

280-
match storage_scheme {
281-
"hdfs" => {
282-
// Parse the output_file_path to extract namenode and path
283-
// Expected format: hdfs://namenode:port/path/to/file
284-
let url = Url::parse(output_file_path).map_err(|e| {
286+
if is_hdfs_scheme(&url, object_store_options) {
287+
// HDFS storage
288+
{
289+
// Use prepare_object_store_with_configs to create and register the object store
290+
let (_object_store_url, object_store_path) = prepare_object_store_with_configs(
291+
runtime_env,
292+
output_file_path.to_string(),
293+
object_store_options,
294+
)
295+
.map_err(|e| {
285296
DataFusionError::Execution(format!(
286-
"Failed to parse HDFS URL '{}': {}",
297+
"Failed to prepare object store for '{}': {}",
287298
output_file_path, e
288299
))
289300
})?;
290301

291-
// Extract namenode (scheme + host + port)
292-
let namenode = format!(
293-
"{}://{}{}",
294-
url.scheme(),
295-
url.host_str().unwrap_or("localhost"),
296-
url.port()
297-
.map(|p| format!(":{}", p))
298-
.unwrap_or_else(|| ":9000".to_string())
299-
);
300-
301-
// Extract the path (without the scheme and host)
302-
let hdfs_path = url.path().to_string();
303-
304302
// For remote storage (HDFS, S3), write to an in-memory buffer
305303
let buffer = Vec::new();
306304
let cursor = Cursor::new(buffer);
307305
let arrow_parquet_buffer_writer = ArrowWriter::try_new(cursor, schema, Some(props))
308306
.map_err(|e| {
309-
DataFusionError::Execution(format!(
310-
"Failed to create {} writer: {}",
311-
storage_scheme, e
312-
))
307+
DataFusionError::Execution(format!("Failed to create HDFS writer: {}", e))
313308
})?;
314309

315-
let builder = Hdfs::default().name_node(&namenode);
316-
let op = Operator::new(builder)
317-
.map_err(|e| {
318-
DataFusionError::Execution(format!(
319-
"Failed to create HDFS operator for '{}' (namenode: {}): {}",
320-
output_file_path, namenode, e
321-
))
322-
})?
323-
.finish();
310+
// Create the HDFS operator from the parsed URL using the shared helper function
311+
let op = create_hdfs_operator(&url).map_err(|e| {
312+
DataFusionError::Execution(format!(
313+
"Failed to create HDFS operator for '{}': {}",
314+
output_file_path, e
315+
))
316+
})?;
324317

325318
// HDFS writer will be created lazily on first write
326-
// Use only the path part for the HDFS writer
319+
// Use the path from prepare_object_store_with_configs
327320
Ok(ParquetWriter::Remote(
328321
arrow_parquet_buffer_writer,
329322
None,
330323
op,
331-
hdfs_path,
324+
object_store_path.to_string(),
332325
))
333326
}
334-
"local" => {
327+
} else if output_file_path.starts_with("file://")
328+
|| output_file_path.starts_with("file:")
329+
|| !output_file_path.contains("://")
330+
{
331+
// Local file system
332+
{
335333
// For a local file system, write directly to file
336334
// Strip file:// or file: prefix if present
337335
let local_path = output_file_path
@@ -368,10 +366,12 @@ impl ParquetWriterExec {
368366
})?;
369367
Ok(ParquetWriter::LocalFile(writer))
370368
}
371-
_ => Err(DataFusionError::Execution(format!(
372-
"Unsupported storage scheme: {}",
373-
storage_scheme
374-
))),
369+
} else {
370+
// Unsupported storage scheme
371+
Err(DataFusionError::Execution(format!(
372+
"Unsupported storage scheme in path: {}",
373+
output_file_path
374+
)))
375375
}
376376
}
377377
}
@@ -435,6 +435,7 @@ impl ExecutionPlan for ParquetWriterExec {
435435
self.compression.clone(),
436436
self.partition_id,
437437
self.column_names.clone(),
438+
self.object_store_options.clone(),
438439
)?)),
439440
_ => Err(DataFusionError::Internal(
440441
"ParquetWriterExec requires exactly one child".to_string(),
@@ -454,6 +455,7 @@ impl ExecutionPlan for ParquetWriterExec {
454455
let bytes_written = MetricBuilder::new(&self.metrics).counter("bytes_written", partition);
455456
let rows_written = MetricBuilder::new(&self.metrics).counter("rows_written", partition);
456457

458+
let runtime_env = context.runtime_env();
457459
let input = self.input.execute(partition, context)?;
458460
let input_schema = self.input.schema();
459461
let work_dir = self.work_dir.clone();
@@ -488,7 +490,14 @@ impl ExecutionPlan for ParquetWriterExec {
488490
.set_compression(compression)
489491
.build();
490492

491-
let mut writer = Self::create_arrow_writer(&part_file, Arc::clone(&output_schema), props)?;
493+
let object_store_options = self.object_store_options.clone();
494+
let mut writer = Self::create_arrow_writer(
495+
&part_file,
496+
Arc::clone(&output_schema),
497+
props,
498+
runtime_env,
499+
&object_store_options,
500+
)?;
492501

493502
// Clone schema for use in async closure
494503
let schema_for_write = Arc::clone(&output_schema);
@@ -732,10 +741,14 @@ mod tests {
732741
// Create ParquetWriter using the create_arrow_writer method
733742
// Use full HDFS URL format
734743
let full_output_path = format!("hdfs://namenode:9000{}", output_path);
744+
let session_ctx = datafusion::prelude::SessionContext::new();
745+
let runtime_env = session_ctx.runtime_env();
735746
let mut writer = ParquetWriterExec::create_arrow_writer(
736747
&full_output_path,
737748
create_test_record_batch(1)?.schema(),
738749
props,
750+
runtime_env,
751+
&HashMap::new(),
739752
)?;
740753

741754
// Write 5 batches in a loop
@@ -802,6 +815,7 @@ mod tests {
802815
CompressionCodec::None,
803816
0, // partition_id
804817
column_names,
818+
HashMap::new(), // object_store_options
805819
)?;
806820

807821
// Create a session context and execute the plan

native/core/src/execution/planner.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,12 @@ impl PhysicalPlanner {
12481248
))),
12491249
}?;
12501250

1251+
let object_store_options: HashMap<String, String> = writer
1252+
.object_store_options
1253+
.iter()
1254+
.map(|(k, v)| (k.clone(), v.clone()))
1255+
.collect();
1256+
12511257
let parquet_writer = Arc::new(ParquetWriterExec::try_new(
12521258
Arc::clone(&child.native_plan),
12531259
writer.output_path.clone(),
@@ -1261,6 +1267,7 @@ impl PhysicalPlanner {
12611267
codec,
12621268
self.partition,
12631269
writer.column_names.clone(),
1270+
object_store_options,
12641271
)?);
12651272

12661273
Ok((

0 commit comments

Comments
 (0)