GH-4465: Fix unbounded async retry re-delivery #4469

Open: BK202503 wants to merge 2 commits into spring-projects:main from BK202503:GH-4465

BK202503 commented Jun 4, 2026

Fixes #4465.

Problem

With the change merged in #4254, async listener exceptions (Kotlin suspend, Mono<?>, CompletableFuture<?>) propagate to a seek-after-handling CommonErrorHandler such as DefaultErrorHandler. That path correctly drives FailedRecordTracker and seeks the consumer back to the failed offset for retry. After backoff is exhausted, the recoverer runs and the tracker entry is cleared.

However, the listener keeps being re-delivered forever instead of stopping after the configured number of retries (issue #4465). The reporter's minimal repro shows ~18–20 deliveries within 10s for FixedBackOff(100L, 2L) (which should produce exactly 3), and a fresh Backoff … exhausted for suspend-topic-0@0 log line on every cycle.

Root cause

KafkaMessageListenerContainer$ListenerConsumer#handleAsyncFailure (lines 1513–1536 on main) re-queues a FailedRecordTuple whenever invokeErrorHandlerBySingleRecord reports the record is still in retry:

catch (RecordInRetryException e) {
    // Keep retryable async failures for the next poll loop.
    this.failedRecords.addLast(failedRecord);
}

For a seek-after-handling error handler, the handler has already repositioned the consumer to the failed offset, so the next poll re-delivers the record and produces another FailedRecordTuple via the async failure callback. Each loop iteration therefore processes the same record twice — once from the re-queued tuple, once from the seek-induced re-delivery — inflating FailedRecordTracker's attempt counter at double the actual rate.

When backoff exhausts and the recoverer clears the tracker entry, the duplicate still sitting in failedRecords is then processed against an empty tracker, which creates a brand-new FailedRecord(attempts = 1) and starts a fresh retry cycle. That's the unbounded loop reported on the issue.

The synchronous (blocking) listener path is not affected because the listener throws before the container can ack, so no async failure callback fires and the queue stays empty.

Fix

Drop the re-queue. The FailedRecordTracker entry is keyed by topic-partition-offset and survives across loop iterations independently of the failedRecords queue, so the attempt counter continues to advance correctly through the natural re-deliveries triggered by the handler's seek.

catch (RecordInRetryException e) {
    // (no longer re-queue; explained in the comment in the diff)
}
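
To illustrate why attempts keep accumulating without the queue, here is a minimal sketch of an attempt tracker keyed by topic, partition and offset. It is not the real `FailedRecordTracker`; the class name `AttemptTrackerSketch`, the method `recordFailure` and the string key format are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for an attempt tracker; not spring-kafka code.
class AttemptTrackerSketch {

    private final Map<String, Integer> attempts = new HashMap<>();

    private final int maxRetries;

    AttemptTrackerSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Counts one failed delivery of the given record.
     * Returns true when the retries are exhausted; the entry is then removed,
     * mirroring the "recover and clear" step described above.
     */
    boolean recordFailure(String topic, int partition, long offset) {
        String key = topic + "-" + partition + "@" + offset;
        int count = this.attempts.merge(key, 1, Integer::sum);
        if (count > this.maxRetries) {
            this.attempts.remove(key);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        AttemptTrackerSketch tracker = new AttemptTrackerSketch(2);
        for (int delivery = 1; delivery <= 3; delivery++) {
            boolean exhausted = tracker.recordFailure("suspend-topic", 0, 0L);
            System.out.println("delivery " + delivery + " exhausted=" + exhausted);
        }
    }
}
```

Because the count lives in the map, each re-delivery of the same offset advances it no matter how the failure reached the handler. The sketch also shows the hazard from the root cause: a failure reported after the entry was removed starts counting from 1 again.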

A trace through the fixed code with FixedBackOff(100L, 2L):

| Iter | Queue in | Action | Tracker attempts | Queue out |
|------|----------|--------|------------------|-----------|
| 1 | [FR_1] | retry → seek to 0 → drop | 1 | [] |
| 1' | (poll) | re-deliver offset 0 → Mono fails → enqueue | 1 | [FR_2] |
| 2 | [FR_2] | retry → seek to 0 → drop | 2 | [] |
| 2' | (poll) | re-deliver offset 0 → Mono fails → enqueue | 2 | [FR_3] |
| 3 | [FR_3] | exhausted → recover → no seek | (cleared) | [] |
| 4 | [] | poll returns nothing (position past record) | - | [] |

Three deliveries, then quiescence — matching the blocking listener.
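
The trace can be reproduced with a toy model of the poll loop. This is a sketch under simplifying assumptions, not the container code: a single record at offset 0, an always-failing listener, one integer standing in for the tracker entry, and a boolean standing in for the consumer position. The names `AsyncRetryLoopModel` and `simulate` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the loop described above; all names are illustrative.
class AsyncRetryLoopModel {

    /** Returns how many times the listener was invoked within maxIterations loop iterations. */
    static int simulate(boolean requeueOnRetry, int maxRetries, int maxIterations) {
        Deque<Integer> failedRecords = new ArrayDeque<>(); // stand-in for the failedRecords queue
        int trackerAttempts = 0;           // stand-in for the tracker entry (0 = no entry)
        boolean positionedAtRecord = true; // true: the next poll (re-)delivers offset 0
        int deliveries = 0;

        for (int i = 0; i < maxIterations; i++) {
            // 1. Handle the async failures that were queued before this iteration.
            int pending = failedRecords.size();
            for (int n = 0; n < pending; n++) {
                Integer failed = failedRecords.pollFirst();
                trackerAttempts++;
                if (trackerAttempts > maxRetries) {
                    trackerAttempts = 0; // back-off exhausted: recover, clear entry, no seek
                }
                else {
                    positionedAtRecord = true; // still in retry: the handler seeks back
                    if (requeueOnRetry) {
                        failedRecords.addLast(failed); // the behaviour removed by this PR
                    }
                }
            }
            // 2. Poll: deliver the record if the position points at it; the listener fails.
            if (positionedAtRecord) {
                positionedAtRecord = false;
                deliveries++;
                failedRecords.addLast(deliveries); // async failure callback enqueues a tuple
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("without re-queue: " + simulate(false, 2, 50));
        System.out.println("with re-queue:    " + simulate(true, 2, 50));
    }
}
```

With `requeueOnRetry = false` and two retries the model settles at three deliveries, as in the table. With `requeueOnRetry = true` the queue never drains, so the delivery count keeps growing with the iteration cap.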

Test

Added the test `test suspend function bounded retries with CommonErrorHandler` alongside the existing test in EnableKafkaKotlinCoroutinesTests (the file PR #4254 used to validate the async error-handler integration). It uses DefaultErrorHandler(FixedBackOff(100L, 2L)) against an always-failing suspend listener and asserts:

  1. The configured recoverer is invoked (latch within 10s).
  2. The listener delivery counter is exactly 3.
  3. Two additional seconds elapse with the counter still at 3 — to prove the loop has actually stopped, not merely been observed mid-flight.

Without the fix the test fails with expected: 3 but was: 6 (on my machine), confirming the second retry cycle the issue describes.

Verification

./gradlew :spring-kafka:test --tests "org.springframework.kafka.listener.*"

passes. Targeted runs:

  • EnableKafkaKotlinCoroutinesTests — 7 tests, including the new one.
  • AsyncMonoRetryTopicScenarioTests — 2 tests (the @RetryableTopic + async-return path that previously relied on the same handleAsyncFailure code).
  • ObservationTests, SeekToCurrentRecovererTests — pass.

Scope

  • One source change in KafkaMessageListenerContainer's handleAsyncFailure catch block.
  • One test addition.
  • No public API changes.
  • No effect on the @RetryableTopic path: that path uses an error handler with seeksAfterHandling() == false, takes the else branch in invokeErrorHandlerBySingleRecord, and never throws RecordInRetryException to the catch modified here.
  • No effect on blocking listeners: their failures never enter failedRecords in the first place.

Fixes GH-4465.

`KafkaMessageListenerContainer$ListenerConsumer#handleAsyncFailure`
re-queued a `FailedRecordTuple` whenever the error handler reported
that the record was still in retry. For a seek-after-handling error
handler (e.g. `DefaultErrorHandler`), the handler has already
repositioned the consumer to the failed offset, so the next poll
re-delivers the record and produces a fresh `FailedRecordTuple`
through the async failure callback - which means the same record
gets processed twice per loop, once from the queue and once from
the seek-induced re-delivery. The `FailedRecordTracker` attempt
counter is inflated correspondingly. When backoff exhausts and the
tracker entry is removed during recovery, the duplicate still in
the queue starts a new retry cycle, producing the unbounded
re-delivery reported in the issue.

The fix drops the re-queue. The `FailedRecordTracker` entry is
keyed by topic-partition-offset and survives across loop iterations,
so attempts continue to accumulate correctly across the natural
re-deliveries.

A regression test (`test suspend function bounded retries with
CommonErrorHandler`) is added to `EnableKafkaKotlinCoroutinesTests`.
It uses `DefaultErrorHandler(FixedBackOff(100L, 2L))` against an
always-failing suspend listener and asserts the listener is invoked
exactly three times, then verifies no further deliveries occur for
two more seconds. Without the fix the test fails with `expected: 3
but was: 6`.

Signed-off-by: BK202503 <199436087+BK202503@users.noreply.github.com>
assertThat(this.config.boundedRetryRecoveredLatch.await(10, TimeUnit.SECONDS)).isTrue()
// Give the container a generous window in which any unbounded re-delivery
// loop would visibly grow the counter past the expected bound.
Thread.sleep(2_000)
Member

Do we really need this time delay?
Isn't the assertion on the boundedRetryRecoveredLatch enough?
According to the report, we had unbounded retries on the record.
So my feeling is that such a recoverer would never run.
No? Am I missing anything?

Thanks

Author

Good point on the bare sleep — switched the bounded-retry test to an Awaitility pollDelay(2s).atMost(3s).untilAsserted { ... == 3 } instead, which expresses the same intent ("the count must reach 3 and stay at 3") without an unconditional Thread.sleep. Pushed as f60f214.

(The latch alone wasn't sufficient: it's CountDownLatch(1), so under the bug it still trips on the first recovery while subsequent cycles keep re-delivering the record — the actual regression signal is the delivery counter staying at 3.)
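
A small stdlib-only sketch of that point, assuming each retry cycle consists of three deliveries followed by one recoverer call. The class `LatchVersusCounter`, its `run` method and the cycle counts are illustrative and not taken from the test itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: compares what a CountDownLatch(1) and a delivery counter can observe.
class LatchVersusCounter {

    static final class Result {
        final boolean latchTripped;
        final int deliveries;

        Result(boolean latchTripped, int deliveries) {
            this.latchTripped = latchTripped;
            this.deliveries = deliveries;
        }
    }

    /** Plays the given number of retry cycles: three deliveries, then one recovery. */
    static Result run(int cycles) {
        CountDownLatch recovered = new CountDownLatch(1);
        AtomicInteger deliveries = new AtomicInteger();
        for (int cycle = 0; cycle < cycles; cycle++) {
            for (int delivery = 0; delivery < 3; delivery++) {
                deliveries.incrementAndGet();
            }
            recovered.countDown(); // further calls after the first are no-ops
        }
        // getCount() == 0 is the state in which latch.await(...) would return true.
        return new Result(recovered.getCount() == 0, deliveries.get());
    }

    public static void main(String[] args) {
        Result bounded = run(1);
        Result looping = run(2);
        System.out.println("bounded: latch=" + bounded.latchTripped + " deliveries=" + bounded.deliveries);
        System.out.println("looping: latch=" + looping.latchTripped + " deliveries=" + looping.deliveries);
    }
}
```

Both runs trip the latch, and only the delivery count tells them apart, which is why the assertion targets the counter.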

…y test

Addresses review feedback on GH-4469: the latch only signals the first
recovery, so under the bug subsequent re-delivery cycles would still
satisfy `latch.await(...)`. The actual regression check is the delivery
counter, which now uses Awaitility's `pollDelay + atMost + untilAsserted`
to express "the count must reach 3 and stay at 3" without a bare sleep.

Signed-off-by: BK202503 <199436087+BK202503@users.noreply.github.com>
Author

BK202503 commented Jun 6, 2026

Good point on the bare sleep — switched the bounded-retry test to an Awaitility pollDelay(2s).atMost(3s).untilAsserted { ... == 3 } instead, which expresses the same intent ("the count must reach 3 and stay at 3") without an unconditional Thread.sleep. Pushed as f60f214.

(The latch alone wasn't sufficient: it's CountDownLatch(1), so under the bug it still trips on the first recovery while subsequent cycles keep re-delivering the record — the actual regression signal is the delivery counter staying at 3.)

Development

Successfully merging this pull request may close these issues.

Suspend @KafkaListener re-delivers a failing record without bound after DefaultErrorHandler retries are exhausted

2 participants