Skip to content

[KIP-932] Add share consumer commit workflow#2241

Open
Kaushik Raina (k-raina) wants to merge 2 commits into
dev_kip-932_share_consumer_ackfrom
dev_kip-932_share_consumer_commit
Open

[KIP-932] Add share consumer commit workflow#2241
Kaushik Raina (k-raina) wants to merge 2 commits into
dev_kip-932_share_consumer_ackfrom
dev_kip-932_share_consumer_commit

Conversation

@k-raina
Copy link
Copy Markdown
Member

@k-raina Kaushik Raina (k-raina) commented Apr 29, 2026

Summary

  • Add ShareConsumer.commit(asynchronous=True, timeout=-1) to the C extension, wired to librdkafka's rd_kafka_share_commit_async() / rd_kafka_share_commit_sync().
  • 28 integration tests in tests/test_ShareConsumer_commit.py against a real KIP-932 broker
  • Type stub updated in cimpl.pyi.

Design decisions

  • Single method with asynchronous= kwarg, not separate commit_sync() / commit_async(). Matches the existing Consumer.commit() precedent in this repo.
  • Default asynchronous=True to match Consumer.commit().
  • No chunked polling on sync commit. rd_kafka_share_commit_sync() drains pending acks into the request, so retrying it would silently lose the first call's results. Sync commit just releases the GIL and blocks.

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@sonarqube-confluent
Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
15.0% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

* TopicPartition where each entry's .error field carries the per-partition
* acknowledgement result.
*/
static PyObject *ShareConsumer_commit(ShareConsumerHandle *self,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To match the Java signature we should have two separate commit Apis, commitSync and commitAsync

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Also, there are 2 variations of commitSync in java, one that doesn't accept a timeout and the one that does.

Comment on lines +429 to +430
* the broker responds or the timeout expires, and receives a list of
* TopicPartition where each entry's .error field carries the per-partition
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Java returns a Map<TopicPartition, Optional>, we should match that signature

Copy link
Copy Markdown
Member

@ojasvajain Ojasva Jain (ojasvajain) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple of comments. Can review again after we modify the APIs according to Java client.

* TopicPartition where each entry's .error field carries the per-partition
* acknowledgement result.
*/
static PyObject *ShareConsumer_commit(ShareConsumerHandle *self,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Also, there are 2 variations of commitSync in java, one that doesn't accept a timeout and the one that does.

Comment on lines 501 to 503
/* TODO KIP-932: rd_kafka_share_consumer_close() return type will change
* to rd_kafka_error_t *. Update error handling accordingly. */
err = rd_kafka_share_consumer_close(self->rkshare);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to address this todo now, now that close changes are merged. Otherwise, an error is thrown when trying to link with the qfk branch of librdkafka.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants