Skip to content

Commit 1c4507a

Browse files
author
B Vadlamani
committed
enable_spark_tests_comet_native_writer
1 parent 2c6a8ac commit 1c4507a

4 files changed

Lines changed: 270 additions & 570 deletions

File tree

.github/workflows/spark_sql_test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ jobs:
155155
run: |
156156
cd apache-spark
157157
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
158-
NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true COMET_PARQUET_SCAN_IMPL=${{ matrix.config.scan-impl }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
158+
NOLINT_ON_COMPILE=true ENABLE_COMET=true ENABLE_COMET_ONHEAP=true ENABLE_COMET_WRITER=true ${{ matrix.config.scan-env }} ENABLE_COMET_LOG_FALLBACK_REASONS=${{ github.event.inputs.collect-fallback-logs || 'false' }} \
159159
build/sbt -Dsbt.log.noformat=true ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
160160
if [ "${{ github.event.inputs.collect-fallback-logs }}" = "true" ]; then
161161
find . -type f -name "unit-tests.log" -print0 | xargs -0 grep -h "Comet cannot accelerate" | sed 's/.*Comet cannot accelerate/Comet cannot accelerate/' | sort -u > fallback.log

dev/diffs/3.4.3.diff

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2092,7 +2092,7 @@ index 104b4e416cd..37ea65081e4 100644
20922092
case _ =>
20932093
throw new AnalysisException("Can not match ParquetTable in the query.")
20942094
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
2095-
index 8670d95c65e..b624c3811dd 100644
2095+
index 8670d95c65e..3fe49802309 100644
20962096
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
20972097
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
20982098
@@ -1335,7 +1335,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
@@ -2105,6 +2105,41 @@ index 8670d95c65e..b624c3811dd 100644
21052105
withAllParquetReaders {
21062106
checkAnswer(
21072107
// "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY.
2108+
@@ -1541,7 +1542,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
2109+
}
2110+
}
2111+
2112+
- test("Write Spark version into Parquet metadata") {
2113+
+// TODO: Comet native writer should add the Spark / Comet version to the Parquet metadata
2114+
+ test("Write Spark version into Parquet metadata",
2115+
+ IgnoreComet("Comet native writer does not write Spark version into Parquet metadata")) {
2116+
withTempPath { dir =>
2117+
spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
2118+
assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
2119+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2120+
index 8b386e8f689..28ced6209e0 100644
2121+
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2122+
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
2123+
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
2124+
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
2125+
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
2126+
2127+
-import org.apache.spark.sql.Row
2128+
+import org.apache.spark.sql.{IgnoreComet, Row}
2129+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
2130+
import org.apache.spark.sql.internal.SQLConf
2131+
import org.apache.spark.sql.test.SharedSparkSession
2132+
@@ -153,7 +153,9 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
2133+
}
2134+
}
2135+
2136+
- test("parquet timestamp conversion") {
2137+
+ // TODO: Support legacy timestamp conversion/cast in the Comet native writer
2138+
+ test("parquet timestamp conversion",
2139+
+ IgnoreComet("timestamp96 conversion failed with the native writer")) {
2140+
// Make a table with one parquet file written by impala, and one parquet file written by spark.
2141+
// We should only adjust the timestamps in the impala file, and only if the conf is set
2142+
val impalaFile = "test-data/impala_timestamp.parq"
21082143
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
21092144
index 29cb224c878..44837aa953b 100644
21102145
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -2798,7 +2833,7 @@ index abe606ad9c1..2d930b64cca 100644
27982833
val tblTargetName = "tbl_target"
27992834
val tblSourceQualified = s"default.$tblSourceName"
28002835
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
2801-
index dd55fcfe42c..a1d390c93d0 100644
2836+
index dd55fcfe42c..e898fc33bab 100644
28022837
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28032838
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
28042839
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
@@ -2856,7 +2891,7 @@ index dd55fcfe42c..a1d390c93d0 100644
28562891
}
28572892
}
28582893

2859-
@@ -242,6 +265,29 @@ private[sql] trait SQLTestUtilsBase
2894+
@@ -242,6 +265,34 @@ private[sql] trait SQLTestUtilsBase
28602895
protected override def _sqlContext: SQLContext = self.spark.sqlContext
28612896
}
28622897

@@ -2882,11 +2917,16 @@ index dd55fcfe42c..a1d390c93d0 100644
28822917
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
28832918
+ v != null && v.toBoolean
28842919
+ }
2920+
+
2921+
+ protected def isCometWriterEnabled: Boolean = {
2922+
+ val v = System.getenv("ENABLE_COMET_WRITER")
2923+
+ v != null && v.toBoolean
2924+
+ }
28852925
+
28862926
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
28872927
SparkSession.setActiveSession(spark)
28882928
super.withSQLConf(pairs: _*)(f)
2889-
@@ -434,6 +480,8 @@ private[sql] trait SQLTestUtilsBase
2929+
@@ -434,6 +485,8 @@ private[sql] trait SQLTestUtilsBase
28902930
val schema = df.schema
28912931
val withoutFilters = df.queryExecution.executedPlan.transform {
28922932
case FilterExec(_, child) => child
@@ -2896,10 +2936,10 @@ index dd55fcfe42c..a1d390c93d0 100644
28962936

28972937
spark.internalCreateDataFrame(withoutFilters.execute(), schema)
28982938
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
2899-
index ed2e309fa07..a5ea58146ad 100644
2939+
index ed2e309fa07..9c5c393ad14 100644
29002940
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
29012941
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
2902-
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
2942+
@@ -74,6 +74,36 @@ trait SharedSparkSessionBase
29032943
// this rule may potentially block testing of other optimization rules such as
29042944
// ConstantPropagation etc.
29052945
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2927,6 +2967,11 @@ index ed2e309fa07..a5ea58146ad 100644
29272967
+ conf
29282968
+ .set("spark.sql.ansi.enabled", "true")
29292969
+ }
2970+
+
2971+
+ if (isCometWriterEnabled) {
2972+
+ conf.set("spark.comet.parquet.write.enabled", "true")
2973+
+ conf.set("spark.comet.operator.DataWritingCommandExec.allowIncompatible", "true")
2974+
+ }
29302975
+ }
29312976
conf.set(
29322977
StaticSQLConf.WAREHOUSE_PATH,

0 commit comments

Comments
 (0)