From 07e20d4ea224c2bf4cfd0df2b3475e8fe3fe5b53 Mon Sep 17 00:00:00 2001 From: Kaushik Raina Date: Fri, 26 Jun 2026 14:17:42 +0530 Subject: [PATCH] [KIP-932] - Update ShareConsumer api docs --- src/confluent_kafka/src/ShareConsumer.c | 70 +++++++++++++++++++++---- 1 file changed, 59 insertions(+), 11 deletions(-) diff --git a/src/confluent_kafka/src/ShareConsumer.c b/src/confluent_kafka/src/ShareConsumer.c index 6435b9864..73188c938 100644 --- a/src/confluent_kafka/src/ShareConsumer.c +++ b/src/confluent_kafka/src/ShareConsumer.c @@ -929,12 +929,6 @@ static PyObject *ShareConsumer_exit(ShareConsumerHandle *self, PyObject *args) { /** * @brief ShareConsumer methods. - * - * TODO KIP-932: ShareConsumer is not thread-safe. Document this in the - * user-facing API, and consider raising an error if used from multiple - * threads. librdkafka enforces this on its side, but verify that the - * Python wrapper code here does not introduce additional thread-safety - * issues. */ static PyMethodDef ShareConsumer_methods[] = { {"subscribe", (PyCFunction)ShareConsumer_subscribe, @@ -943,6 +937,10 @@ static PyMethodDef ShareConsumer_methods[] = { "\n" " Set subscription to supplied list of topics\n" "\n" + " Subscriptions are not incremental: this list replaces the current\n" + " subscription, if any. Passing an empty list is equivalent to\n" + " :py:func:`unsubscribe`.\n" + "\n" " :param list(str) topics: List of topics to subscribe to.\n" " :raises ValueError: if topics contains empty or duplicate names\n" " :raises KafkaException: on other errors\n" @@ -974,6 +972,10 @@ static PyMethodDef ShareConsumer_methods[] = { "\n" " Poll for a batch of messages from the share consumer.\n" "\n" + " The consumer must be subscribed to at least one topic before\n" + " polling. In explicit acknowledgement mode, all records returned by\n" + " the previous poll must be acknowledged before polling again.\n" + "\n" " The application must check each Message object's error() method\n" " to distinguish between proper messages (error() returns None)\n" " and errors.\n" @@ -986,7 +988,10 @@ static PyMethodDef ShareConsumer_methods[] = { "the timeout.\n" " :rtype: Messages\n" " :raises KafkaException: on error\n" - " :raises IllegalStateException: if called on a closed share consumer\n" + " :raises IllegalStateException: if the consumer is not subscribed to " + "any topic, if it is in explicit acknowledgement mode and not all " + "records from the previous poll have been acknowledged, or if the " + "share consumer is closed.\n" " :raises KeyboardInterrupt: if Ctrl+C pressed during consumption\n" "\n"}, @@ -996,7 +1001,10 @@ static PyMethodDef ShareConsumer_methods[] = { "[ack_type=AcknowledgeType.ACCEPT])\n" "\n" " Acknowledge a previously polled message in explicit acknowledgement\n" - " mode. Tells the broker how to handle the record.\n" + " mode, recording how the broker should treat the record (accept it,\n" + " release it for redelivery, or reject it). This only updates local\n" + " state; the acknowledgement is sent to the broker on the next\n" + " :py:func:`poll`, :py:func:`commit_sync`, or :py:func:`commit_async`.\n" "\n" " :param Message message: A message returned by poll().\n" " :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or " @@ -1018,6 +1026,13 @@ static PyMethodDef ShareConsumer_methods[] = { "\n" " Acknowledge a message by topic/partition/offset.\n" "\n" + " Use this when only the coordinates of a message are known and the\n" + " Message object returned by :py:func:`poll` is not available. Like\n" + " :py:func:`acknowledge` (with which it is otherwise equivalent),\n" + " this only updates local state; the acknowledgement is sent to the\n" + " broker on the next :py:func:`poll`, :py:func:`commit_sync`, or\n" + " :py:func:`commit_async`.\n" + "\n" " :param str topic: Topic name.\n" " :param int partition: Partition id.\n" " :param int offset: Offset to acknowledge.\n" @@ -1103,10 +1118,12 @@ static PyMethodDef ShareConsumer_methods[] = { {"close", (PyCFunction)ShareConsumer_close, METH_NOARGS, ".. py:function:: close()\n" "\n" - " Close the share consumer.\n" + " Close the share consumer, cleanly releasing its resources.\n" "\n" - " This method should be called to properly clean up the share consumer\n" - " and leave the share group.\n" + " Any pending acknowledgements are committed and the consumer leaves\n" + " the share group before returning. This blocks until the broker has\n" + " confirmed the outstanding acknowledgements and the group has been\n" + " left, up to roughly ``socket.timeout.ms``.\n" "\n" " :raises KafkaException: on error\n" "\n"}, @@ -1326,9 +1343,40 @@ PyTypeObject ShareConsumerType = { "assigned to multiple consumers. Messages are delivered to only one " "consumer.\n" "\n" + "Each message returned by :py:func:`poll` is *acquired* for a limited " + "time and must be acknowledged before its lock expires. An " + "unacknowledged record is released back to the share group and may be " + "redelivered, possibly to a different consumer. The lock duration is a " + "broker/group setting (``group.share.record.lock.duration.ms``, 30 " + "seconds by default) and is not configured on this client.\n" + "\n" + "``share.acknowledgement.mode`` selects how records are acknowledged. In " + "``implicit`` mode (the default) the records from a poll are accepted " + "automatically on the next :py:func:`poll` or commit. In ``explicit`` " + "mode the application must mark each record with :py:func:`acknowledge` " + "(or :py:func:`acknowledge_offset`) and then call :py:func:`commit_sync` " + "or :py:func:`commit_async`.\n" + "\n" + "The application must call :py:func:`poll` at least once every " + "``max.poll.interval.ms``; otherwise the broker considers the member " + "failed and removes it from the group.\n" + "\n" ":param dict config: Configuration properties. At a minimum, " "``group.id`` **must** be set and ``bootstrap.servers`` **should** be " "set.\n" + "\n" + ".. note::\n" + " Always call :py:func:`close` when finished, or use the consumer as\n" + " a context manager. close() commits pending acknowledgements and\n" + " leaves the share group; skipping it leaks resources and holds the\n" + " records acquired by this consumer until their locks expire.\n" + "\n" + ".. warning::\n" + " The share consumer is not thread-safe. A single instance must not\n" + " be used concurrently from more than one thread; a method invoked\n" + " while another call is already in progress on a different thread\n" + " raises :py:class:`ConcurrentModificationException`. Use a separate\n" + " consumer instance per thread.\n" "\n", /*tp_doc*/ (traverseproc)ShareConsumer_traverse, /* tp_traverse */ (inquiry)ShareConsumer_clear, /* tp_clear */