Commit f687fed

update code to support Spark 3.4 and 3.5, and fork from original
1 parent 079ccdb commit f687fed

48 files changed

Lines changed: 555 additions & 286 deletions

.github/workflows/main.yml

Lines changed: 7 additions & 6 deletions
@@ -4,7 +4,7 @@ on:
   push:
     branches: [ main ]
   pull_request:
-    branches: [ main ]
+    types: [ opened, reopened ]
   workflow_dispatch:
 
 jobs:
@@ -14,21 +14,22 @@ jobs:
 
     strategy:
       matrix:
-        spark-version: [3.0.1, 3.0.2, 3.0.3, 3.1.1, 3.1.2, 3.1.3, 3.2.0, 3.2.2, 3.3.0]
+        spark-version: [ 3.0.1, 3.0.2, 3.0.3, 3.1.2, 3.2.1, 3.2.4, 3.3.0, 3.3.1, 3.3.2, 3.3.3, 3.4.0, 3.4.1, 3.5.0 ]
 
     steps:
-      - uses: actions/checkout@v2
+      - uses: actions/checkout@v3.5.2
 
       - name: Set up JDK 1.8
-        uses: actions/setup-java@v1
+        uses: actions/setup-java@v3.11.0
         with:
-          java-version: 1.8
+          java-version: '8'
+          distribution: 'adopt'
 
       - name: Test and package
         run: sbt -DsparkVersion="${{ matrix.spark-version }}" clean compile test package
 
       - name: Upload the package
-        uses: actions/upload-artifact@v2.2.0
+        uses: actions/upload-artifact@v3.1.2
         with:
           path: ./target/**/spark-cef-reader*.jar
           if-no-files-found: warn

README.md

Lines changed: 30 additions & 16 deletions
@@ -3,7 +3,21 @@
 A custom Spark data source supporting the [Common Event Format](https://support.citrix.com/article/CTX136146) V25
 standard for logging events.
 
-[![Spark library CI](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/bp/spark-cef-reader/actions/workflows/main.yml)
+[![Spark library CI](https://github.com/elastacloud/spark-cef-reader/actions/workflows/main.yml/badge.svg)](https://github.com/elastacloud/spark-cef-reader/actions/workflows/main.yml)
+
+## Fork
+
+This is a fork taken from the original source at [https://github.com/bp/spark-cef-reader](https://github.com/bp/spark-cef-reader)
+which was created by the same authors as this fork.
+
+This fork has the following changes applied to it at the time of the fork. Subsequent changes can be viewed
+in the history of the source code and in the release notes.
+
+* Updated to include support for Spark 3.4 and 3.5
+* Rewrite of the options class to meet support for Spark 3.4
+* Renamed the package to not violate any trademarks and to ensure that this is seen as a derivative work
+
+This repository contains all history from the original source and has the same license applied.
 
 ## Supported Features
 
@@ -17,21 +31,20 @@ standard for logging events.
 ## Usage
 
 ```scala
-import com.bp.sds.cef._
 import org.apache.spark.sql.SparkSession
 
 val spark = SparkSession.builder().getOrCreate()
 
 // Read using provided data frame reader
 val df = spark.read
-  .option("maxRecords", "10000") // Optional, default 10,000
-  .option("pivotFields", "true") // Optional, default is false
+  .option("maxRecords", "10000") // Optional, default 10,000
+  .option("pivotFields", "true") // Optional, default is false
   .cef("/path/to/file.log")
 
 // Writing the data back out
 df.write
   .mode("overwrite")
-  .option("nullValue", "NA") // Optional
+  .option("nullValue", "NA") // Optional
   .option("dateFormat", "millis") // Optional
   .cef("/path/to/output/file.log")
 
@@ -41,7 +54,7 @@ df.write
 val dfShort = spark.read.format("cef").load("/path/to/file.log")
 
 // Using the fully qualified name
-val dfFull = spark.read.format("com.bp.sds.cef").load("/path/to/file.log")
+val dfFull = spark.read.format("com.elastacloud.spark.cef").load("/path/to/file.log")
 
 // The path to the file may be an absolute path name, multiple path names, or a glob pattern.
 val dfGlob = spark.read.cef("/landing/events/year=2020/month=*/day=*/*.log.gz")
@@ -50,6 +63,8 @@ val dfGlob = spark.read.cef("/landing/events/year=2020/month=*/day=*/*.log.gz")
 Available for use in Spark SQL as well
 
 ```sql
+-- Note the use of backticks around the path
+
 SELECT
     *
 FROM
@@ -61,16 +76,15 @@ FROM
 The following options are available to pass to the data source, where they are not defined then the default value
 will be used.
 
-Option | Type | Default | Supported Actions | Purpose
------- | ---- | ------- | ----------------- | -------
-maxRecords | Integer | 10,000 | Read | The number of records to scan when inferring the schema. The data source will keep scanning until either the maximum number of records has been reached or there are no more files to scan.
-pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`.
-defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations.
-nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value.
-mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed` and `failfast`. When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. Using `dropmalformed` will simply drop any malformed records from the result. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option.
-corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema.
-dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data this option defines the format to use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch.
-
+| Option | Type | Default | Supported Actions | Purpose |
+|-------------------------|-----------|--------------------------------|-------------------|---------|
+| maxRecords | Integer | 10,000 | Read | The number of records to scan when inferring the schema. The data source will keep scanning until either the maximum number of records has been reached or there are no more files to scan. |
+| pivotFields | Boolean | false | Read | Scans for field pairs in the format of `key=value keyLabel=OtherKey` and pivots the data to `OtherKey=value`. |
+| defensiveMode | Boolean | false | Read | Used if a feed is known to violate the CEF spec. Adds overhead to the parsing so only use when there are known violations. |
+| nullValue | String | `-` | Read/Write | A value used in the CEF records which should be parsed as a `null` value. |
+| mode | ParseMode | Permissive | Read | Permitted values are `permissive`, `dropmalformed` and `failfast`. When used in `FailFast` mode the parser will throw an error on the first record exception found. When used in `Permissive` mode it will attempt to parse as much of the record as possible, with `null` values used for all other values. Using `dropmalformed` will simply drop any malformed records from the result. `Permissive` mode may be used in combination with the `corruptRecordColumnName` option. |
+| corruptRecordColumnName | String | `null` | Read | When used with `Permissive` mode the full record is stored in a column with the name provided. If null is provided then the full record is discarded. By providing a name the data source will append a column to the inferred schema. |
+| dateFormat | String | `MMM dd yyyy HH:mm:ss.SSS zzz` | Write | When writing data this option defines the format to use for timestamp values. The data source will check against CEF valid formats. Alternatively use `millis` to output using milliseconds from the epoch. |
 
 ### CEF supported date formats
 
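The `pivotFields` row in the README options table describes field pairs such as `key=value keyLabel=OtherKey` being pivoted to `OtherKey=value`. The following is a minimal Scala sketch of that transformation applied to an already-parsed map of extension fields. `PivotSketch` and `pivot` are hypothetical names used only for illustration, and the `Label` suffix rule is an assumption taken from the table's wording; this is not the data source's actual implementation.

```scala
object PivotSketch {
  // Hypothetical helper, not part of spark-cef-reader.
  // Assumes a label field is named exactly "<key>Label", as in the README example.
  def pivot(fields: Map[String, String]): Map[String, String] = {
    val suffix = "Label"

    // Keys that have a companion "<key>Label" field.
    val labelled: Set[String] = fields.keySet.filter(k => fields.contains(k + suffix))

    // Keep every field that is neither a labelled value nor the label that names it.
    val kept = fields.filter { case (k, _) =>
      !labelled.contains(k) && !(k.endsWith(suffix) && labelled.contains(k.stripSuffix(suffix)))
    }

    // Re-key each labelled value under the name carried by its label field.
    val pivoted = labelled.map(k => fields(k + suffix) -> fields(k)).toMap

    kept ++ pivoted
  }
}
```

For example, `PivotSketch.pivot(Map("cs1" -> "10.0.0.1", "cs1Label" -> "SourceAddress", "src" -> "host"))` yields `Map("src" -> "host", "SourceAddress" -> "10.0.0.1")`; fields without a label pass through unchanged.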
build.ps1

Lines changed: 13 additions & 2 deletions
@@ -1,17 +1,28 @@
-$versions = @("3.0.1", "3.0.2", "3.0.3", "3.1.1", "3.1.2", "3.2.0")
+$versions = @("3.0.1", "3.0.2", "3.1.2", "3.2.1", "3.2.4", "3.3.0", "3.3.1", "3.3.2", "3.4.0", "3.4.1", "3.5.0")
 $jarPath = "./target/jars"
+$covPath = "./target/coverage"
 
 Write-Host "Clearing existing jar artefacts" -ForegroundColor Green
 if (Test-Path $jarPath) {
     Remove-Item -Path $jarPath -Force -Recurse
 }
 
+if (Test-Path $covPath) {
+    Remove-Item -Path $covPath -Force -Recurse
+}
+
 New-Item -Path $jarPath -ItemType Directory
+New-Item -Path $covPath -ItemType Directory
 
 foreach ($version in $versions) {
     Write-Host "Building for Spark version: $version" -ForegroundColor Green
-    & sbt -DsparkVersion="$version" clean compile test package
+    & sbt -DsparkVersion="$version" clean coverageOn compile test coverageReport coverageOff package
 }
 
 Write-Host "Copying jar files to $jarPath" -ForegroundColor Green
 Get-ChildItem -Filter "spark-cef*.jar" -Path ./target -Recurse | Copy-Item -Destination $jarPath
+
+Write-Host "Copying coverage information from most recent spark version to $covPath" -ForegroundColor Green
+$maxVersion = ($versions | Measure-Object -Maximum).Maximum
+Get-ChildItem -Path ".\target\spark-$maxVersion" -Recurse -Filter "scoverage-report" -Directory | Copy-Item -Destination .\target\coverage\ -Recurse
+Get-ChildItem -Path ".\target\spark-$maxVersion" -Recurse -Filter "cobertura.xml" -File | Copy-Item -Destination .\target\coverage\

build.sbt

Lines changed: 18 additions & 8 deletions
@@ -6,11 +6,11 @@ val scalaTestVersion = settingKey[String]("ScalaTest version")
 
 name := "spark-cef-reader"
 version := "0.6-SNAPSHOT"
-organization := "com.bp"
+organization := "com.elastacloud"
 description := "CEF data source for Spark"
-homepage := Some(url("https://github.com/bp/spark-cef-reader"))
+homepage := Some(url("https://github.com/elastacloud/spark-cef-reader"))
 licenses += ("Apache License, Version 2.0", url("https://www.apache.org/licenses/LICENSE-2.0"))
-scmInfo := Some(ScmInfo(url("https://github.com/bp/spark-cef-reader"), "https://github.com/bp/spark-cef-reader.git"))
+scmInfo := Some(ScmInfo(url("https://github.com/elastacloud/spark-cef-reader"), "https://github.com/elastacloud/spark-cef-reader.git"))
 developers ++= List(
   Developer(id = "dazfuller", name = "Darren Fuller", email = "darren@elastacloud.com", url = url("https://github.com/elastacloud")),
   Developer(id = "azurecoder", name = "Richard Conway", email = "richard@elastacloud.com", url = url("https://github.com/elastacloud"))
@@ -32,8 +32,10 @@ Compile / unmanagedSourceDirectories ++= {
     Seq(baseDirectory.value / "src/main/3.0/scala")
   } else if (sparkVersion.value < "3.3.0") {
     Seq(baseDirectory.value / "src/main/3.2/scala")
-  } else {
+  } else if (sparkVersion.value < "3.4.0") {
     Seq(baseDirectory.value / "src/main/3.3/scala")
+  } else {
+    Seq(baseDirectory.value / "src/main/3.4/scala")
   }
 }
 
@@ -46,15 +48,23 @@ libraryDependencies ++= Seq(
   "org.scalatest" %% "scalatest" % scalaTestVersion.value % Test
 )
 
+coverageOutputCobertura := true
+coverageOutputHTML := true
+coverageMinimumStmtTotal := 70
+coverageFailOnMinimum := false
+coverageHighlighting := true
+
 // Define common settings for the library
 val commonSettings = Seq(
-  sparkVersion := System.getProperty("sparkVersion", "3.3.0"),
+  sparkVersion := System.getProperty("sparkVersion", "3.5.0"),
   scalaVersion := {
-    if (sparkVersion.value >= "3.2.0") {
+    if (sparkVersion.value < "3.2.0") {
+      "2.12.10"
+    } else if (sparkVersion.value < "3.4.0") {
       "2.12.14"
     } else {
-      "2.12.10"
+      "2.12.15"
     }
   },
-  scalaTestVersion := "3.2.13"
+  scalaTestVersion := "3.2.17"
 )
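The `build.sbt` branches above compare `sparkVersion.value` against literals such as `"3.4.0"` as plain strings. That ordering is lexicographic: it holds for the versions in this commit, but a hypothetical `"3.10.0"` would sort before `"3.4.0"`. Below is a minimal sketch of a numeric comparison that mirrors the `scalaVersion` selection shown in the diff. `SparkVersionSketch`, `parse`, `lessThan` and `scalaVersionFor` are hypothetical names, not part of the build, and the parser assumes a purely numeric `major.minor.patch` string.

```scala
object SparkVersionSketch {
  // Split "major.minor.patch" into numeric components.
  // Assumption: no qualifiers such as "-SNAPSHOT" are present.
  def parse(version: String): (Int, Int, Int) =
    version.split('.').map(_.toInt) match {
      case Array(major, minor, patch) => (major, minor, patch)
      case _ => throw new IllegalArgumentException(s"Unexpected version format: $version")
    }

  // Numeric "less than", using the standard tuple ordering (major, then minor, then patch).
  def lessThan(a: String, b: String): Boolean =
    Ordering[(Int, Int, Int)].lt(parse(a), parse(b))

  // Same thresholds as the scalaVersion block in the diff, compared numerically.
  def scalaVersionFor(sparkVersion: String): String =
    if (lessThan(sparkVersion, "3.2.0")) "2.12.10"
    else if (lessThan(sparkVersion, "3.4.0")) "2.12.14"
    else "2.12.15"
}
```

With this helper, `SparkVersionSketch.scalaVersionFor("3.3.3")` returns `"2.12.14"`, and a two-digit minor version such as `"3.10.0"` falls into the final branch rather than the first.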

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")

src/main/3.0/scala/com/bp/sds/cef/CefOutputWriter.scala renamed to src/main/3.0/scala/com/elastacloud/spark/cef/CefOutputWriter.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.bp.sds.cef
+package com.elastacloud.spark.cef
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.TaskAttemptContext

src/main/3.0/scala/com/bp/sds/cef/CefOutputWriterBuilder.scala renamed to src/main/3.0/scala/com/elastacloud/spark/cef/CefOutputWriterBuilder.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.bp.sds.cef
+package com.elastacloud.spark.cef
 
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.connector.write.LogicalWriteInfo

src/main/scala/com/bp/sds/cef/CefPartitionReaderFactory.scala renamed to src/main/3.0/scala/com/elastacloud/spark/cef/CefPartitionReaderFactory.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.bp.sds.cef
+package com.elastacloud.spark.cef
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.catalyst.InternalRow

src/main/3.2/scala/com/bp/sds/cef/CefScan.scala renamed to src/main/3.0/scala/com/elastacloud/spark/cef/CefScan.scala

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package com.bp.sds.cef
+package com.elastacloud.spark.cef
 
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Common options for the file-based data source.
+ */
+class FileSourceOptions(
+    @transient private val parameters: CaseInsensitiveMap[String])
+  extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
+    .getOrElse(SQLConf.get.ignoreCorruptFiles)
+
+  val ignoreMissingFiles: Boolean = parameters.get(IGNORE_MISSING_FILES).map(_.toBoolean)
+    .getOrElse(SQLConf.get.ignoreMissingFiles)
+}
+
+object FileSourceOptions {
+  val IGNORE_CORRUPT_FILES = "ignoreCorruptFiles"
+  val IGNORE_MISSING_FILES = "ignoreMissingFiles"
+}
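The `FileSourceOptions` class added above resolves each flag from the per-read options first, looked up case-insensitively, and falls back to the session configuration when the option is absent. The sketch below reproduces that lookup pattern without any Spark dependency so it can be run on its own. `OptionFallbackSketch`, `SessionDefaults` and `FileOptions` are hypothetical stand-ins: lower-casing the keys only approximates Spark's `CaseInsensitiveMap`, and `SessionDefaults` takes the place of the values read from `SQLConf`.

```scala
import java.util.Locale

object OptionFallbackSketch {
  // Stand-in for the session-level defaults that the real class reads from SQLConf.
  final case class SessionDefaults(ignoreCorruptFiles: Boolean, ignoreMissingFiles: Boolean)

  final class FileOptions(parameters: Map[String, String], defaults: SessionDefaults) {
    // Lower-case the keys so lookups ignore case (an approximation of CaseInsensitiveMap).
    private val normalised: Map[String, String] =
      parameters.map { case (k, v) => k.toLowerCase(Locale.ROOT) -> v }

    // Use the explicit option when present, otherwise the session default.
    private def flag(name: String, fallback: Boolean): Boolean =
      normalised.get(name.toLowerCase(Locale.ROOT)).map(_.toBoolean).getOrElse(fallback)

    val ignoreCorruptFiles: Boolean = flag("ignoreCorruptFiles", defaults.ignoreCorruptFiles)
    val ignoreMissingFiles: Boolean = flag("ignoreMissingFiles", defaults.ignoreMissingFiles)
  }
}
```

An explicit option overrides the default regardless of key casing, so `new OptionFallbackSketch.FileOptions(Map("IGNORECORRUPTFILES" -> "true"), OptionFallbackSketch.SessionDefaults(false, false)).ignoreCorruptFiles` is `true`, while `ignoreMissingFiles` on the same instance stays at the default `false`.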
