[KIP-932] Add share consumer commit workflow#2241
[KIP-932] Add share consumer commit workflow#2241Kaushik Raina (k-raina) wants to merge 2 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
|
| * TopicPartition where each entry's .error field carries the per-partition | ||
| * acknowledgement result. | ||
| */ | ||
| static PyObject *ShareConsumer_commit(ShareConsumerHandle *self, |
There was a problem hiding this comment.
To match the Java signature we should have two separate commit Apis, commitSync and commitAsync
There was a problem hiding this comment.
Agreed. Also, there are 2 variations of commitSync in java, one that doesn't accept a timeout and the one that does.
| * the broker responds or the timeout expires, and receives a list of | ||
| * TopicPartition where each entry's .error field carries the per-partition |
There was a problem hiding this comment.
Java returns a Map<TopicPartition, Optional>, we should match that signature
Ojasva Jain (ojasvajain)
left a comment
There was a problem hiding this comment.
Left a couple of comments. Can review again after we modify the APIs according to Java client.
| * TopicPartition where each entry's .error field carries the per-partition | ||
| * acknowledgement result. | ||
| */ | ||
| static PyObject *ShareConsumer_commit(ShareConsumerHandle *self, |
There was a problem hiding this comment.
Agreed. Also, there are 2 variations of commitSync in java, one that doesn't accept a timeout and the one that does.
| /* TODO KIP-932: rd_kafka_share_consumer_close() return type will change | ||
| * to rd_kafka_error_t *. Update error handling accordingly. */ | ||
| err = rd_kafka_share_consumer_close(self->rkshare); |
There was a problem hiding this comment.
You might want to address this todo now, now that close changes are merged. Otherwise, an error is thrown when trying to link with the qfk branch of librdkafka.


Summary
Design decisions
rd_kafka_share_commit_sync()drains pending acks into the request, so retrying it would silently lose the first call's results. Sync commit just releases the GIL and blocks.