Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.13.2
- name: LIBRDKAFKA_BRANCH
Comment thread
pranavrth marked this conversation as resolved.
value: dev_kip-932_queues-for-kafka
prologue:
commands:
- checkout
Expand Down Expand Up @@ -332,7 +334,13 @@ blocks:

# Build and install confluent-kafka from source
- lib_dir=dest/runtimes/$OS_NAME-$ARCH/native
- tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
- |
Comment thread
pranavrth marked this conversation as resolved.
if [[ -n $LIBRDKAFKA_BRANCH ]]; then
sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this sudo apt-get install? If needed then move them to the install script instead.

tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest
else
tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest
fi
- export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include"
- export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}"
- export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir"
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
os.path.join(ext_dir, 'confluent_kafka.c'),
os.path.join(ext_dir, 'Producer.c'),
os.path.join(ext_dir, 'Consumer.c'),
os.path.join(ext_dir, 'ShareConsumer.c'),
os.path.join(ext_dir, 'Metadata.c'),
os.path.join(ext_dir, 'AdminTypes.c'),
os.path.join(ext_dir, 'Admin.c'),
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an example file examples/share_consumer.py.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Consumer,
Message,
Producer,
ShareConsumer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Have it after Consumer.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed this comment.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not moved.

TopicPartition,
Uuid,
consistent,
Expand All @@ -54,6 +55,7 @@
__all__ = [
"admin",
"Consumer",
"ShareConsumer",
"aio",
"KafkaError",
"KafkaException",
Expand Down
26 changes: 26 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ class KafkaError:
DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int
DUPLICATE_RESOURCE: int
DUPLICATE_SEQUENCE_NUMBER: int
DUPLICATE_VOTER: int
Comment thread
pranavrth marked this conversation as resolved.
ELECTION_NOT_NEEDED: int
ELIGIBLE_LEADERS_NOT_AVAILABLE: int
FEATURE_UPDATE_FAILED: int
FENCED_INSTANCE_ID: int
FENCED_LEADER_EPOCH: int
FENCED_MEMBER_EPOCH: int
FENCED_STATE_EPOCH: int
FETCH_SESSION_ID_NOT_FOUND: int
GROUP_AUTHORIZATION_FAILED: int
GROUP_ID_NOT_FOUND: int
Expand All @@ -89,20 +91,26 @@ class KafkaError:
INVALID_PRODUCER_EPOCH: int
INVALID_PRODUCER_ID_MAPPING: int
INVALID_RECORD: int
INVALID_RECORD_STATE: int
INVALID_REGISTRATION: int
INVALID_REGULAR_EXPRESSION: int
INVALID_REPLICATION_FACTOR: int
INVALID_REPLICA_ASSIGNMENT: int
INVALID_REQUEST: int
INVALID_REQUIRED_ACKS: int
INVALID_SHARE_SESSION_EPOCH: int
INVALID_SESSION_TIMEOUT: int
INVALID_TIMESTAMP: int
INVALID_TRANSACTION_TIMEOUT: int
INVALID_TXN_STATE: int
INVALID_UPDATE_VERSION: int
INVALID_VOTER_KEY: int
KAFKA_STORAGE_ERROR: int
LEADER_NOT_AVAILABLE: int
LISTENER_NOT_FOUND: int
LOG_DIR_NOT_FOUND: int
MEMBER_ID_REQUIRED: int
MISMATCHED_ENDPOINT_TYPE: int
MSG_SIZE_TOO_LARGE: int
NETWORK_EXCEPTION: int
NON_EMPTY_GROUP: int
Expand Down Expand Up @@ -131,19 +139,26 @@ class KafkaError:
RESOURCE_NOT_FOUND: int
SASL_AUTHENTICATION_FAILED: int
SECURITY_DISABLED: int
SHARE_SESSION_LIMIT_REACHED: int
SHARE_SESSION_NOT_FOUND: int
STALE_BROKER_EPOCH: int
STALE_CTRL_EPOCH: int
STALE_MEMBER_EPOCH: int
STREAMS_INVALID_TOPOLOGY: int
STREAMS_INVALID_TOPOLOGY_EPOCH: int
STREAMS_TOPOLOGY_FENCED: int
TELEMETRY_TOO_LARGE: int
THROTTLING_QUOTA_EXCEEDED: int
TOPIC_ALREADY_EXISTS: int
TOPIC_AUTHORIZATION_FAILED: int
TOPIC_DELETION_DISABLED: int
TOPIC_EXCEPTION: int
TRANSACTION_ABORTABLE: int
TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int
TRANSACTION_COORDINATOR_FENCED: int
UNACCEPTABLE_CREDENTIAL: int
UNKNOWN: int
UNKNOWN_CONTROLLER_ID: int
UNKNOWN_LEADER_EPOCH: int
UNKNOWN_MEMBER_ID: int
UNKNOWN_PRODUCER_ID: int
Expand All @@ -154,9 +169,11 @@ class KafkaError:
UNSTABLE_OFFSET_COMMIT: int
UNSUPPORTED_ASSIGNOR: int
UNSUPPORTED_COMPRESSION_TYPE: int
UNSUPPORTED_ENDPOINT_TYPE: int
UNSUPPORTED_FOR_MESSAGE_FORMAT: int
UNSUPPORTED_SASL_MECHANISM: int
UNSUPPORTED_VERSION: int
VOTER_NOT_FOUND: int
_ALL_BROKERS_DOWN: int
_APPLICATION: int
_ASSIGNMENT_LOST: int
Expand Down Expand Up @@ -535,6 +552,15 @@ class Consumer:
def memberid(self) -> str: ...
def set_sasl_credentials(self, username: str, password: str) -> None: ...

class ShareConsumer:
"""Share Consumer for queue-like message consumption (KIP-932)."""
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface should be like the normal consumer. It should accept dict as well as kwargs. Check Consumer.

def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def consume_batch(self, timeout: float = -1) -> List[Message]: ...
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should name this Poll only to match the signature of the Java Client atleast in Higher level clients

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In past, It was discussed that we would align naming with librdkafka APis.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need to finalize this name. Mostly we will go with poll to adhere to Java. It was mentioned in one of the recent comments in the librdkafka public interface docs.

def close(self) -> None: ...

class _AdminClientImpl:
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
def __enter__(self) -> Self: ...
Expand Down
12 changes: 6 additions & 6 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ struct Admin_options {
* Make sure this is kept up to date with Admin_options above. */
#define Admin_options_INITIALIZER \
{ \
Admin_options_def_int, Admin_options_def_float, \
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can skip this file change.

Admin_options_def_float, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_ptr, \
Admin_options_def_cnt, Admin_options_def_ptr, \
Admin_options_def_cnt, \
Admin_options_def_int, Admin_options_def_float, \
Admin_options_def_float, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_int, \
Admin_options_def_int, Admin_options_def_ptr, \
Admin_options_def_cnt, Admin_options_def_ptr, \
Admin_options_def_cnt, \
}

#define Admin_options_is_set_int(v) ((v) != Admin_options_def_int)
Expand Down
Loading
Loading