-
-
Notifications
You must be signed in to change notification settings - Fork 8.1k
Expand file tree
/
Copy pathstreaming.py
More file actions
115 lines (86 loc) · 4.14 KB
/
streaming.py
File metadata and controls
115 lines (86 loc) · 4.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from settings import RIDE_SCHEMA, CONSUME_TOPIC_RIDES_CSV, TOPIC_WINDOWED_VENDOR_ID_COUNT
def read_from_kafka(consume_topic: str):
# Spark Streaming DataFrame, connect to Kafka topic served at host in bootrap.servers option
df_stream = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
.option("subscribe", consume_topic) \
.option("startingOffsets", "earliest") \
.option("checkpointLocation", "checkpoint") \
.load()
return df_stream
def parse_ride_from_kafka_message(df, schema):
""" take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
assert df.isStreaming is True, "DataFrame doesn't receive streaming data"
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# split attributes to nested array in one Column
col = F.split(df['value'], ', ')
# expand col to multiple top-level columns
for idx, field in enumerate(schema):
df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
return df.select([field.name for field in schema])
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
write_query = df.writeStream \
.outputMode(output_mode) \
.trigger(processingTime=processing_time) \
.format("console") \
.option("truncate", False) \
.start()
return write_query # pyspark.sql.streaming.StreamingQuery
def sink_memory(df, query_name, query_template):
query_df = df \
.writeStream \
.queryName(query_name) \
.format("memory") \
.start()
query_str = query_template.format(table_name=query_name)
query_results = spark.sql(query_str)
return query_results, query_df
def sink_kafka(df, topic):
write_query = df.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
.outputMode('complete') \
.option("topic", topic) \
.option("checkpointLocation", "checkpoint") \
.start()
return write_query
def prepare_df_to_kafka_sink(df, value_columns, key_column=None):
columns = df.columns
df = df.withColumn("value", F.concat_ws(', ', *value_columns))
if key_column:
df = df.withColumnRenamed(key_column, "key")
df = df.withColumn("key", df.key.cast('string'))
return df.select(['key', 'value'])
def op_groupby(df, column_names):
df_aggregation = df.groupBy(column_names).count()
return df_aggregation
def op_windowed_groupby(df, window_duration, slide_duration):
df_windowed_aggregation = df.groupBy(
F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration),
df.vendor_id
).count()
return df_windowed_aggregation
if __name__ == "__main__":
spark = SparkSession.builder.appName('streaming-examples').getOrCreate()
spark.sparkContext.setLogLevel('WARN')
# read_streaming data
df_consume_stream = read_from_kafka(consume_topic=CONSUME_TOPIC_RIDES_CSV)
print(df_consume_stream.printSchema())
# parse streaming data
df_rides = parse_ride_from_kafka_message(df_consume_stream, RIDE_SCHEMA)
print(df_rides.printSchema())
sink_console(df_rides, output_mode='append')
df_trip_count_by_vendor_id = op_groupby(df_rides, ['vendor_id'])
df_trip_count_by_pickup_date_vendor_id = op_windowed_groupby(df_rides, window_duration="10 minutes",
slide_duration='5 minutes')
# write the output out to the console for debugging / testing
sink_console(df_trip_count_by_vendor_id)
# write the output to the kafka topic
df_trip_count_messages = prepare_df_to_kafka_sink(df=df_trip_count_by_pickup_date_vendor_id,
value_columns=['count'], key_column='vendor_id')
kafka_sink_query = sink_kafka(df=df_trip_count_messages, topic=TOPIC_WINDOWED_VENDOR_ID_COUNT)
spark.streams.awaitAnyTermination()