-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_job.py
More file actions
24 lines (18 loc) · 1.26 KB
/
streaming_job.py
File metadata and controls
24 lines (18 loc) · 1.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Streaming source: read CSV files from the attr_source_stream directory
# through the "cloudFiles" reader. The schema location points at the same
# directory as the data, and the schema hints declare the listed columns
# as int.
attr_stream_data = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option(
        "cloudFiles.schemaLocation",
        "dbfs:/FileStore/datasets/attr_source_stream",
    )
    .option(
        "cloudFiles.schemaHints",
        # Adjacent literals are concatenated at compile time; the explicit
        # "\n" keeps the hint text identical to a multi-line string.
        "Age int, DailyRate int, DistanceFromHome int,\n"
        "HourlyRate int, JobLevel int, JobSatisfaction int, MonthlyIncome int,\n"
        "PercentSalaryHike int, PerformanceRating int, YearsSinceLastPromotion int,\n"
        "YearsWithCurrManager int",
    )
    .load("dbfs:/FileStore/datasets/attr_source_stream")
)
# Project the stream down to five columns and keep only the rows whose
# PerformanceRating is greater than 3.
attr_stream_subset = (
    attr_stream_data
    .select(
        "Age",
        "Gender",
        "BusinessTravel",
        "JobSatisfaction",
        "PerformanceRating",
    )
    .filter("PerformanceRating > 3")
)
# Start the streaming query: write the filtered rows as CSV under
# attr_dest_location. The checkpoint directory sits inside that same
# destination folder. The query handle returned by start() is not kept.
(
    attr_stream_subset.writeStream
    .option("mergeSchema", "true")
    .format("csv")
    .option(
        "checkpointLocation",
        "dbfs:/FileStore/datasets/attr_dest_location/checkpoint_dir",
    )
    .start("dbfs:/FileStore/datasets/attr_dest_location/")
)