Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.13.2
- name: LIBRDKAFKA_BRANCH
value: dev_kip-932_queues-for-kafka
prologue:
commands:
- checkout
Expand Down Expand Up @@ -332,7 +334,13 @@ blocks:

# Build and install confluent-kafka from source
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- |
if [[ -n $LIBRDKAFKA_BRANCH ]]; then
sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev
tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
else
tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
fi
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
os.path.join(ext_dir, 'confluent_kafka.c'),
os.path.join(ext_dir, 'Producer.c'),
os.path.join(ext_dir, 'Consumer.c'),
os.path.join(ext_dir, 'ShareConsumer.c'),
os.path.join(ext_dir, 'Metadata.c'),
os.path.join(ext_dir, 'AdminTypes.c'),
os.path.join(ext_dir, 'Admin.c'),
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Consumer,
Message,
Producer,
ShareConsumer,
TopicPartition,
Uuid,
consistent,
Expand All @@ -54,6 +55,7 @@
__all__ = [
"admin",
"Consumer",
"ShareConsumer",
"aio",
"KafkaError",
"KafkaException",
Expand Down
26 changes: 26 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class KafkaError:
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int
DUPLICATE_RESOURCE: int
DUPLICATE_SEQUENCE_NUMBER: int
DUPLICATE_VOTER: int
ELECTION_NOT_NEEDED: int
ELIGIBLE_LEADERS_NOT_AVAILABLE: int
FEATURE_UPDATE_FAILED: int
FENCED_INSTANCE_ID: int
FENCED_LEADER_EPOCH: int
FENCED_MEMBER_EPOCH: int
FENCED_STATE_EPOCH: int
FETCH_SESSION_ID_NOT_FOUND: int
GROUP_AUTHORIZATION_FAILED: int
GROUP_ID_NOT_FOUND: int
Expand All @@ -89,20 +91,26 @@ class KafkaError:
INVALID_PRODUCER_EPOCH: int
INVALID_PRODUCER_ID_MAPPING: int
INVALID_RECORD: int
INVALID_RECORD_STATE: int
INVALID_REGISTRATION: int
INVALID_REGULAR_EXPRESSION: int
INVALID_REPLICATION_FACTOR: int
INVALID_REPLICA_ASSIGNMENT: int
INVALID_REQUEST: int
INVALID_REQUIRED_ACKS: int
INVALID_SHARE_SESSION_EPOCH: int
INVALID_SESSION_TIMEOUT: int
INVALID_TIMESTAMP: int
INVALID_TRANSACTION_TIMEOUT: int
INVALID_TXN_STATE: int
INVALID_UPDATE_VERSION: int
INVALID_VOTER_KEY: int
KAFKA_STORAGE_ERROR: int
LEADER_NOT_AVAILABLE: int
LISTENER_NOT_FOUND: int
LOG_DIR_NOT_FOUND: int
MEMBER_ID_REQUIRED: int
MISMATCHED_ENDPOINT_TYPE: int
MSG_SIZE_TOO_LARGE: int
NETWORK_EXCEPTION: int
NON_EMPTY_GROUP: int
Expand Down Expand Up @@ -131,19 +139,26 @@ class KafkaError:
RESOURCE_NOT_FOUND: int
SASL_AUTHENTICATION_FAILED: int
SECURITY_DISABLED: int
SHARE_SESSION_LIMIT_REACHED: int
SHARE_SESSION_NOT_FOUND: int
STALE_BROKER_EPOCH: int
STALE_CTRL_EPOCH: int
STALE_MEMBER_EPOCH: int
STREAMS_INVALID_TOPOLOGY: int
STREAMS_INVALID_TOPOLOGY_EPOCH: int
STREAMS_TOPOLOGY_FENCED: int
TELEMETRY_TOO_LARGE: int
THROTTLING_QUOTA_EXCEEDED: int
TOPIC_ALREADY_EXISTS: int
TOPIC_AUTHORIZATION_FAILED: int
TOPIC_DELETION_DISABLED: int
TOPIC_EXCEPTION: int
TRANSACTION_ABORTABLE: int
TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int
TRANSACTION_COORDINATOR_FENCED: int
UNACCEPTABLE_CREDENTIAL: int
UNKNOWN: int
UNKNOWN_CONTROLLER_ID: int
UNKNOWN_LEADER_EPOCH: int
UNKNOWN_MEMBER_ID: int
UNKNOWN_PRODUCER_ID: int
Expand All @@ -154,9 +169,11 @@ class KafkaError:
UNSTABLE_OFFSET_COMMIT: int
UNSUPPORTED_ASSIGNOR: int
UNSUPPORTED_COMPRESSION_TYPE: int
UNSUPPORTED_ENDPOINT_TYPE: int
UNSUPPORTED_FOR_MESSAGE_FORMAT: int
UNSUPPORTED_SASL_MECHANISM: int
UNSUPPORTED_VERSION: int
VOTER_NOT_FOUND: int
_ALL_BROKERS_DOWN: int
_APPLICATION: int
_ASSIGNMENT_LOST: int
Expand Down Expand Up @@ -535,6 +552,15 @@ class Consumer:
def memberid(self) -> str: ...
def set_sasl_credentials(self, username: str, password: str) -> None: ...

class ShareConsumer:
"""Share Consumer for queue-like message consumption (KIP-932)."""
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def consume_batch(self, timeout: float = -1) -> List[Message]: ...
def close(self) -> None: ...

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> Self: ...
Expand Down
12 changes: 6 additions & 6 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ struct Admin_options {
* Make sure this is kept up to date with Admin_options above. */
#define Admin_options_INITIALIZER \
{ \
Admin_options_def_int, Admin_options_def_float, \
Admin_options_def_float, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_ptr, \
Admin_options_def_cnt, Admin_options_def_ptr, \
Admin_options_def_cnt, \
Admin_options_def_int, Admin_options_def_float, \
Admin_options_def_float, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_ptr, \
Admin_options_def_cnt, Admin_options_def_ptr, \
Admin_options_def_cnt, \
}

#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
Expand Down
Loading