Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 39 additions & 31 deletions README.rst
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
Kafka Python client
------------------------

.. image:: https://img.shields.io/badge/kafka-4.0--0.8-brightgreen.svg
.. image:: https://img.shields.io/badge/kafka-4.3--0.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python.svg
Expand All @@ -18,27 +16,48 @@ Kafka Python client



Python client for the Apache Kafka distributed stream processing system.
kafka-python is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., consumer iterators).
kafka-python is a pure-python client library for Apache Kafka, the distributed
stream processing engine. It has no external dependencies and no Cython/C/rust
core, making installation across a wide variety of environments simple and easy
to manage.

Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.
kafka-python can also be used as a simple alternative to the apache kafka admin
scripts, which require an installed/compatible jvm. A simple CLI interface for
admin commands is provided as `kafka-python admin` / `python -m kafka.admin`.

Users looking to add more raw throughput can pip install `crc32c` as
an optional dependency, offloading one of the most CPU intensive subsystems
to an optimized C library.

New in 2.3 release: python -m kafka.* interfaces for quick scripts and testing.

.. code-block:: bash

$ pip install kafka-python
pip install kafka-python
# callable as module or as cli-script
kafka-python admin -b localhost:9092 cluster describe
python -m kafka.admin -b localhost:9092 topics create -t foo-topic
echo "foo message" | python -m kafka.producer -b localhost:9092 -t foo-topic
python -m kafka.consumer -b localhost:9092 -C auto_offset_reset=earliest -g foo-group -t foo-topic


What's New in 3.0
*****************

- Protocol Stack dynamically generated from Apache Kafka json message schemas.
- Encode/decode performance optimizations with compiled/cached python bytecode.
- Expanded KIP feature support, including Cooperative Rebalance (KIP-429),
Rack-aware Fetch (KIP-392), Log-Truncation detection (KIP-320), Transactional
Producer improvements (KIP-360, KIP-447, KIP-654), Sticky Partitioner (KIP-480),
and splittting oversized producer batches (KIP-126).
- Full refactor and expansion of KafkaAdminClient.
- Networking changes to leverage kafka.net event-loop and async/await syntax.
- Python 3.8+ required

KafkaConsumer
*************

KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

as possible to the official java client.
See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
for API and configuration details.

Expand Down Expand Up @@ -185,25 +204,15 @@ Access via ``python -m kafka.consumer``, ``python -m kafka.producer``, and ``pyt
See https://kafka-python.readthedocs.io/en/master/usage.html for more details.


Thread safety
*************

The KafkaProducer can be used across threads without issue, unlike the
KafkaConsumer which cannot.

While it is possible to use the KafkaConsumer in a thread-local manner,
multiprocessing is recommended.


Compression
***********

kafka-python supports the following compression formats:

- gzip
- LZ4
- Snappy
- Zstandard (zstd)
- gzip (via stdlib)
- LZ4 (via `python-lz4`, `lz4tools`, or `py-lz4framed`)
- Snappy (via `python-snappy`)
- Zstandard (via `python-zstandard`)

gzip is supported natively, the others require installing additional libraries.
See https://kafka-python.readthedocs.io/en/master/install.html for more information.
Expand All @@ -224,10 +233,9 @@ Protocol

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. The protocol support is
leveraged to enable a KafkaClient.check_version() method that
probes a kafka broker and attempts to identify which version it is running
(0.8.0 to 2.6+).
testing, probing, and general experimentation. In version 3.0 the protocol
layer was re-written to generate encoder/decoder classes using json message
definitions imported directly from the Apache Kafka project source.


Debugging
Expand Down
76 changes: 44 additions & 32 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -1,40 +1,62 @@
kafka-python
############

