Skip to content

Commit fff4584

Browse files
committed
[KIP-932] Implement acknowledgement RPC
1 parent 9580e6d commit fff4584

5 files changed

Lines changed: 779 additions & 1 deletion

File tree

src/confluent_kafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from ._model import Node # noqa: F401
2222
from ._model import (
23+
AcknowledgeType,
2324
ConsumerGroupState,
2425
ConsumerGroupTopicPartitions,
2526
ConsumerGroupType,
@@ -86,6 +87,7 @@
8687
"TopicCollection",
8788
"TopicPartitionInfo",
8889
"ElectionType",
90+
"AcknowledgeType",
8991
]
9092

9193

src/confluent_kafka/_model/__init__.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from enum import Enum
15+
from enum import Enum, IntEnum
1616
from typing import List, Optional
1717

1818
from .. import cimpl
@@ -193,3 +193,19 @@ def __lt__(self, other: object) -> bool:
193193
if not isinstance(other, ElectionType):
194194
return NotImplemented
195195
return self.value < other.value
196+
197+
198+
class AcknowledgeType(IntEnum):
199+
"""
200+
Share Consumer acknowledgement type.
201+
202+
Used to tell the broker how to
203+
handle a polled message in explicit acknowledgement mode.
204+
"""
205+
206+
#: Record was processed successfully — broker will not redeliver it.
207+
ACCEPT = 1
208+
#: Could not process — return to the share group for another delivery attempt.
209+
RELEASE = 2
210+
#: Permanently failed — do NOT redeliver (poison pill).
211+
REJECT = 3

src/confluent_kafka/cimpl.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,10 @@ class ShareConsumer:
564564
def unsubscribe(self) -> None: ...
565565
def subscription(self) -> List[str]: ...
566566
def poll(self, timeout: float = -1) -> List[Message]: ...
567+
def acknowledge(self, message: Message, ack_type: int = 1) -> None: ...
568+
def acknowledge_offset(
569+
self, topic: str, partition: int, offset: int, ack_type: int = 1
570+
) -> None: ...
567571
def close(self) -> None: ...
568572
def __enter__(self) -> "ShareConsumer": ...
569573
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

src/confluent_kafka/src/ShareConsumer.c

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,115 @@ static PyObject *ShareConsumer_poll(ShareConsumerHandle *self,
310310
}
311311

312312

