-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMysqlAsSource.py
More file actions
28 lines (23 loc) · 841 Bytes
/
MysqlAsSource.py
File metadata and controls
28 lines (23 loc) · 841 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import json
from kafka import KafkaProducer
import pymysql.cursors
# Publish every row of the MySQL `emp` table to the Kafka topic 'test'.
#
# Each row becomes one message: the pipe-delimited string
# "<empno>|<ename>|<job>", JSON-encoded and then UTF-8 encoded.

# Retries are configured on the producer that actually sends the records.
# Previously a second, unused producer was created with retries=5 after
# all sends had already been issued, so the setting never took effect.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], retries=5)
try:
    connection = pymysql.connect(host='127.0.0.1',
                                 user='root',
                                 password='root',
                                 db='datasource',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    try:
        # The cursor is closed on leaving the `with` block, even on error.
        with connection.cursor() as cursor:
            sql = "Select * from emp"
            cursor.execute(sql)
            # fetchall() materializes the result set, so the database
            # connection can be released before any message is sent.
            rows = cursor.fetchall()
    finally:
        connection.close()

    for row in rows:
        # DictCursor yields each row as a dict keyed by column name.
        # str.join (like the original `+` concatenation) raises TypeError
        # if `ename` or `job` is not a str, e.g. a NULL column.
        data = '|'.join((str(row['empno']), row['ename'], row['job']))
        producer.send('test', json.dumps(data).encode('utf-8'))

    # send() only queues the record; block until everything queued has
    # been delivered so records are not lost when the script exits.
    producer.flush()
finally:
    producer.close()