Skip to content

Commit 51ec85d

Browse files
committed
KIP-932 : Implement Share consumer interface with poll API
1 parent 1a0da59 commit 51ec85d

7 files changed

Lines changed: 771 additions & 0 deletions

File tree

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
os.path.join(ext_dir, 'confluent_kafka.c'),
2424
os.path.join(ext_dir, 'Producer.c'),
2525
os.path.join(ext_dir, 'Consumer.c'),
26+
os.path.join(ext_dir, 'ShareConsumer.c'),
2627
os.path.join(ext_dir, 'Metadata.c'),
2728
os.path.join(ext_dir, 'AdminTypes.c'),
2829
os.path.join(ext_dir, 'Admin.c'),

src/confluent_kafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
Consumer,
4040
Message,
4141
Producer,
42+
ShareConsumer,
4243
TopicPartition,
4344
Uuid,
4445
consistent,
@@ -54,6 +55,7 @@
5455
__all__ = [
5556
"admin",
5657
"Consumer",
58+
"ShareConsumer",
5759
"aio",
5860
"KafkaError",
5961
"KafkaException",

src/confluent_kafka/cimpl.pyi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,15 @@ class Consumer:
535535
def memberid(self) -> str: ...
536536
def set_sasl_credentials(self, username: str, password: str) -> None: ...
537537

538+
class ShareConsumer:
539+
"""Share Consumer for queue-like message consumption (KIP-932)."""
540+
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
541+
def subscribe(self, topics: List[str]) -> None: ...
542+
def unsubscribe(self) -> None: ...
543+
def subscription(self) -> List[str]: ...
544+
def consume_batch(self, timeout: float = -1) -> List[Message]: ...
545+
def close(self) -> None: ...
546+
538547
class _AdminClientImpl:
539548
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
540549
def __enter__(self) -> Self: ...

0 commit comments

Comments
 (0)