313+
/**
314+
* @brief Acknowledge previously polled message.
315+
*
316+
* Internally delegates to rd_kafka_share_acknowledge_offset() because the
317+
* Python Message object does not retain the underlying rd_kafka_message_t
318+
* pointer (Message_new0 copies fields out and destroys the rkm)
319+
*/
320+
static PyObject *ShareConsumer_acknowledge(ShareConsumerHandle *self,
321+
PyObject *args,
322+
PyObject *kwargs) {
323+
Message *msg = NULL;
324+
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
325+
PyObject *uo8 = NULL;
326+
const char *topic;
327+
rd_kafka_resp_err_t err;
328+
static char *kws[] = {"message", "ack_type", NULL};
329+
330+
if (!self->rkshare) {
331+
PyErr_SetString(PyExc_RuntimeError,
332+
ERR_MSG_SHARE_CONSUMER_CLOSED);
333+
return NULL;
334+
}
335+
336+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!|i", kws,
337+
&MessageType, &msg, &ack_type))
338+
return NULL;
339+
340+
if (ack_type < (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT ||
341+
ack_type > (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_REJECT) {
342+
PyErr_Format(PyExc_ValueError,
343+
"Invalid ack_type %d (expected 1=ACCEPT, "
344+
"2=RELEASE, or 3=REJECT)",
345+
ack_type);
346+
return NULL;
347+
}
348+
349+
if (!msg->topic || msg->topic == Py_None) {
350+
PyErr_SetString(PyExc_ValueError, "Message topic is None");
351+
return NULL;
352+
}
353+
354+
topic = cfl_PyUnistr_AsUTF8(msg->topic, &uo8);
355+
if (!topic) {
356+
Py_XDECREF(uo8);
357+
return NULL;
358+
}
359+
360+
err = rd_kafka_share_acknowledge_offset(
361+
self->rkshare, topic, msg->partition, msg->offset,
362+
(rd_kafka_share_AcknowledgeType_t)ack_type);
363+
364+
Py_XDECREF(uo8);
365+
366+
if (err) {
367+
cfl_PyErr_Format(err, "Failed to acknowledge message: %s",
368+
rd_kafka_err2str(err));
369+
return NULL;
370+
}
371+
372+
Py_RETURN_NONE;
373+
}
374+
375+
376+
/**
377+
* @brief Acknowledge a message by topic/partition/offset directly.
378+
*/
379+
static PyObject *ShareConsumer_acknowledge_offset(ShareConsumerHandle *self,
380+
PyObject *args,
381+
PyObject *kwargs) {
382+
const char *topic;
383+
int partition;
384+
long long offset;
385+
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
386+
rd_kafka_resp_err_t err;
387+
static char *kws[] = {"topic", "partition", "offset", "ack_type", NULL};
388+
389+
if (!self->rkshare) {
390+
PyErr_SetString(PyExc_RuntimeError,
391+
ERR_MSG_SHARE_CONSUMER_CLOSED);
392+
return NULL;
393+
}
394+
395+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "siL|i", kws, &topic,
396+
&partition, &offset, &ack_type))
397+
return NULL;
398+
399+
if (ack_type < (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT ||
400+
ack_type > (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_REJECT) {
401+
PyErr_Format(PyExc_ValueError,
402+
"Invalid ack_type %d (expected 1=ACCEPT, "
403+
"2=RELEASE, or 3=REJECT)",
404+
ack_type);
405+
return NULL;
406+
}
407+
408+
err = rd_kafka_share_acknowledge_offset(
409+
self->rkshare, topic, (int32_t)partition, (int64_t)offset,
410+
(rd_kafka_share_AcknowledgeType_t)ack_type);
411+
412+
if (err) {
413+
cfl_PyErr_Format(err, "Failed to acknowledge offset: %s",
414+
rd_kafka_err2str(err));
415+
return NULL;
416+
}
417+
418+
Py_RETURN_NONE;
419+
}
420+
421+
313422
/**
314423
* @brief Close the share consumer.
315424
*/
@@ -425,6 +534,44 @@ static PyMethodDef ShareConsumer_methods[] = {
425534
" :raises KeyboardInterrupt: if Ctrl+C pressed during consumption\n"
426535
"\n"},
427536

537+
{"acknowledge", (PyCFunction)ShareConsumer_acknowledge,
538+
METH_VARARGS | METH_KEYWORDS,
539+
".. py:function:: acknowledge(message, "
540+
"[ack_type=AcknowledgeType.ACCEPT])\n"
541+
"\n"
542+
" Acknowledge a previously polled message in explicit acknowledgement\n"
543+
" mode. Tells the broker how to handle the record.\n"
544+
"\n"
545+
" :param Message message: A message returned by poll().\n"
546+
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
547+
"REJECT.\n"
548+
" :raises KafkaException: if the consumer is not in explicit\n"
549+
" acknowledgement mode, the message is no "
550+
"longer\n"
551+
" in-flight, or the offset is a GAP record.\n"
552+
" :raises ValueError: if ack_type is not 1, 2, or 3, or if "
553+
"message.topic()\n"
554+
" is None.\n"
555+
" :raises RuntimeError: if called on a closed share consumer.\n"
556+
"\n"},
557+
558+
{"acknowledge_offset", (PyCFunction)ShareConsumer_acknowledge_offset,
559+
METH_VARARGS | METH_KEYWORDS,
560+
".. py:function:: acknowledge_offset(topic, partition, offset, "
561+
"[ack_type=AcknowledgeType.ACCEPT])\n"
562+
"\n"
563+
" Acknowledge a message by topic/partition/offset.\n"
564+
"\n"
565+
" :param str topic: Topic name.\n"
566+
" :param int partition: Partition id.\n"
567+
" :param int offset: Offset to acknowledge.\n"
568+
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
569+
"REJECT.\n"
570+
" :raises KafkaException: same conditions as :py:func:`acknowledge`.\n"
571+
" :raises ValueError: if ack_type is not 1, 2, or 3.\n"
572+
" :raises RuntimeError: if called on a closed share consumer.\n"
573+
"\n"},
574+
428575
{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
429576
".. py:function:: close()\n"
430577
"\n"

0 commit comments

Comments
 (0)