25 changes: 24 additions & 1 deletion .semaphore/semaphore.yml
@@ -9,6 +9,9 @@ global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.14.0
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
- name: LIBRDKAFKA_BRANCH
value: dev_kip-932_queues-for-kafka
prologue:
commands:
- checkout
@@ -32,6 +35,7 @@ blocks:
- export ARCH=x64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -57,6 +61,7 @@
- export ARCH=x64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
@@ -80,6 +85,7 @@
- export ARCH=arm64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -107,6 +113,7 @@ blocks:
- export ARCH=arm64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
@@ -128,6 +135,7 @@
- export ARCH=arm64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -149,6 +157,7 @@
- export ARCH=x64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -181,6 +190,7 @@ blocks:
commands:
- sem-version python 3.11.9
- bash tools/mingw-w64/semaphore_commands.sh
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- bash tools/wheels/install-librdkafka.sh $env:LIBRDKAFKA_VERSION.TrimStart("v") dest
- tools/wheels/build-wheels.bat x64 win_amd64 dest wheelhouse
- tar -czf wheelhouse-windows-${Env:ARCH}.tgz wheelhouse
@@ -199,10 +209,15 @@ blocks:
- export ARCH=x64
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- sudo apt-get update -qq && sudo apt-get install -y -qq clang-format
# TODO KIP-932: matrix the Python version (3.12, 3.13, 3.14)
# across these jobs. Currently every job pins 3.11, which leaves
# version-specific issues in the C extension undetected until users hit
# them.
jobs:
- name: Build and Tests with 'classic' group protocol
commands:
- sem-version python 3.11
- sem-version java 17
- pip install uv
# use a virtualenv
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -221,6 +236,7 @@
- name: Build, Test, and Report coverage
commands:
- sem-version python 3.11
- sem-version java 17
- pip install uv
# use a virtualenv
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -245,6 +261,7 @@
commands:
- export ARCH=arm64
- sem-version python 3.11
- sem-version java 17
- pip install uv
# use a virtualenv
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -319,9 +336,15 @@ blocks:
# Install existing test requirements
- uv pip install -r requirements/requirements-tests-install.txt

# TODO KIP-932: Remove LIBRDKAFKA_BRANCH fallback once LIBRDKAFKA_VERSION includes share consumer support
# Build and install confluent-kafka from source
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- |
if [[ -n $LIBRDKAFKA_BRANCH ]]; then
tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
else
tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
fi
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
76 changes: 76 additions & 0 deletions examples/share_consumer.py
@@ -0,0 +1,76 @@
#!/usr/bin/env python
#
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

#
# Example KIP-932 ShareConsumer.
#
# A share consumer reads from one or more topics like a queue: many consumers
# in the same share group can read the same partition, and each record is
# acknowledged implicitly.
#
from confluent_kafka import KafkaException, ShareConsumer


def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
sys.exit(1)


if __name__ == '__main__':
if len(sys.argv) < 4:
print_usage_and_exit(sys.argv[0])

broker = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3:]

# ShareConsumer configuration.
# See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': broker,
'group.id': group,
'auto.offset.reset': 'earliest',
Member: There is no auto.offset.reset property in share consumer.

}

sc = ShareConsumer(conf)
sc.subscribe(topics)

try:
while True:
messages = sc.poll(timeout=1.0) # returns a list (possibly empty)
for msg in messages:
# This example is fail-fast on per-message errors: the first error
# terminates the consumer. For a long-running queue worker, prefer
# logging msg.error() and continuing, so transient delivery errors
# don't kill the process.
if msg.error():
raise KafkaException(msg.error())
Member (on lines +62 to +63): We shouldn't throw an exception in this case. We should log and move on. These errors are mostly informational. We only fail on FATAL errors.

sys.stderr.write(
'%% %s [%d] at offset %d with key %s:\n'
% (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
)
print(msg.value())
# Implicit ack: the next poll() acknowledges this message.
# If we crash before the next poll, the broker will
# redeliver this record to another consumer in the share
# group after the acquisition lock expires.
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
sc.close()
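
A minimal sketch of the log-and-continue handling the reviewer asks for above. It assumes the list-returning poll() shown in this example and that KafkaError.fatal() distinguishes fatal errors the same way it does for the classic Consumer; the consume_loop helper and logger name are illustrative only, not part of this PR.

import logging

from confluent_kafka import KafkaException, ShareConsumer

log = logging.getLogger('share_consumer')


def consume_loop(sc: ShareConsumer) -> None:
    # Poll forever; per-message errors are logged and skipped rather than
    # raised, so a transient delivery error does not stop the worker.
    while True:
        for msg in sc.poll(timeout=1.0):
            err = msg.error()
            if err is not None:
                if err.fatal():
                    # Only fatal errors abort the consumer.
                    raise KafkaException(err)
                log.warning('Non-fatal error on %s [%d]: %s',
                            msg.topic(), msg.partition(), err)
                continue
            # Process the record; the next poll() implicitly acknowledges it.
            log.info('%s [%d] at offset %d',
                     msg.topic(), msg.partition(), msg.offset())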
1 change: 1 addition & 0 deletions setup.py
@@ -23,6 +23,7 @@
os.path.join(ext_dir, 'confluent_kafka.c'),
os.path.join(ext_dir, 'Producer.c'),
os.path.join(ext_dir, 'Consumer.c'),
os.path.join(ext_dir, 'ShareConsumer.c'),
os.path.join(ext_dir, 'Metadata.c'),
os.path.join(ext_dir, 'AdminTypes.c'),
os.path.join(ext_dir, 'Admin.c'),
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Member: Add an example file examples/share_consumer.py.

@@ -39,6 +39,7 @@
Consumer,
Message,
Producer,
ShareConsumer,
TopicPartition,
Uuid,
consistent,
@@ -54,6 +55,7 @@
__all__ = [
"admin",
"Consumer",
"ShareConsumer",
"aio",
"KafkaError",
"KafkaException",
33 changes: 33 additions & 0 deletions src/confluent_kafka/cimpl.pyi
@@ -63,12 +63,14 @@ class KafkaError:
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int
DUPLICATE_RESOURCE: int
DUPLICATE_SEQUENCE_NUMBER: int
DUPLICATE_VOTER: int
ELECTION_NOT_NEEDED: int
ELIGIBLE_LEADERS_NOT_AVAILABLE: int
FEATURE_UPDATE_FAILED: int
FENCED_INSTANCE_ID: int
FENCED_LEADER_EPOCH: int
FENCED_MEMBER_EPOCH: int
FENCED_STATE_EPOCH: int
FETCH_SESSION_ID_NOT_FOUND: int
GROUP_AUTHORIZATION_FAILED: int
GROUP_ID_NOT_FOUND: int
@@ -89,20 +91,26 @@
INVALID_PRODUCER_EPOCH: int
INVALID_PRODUCER_ID_MAPPING: int
INVALID_RECORD: int
INVALID_RECORD_STATE: int
INVALID_REGISTRATION: int
INVALID_REGULAR_EXPRESSION: int
INVALID_REPLICATION_FACTOR: int
INVALID_REPLICA_ASSIGNMENT: int
INVALID_REQUEST: int
INVALID_REQUIRED_ACKS: int
INVALID_SHARE_SESSION_EPOCH: int
INVALID_SESSION_TIMEOUT: int
INVALID_TIMESTAMP: int
INVALID_TRANSACTION_TIMEOUT: int
INVALID_TXN_STATE: int
INVALID_UPDATE_VERSION: int
INVALID_VOTER_KEY: int
KAFKA_STORAGE_ERROR: int
LEADER_NOT_AVAILABLE: int
LISTENER_NOT_FOUND: int
LOG_DIR_NOT_FOUND: int
MEMBER_ID_REQUIRED: int
MISMATCHED_ENDPOINT_TYPE: int
MSG_SIZE_TOO_LARGE: int
NETWORK_EXCEPTION: int
NON_EMPTY_GROUP: int
@@ -131,19 +139,26 @@
RESOURCE_NOT_FOUND: int
SASL_AUTHENTICATION_FAILED: int
SECURITY_DISABLED: int
SHARE_SESSION_LIMIT_REACHED: int
SHARE_SESSION_NOT_FOUND: int
STALE_BROKER_EPOCH: int
STALE_CTRL_EPOCH: int
STALE_MEMBER_EPOCH: int
STREAMS_INVALID_TOPOLOGY: int
STREAMS_INVALID_TOPOLOGY_EPOCH: int
STREAMS_TOPOLOGY_FENCED: int
TELEMETRY_TOO_LARGE: int
THROTTLING_QUOTA_EXCEEDED: int
TOPIC_ALREADY_EXISTS: int
TOPIC_AUTHORIZATION_FAILED: int
TOPIC_DELETION_DISABLED: int
TOPIC_EXCEPTION: int
TRANSACTION_ABORTABLE: int
TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int
TRANSACTION_COORDINATOR_FENCED: int
UNACCEPTABLE_CREDENTIAL: int
UNKNOWN: int
UNKNOWN_CONTROLLER_ID: int
UNKNOWN_LEADER_EPOCH: int
UNKNOWN_MEMBER_ID: int
UNKNOWN_PRODUCER_ID: int
@@ -154,9 +169,11 @@
UNSTABLE_OFFSET_COMMIT: int
UNSUPPORTED_ASSIGNOR: int
UNSUPPORTED_COMPRESSION_TYPE: int
UNSUPPORTED_ENDPOINT_TYPE: int
UNSUPPORTED_FOR_MESSAGE_FORMAT: int
UNSUPPORTED_SASL_MECHANISM: int
UNSUPPORTED_VERSION: int
VOTER_NOT_FOUND: int
_ALL_BROKERS_DOWN: int
_APPLICATION: int
_ASSIGNMENT_LOST: int
@@ -535,6 +552,22 @@ class Consumer:
def memberid(self) -> str: ...
def set_sasl_credentials(self, username: str, password: str) -> None: ...

class ShareConsumer:
"""Share Consumer for queue-like message consumption (KIP-932)."""
@overload
def __init__(self, config: Dict[str, Any]) -> None: ...
@overload
def __init__(self, config: Dict[str, Any], /, **kwargs: Any) -> None: ...
@overload
def __init__(self, **config: Any) -> None: ...
def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def poll(self, timeout: float = -1) -> List[Message]: ...
Member: it should be consume_batch

def close(self) -> None: ...
def __enter__(self) -> "ShareConsumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> Self: ...
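
Based on the stub as written, a hedged usage sketch of the context-manager protocol it declares. The broker address, topic, and group id are placeholders, and per the review comment above poll() may be renamed consume_batch(), so treat the method name as provisional.

from confluent_kafka import ShareConsumer

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'my-share-group'}

# The stub declares __enter__/__exit__, so ShareConsumer can be used as a
# context manager (presumably closing the consumer on exit).
with ShareConsumer(conf) as sc:
    sc.subscribe(['orders'])
    for msg in sc.poll(timeout=1.0):  # may be renamed consume_batch() per review
        if msg.error() is None:
            print(msg.topic(), msg.partition(), msg.offset(), msg.value())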