Skip to content

Commit ce1b9d4

Browse files
authored
feat: implement parse_url (apache#4350)
1 parent 271a4f4 commit ce1b9d4

13 files changed

Lines changed: 362 additions & 16 deletions

File tree

docs/source/contributor-guide/spark_expressions_support.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@
596596

597597
### url_funcs
598598

599-
- [ ] parse_url
599+
- [x] parse_url (Incompatible: native diverges from Spark on edge cases)
600600
- [x] try_url_decode
601601
- 4.0.1, 2026-05-05
602602
- [x] url_decode

docs/source/user-guide/latest/compatibility/expressions/index.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ math
3636
misc
3737
string
3838
struct
39+
url
3940
cast
4041
```
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# URL Expressions
21+
22+
<!--BEGIN:EXPR_COMPAT[url]-->
23+
<!--END:EXPR_COMPAT-->

native/core/src/execution/jni_api.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ use datafusion_spark::function::string::char::CharFunc;
6767
use datafusion_spark::function::string::concat::SparkConcat;
6868
use datafusion_spark::function::string::luhn_check::SparkLuhnCheck;
6969
use datafusion_spark::function::string::space::SparkSpace;
70+
use datafusion_spark::function::url::parse_url::ParseUrl as SparkParseUrl;
71+
use datafusion_spark::function::url::try_parse_url::TryParseUrl as SparkTryParseUrl;
7072
use datafusion_spark::function::url::try_url_decode::TryUrlDecode as SparkTryUrlDecode;
7173
use datafusion_spark::function::url::url_decode::UrlDecode as SparkUrlDecode;
7274
use datafusion_spark::function::url::url_encode::UrlEncode as SparkUrlEncode;
@@ -599,6 +601,8 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
599601
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlEncode::default()));
600602
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryUrlDecode::default()));
601603
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkCsc::default()));
604+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkParseUrl::default()));
605+
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryParseUrl::default()));
602606
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkFactorial::default()));
603607
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSec::default()));
604608
}

spark/src/main/scala/org/apache/comet/GenerateDocs.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,16 @@ object GenerateDocs {
129129
serde.getCompatibleNotes(),
130130
serde.getIncompatibleReasons(),
131131
serde.getUnsupportedReasons())
132+
})),
133+
"url" -> ((
134+
"compatibility/expressions/url.md",
135+
() =>
136+
QueryPlanSerde.urlExpressions.toSeq.map { case (cls, serde) =>
137+
(
138+
cls.getSimpleName,
139+
serde.getCompatibleNotes(),
140+
serde.getIncompatibleReasons(),
141+
serde.getUnsupportedReasons())
132142
})))
133143

134144
def main(args: Array[String]): Unit = {

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
249249
classOf[WeekOfYear] -> CometWeekOfYear,
250250
classOf[Quarter] -> CometQuarter)
251251

252+
private[comet] val urlExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
253+
classOf[ParseUrl] -> CometParseUrl)
254+
252255
private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
253256
classOf[Cast] -> CometCast)
254257

@@ -276,7 +279,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
276279
mathExpressions ++ hashExpressions ++ stringExpressions ++
277280
conditionalExpressions ++ mapExpressions ++ predicateExpressions ++
278281
structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++
279-
temporalExpressions ++ conversionExpressions
282+
temporalExpressions ++ conversionExpressions ++ urlExpressions
280283

281284
/**
282285
* Mapping of Spark aggregate expression class to Comet expression handler.
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, ParseUrl}
23+
24+
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}
25+
26+
// On Spark 4.x ParseUrl is RuntimeReplaceable and handled via CometExprShim (ParseUrlEvaluator).
27+
object CometParseUrl extends CometExpressionSerde[ParseUrl] {
28+
29+
private val incompatibleReason =
30+
"Native parse_url diverges from Spark on several edge cases " +
31+
"(https://github.com/apache/datafusion/issues/21943)"
32+
33+
override def getIncompatibleReasons(): Seq[String] = Seq(incompatibleReason)
34+
35+
override def getSupportLevel(expr: ParseUrl): SupportLevel =
36+
Incompatible(Some(incompatibleReason))
37+
38+
override def convert(
39+
expr: ParseUrl,
40+
inputs: Seq[Attribute],
41+
binding: Boolean): Option[ExprOuterClass.Expr] = {
42+
val funcName = if (expr.failOnError) "parse_url" else "try_parse_url"
43+
val childExprs = expr.children.map(exprToProtoInternal(_, inputs, binding))
44+
val optExpr = scalarFunctionExprToProto(funcName, childExprs: _*)
45+
optExprWithInfo(optExpr, expr, expr.children: _*)
46+
}
47+
}

spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
2424
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
2525
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
26+
import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator
2627
import org.apache.spark.sql.internal.SQLConf
2728
import org.apache.spark.sql.internal.types.StringTypeWithCollation
2829
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType}
2930

30-
import org.apache.comet.CometConf
31+
import org.apache.comet.{CometConf, CometExplainInfo}
3132
import org.apache.comet.CometSparkSessionExtensions.withInfo
3233
import org.apache.comet.expressions.{CometCast, CometEvalMode}
3334
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible, SupportLevel}
@@ -133,16 +134,25 @@ trait CometExprShim extends CommonStringExprs {
133134
val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
134135
optExprWithInfo(optExpr, wb, wb.children: _*)
135136

136-
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is
137-
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the
138-
// original StructsToJson and recurse so support-level checks apply.
137+
// In Spark 4.x, RuntimeReplaceable expressions (StructsToJson, ParseUrl) become
138+
// Invoke(Literal(Evaluator), "evaluate", ...). Reconstruct the original expression
139+
// and recurse so support-level checks apply.
139140
case i: Invoke =>
140141
(i.targetObject, i.functionName, i.arguments) match {
141142
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) =>
142143
exprToProtoInternal(
143144
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
144145
inputs,
145146
binding)
147+
case (Literal(evaluator: ParseUrlEvaluator, _), "evaluate", args) =>
148+
val parseUrl = ParseUrl(args, evaluator.failOnError)
149+
val result = exprToProtoInternal(parseUrl, inputs, binding)
150+
if (result.isEmpty) {
151+
parseUrl
152+
.getTagValue(CometExplainInfo.EXTENSION_INFO)
153+
.foreach(reasons => i.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons))
154+
}
155+
result
146156
case _ => None
147157
}
148158

spark/src/main/spark-4.1/org/apache/comet/shims/CometExprShim.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
2424
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
2525
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
26+
import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator
2627
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2728
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.internal.types.StringTypeWithCollation
2930
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType, TimeType}
3031

31-
import org.apache.comet.CometConf
32+
import org.apache.comet.{CometConf, CometExplainInfo}
3233
import org.apache.comet.CometSparkSessionExtensions.withInfo
3334
import org.apache.comet.expressions.{CometCast, CometEvalMode}
3435
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible, SupportLevel}
@@ -143,17 +144,25 @@ trait CometExprShim extends CommonStringExprs {
143144
val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
144145
optExprWithInfo(optExpr, wb, wb.children: _*)
145146

146-
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is
147-
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the
148-
// original StructsToJson and recurse so support-level checks apply.
149-
// ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse", TimeType(), ...).
147+
// In Spark 4.x, RuntimeReplaceable expressions (StructsToJson, ParseUrl) become
148+
// Invoke(Literal(Evaluator), "evaluate", ...). Reconstruct the original expression
149+
// and recurse so support-level checks apply.
150150
case i: Invoke =>
151151
(i.targetObject, i.functionName, i.arguments) match {
152152
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) =>
153153
exprToProtoInternal(
154154
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
155155
inputs,
156156
binding)
157+
case (Literal(evaluator: ParseUrlEvaluator, _), "evaluate", args) =>
158+
val parseUrl = ParseUrl(args, evaluator.failOnError)
159+
val result = exprToProtoInternal(parseUrl, inputs, binding)
160+
if (result.isEmpty) {
161+
parseUrl
162+
.getTagValue(CometExplainInfo.EXTENSION_INFO)
163+
.foreach(reasons => i.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons))
164+
}
165+
result
157166
case (Literal(parser: ToTimeParser, _), "parse", args)
158167
if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty && args.size == 1 =>
159168
val childExprs = args.map(exprToProtoInternal(_, inputs, binding))

spark/src/main/spark-4.2/org/apache/comet/shims/CometExprShim.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
2424
import org.apache.spark.sql.catalyst.expressions.json.StructsToJsonEvaluator
2525
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
26+
import org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator
2627
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2728
import org.apache.spark.sql.internal.SQLConf
2829
import org.apache.spark.sql.internal.types.StringTypeWithCollation
2930
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DataTypes, MapType, StringType, TimeType}
3031

31-
import org.apache.comet.CometConf
32+
import org.apache.comet.{CometConf, CometExplainInfo}
3233
import org.apache.comet.CometSparkSessionExtensions.withInfo
3334
import org.apache.comet.expressions.{CometCast, CometEvalMode}
3435
import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible, SupportLevel}
@@ -143,17 +144,25 @@ trait CometExprShim extends CommonStringExprs {
143144
val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*)
144145
optExprWithInfo(optExpr, wb, wb.children: _*)
145146

146-
// In Spark 4.0, StructsToJson is a RuntimeReplaceable whose replacement is
147-
// Invoke(Literal(StructsToJsonEvaluator), "evaluate", ...). Reconstruct the
148-
// original StructsToJson and recurse so support-level checks apply.
149-
// ToTime (Spark 4.1) resolves to Invoke(Literal(ToTimeParser), "parse", TimeType(), ...).
147+
// In Spark 4.x, RuntimeReplaceable expressions (StructsToJson, ParseUrl) become
148+
// Invoke(Literal(Evaluator), "evaluate", ...). Reconstruct the original expression
149+
// and recurse so support-level checks apply.
150150
case i: Invoke =>
151151
(i.targetObject, i.functionName, i.arguments) match {
152152
case (Literal(evaluator: StructsToJsonEvaluator, _), "evaluate", Seq(child)) =>
153153
exprToProtoInternal(
154154
StructsToJson(evaluator.options, child, evaluator.timeZoneId),
155155
inputs,
156156
binding)
157+
case (Literal(evaluator: ParseUrlEvaluator, _), "evaluate", args) =>
158+
val parseUrl = ParseUrl(args, evaluator.failOnError)
159+
val result = exprToProtoInternal(parseUrl, inputs, binding)
160+
if (result.isEmpty) {
161+
parseUrl
162+
.getTagValue(CometExplainInfo.EXTENSION_INFO)
163+
.foreach(reasons => i.setTagValue(CometExplainInfo.EXTENSION_INFO, reasons))
164+
}
165+
result
157166
case (Literal(parser: ToTimeParser, _), "parse", args)
158167
if i.dataType.isInstanceOf[TimeType] && parser.fmt.isEmpty && args.size == 1 =>
159168
val childExprs = args.map(exprToProtoInternal(_, inputs, binding))

0 commit comments

Comments
 (0)