Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 59 additions & 11 deletions src/confluent_kafka/src/ShareConsumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -929,12 +929,6 @@ static PyObject *ShareConsumer_exit(ShareConsumerHandle *self, PyObject *args) {

/**
* @brief ShareConsumer methods.
*
* TODO KIP-932: ShareConsumer is not thread-safe. Document this in the
* user-facing API, and consider raising an error if used from multiple
* threads. librdkafka enforces this on its side, but verify that the
* Python wrapper code here does not introduce additional thread-safety
* issues.
*/
static PyMethodDef ShareConsumer_methods[] = {
{"subscribe", (PyCFunction)ShareConsumer_subscribe,
Expand All @@ -943,6 +937,10 @@ static PyMethodDef ShareConsumer_methods[] = {
"\n"
" Set subscription to supplied list of topics\n"
"\n"
" Subscriptions are not incremental: this list replaces the current\n"
" subscription, if any. Passing an empty list is equivalent to\n"
" :py:func:`unsubscribe`.\n"
"\n"
" :param list(str) topics: List of topics to subscribe to.\n"
" :raises ValueError: if topics contains empty or duplicate names\n"
" :raises KafkaException: on other errors\n"
Expand Down Expand Up @@ -974,6 +972,10 @@ static PyMethodDef ShareConsumer_methods[] = {
"\n"
" Poll for a batch of messages from the share consumer.\n"
"\n"
" The consumer must be subscribed to at least one topic before\n"
" polling. In explicit acknowledgement mode, all records returned by\n"
" the previous poll must be acknowledged before polling again.\n"
"\n"
" The application must check each Message object's error() method\n"
" to distinguish between proper messages (error() returns None)\n"
" and errors.\n"
Expand All @@ -986,7 +988,10 @@ static PyMethodDef ShareConsumer_methods[] = {
"the timeout.\n"
" :rtype: Messages\n"
" :raises KafkaException: on error\n"
" :raises IllegalStateException: if called on a closed share consumer\n"
" :raises IllegalStateException: if the consumer is not subscribed to "
"any topic, if it is in explicit acknowledgement mode and not all "
"records from the previous poll have been acknowledged, or if the "
"share consumer is closed.\n"
" :raises KeyboardInterrupt: if Ctrl+C pressed during consumption\n"
"\n"},

Expand All @@ -996,7 +1001,10 @@ static PyMethodDef ShareConsumer_methods[] = {
"[ack_type=AcknowledgeType.ACCEPT])\n"
"\n"
" Acknowledge a previously polled message in explicit acknowledgement\n"
" mode. Tells the broker how to handle the record.\n"
" mode, recording how the broker should treat the record (accept it,\n"
" release it for redelivery, or reject it). This only updates local\n"
" state; the acknowledgement is sent to the broker on the next\n"
" :py:func:`poll`, :py:func:`commit_sync`, or :py:func:`commit_async`.\n"
"\n"
" :param Message message: A message returned by poll().\n"
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
Expand All @@ -1018,6 +1026,13 @@ static PyMethodDef ShareConsumer_methods[] = {
"\n"
" Acknowledge a message by topic/partition/offset.\n"
"\n"
" Use this when only the coordinates of a message are known and the\n"
" Message object returned by :py:func:`poll` is not available. Like\n"
" :py:func:`acknowledge` (with which it is otherwise equivalent),\n"
" this only updates local state; the acknowledgement is sent to the\n"
" broker on the next :py:func:`poll`, :py:func:`commit_sync`, or\n"
" :py:func:`commit_async`.\n"
"\n"
" :param str topic: Topic name.\n"
" :param int partition: Partition id.\n"
" :param int offset: Offset to acknowledge.\n"
Expand Down Expand Up @@ -1103,10 +1118,12 @@ static PyMethodDef ShareConsumer_methods[] = {
{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
".. py:function:: close()\n"
"\n"
" Close the share consumer.\n"
" Close the share consumer, cleanly releasing its resources.\n"
"\n"
" This method should be called to properly clean up the share consumer\n"
" and leave the share group.\n"
" Any pending acknowledgements are committed and the consumer leaves\n"
" the share group before returning. This blocks until the broker has\n"
" confirmed the outstanding acknowledgements and the group has been\n"
" left, up to roughly ``socket.timeout.ms``.\n"
"\n"
" :raises KafkaException: on error\n"
"\n"},
Expand Down Expand Up @@ -1326,9 +1343,40 @@ PyTypeObject ShareConsumerType = {
"assigned to multiple consumers. Messages are delivered to only one "
"consumer.\n"
"\n"
"Each message returned by :py:func:`poll` is *acquired* for a limited "
"time and must be acknowledged before its lock expires. An "
"unacknowledged record is released back to the share group and may be "
"redelivered, possibly to a different consumer. The lock duration is a "
"broker/group setting (``group.share.record.lock.duration.ms``, 30 "
"seconds by default) and is not configured on this client.\n"
"\n"
"``share.acknowledgement.mode`` selects how records are acknowledged. In "
"``implicit`` mode (the default) the records from a poll are accepted "
"automatically on the next :py:func:`poll` or commit. In ``explicit`` "
"mode the application must mark each record with :py:func:`acknowledge` "
"(or :py:func:`acknowledge_offset`) and then call :py:func:`commit_sync` "
"or :py:func:`commit_async`.\n"
"\n"
"The application must call :py:func:`poll` at least once every "
"``max.poll.interval.ms``; otherwise the broker considers the member "
"failed and removes it from the group.\n"
"\n"
":param dict config: Configuration properties. At a minimum, "
"``group.id`` **must** be set and ``bootstrap.servers`` **should** be "
"set.\n"
"\n"
".. note::\n"
" Always call :py:func:`close` when finished, or use the consumer as\n"
" a context manager. close() commits pending acknowledgements and\n"
" leaves the share group; skipping it leaks resources and holds the\n"
" records acquired by this consumer until their locks expire.\n"
"\n"
".. warning::\n"
" The share consumer is not thread-safe. A single instance must not\n"
" be used concurrently from more than one thread; a method invoked\n"
" while another call is already in progress on a different thread\n"
" raises :py:class:`ConcurrentModificationException`. Use a separate\n"
" consumer instance per thread.\n"
"\n", /*tp_doc*/
(traverseproc)ShareConsumer_traverse, /* tp_traverse */
(inquiry)ShareConsumer_clear, /* tp_clear */
Expand Down