-
Notifications
You must be signed in to change notification settings - Fork 951
[KIP-932] Add share consumer commit workflow #2241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev_kip-932_share_consumer_ack
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,8 +41,8 @@ | |
|
|
||
| rd_kafka_share_t *rkshare; | ||
|
|
||
| /* TODO KIP-932: Remove after interface of librdkafka is updated to | ||
| * return double pointer */ | ||
|
Check warning on line 45 in src/confluent_kafka/src/ShareConsumer.c
|
||
| size_t batch_size; | ||
|
|
||
| } ShareConsumerHandle; | ||
|
|
@@ -59,8 +59,8 @@ | |
| if (self->rkshare) { | ||
| CallState cs; | ||
| CallState_begin(&self->base, &cs); | ||
| /* TODO KIP-932: Use rd_kafka_share_destroy_flags() once | ||
| * available in the librdkafka public API. */ | ||
|
Check warning on line 63 in src/confluent_kafka/src/ShareConsumer.c
|
||
| rd_kafka_share_destroy(self->rkshare); | ||
| self->rkshare = NULL; | ||
| CallState_end(&self->base, &cs); | ||
|
|
@@ -108,7 +108,7 @@ | |
| c_topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist)); | ||
| for (i = 0; i < PyList_Size(tlist); i++) { | ||
| PyObject *o = PyList_GetItem(tlist, i); | ||
| PyObject *uo, *uo8 = NULL; | ||
|
Check warning on line 111 in src/confluent_kafka/src/ShareConsumer.c
|
||
| if (!(uo = cfl_PyObject_Unistr(o))) { | ||
| PyErr_SetString(PyExc_TypeError, | ||
| "expected list of unicode strings"); | ||
|
|
@@ -240,7 +240,7 @@ | |
| CallState_begin(&self->base, &cs); | ||
|
|
||
| /* Chunked polling pattern for signal interruptibility */ | ||
| while (1) { | ||
|
Check warning on line 243 in src/confluent_kafka/src/ShareConsumer.c
|
||
| chunk_timeout_ms = calculate_chunk_timeout( | ||
| total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS); | ||
| if (chunk_timeout_ms == 0) { | ||
|
|
@@ -419,6 +419,73 @@ | |
| } | ||
|
|
||
|
|
||
| /** | ||
| * @brief Commit pending acknowledgements. | ||
| * | ||
| * Implicit mode auto-converts ACQUIRED → ACCEPT inside librdkafka before | ||
| * sending. Sync mode returns a per-partition result list; async mode returns | ||
| * None. By default, commit is asynchronous. In async mode, the application | ||
| * receives no immediate feedback. In sync mode, the application blocks until | ||
| * the broker responds or the timeout expires, and receives a list of | ||
| * TopicPartition where each entry's .error field carries the per-partition | ||
| * acknowledgement result. | ||
| */ | ||
| static PyObject *ShareConsumer_commit(ShareConsumerHandle *self, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To match the Java signature we should have two separate commit APIs, commitSync and commitAsync
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Also, there are 2 variations of commitSync in java, one that doesn't accept a timeout and the one that does. |
||
| PyObject *args, | ||
| PyObject *kwargs) { | ||
| int async = 1; | ||
| double tmout = -1.0; | ||
| PyObject *async_o = NULL; | ||
| rd_kafka_error_t *error = NULL; | ||
| rd_kafka_topic_partition_list_t *c_parts = NULL; | ||
| PyThreadState *thread_state = NULL; | ||
| PyObject *plist; | ||
| static char *kws[] = {"asynchronous", "timeout", NULL}; | ||
|
|
||
| if (!self->rkshare) { | ||
| PyErr_SetString(PyExc_RuntimeError, | ||
| ERR_MSG_SHARE_CONSUMER_CLOSED); | ||
| return NULL; | ||
| } | ||
|
|
||
| if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Od", kws, &async_o, | ||
| &tmout)) | ||
| return NULL; | ||
|
|
||
| if (async_o) | ||
| async = PyObject_IsTrue(async_o); | ||
|
|
||
| if (async) { | ||
| error = rd_kafka_share_commit_async(self->rkshare); | ||
| if (error) { | ||
| cfl_PyErr_from_error_destroy(error); | ||
| return NULL; | ||
| } | ||
| Py_RETURN_NONE; | ||
| } | ||
|
|
||
| thread_state = PyEval_SaveThread(); | ||
| error = rd_kafka_share_commit_sync(self->rkshare, cfl_timeout_ms(tmout), | ||
| &c_parts); | ||
| PyEval_RestoreThread(thread_state); | ||
|
|
||
| if (error) { | ||
| if (c_parts) | ||
| rd_kafka_topic_partition_list_destroy(c_parts); | ||
| cfl_PyErr_from_error_destroy(error); | ||
| return NULL; | ||
| } | ||
|
|
||
| /* No pending acks: librdkafka returns NULL c_parts. */ | ||
| if (!c_parts) | ||
| Py_RETURN_NONE; | ||
|
|
||
| plist = c_parts_to_py(c_parts); | ||
| rd_kafka_topic_partition_list_destroy(c_parts); | ||
| return plist; | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * @brief Close the share consumer. | ||
| */ | ||
|
|
@@ -431,8 +498,8 @@ | |
| Py_RETURN_NONE; | ||
|
|
||
| CallState_begin(&self->base, &cs); | ||
| /* TODO KIP-932: rd_kafka_share_consumer_close() return type will change | ||
| * to rd_kafka_error_t *. Update error handling accordingly. */ | ||
|
Check warning on line 502 in src/confluent_kafka/src/ShareConsumer.c
|
||
| err = rd_kafka_share_consumer_close(self->rkshare); | ||
| rd_kafka_share_destroy(self->rkshare); | ||
| self->rkshare = NULL; | ||
|
|
@@ -453,7 +520,7 @@ | |
| * @brief Context manager entry — returns self. | ||
| */ | ||
| static PyObject *ShareConsumer_enter(ShareConsumerHandle *self, | ||
| PyObject *ignore) { | ||
|
Check warning on line 523 in src/confluent_kafka/src/ShareConsumer.c
|
||
| Py_INCREF(self); | ||
| return (PyObject *)self; | ||
| } | ||
|
|
@@ -462,7 +529,7 @@ | |
| * @brief Context manager exit — calls close(). | ||
| */ | ||
| static PyObject *ShareConsumer_exit(ShareConsumerHandle *self, PyObject *args) { | ||
| PyObject *exc_type, *exc_value, *exc_traceback; | ||
|
Check warning on line 532 in src/confluent_kafka/src/ShareConsumer.c
|
||
|
|
||
| if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, &exc_type, &exc_value, | ||
| &exc_traceback)) | ||
|
|
@@ -572,6 +639,45 @@ | |
| " :raises RuntimeError: if called on a closed share consumer.\n" | ||
| "\n"}, | ||
|
|
||
| {"commit", (PyCFunction)ShareConsumer_commit, METH_VARARGS | METH_KEYWORDS, | ||
| ".. py:function:: commit([asynchronous=True], [timeout=-1])\n" | ||
| "\n" | ||
| " Commit pending acknowledgements.\n" | ||
| "\n" | ||
| " In implicit acknowledgement mode " | ||
| "(``share.acknowledgement.mode=implicit``,\n" | ||
| " the default), all records returned by the previous :py:func:`poll` " | ||
| "call\n" | ||
| " are auto-converted to ACCEPT before being sent. In explicit mode, " | ||
| "only\n" | ||
| " records previously passed to :py:func:`acknowledge` /\n" | ||
| " :py:func:`acknowledge_offset` are sent.\n" | ||
| "\n" | ||
| " :param bool asynchronous: If True (default), return immediately " | ||
| "without\n" | ||
| " waiting for broker confirmation (returns " | ||
| "None).\n" | ||
| " If False, block until the broker replies " | ||
| "or\n" | ||
| " the timeout expires, and return a list of\n" | ||
| " TopicPartition where each entry's " | ||
| "``.error``\n" | ||
| " field carries the per-partition " | ||
| "acknowledgement\n" | ||
| " result.\n" | ||
| " :param float timeout: Maximum time to block in synchronous mode " | ||
| "(seconds).\n" | ||
| " Default: -1 (infinite). Ignored in async " | ||
| "mode.\n" | ||
| " :returns: None when ``asynchronous=True`` or when there are no " | ||
| "pending\n" | ||
| " acknowledgements; otherwise a list of TopicPartition " | ||
| "results.\n" | ||
| " :rtype: None | list(TopicPartition)\n" | ||
| " :raises KafkaException: on error\n" | ||
| " :raises RuntimeError: if called on a closed share consumer\n" | ||
| "\n"}, | ||
|
|
||
| {"close", (PyCFunction)ShareConsumer_close, METH_NOARGS, | ||
| ".. py:function:: close()\n" | ||
| "\n" | ||
|
|
@@ -583,9 +689,9 @@ | |
| " :raises KafkaException: on error\n" | ||
| "\n"}, | ||
|
|
||
| /* TODO KIP-932: Add set_sasl_credentials once librdkafka exposes | ||
| * rd_kafka_sasl_set_credentials() (or the underlying rd_kafka_t *) | ||
| * for rd_kafka_share_t handles. */ | ||
|
Check warning on line 694 in src/confluent_kafka/src/ShareConsumer.c
|
||
|
|
||
| {"__enter__", (PyCFunction)ShareConsumer_enter, METH_NOARGS, | ||
| "Context manager entry."}, | ||
|
|
@@ -616,8 +722,8 @@ | |
| kwargs))) | ||
| return -1; /* Exception raised by common_conf_setup() */ | ||
|
|
||
| /* TODO KIP-932: Remove after interface of librdkafka is updated to | ||
| * return double pointer */ | ||
|
Check warning on line 726 in src/confluent_kafka/src/ShareConsumer.c
|
||
| self->batch_size = 10005; | ||
|
|
||
| self->rkshare = | ||
|
|
@@ -629,13 +735,13 @@ | |
| return -1; | ||
| } | ||
|
|
||
| /* TODO KIP-932: call rd_kafka_set_log_queue() once librdkafka adds a | ||
| * rd_kafka_share_set_log_queue() wrapper — needs rd_kafka_t *, which | ||
| * is opaque inside rd_kafka_share_t in the public API. */ | ||
|
Check warning on line 740 in src/confluent_kafka/src/ShareConsumer.c
|
||
|
|
||
| /* TODO KIP-932: call rd_kafka_sasl_background_callbacks_enable() for | ||
| * OAuth once librdkafka adds a share-level wrapper for the same reason. | ||
| */ | ||
|
Check warning on line 744 in src/confluent_kafka/src/ShareConsumer.c
|
||
|
|
||
|
|
||
| return 0; | ||
|
|
@@ -646,7 +752,7 @@ | |
| * @brief ShareConsumer __new__ method. | ||
| */ | ||
| static PyObject * | ||
| ShareConsumer_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { | ||
|
Check warning on line 755 in src/confluent_kafka/src/ShareConsumer.c
|
||
| return type->tp_alloc(type, 0); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java returns a Map&lt;TopicPartition, Optional&gt;; we should match that signature.