Skip to content

Commit ec91f71

Browse files
authored
[KIP-932] Implement acknowledgement RPC (#2239)
1 parent b5cb017 commit ec91f71

10 files changed

Lines changed: 1090 additions & 28 deletions

File tree

src/confluent_kafka/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from ._model import Node # noqa: F401
2222
from ._model import (
23+
AcknowledgeType,
2324
ConsumerGroupState,
2425
ConsumerGroupTopicPartitions,
2526
ConsumerGroupType,
@@ -86,6 +87,7 @@
8687
"TopicCollection",
8788
"TopicPartitionInfo",
8889
"ElectionType",
90+
"AcknowledgeType",
8991
]
9092

9193

src/confluent_kafka/_model/__init__.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from enum import Enum
15+
from enum import Enum, IntEnum
1616
from typing import List, Optional
1717

1818
from .. import cimpl
@@ -193,3 +193,20 @@ def __lt__(self, other: object) -> bool:
193193
if not isinstance(other, ElectionType):
194194
return NotImplemented
195195
return self.value < other.value
196+
197+
198+
class AcknowledgeType(IntEnum):
199+
"""
200+
Share Consumer acknowledgement type used to tell the broker how to
201+
handle a polled message in explicit acknowledgement mode.
202+
203+
Values:
204+
-------
205+
"""
206+
207+
#: Record was processed successfully — broker will not redeliver it.
208+
ACCEPT = cimpl.SHARE_ACKNOWLEDGE_TYPE_ACCEPT
209+
#: Could not process — Release it for another delivery attempt
210+
RELEASE = cimpl.SHARE_ACKNOWLEDGE_TYPE_RELEASE
211+
#: Could not process - Do not release for another delivery attempt
212+
REJECT = cimpl.SHARE_ACKNOWLEDGE_TYPE_REJECT

src/confluent_kafka/cimpl.pyi

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ except ImportError:
4545

4646
from confluent_kafka.admin._metadata import ClusterMetadata, GroupMetadata
4747

48+
from ._model import AcknowledgeType
4849
from ._types import HeadersType
4950

5051
# Callback types with proper class references (defined locally to avoid circular imports)
@@ -83,6 +84,7 @@ class KafkaError:
8384
ILLEGAL_GENERATION: int
8485
ILLEGAL_SASL_STATE: int
8586
INCONSISTENT_GROUP_PROTOCOL: int
87+
INCONSISTENT_TOPIC_ID: int
8688
INCONSISTENT_VOTER_SET: int
8789
INVALID_COMMIT_OFFSET_SIZE: int
8890
INVALID_CONFIG: int
@@ -567,7 +569,16 @@ class ShareConsumer:
567569
def subscribe(self, topics: List[str]) -> None: ...
568570
def unsubscribe(self) -> None: ...
569571
def subscription(self) -> List[str]: ...
572+
# TODO KIP-932: poll() returns List[Message] today. Java returns a
573+
# dedicated container object (ConsumerRecords) instead of a list so it
574+
# can carry extra metadata alongside the records. Replace List[Message]
575+
# with a Messages container class once we have a clear use for that
576+
# metadata in Python.
570577
def poll(self, timeout: float = -1) -> List[Message]: ...
578+
def acknowledge(self, message: Message, ack_type: AcknowledgeType = ...) -> None: ...
579+
def acknowledge_offset(
580+
self, topic: str, partition: int, offset: int, ack_type: AcknowledgeType = ...
581+
) -> None: ...
571582
def close(self) -> None: ...
572583
def __enter__(self) -> "ShareConsumer": ...
573584
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
@@ -786,6 +797,9 @@ OFFSET_SPEC_EARLIEST: int
786797
OFFSET_SPEC_LATEST: int
787798
OFFSET_SPEC_MAX_TIMESTAMP: int
788799
OFFSET_STORED: int
800+
SHARE_ACKNOWLEDGE_TYPE_ACCEPT: int
801+
SHARE_ACKNOWLEDGE_TYPE_RELEASE: int
802+
SHARE_ACKNOWLEDGE_TYPE_REJECT: int
789803
RESOURCE_ANY: int
790804
RESOURCE_BROKER: int
791805
RESOURCE_GROUP: int

src/confluent_kafka/src/AdminTypes.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,16 @@ static void AdminTypes_AddObjectsElectionType(PyObject *m) {
658658
RD_KAFKA_ELECTION_TYPE_UNCLEAN);
659659
}
660660

