From 82ef24e071ae4b5070c4aee1af087a54c5eee1e1 Mon Sep 17 00:00:00 2001 From: Leos Bitto Date: Thu, 4 Jun 2026 07:50:07 +0200 Subject: [PATCH 1/2] This is a backport of the commit 7c9f5c0b0dcd5f027708112fdb703e5410f705fd to the branch 3.3.x Signed-off-by: Leos Bitto --- .../kafka/listener/ErrorHandlingUtils.java | 43 ++++++++++++- .../kafka/listener/FailedBatchProcessor.java | 21 ++++-- .../kafka/listener/FailedRecordTracker.java | 2 +- .../listener/FallbackBatchErrorHandler.java | 41 ++++++++++-- .../FallbackBatchErrorHandlerTests.java | 64 +++++++++++++++++++ 5 files changed, 158 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 782755210f..1170a2d8e6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -51,6 +51,7 @@ * @author Antonio Tomac * @author Wang Zhiyang * @author Sanghyeok An + * @author Leos Bitto * * @since 2.8 * @@ -78,7 +79,7 @@ private ErrorHandlingUtils() { * @param logLevel the log level. * @param retryListeners the retry listeners. * @param classifier the exception classifier. - * @param reClassifyOnExceptionChange true to reset the state if a different exception + * @param reClassifyOnExceptionChange true to reclassify the exception if a different exception * is thrown during retry. * @since 2.9.7 */ @@ -87,8 +88,40 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r CommonErrorHandler seeker, BiConsumer, Exception> recoverer, LogAccessor logger, KafkaException.Level logLevel, List retryListeners, BinaryExceptionClassifier classifier, boolean reClassifyOnExceptionChange) { + retryBatch(thrownException, records, consumer, container, invokeListener, backOff.start(), seeker, recoverer, + logger, logLevel, retryListeners, classifier, reClassifyOnExceptionChange, false); + } + + /** + * Retry a complete batch by pausing the consumer and then, in a loop, poll the + * consumer, wait for the next back off, then call the listener. When retries are + * exhausted, call the recoverer with the {@link ConsumerRecords}. + * @param thrownException the exception. + * @param records the records. + * @param consumer the consumer. + * @param container the container. + * @param invokeListener the {@link Runnable} to run (call the listener). + * @param execution the backOff execution to use for the retries. + * @param seeker the common error handler that re-seeks the entire batch. + * @param recoverer the recoverer. + * @param logger the logger. + * @param logLevel the log level. + * @param retryListeners the retry listeners. + * @param classifier the exception classifier. + * @param reClassifyOnExceptionChange true to reclassify the exception if a different exception + * is thrown during retry. + * @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method. + * @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred + * (retry with a possibly different BackOffExecution is expected if non-null is returned) + * @since 4.0.7 + */ + @Nullable + public static Exception retryBatch(Exception thrownException, ConsumerRecords records, Consumer consumer, + MessageListenerContainer container, Runnable invokeListener, BackOffExecution execution, + CommonErrorHandler seeker, BiConsumer, Exception> recoverer, LogAccessor logger, + KafkaException.Level logLevel, List retryListeners, BinaryExceptionClassifier classifier, + boolean reClassifyOnExceptionChange, boolean resetStateOnExceptionChange) { - BackOffExecution execution = backOff.start(); long nextBackOff = execution.nextBackOff(); String failed = null; Set assignment = consumer.assignment(); @@ -147,7 +180,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r } try { invokeListener.run(); - return; + return null; } catch (Exception ex) { listen(retryListeners, records, ex, attempt++); @@ -163,6 +196,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r break; } + if (resetStateOnExceptionChange && !newException.getClass().equals(lastException.getClass())) { + return newException; + } } nextBackOff = execution.nextBackOff(); } @@ -184,6 +220,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2); } } + return null; } // NOSONAR NCSS line count private static void listen(List listeners, ConsumerRecords records, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 67a1e7c5b3..b32302002a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -50,6 +50,7 @@ * @author Francois Rosiere * @author Wang Zhiyang * @author Artem Bilan + * @author Leos Bitto * @since 2.8 * */ @@ -79,11 +80,15 @@ public FailedBatchProcessor(@Nullable BiConsumer, Exception * @param fallbackHandler the fallback handler. * @since 2.9 */ + @SuppressWarnings("this-escape") // getFailureTracker() returns an already initialized final field public FailedBatchProcessor(@Nullable BiConsumer, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) { super(recoverer, backOff, backOffHandler); this.fallbackBatchHandler = fallbackHandler; + if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) { + handler.setFailureTracker(getFailureTracker()); + } } @Override @@ -104,10 +109,10 @@ public void setLogLevel(Level logLevel) { /** * Set to {@code false} to not reclassify the exception if different from the previous - * failure. If the changed exception is classified as retryable, the existing back off - * sequence is used; a new sequence is not started. Default true. Only applies when - * the fallback batch error handler (for exceptions other than - * {@link BatchListenerFailedException}) is the default. + * failure. Default true. If the changed exception is classified as retryable, the existing + * back off sequence is used if resetStateOnExceptionChange is false; a new sequence is started + * if resetStateOnExceptionChange is true. Only applies when the fallback batch error handler + * (for exceptions other than {@link BatchListenerFailedException}) is the default. * @param reclassifyOnExceptionChange false to not reclassify. * @since 2.9.7 */ @@ -117,6 +122,14 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) } } + @Override + public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) { + super.setResetStateOnExceptionChange(resetStateOnExceptionChange); + if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) { + handler.setResetStateOnExceptionChange(resetStateOnExceptionChange); + } + } + @Override protected void notRetryable(Stream> notRetryable) { if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index 0a72ca3c2b..fb86286553 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -213,7 +213,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord record, Except return failedRecord; } - private BackOff determineBackOff(ConsumerRecord record, Exception exception) { + BackOff determineBackOff(ConsumerRecord record, Exception exception) { if (this.backOffFunction == null) { return this.backOff; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java index e8f40048ee..d25d375718 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java @@ -36,6 +36,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; import org.springframework.util.backoff.FixedBackOff; /** @@ -47,6 +48,7 @@ * performed so that the batch will be redelivered on the next poll. * * @author Gary Russell + * @author Leos Bitto * @since 2.3.7 * */ @@ -68,6 +70,10 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr private boolean reclassifyOnExceptionChange = true; + private boolean resetStateOnExceptionChange = false; + + private @Nullable FailedRecordTracker failureTracker; + /** * Construct an instance with a default {@link FixedBackOff} (unlimited attempts with * a 5 second back off). @@ -130,8 +136,9 @@ protected boolean isReclassifyOnExceptionChange() { /** * Set to false to not reclassify the exception if different from the previous - * failure. If the changed exception is classified as retryable, the existing back off - * sequence is used; a new sequence is not started. Default true. + * failure. Default true. If the changed exception is classified as retryable, the + * existing back off sequence is used when resetStateOnExceptionChange is set to + * false; a new sequence is started when resetStateOnExceptionChange is set to true. * @param reclassifyOnExceptionChange false to not reclassify. * @since 2.9.7 */ @@ -139,6 +146,22 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) this.reclassifyOnExceptionChange = reclassifyOnExceptionChange; } + /** + * Set to false to keep the same back off sequence even when the exception changes + * in subsequent retries. Default false. If set to true, a new sequence is started + * when the exception changes. + * @param resetStateOnExceptionChange true to start a new back off sequence + * when the exception changes in subsequent retries + * @since 3.3.16 + */ + void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) { + this.resetStateOnExceptionChange = resetStateOnExceptionChange; + } + + void setFailureTracker(FailedRecordTracker failedRecordTracker) { + this.failureTracker = failedRecordTracker; + } + @Override public void handleBatch(Exception thrownException, @Nullable ConsumerRecords records, Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { @@ -149,9 +172,17 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords thrown = new AtomicReference<>(); + DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> { + thrown.set(ex); + }, new FixedBackOff(0L, 3)); + eh.setResetStateOnExceptionChange(true); + ConsumerRecords records = new ConsumerRecords<>( + Map.of(new TopicPartition("foo", 0), + List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar")))); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.isRunning()).willReturn(true); + AtomicInteger retries = new AtomicInteger(); + eh.handleBatch(new IllegalStateException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(4); + } + + @Test + void usingBackOffFunction() { + AtomicReference thrown = new AtomicReference<>(); + DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> { + thrown.set(ex); + }, new FixedBackOff(0L, 2)); + eh.setBackOffFunction((cr, ex) -> ex instanceof UnsupportedOperationException ? + new FixedBackOff(0L, 1) : null); + ConsumerRecords records = new ConsumerRecords<>( + Map.of(new TopicPartition("foo", 0), + List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar")))); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.isRunning()).willReturn(true); + + // first thrown exception is UnsupportedOperationException, so BackOffFunction should cause 1 retry + AtomicInteger retries = new AtomicInteger(); + eh.handleBatch(new UnsupportedOperationException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(1); + + // first thrown exception is IllegalArgumentException, so BackOffFunction should return null + // which means that DefaultErrorHandler should use its the default FixedBackOff with 2 retries + retries.set(0); + eh.handleBatch(new IllegalArgumentException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(2); + } + private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) { Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying"); ReflectionUtils.makeAccessible(field); From 05fdc63eb7111b621b4c2586dc69e325c8d5c1e4 Mon Sep 17 00:00:00 2001 From: Leos Bitto Date: Fri, 5 Jun 2026 15:23:29 +0200 Subject: [PATCH 2/2] javadoc: @since 3.3.16, 4.0.6 Signed-off-by: Leos Bitto --- .../org/springframework/kafka/listener/ErrorHandlingUtils.java | 2 +- .../kafka/listener/FallbackBatchErrorHandler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 1170a2d8e6..5f83ce3880 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -113,7 +113,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r * @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method. * @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred * (retry with a possibly different BackOffExecution is expected if non-null is returned) - * @since 4.0.7 + * @since 3.3.16, 4.0.6 */ @Nullable public static Exception retryBatch(Exception thrownException, ConsumerRecords records, Consumer consumer, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java index d25d375718..ffef2c31dd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java @@ -152,7 +152,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) * when the exception changes. * @param resetStateOnExceptionChange true to start a new back off sequence * when the exception changes in subsequent retries - * @since 3.3.16 + * @since 3.3.16, 4.0.6 */ void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) { this.resetStateOnExceptionChange = resetStateOnExceptionChange;