Skip to content

Commit 086757f

Browse files
authored
Add Spark 3.3.0 support (#9)
1 parent 847d001 commit 086757f

7 files changed

Lines changed: 130 additions & 8 deletions

File tree

.github/workflows/main.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,22 @@ jobs:
1111
build:
1212

1313
runs-on: ubuntu-latest
14-
14+
1515
strategy:
1616
matrix:
17-
spark-version: [3.0.1, 3.0.2, 3.0.3, 3.1.1, 3.1.2, 3.2.0]
17+
spark-version: [3.0.1, 3.0.2, 3.0.3, 3.1.1, 3.1.2, 3.1.3, 3.2.0, 3.2.2, 3.3.0]
1818

1919
steps:
2020
- uses: actions/checkout@v2
21-
21+
2222
- name: Set up JDK 1.8
2323
uses: actions/setup-java@v1
2424
with:
2525
java-version: 1.8
26-
26+
2727
- name: Test and package
2828
run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package
29-
29+
3030
- name: Upload the package
3131
uses: actions/upload-artifact@v2.2.0
3232
with:

build.sbt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ libraryDependencies ++= Seq(
3030
// Adds one Spark-version-specific source directory to the Compile sources:
//   sparkVersion < "3.2.0"  -> src/main/3.0/scala
//   sparkVersion < "3.3.0"  -> src/main/3.2/scala
//   otherwise               -> src/main/3.3/scala
// NOTE(review): sparkVersion appears to be a String setting (it is assigned from
// System.getProperty in commonSettings), so `<` here orders lexicographically; a
// hypothetical "3.10.0" would compare as less than "3.2.0" — confirm before adding
// versions with a two-digit component.
Compile / unmanagedSourceDirectories ++= {
3131
if (sparkVersion.value < "3.2.0") {
3232
Seq(baseDirectory.value / "src/main/3.0/scala")
33-
} else {
33+
} else if (sparkVersion.value < "3.3.0") {
3434
Seq(baseDirectory.value / "src/main/3.2/scala")
35+
} else {
36+
Seq(baseDirectory.value / "src/main/3.3/scala")
3537
}
3638
}
3739

@@ -46,13 +48,13 @@ libraryDependencies ++= Seq(
4648

4749
// Define common settings for the library
4850
val commonSettings = Seq(
// The Spark version is read from the `sparkVersion` JVM system property
// (e.g. sbt -DsparkVersion=...), falling back to the literal default when unset.
49-
sparkVersion := System.getProperty("sparkVersion", "3.2.0"),
51+
sparkVersion := System.getProperty("sparkVersion", "3.3.0"),
5052
// Scala patch release is chosen from the Spark version: 2.12.14 for >= "3.2.0",
// 2.12.10 otherwise.
// NOTE(review): this is a String comparison, so ordering is lexicographic
// (e.g. "3.10.0" >= "3.2.0" is false) — confirm this is acceptable.
scalaVersion := {
5153
if (sparkVersion.value >= "3.2.0") {
5254
"2.12.14"
5355
} else {
5456
"2.12.10"
5557
}
5658
},
57-
scalaTestVersion := "3.2.10"
59+
scalaTestVersion := "3.2.13"
5860
)
File renamed without changes.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.bp.sds.cef
2+
3+
import org.apache.hadoop.fs.Path
4+
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.catalyst.expressions.Expression
6+
import org.apache.spark.sql.connector.read.PartitionReaderFactory
7+
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
8+
import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
9+
import org.apache.spark.sql.types.StructType
10+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
11+
import org.apache.spark.util.SerializableConfiguration
12+
13+
import scala.collection.JavaConverters.mapAsScalaMapConverter
14+
15+
/**
 * Scan over CEF files, built on Spark's `TextBasedFileScan`.
 *
 * @param sparkSession        session handed to the superclass and used to build and broadcast the Hadoop configuration
 * @param fileIndex           not referenced in this class body; presumably consumed by the scan superclass — TODO confirm
 * @param dataSchema          forwarded to `CefPartitionReaderFactory`
 * @param readDataSchema      forwarded to `CefPartitionReaderFactory`
 * @param readPartitionSchema forwarded to `CefPartitionReaderFactory`
 * @param options             scan options; handed to the superclass, copied into the Hadoop configuration,
 *                            and parsed with `CefParserOptions.from`
 * @param partitionFilters    partition filter expressions; empty by default, replaced via `withFilters`
 * @param dataFilters         data filter expressions; empty by default, replaced via `withFilters`
 */
private[cef] case class CefScan(
16+
sparkSession: SparkSession,
17+
fileIndex: PartitioningAwareFileIndex,
18+
dataSchema: StructType,
19+
readDataSchema: StructType,
20+
readPartitionSchema: StructType,
21+
options: CaseInsensitiveStringMap,
22+
partitionFilters: Seq[Expression] = Seq.empty,
23+
dataFilters: Seq[Expression] = Seq.empty
24+
) extends TextBasedFileScan(sparkSession, options) {
25+
// Immutable Scala copy of the options, in the form passed to newHadoopConfWithOptions below.
private val optionsAsScala = options.asScala.toMap
26+
// CEF-specific options parsed once from the raw scan options.
private val cefOptions = CefParserOptions.from(options)
27+
28+
// Delegates to the superclass unchanged; no CEF-specific splitting rule is added here.
override def isSplitable(path: Path): Boolean = super.isSplitable(path)
29+
30+
// Returns a copy of this scan that carries the supplied filters; all other fields are kept.
override def withFilters(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): FileScan =
31+
this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
32+
33+
// Builds a Hadoop configuration that includes the scan options, broadcasts it wrapped in a
// SerializableConfiguration, and hands it with the schemas and CEF options to the reader factory.
override def createReaderFactory(): PartitionReaderFactory = {
34+
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
35+
val broadcastConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
36+
CefPartitionReaderFactory(sparkSession.sessionState.conf, broadcastConf, dataSchema, readDataSchema, readPartitionSchema, cefOptions)
37+
}
38+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.bp.sds.cef
2+
3+
import org.apache.hadoop.fs.Path
4+
import org.apache.hadoop.mapreduce.TaskAttemptContext
5+
import org.apache.spark.internal.Logging
6+
import org.apache.spark.sql.catalyst.InternalRow
7+
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter}
8+
import org.apache.spark.sql.types.StructType
9+
10+
import java.nio.charset.StandardCharsets
11+
12+
/**
 * Output writer that writes rows as CEF text to a single output path.
 *
 * Opens a UTF-8 stream writer for `path` through `CodecStreams` and wraps it in a `CefRecordWriter`.
 *
 * @param path       destination file path; exposed as a public `val`
 * @param cefOptions options passed to the `CefRecordWriter`
 * @param dataSchema schema passed to the `CefRecordWriter`
 * @param context    Hadoop task attempt context used when opening the output stream
 */
private[cef] class CefOutputWriter(val path: String, cefOptions: CefParserOptions, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter with Logging {
13+
// Character stream for the target path, created via CodecStreams with UTF-8 encoding.
private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path), StandardCharsets.UTF_8)
14+
// Record writer that emits rows of `dataSchema` onto `writer`.
private val gen = CefRecordWriter(dataSchema, writer, cefOptions)
15+
16+
// Writes one row followed by a line ending.
override def write(row: InternalRow): Unit = {
17+
gen.writeRow(row)
18+
gen.writeLineEnding()
19+
}
20+
21+
// Closes the underlying stream writer.
// NOTE(review): `gen` is not flushed or closed separately here — confirm CefRecordWriter
// does not hold its own buffer.
override def close(): Unit = {
22+
writer.close()
23+
}
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.bp.sds.cef
2+
3+
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
4+
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
5+
import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory}
6+
import org.apache.spark.sql.execution.datasources.v2.FileWrite
7+
import org.apache.spark.sql.internal.SQLConf
8+
import org.apache.spark.sql.types.{DataType, StructType}
9+
10+
/**
 * File write / write builder for CEF output.
 *
 * The constructor parameters are not referenced in this class body; presumably they satisfy
 * members required by `FileWrite` — TODO confirm against the Spark version in use.
 *
 * @param paths            output paths
 * @param formatName       name of the format
 * @param supportsDataType predicate over data types
 * @param info             logical write information supplied by Spark
 */
private[cef] case class CefOutputWriterBuilder(paths: Seq[String],
11+
formatName: String,
12+
supportsDataType: DataType => Boolean,
13+
info: LogicalWriteInfo
14+
) extends FileWrite with WriteBuilder {
15+
// Returns a factory whose writers are CefOutputWriter instances configured from `options`.
// `sqlConf` and `job` are not used in this implementation.
override def prepareWrite(sqlConf: SQLConf, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory =
16+
new OutputWriterFactory {
17+
// File extension is ".log" followed by the compression extension reported by CodecStreams.
override def getFileExtension(context: TaskAttemptContext): String =
18+
".log" + CodecStreams.getCompressionExtension(context)
19+
20+
// The `dataSchema` parameter here shadows the one passed to prepareWrite; the writer
// receives this inner value.
override def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter =
21+
new CefOutputWriter(path, CefParserOptions.from(options), dataSchema, context)
22+
}
23+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.bp.sds.cef
2+
3+
import org.apache.hadoop.fs.Path
4+
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.catalyst.expressions.Expression
6+
import org.apache.spark.sql.connector.read.PartitionReaderFactory
7+
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
8+
import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
9+
import org.apache.spark.sql.types.StructType
10+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
11+
import org.apache.spark.util.SerializableConfiguration
12+
13+
import scala.collection.JavaConverters.mapAsScalaMapConverter
14+
15+
/**
 * Scan over CEF files, built on Spark's `TextBasedFileScan`.
 *
 * @param sparkSession        session handed to the superclass and used to build and broadcast the Hadoop configuration
 * @param fileIndex           not referenced in this class body; presumably consumed by the scan superclass — TODO confirm
 * @param dataSchema          forwarded to `CefPartitionReaderFactory`
 * @param readDataSchema      forwarded to `CefPartitionReaderFactory`
 * @param readPartitionSchema forwarded to `CefPartitionReaderFactory`
 * @param options             scan options; handed to the superclass, copied into the Hadoop configuration,
 *                            and parsed with `CefParserOptions.from`
 * @param partitionFilters    partition filter expressions; empty by default and not otherwise referenced in this class body
 * @param dataFilters         data filter expressions; empty by default and not otherwise referenced in this class body
 */
private[cef] case class CefScan(
16+
sparkSession: SparkSession,
17+
fileIndex: PartitioningAwareFileIndex,
18+
dataSchema: StructType,
19+
readDataSchema: StructType,
20+
readPartitionSchema: StructType,
21+
options: CaseInsensitiveStringMap,
22+
partitionFilters: Seq[Expression] = Seq.empty,
23+
dataFilters: Seq[Expression] = Seq.empty
24+
) extends TextBasedFileScan(sparkSession, options) {
25+
// Immutable Scala copy of the options, in the form passed to newHadoopConfWithOptions below.
private val optionsAsScala = options.asScala.toMap
26+
// CEF-specific options parsed once from the raw scan options.
private val cefOptions = CefParserOptions.from(options)
27+
28+
// Delegates to the superclass unchanged; no CEF-specific splitting rule is added here.
override def isSplitable(path: Path): Boolean = super.isSplitable(path)
29+
30+
// Builds a Hadoop configuration that includes the scan options, broadcasts it wrapped in a
// SerializableConfiguration, and hands it with the schemas and CEF options to the reader factory.
override def createReaderFactory(): PartitionReaderFactory = {
31+
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
32+
val broadcastConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
33+
CefPartitionReaderFactory(sparkSession.sessionState.conf, broadcastConf, dataSchema, readDataSchema, readPartitionSchema, cefOptions)
34+
}
35+
}

0 commit comments

Comments
 (0)