661+
static void AdminTypes_AddObjectsShareAcknowledgeType(PyObject *m) {
662+
/* rd_kafka_share_AcknowledgeType_t */
663+
PyModule_AddIntConstant(m, "SHARE_ACKNOWLEDGE_TYPE_ACCEPT",
664+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT);
665+
PyModule_AddIntConstant(m, "SHARE_ACKNOWLEDGE_TYPE_RELEASE",
666+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_RELEASE);
667+
PyModule_AddIntConstant(m, "SHARE_ACKNOWLEDGE_TYPE_REJECT",
668+
RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_REJECT);
669+
}
670+
661671
/**
662672
* @brief Add Admin types to module
663673
*/
@@ -679,4 +689,5 @@ void AdminTypes_AddObjects(PyObject *m) {
679689
AdminTypes_AddObjectsIsolationLevel(m);
680690
AdminTypes_AddObjectsOffsetSpec(m);
681691
AdminTypes_AddObjectsElectionType(m);
692+
AdminTypes_AddObjectsShareAcknowledgeType(m);
682693
}

src/confluent_kafka/src/ShareConsumer.c

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,107 @@ static PyObject *ShareConsumer_poll(ShareConsumerHandle *self,
393393
}
394394

395395

396+
/**
397+
* @brief Acknowledge previously polled message.
398+
*
399+
* Internally delegates to rd_kafka_share_acknowledge_offset() because the
400+
* Python Message object does not retain the underlying rd_kafka_message_t
401+
* pointer (Message_new0 copies fields out and destroys the rkm)
402+
*
403+
* TODO KIP-932: Java splits ack APIs by message kind — successful records
404+
* go through acknowledge(message), error/GAP records through
405+
* acknowledge_offset(topic, partition, offset), and crossing the wires
406+
* throws. NJC clients (this one included) accept either. Revisit once the
407+
* Java-vs-NJC alignment is settled.
408+
*/
409+
static PyObject *ShareConsumer_acknowledge(ShareConsumerHandle *self,
410+
PyObject *args,
411+
PyObject *kwargs) {
412+
Message *msg = NULL;
413+
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
414+
PyObject *uo8 = NULL;
415+
const char *topic = NULL;
416+
rd_kafka_resp_err_t err;
417+
static char *kws[] = {"message", "ack_type", NULL};
418+
419+
if (!self->rkshare) {
420+
PyErr_SetString(PyExc_RuntimeError,
421+
ERR_MSG_SHARE_CONSUMER_CLOSED);
422+
return NULL;
423+
}
424+
425+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!|i", kws,
426+
&MessageType, &msg, &ack_type))
427+
return NULL;
428+
429+
/* Validation (ack_type range, topic, partition, offset) is left to
430+
* librdkafka so every failure surfaces as KafkaException. Pass NULL
431+
* topic through for the None case rather than raising ValueError. */
432+
433+
if (msg->topic && msg->topic != Py_None) {
434+
topic = cfl_PyUnistr_AsUTF8(msg->topic, &uo8);
435+
if (!topic) {
436+
Py_XDECREF(uo8);
437+
return NULL;
438+
}
439+
}
440+
441+
err = rd_kafka_share_acknowledge_offset(
442+
self->rkshare, topic, msg->partition, msg->offset,
443+
(rd_kafka_share_AcknowledgeType_t)ack_type);
444+
445+
Py_XDECREF(uo8);
446+
447+
if (err) {
448+
cfl_PyErr_Format(err, "Failed to acknowledge message: %s",
449+
rd_kafka_err2str(err));
450+
return NULL;
451+
}
452+
453+
Py_RETURN_NONE;
454+
}
455+
456+
457+
/**
458+
* @brief Acknowledge a message by topic/partition/offset directly.
459+
*/
460+
static PyObject *ShareConsumer_acknowledge_offset(ShareConsumerHandle *self,
461+
PyObject *args,
462+
PyObject *kwargs) {
463+
const char *topic;
464+
int partition;
465+
long long offset;
466+
int ack_type = (int)RD_KAFKA_SHARE_ACKNOWLEDGE_TYPE_ACCEPT;
467+
rd_kafka_resp_err_t err;
468+
static char *kws[] = {"topic", "partition", "offset", "ack_type", NULL};
469+
470+
if (!self->rkshare) {
471+
PyErr_SetString(PyExc_RuntimeError,
472+
ERR_MSG_SHARE_CONSUMER_CLOSED);
473+
return NULL;
474+
}
475+
476+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "siL|i", kws, &topic,
477+
&partition, &offset, &ack_type))
478+
return NULL;
479+
480+
/* ack_type, partition and offset are all validated by
481+
* rd_kafka_share_acknowledge_offset()*/
482+
483+
err = rd_kafka_share_acknowledge_offset(
484+
self->rkshare, topic, (int32_t)partition, (int64_t)offset,
485+
(rd_kafka_share_AcknowledgeType_t)ack_type);
486+
487+
if (err) {
488+
cfl_PyErr_Format(err, "Failed to acknowledge offset: %s",
489+
rd_kafka_err2str(err));
490+
return NULL;
491+
}
492+
493+
Py_RETURN_NONE;
494+
}
495+
496+
396497
/**
397498
* @brief Close the share consumer.
398499
*/
@@ -514,6 +615,65 @@ static PyMethodDef ShareConsumer_methods[] = {
514615
" :raises KeyboardInterrupt: if Ctrl+C pressed during consumption\n"
515616
"\n"},
516617

