Skip to content

Commit a980019

Browse files
authored
[KIP-932] : Implement Share consumer interface with poll API (#2217)
1 parent a0496cf commit a980019

17 files changed

Lines changed: 2108 additions & 27 deletions

.semaphore/semaphore.yml

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ global_job_config:
99
env_vars:
1010
- name: LIBRDKAFKA_VERSION
1111
value: v2.14.0
12+
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
13+
- name: LIBRDKAFKA_BRANCH
14+
value: dev_kip-932_queues-for-kafka
1215
prologue:
1316
commands:
1417
- checkout
@@ -32,6 +35,7 @@ blocks:
3235
- export ARCH=x64
3336
- sem-version python 3.11
3437
- pip install uv
38+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
3539
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
3640
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
3741
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -57,6 +61,7 @@ blocks:
5761
- export ARCH=x64
5862
- sem-version python 3.13
5963
- pip install uv
64+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
6065
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
6166
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
6267
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
@@ -80,6 +85,7 @@ blocks:
8085
- export ARCH=arm64
8186
- sem-version python 3.11
8287
- pip install uv
88+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
8389
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
8490
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
8591
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -107,6 +113,7 @@ blocks:
107113
- export ARCH=arm64
108114
- sem-version python 3.13
109115
- pip install uv
116+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
110117
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
111118
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
112119
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
@@ -128,6 +135,7 @@ blocks:
128135
- export ARCH=arm64
129136
- sem-version python 3.13
130137
- pip install uv
138+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
131139
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
132140
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
133141
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -149,6 +157,7 @@ blocks:
149157
- export ARCH=x64
150158
- sem-version python 3.11
151159
- pip install uv
160+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
152161
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
153162
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
154163
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
@@ -181,6 +190,7 @@ blocks:
181190
commands:
182191
- sem-version python 3.11.9
183192
- bash tools/mingw-w64/semaphore_commands.sh
193+
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
184194
- bash tools/wheels/install-librdkafka.sh $env:LIBRDKAFKA_VERSION.TrimStart("v") dest
185195
- tools/wheels/build-wheels.bat x64 win_amd64 dest wheelhouse
186196
- tar -czf wheelhouse-windows-${Env:ARCH}.tgz wheelhouse
@@ -199,10 +209,15 @@ blocks:
199209
- export ARCH=x64
200210
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
201211
- sudo apt-get update -qq && sudo apt-get install -y -qq clang-format
212+
# TODO KIP-932: matrix the Python version (3.12, 3.13, 3.14)
213+
# across these jobs. Currently every job pins 3.11, which leaves
214+
# version-specific issues in the C extension undetected until users hit
215+
# them.
202216
jobs:
203217
- name: Build and Tests with 'classic' group protocol
204218
commands:
205219
- sem-version python 3.11
220+
- sem-version java 17
206221
- pip install uv
207222
# use a virtualenv
208223
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -221,6 +236,7 @@ blocks:
221236
- name: Build, Test, and Report coverage
222237
commands:
223238
- sem-version python 3.11
239+
- sem-version java 17
224240
- pip install uv
225241
# use a virtualenv
226242
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -245,6 +261,7 @@ blocks:
245261
commands:
246262
- export ARCH=arm64
247263
- sem-version python 3.11
264+
- sem-version java 17
248265
- pip install uv
249266
# use a virtualenv
250267
- uv venv _venv --python "$(command -v python)" && source _venv/bin/activate
@@ -319,9 +336,15 @@ blocks:
319336
# Install existing test requirements
320337
- uv pip install -r requirements/requirements-tests-install.txt
321338

339+
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH fallback once LIBRDKAFKA_VERSION includes share consumer support
322340
# Build and install confluent-kafka from source
323341
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
324-
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
342+
- |
343+
if [[ -n $LIBRDKAFKA_BRANCH ]]; then
344+
tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
345+
else
346+
tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
347+
fi
325348
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
326349
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
327350
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"

examples/share_consumer.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2026 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import sys
19+
20+
#
21+
# Example KIP-932 ShareConsumer.
22+
#
23+
# A share consumer reads from one or more topics like a queue: many consumers
24+
# in the same share group can read the same partition, and each record is
25+
# acknowledged implicitly.
26+
#
27+
from confluent_kafka import ShareConsumer
28+
29+
30+
def print_usage_and_exit(program_name):
31+
sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
32+
sys.exit(1)
33+
34+
35+
if __name__ == '__main__':
36+
if len(sys.argv) < 4:
37+
print_usage_and_exit(sys.argv[0])
38+
39+
broker = sys.argv[1]
40+
group = sys.argv[2]
41+
topics = sys.argv[3:]
42+
43+
# ShareConsumer configuration.
44+
# See https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
45+
conf = {
46+
'bootstrap.servers': broker,
47+
'group.id': group,
48+
}
49+
50+
sc = ShareConsumer(conf)
51+
sc.subscribe(topics)
52+
53+
try:
54+
while True:
55+
messages = sc.poll(timeout=1.0) # returns a list (possibly empty)
56+
for msg in messages:
57+
if msg.error():
58+
# Per-message errors are informational; log and keep
59+
# polling. Truly fatal errors are raised out of poll()
60+
# itself via the error_cb path.
61+
sys.stderr.write('%% Error: %s\n' % msg.error())
62+
continue
63+
sys.stderr.write(
64+
'%% %s [%d] at offset %d with key %s:\n'
65+
% (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))
66+
)
67+
print(msg.value())
68+
# Implicit ack: the next poll() acknowledges this message.
69+
# If we crash before the next poll, the broker will
70+
# redeliver this record to another consumer in the share
71+
# group after the acquisition lock expires.
72+
except KeyboardInterrupt:
73+
sys.stderr.write('%% Aborted by user\n')
74+
finally:
75+
sc.close()

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
os.path.join(ext_dir, 'confluent_kafka.c'),
2424
os.path.join(ext_dir, 'Producer.c'),
2525
os.path.join(ext_dir, 'Consumer.c'),
26+
os.path.join(ext_dir, 'ShareConsumer.c'),
2627
os.path.join(ext_dir, 'Metadata.c'),
2728
os.path.join(ext_dir, 'AdminTypes.c'),
2829
os.path.join(ext_dir, 'Admin.c'),

