Skip to content

Commit 8b0d141

Browse files
authored
Merge branch 'main' into refactor/extract-jvm-bridge-crate
2 parents 373e7db + d9ed85f commit 8b0d141

23 files changed

Lines changed: 185 additions & 837 deletions

File tree

.github/workflows/miri.yml

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -28,16 +28,14 @@ on:
2828
- "native/core/benches/**"
2929
- "native/spark-expr/benches/**"
3030
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
31-
# Disabled until Miri compatibility is restored
32-
# https://github.com/apache/datafusion-comet/issues/3499
33-
# pull_request:
34-
# paths-ignore:
35-
# - "doc/**"
36-
# - "docs/**"
37-
# - "**.md"
38-
# - "native/core/benches/**"
39-
# - "native/spark-expr/benches/**"
40-
# - "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
31+
pull_request:
32+
paths-ignore:
33+
- "doc/**"
34+
- "docs/**"
35+
- "**.md"
36+
- "native/core/benches/**"
37+
- "native/spark-expr/benches/**"
38+
- "spark/src/test/scala/org/apache/spark/sql/benchmark/**"
4139
# manual trigger
4240
# https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
4341
workflow_dispatch:

.github/workflows/pr_rat_check.yml

Lines changed: 47 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,47 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
name: RAT License Check
19+
20+
concurrency:
21+
group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }}
22+
cancel-in-progress: true
23+
24+
permissions:
25+
contents: read
26+
27+
# No paths-ignore: this workflow must run for ALL changes including docs
28+
on:
29+
push:
30+
branches:
31+
- main
32+
pull_request:
33+
workflow_dispatch:
34+
35+
jobs:
36+
rat-check:
37+
name: RAT License Check
38+
runs-on: ubuntu-slim
39+
steps:
40+
- uses: actions/checkout@v4
41+
- name: Set up Java
42+
uses: actions/setup-java@v4
43+
with:
44+
distribution: temurin
45+
java-version: 11
46+
- name: Run RAT check
47+
run: ./mvnw -B -N apache-rat:check

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -114,12 +114,6 @@ object CometConf extends ShimCometConf {
114114
.booleanConf
115115
.createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
116116

117-
// Deprecated: native_comet uses mutable buffers incompatible with Arrow FFI best practices
118-
// and does not support complex types. Use native_iceberg_compat or auto instead.
119-
// This will be removed in a future release.
120-
// See: https://github.com/apache/datafusion-comet/issues/2186
121-
@deprecated("Use SCAN_AUTO instead. native_comet will be removed in a future release.", "0.9.0")
122-
val SCAN_NATIVE_COMET = "native_comet"
123117
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
124118
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
125119
val SCAN_AUTO = "auto"

common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -271,7 +271,7 @@ object Utils extends CometTypeShim {
271271
throw new SparkException(
272272
s"Comet execution only takes Arrow Arrays, but got ${c.getClass}. " +
273273
"This typically happens when a Comet scan falls back to Spark due to unsupported " +
274-
"data types (e.g., complex types like structs, arrays, or maps with native_comet). " +
274+
"data types (e.g., complex types like structs, arrays, or maps). " +
275275
"To resolve this, you can: " +
276276
"(1) enable spark.comet.scan.allowIncompatible=true to use a compatible native " +
277277
"scan variant, or " +

native/core/src/execution/shuffle/spark_unsafe/row.rs

Lines changed: 13 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -255,8 +255,15 @@ impl SparkUnsafeObject for SparkUnsafeRow {
255255
self.row_addr
256256
}
257257

258-
fn get_element_offset(&self, index: usize, _: usize) -> *const u8 {
259-
(self.row_addr + self.row_bitset_width + (index * 8) as i64) as *const u8
258+
fn get_element_offset(&self, index: usize, element_size: usize) -> *const u8 {
259+
let offset = self.row_bitset_width + (index * 8) as i64;
260+
debug_assert!(
261+
self.row_size >= 0 && offset + element_size as i64 <= self.row_size as i64,
262+
"get_element_offset: access at offset {offset} with size {element_size} \
263+
exceeds row_size {} for index {index}",
264+
self.row_size
265+
);
266+
(self.row_addr + offset) as *const u8
260267
}
261268
}
262269

@@ -1659,7 +1666,10 @@ mod test {
16591666
let fields = Fields::from(vec![Field::new("st", data_type.clone(), true)]);
16601667
let mut struct_builder = StructBuilder::from_fields(fields, 1);
16611668
let mut row = SparkUnsafeRow::new_with_num_fields(1);
1662-
let data = [0; 8];
1669+
// 8 bytes null bitset + 8 bytes field value = 16 bytes
1670+
// Set bit 0 in the null bitset to mark field 0 as null
1671+
let mut data = [0u8; 16];
1672+
data[0] = 1;
16631673
row.point_to_slice(&data);
16641674
append_field(&data_type, &mut struct_builder, &row, 0).expect("append field");
16651675
struct_builder.append_null();

native/spark-expr/src/predicate_funcs/rlike.rs

Lines changed: 69 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,7 @@ use arrow::array::types::Int32Type;
2121
use arrow::array::{Array, BooleanArray, DictionaryArray, RecordBatch, StringArray};
2222
use arrow::compute::take;
2323
use arrow::datatypes::{DataType, Schema};
24-
use datafusion::common::{internal_err, Result};
24+
use datafusion::common::{internal_err, Result, ScalarValue};
2525
use datafusion::physical_expr::PhysicalExpr;
2626
use datafusion::physical_plan::ColumnarValue;
2727
use regex::Regex;
@@ -140,8 +140,24 @@ impl PhysicalExpr for RLike {
140140
let array = self.is_match(inputs);
141141
Ok(ColumnarValue::Array(Arc::new(array)))
142142
}
143-
ColumnarValue::Scalar(_) => {
144-
internal_err!("non scalar regexp patterns are not supported")
143+
ColumnarValue::Scalar(scalar) => {
144+
if scalar.is_null() {
145+
return Ok(ColumnarValue::Scalar(ScalarValue::Boolean(None)));
146+
}
147+
148+
let is_match = match scalar {
149+
ScalarValue::Utf8(Some(s))
150+
| ScalarValue::LargeUtf8(Some(s))
151+
| ScalarValue::Utf8View(Some(s)) => self.pattern.is_match(&s),
152+
_ => {
153+
return internal_err!(
154+
"RLike requires string type for input, got {:?}",
155+
scalar.data_type()
156+
);
157+
}
158+
};
159+
160+
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(is_match))))
145161
}
146162
}
147163
}
@@ -165,3 +181,53 @@ impl PhysicalExpr for RLike {
165181
Display::fmt(self, f)
166182
}
167183
}
184+
185+
#[cfg(test)]
186+
mod tests {
187+
use super::*;
188+
use datafusion::physical_expr::expressions::Literal;
189+
190+
#[test]
191+
fn test_rlike_scalar_string_variants() {
192+
let pattern = "R[a-z]+";
193+
let scalars = [
194+
ScalarValue::Utf8(Some("Rose".to_string())),
195+
ScalarValue::LargeUtf8(Some("Rose".to_string())),
196+
ScalarValue::Utf8View(Some("Rose".to_string())),
197+
];
198+
199+
for scalar in scalars {
200+
let expr = RLike::try_new(Arc::new(Literal::new(scalar.clone())), pattern).unwrap();
201+
let result = expr
202+
.evaluate(&RecordBatch::new_empty(Arc::new(Schema::empty())))
203+
.unwrap();
204+
let ColumnarValue::Scalar(result) = result else {
205+
panic!("expected scalar result");
206+
};
207+
assert_eq!(result, ScalarValue::Boolean(Some(true)));
208+
}
209+
210+
// Null input should produce a null boolean result
211+
let expr =
212+
RLike::try_new(Arc::new(Literal::new(ScalarValue::Utf8(None))), pattern).unwrap();
213+
let result = expr
214+
.evaluate(&RecordBatch::new_empty(Arc::new(Schema::empty())))
215+
.unwrap();
216+
let ColumnarValue::Scalar(result) = result else {
217+
panic!("expected scalar result");
218+
};
219+
assert_eq!(result, ScalarValue::Boolean(None));
220+
}
221+
222+
#[test]
223+
fn test_rlike_scalar_non_string_error() {
224+
let expr = RLike::try_new(
225+
Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))),
226+
"R[a-z]+",
227+
)
228+
.unwrap();
229+
230+
let result = expr.evaluate(&RecordBatch::new_empty(Arc::new(Schema::empty())));
231+
assert!(result.is_err());
232+
}
233+
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -307,6 +307,8 @@ case class CometScanRule(session: SparkSession)
307307
if (s.isCometEnabled && schemaSupported) {
308308
// When reading from Iceberg, we automatically enable type promotion
309309
SQLConf.get.setConfString(COMET_SCHEMA_EVOLUTION_ENABLED.key, "true")
310+
// When reading from Iceberg, we automatically disable native columnar to row
311+
SQLConf.get.setConfString(COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.key, "false")
310312
CometBatchScanExec(
311313
scanExec.clone().asInstanceOf[BatchScanExec],
312314
runtimeFilters = scanExec.runtimeFilters)

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ import org.apache.comet.parquet.CometParquetFileFormat
5151
*
5252
* This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
5353
* data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
54-
* Spark readers, or could be the `native_comet` or `native_iceberg_compat` native scans.
54+
* Spark readers, or could be the `native_iceberg_compat` native scan.
5555
*
5656
* Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
5757
* CometExecRule runs. It will never be set to `native_datafusion` at execution time

spark/src/test/resources/sql-tests/expressions/string/rlike_enabled.sql

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,5 +35,5 @@ query
3535
SELECT s RLIKE '' FROM test_rlike_enabled
3636

3737
-- literal arguments
38-
query ignore(https://github.com/apache/datafusion-comet/issues/3343)
38+
query
3939
SELECT 'hello' RLIKE '^[a-z]+$', '12345' RLIKE '^[a-z]+$', '' RLIKE '', NULL RLIKE 'a'

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -812,13 +812,7 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
812812

813813
// https://github.com/apache/datafusion-comet/issues/2612
814814
test("array_reverse - fallback for binary array") {
815-
val fallbackReason =
816-
if (CometConf.COMET_NATIVE_SCAN_IMPL.key == CometConf.SCAN_NATIVE_COMET || sys.env
817-
.getOrElse("COMET_PARQUET_SCAN_IMPL", "") == CometConf.SCAN_NATIVE_COMET) {
818-
"Unsupported schema"
819-
} else {
820-
CometArrayReverse.unsupportedReason
821-
}
815+
val fallbackReason = CometArrayReverse.unsupportedReason
822816
withTable("t1") {
823817
sql("""create table t1 using parquet as
824818
select cast(null as array<binary>) c1, cast(array() as array<binary>) c2

0 commit comments

Comments
 (0)