618+
/* TODO KIP-932: librdkafka error code → Python exception mapping is
619+
* provisional. Today the share consumer translates every librdkafka
620+
* error code into KafkaException via cfl_PyErr_Format(). Longer term we
621+
* want each code to map to the Python exception a user porting from
622+
* Java would expect, e.g.
623+
* _INVALID_ARG → ValueError (matches Java IllegalArgumentException)
624+
* _STATE → RuntimeError (matches Java IllegalStateException)
625+
* Open question: per-partition broker errors in commit_sync's result
626+
* dict (mirrors Java's Map<TopicIdPartition, Optional<...>>) — keep as
627+
* KafkaError, or translate as well?
628+
*
629+
* Revisit holistically once:
630+
* - librdkafka's share-consumer error surface is stable (some codes
631+
* may be redefined as work progresses), and
632+
* - the equivalent translation lands on commit_sync / commit_async /
633+
* ack-callback paths (currently TODO'd separately).
634+
*/
635+
{"acknowledge", (PyCFunction)ShareConsumer_acknowledge,
636+
METH_VARARGS | METH_KEYWORDS,
637+
".. py:function:: acknowledge(message, "
638+
"[ack_type=AcknowledgeType.ACCEPT])\n"
639+
"\n"
640+
" Acknowledge a previously polled message in explicit acknowledgement\n"
641+
" mode. Tells the broker how to handle the record.\n"
642+
"\n"
643+
" :param Message message: A message returned by poll().\n"
644+
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
645+
"REJECT.\n"
646+
" :raises TypeError: if message is not a Message instance or ack_type "
647+
"is not an integer.\n"
648+
" :raises KafkaException: if the consumer is not in explicit\n"
649+
" acknowledgement mode, the message is no "
650+
"longer\n"
651+
" in-flight, ack_type is invalid, or "
652+
"message.topic() is None.\n"
653+
" :raises RuntimeError: if called on a closed share consumer.\n"
654+
"\n"},
655+
656+
{"acknowledge_offset", (PyCFunction)ShareConsumer_acknowledge_offset,
657+
METH_VARARGS | METH_KEYWORDS,
658+
".. py:function:: acknowledge_offset(topic, partition, offset, "
659+
"[ack_type=AcknowledgeType.ACCEPT])\n"
660+
"\n"
661+
" Acknowledge a message by topic/partition/offset.\n"
662+
"\n"
663+
" :param str topic: Topic name.\n"
664+
" :param int partition: Partition id.\n"
665+
" :param int offset: Offset to acknowledge.\n"
666+
" :param AcknowledgeType ack_type: ACCEPT (default), RELEASE, or "
667+
"REJECT.\n"
668+
" :raises TypeError: if topic is not a str, partition/offset are not "
669+
"integers, or ack_type is not an integer.\n"
670+
" :raises KafkaException: if the consumer is not in explicit\n"
671+
" acknowledgement mode, the offset is not\n"
672+
" in-flight, the offset is a GAP record,\n"
673+
" or ack_type is invalid.\n"
674+
" :raises RuntimeError: if called on a closed share consumer.\n"
675+
"\n"},
676+
517677
{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
518678
".. py:function:: close()\n"
519679
"\n"

tests/common/__init__.py

Lines changed: 56 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import uuid
2323

2424
from confluent_kafka import Consumer, ShareConsumer
25+
from confluent_kafka.admin import AlterConfigOpType, ConfigEntry, ConfigResource
2526

2627
_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL'
2728

@@ -146,42 +147,71 @@ def unique_id(prefix):
146147
return f'{prefix}-{uuid.uuid4().hex[:10]}'
147148

148149

149-
def drain_share_consumers(consumers, n_expected, timeout_s=20.0, poll_timeout_s=0.5):
150-
"""Round-robin poll until total non-error messages reach n_expected.
150+
def set_group_config(kafka_cluster, group_id, name, value):
151+
"""Set one dynamic group config via incremental_alter_configs."""
152+
res = ConfigResource(
153+
ConfigResource.Type.GROUP,
154+
group_id,
155+
incremental_configs=[
156+
ConfigEntry(name, str(value), incremental_operation=AlterConfigOpType.SET),
157+
],
158+
)
159+
for f in kafka_cluster.admin().incremental_alter_configs([res]).values():
160+
f.result()
151161

152-
Returns a list of message lists, one per input consumer, in the same order.
153-
Stops early once the expected total is reached, or when timeout_s elapses.
154162

155-
Tests with N>2 consumers under the suite-wide
156-
group.share.record.lock.duration.ms=1000
157-
should lower poll_timeout_s so a full round-robin round completes within
158-
the 1s lock window — otherwise locks expire before the same consumer's
159-
next poll can implicit-ack, and records get redelivered to other
160-
consumers (i.e. apparent duplicate delivery).
163+
def poll_first_batch(consumer, timeout_s=20.0, poll_timeout_s=0.5):
164+
"""Return the first non-empty batch of non-error messages from a share consumer.
161165
162-
IMPORTANT: implicit-ack only. This helper assumes share consumers are in
163-
implicit-ack mode (the only mode the Python wrapper currently exposes).
164-
In implicit mode, the second poll() automatically acknowledges records
165-
delivered by the first, so the loop here is safe.
166+
Polls repeatedly until at least one non-error message arrives, or until
167+
timeout_s elapses. Returns the contents of a single poll() call — does not
168+
accumulate across polls and does not acknowledge anything.
166169
167-
TODO KIP-932: when explicit-ack mode lands in the Python wrapper
168-
(ShareConsumer.acknowledge()), update this helper to ack each message as
169-
it's drained, otherwise the broker will return INFLIGHT-records errors
170-
on the second poll. Same caveat exists in the librdkafka tests.
170+
Use this when a test needs to inspect the first records the broker delivers
171+
*before* any implicit- or explicit-ack happens. Unlike drain_share_consumers,
172+
which loops poll() calls and therefore implicit-acks each prior batch on
173+
every iteration, poll_first_batch returns whatever the first non-empty
174+
poll() returned and stops, leaving acknowledgement to the caller.
171175
172-
TODO KIP-932: after the final poll() in the loop below, the last batch of
173-
records is never implicitly acknowledged (no subsequent poll to piggyback
174-
on). Some tests assume the tail batch is ack'd. Fix once explicit-ack is
175-
exposed: emit an explicit ack for the last drained batch before returning.
176+
Returns an empty list if timeout_s elapses without any non-error message.
177+
"""
178+
deadline = time.time() + timeout_s
179+
while time.time() < deadline:
180+
batch = [msg for msg in consumer.poll(timeout=poll_timeout_s) if msg.error() is None]
181+
if batch:
182+
return batch
183+
return []
184+
185+
186+
def drain_share_consumers(consumers, total_messages, timeout_s=20.0, poll_timeout_s=0.5, *, ack_type=None):
187+
"""Round-robin poll consumers until total_messages non-error messages arrive (across all consumers combined).
188+
189+
Returns one message list per consumer, in input order. Stops at
190+
total_messages or timeout_s.
191+
192+
With N>2 consumers and the suite-wide 1s lock duration, drop poll_timeout_s
193+
so each round finishes within the lock window — otherwise records leak to
194+
other consumers and look like duplicate deliveries.
195+
196+
ack_type=None drains in implicit-ack mode (next poll() auto-acks the prior
197+
batch; the tail batch is left unacked). Pass an AcknowledgeType to ack each
198+
message inline — required when share.acknowledgement.mode=explicit.
199+
200+
TODO KIP-932: once commit_sync() / commit_async() are exposed on the
201+
ShareConsumer binding, the implicit-ack drain (ack_type=None) should
202+
still emit a commit on the tail batch so callers don't have to chase
203+
leaked records. Add a commit step here once those APIs land.
176204
"""
177205
received = [[] for _ in consumers]
178206
deadline = time.time() + timeout_s
179207
while time.time() < deadline:
180208
for sc, bucket in zip(consumers, received):
181-
for m in sc.poll(timeout=poll_timeout_s):
182-
if m.error() is None:
183-
bucket.append(m)
184-
if sum(len(b) for b in received) >= n_expected:
209+
for msg in sc.poll(timeout=poll_timeout_s):
210+
if msg.error() is None:
211+
bucket.append(msg)
212+
if ack_type is not None:
213+
sc.acknowledge(msg, ack_type)
214+
if sum(len(bucket) for bucket in received) >= total_messages:
185215
break
186216
return received
187217

0 commit comments

Comments
 (0)