src/confluent_kafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
Consumer,
4040
Message,
4141
Producer,
42+
ShareConsumer,
4243
TopicPartition,
4344
Uuid,
4445
consistent,
@@ -54,6 +55,7 @@
5455
__all__ = [
5556
"admin",
5657
"Consumer",
58+
"ShareConsumer",
5759
"aio",
5860
"KafkaError",
5961
"KafkaException",

src/confluent_kafka/cimpl.pyi

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@ class KafkaError:
6363
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int
6464
DUPLICATE_RESOURCE: int
6565
DUPLICATE_SEQUENCE_NUMBER: int
66+
DUPLICATE_VOTER: int
6667
ELECTION_NOT_NEEDED: int
6768
ELIGIBLE_LEADERS_NOT_AVAILABLE: int
6869
FEATURE_UPDATE_FAILED: int
6970
FENCED_INSTANCE_ID: int
7071
FENCED_LEADER_EPOCH: int
7172
FENCED_MEMBER_EPOCH: int
73+
FENCED_STATE_EPOCH: int
7274
FETCH_SESSION_ID_NOT_FOUND: int
7375
GROUP_AUTHORIZATION_FAILED: int
7476
GROUP_ID_NOT_FOUND: int
@@ -89,20 +91,26 @@ class KafkaError:
8991
INVALID_PRODUCER_EPOCH: int
9092
INVALID_PRODUCER_ID_MAPPING: int
9193
INVALID_RECORD: int
94+
INVALID_RECORD_STATE: int
95+
INVALID_REGISTRATION: int
96+
INVALID_REGULAR_EXPRESSION: int
9297
INVALID_REPLICATION_FACTOR: int
9398
INVALID_REPLICA_ASSIGNMENT: int
9499
INVALID_REQUEST: int
95100
INVALID_REQUIRED_ACKS: int
101+
INVALID_SHARE_SESSION_EPOCH: int
96102
INVALID_SESSION_TIMEOUT: int
97103
INVALID_TIMESTAMP: int
98104
INVALID_TRANSACTION_TIMEOUT: int
99105
INVALID_TXN_STATE: int
100106
INVALID_UPDATE_VERSION: int
107+
INVALID_VOTER_KEY: int
101108
KAFKA_STORAGE_ERROR: int
102109
LEADER_NOT_AVAILABLE: int
103110
LISTENER_NOT_FOUND: int
104111
LOG_DIR_NOT_FOUND: int
105112
MEMBER_ID_REQUIRED: int
113+
MISMATCHED_ENDPOINT_TYPE: int
106114
MSG_SIZE_TOO_LARGE: int
107115
NETWORK_EXCEPTION: int
108116
NON_EMPTY_GROUP: int
@@ -131,19 +139,26 @@ class KafkaError:
131139
RESOURCE_NOT_FOUND: int
132140
SASL_AUTHENTICATION_FAILED: int
133141
SECURITY_DISABLED: int
142+
SHARE_SESSION_LIMIT_REACHED: int
143+
SHARE_SESSION_NOT_FOUND: int
134144
STALE_BROKER_EPOCH: int
135145
STALE_CTRL_EPOCH: int
136146
STALE_MEMBER_EPOCH: int
147+
STREAMS_INVALID_TOPOLOGY: int
148+
STREAMS_INVALID_TOPOLOGY_EPOCH: int
149+
STREAMS_TOPOLOGY_FENCED: int
137150
TELEMETRY_TOO_LARGE: int
138151
THROTTLING_QUOTA_EXCEEDED: int
139152
TOPIC_ALREADY_EXISTS: int
140153
TOPIC_AUTHORIZATION_FAILED: int
141154
TOPIC_DELETION_DISABLED: int
142155
TOPIC_EXCEPTION: int
156+
TRANSACTION_ABORTABLE: int
143157
TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int
144158
TRANSACTION_COORDINATOR_FENCED: int
145159
UNACCEPTABLE_CREDENTIAL: int
146160
UNKNOWN: int
161+
UNKNOWN_CONTROLLER_ID: int
147162
UNKNOWN_LEADER_EPOCH: int
148163
UNKNOWN_MEMBER_ID: int
149164
UNKNOWN_PRODUCER_ID: int
@@ -154,9 +169,11 @@ class KafkaError:
154169
UNSTABLE_OFFSET_COMMIT: int
155170
UNSUPPORTED_ASSIGNOR: int
156171
UNSUPPORTED_COMPRESSION_TYPE: int
172+
UNSUPPORTED_ENDPOINT_TYPE: int
157173
UNSUPPORTED_FOR_MESSAGE_FORMAT: int
158174
UNSUPPORTED_SASL_MECHANISM: int
159175
UNSUPPORTED_VERSION: int
176+
VOTER_NOT_FOUND: int
160177
_ALL_BROKERS_DOWN: int
161178
_APPLICATION: int
162179
_ASSIGNMENT_LOST: int
@@ -535,6 +552,22 @@ class Consumer:
535552
def memberid(self) -> str: ...
536553
def set_sasl_credentials(self, username: str, password: str) -> None: ...
537554

555+
class ShareConsumer:
556+
"""Share Consumer for queue-like message consumption (KIP-932)."""
557+
@overload
558+
def __init__(self, config: Dict[str, Any]) -> None: ...
559+
@overload
560+
def __init__(self, config: Dict[str, Any], /, **kwargs: Any) -> None: ...
561+
@overload
562+
def __init__(self, **config: Any) -> None: ...
563+
def subscribe(self, topics: List[str]) -> None: ...
564+
def unsubscribe(self) -> None: ...
565+
def subscription(self) -> List[str]: ...
566+
def poll(self, timeout: float = -1) -> List[Message]: ...
567+
def close(self) -> None: ...
568+
def __enter__(self) -> "ShareConsumer": ...
569+
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
570+
538571
class _AdminClientImpl:
539572
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
540573
def __enter__(self) -> Self: ...

0 commit comments

Comments
 (0)