Skip to content

[KIP-932] Implement acknowledgement RPC#2239

Open
Kaushik Raina (k-raina) wants to merge 5 commits into
dev_kip-932_share_consumer_pollfrom
dev_kip-932_share_consumer_ack
Open

[KIP-932] Implement acknowledgement RPC#2239
Kaushik Raina (k-raina) wants to merge 5 commits into
dev_kip-932_share_consumer_pollfrom
dev_kip-932_share_consumer_ack

Conversation

@k-raina
Copy link
Copy Markdown
Member

Summary
  • Add ShareConsumer.acknowledge() and acknowledge_offset(), explicit-mode acknowledgement APIs in the C extension. . Internally delegates to rd_kafka_share_acknowledge_offset() to work around Message not retaining the underlying rd_kafka_message_t.
  • 12 integration tests in tests/test_ShareConsumer_ack.py covering the full ack workflow against a real KIP-932 broker:.
  • Type stubs updated in cimpl.pyi to keep mypy and IDE autocomplete in sync with the new C surface.

@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_share_consumer_ack branch from 1043228 to 09fb66d Compare April 27, 2026 14:20
@sonarqube-confluent
Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
2.3% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

@k-raina
Copy link
Copy Markdown
Member Author

Unrelated test failure as per base branch #2217 (comment)

Comment thread src/confluent_kafka/cimpl.pyi Outdated
Comment thread src/confluent_kafka/cimpl.pyi Outdated
Comment thread src/confluent_kafka/_model/__init__.py Outdated
Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Comment thread src/confluent_kafka/src/ShareConsumer.c Outdated
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some nits

Comment thread src/confluent_kafka/_model/__init__.py Outdated
Comment thread src/confluent_kafka/_model/__init__.py Outdated
Comment on lines +313 to +319
/**
* @brief Acknowledge previously polled message.
*
* Internally delegates to rd_kafka_share_acknowledge_offset() because the
* Python Message object does not retain the underlying rd_kafka_message_t
* pointer (Message_new0 copies fields out and destroys the rkm)
*/
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a TODO to check whether we should match the Java functionality that we should only acknowledge the successful messages with this API and the error messages with the offset API?
Like for the case of Java, they throw an error if the successful messages are processed with the offset API. I think that decision will be finalised once we have the discussion on the differences between the Java clients and the NJC clients

Copy link
Copy Markdown
Member

@ojasvajain Ojasva Jain (ojasvajain) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have resolved my earlier comments. Thanks.
Just one minor comment.

@@ -0,0 +1,591 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add license details

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants