Skip to content

Commit b0548a0

Browse files
committed
Merge remote-tracking branch 'apache/main' into cast-to-int-perf
2 parents b56aa04 + af3bd81 commit b0548a0

8 files changed

Lines changed: 225 additions & 89 deletions

File tree

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,7 @@ impl IcebergFileStream {
369369
}
370370
}
371371

372-
match ready!(self
373-
.metrics
374-
.baseline
375-
.record_poll(current.poll_next_unpin(cx)))
376-
{
372+
match ready!(current.poll_next_unpin(cx)) {
377373
Some(result) => {
378374
// Stop time_scanning_until_data on first batch (idempotent)
379375
self.metrics.file_stream.time_scanning_until_data.stop();
@@ -428,7 +424,7 @@ impl Stream for IcebergFileStream {
428424
self.metrics.file_stream.time_processing.start();
429425
let result = self.poll_inner(cx);
430426
self.metrics.file_stream.time_processing.stop();
431-
result
427+
self.metrics.baseline.record_poll(result)
432428
}
433429
}
434430

spark/src/main/scala/org/apache/comet/serde/structs.scala

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,26 +111,6 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
111111
withInfo(expr, "StructsToJson with options is not supported")
112112
None
113113
} else {
114-
115-
def isSupportedType(dt: DataType): Boolean = {
116-
dt match {
117-
case StructType(fields) =>
118-
fields.forall(f => isSupportedType(f.dataType))
119-
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
120-
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
121-
DataTypes.DoubleType | DataTypes.StringType =>
122-
true
123-
case DataTypes.DateType | DataTypes.TimestampType =>
124-
// TODO implement these types with tests for formatting options and timezone
125-
false
126-
case _: MapType | _: ArrayType =>
127-
// Spark supports map and array in StructsToJson but this is not yet
128-
// implemented in Comet
129-
false
130-
case _ => false
131-
}
132-
}
133-
134114
val isSupported = expr.child.dataType match {
135115
case s: StructType =>
136116
s.fields.forall(f => isSupportedType(f.dataType))
@@ -166,6 +146,25 @@ object CometStructsToJson extends CometExpressionSerde[StructsToJson] {
166146
}
167147
}
168148
}
149+
150+
def isSupportedType(dt: DataType): Boolean = {
151+
dt match {
152+
case StructType(fields) =>
153+
fields.forall(f => isSupportedType(f.dataType))
154+
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
155+
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
156+
DataTypes.DoubleType | DataTypes.StringType =>
157+
true
158+
case DataTypes.DateType | DataTypes.TimestampType =>
159+
// TODO implement these types with tests for formatting options and timezone
160+
false
161+
case _: MapType | _: ArrayType =>
162+
// Spark supports map and array in StructsToJson but this is not yet
163+
// implemented in Comet
164+
false
165+
case _ => false
166+
}
167+
}
169168
}
170169