.. image:: https://img.shields.io/badge/kafka-4.0--0.8-brightgreen.svg
.. image:: https://img.shields.io/badge/kafka-4.3--0.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
.. image:: https://img.shields.io/github/actions/workflow/status/dpkp/kafka-python/python-package.yml
:target: https://github.com/dpkp/kafka-python/actions/workflows/python-package.yml
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python.svg
:target: https://pypistats.org/packages/kafka-python
.. image:: https://img.shields.io/pypi/v/kafka-python.svg
:target: https://pypi.org/project/kafka-python
.. image:: https://img.shields.io/pypi/implementation/kafka-python
:target: https://github.com/dpkp/kafka-python/blob/master/pyproject.toml

Python client for the Apache Kafka distributed stream processing system.
kafka-python is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., consumer iterators).

Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

New in 2.3 release: python -m kafka.* interfaces for quick scripts and testing.
kafka-python is a pure-python client library for Apache Kafka, the distributed
stream processing engine. It has no external dependencies and no Cython/C/rust
core, making installation across a wide variety of environments simple and easy
to manage. Users looking to add more raw throughput can pip install `crc32c` as
an optional dependency, offloading one of the most CPU intensive subsystems
to an optimized C library.

kafka-python can also be used as a simple alternative to the apache kafka admin
scripts, which require an installed/compatible jvm. A simple CLI interface for
admin commands is provided as `kafka-python admin` / `python -m kafka.admin`.

.. code:: bash

pip install kafka-python

# callable as module or as cli-script
kafka-python admin -b localhost:9092 cluster describe
python -m kafka.admin -b localhost:9092 topics create -t foo-topic
echo "foo message" | python -m kafka.producer -b localhost:9092 -t foo-topic
python -m kafka.consumer -b localhost:9092 -C auto_offset_reset=earliest -g foo-group -t foo-topic


What's New in 3.0
*****************

- Protocol Stack dynamically generated from Apache Kafka json message schemas.
- Encode/decode performance optimizations with compiled/cached python bytecode.
- Expanded KIP feature support, including Cooperative Rebalance (KIP-429),
Rack-aware Fetch (KIP-392), Log-Truncation detection (KIP-320), Transactional
Producer improvements (KIP-360, KIP-447, KIP-654), Sticky Partitioner (KIP-480),
and splittting oversized producer batches (KIP-126).
- Full refactor and expansion of KafkaAdminClient.
- Networking changes to leverage kafka.net event-loop and async/await syntax.
- Python 3.8+ required

KafkaConsumer
*************

:class:`~kafka.KafkaConsumer` is a high-level message consumer, intended to
operate as similarly as possible to the official java client. Full support
for coordinated consumer groups requires use of kafka brokers that support the
Group APIs: kafka v0.9+.

See `KafkaConsumer <apidoc/KafkaConsumer.html>`_ for API and configuration details.
:class:`~kafka.KafkaConsumer` is a high-level message consumer, intended to operate
as similarly as possible to the official java client.
See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:
Expand Down Expand Up @@ -178,25 +200,15 @@ Access via ``python -m kafka.consumer``, ``python -m kafka.producer``, and ``pyt
See `Usage <usage.html>`_ for more details.


Thread safety
*************

The KafkaProducer can be used across threads without issue, unlike the
KafkaConsumer which cannot.

While it is possible to use the KafkaConsumer in a thread-local manner,
multiprocessing is recommended.


Compression
***********

kafka-python supports the following compression formats:

- gzip
- LZ4
- Snappy
- Zstandard (zstd)
- gzip (via stdlib)
- LZ4 (via `python-lz4`, `lz4tools`, or `py-lz4framed`)
- Snappy (via `python-snappy`)
- Zstandard (via `python-zstandard`)

gzip is supported natively, the others require installing additional libraries.
See `Install <install.html>`_ for more information.
Expand Down
33 changes: 17 additions & 16 deletions docs/tests.rst
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
Tests
=====

.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
.. image:: https://img.shields.io/github/actions/workflow/status/dpkp/kafka-python/python-package.yml
:target: https://github.com/dpkp/kafka-python/actions/workflows/python-package.yml

