Skip to content

Commit d8c7760

Browse files
author
Kazantsev Maksim
committed
Final approach
1 parent 1809df8 commit d8c7760

5 files changed

Lines changed: 186 additions & 113 deletions

File tree

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -241,25 +241,25 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
241241
if (!partitionSchemaSupported) {
242242
fallbackReasons += s"Partition schema ${scan.readPartitionSchema} is not supported"
243243
}
244-
val columnNameOfCorruptedRecords =
244+
val corruptedRecordsColumnName =
245245
SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
246-
val hasNoCorruptedColumn =
247-
!scan.readDataSchema.fieldNames.contains(columnNameOfCorruptedRecords)
248-
if (!hasNoCorruptedColumn) {
249-
fallbackReasons += "Comet doesn't support the processing of corrupted records in Spark"
246+
val containsCorruptedRecordsColumn =
247+
!scan.readDataSchema.fieldNames.contains(corruptedRecordsColumnName)
248+
if (!containsCorruptedRecordsColumn) {
249+
fallbackReasons += "Comet doesn't support the processing of corrupted records"
250250
}
251-
val inferSchemaEnabled = scan.options.getBoolean("inferSchema", false)
252-
if (inferSchemaEnabled) {
251+
val isInferSchemaEnabled = scan.options.getBoolean("inferSchema", false)
252+
if (isInferSchemaEnabled) {
253253
fallbackReasons += "Comet doesn't support inferSchema=true option"
254254
}
255255
val delimiter = scan.options.get("delimiter")
256-
val isSingleCharDelimiter = delimiter.length == 1
257-
if (!isSingleCharDelimiter) {
258-
fallbackReasons += s"Comet doesn't support delimiter: '$delimiter' " +
259-
s"with more then one character"
256+
val isSingleCharacterDelimiter = delimiter.length == 1
257+
if (!isSingleCharacterDelimiter) {
258+
fallbackReasons +=
259+
s"Comet supports only single-character delimiters, but got: '$delimiter'"
260260
}
261-
if (schemaSupported && partitionSchemaSupported && hasNoCorruptedColumn
262-
&& !inferSchemaEnabled && isSingleCharDelimiter) {
261+
if (schemaSupported && partitionSchemaSupported && containsCorruptedRecordsColumn
262+
&& !isInferSchemaEnabled && isSingleCharacterDelimiter) {
263263
CometBatchScanExec(
264264
scanExec.clone().asInstanceOf[BatchScanExec],
265265
runtimeFilters = scanExec.runtimeFilters)

spark/src/main/scala/org/apache/comet/testing/CsvGenerator.scala

Lines changed: 0 additions & 40 deletions
This file was deleted.

spark/src/main/scala/org/apache/spark/sql/comet/CometCsvNativeScanExec.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,17 @@ import org.apache.spark.sql.execution.datasources.FilePartition
2929
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
3030
import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan
3131

32+
import com.google.common.base.Objects
33+
3234
import org.apache.comet.{CometConf, ConfigEntry}
3335
import org.apache.comet.objectstore.NativeConfig
34-
import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClass, SupportLevel}
36+
import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass}
3537
import org.apache.comet.serde.OperatorOuterClass.Operator
3638
import org.apache.comet.serde.operator.{partition2Proto, schema2Proto}
3739

40+
/*
41+
* Native CSV scan operator that delegates file reading to datafusion.
42+
*/
3843
case class CometCsvNativeScanExec(
3944
override val nativeOp: Operator,
4045
override val output: Seq[Attribute],
@@ -53,17 +58,28 @@ case class CometCsvNativeScanExec(
5358
override protected def doCanonicalize(): SparkPlan = {
5459
CometCsvNativeScanExec(nativeOp, output, originalPlan, serializedPlanOpt)
5560
}
61+
62+
override def equals(obj: Any): Boolean = {
63+
obj match {
64+
case other: CometCsvNativeScanExec =>
65+
this.output == other.output &&
66+
this.serializedPlanOpt == other.serializedPlanOpt &&
67+
this.originalPlan == other.originalPlan
68+
case _ =>
69+
false
70+
}
71+
}
72+
73+
override def hashCode(): Int = {
74+
Objects.hashCode(output, serializedPlanOpt, originalPlan)
75+
}
5676
}
5777

5878
object CometCsvNativeScanExec extends CometOperatorSerde[CometBatchScanExec] {
5979

6080
override def enabledConfig: Option[ConfigEntry[Boolean]] = Some(
6181
CometConf.COMET_CSV_V2_NATIVE_ENABLED)
6282

63-
override def getSupportLevel(operator: CometBatchScanExec): SupportLevel = {
64-
Compatible()
65-
}
66-
6783
override def convert(
6884
op: CometBatchScanExec,
6985
builder: Operator.Builder,

spark/src/test/scala/org/apache/comet/csv/CometCsvReadSuite.scala renamed to spark/src/test/scala/org/apache/comet/csv/CometCsvNativeReadSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
2525

2626
import org.apache.comet.CometConf
2727

28-
class CometCsvReadSuite extends CometTestBase {
28+
class CometCsvNativeReadSuite extends CometTestBase {
2929
private val TEST_CSV_PATH_NO_HEADER = "src/test/resources/test-data/csv-test-1.csv"
3030
private val TEST_CSV_PATH_HAS_HEADER = "src/test/resources/test-data/csv-test-2.csv"
3131

@@ -73,7 +73,7 @@ class CometCsvReadSuite extends CometTestBase {
7373
.csv(TEST_CSV_PATH_NO_HEADER)
7474
checkSparkAnswerAndFallbackReason(
7575
df,
76-
"Comet doesn't support the processing of corrupted records in Spark")
76+
"Comet doesn't support the processing of corrupted records")
7777
df = spark.read
7878
.options(Map("header" -> "false", "delimiter" -> ",", "inferSchema" -> "true"))
7979
.csv(TEST_CSV_PATH_NO_HEADER)
@@ -83,7 +83,7 @@ class CometCsvReadSuite extends CometTestBase {
8383
.csv(TEST_CSV_PATH_NO_HEADER)
8484
checkSparkAnswerAndFallbackReason(
8585
df,
86-
"Comet doesn't support delimiter: ',,' with more then one character")
86+
"Comet supports only single-character delimiters, but got: ',,'")
8787
}
8888
}
8989
}

spark/src/test/scala/org/apache/spark/sql/benchmark/CometNativeCsvReadBenchmark.scala

Lines changed: 149 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,71 +19,168 @@
1919

2020
package org.apache.spark.sql.benchmark
2121

22-
import java.io.File
23-
24-
import scala.util.Random
25-
2622
import org.apache.spark.benchmark.Benchmark
27-
import org.apache.spark.sql.benchmark.CometExecBenchmark.withSQLConf
23+
import org.apache.spark.sql.benchmark.CometNativeCsvReadBenchmark.TPCHSchemas._
24+
import org.apache.spark.sql.execution.benchmark.TPCDSQueryBenchmarkArguments
2825
import org.apache.spark.sql.internal.SQLConf
29-
import org.apache.spark.sql.types.{DataTypes, StructType}
26+
import org.apache.spark.sql.types._
3027

3128
import org.apache.comet.CometConf
32-
import org.apache.comet.testing.{CsvGenerator, FuzzDataGenerator, SchemaGenOptions}
3329

3430
/**
35-
* Benchmark to measure Comet read performance. To run this benchmark:
31+
* @param tableName
32+
* Name of the TPC-H table. Must match one of the standard table names: region, nation, part,
33+
* supplier, partsupp, customer, orders, lineitem.
34+
*
35+
* @param schema
36+
* Table data structure in Spark StructType format.
37+
*/
38+
case class NativeCsvReadConfig(tableName: String, schema: StructType)
39+
40+
/**
41+
* Benchmark to measure Comet csv read performance. To run this benchmark:
3642
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
37-
* benchmark-org.apache.spark.sql.benchmark.CometNativeCsvReadBenchmark` Results will be written
38-
* to "spark/benchmarks/CometNativeCsvReadBenchmark-**results.txt".
43+
* benchmark-org.apache.spark.sql.benchmark.CometNativeCsvReadBenchmark -- --data-location
44+
* /tmp/tpcds` Results will be written to
45+
* "spark/benchmarks/CometNativeCsvReadBenchmark-**results.txt".
3946
*/
4047
object CometNativeCsvReadBenchmark extends CometBenchmarkBase {
4148

42-
private def prepareCsvTable(dir: File, schema: StructType, numRows: Int): Unit = {
43-
val random = new Random(42)
44-
CsvGenerator.makeCsvFile(random, spark, schema, dir.getCanonicalPath, numRows)
49+
private def runNativeCsvBenchmark(
50+
dataLocation: String,
51+
tableName: String,
52+
schema: StructType,
53+
valuesPerPartition: Int,
54+
numIters: Int): Unit = {
55+
val benchmark =
56+
new Benchmark(s"Native csv read - `$tableName` table", valuesPerPartition, output = output)
57+
val filePath = s"$dataLocation/$tableName.csv"
58+
benchmark.addCase("Spark", numIters) { _ =>
59+
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
60+
spark.read
61+
.schema(schema)
62+
.options(Map("header" -> "true", "delimiter" -> ","))
63+
.csv(filePath)
64+
.noop()
65+
}
66+
}
67+
benchmark.addCase("Native", numIters) { _ =>
68+
withSQLConf(
69+
CometConf.COMET_ENABLED.key -> "true",
70+
CometConf.COMET_EXEC_ENABLED.key -> "true",
71+
CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true",
72+
SQLConf.USE_V1_SOURCE_LIST.key -> "") {
73+
spark.read
74+
.schema(schema)
75+
.options(Map("header" -> "true", "delimiter" -> ","))
76+
.csv(filePath)
77+
.noop()
78+
}
79+
}
80+
benchmark.run()
4581
}
4682

83+
private val testCases = Seq(
84+
/* NativeCsvReadConfig("orders", ordersSchema),
85+
NativeCsvReadConfig("region", regionSchema),*/
86+
NativeCsvReadConfig("nation", nationSchema)
87+
/*NativeCsvReadConfig("part", partSchema),
88+
NativeCsvReadConfig("supplier", supplierSchema),
89+
NativeCsvReadConfig("partsupp", partsuppSchema),
90+
NativeCsvReadConfig("customer", customerSchema),
91+
NativeCsvReadConfig("lineitem", lineitemSchema)*/ )
92+
4793
override def runCometBenchmark(args: Array[String]): Unit = {
48-
val numRows = 2000000
49-
val benchmark = new Benchmark(s"Native csv read - $numRows rows", numRows, output = output)
50-
withTempPath { dir =>
51-
val schema = FuzzDataGenerator.generateSchema(
52-
SchemaGenOptions(primitiveTypes = Seq(
53-
DataTypes.BooleanType,
54-
DataTypes.ByteType,
55-
DataTypes.ShortType,
56-
DataTypes.IntegerType,
57-
DataTypes.LongType,
58-
DataTypes.FloatType,
59-
DataTypes.DoubleType,
60-
DataTypes.createDecimalType(10, 2),
61-
DataTypes.createDecimalType(36, 18),
62-
DataTypes.DateType,
63-
DataTypes.StringType)))
64-
prepareCsvTable(dir, schema, numRows)
65-
benchmark.addCase("Simple csv v2 read - spark") { _ =>
66-
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
67-
spark.read
68-
.schema(schema)
69-
.csv(dir.getCanonicalPath)
70-
.noop()
71-
}
72-
}
73-
benchmark.addCase("Simple csv v2 read - comet native") { _ =>
74-
withSQLConf(
75-
CometConf.COMET_ENABLED.key -> "true",
76-
CometConf.COMET_EXEC_ENABLED.key -> "true",
77-
CometConf.COMET_CSV_V2_NATIVE_ENABLED.key -> "true",
78-
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true",
79-
SQLConf.USE_V1_SOURCE_LIST.key -> "") {
80-
spark.read
81-
.schema(schema)
82-
.csv(dir.getCanonicalPath)
83-
.noop()
84-
}
85-
}
86-
benchmark.run()
94+
val benchmarkArgs = new TPCDSQueryBenchmarkArguments(args)
95+
val valuesPerPartition = 1024 * 1024 * 2
96+
val numIters = 1
97+
testCases.foreach { config =>
98+
runNativeCsvBenchmark(
99+
benchmarkArgs.dataLocation,
100+
config.tableName,
101+
config.schema,
102+
valuesPerPartition,
103+
numIters)
87104
}
88105
}
106+
107+
object TPCHSchemas {
108+
109+
val regionSchema: StructType = new StructType()
110+
.add("r_regionkey", IntegerType, nullable = true)
111+
.add("r_name", StringType, nullable = true)
112+
.add("r_comment", StringType, nullable = true)
113+
114+
val nationSchema: StructType = new StructType()
115+
.add("n_nationkey", IntegerType, nullable = true)
116+
.add("n_name", StringType, nullable = true)
117+
.add("n_regionkey", IntegerType, nullable = true)
118+
.add("n_comment", StringType, nullable = true)
119+
120+
val partSchema: StructType = new StructType()
121+
.add("p_partkey", IntegerType, nullable = true)
122+
.add("p_name", StringType, nullable = true)
123+
.add("p_mfgr", StringType, nullable = true)
124+
.add("p_brand", StringType, nullable = true)
125+
.add("p_type", StringType, nullable = true)
126+
.add("p_size", IntegerType, nullable = true)
127+
.add("p_container", StringType, nullable = true)
128+
.add("p_retailprice", DoubleType, nullable = true)
129+
.add("p_comment", StringType, nullable = true)
130+
131+
val supplierSchema: StructType = new StructType()
132+
.add("s_suppkey", IntegerType, nullable = true)
133+
.add("s_name", StringType, nullable = true)
134+
.add("s_address", StringType, nullable = true)
135+
.add("s_nationkey", IntegerType, nullable = true)
136+
.add("s_phone", StringType, nullable = true)
137+
.add("s_acctbal", DoubleType, nullable = true)
138+
.add("s_comment", StringType, nullable = true)
139+
140+
val partsuppSchema: StructType = new StructType()
141+
.add("ps_partkey", IntegerType, nullable = true)
142+
.add("ps_suppkey", IntegerType, nullable = true)
143+
.add("ps_availqty", IntegerType, nullable = true)
144+
.add("ps_supplycost", DoubleType, nullable = true)
145+
.add("ps_comment", StringType, nullable = true)
146+
147+
val customerSchema: StructType = new StructType()
148+
.add("c_custkey", IntegerType, nullable = true)
149+
.add("c_name", StringType, nullable = true)
150+
.add("c_address", StringType, nullable = true)
151+
.add("c_nationkey", IntegerType, nullable = true)
152+
.add("c_phone", StringType, nullable = true)
153+
.add("c_acctbal", DoubleType, nullable = true)
154+
.add("c_mktsegment", StringType, nullable = true)
155+
.add("c_comment", StringType, nullable = true)
156+
157+
val ordersSchema: StructType = new StructType()
158+
.add("o_orderkey", IntegerType, nullable = true)
159+
.add("o_custkey", IntegerType, nullable = true)
160+
.add("o_orderstatus", StringType, nullable = true)
161+
.add("o_totalprice", DoubleType, nullable = true)
162+
.add("o_orderdate", DateType, nullable = true)
163+
.add("o_orderpriority", StringType, nullable = true)
164+
.add("o_clerk", StringType, nullable = true)
165+
.add("o_shippriority", IntegerType, nullable = true)
166+
.add("o_comment", StringType, nullable = true)
167+
168+
val lineitemSchema: StructType = new StructType()
169+
.add("l_orderkey", IntegerType, nullable = true)
170+
.add("l_partkey", IntegerType, nullable = true)
171+
.add("l_suppkey", IntegerType, nullable = true)
172+
.add("l_linenumber", IntegerType, nullable = true)
173+
.add("l_quantity", IntegerType, nullable = true)
174+
.add("l_extendedprice", DoubleType, nullable = true)
175+
.add("l_discount", DoubleType, nullable = true)
176+
.add("l_tax", DoubleType, nullable = true)
177+
.add("l_returnflag", StringType, nullable = true)
178+
.add("l_linestatus", StringType, nullable = true)
179+
.add("l_shipdate", DateType, nullable = true)
180+
.add("l_commitdate", DateType, nullable = true)
181+
.add("l_receiptdate", DateType, nullable = true)
182+
.add("l_shipinstruct", StringType, nullable = true)
183+
.add("l_shipmode", StringType, nullable = true)
184+
.add("l_comment", StringType, nullable = true)
185+
}
89186
}

0 commit comments

Comments
 (0)