Skip to content

Commit d288b7e

Browse files
feat(spark): Refactoring datasources (#514)
### Reason for this PR By moving the datasources under `org.apache.spark.sql`, we gain access to Spark's private API. The lack of that access was a blocker the last time I tried to fully migrate the datasources to V2. Detailed motivation is in #493. ### What changes are included in this PR? Mostly refactoring. ### Are these changes tested? Unit tests pass. I also manually checked the generated JARs: ![image](https://github.com/apache/incubator-graphar/assets/29755009/1b094516-88b1-490a-a2ea-8dcd092a3b1d) ### Are there any user-facing changes? Mostly not, because `GarDataSource` was left under the same package. Closes #493
1 parent 8b315a7 commit d288b7e

21 files changed

Lines changed: 86 additions & 110 deletions

File tree

licenserc.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ excludes = [
4545
"cpp/thirdparty",
4646
"cpp/misc/cpplint.py",
4747
"spark/datasources-32/src/main/scala/org/apache/graphar/datasources",
48+
"spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar",
4849
"spark/datasources-33/src/main/scala/org/apache/graphar/datasources",
50+
"spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar",
4951
"java/src/main/java/org/apache/graphar/stdcxx/StdString.java",
5052
"java/src/main/java/org/apache/graphar/stdcxx/StdVector.java",
5153
"java/src/main/java/org/apache/graphar/stdcxx/StdSharedPtr.java",

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,24 @@
1616

1717
package org.apache.graphar.datasources
1818

19-
import scala.collection.JavaConverters._
20-
import scala.util.matching.Regex
21-
import java.util
22-
2319
import com.fasterxml.jackson.databind.ObjectMapper
2420
import org.apache.hadoop.conf.Configuration
2521
import org.apache.hadoop.fs.Path
26-
22+
import org.apache.spark.sql.SparkSession
2723
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
24+
import org.apache.spark.sql.connector.expressions.Transform
2825
import org.apache.spark.sql.execution.datasources._
29-
import org.apache.spark.sql.SparkSession
3026
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
3127
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
3228
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
29+
import org.apache.spark.sql.graphar.GarTable
30+
import org.apache.spark.sql.sources.DataSourceRegister
3331
import org.apache.spark.sql.types.StructType
3432
import org.apache.spark.sql.util.CaseInsensitiveStringMap
35-
import org.apache.spark.sql.sources.DataSourceRegister
36-
import org.apache.spark.sql.connector.expressions.Transform
33+
34+
import java.util
35+
import scala.collection.JavaConverters._
36+
import scala.util.matching.Regex
3737

3838
// Derived from Apache Spark 3.1.1
3939
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,14 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
1919

20-
package org.apache.graphar.datasources
20+
package org.apache.spark.sql.graphar
2121

2222
import org.apache.graphar.GeneralParams
23-
24-
import org.json4s._
25-
import org.json4s.jackson.JsonMethods._
26-
27-
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
2823
import org.apache.hadoop.mapreduce._
2924
import org.apache.spark.internal.Logging
25+
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
26+
import org.json4s._
27+
import org.json4s.jackson.JsonMethods._
3028

3129
object GarCommitProtocol {
3230
private def binarySearchPair(aggNums: Array[Int], key: Int): (Int, Int) = {

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,39 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala
1919

20-
package org.apache.graphar.datasources
21-
22-
import scala.collection.JavaConverters._
23-
import scala.collection.mutable.ArrayBuffer
20+
package org.apache.spark.sql.graphar
2421

2522
import org.apache.hadoop.conf.Configuration
2623
import org.apache.hadoop.fs.Path
2724
import org.apache.parquet.hadoop.ParquetInputFormat
28-
2925
import org.apache.spark.sql.SparkSession
30-
import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils}
3126
import org.apache.spark.sql.catalyst.csv.CSVOptions
27+
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
3228
import org.apache.spark.sql.connector.read.PartitionReaderFactory
3329
import org.apache.spark.sql.execution.PartitionedFileUtil
34-
import org.apache.spark.sql.execution.datasources.{
35-
FilePartition,
36-
PartitioningAwareFileIndex,
37-
PartitionedFile
38-
}
3930
import org.apache.spark.sql.execution.datasources.parquet.{
4031
ParquetOptions,
4132
ParquetReadSupport,
4233
ParquetWriteSupport
4334
}
4435
import org.apache.spark.sql.execution.datasources.v2.FileScan
45-
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
46-
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
4736
import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
37+
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
38+
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
39+
import org.apache.spark.sql.execution.datasources.{
40+
FilePartition,
41+
PartitionedFile,
42+
PartitioningAwareFileIndex
43+
}
4844
import org.apache.spark.sql.internal.SQLConf
4945
import org.apache.spark.sql.sources.Filter
5046
import org.apache.spark.sql.types.StructType
5147
import org.apache.spark.sql.util.CaseInsensitiveStringMap
5248
import org.apache.spark.util.SerializableConfiguration
5349

50+
import scala.collection.JavaConverters._
51+
import scala.collection.mutable.ArrayBuffer
52+
5453
/** GarScan is a class to implement the file scan for GarDataSource. */
5554
case class GarScan(
5655
sparkSession: SparkSession,

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,19 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
1919

20-
package org.apache.graphar.datasources
20+
package org.apache.spark.sql.graphar
2121

2222
import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
2424
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
25-
2625
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
26+
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
27+
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
2728
import org.apache.spark.sql.sources.Filter
2829
import org.apache.spark.sql.types.StructType
2930
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3031

3132
import scala.collection.JavaConverters._
32-
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
33-
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
3433

3534
/** GarScanBuilder is a class to build the file scan for GarDataSource. */
3635
case class GarScanBuilder(
@@ -49,6 +48,7 @@ case class GarScanBuilder(
4948
}
5049

5150
private var filters: Array[Filter] = Array.empty
51+
5252
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
5353
this.filters = filters
5454
filters

maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarTable.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,24 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
1919

20-
package org.apache.graphar.datasources
21-
22-
import scala.collection.JavaConverters._
20+
package org.apache.spark.sql.graphar
2321

2422
import org.apache.hadoop.fs.FileStatus
25-
2623
import org.apache.spark.sql.SparkSession
27-
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
2824
import org.apache.spark.sql.catalyst.csv.CSVOptions
25+
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
2926
import org.apache.spark.sql.execution.datasources.FileFormat
3027
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
3128
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
3229
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
3330
import org.apache.spark.sql.execution.datasources.v2.FileTable
31+
import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
32+
import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
33+
import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
3434
import org.apache.spark.sql.types._
3535
import org.apache.spark.sql.util.CaseInsensitiveStringMap
3636

37-
import org.apache.graphar.datasources.csv.CSVWriteBuilder
38-
import org.apache.graphar.datasources.parquet.ParquetWriteBuilder
39-
import org.apache.graphar.datasources.orc.OrcWriteBuilder
37+
import scala.collection.JavaConverters._
4038

4139
/** GarTable is a class to represent the graph data in GraphAr as a table. */
4240
case class GarTable(

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,22 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
1919

20-
package org.apache.graphar.datasources
21-
22-
import java.util.UUID
23-
24-
import scala.collection.JavaConverters._
20+
package org.apache.spark.sql.graphar
2521

2622
import org.apache.hadoop.conf.Configuration
2723
import org.apache.hadoop.fs.Path
2824
import org.apache.hadoop.mapreduce.Job
2925
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
30-
import org.apache.hadoop.mapreduce.Job
31-
32-
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
3326
import org.apache.spark.sql.SparkSession
3427
import org.apache.spark.sql.catalyst.InternalRow
28+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
3529
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
3630
import org.apache.spark.sql.connector.write.{
3731
BatchWrite,
3832
LogicalWriteInfo,
3933
WriteBuilder
4034
}
35+
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
4136
import org.apache.spark.sql.execution.datasources.{
4237
BasicWriteJobStatsTracker,
4338
DataSource,
@@ -48,8 +43,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
4843
import org.apache.spark.sql.internal.SQLConf
4944
import org.apache.spark.sql.types.{DataType, StructType}
5045
import org.apache.spark.util.SerializableConfiguration
51-
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
52-
import org.apache.spark.sql.catalyst.expressions.AttributeReference
46+
47+
import java.util.UUID
48+
import scala.collection.JavaConverters._
5349

5450
abstract class GarWriteBuilder(
5551
paths: Seq[String],

maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,22 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala
1919

20-
package org.apache.graphar.datasources.csv
20+
package org.apache.spark.sql.graphar.csv
2121

2222
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
2323
import org.apache.spark.sql.catalyst.csv.CSVOptions
2424
import org.apache.spark.sql.catalyst.util.CompressionCodecs
2525
import org.apache.spark.sql.connector.write.LogicalWriteInfo
26+
import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
2627
import org.apache.spark.sql.execution.datasources.{
2728
CodecStreams,
2829
OutputWriter,
2930
OutputWriterFactory
3031
}
31-
import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
32+
import org.apache.spark.sql.graphar.GarWriteBuilder
3233
import org.apache.spark.sql.internal.SQLConf
3334
import org.apache.spark.sql.types.{DataType, StructType}
3435

35-
import org.apache.graphar.datasources.GarWriteBuilder
36-
3736
class CSVWriteBuilder(
3837
paths: Seq[String],
3938
formatName: String,

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@
1818
// we have to reimplement it here.
1919
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
2020

21-
package org.apache.graphar.datasources.orc
21+
package org.apache.spark.sql.graphar.orc
2222

2323
import org.apache.hadoop.fs.Path
2424
import org.apache.hadoop.io.NullWritable
2525
import org.apache.hadoop.mapreduce.TaskAttemptContext
2626
import org.apache.orc.OrcFile
2727
import org.apache.orc.mapred.{
28-
OrcOutputFormat => OrcMapRedOutputFormat,
29-
OrcStruct
28+
OrcStruct,
29+
OrcOutputFormat => OrcMapRedOutputFormat
3030
}
3131
import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
32-
3332
import org.apache.spark.sql.catalyst.InternalRow
3433
import org.apache.spark.sql.execution.datasources.OutputWriter
3534
import org.apache.spark.sql.execution.datasources.orc.{OrcSerializer, OrcUtils}

maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala renamed to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,22 @@
1717
// Derived from Apache Spark 3.1.1
1818
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/ORCWriteBuilder.scala
1919

20-
package org.apache.graphar.datasources.orc
20+
package org.apache.spark.sql.graphar.orc
2121

2222
import org.apache.hadoop.mapred.JobConf
2323
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
2424
import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
2525
import org.apache.orc.mapred.OrcStruct
26-
2726
import org.apache.spark.sql.connector.write.LogicalWriteInfo
27+
import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils}
2828
import org.apache.spark.sql.execution.datasources.{
2929
OutputWriter,
3030
OutputWriterFactory
3131
}
32-
import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils}
32+
import org.apache.spark.sql.graphar.GarWriteBuilder
3333
import org.apache.spark.sql.internal.SQLConf
3434
import org.apache.spark.sql.types._
3535

36-
import org.apache.graphar.datasources.GarWriteBuilder
37-
3836
object OrcWriteBuilder {
3937
// the getQuotedSchemaString method of spark OrcFileFormat
4038
private def getQuotedSchemaString(dataType: DataType): String =

0 commit comments

Comments
 (0)