171170
object CometJsonToStructs extends CometExpressionSerde[JsonToStructs] {

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ object FuzzDataGenerator {
229229
Range(0, numRows).map(_ => {
230230
r.nextInt(20) match {
231231
case 0 if options.allowNull => null
232-
case 1 => Float.NegativeInfinity
233-
case 2 => Float.PositiveInfinity
232+
case 1 if options.generateInfinity => Float.NegativeInfinity
233+
case 2 if options.generateInfinity => Float.PositiveInfinity
234234
case 3 => Float.MinValue
235235
case 4 => Float.MaxValue
236236
case 5 => 0.0f
@@ -243,8 +243,8 @@ object FuzzDataGenerator {
243243
Range(0, numRows).map(_ => {
244244
r.nextInt(20) match {
245245
case 0 if options.allowNull => null
246-
case 1 => Double.NegativeInfinity
247-
case 2 => Double.PositiveInfinity
246+
case 1 if options.generateInfinity => Double.NegativeInfinity
247+
case 2 if options.generateInfinity => Double.PositiveInfinity
248248
case 3 => Double.MinValue
249249
case 4 => Double.MaxValue
250250
case 5 => 0.0
@@ -329,4 +329,5 @@ case class DataGenOptions(
329329
generateNaN: Boolean = true,
330330
baseDate: Long = FuzzDataGenerator.defaultBaseDate,
331331
customStrings: Seq[String] = Seq.empty,
332-
maxStringLength: Int = 8)
332+
maxStringLength: Int = 8,
333+
generateInfinity: Boolean = true)

spark/src/test/scala/org/apache/comet/CometJsonExpressionSuite.scala

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,59 @@
1919

2020
package org.apache.comet
2121

22+
import scala.util.Random
23+
2224
import org.scalactic.source.Position
2325
import org.scalatest.Tag
2426

27+
import org.apache.hadoop.fs.Path
2528
import org.apache.spark.sql.CometTestBase
26-
import org.apache.spark.sql.catalyst.expressions.JsonToStructs
29+
import org.apache.spark.sql.catalyst.expressions.{JsonToStructs, StructsToJson}
2730
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
31+
import org.apache.spark.sql.functions._
32+
33+
import org.apache.comet.CometSparkSessionExtensions.isSpark40Plus
34+
import org.apache.comet.serde.CometStructsToJson
35+
import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
2836

2937
class CometJsonExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
3038

3139
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
3240
pos: Position): Unit = {
3341
super.test(testName, testTags: _*) {
34-
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true") {
42+
withSQLConf(
43+
CometConf.getExprAllowIncompatConfigKey(classOf[JsonToStructs]) -> "true",
44+
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToJson]) -> "true") {
3545
testFun
3646
}
3747
}
3848
}
3949

50+
test("to_json - all supported types") {
51+
assume(!isSpark40Plus)
52+
withTempDir { dir =>
53+
val path = new Path(dir.toURI.toString, "test.parquet")
54+
val filename = path.toString
55+
val random = new Random(42)
56+
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
57+
ParquetGenerator.makeParquetFile(
58+
random,
59+
spark,
60+
filename,
61+
100,
62+
SchemaGenOptions(generateArray = false, generateStruct = false, generateMap = false),
63+
DataGenOptions(generateNaN = false, generateInfinity = false))
64+
}
65+
val table = spark.read.parquet(filename)
66+
val fieldsNames = table.schema.fields
67+
.filter(sf => CometStructsToJson.isSupportedType(sf.dataType))
68+
.map(sf => col(sf.name))
69+
.toSeq
70+
val df = table.select(to_json(struct(fieldsNames: _*)))
71+
checkSparkAnswerAndOperator(df)
72+
}
73+
}
74+
4075
test("from_json - basic primitives") {
4176
Seq(true, false).foreach { dictionaryEnabled =>
4277
withParquetTable(

spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.parquet.hadoop.example.{ExampleParquetWriter, GroupWriteSuppor
3737
import org.apache.parquet.schema.{MessageType, MessageTypeParser}
3838
import org.apache.spark._
3939
import org.apache.spark.internal.config.{MEMORY_OFFHEAP_ENABLED, MEMORY_OFFHEAP_SIZE, SHUFFLE_MANAGER}
40-
import org.apache.spark.sql.comet._
40+
import org.apache.spark.sql.comet.CometPlanChecker
4141
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec}
4242
import org.apache.spark.sql.execution._
4343
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -58,7 +58,8 @@ abstract class CometTestBase
5858
with BeforeAndAfterEach
5959
with AdaptiveSparkPlanHelper
6060
with ShimCometSparkSessionExtensions
61-
with ShimCometTestBase {
61+
with ShimCometTestBase
62+
with CometPlanChecker {
6263
import testImplicits._
6364

6465
protected val shuffleManager: String =
@@ -396,26 +397,6 @@ abstract class CometTestBase
396397
checkPlanNotMissingInput(plan)
397398
}
398399

399-
protected def findFirstNonCometOperator(
400-
plan: SparkPlan,
401-
excludedClasses: Class[_]*): Option[SparkPlan] = {
402-
val wrapped = wrapCometSparkToColumnar(plan)
403-
wrapped.foreach {
404-
case _: CometNativeScanExec | _: CometScanExec | _: CometBatchScanExec |
405-
_: CometIcebergNativeScanExec =>
406-
case _: CometSinkPlaceHolder | _: CometScanWrapper =>
407-
case _: CometColumnarToRowExec =>
408-
case _: CometSparkToColumnarExec =>
409-
case _: CometExec | _: CometShuffleExchangeExec =>
410-
case _: CometBroadcastExchangeExec =>
411-
case _: WholeStageCodegenExec | _: ColumnarToRowExec | _: InputAdapter =>
412-
case op if !excludedClasses.exists(c => c.isAssignableFrom(op.getClass)) =>
413-
return Some(op)
414-
case _ =>
415-
}
416-
None
417-
}
418-
419400
// checks the plan node has no missing inputs
420401
// such nodes represented in plan with exclamation mark !
421402
// example: !CometWindowExec
@@ -449,14 +430,6 @@ abstract class CometTestBase
449430
}
450431
}
451432

452-
/** Wraps the CometRowToColumn as ScanWrapper, so the child operators will not be checked */
453-
private def wrapCometSparkToColumnar(plan: SparkPlan): SparkPlan = {
454-
plan.transformDown {
455-
// don't care the native operators
456-
case p: CometSparkToColumnarExec => CometScanWrapper(null, p)
457-
}
458-
}
459-
460433
private var _spark: SparkSessionType = _
461434
override protected implicit def spark: SparkSessionType = _spark
462435
protected implicit def sqlContext: SQLContext = _spark.sqlContext

spark/src/test/scala/org/apache/spark/sql/benchmark/CometBenchmarkBase.scala

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,19 @@ import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
3131
import org.apache.spark.SparkConf
3232
import org.apache.spark.benchmark.Benchmark
3333
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
34+
import org.apache.spark.sql.comet.CometPlanChecker
35+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3436
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
3537
import org.apache.spark.sql.internal.SQLConf
3638
import org.apache.spark.sql.types.DecimalType
3739

3840
import org.apache.comet.CometConf
3941
import org.apache.comet.CometSparkSessionExtensions
4042

41-
trait CometBenchmarkBase extends SqlBasedBenchmark {
43+
trait CometBenchmarkBase
44+
extends SqlBasedBenchmark
45+
with AdaptiveSparkPlanHelper
46+
with CometPlanChecker {
4247
override def getSparkSession: SparkSession = {
4348
val conf = new SparkConf()
4449
.setAppName("CometReadBenchmark")
@@ -88,28 +93,6 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
8893
}
8994
}
9095

91-
/** Runs function `f` with Comet on and off. */
92-
final def runWithComet(name: String, cardinality: Long)(f: => Unit): Unit = {
93-
val benchmark = new Benchmark(name, cardinality, output = output)
94-
95-
benchmark.addCase(s"$name - Spark ") { _ =>
96-
withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
97-
f
98-
}
99-
}
100-
101-
benchmark.addCase(s"$name - Comet") { _ =>
102-
withSQLConf(
103-
CometConf.COMET_ENABLED.key -> "true",
104-
CometConf.COMET_EXEC_ENABLED.key -> "true",
105-
SQLConf.ANSI_ENABLED.key -> "false") {
106-
f
107-
}
108-
}
109-
110-
benchmark.run()
111-
}
112-
11396
/**
11497
* Runs an expression benchmark with standard cases: Spark, Comet (Scan), Comet (Scan + Exec).
11598
* This provides a consistent benchmark structure for expression evaluation.
@@ -149,6 +132,29 @@ trait CometBenchmarkBase extends SqlBasedBenchmark {
149132
CometConf.COMET_EXEC_ENABLED.key -> "true",
150133
"spark.sql.optimizer.constantFolding.enabled" -> "false") ++ extraCometConfigs
151134

135+
// Check that the plan is fully Comet native before running the benchmark
136+
withSQLConf(cometExecConfigs.toSeq: _*) {
137+
val df = spark.sql(query)
138+
df.noop()
139+
val plan = stripAQEPlan(df.queryExecution.executedPlan)
140+
findFirstNonCometOperator(plan) match {
141+
case Some(op) =>
142+
// scalastyle:off println
143+
println()
144+
println("=" * 80)
145+
println("WARNING: Benchmark plan is NOT fully Comet native!")
146+
println(s"First non-Comet operator: ${op.nodeName}")
147+
println("=" * 80)
148+
println("Query plan:")
149+
println(plan.treeString)
150+
println("=" * 80)
151+
println()
152+
// scalastyle:on println
153+
case None =>
154+
// All operators are Comet native, no warning needed
155+
}
156+
}
157+
152158
benchmark.addCase("Comet (Scan + Exec)") { _ =>
153159
withSQLConf(cometExecConfigs.toSeq: _*) {
154160
spark.sql(query).noop()

0 commit comments

Comments
 (0)