-
Notifications
You must be signed in to change notification settings - Fork 951
[KIP-932] : Implement Share consumer interface with poll API #2217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev_kip-932_queues-for-kafka
Are you sure you want to change the base?
Changes from 5 commits
51ec85d
fc5ef4f
e23df2e
88b4a3a
852d090
d59b196
48208f4
1e41691
59db830
9580e6d
6991299
5b4b408
cabc157
1e1fe37
62021da
f32e19c
901697c
5f4bc3b
282d38f
cc940c2
d4d1456
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,8 @@ global_job_config: | |
| env_vars: | ||
| - name: LIBRDKAFKA_VERSION | ||
| value: v2.13.2 | ||
| - name: LIBRDKAFKA_BRANCH | ||
| value: dev_kip-932_queues-for-kafka | ||
| prologue: | ||
| commands: | ||
| - checkout | ||
|
|
@@ -332,7 +334,13 @@ blocks: | |
|
|
||
| # Build and install confluent-kafka from source | ||
| - lib_dir=dest/runtimes/$OS_NAME-$ARCH/native | ||
| - tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest | ||
| - | | ||
|
pranavrth marked this conversation as resolved.
|
||
| if [[ -n $LIBRDKAFKA_BRANCH ]]; then | ||
| sudo apt-get install -y -qq libssl-dev libsasl2-dev liblz4-dev libzstd-dev | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this |
||
| tools/wheels/build-librdkafka-branch.sh "$LIBRDKAFKA_BRANCH" dest | ||
| else | ||
| tools/wheels/install-librdkafka.sh "${LIBRDKAFKA_VERSION#v}" dest | ||
| fi | ||
| - export CFLAGS="$CFLAGS -I${PWD}/dest/build/native/include" | ||
| - export LDFLAGS="$LDFLAGS -L${PWD}/${lib_dir}" | ||
| - export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$PWD/$lib_dir" | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add an example file |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,7 @@ | |
| Consumer, | ||
| Message, | ||
| Producer, | ||
| ShareConsumer, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Have it after
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you missed this comment.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still not moved. |
||
| TopicPartition, | ||
| Uuid, | ||
| consistent, | ||
|
|
@@ -54,6 +55,7 @@ | |
| __all__ = [ | ||
| "admin", | ||
| "Consumer", | ||
| "ShareConsumer", | ||
| "aio", | ||
| "KafkaError", | ||
| "KafkaException", | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -63,12 +63,14 @@ class KafkaError: | |
| DELEGATION_TOKEN_REQUEST_NOT_ALLOWED: int | ||
| DUPLICATE_RESOURCE: int | ||
| DUPLICATE_SEQUENCE_NUMBER: int | ||
| DUPLICATE_VOTER: int | ||
|
pranavrth marked this conversation as resolved.
|
||
| ELECTION_NOT_NEEDED: int | ||
| ELIGIBLE_LEADERS_NOT_AVAILABLE: int | ||
| FEATURE_UPDATE_FAILED: int | ||
| FENCED_INSTANCE_ID: int | ||
| FENCED_LEADER_EPOCH: int | ||
| FENCED_MEMBER_EPOCH: int | ||
| FENCED_STATE_EPOCH: int | ||
| FETCH_SESSION_ID_NOT_FOUND: int | ||
| GROUP_AUTHORIZATION_FAILED: int | ||
| GROUP_ID_NOT_FOUND: int | ||
|
|
@@ -89,20 +91,26 @@ class KafkaError: | |
| INVALID_PRODUCER_EPOCH: int | ||
| INVALID_PRODUCER_ID_MAPPING: int | ||
| INVALID_RECORD: int | ||
| INVALID_RECORD_STATE: int | ||
| INVALID_REGISTRATION: int | ||
| INVALID_REGULAR_EXPRESSION: int | ||
| INVALID_REPLICATION_FACTOR: int | ||
| INVALID_REPLICA_ASSIGNMENT: int | ||
| INVALID_REQUEST: int | ||
| INVALID_REQUIRED_ACKS: int | ||
| INVALID_SHARE_SESSION_EPOCH: int | ||
| INVALID_SESSION_TIMEOUT: int | ||
| INVALID_TIMESTAMP: int | ||
| INVALID_TRANSACTION_TIMEOUT: int | ||
| INVALID_TXN_STATE: int | ||
| INVALID_UPDATE_VERSION: int | ||
| INVALID_VOTER_KEY: int | ||
| KAFKA_STORAGE_ERROR: int | ||
| LEADER_NOT_AVAILABLE: int | ||
| LISTENER_NOT_FOUND: int | ||
| LOG_DIR_NOT_FOUND: int | ||
| MEMBER_ID_REQUIRED: int | ||
| MISMATCHED_ENDPOINT_TYPE: int | ||
| MSG_SIZE_TOO_LARGE: int | ||
| NETWORK_EXCEPTION: int | ||
| NON_EMPTY_GROUP: int | ||
|
|
@@ -131,19 +139,26 @@ class KafkaError: | |
| RESOURCE_NOT_FOUND: int | ||
| SASL_AUTHENTICATION_FAILED: int | ||
| SECURITY_DISABLED: int | ||
| SHARE_SESSION_LIMIT_REACHED: int | ||
| SHARE_SESSION_NOT_FOUND: int | ||
| STALE_BROKER_EPOCH: int | ||
| STALE_CTRL_EPOCH: int | ||
| STALE_MEMBER_EPOCH: int | ||
| STREAMS_INVALID_TOPOLOGY: int | ||
| STREAMS_INVALID_TOPOLOGY_EPOCH: int | ||
| STREAMS_TOPOLOGY_FENCED: int | ||
| TELEMETRY_TOO_LARGE: int | ||
| THROTTLING_QUOTA_EXCEEDED: int | ||
| TOPIC_ALREADY_EXISTS: int | ||
| TOPIC_AUTHORIZATION_FAILED: int | ||
| TOPIC_DELETION_DISABLED: int | ||
| TOPIC_EXCEPTION: int | ||
| TRANSACTION_ABORTABLE: int | ||
| TRANSACTIONAL_ID_AUTHORIZATION_FAILED: int | ||
| TRANSACTION_COORDINATOR_FENCED: int | ||
| UNACCEPTABLE_CREDENTIAL: int | ||
| UNKNOWN: int | ||
| UNKNOWN_CONTROLLER_ID: int | ||
| UNKNOWN_LEADER_EPOCH: int | ||
| UNKNOWN_MEMBER_ID: int | ||
| UNKNOWN_PRODUCER_ID: int | ||
|
|
@@ -154,9 +169,11 @@ class KafkaError: | |
| UNSTABLE_OFFSET_COMMIT: int | ||
| UNSUPPORTED_ASSIGNOR: int | ||
| UNSUPPORTED_COMPRESSION_TYPE: int | ||
| UNSUPPORTED_ENDPOINT_TYPE: int | ||
| UNSUPPORTED_FOR_MESSAGE_FORMAT: int | ||
| UNSUPPORTED_SASL_MECHANISM: int | ||
| UNSUPPORTED_VERSION: int | ||
| VOTER_NOT_FOUND: int | ||
| _ALL_BROKERS_DOWN: int | ||
| _APPLICATION: int | ||
| _ASSIGNMENT_LOST: int | ||
|
|
@@ -535,6 +552,15 @@ class Consumer: | |
| def memberid(self) -> str: ... | ||
| def set_sasl_credentials(self, username: str, password: str) -> None: ... | ||
|
|
||
| class ShareConsumer: | ||
| """Share Consumer for queue-like message consumption (KIP-932).""" | ||
| def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ... | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The interface should be like the normal consumer. It should accept |
||
| def subscribe(self, topics: List[str]) -> None: ... | ||
| def unsubscribe(self) -> None: ... | ||
| def subscription(self) -> List[str]: ... | ||
| def consume_batch(self, timeout: float = -1) -> List[Message]: ... | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should name this Poll only to match the signature of the Java Client atleast in Higher level clients
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In past, It was discussed that we would align naming with librdkafka APis.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we still need to finalize this name. Mostly we will go with |
||
| def close(self) -> None: ... | ||
|
|
||
| class _AdminClientImpl: | ||
| def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ... | ||
| def __enter__(self) -> Self: ... | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,12 +99,12 @@ struct Admin_options { | |
| * Make sure this is kept up to date with Admin_options above. */ | ||
| #define Admin_options_INITIALIZER \ | ||
| { \ | ||
| Admin_options_def_int, Admin_options_def_float, \ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can skip this file change. |
||
| Admin_options_def_float, Admin_options_def_int, \ | ||
| Admin_options_def_int, Admin_options_def_int, \ | ||
| Admin_options_def_int, Admin_options_def_ptr, \ | ||
| Admin_options_def_cnt, Admin_options_def_ptr, \ | ||
| Admin_options_def_cnt, \ | ||
| Admin_options_def_int, Admin_options_def_float, \ | ||
| Admin_options_def_float, Admin_options_def_int, \ | ||
| Admin_options_def_int, Admin_options_def_int, \ | ||
| Admin_options_def_int, Admin_options_def_ptr, \ | ||
| Admin_options_def_cnt, Admin_options_def_ptr, \ | ||
| Admin_options_def_cnt, \ | ||
| } | ||
|
|
||
| #define Admin_options_is_set_int(v) ((v) != Admin_options_def_int) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.