GH-4465: Fix unbounded async retry re-delivery #4469
Fixes GH-4465.

`KafkaMessageListenerContainer$ListenerConsumer#handleAsyncFailure` re-queued a `FailedRecordTuple` whenever the error handler reported that the record was still in retry. For a seek-after-handling error handler (e.g. `DefaultErrorHandler`), the handler has already repositioned the consumer to the failed offset, so the next poll re-delivers the record and produces a fresh `FailedRecordTuple` through the async failure callback, which means the same record gets processed twice per loop, once from the queue and once from the seek-induced re-delivery. The `FailedRecordTracker` attempt counter is inflated correspondingly. When backoff exhausts and the tracker entry is removed during recovery, the duplicate still in the queue starts a new retry cycle, producing the unbounded re-delivery reported in the issue.

The fix drops the re-queue. The `FailedRecordTracker` entry is keyed by topic-partition-offset and survives across loop iterations, so attempts continue to accumulate correctly across the natural re-deliveries.

A regression test (`test suspend function bounded retries with CommonErrorHandler`) is added to `EnableKafkaKotlinCoroutinesTests`. It uses `DefaultErrorHandler(FixedBackOff(100L, 2L))` against an always-failing suspend listener and asserts the listener is invoked exactly three times, then verifies no further deliveries occur for two more seconds. Without the fix the test fails with `expected: 3 but was: 6`.

Signed-off-by: BK202503 <199436087+BK202503@users.noreply.github.com>
    assertThat(this.config.boundedRetryRecoveredLatch.await(10, TimeUnit.SECONDS)).isTrue()
    // Give the container a generous window in which any unbounded re-delivery
    // loop would visibly grow the counter past the expected bound.
    Thread.sleep(2_000)
Do we really need this time delay?
Isn't the assertion on `boundedRetryRecoveredLatch` enough?
According to the report, we had unbounded retries on the record.
Therefore, my feeling is that the recoverer would never run in that case.
No? Am I missing anything?
Thanks
Good point on the bare sleep. I switched the bounded-retry test to an Awaitility `pollDelay(2s).atMost(3s).untilAsserted { ... == 3 }` instead, which expresses the same intent ("the count must reach 3 and stay at 3") without an unconditional `Thread.sleep`. Pushed as f60f214.
(The latch alone wasn't sufficient: it's `CountDownLatch(1)`, so under the bug it still trips on the first recovery while subsequent cycles keep re-delivering the record. The actual regression signal is the delivery counter staying at 3.)
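To make the latch-versus-counter point concrete, here is a minimal plain-Java sketch. It is not the test from this PR and uses no spring-kafka or Awaitility code; the class `LatchVersusCounter` and its `run` method are hypothetical, and the assumption that each retry cycle consists of three deliveries followed by one recovery is made purely for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the test's latch and delivery counter; not spring-kafka code. */
final class LatchVersusCounter {

    final CountDownLatch recovered = new CountDownLatch(1);
    final AtomicInteger deliveries = new AtomicInteger();

    /** Simulates retry cycles: three deliveries each (assumed), ending in one recovery. */
    void run(int cycles) {
        for (int c = 0; c < cycles; c++) {
            deliveries.addAndGet(3);
            recovered.countDown(); // after the first call the count is already 0, so this is a no-op
        }
    }

    /** True once at least one recovery has happened; says nothing about later cycles. */
    boolean latchTripped() {
        return recovered.getCount() == 0;
    }

    public static void main(String[] args) {
        LatchVersusCounter bounded = new LatchVersusCounter();
        bounded.run(1); // one cycle, then quiescence
        LatchVersusCounter looping = new LatchVersusCounter();
        looping.run(2); // a second cycle starts after the first recovery

        // The latch cannot tell the two apart; only the counter can.
        System.out.println(bounded.latchTripped() + " " + bounded.deliveries.get()); // true 3
        System.out.println(looping.latchTripped() + " " + looping.deliveries.get()); // true 6
    }
}
```

Both instances report a tripped latch, so an assertion on the latch alone passes in either case; the delivery count is the only value that differs.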
…y test

Addresses review feedback on GH-4469: the latch only signals the first recovery, so under the bug subsequent re-delivery cycles would still satisfy `latch.await(...)`. The actual regression check is the delivery counter, which now uses Awaitility's `pollDelay + atMost + untilAsserted` to express "the count must reach 3 and stay at 3" without a bare sleep.

Signed-off-by: BK202503 <199436087+BK202503@users.noreply.github.com>
Fixes #4465.
Problem
With the change merged in #4254, async listener exceptions (Kotlin `suspend`, `Mono<?>`, `CompletableFuture<?>`) propagate to a seek-after-handling `CommonErrorHandler` such as `DefaultErrorHandler`. That path correctly drives `FailedRecordTracker` and seeks the consumer back to the failed offset for retry. After backoff is exhausted, the recoverer runs and the tracker entry is cleared.

However, the listener keeps being re-delivered forever instead of stopping after the configured number of retries (issue #4465). The reporter's minimal repro shows ~18–20 deliveries within 10s for `FixedBackOff(100L, 2L)` (which should produce exactly 3), and a fresh `Backoff … exhausted for suspend-topic-0@0` log line on every cycle.

Root cause
`KafkaMessageListenerContainer$ListenerConsumer#handleAsyncFailure` (lines 1513–1536 on `main`) re-queues a `FailedRecordTuple` whenever `invokeErrorHandlerBySingleRecord` reports the record is still in retry.

For a seek-after-handling error handler, the handler has already repositioned the consumer to the failed offset, so the next poll re-delivers the record and produces another `FailedRecordTuple` via the async failure callback. Each loop iteration therefore processes the same record twice (once from the re-queued tuple, once from the seek-induced re-delivery), inflating `FailedRecordTracker`'s attempt counter at double the actual rate.

When backoff exhausts and the recoverer clears the tracker entry, the duplicate still sitting in `failedRecords` is then processed against an empty tracker, which creates a brand-new `FailedRecord(attempts = 1)` and starts a fresh retry cycle. That's the unbounded loop reported on the issue.

The synchronous (blocking) listener path is not affected because the listener throws before the container can ack, so no async failure callback fires and the queue stays empty.
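The interaction described above can be reproduced in a toy model. The sketch below is not the container code: `RequeueModel`, `stillInRetry`, `loopOnce` and `seekPending` are hypothetical names, and the model assumes one record at offset 0, a listener that always fails, a seek that causes exactly one re-delivery on the next poll, and a recovery that clears the tracker entry without seeking. Under those assumptions it shows how re-queueing a tuple that the seek will re-create anyway keeps the loop alive.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the consumer loop; every name here is hypothetical, not spring-kafka code. */
final class RequeueModel {

    private final int maxRetries;                                 // FixedBackOff(100L, 2L) -> 2 retries
    private final boolean requeue;                                // true models re-queueing while in retry
    private final Map<Long, Integer> tracker = new HashMap<>();   // attempts per offset
    private final Deque<Long> failedRecords = new ArrayDeque<>(); // async failures awaiting handling
    private boolean seekPending = true;                           // the first poll delivers the record
    private int deliveries;

    RequeueModel(int maxRetries, boolean requeue) {
        this.maxRetries = maxRetries;
        this.requeue = requeue;
    }

    /** Error handler: seek back and report "still in retry", or recover and clear the tracker entry. */
    private boolean stillInRetry(long offset) {
        int attempts = tracker.merge(offset, 1, Integer::sum);
        if (attempts > maxRetries) {
            tracker.remove(offset); // recovered: the next failure for this offset starts from scratch
            return false;
        }
        seekPending = true;         // the record will be re-delivered by the next poll
        return true;
    }

    private void loopOnce() {
        // Handle the failures queued so far (a snapshot, so re-queued tuples wait for the next loop).
        List<Long> batch = new ArrayList<>(failedRecords);
        failedRecords.clear();
        for (long offset : batch) {
            if (stillInRetry(offset) && requeue) {
                failedRecords.add(offset); // the duplicate that the seek makes redundant
            }
        }
        // Poll: a pending seek re-delivers the record, and the listener fails asynchronously again.
        if (seekPending) {
            seekPending = false;
            deliveries++;
            failedRecords.add(0L);
        }
    }

    static int deliveries(boolean requeue, int loops) {
        RequeueModel model = new RequeueModel(2, requeue);
        for (int i = 0; i < loops; i++) {
            model.loopOnce();
        }
        return model.deliveries;
    }

    public static void main(String[] args) {
        System.out.println("without re-queue: " + deliveries(false, 20)); // 3
        System.out.println("with re-queue:    " + deliveries(true, 20));  // keeps growing with the loop count
    }
}
```

The absolute numbers in the re-queue case depend on the model's simplifications and should not be compared with the counts in the issue; the point is only that the count stops at 3 in one case and tracks the number of loop iterations in the other.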
Fix
Drop the re-queue. The `FailedRecordTracker` entry is keyed by topic-partition-offset and survives across loop iterations independently of the `failedRecords` queue, so the attempt counter continues to advance correctly through the natural re-deliveries triggered by the handler's seek.

A trace through the fixed code with `FixedBackOff(100L, 2L)`:

[FR_1][][FR_2][FR_2][][FR_3][FR_3][][][]

Three deliveries, then quiescence, matching the blocking listener.
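As an illustration of why a tracker keyed by topic-partition-offset is enough on its own, here is a small sketch. It is a simplified, hypothetical `AttemptTracker`, not the real `FailedRecordTracker`; the string key format and the "exhausted once attempts exceed the retry count" rule are assumptions chosen to match the three deliveries expected for `FixedBackOff(100L, 2L)`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified attempt tracker; the real FailedRecordTracker is not reproduced here. */
final class AttemptTracker {

    private final Map<String, Integer> attempts = new HashMap<>();
    private final int maxRetries;

    AttemptTracker(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /** Assumed key format: topic-partition@offset. */
    static String key(String topic, int partition, long offset) {
        return topic + "-" + partition + "@" + offset;
    }

    /** Records one failed delivery; returns true once retries are exhausted, clearing the entry. */
    boolean failedAndExhausted(String topic, int partition, long offset) {
        String key = key(topic, partition, offset);
        int count = attempts.merge(key, 1, Integer::sum);
        if (count > maxRetries) {
            attempts.remove(key);
            return true;
        }
        return false;
    }

    /** Attempts currently recorded for the record, or 0 if there is no entry. */
    int attemptsFor(String topic, int partition, long offset) {
        return attempts.getOrDefault(key(topic, partition, offset), 0);
    }

    public static void main(String[] args) {
        AttemptTracker tracker = new AttemptTracker(2); // two retries, so three deliveries in total
        // Each call stands for one re-delivery of the same record failing again.
        System.out.println(tracker.failedAndExhausted("suspend-topic", 0, 0L)); // false
        System.out.println(tracker.failedAndExhausted("suspend-topic", 0, 0L)); // false
        System.out.println(tracker.failedAndExhausted("suspend-topic", 0, 0L)); // true
        System.out.println(tracker.attemptsFor("suspend-topic", 0, 0L));        // 0
    }
}
```

Because the count lives in the map rather than in any queued tuple, each seek-induced re-delivery finds the previous count and advances it; nothing needs to be carried over in a queue between loop iterations.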
Test
Added `test suspend function bounded retries with CommonErrorHandler` alongside the existing test in `EnableKafkaKotlinCoroutinesTests` (the file PR #4254 used to validate the async error-handler integration). It uses `DefaultErrorHandler(FixedBackOff(100L, 2L))` against an always-failing suspend listener and asserts:

- the listener is invoked exactly `3` times.
- after a further delay the count is still `3`, to prove the loop has actually stopped, not merely been observed mid-flight.

Without the fix the test fails with `expected: 3 but was: 6` (on my machine), confirming the second retry cycle the issue describes.

Verification
passes. Targeted runs:

- `EnableKafkaKotlinCoroutinesTests`: 7 tests, including the new one.
- `AsyncMonoRetryTopicScenarioTests`: 2 tests (the `@RetryableTopic` + async-return path that previously relied on the same `handleAsyncFailure` code).
- `ObservationTests`, `SeekToCurrentRecovererTests`: pass.

Scope
- `KafkaMessageListenerContainer`'s `handleAsyncFailure` catch block.
- `@RetryableTopic` path: that path uses an error handler with `seeksAfterHandling() == false`, takes the `else` branch in `invokeErrorHandlerBySingleRecord`, and never throws `RecordInRetryException` to the `catch` modified here.
- `failedRecords` in the first place.