Do not clear offsets after failure to commit offsets due to rebalance exception #6346
    } catch (final RebalanceInProgressException ex) {
        LOG.error("Failed to commit offsets in topic {} due to rebalance in progress", topicName, ex);
        try {
            Thread.sleep(100);
Why do we sleep? Is it related to retrying? If so, can we add some check that we are not still rebalancing?
The sleep is meant to add some backoff so the rebalance can complete without us spamming offset commits. Supposedly we have to call poll again to complete the rebalance, though, so maybe the sleep is not that useful.
Yes. It looks like sleep() is not needed.
Pushed a revision removing the sleep
            throw new RuntimeException("Interrupted while waiting to retry after failing to commit offsets");
        }
        return;
    } catch (Exception e) {
Currently, this exception path sets lastCommitTime, but with this change it no longer does. Is that OK?
It seems the old code incorrectly recorded that we had committed offsets even when the commit threw an exception.
@dlvenable It is correct, because the last commit time should be set only when a commit actually happens.
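The point agreed on in this thread can be sketched as follows. This is a minimal illustration, not the code in the PR: `CommitTimeTracker`, `tryCommit`, and the injected clock are hypothetical names, and the commit itself is represented by a plain `Runnable`.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: none of these names are taken from the PR.
class CommitTimeTracker {
    private final LongSupplier clock;
    private long lastCommitTime;

    CommitTimeTracker(final LongSupplier clock, final long initialCommitTime) {
        this.clock = clock;
        this.lastCommitTime = initialCommitTime;
    }

    /** Runs the commit action and records the time only if it returns normally. */
    boolean tryCommit(final Runnable commitAction) {
        try {
            commitAction.run();
        } catch (final RuntimeException e) {
            // The commit failed, so lastCommitTime stays untouched and the
            // next interval check still treats the commit as overdue.
            return false;
        }
        lastCommitTime = clock.getAsLong();
        return true;
    }

    long getLastCommitTime() {
        return lastCommitTime;
    }
}
```

With this shape, a failed commit cannot make the consumer believe it committed recently, so the next commit attempt is not delayed by a full commit interval.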
… exception Signed-off-by: Taylor Gray <tylgry@amazon.com>
Force-pushed from 5f72d6c to fd976fe.
Description
This change handles RebalanceInProgressException by keeping the offsets and retrying the commit, rather than clearing the offsets without committing them. Clearing them can cause unnecessary duplicate processing in the case where a retry would have succeeded after the rebalance.
Tested by modifying the integration test locally to kill one of the consumers and verifying that the data is still processed.
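The behavior described above might look roughly like the sketch below. It is a self-contained simulation under stated assumptions, not the PR's implementation: `RetainOffsetsSketch`, `Committer`, and the local `RebalanceInProgress` exception are hypothetical stand-ins for the consumer class and for the Kafka client types, and clearing offsets on other failures is assumed from the description rather than confirmed by the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins: none of these types come from the PR or the Kafka client.
class RetainOffsetsSketch {
    /** Stand-in for Kafka's RebalanceInProgressException. */
    static class RebalanceInProgress extends RuntimeException { }

    /** Stand-in for the consumer's synchronous commit call. */
    interface Committer {
        void commit(Map<Integer, Long> offsets);
    }

    // Partition number -> next offset to commit.
    private final Map<Integer, Long> pendingOffsets = new HashMap<>();

    void track(final int partition, final long offset) {
        pendingOffsets.put(partition, offset);
    }

    /** Returns true only if the pending offsets were committed. */
    boolean commitPending(final Committer committer) {
        if (pendingOffsets.isEmpty()) {
            return false;
        }
        try {
            committer.commit(new HashMap<>(pendingOffsets));
        } catch (final RebalanceInProgress e) {
            // Keep the offsets: a later attempt, made after the consumer has
            // polled again and the rebalance has finished, may succeed.
            return false;
        } catch (final RuntimeException e) {
            // Assumed behavior for any other failure: drop the offsets.
            pendingOffsets.clear();
            return false;
        }
        pendingOffsets.clear();
        return true;
    }

    int pendingCount() {
        return pendingOffsets.size();
    }
}
```

A caller would invoke `commitPending` on each pass of its poll loop, so a commit rejected during a rebalance is simply attempted again on a later pass instead of being lost.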
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.