Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.14.0
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
- name: LIBRDKAFKA_BRANCH
Comment thread
pranavrth marked this conversation as resolved.
value: dev_kip-932_queues-for-kafka
prologue:
commands:
- checkout
Expand All @@ -32,6 +35,7 @@ blocks:
- export ARCH=x64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
Expand All @@ -57,6 +61,7 @@ blocks:
- export ARCH=x64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
Expand All @@ -80,6 +85,7 @@ blocks:
- export ARCH=arm64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse 2.16.2
- tar -czf wheelhouse-macOS-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
Expand Down Expand Up @@ -107,6 +113,7 @@ blocks:
- export ARCH=arm64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- PIP_INSTALL_OPTIONS="--user" tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-macOS-${ARCH}-py313-plus.tgz wheelhouse
- artifact push workflow wheelhouse-macOS-${ARCH}-py313-plus.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}-py313-plus.tgz/
Expand All @@ -128,6 +135,7 @@ blocks:
- export ARCH=arm64
- sem-version python 3.13
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
Expand All @@ -149,6 +157,7 @@ blocks:
- export ARCH=x64
- sem-version python 3.11
- pip install uv
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- ./tools/wheels/build-wheels.sh "${LIBRDKAFKA_VERSION#v}" wheelhouse
- tar -czf wheelhouse-linux-${ARCH}.tgz wheelhouse
- artifact push workflow wheelhouse-linux-${ARCH}.tgz --destination artifacts/wheels-${OS_NAME}-${ARCH}.tgz/
Expand Down Expand Up @@ -181,6 +190,7 @@ blocks:
commands:
- sem-version python 3.11.9
- bash tools/mingw-w64/semaphore_commands.sh
# TODO KIP-932: Add LIBRDKAFKA_BRANCH fallback for share consumer support
- bash tools/wheels/install-librdkafka.sh $env:LIBRDKAFKA_VERSION.TrimStart("v") dest
- tools/wheels/build-wheels.bat x64 win_amd64 dest wheelhouse
- tar -czf wheelhouse-windows-${Env:ARCH}.tgz wheelhouse
Expand Down Expand Up @@ -319,9 +329,15 @@ blocks:
# Install existing test requirements
- uv pip install -r requirements/requirements-tests-install.txt

# TODO KIP-932: Remove LIBRDKAFKA_BRANCH fallback once LIBRDKAFKA_VERSION includes share consumer support
# Build and install confluent-kafka from source
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- |
Comment thread
pranavrth marked this conversation as resolved.
if [[ -n $LIBRDKAFKA_BRANCH ]]; then
tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
else
tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
fi
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
os.path.join(ext_dir, 'confluent_kafka.c'),
os.path.join(ext_dir, 'Producer.c'),
os.path.join(ext_dir, 'Consumer.c'),
os.path.join(ext_dir, 'ShareConsumer.c'),
os.path.join(ext_dir, 'Metadata.c'),
os.path.join(ext_dir, 'AdminTypes.c'),
os.path.join(ext_dir, 'Admin.c'),
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an example file examples/share_consumer.py.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Consumer,
Message,
Producer,
ShareConsumer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Have it after Consumer.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed this comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not moved.

