@@ -15,11 +15,13 @@ kafka-python
1515kafka-python is a pure-python client library for Apache Kafka, the distributed
1616stream processing engine. It has no external dependencies and no Cython/C/rust
1717core, making installation across a wide variety of environments simple and easy
18- to manage.
18+ to manage. It provides high-level class components for consumer, producer, and
19+ admin clients, as well as CLI scripts for quick interactive tasks.
1920
20- kafka-python can also be used as a simple alternative to the apache kafka admin
21- scripts, which require an installed/compatible jvm. A simple CLI interface for
22- admin commands is provided as ``kafka-python admin `` / ``python -m kafka.admin ``.
21+ ``kafka-python admin `` serves as a simple alternative to the apache kafka bin/
22+ scripts, particularly if/when you do not have easy access to an
23+ installed/compatible jvm. The CLI interface for admin commands is provided as
24+ ``kafka-python admin `` and ``python -m kafka.admin ``.
2325
2426Users looking to add more raw throughput can ``pip install crc32c `` as
2527an optional dependency, offloading one of the most CPU intensive subsystems
@@ -85,30 +87,37 @@ that expose basic message attributes: topic, partition, offset, key, and value:
8587 .. code-block :: python
8688
8789 # manually assign the partition list for the consumer
88- from kafka import TopicPartition
90+ from kafka import KafkaConsumer, TopicPartition
8991 consumer = KafkaConsumer(bootstrap_servers = ' localhost:1234' )
9092 consumer.assign([TopicPartition(' foobar' , 2 )])
9193 msg = next (consumer)
9294
95+ Keys and Values returned by KafkaConsumer will be raw bytes by default. Use a
96+ ``value_deserializer `` to automatically decode into something else. Helpers
97+ are available for simple utf-8 string decoding (``DefaultSerializer ``) and
98+ json (``JsonSerializer ``).
99+
93100.. code-block :: python
94101
95- # Deserialize msgpack-encoded values
96- consumer = KafkaConsumer(value_deserializer = msgpack.loads)
97- consumer.subscribe([' msgpackfoo' ])
102+ # Deserialize json-encoded values
103+ from kafka import KafkaConsumer, JsonSerializer
104+ consumer = KafkaConsumer(value_deserializer = JsonSerializer())
105+ consumer.subscribe([' json-foo' ])
98106 for msg in consumer:
99107 assert isinstance (msg.value, dict )
100108
101109 .. code-block :: python
102110
103- # Access record headers. The returned value is a list of tuples
104- # with str, bytes for key and value
111+ # Access record headers. The returned value is a list of
112+ # ( str, bytes) tuples, representing the header key and value.
105113 for msg in consumer:
106114 print (msg.headers)
107115
108116 .. code-block :: python
109117
110118 # Read only committed messages from transactional topic
111- consumer = KafkaConsumer(isolation_level = ' read_committed' )
119+ from kafka import KafkaConsumer, IsolationLevel
120+ consumer = KafkaConsumer(isolation_level = IsolationLevel.READ_COMMITTED )
112121 consumer.subscribe([' txn_topic' ])
113122 for msg in consumer:
114123 print (msg)
@@ -131,12 +140,14 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
131140 from kafka import KafkaProducer
132141 producer = KafkaProducer(bootstrap_servers = ' localhost:1234' )
133142 for _ in range (100 ):
143+ # Fire-and-forget: send() is async and returns before delivery
134144 producer.send(' foobar' , b ' some_message_bytes' )
135145
136146 .. code-block :: python
137147
138- # Block until a single message is sent (or timeout )
148+ # To check the status of an async message delivery, use .get( )
139149 future = producer.send(' foobar' , b ' another_message' )
150+ # future.get() will block until it can return the result or raise on error
140151 result = future.get(timeout = 60 )
141152
142153 .. code-block :: python
@@ -146,6 +157,9 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
146157 # only useful if you configure internal batching using linger_ms
147158 producer.flush()
148159
160+ Message keys are used to hash messages with the same key to the same
161+ partition. Both keys and values should be raw bytes unless a serializer is configured.
162+
149163.. code-block :: python
150164
151165 # Use a key for hashed-partitioning
@@ -154,23 +168,30 @@ client. See `KafkaProducer <apidoc/KafkaProducer.html>`_ for more details.
154168 .. code-block :: python
155169
156170 # Serialize json messages
157- import json
158- producer = KafkaProducer(value_serializer = lambda v : json.dumps(v).encode( ' utf-8 ' ))
171+ from kafka import KafkaProducer, JsonSerializer
172+ producer = KafkaProducer(value_serializer = JsonSerializer( ))
159173 producer.send(' fizzbuzz' , {' foo' : ' bar' })
160174
161175 .. code-block :: python
162176
163177 # Serialize string keys
164- producer = KafkaProducer(key_serializer = str .encode)
178+ from kafka import KafkaProducer, DefaultSerializer
179+ producer = KafkaProducer(key_serializer = DefaultSerializer())
165180 producer.send(' flipflap' , key = ' ping' , value = b ' 1234' )
166181
182+ Compression can be used to reduce message size on the wire. Gzip is supported
183+ via python stdlib. For other compression types you must install optional
184+ dependencies.
185+
167186.. code-block :: python
168187
169188 # Compress messages
170189 producer = KafkaProducer(compression_type = ' gzip' )
171190 for i in range (1000 ):
172191 producer.send(' foobar' , b ' msg %d ' % i)
173192
193+ KafkaProducer also supports transactions and message headers when needed.
194+
174195.. code-block :: python
175196
176197 # Use transactions
@@ -264,4 +285,5 @@ See https://docs.python.org/3/howto/logging.html for overview / howto.
264285 Supported KIPs <kips >
265286 support
266287 license
288+ Upgrading to 3.0 <upgrade_to_3_0 >
267289 changelog
0 commit comments