The test suite is run via pytest.

Linting is run via pylint, but is currently skipped during CI/CD due to
accumulated debt. We'd like to transition to ruff!
Linting is run via pylint.

For test coverage details, see https://coveralls.io/github/dpkp/kafka-python
Coverage reporting is currently disabled as we have transitioned from travis
to GH Actions and have not yet re-enabled coveralls integration.
Test coverage details are currently published as an html build artifact.

The test suite includes unit tests that mock network interfaces, as well as
integration tests that setup and teardown kafka broker (and zookeeper)
fixtures for client / consumer / producer testing.
The test suite includes unit tests that mock network interfaces, mock broker tests
that simulate request/receive network messaging, as well as integration tests that
setup and teardown kafka broker (and zookeeper where required) fixtures.


Unit tests
Expand All @@ -34,19 +29,25 @@ Then simply run pytest (or make test) from your preferred python + virtualenv.
.. code:: bash

# run protocol tests only (via pytest)
pytest test/test_protocol.py
pytest test/protocol/

# Run conn tests only (via make)
PYTESTS=test/test_conn.py make test
# Run connection tests only (via make)
PYTESTS=test/net/test_connection.py make test


Integration tests
-----------------

.. code:: bash

KAFKA_VERSION=4.0.0 make test
# Download new broker files
KAFKA_VERSION=4.3.0 make servers/4.3.0/kafka-bin
# Run tests for previously-installed broker version
KAFKA_VERSION=4.3.0 pytest -v test/integration/
# Or install + run all tests
KAFKA_VERSION=4.3.0 make test


Integration tests start Kafka and Zookeeper fixtures. Make will download
kafka server binaries automatically if needed.
Integration tests start Kafka (and Zookeeper where required) fixtures. These
require a functioning java install. Make will download the kafka server binaries
automatically if needed.
8 changes: 6 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,12 @@ class KafkaAdminClient(
this CRL. Default: None.
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will infer the broker version from the results of
ApiVersionsRequest API or, for brokers earlier than 0.10, probing
various known APIs. Different versions enable different functionality.
ApiVersionsRequest API. For brokers earlier than 0.10, which do not
support the ApiVersionsRequest API, api_version is required.
Note: Dynamic version checking is performed eagerly during __init__
and can raise KafkaTimeoutError if no connection can be made before
timeout (see bootstrap_timeout_ms below).
Different versions enable different functionality.

Examples:
(4, 2) most recent broker release, enable all supported features
Expand Down
8 changes: 6 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,12 @@ class KafkaConsumer:
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will infer the broker version from the results of
ApiVersionsRequest API or, for brokers earlier than 0.10, probing
various known APIs. Different versions enable different functionality.
ApiVersionsRequest API. For brokers earlier than 0.10, which do not
support the ApiVersionsRequest API, api_version is required.
Note: Dynamic version checking is performed eagerly during __init__
and can raise KafkaTimeoutError if no connection can be made before
timeout (see bootstrap_timeout_ms below).
Different versions enable different functionality.

Examples:
(4, 2) most recent broker release, enable all supported features
Expand Down
8 changes: 6 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,12 @@ class KafkaProducer:
an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
api_version (tuple): Specify which Kafka API version to use. If set to
None, the client will infer the broker version from the results of
ApiVersionsRequest API or, for brokers earlier than 0.10, probing
various known APIs. Different versions enable different functionality.
ApiVersionsRequest API. For brokers earlier than 0.10, which do not
support the ApiVersionsRequest API, api_version is required.
Note: Dynamic version checking is performed eagerly during __init__
and can raise KafkaTimeoutError if no connection can be made before
timeout (see bootstrap_timeout_ms below).
Different versions enable different functionality.

Examples:
(4, 2) most recent broker release, enable all supported features
Expand Down
Loading