Skip to content

Commit d04cb66

Browse files
committed
spark - kafka - example
0 parents  commit d04cb66

8 files changed

Lines changed: 253 additions & 0 deletions

File tree

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/.metadata/
2+
Spark-Kafka/.gitignore
3+
Spark-Kafka/mvnw
4+
Spark-Kafka/mvnw.cmd
5+
Spark-Kafka/.mvn/

Spark-Kafka/pom.xml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.0.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.spark.kafka.task</groupId>
	<artifactId>Spark-Kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>Spark-Kafka</name>
	<description>Demo project for Spark Kafka</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
			<version>2.3.1</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
		<!-- NOTE(review): spark-sql-kafka is 'provided' but spark-sql/spark-core are
		     compile scope; confirm whether the shaded jar is meant to bundle Spark
		     (local run) or rely on a cluster-provided Spark (then these should be
		     'provided' as well). -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.11</artifactId>
			<version>2.3.1</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.11</artifactId>
			<version>2.3.1</version>
		</dependency>

		<!-- The test class uses JUnit 5 (@Test from org.junit.jupiter) and
		     @SpringBootTest; both are supplied (with managed versions) by
		     spring-boot-starter-test. The previous junit:junit (JUnit 4)
		     dependency did not provide these and the tests could not compile. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Version is managed by the spring-boot-starter-parent pluginManagement. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>

				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<transformers>
								<!-- add Main-Class to manifest file -->
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<manifestEntries>
										<Main-Class>com.spark.kafka.task.SparkKafkaApplication</Main-Class>
										<Build-Number>1</Build-Number>
									</manifestEntries>
								</transformer>
							</transformers>
							<!-- Strip signature files from dependency jars; stale signatures
							     in a shaded jar cause SecurityException at startup. -->
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
						</configuration>
					</execution>
				</executions>
			</plugin>

		</plugins>
	</build>
</project>
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.spark.kafka.task;
2+
3+
/**
 * Application entry point; declared as the {@code Main-Class} in the
 * shade-plugin manifest of the project's pom.xml.
 */
public class SparkKafkaApplication {

