diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 7b480fd888..6296b77c77 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -52,6 +52,7 @@ * @author Antonio Tomac * @author Wang Zhiyang * @author Sanghyeok An + * @author Leos Bitto * * @since 2.8 * @@ -79,7 +80,7 @@ private ErrorHandlingUtils() { * @param logLevel the log level. * @param retryListeners the retry listeners. * @param exceptionMatcher the exception matcher. - * @param reClassifyOnExceptionChange true to reset the state if a different exception + * @param reClassifyOnExceptionChange true to reclassify the exception if a different exception * is thrown during retry. * @since 2.9.7 */ @@ -88,8 +89,40 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r CommonErrorHandler seeker, BiConsumer, Exception> recoverer, LogAccessor logger, KafkaException.Level logLevel, List retryListeners, ExceptionMatcher exceptionMatcher, boolean reClassifyOnExceptionChange) { + retryBatch(thrownException, records, consumer, container, invokeListener, backOff.start(), seeker, recoverer, + logger, logLevel, retryListeners, exceptionMatcher, reClassifyOnExceptionChange, false); + } + + /** + * Retry a complete batch by pausing the consumer and then, in a loop, poll the + * consumer, wait for the next back off, then call the listener. When retries are + * exhausted, call the recoverer with the {@link ConsumerRecords}. + * @param thrownException the exception. + * @param records the records. + * @param consumer the consumer. + * @param container the container. + * @param invokeListener the {@link Runnable} to run (call the listener). + * @param execution the backOff execution to use for the retries. + * @param seeker the common error handler that re-seeks the entire batch. + * @param recoverer the recoverer. + * @param logger the logger. + * @param logLevel the log level. + * @param retryListeners the retry listeners. + * @param exceptionMatcher the exception matcher. + * @param reClassifyOnExceptionChange true to reclassify the exception if a different exception + * is thrown during retry. + * @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method. + * @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred + * (retry with a possibly different BackOffExecution is expected if non-null is returned) + * @since 3.3.16, 4.0.6 + */ + @Nullable + public static Exception retryBatch(Exception thrownException, ConsumerRecords records, Consumer consumer, + MessageListenerContainer container, Runnable invokeListener, BackOffExecution execution, + CommonErrorHandler seeker, BiConsumer, Exception> recoverer, LogAccessor logger, + KafkaException.Level logLevel, List retryListeners, ExceptionMatcher exceptionMatcher, + boolean reClassifyOnExceptionChange, boolean resetStateOnExceptionChange) { - BackOffExecution execution = backOff.start(); long nextBackOff = execution.nextBackOff(); String failed = null; Set assignment = consumer.assignment(); @@ -148,7 +181,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r } try { invokeListener.run(); - return; + return null; } catch (Exception ex) { listen(retryListeners, records, ex, attempt++); @@ -165,6 +198,10 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r break; } + if (resetStateOnExceptionChange && !Objects.requireNonNull(newException).getClass() + .equals(Objects.requireNonNull(lastException).getClass())) { + return newException; + } } nextBackOff = execution.nextBackOff(); } @@ -186,6 +223,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords r consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2); } } + return null; } // NOSONAR NCSS line count private static void listen(List listeners, ConsumerRecords records, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 66ad8320c0..a5446afca3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -50,6 +50,7 @@ * @author Francois Rosiere * @author Wang Zhiyang * @author Artem Bilan + * @author Leos Bitto * @since 2.8 * */ @@ -79,11 +80,15 @@ public FailedBatchProcessor(@Nullable BiConsumer, Exception * @param fallbackHandler the fallback handler. * @since 2.9 */ + @SuppressWarnings("this-escape") // getFailureTracker() returns an already initialized final field public FailedBatchProcessor(@Nullable BiConsumer, Exception> recoverer, BackOff backOff, @Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) { super(recoverer, backOff, backOffHandler); this.fallbackBatchHandler = fallbackHandler; + if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) { + handler.setFailureTracker(getFailureTracker()); + } } @Override @@ -104,10 +109,10 @@ public void setLogLevel(Level logLevel) { /** * Set to {@code false} to not reclassify the exception if different from the previous - * failure. If the changed exception is classified as retryable, the existing back off - * sequence is used; a new sequence is not started. Default true. Only applies when - * the fallback batch error handler (for exceptions other than - * {@link BatchListenerFailedException}) is the default. + * failure. Default true. If the changed exception is classified as retryable, the existing + * back off sequence is used if resetStateOnExceptionChange is false; a new sequence is started + * if resetStateOnExceptionChange is true. Only applies when the fallback batch error handler + * (for exceptions other than {@link BatchListenerFailedException}) is the default. * @param reclassifyOnExceptionChange false to not reclassify. * @since 2.9.7 */ @@ -117,6 +122,14 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) } } + @Override + public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) { + super.setResetStateOnExceptionChange(resetStateOnExceptionChange); + if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) { + handler.setResetStateOnExceptionChange(resetStateOnExceptionChange); + } + } + @Override protected void notRetryable(Stream> notRetryable) { if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index c6aa5d0cee..95c59de154 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -41,7 +41,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private static final BackOff NO_RETRIES_OR_DELAY_BACKOFF = new FixedBackOff(0L, 0L); - private final BiFunction, @Nullable Exception, BackOff> noRetriesForClassified = + private final BiFunction, @Nullable Exception, @Nullable BackOff> noRetriesForClassified = (rec, ex) -> { Exception theEx = ErrorHandlingUtils.unwrapIfNeeded(ex); if (!getExceptionMatcher().match(theEx) || theEx instanceof KafkaBackoffException) { @@ -56,7 +56,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private boolean commitRecovered; - private BiFunction, Exception, BackOff> userBackOffFunction = (rec, ex) -> null; + private BiFunction, Exception, @Nullable BackOff> userBackOffFunction = (rec, ex) -> null; private boolean seekAfterError = true; @@ -94,7 +94,7 @@ public void setCommitRecovered(boolean commitRecovered) { * @param backOffFunction the function. * @since 2.6 */ - public void setBackOffFunction(BiFunction, Exception, BackOff> backOffFunction) { + public void setBackOffFunction(BiFunction, Exception, @Nullable BackOff> backOffFunction) { Assert.notNull(backOffFunction, "'backOffFunction' cannot be null"); this.userBackOffFunction = backOffFunction; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index e783ce5bac..af41fe4931 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -58,7 +58,7 @@ class FailedRecordTracker implements RecoveryStrategy { private final BackOff backOff; - private @Nullable BiFunction, @Nullable Exception, BackOff> backOffFunction; + private @Nullable BiFunction, @Nullable Exception, @Nullable BackOff> backOffFunction; private final BackOffHandler backOffHandler; @@ -113,7 +113,7 @@ class FailedRecordTracker implements RecoveryStrategy { * @param backOffFunction the function. * @since 2.6 */ - public void setBackOffFunction(@Nullable BiFunction, @Nullable Exception, BackOff> backOffFunction) { + public void setBackOffFunction(@Nullable BiFunction, @Nullable Exception, @Nullable BackOff> backOffFunction) { this.backOffFunction = backOffFunction; } @@ -215,7 +215,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord record, @Nulla return failedRecord; } - private BackOff determineBackOff(ConsumerRecord record, @Nullable Exception exception) { + BackOff determineBackOff(ConsumerRecord record, @Nullable Exception exception) { if (this.backOffFunction == null) { return this.backOff; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java index af789a8a7c..f60ddc7124 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java @@ -36,6 +36,7 @@ import org.springframework.kafka.KafkaException; import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.BackOffExecution; import org.springframework.util.backoff.FixedBackOff; /** @@ -47,6 +48,7 @@ * performed so that the batch will be redelivered on the next poll. * * @author Gary Russell + * @author Leos Bitto * @since 2.3.7 * */ @@ -68,6 +70,10 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr private boolean reclassifyOnExceptionChange = true; + private boolean resetStateOnExceptionChange = false; + + private @Nullable FailedRecordTracker failureTracker; + /** * Construct an instance with a default {@link FixedBackOff} (unlimited attempts with * a 5 second back off). @@ -130,8 +136,9 @@ protected boolean isReclassifyOnExceptionChange() { /** * Set to false to not reclassify the exception if different from the previous - * failure. If the changed exception is classified as retryable, the existing back off - * sequence is used; a new sequence is not started. Default true. + * failure. Default true. If the changed exception is classified as retryable, the + * existing back off sequence is used when resetStateOnExceptionChange is set to + * false; a new sequence is started when resetStateOnExceptionChange is set to true. * @param reclassifyOnExceptionChange false to not reclassify. * @since 2.9.7 */ @@ -139,6 +146,22 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) this.reclassifyOnExceptionChange = reclassifyOnExceptionChange; } + /** + * Set to false to keep the same back off sequence even when the exception changes + * in subsequent retries. Default false. If set to true, a new sequence is started + * when the exception changes. + * @param resetStateOnExceptionChange true to start a new back off sequence + * when the exception changes in subsequent retries + * @since 3.3.16, 4.0.6 + */ + void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) { + this.resetStateOnExceptionChange = resetStateOnExceptionChange; + } + + void setFailureTracker(FailedRecordTracker failedRecordTracker) { + this.failureTracker = failedRecordTracker; + } + @Override public void handleBatch(Exception thrownException, @Nullable ConsumerRecords records, Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { @@ -149,9 +172,17 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords thrown = new AtomicReference<>(); + DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> { + thrown.set(ex); + }, new FixedBackOff(0L, 3)); + eh.setResetStateOnExceptionChange(true); + ConsumerRecords records = new ConsumerRecords<>( + Map.of(new TopicPartition("foo", 0), + List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))), Map.of()); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.isRunning()).willReturn(true); + AtomicInteger retries = new AtomicInteger(); + eh.handleBatch(new IllegalStateException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(4); + } + + @Test + void usingBackOffFunction() { + AtomicReference thrown = new AtomicReference<>(); + DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> { + thrown.set(ex); + }, new FixedBackOff(0L, 2)); + eh.setBackOffFunction((cr, ex) -> ex instanceof UnsupportedOperationException ? + new FixedBackOff(0L, 1) : null); + ConsumerRecords records = new ConsumerRecords<>( + Map.of(new TopicPartition("foo", 0), + List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))), Map.of()); + MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.isRunning()).willReturn(true); + + // first thrown exception is UnsupportedOperationException, so BackOffFunction should cause 1 retry + AtomicInteger retries = new AtomicInteger(); + eh.handleBatch(new UnsupportedOperationException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(1); + + // first thrown exception is IllegalArgumentException, so BackOffFunction should return null + // which means that DefaultErrorHandler should use its the default FixedBackOff with 2 retries + retries.set(0); + eh.handleBatch(new IllegalArgumentException(), records, mock(Consumer.class), container, + () -> { + retries.incrementAndGet(); + throw new ListenerExecutionFailedException("", new IllegalArgumentException()); + }); + assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class) + .extracting("cause") + .isInstanceOf(IllegalArgumentException.class); + assertThat(retries.get()).isEqualTo(2); + } + private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) { Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying"); ReflectionUtils.makeAccessible(field);