Skip to content

Commit 6e2a125

Browse files
authored
feat: support Spark luhn_check expression (#3573)
* feat: support Spark luhn_check via StaticInvoke

  Register datafusion-spark's SparkLuhnCheck UDF and add a StaticInvoke handler for ExpressionImplUtils.isLuhnNumber (Spark 3.5+).

* fix spotless
* address feedback
* remove unnecessary version check
1 parent 98b4484 commit 6e2a125

3 files changed

Lines changed: 38 additions & 2 deletions

File tree

native/core/src/execution/jni_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use datafusion_spark::function::math::hex::SparkHex;
5656
use datafusion_spark::function::math::width_bucket::SparkWidthBucket;
5757
use datafusion_spark::function::string::char::CharFunc;
5858
use datafusion_spark::function::string::concat::SparkConcat;
59+
use datafusion_spark::function::string::luhn_check::SparkLuhnCheck;
5960
use datafusion_spark::function::string::space::SparkSpace;
6061
use futures::poll;
6162
use futures::stream::StreamExt;
@@ -403,6 +404,7 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
403404
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkWidthBucket::default()));
404405
session_ctx.register_udf(ScalarUDF::new_from_impl(MapFromEntries::default()));
405406
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCrc32::default()));
407+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkLuhnCheck::default()));
406408
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSpace::default()));
407409
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitCount::default()));
408410
}

spark/src/main/scala/org/apache/comet/serde/statics.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
package org.apache.comet.serde
2121

22-
import org.apache.spark.sql.catalyst.expressions.Attribute
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils}
2323
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
2424
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
2525

@@ -34,7 +34,8 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] {
3434
: Map[(String, Class[_]), CometExpressionSerde[StaticInvoke]] =
3535
Map(
3636
("readSidePadding", classOf[CharVarcharCodegenUtils]) -> CometScalarFunction(
37-
"read_side_padding"))
37+
"read_side_padding"),
38+
("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check"))
3839

3940
override def convert(
4041
expr: StaticInvoke,
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one
2+
-- or more contributor license agreements. See the NOTICE file
3+
-- distributed with this work for additional information
4+
-- regarding copyright ownership. The ASF licenses this file
5+
-- to you under the Apache License, Version 2.0 (the
6+
-- "License"); you may not use this file except in compliance
7+
-- with the License. You may obtain a copy of the License at
8+
--
9+
-- http://www.apache.org/licenses/LICENSE-2.0
10+
--
11+
-- Unless required by applicable law or agreed to in writing,
12+
-- software distributed under the License is distributed on an
13+
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
-- KIND, either express or implied. See the License for the
15+
-- specific language governing permissions and limitations
16+
-- under the License.
17+
18+
-- MinSparkVersion: 3.5
19+
-- ConfigMatrix: parquet.enable.dictionary=false,true
20+
21+
statement
22+
CREATE TABLE test_luhn(s string) USING parquet
23+
24+
statement
25+
INSERT INTO test_luhn VALUES ('79927398710'), ('79927398713'), ('1234567812345670'), ('0'), (''), ('abc'), (NULL)
26+
27+
-- column input
28+
query
29+
SELECT luhn_check(s) FROM test_luhn
30+
31+
-- literal arguments
32+
query
33+
SELECT luhn_check('79927398713'), luhn_check('79927398710'), luhn_check(''), luhn_check(NULL)

0 commit comments

Comments
 (0)