-
Notifications
You must be signed in to change notification settings - Fork 49
Expand file tree
/
Copy pathdatastream-kafka-connector-example.py
More file actions
105 lines (79 loc) · 4.06 KB
/
datastream-kafka-connector-example.py
File metadata and controls
105 lines (79 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# This is a sample Python script.
import logging
from pyflink.common import Row
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeContext, MapFunction
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema, JsonRowSerializationSchema
from pyflink.datastream.state import ValueStateDescriptor
import os
import json
# Runtime configuration: where the application properties live, plus the
# Flink execution environment. On KDA the properties file is provided at a
# fixed path; for local runs it is read from the working directory and the
# Kafka connector jars must be supplied explicitly.
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda

env = StreamExecutionEnvironment.get_execution_environment()

# Set the IS_LOCAL env var in your local environment to run outside KDA.
# bool(...) is the idiomatic form of `True if x else False`.
is_local = bool(os.environ.get("IS_LOCAL"))

if is_local:
    # Only for local: overwrite the properties path and pass in the connector
    # jars, delimited by a semicolon (;).
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local
    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    env.add_jars(
        f"file:///{CURRENT_DIR}/lib/flink-connector-kafka-1.15.2.jar",
        f"file:///{CURRENT_DIR}/lib/kafka-clients-3.3.1.jar",
    )
def get_application_properties(path=None):
    """Load and parse the application properties JSON file.

    Args:
        path: Optional override of the properties file location. Defaults to
            the module-level APPLICATION_PROPERTIES_FILE_PATH (KDA path, or
            local path when IS_LOCAL is set).

    Returns:
        The parsed JSON content (on KDA, a list of property-group dicts), or
        None when the file does not exist — a message is printed in that case,
        matching the original best-effort behavior.
    """
    if path is None:
        path = APPLICATION_PROPERTIES_FILE_PATH
    if os.path.isfile(path):
        # json.load parses straight from the file handle — no need for
        # read() followed by json.loads().
        with open(path, "r") as file:
            return json.load(file)
    print('A file at "{}" was not found'.format(path))
def property_map(props, property_group_id):
    """Return the PropertyMap of the matching property group, or None.

    Scans *props* (a list of {"PropertyGroupId": ..., "PropertyMap": ...}
    dicts) for the first entry whose group id equals *property_group_id*.
    """
    matches = (
        group["PropertyMap"]
        for group in props
        if group["PropertyGroupId"] == property_group_id
    )
    return next(matches, None)
class DeviceAggregation(MapFunction):
    """Stateful map that maintains a running record count per key.

    Used downstream of key_by(device_type), so the state holds one counter
    per device type.
    """

    def open(self, runtime_context: RuntimeContext):
        # Keyed ValueState holding the running count for the current key.
        descriptor = ValueStateDescriptor('device_count', Types.INT())
        self.device_count = runtime_context.get_state(descriptor)

    def map(self, value):
        # First record for this key leaves state at None; start from 1 then.
        previous = self.device_count.value()
        updated = 1 if previous is None else previous + 1
        self.device_count.update(updated)
        # Emit (device_type, running_count).
        return Row(value[0], updated)
def demo_flink_json():
    """Build and run the pipeline: Kafka JSON in -> per-device count -> Kafka JSON out."""
    # Property keys as laid out in application_properties.json.
    consumer_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"
    input_topic_key = "input.topic"
    output_topic_key = "output.topic"
    bootstrap_server_key = "bootstrap.servers"
    consumer_group_id = "group.id"

    props = get_application_properties()
    consumer_property_map = property_map(props, consumer_property_group_key)
    producer_property_map = property_map(props, producer_property_group_key)

    # Input records look like {"device_type": ..., "session_id": ...}.
    deserialization_schema = (
        JsonRowDeserializationSchema.builder()
        .type_info(
            type_info=Types.ROW_NAMED(
                ["device_type", "session_id"], [Types.STRING(), Types.STRING()]
            )
        )
        .ignore_parse_errors()
        .build()
    )
    # Output records look like {"device_type": ..., "total_count": ...}.
    serialization_schema = (
        JsonRowSerializationSchema.builder()
        .with_type_info(
            type_info=Types.ROW_NAMED(
                ["device_type", "total_count"], [Types.STRING(), Types.INT()]
            )
        )
        .build()
    )

    kafka_consumer = FlinkKafkaConsumer(
        topics=consumer_property_map[input_topic_key],
        deserialization_schema=deserialization_schema,
        properties={
            'bootstrap.servers': consumer_property_map[bootstrap_server_key],
            'group.id': consumer_property_map[consumer_group_id],
        },
    )
    kafka_consumer.set_start_from_latest()

    kafka_producer = FlinkKafkaProducer(
        topic=producer_property_map[output_topic_key],
        serialization_schema=serialization_schema,
        producer_config={
            'bootstrap.servers': producer_property_map[bootstrap_server_key]
        },
    )

    stream = env.add_source(kafka_consumer)
    # Key by device_type and maintain a per-key running total.
    stream = stream.key_by(lambda record: record[0]).map(
        DeviceAggregation(),
        output_type=Types.ROW_NAMED(
            ["device_type", "total_count"], [Types.STRING(), Types.INT()]
        ),
    )
    stream.add_sink(kafka_producer)
    env.execute()
# Script entry point: run the Kafka JSON demo pipeline.
if __name__ == "__main__":
    demo_flink_json()
# See PyCharm help at https://www.jetbrains.com/help/pycharm/