TopicPartition,
Uuid,
consistent,
Expand All @@ -54,6 +55,7 @@
__all__ = [
"admin",
"Consumer",
"ShareConsumer",
"aio",
"KafkaError",
"KafkaException",
Expand Down
33 changes: 33 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class KafkaError:
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int
DUPLICATE_RESOURCE: int
DUPLICATE_SEQUENCE_NUMBER: int
DUPLICATE_VOTER: int
Comment thread
pranavrth marked this conversation as resolved.
ELECTION_NOT_NEEDED: int
ELIGIBLE_LEADERS_NOT_AVAILABLE: int
FEATURE_UPDATE_FAILED: int
FENCED_INSTANCE_ID: int
FENCED_LEADER_EPOCH: int
FENCED_MEMBER_EPOCH: int
FENCED_STATE_EPOCH: int
FETCH_SESSION_ID_NOT_FOUND: int
GROUP_AUTHORIZATION_FAILED: int
GROUP_ID_NOT_FOUND: int
Expand All @@ -89,20 +91,26 @@ class KafkaError:
INVALID_PRODUCER_EPOCH: int
INVALID_PRODUCER_ID_MAPPING: int
INVALID_RECORD: int
INVALID_RECORD_STATE: int
INVALID_REGISTRATION: int
INVALID_REGULAR_EXPRESSION: int
INVALID_REPLICATION_FACTOR: int
INVALID_REPLICA_ASSIGNMENT: int
INVALID_REQUEST: int
INVALID_REQUIRED_ACKS: int
INVALID_SHARE_SESSION_EPOCH: int
INVALID_SESSION_TIMEOUT: int
INVALID_TIMESTAMP: int
INVALID_TRANSACTION_TIMEOUT: int
INVALID_TXN_STATE: int
INVALID_UPDATE_VERSION: int
INVALID_VOTER_KEY: int
KAFKA_STORAGE_ERROR: int
LEADER_NOT_AVAILABLE: int
LISTENER_NOT_FOUND: int
LOG_DIR_NOT_FOUND: int
MEMBER_ID_REQUIRED: int
MISMATCHED_ENDPOINT_TYPE: int
MSG_SIZE_TOO_LARGE: int
NETWORK_EXCEPTION: int
NON_EMPTY_GROUP: int
Expand Down Expand Up @@ -131,19 +139,26 @@ class KafkaError:
RESOURCE_NOT_FOUND: int
SASL_AUTHENTICATION_FAILED: int
SECURITY_DISABLED: int
SHARE_SESSION_LIMIT_REACHED: int
SHARE_SESSION_NOT_FOUND: int
STALE_BROKER_EPOCH: int
STALE_CTRL_EPOCH: int
STALE_MEMBER_EPOCH: int
STREAMS_INVALID_TOPOLOGY: int
STREAMS_INVALID_TOPOLOGY_EPOCH: int
STREAMS_TOPOLOGY_FENCED: int
TELEMETRY_TOO_LARGE: int
THROTTLING_QUOTA_EXCEEDED: int
TOPIC_ALREADY_EXISTS: int
TOPIC_AUTHORIZATION_FAILED: int
TOPIC_DELETION_DISABLED: int
TOPIC_EXCEPTION: int
TRANSACTION_ABORTABLE: int
TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int
TRANSACTION_COORDINATOR_FENCED: int
UNACCEPTABLE_CREDENTIAL: int
UNKNOWN: int
UNKNOWN_CONTROLLER_ID: int
UNKNOWN_LEADER_EPOCH: int
UNKNOWN_MEMBER_ID: int
UNKNOWN_PRODUCER_ID: int
Expand All @@ -154,9 +169,11 @@ class KafkaError:
UNSTABLE_OFFSET_COMMIT: int
UNSUPPORTED_ASSIGNOR: int
UNSUPPORTED_COMPRESSION_TYPE: int
UNSUPPORTED_ENDPOINT_TYPE: int
UNSUPPORTED_FOR_MESSAGE_FORMAT: int
UNSUPPORTED_SASL_MECHANISM: int
UNSUPPORTED_VERSION: int
VOTER_NOT_FOUND: int
_ALL_BROKERS_DOWN: int
_APPLICATION: int
_ASSIGNMENT_LOST: int
Expand Down Expand Up @@ -535,6 +552,22 @@ class Consumer:
def memberid(self) -> str: ...
def set_sasl_credentials(self, username: str, password: str) -> None: ...

class ShareConsumer:
"""Share Consumer for queue-like message consumption (KIP-932)."""
@overload
def __init__(self, config: Dict[str, Any]) -> None: ...
@overload
def __init__(self, config: Dict[str, Any], /, **kwargs: Any) -> None: ...
@overload
def __init__(self, **config: Any) -> None: ...
def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def poll(self, timeout: float = -1) -> List[Message]: ...
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be consume_batch

def close(self) -> None: ...
def __enter__(self) -> "ShareConsumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> Self: ...
Expand Down
Loading