	/**
	 * Currently a no-op: the streaming pipeline is never started from here.
	 * TODO(review): presumably this should bootstrap the Kafka/Spark client
	 * (e.g. obtain the SSClient singleton and run init/load/write) — confirm
	 * the intended startup sequence with the author.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {

	}

}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package com.spark.kafka.task.client;
2+
3+
import java.util.Objects;
4+
5+
import org.apache.spark.sql.Dataset;
6+
import org.apache.spark.sql.Row;
7+
import org.apache.spark.sql.SparkSession;
8+
import org.apache.spark.sql.streaming.StreamingQuery;
9+
10+
import com.spark.kafka.task.exceptions.DataSetFailoverException;
11+
import com.spark.kafka.task.exceptions.SparkSessionFailoverException;
12+
13+
/**
14+
* @author Shahzad Masud
15+
*
16+
* In this example we are going to use spark structured streaming, The
17+
* difference between spark streaming and spark structured streaming is
18+
* that structured streaming does not use any concept of micro-batches
19+
* like spark streaming, instead it's architecture is more likely
20+
* towards real streaming where data is poll after some
21+
* duration/interval and result is appended in a unbounded table.
22+
*
23+
* where as spark streaming use a concept of batches where record
24+
* belongs to a batch of DStream
25+
*
26+
*/
27+
public class SSClient {
28+
29+
private static final String KAFKA_FORMAT = "kafka";
30+
private static final String SUBSCRIBER_KEY = "subscribe";
31+
private static String TOPIC_NAME = "MTKAF";
32+
private static final String KAFKA_BOOTSTRAP_SERVER_KEY = "kafka.bootstrap.servers";
33+
private static final int BROKER_PORT_NUMBER = 6667;
34+
private static final String BROKER_DOMAIN_NAME = "sandbox-hdp.hortonworks.com";
35+
private static final String SPAKR_STREAM_STARTING_OFFSET_KEY = "startingOffsets";
36+
private static final String SPARK_STREAM_STARTING_OFFSET_BEGINNING = "earliest";
37+
38+
private static final String MASTER_PATH = "local[*]";
39+
private static final String SPARK_SQL_STREAMING_CHECKPOINT_LOCATION_CONFIG = "spark.sql.streaming.checkpointLocation";
40+
private static final String SPARK_SQL_STREAMING_CHECKPOINT_LOCATION = "/user/sparktest/checkpoints";
41+
private static SSClient ssClient = null;
42+
private SparkSession sparkSession = null;
43+
private Dataset<Row> datasets;
44+
45+
public SSClient initSpark() {
46+
try {
47+
sparkSession = SparkSession.builder().appName(SSClient.class.getName()).master(MASTER_PATH)
48+
.config(SPARK_SQL_STREAMING_CHECKPOINT_LOCATION_CONFIG, SPARK_SQL_STREAMING_CHECKPOINT_LOCATION)
49+
.getOrCreate();
50+
51+
} catch (Exception e) {
52+
throw new SparkSessionFailoverException(e.getMessage(), e);
53+
}
54+
return ssClient;
55+
}
56+
57+
public SSClient loadDataFromKafka() {
58+
try {
59+
datasets = sparkSession.readStream().format(KAFKA_FORMAT)
60+
.option(KAFKA_BOOTSTRAP_SERVER_KEY, BROKER_DOMAIN_NAME + ":" + BROKER_PORT_NUMBER)
61+
.option(SUBSCRIBER_KEY, TOPIC_NAME)
62+
.option(SPAKR_STREAM_STARTING_OFFSET_KEY, SPARK_STREAM_STARTING_OFFSET_BEGINNING).load();
63+
} catch (Exception e) {
64+
throw new DataSetFailoverException(e.getMessage(), e);
65+
}
66+
return ssClient;
67+
}
68+
69+
public void writeData() {
70+
datasets = datasets.selectExpr("CAST(value AS STRING)");
71+
Dataset<String> dataAsJson = datasets.toJSON();
72+
StreamingQuery query = dataAsJson.writeStream()
73+
.format("console")
74+
.outputMode("complete")
75+
.start();
76+
77+
query.awaitTermination();
78+
}
79+
80+
private SSClient() {
81+
82+
}
83+
84+
public static SSClient getInstance() {
85+
if (Objects.isNull(ssClient)) {
86+
synchronized (SSClient.class) {
87+
if (Objects.isNull(ssClient)) {
88+
ssClient = new SSClient();
89+
}
90+
}
91+
}
92+
return ssClient;
93+
}
94+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.spark.kafka.task.exceptions;
2+
3+
/**
 * Unchecked exception signalling that building, loading or running a Spark
 * Dataset / streaming query failed; wraps the underlying cause.
 */
public class DataSetFailoverException extends RuntimeException {

	// Fixed serialization id for this Serializable exception type.
	private static final long serialVersionUID = -2009928124293947716L;

	/**
	 * @param message description of the dataset failure
	 */
	public DataSetFailoverException(String message) {
		super(message);
	}

	/**
	 * @param message description of the dataset failure
	 * @param e       underlying cause, preserved for the full stack trace
	 */
	public DataSetFailoverException(String message, Throwable e) {
		super(message, e);
	}


}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.spark.kafka.task.exceptions;
2+
3+
/**
 * Unchecked exception signalling that creating or configuring the
 * SparkSession failed; wraps the underlying cause.
 */
public class SparkSessionFailoverException extends RuntimeException {

	// Fixed serialization id for this Serializable exception type.
	private static final long serialVersionUID = -5511413755856786260L;

	/**
	 * @param message description of the session failure
	 */
	public SparkSessionFailoverException(String message) {
		super(message);
	}

	/**
	 * @param message description of the session failure
	 * @param e       underlying cause, preserved for the full stack trace
	 */
	public SparkSessionFailoverException(String message, Throwable e) {
		super(message, e);
	}

}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.spark.kafka.task;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.springframework.boot.test.context.SpringBootTest;
5+
6+
@SpringBootTest
class SparkKafkaApplicationTests {

	/**
	 * Smoke test: passes if the Spring application context starts.
	 * NOTE(review): this class uses JUnit 5 (org.junit.jupiter) and
	 * @SpringBootTest — verify the build declares spring-boot-starter-test,
	 * otherwise these imports do not resolve and the test source will not
	 * compile.
	 */
	@Test
	void contextLoads() {
	}

}

0 commit comments

Comments
 (0)