Skip to content

Commit d600e92

Browse files
authored
[KIP-932] Add share consumer commit workflow (#2241)
1 parent ec91f71 commit d600e92

8 files changed

Lines changed: 2102 additions & 34 deletions

File tree

src/confluent_kafka/cimpl.pyi

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@ class Message:
284284
timestamp: Optional[Tuple[int, int]] = ...,
285285
latency: Optional[float] = ...,
286286
leader_epoch: Optional[int] = ...,
287+
delivery_count: Optional[int] = ...,
287288
) -> None: ...
288289
def topic(self) -> Optional[str]: ...
289290
def partition(self) -> Optional[int]: ...
@@ -295,6 +296,7 @@ class Message:
295296
def timestamp(self) -> Tuple[int, int]: ... # (timestamp_type, timestamp)
296297
def latency(self) -> Optional[float]: ...
297298
def leader_epoch(self) -> Optional[int]: ...
299+
def delivery_count(self) -> Optional[int]: ...
298300
def set_headers(self, headers: HeadersType) -> None: ...
299301
def set_key(self, key: Any) -> None: ...
300302
def set_value(self, value: Any) -> None: ...
@@ -343,13 +345,9 @@ class Producer:
343345
Producer({'bootstrap.servers': 'localhost:9092'})
344346
"""
345347
...
348+
346349
@overload
347-
def __init__(
348-
self,
349-
config: Dict[str, Any],
350-
/,
351-
**kwargs: Any
352-
) -> None:
350+
def __init__(self, config: Dict[str, Any], /, **kwargs: Any) -> None:
353351
"""
354352
Create Producer with configuration dict and additional keyword arguments.
355353
Keyword arguments override values in the config dict.
@@ -363,6 +361,7 @@ class Producer:
363361
Producer({'bootstrap.servers': 'localhost'}, enable_idempotence=True)
364362
"""
365363
...
364+
366365
@overload
367366
def __init__(self, **config: Any) -> None:
368367
"""
@@ -376,6 +375,7 @@ class Producer:
376375
Producer(bootstrap_servers='localhost:9092')
377376
"""
378377
...
378+
379379
def produce(
380380
self,
381381
topic: str,
@@ -426,13 +426,9 @@ class Consumer:
426426
Consumer({'bootstrap.servers': 'localhost', 'group.id': 'mygroup'})
427427
"""
428428
...
429+
429430
@overload
430-
def __init__(
431-
self,
432-
config: dict[str, Any],
433-
/,
434-
**kwargs: Any
435-
) -> None:
431+
def __init__(self, config: dict[str, Any], /, **kwargs: Any) -> None:
436432
"""
437433
Create Consumer with configuration dict and additional keyword arguments.
438434
Keyword arguments override values in the config dict.
@@ -446,6 +442,7 @@ class Consumer:
446442
Consumer({'bootstrap.servers': 'localhost'}, group_id='mygroup')
447443
"""
448444
...
445+
449446
@overload
450447
def __init__(self, **config: Any) -> None:
451448
"""
@@ -459,6 +456,7 @@ class Consumer:
459456
Consumer(bootstrap_servers='localhost', group_id='mygroup')
460457
"""
461458
...
459+
462460
def subscribe(
463461
self,
464462
topics: List[str],
@@ -482,6 +480,7 @@ class Consumer:
482480
Message and offsets omitted, asynchronous.
483481
"""
484482
...
483+
485484
@overload
486485
def commit(
487486
self,
@@ -492,6 +491,7 @@ class Consumer:
492491
Message and offsets omitted, synchronous.
493492
"""
494493
...
494+
495495
@overload
496496
def commit(
497497
self,
@@ -503,6 +503,7 @@ class Consumer:
503503
Message specified, asynchronous.
504504
"""
505505
...
506+
506507
@overload
507508
def commit(
508509
self,
@@ -514,17 +515,19 @@ class Consumer:
514515
Message specified, synchronous.
515516
"""
516517
...
518+
517519
@overload
518520
def commit(
519-
self,
520-
*,
521-
offsets: List[TopicPartition],
522-
asynchronous: Literal[True] = ...,
521+
self,
522+
*,
523+
offsets: List[TopicPartition],
524+
asynchronous: Literal[True] = ...,
523525
) -> None:
524526
"""
525527
Offsets specified, asynchronous.
526528
"""
527529
...
530+
528531
@overload
529532
def commit(
530533
self,
@@ -536,6 +539,7 @@ class Consumer:
536539
Offsets specified, synchronous
537540
"""
538541
...
542+
539543
def get_watermark_offsets(
540544
self, partition: TopicPartition, timeout: float = -1, cached: bool = False
541545
) -> Tuple[int, int]: ...
@@ -560,6 +564,7 @@ class Consumer:
560564

561565
class ShareConsumer:
562566
"""Share Consumer for queue-like message consumption (KIP-932)."""
567+
563568
@overload
564569
def __init__(self, config: Dict[str, Any]) -> None: ...
565570
@overload
@@ -579,6 +584,12 @@ class ShareConsumer:
579584
def acknowledge_offset(
580585
self, topic: str, partition: int, offset: int, ack_type: AcknowledgeType = ...
581586
) -> None: ...
587+
# TODO KIP-932: Java's share-consumer commit returns a map keyed by
588+
# TopicIdPartition (topic name + topic UUID + partition). Python uses
589+
# the existing TopicPartition (no UUID) for now. Add a TopicIdPartition
590+
# class once the interface is finalized.
591+
def commit_sync(self, timeout: float = 60) -> Dict[TopicPartition, Optional[KafkaError]]: ...
592+
def commit_async(self) -> None: ...
582593
def close(self) -> None: ...
583594
def __enter__(self) -> "ShareConsumer": ...
584595
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...

src/confluent_kafka/src/ShareConsumer.c

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,102 @@ static PyObject *ShareConsumer_acknowledge_offset(ShareConsumerHandle *self,
494494
}
495495

496496

497+
/**
498+
* @brief Synchronously commit pending acknowledgements.
499+
*
500+
* Implicit mode auto-converts the records returned by the latest poll() to
501+
* ACCEPT inside librdkafka before sending. Blocks until all broker replies
502+
* arrive or the timeout expires.
503+
*
504+
* @returns dict mapping TopicPartition -> None on success or KafkaError on
505+
* per-partition failure. Empty dict when no acknowledgements are
506+
* pending.
507+
*/
508+
static PyObject *ShareConsumer_commit_sync(ShareConsumerHandle *self,
509+
PyObject *args,
510+
PyObject *kwargs) {
511+
/* TODO KIP-932: check if kwargs is needed */
512+
/* TODO KIP-932: pick the right default timeout */
513+
double tmout = 60.0;
514+
rd_kafka_error_t *error = NULL;
515+
rd_kafka_topic_partition_list_t *c_parts = NULL;
516+
PyObject *result = NULL;
517+
CallState cs;
518+
static char *kws[] = {"timeout", NULL};
519+
520+
if (!self->rkshare) {
521+
PyErr_SetString(PyExc_RuntimeError,
522+
ERR_MSG_SHARE_CONSUMER_CLOSED);
523+
goto err;
524+
}
525+
526+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
527+
goto err;
528+
529+
CallState_begin(&self->base, &cs);
530+
error = rd_kafka_share_commit_sync(self->rkshare, cfl_timeout_ms(tmout),
531+
&c_parts);
532+
if (!CallState_end(&self->base, &cs))
533+
goto err;
534+
535+
if (error) {
536+
cfl_PyErr_from_error_destroy(error);
537+
error = NULL;
538+
goto err;
539+
}
540+
541+
/* TODO KIP-932: c_parts shouldn't be NULL here, drop once librdkafka
542+
* guarantees it */
543+
if (!c_parts)
544+
return PyDict_New();
545+
546+
result = c_parts_to_dict_topic_partition_to_error(c_parts);
547+
rd_kafka_topic_partition_list_destroy(c_parts);
548+
return result;
549+
550+
err:
551+
if (c_parts)
552+
rd_kafka_topic_partition_list_destroy(c_parts);
553+
if (error)
554+
rd_kafka_error_destroy(error);
555+
return NULL;
556+
}
557+
558+
559+
/**
560+
* @brief Asynchronously commit pending acknowledgements.
561+
*
562+
* Returns immediately; broker results are delivered via the
563+
* share_acknowledgement_commit_cb (when configured).
564+
*/
565+
static PyObject *ShareConsumer_commit_async(ShareConsumerHandle *self,
566+
PyObject *ignore) {
567+
rd_kafka_error_t *error;
568+
CallState cs;
569+
570+
if (!self->rkshare) {
571+
PyErr_SetString(PyExc_RuntimeError,
572+
ERR_MSG_SHARE_CONSUMER_CLOSED);
573+
return NULL;
574+
}
575+
576+
CallState_begin(&self->base, &cs);
577+
error = rd_kafka_share_commit_async(self->rkshare);
578+
if (!CallState_end(&self->base, &cs)) {
579+
if (error)
580+
rd_kafka_error_destroy(error);
581+
return NULL;
582+
}
583+
584+
if (error) {
585+
cfl_PyErr_from_error_destroy(error);
586+
return NULL;
587+
}
588+
589+
Py_RETURN_NONE;
590+
}
591+
592+
497593
/**
498594
* @brief Close the share consumer.
499595
*/
@@ -674,6 +770,47 @@ static PyMethodDef ShareConsumer_methods[] = {
674770
" :raises RuntimeError: if called on a closed share consumer.\n"
675771
"\n"},
676772

773+
{"commit_sync", (PyCFunction)ShareConsumer_commit_sync,
774+
METH_VARARGS | METH_KEYWORDS,
775+
".. py:function:: commit_sync([timeout=60])\n"
776+
"\n"
777+
" Synchronously commit pending acknowledgements and block until the\n"
778+
" broker responds or the timeout expires.\n"
779+
"\n"
780+
" In implicit acknowledgement mode "
781+
"(``share.acknowledgement.mode=implicit``,\n"
782+
" the default), all records returned by the previous :py:func:`poll` "
783+
"call\n"
784+
" are auto-converted to ACCEPT before being sent. In explicit mode, "
785+
"only\n"
786+
" records previously passed to :py:func:`acknowledge` /\n"
787+
" :py:func:`acknowledge_offset` are sent.\n"
788+
"\n"
789+
" :param float timeout: Maximum time to block (seconds). Default: 60.\n"
790+
" Pass -1 for infinite.\n"
791+
" :returns: Dict mapping TopicPartition to None on success or "
792+
"KafkaError\n"
793+
" on per-partition failure. Empty dict when no\n"
794+
" acknowledgements are pending.\n"
795+
" :rtype: dict(TopicPartition, KafkaError | None)\n"
796+
" :raises KafkaException: on error\n"
797+
" :raises RuntimeError: if called on a closed share consumer\n"
798+
" :raises TypeError: if timeout is not a float\n"
799+
"\n"},
800+
801+
{"commit_async", (PyCFunction)ShareConsumer_commit_async, METH_NOARGS,
802+
".. py:function:: commit_async()\n"
803+
"\n"
804+
" Asynchronously commit pending acknowledgements. Returns immediately;\n"
805+
" broker results are delivered via the configured\n"
806+
" ``share_acknowledgement_commit_cb`` callback.\n"
807+
"\n"
808+
" :returns: None\n"
809+
" :raises KafkaException: on error\n"
810+
" :raises RuntimeError: if called on a closed share consumer\n"
811+
" :raises TypeError: if any arguments are passed\n"
812+
"\n"},
813+
677814
{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
678815
".. py:function:: close()\n"
679816
"\n"

0 commit comments

Comments
 (0)