-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathavro_producer.py
More file actions
40 lines (34 loc) · 970 Bytes
/
avro_producer.py
File metadata and controls
40 lines (34 loc) · 970 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import time
# Avro schema for the message value: a record with a browser name and a
# creation timestamp.
#
# The Avro specification defines `logicalType` as an attribute of a *type*
# schema, so it must sit inside the nested type object. Written next to
# "name"/"type" on the field it is only unrecognised field metadata and the
# field stays a plain int. `timestamp-millis` (a long counting milliseconds
# since the Unix epoch) is used because created_at is filled from the clock;
# the `date` logical type counts whole days since the epoch and cannot carry
# an epoch timestamp.
# NOTE(review): int -> long produces a new value-schema version for the
# subject -- confirm the subject's compatibility setting permits it.
value_schema_str = """
{
"namespace": "hellokoding.kafka",
"name": "value",
"type": "record",
"fields" : [
{"name" : "browser", "type" : "string"},
{"name" : "created_at", "type" : {"type" : "long", "logicalType" : "timestamp-millis"}}
]
}
"""
# Avro schema for the message key: a record with a single string field, url.
key_schema_str = """
{
"namespace": "hellokoding.kafka",
"name": "key",
"type": "record",
"fields" : [
{"name" : "url", "type" : "string"}
]
}
"""
# Parse both schema strings into schema objects for the producer.
value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
# created_at is sent as integer epoch milliseconds to match timestamp-millis.
value = {"browser": "Chrome", "created_at": int(time.time() * 1000)}
key = {"url": "http://localhost:8081"}
def _report_delivery(err, msg):
    """Delivery callback: write a failed delivery to stderr.

    Registered per message via ``on_delivery``. ``err`` is None when the
    message was delivered and the error otherwise; ``msg`` is the message
    the report refers to. Successful deliveries stay silent.
    """
    if err is not None:
        import sys  # local import: the file's import block does not bring in sys
        sys.stderr.write("Delivery to %s failed: %s\n" % (msg.topic(), err))


# Producer that serialises keys and values with the default schemas and
# registers them with the Schema Registry at the configured URL.
avroProducer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081'
}, default_key_schema=key_schema, default_value_schema=value_schema)
# produce() only enqueues the message; without a delivery callback a failed
# delivery would go unnoticed even after flush() returns.
avroProducer.produce(topic='page_1', value=value, key=key,
                     on_delivery=_report_delivery)
# Block until the queued message has been delivered or has failed, which is
# also when the delivery callback runs.
avroProducer.flush()