3 changes: 3 additions & 0 deletions src/confluent_kafka/cimpl.pyi
@@ -568,6 +568,9 @@ class ShareConsumer:
def acknowledge_offset(
self, topic: str, partition: int, offset: int, ack_type: int = 1
) -> None: ...
def commit(
self, asynchronous: bool = True, timeout: float = -1
) -> Optional[List[TopicPartition]]: ...
def close(self) -> None: ...
def __enter__(self) -> "ShareConsumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
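The new `commit` stub above returns `None` in asynchronous mode and a list of `TopicPartition` (each carrying a per-partition `.error`) in synchronous mode. A small, hedged helper for consuming that contract — `failed_partitions` is a hypothetical name for illustration, not part of the library:

```python
def failed_partitions(commit_result):
    """Extract (topic, partition, error) triples for entries whose
    acknowledgement failed in a synchronous commit() result.

    commit() returns None both in async mode and when no acknowledgements
    were pending, so None is treated here as "nothing failed".
    """
    if commit_result is None:
        return []
    return [(tp.topic, tp.partition, tp.error)
            for tp in commit_result
            if tp.error is not None]
```

A synchronous call would then look like `failed_partitions(consumer.commit(asynchronous=False, timeout=5.0))`, assuming a connected `ShareConsumer` instance named `consumer`.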
106 changes: 106 additions & 0 deletions src/confluent_kafka/src/ShareConsumer.c
@@ -41,8 +41,8 @@

rd_kafka_share_t *rkshare;

/* TODO KIP-932: Remove after interface of librdkafka is updated to
* return double pointer */

[SonarQube check, line 45] S1135: Complete the task associated to this "TODO" comment.
size_t batch_size;

} ShareConsumerHandle;
@@ -59,8 +59,8 @@
if (self->rkshare) {
CallState cs;
CallState_begin(&self->base, &cs);
/* TODO KIP-932: Use rd_kafka_share_destroy_flags() once
* available in the librdkafka public API. */

[SonarQube check, line 63] S1135: Complete the task associated to this "TODO" comment.
rd_kafka_share_destroy(self->rkshare);
self->rkshare = NULL;
CallState_end(&self->base, &cs);
@@ -108,7 +108,7 @@
c_topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));
for (i = 0; i < PyList_Size(tlist); i++) {
PyObject *o = PyList_GetItem(tlist, i);
PyObject *uo, *uo8 = NULL;

[SonarQube check, line 111] S1659: Define each identifier in a dedicated statement.
if (!(uo = cfl_PyObject_Unistr(o))) {
PyErr_SetString(PyExc_TypeError,
"expected list of unicode strings");
@@ -240,7 +240,7 @@
CallState_begin(&self->base, &cs);

/* Chunked polling pattern for signal interruptibility */
while (1) {

[SonarQube check, line 243] S924: Reduce the number of nested "break" statements from 3 to the 1 authorized.
chunk_timeout_ms = calculate_chunk_timeout(
total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS);
if (chunk_timeout_ms == 0) {
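The chunked-polling pattern above splits one long blocking poll into short chunks so pending Python signals (e.g. `KeyboardInterrupt`) can be serviced between chunks. A plausible reconstruction of `calculate_chunk_timeout` in Python, under assumptions (100 ms chunks, a negative total meaning "wait forever") — not the actual C implementation:

```python
CHUNK_TIMEOUT_MS = 100  # assumed chunk length; the real constant lives in the C source

def calculate_chunk_timeout(total_timeout_ms, chunk_count, chunk_ms=CHUNK_TIMEOUT_MS):
    """Return the timeout (ms) for the next poll chunk, or 0 when the
    total budget is spent (the caller's `if (chunk_timeout_ms == 0)` exit)."""
    if total_timeout_ms < 0:
        # Infinite total timeout: every chunk is full-sized, forever.
        return chunk_ms
    remaining = total_timeout_ms - chunk_count * chunk_ms
    if remaining <= 0:
        return 0
    # The final chunk may be shorter than a full chunk.
    return min(chunk_ms, remaining)
```

Between chunks, the C code re-acquires the GIL long enough to run Python's pending-signal check; that is what makes an otherwise long blocking poll interruptible.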
@@ -419,6 +419,73 @@
}


/**
* @brief Commit pending acknowledgements.
*
* Implicit mode auto-converts ACQUIRED → ACCEPT inside librdkafka before
* sending. Sync mode returns a per-partition result list; async mode returns
* None. By default, commit is asynchronous. In async mode, the application
* receives no immediate feedback. In sync mode, the application blocks until
* the broker responds or the timeout expires, and receives a list of
* TopicPartition where each entry's .error field carries the per-partition
[Review comment on lines +429 to +430 (Contributor)] Java returns a Map<TopicPartition, Optional>, we should match that signature.

* acknowledgement result.
*/
static PyObject *ShareConsumer_commit(ShareConsumerHandle *self,
[Review comment (Contributor)] To match the Java signature we should have two separate commit APIs, commitSync and commitAsync.
[Review comment (Member)] Agreed. Also, there are 2 variations of commitSync in Java: one that doesn't accept a timeout and one that does.
PyObject *args,
PyObject *kwargs) {
int async = 1;
double tmout = -1.0;
PyObject *async_o = NULL;
rd_kafka_error_t *error = NULL;
rd_kafka_topic_partition_list_t *c_parts = NULL;
PyThreadState *thread_state = NULL;
PyObject *plist;
static char *kws[] = {"asynchronous", "timeout", NULL};

if (!self->rkshare) {
PyErr_SetString(PyExc_RuntimeError,
ERR_MSG_SHARE_CONSUMER_CLOSED);
return NULL;
}

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Od", kws, &async_o,
&tmout))
return NULL;

if (async_o)
async = PyObject_IsTrue(async_o);

if (async) {
error = rd_kafka_share_commit_async(self->rkshare);
if (error) {
cfl_PyErr_from_error_destroy(error);
return NULL;
}
Py_RETURN_NONE;
}

thread_state = PyEval_SaveThread();
error = rd_kafka_share_commit_sync(self->rkshare, cfl_timeout_ms(tmout),
&c_parts);
PyEval_RestoreThread(thread_state);

if (error) {
if (c_parts)
rd_kafka_topic_partition_list_destroy(c_parts);
cfl_PyErr_from_error_destroy(error);
return NULL;
}

/* No pending acks: librdkafka returns NULL c_parts. */
if (!c_parts)
Py_RETURN_NONE;

plist = c_parts_to_py(c_parts);
rd_kafka_topic_partition_list_destroy(c_parts);
return plist;
}


/**
* @brief Close the share consumer.
*/
@@ -431,8 +498,8 @@
Py_RETURN_NONE;

CallState_begin(&self->base, &cs);
/* TODO KIP-932: rd_kafka_share_consumer_close() return type will change
* to rd_kafka_error_t *. Update error handling accordingly. */

[SonarQube check, line 502] S1135: Complete the task associated to this "TODO" comment.
err = rd_kafka_share_consumer_close(self->rkshare);
rd_kafka_share_destroy(self->rkshare);
self->rkshare = NULL;
@@ -453,7 +520,7 @@
* @brief Context manager entry — returns self.
*/
static PyObject *ShareConsumer_enter(ShareConsumerHandle *self,
PyObject *ignore) {

[SonarQube check, line 523] S1172: Remove the unused parameter "ignore", make it unnamed, or declare it "[[maybe_unused]]".
Py_INCREF(self);
return (PyObject *)self;
}
@@ -462,7 +529,7 @@
* @brief Context manager exit — calls close().
*/
static PyObject *ShareConsumer_exit(ShareConsumerHandle *self, PyObject *args) {
PyObject *exc_type, *exc_value, *exc_traceback;

[SonarQube check, line 532] S1659: Define each identifier in a dedicated statement.

if (!PyArg_UnpackTuple(args, "__exit__", 3, 3, &exc_type, &exc_value,
&exc_traceback))
@@ -572,6 +639,45 @@
" :raises RuntimeError: if called on a closed share consumer.\n"
"\n"},

{"commit", (PyCFunction)ShareConsumer_commit, METH_VARARGS | METH_KEYWORDS,
".. py:function:: commit([asynchronous=True], [timeout=-1])\n"
"\n"
" Commit pending acknowledgements.\n"
"\n"
" In implicit acknowledgement mode (``share.acknowledgement.mode=implicit``,\n"
" the default), all records returned by the previous :py:func:`poll` call\n"
" are auto-converted to ACCEPT before being sent. In explicit mode, only\n"
" records previously passed to :py:func:`acknowledge` /\n"
" :py:func:`acknowledge_offset` are sent.\n"
"\n"
" :param bool asynchronous: If True (default), return immediately without\n"
"  waiting for broker confirmation (returns None). If False, block until\n"
"  the broker replies or the timeout expires, and return a list of\n"
"  TopicPartition where each entry's ``.error`` field carries the\n"
"  per-partition acknowledgement result.\n"
" :param float timeout: Maximum time to block in synchronous mode (seconds).\n"
"  Default: -1 (infinite). Ignored in async mode.\n"
" :returns: None when ``asynchronous=True`` or when there are no pending\n"
"  acknowledgements; otherwise a list of TopicPartition results.\n"
" :rtype: None | list(TopicPartition)\n"
" :raises KafkaException: on error\n"
" :raises RuntimeError: if called on a closed share consumer\n"
"\n"},

{"close", (PyCFunction)ShareConsumer_close, METH_NOARGS,
".. py:function:: close()\n"
"\n"
Expand All @@ -583,9 +689,9 @@
" :raises KafkaException: on error\n"
"\n"},

/* TODO KIP-932: Add set_sasl_credentials once librdkafka exposes
* rd_kafka_sasl_set_credentials() (or the underlying rd_kafka_t *)
* for rd_kafka_share_t handles. */

[SonarQube check, line 694] S1135: Complete the task associated to this "TODO" comment.

{"__enter__", (PyCFunction)ShareConsumer_enter, METH_NOARGS,
"Context manager entry."},
@@ -616,8 +722,8 @@
kwargs)))
return -1; /* Exception raised by common_conf_setup() */

/* TODO KIP-932: Remove after interface of librdkafka is updated to
* return double pointer */

[SonarQube check, line 726] S1135: Complete the task associated to this "TODO" comment.
self->batch_size = 10005;

self->rkshare =
Expand All @@ -629,13 +735,13 @@
return -1;
}

/* TODO KIP-932: call rd_kafka_set_log_queue() once librdkafka adds a
* rd_kafka_share_set_log_queue() wrapper — needs rd_kafka_t *, which
* is opaque inside rd_kafka_share_t in the public API. */

[SonarQube check, line 740] S1135: Complete the task associated to this "TODO" comment.

/* TODO KIP-932: call rd_kafka_sasl_background_callbacks_enable() for
* OAuth once librdkafka adds a share-level wrapper for the same reason.
*/

[SonarQube check, line 744] S1135: Complete the task associated to this "TODO" comment.


return 0;
@@ -646,7 +752,7 @@
* @brief ShareConsumer __new__ method.
*/
static PyObject *
ShareConsumer_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {

[SonarQube check, line 755] S1172: Remove the unused parameter "kwargs", make it unnamed, or declare it "[[maybe_unused]]".
[SonarQube check, line 755] S1172: Remove the unused parameter "args", make it unnamed, or declare it "[[maybe_unused]]".
return type->tp_alloc(type, 0);
}
