Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from ._model import Node # noqa: F401
from ._model import (
AcknowledgeType,
ConsumerGroupState,
ConsumerGroupTopicPartitions,
ConsumerGroupType,
Expand Down Expand Up @@ -86,6 +87,7 @@
"TopicCollection",
"TopicPartitionInfo",
"ElectionType",
"AcknowledgeType",
]


Expand Down
19 changes: 18 additions & 1 deletion src/confluent_kafka/_model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from enum import Enum, IntEnum
from typing import List, Optional

from .. import cimpl
Expand Down Expand Up @@ -193,3 +193,20 @@ def __lt__(self, other: object) -> bool:
if not isinstance(other, ElectionType):
return NotImplemented
return self.value < other.value


class AcknowledgeType(IntEnum):
"""
Share Consumer acknowledgement type used to tell the broker how to
handle a polled message in explicit acknowledgement mode.

Values:
-------
"""

#: Record was processed successfully — broker will not redeliver it.
ACCEPT = 1
#: Could not process — Release it for another delivery attempt
RELEASE = 2
#: Could not process - Do not release for another delivery attempt
REJECT = 3
5 changes: 5 additions & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ from typing_extensions import Literal, Self

from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata

from ._model import AcknowledgeType
from ._types import HeadersType

# Callback types with proper class references (defined locally to avoid circular imports)
Expand Down Expand Up @@ -564,6 +565,10 @@ class ShareConsumer:
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
def poll(self, timeout: float = -1) -> List[Message]: ...
def acknowledge(self, message: Message, ack_type: Union[AcknowledgeType, int] = ...) -> None: ...
def acknowledge_offset(
self, topic: str, partition: int, offset: int, ack_type: Union[AcknowledgeType, int] = ...
) -> None: ...
def close(self) -> None: ...
def __enter__(self) -> "ShareConsumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
Expand Down
136 changes: 136 additions & 0 deletions src/confluent_kafka/src/ShareConsumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,104 @@ static PyObject *ShareConsumer_poll(ShareConsumerHandle *self,
}


/**
* @brief Acknowledge previously polled message.
*
* Internally delegates to rd_kafka_share_acknowledge_offset() because the
* Python Message object does not retain the underlying rd_kafka_message_t
* pointer (Message_new0 copies fields out and destroys the rkm)
*/
Comment on lines +313 to +319
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a TODO to check whether we should match the Java functionality that we should only acknowledge the successful messages with this API and the error messages with the offset API?
Like for the case of Java, they throw an error if the successful messages are processed with the offset API. I think that decision will be finalised once we have the discussion on the differences between the Java clients and the NJC clients

static PyObject *ShareConsumer_acknowledge(ShareConsumerHandle *self,
PyObject *args,
PyObject *kwargs) {
Message *msg = NULL;
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
PyObject *uo8 = NULL;
const char *topic;
rd_kafka_resp_err_t err;
static char *kws[] = {"message", "ack_type", NULL};

if (!self->rkshare) {
PyErr_SetString(PyExc_RuntimeError,
ERR_MSG_SHARE_CONSUMER_CLOSED);
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!|i", kws,
&MessageType, &msg, &ack_type))
return NULL;

/* No need to range-check ack_type here —
* rd_kafka_share_acknowledge_offset() does it and returns INVALID_ARG,
* which we'll surface as KafkaException. */

if (!msg->topic || msg->topic == Py_None) {
PyErr_SetString(PyExc_ValueError, "Message topic is None");
return NULL;
}

topic = cfl_PyUnistr_AsUTF8(msg->topic, &uo8);
if (!topic) {
Py_XDECREF(uo8);
return NULL;
}

err = rd_kafka_share_acknowledge_offset(
self->rkshare, topic, msg->partition, msg->offset,
(rd_kafka_share_AcknowledgeType_t)ack_type);

Py_XDECREF(uo8);

if (err) {
cfl_PyErr_Format(err, "Failed to acknowledge message: %s",
rd_kafka_err2str(err));
return NULL;
}

Py_RETURN_NONE;
}


/**
* @brief Acknowledge a message by topic/partition/offset directly.
*/
static PyObject *ShareConsumer_acknowledge_offset(ShareConsumerHandle *self,
PyObject *args,
PyObject *kwargs) {
const char *topic;
int partition;
long long offset;
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
rd_kafka_resp_err_t err;
static char *kws[] = {"topic", "partition", "offset", "ack_type", NULL};

if (!self->rkshare) {
PyErr_SetString(PyExc_RuntimeError,
ERR_MSG_SHARE_CONSUMER_CLOSED);
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "siL|i", kws, &topic,
&partition, &offset, &ack_type))
return NULL;

/* ack_type, partition and offset are all validated by
* rd_kafka_share_acknowledge_offset()*/

err = rd_kafka_share_acknowledge_offset(
self->rkshare, topic, (int32_t)partition, (int64_t)offset,
(rd_kafka_share_AcknowledgeType_t)ack_type);

if (err) {
cfl_PyErr_Format(err, "Failed to acknowledge offset: %s",
rd_kafka_err2str(err));
return NULL;
}

Py_RETURN_NONE;
}


/**
* @brief Close the share consumer.
*/
Expand Down Expand Up @@ -425,6 +523,44 @@ static PyMethodDef ShareConsumer_methods[] = {
" :raises KeyboardInterrupt: if Ctrl+C pressed during consumption\n"
"\n"},

{"acknowledge", (PyCFunction)ShareConsumer_acknowledge,
METH_VARARGS | METH_KEYWORDS,
".. py:function:: acknowledge(message, "
"[ack_type=AcknowledgeType.ACCEPT])\n"
"\n"
" Acknowledge a previously polled message in explicit acknowledgement\n"
" mode. Tells the broker how to handle the record.\n"
"\n"
" :param Message message: A message returned by poll().\n"
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
"REJECT.\n"
" :raises KafkaException: if the consumer is not in explicit\n"
" acknowledgement mode, the message is no "
"longer\n"
" in-flight, or ack_type is invalid.\n"
" :raises ValueError: if message.topic() is None.\n"
" :raises RuntimeError: if called on a closed share consumer.\n"
"\n"},

{"acknowledge_offset", (PyCFunction)ShareConsumer_acknowledge_offset,
METH_VARARGS | METH_KEYWORDS,
".. py:function:: acknowledge_offset(topic, partition, offset, "
"[ack_type=AcknowledgeType.ACCEPT])\n"
"\n"
" Acknowledge a message by topic/partition/offset.\n"
"\n"
" :param str topic: Topic name.\n"
" :param int partition: Partition id.\n"
" :param int offset: Offset to acknowledge.\n"
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
"REJECT.\n"
" :raises KafkaException: if the consumer is not in explicit\n"
" acknowledgement mode, the offset is not\n"
" in-flight, the offset is a GAP record,\n"
" or ack_type is invalid.\n"
" :raises RuntimeError: if called on a closed share consumer.\n"
"\n"},

{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
".. py:function:: close()\n"
"\n"
Expand Down
Loading