Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* @author Antonio Tomac
* @author Wang Zhiyang
* @author Sanghyeok An
* @author Leos Bitto
*
* @since 2.8
*
Expand Down Expand Up @@ -78,7 +79,7 @@ private ErrorHandlingUtils() {
* @param logLevel the log level.
* @param retryListeners the retry listeners.
* @param classifier the exception classifier.
* @param reClassifyOnExceptionChange true to reset the state if a different exception
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
* is thrown during retry.
* @since 2.9.7
*/
Expand All @@ -87,8 +88,40 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
KafkaException.Level logLevel, List<RetryListener> retryListeners, BinaryExceptionClassifier classifier,
boolean reClassifyOnExceptionChange) {
retryBatch(thrownException, records, consumer, container, invokeListener, backOff.start(), seeker, recoverer,
logger, logLevel, retryListeners, classifier, reClassifyOnExceptionChange, false);
}

/**
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
* consumer, wait for the next back off, then call the listener. When retries are
* exhausted, call the recoverer with the {@link ConsumerRecords}.
* @param thrownException the exception.
* @param records the records.
* @param consumer the consumer.
* @param container the container.
* @param invokeListener the {@link Runnable} to run (call the listener).
* @param execution the backOff execution to use for the retries.
* @param seeker the common error handler that re-seeks the entire batch.
* @param recoverer the recoverer.
* @param logger the logger.
* @param logLevel the log level.
* @param retryListeners the retry listeners.
* @param classifier the exception classifier.
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
* is thrown during retry.
* @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method.
* @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred
* (retry with a possibly different BackOffExecution is expected if non-null is returned)
* @since 3.3.16, 4.0.6
*/
@Nullable
public static Exception retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
MessageListenerContainer container, Runnable invokeListener, BackOffExecution execution,
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
KafkaException.Level logLevel, List<RetryListener> retryListeners, BinaryExceptionClassifier classifier,
boolean reClassifyOnExceptionChange, boolean resetStateOnExceptionChange) {

BackOffExecution execution = backOff.start();
long nextBackOff = execution.nextBackOff();
String failed = null;
Set<TopicPartition> assignment = consumer.assignment();
Expand Down Expand Up @@ -147,7 +180,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
}
try {
invokeListener.run();
return;
return null;
}
catch (Exception ex) {
listen(retryListeners, records, ex, attempt++);
Expand All @@ -163,6 +196,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r

break;
}
if (resetStateOnExceptionChange && !newException.getClass().equals(lastException.getClass())) {
return newException;
}
}
nextBackOff = execution.nextBackOff();
}
Expand All @@ -184,6 +220,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2);
}
}
return null;
} // NOSONAR NCSS line count

private static void listen(List<RetryListener> listeners, ConsumerRecords<?, ?> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @author Francois Rosiere
* @author Wang Zhiyang
* @author Artem Bilan
* @author Leos Bitto
* @since 2.8
*
*/
Expand Down Expand Up @@ -79,11 +80,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
* @param fallbackHandler the fallback handler.
* @since 2.9
*/
@SuppressWarnings("this-escape") // getFailureTracker() returns an already initialized final field
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {

super(recoverer, backOff, backOffHandler);
this.fallbackBatchHandler = fallbackHandler;
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
handler.setFailureTracker(getFailureTracker());
}
}

@Override
Expand All @@ -104,10 +109,10 @@ public void setLogLevel(Level logLevel) {

/**
* Set to {@code false} to not reclassify the exception if different from the previous
* failure. If the changed exception is classified as retryable, the existing back off
* sequence is used; a new sequence is not started. Default true. Only applies when
* the fallback batch error handler (for exceptions other than
* {@link BatchListenerFailedException}) is the default.
* failure. Default true. If the changed exception is classified as retryable, the existing
* back off sequence is used if resetStateOnExceptionChange is false; a new sequence is started
* if resetStateOnExceptionChange is true. Only applies when the fallback batch error handler
* (for exceptions other than {@link BatchListenerFailedException}) is the default.
* @param reclassifyOnExceptionChange false to not reclassify.
* @since 2.9.7
*/
Expand All @@ -117,6 +122,14 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
}
}

@Override
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
super.setResetStateOnExceptionChange(resetStateOnExceptionChange);
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
handler.setResetStateOnExceptionChange(resetStateOnExceptionChange);
}
}

@Override
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Except
return failedRecord;
}

private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
if (this.backOffFunction == null) {
return this.backOff;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/**
Expand All @@ -47,6 +48,7 @@
* performed so that the batch will be redelivered on the next poll.
*
* @author Gary Russell
* @author Leos Bitto
* @since 2.3.7
*
*/
Expand All @@ -68,6 +70,10 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr

private boolean reclassifyOnExceptionChange = true;

private boolean resetStateOnExceptionChange = false;

private @Nullable FailedRecordTracker failureTracker;

/**
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
* a 5 second back off).
Expand Down Expand Up @@ -130,15 +136,32 @@ protected boolean isReclassifyOnExceptionChange() {

/**
* Set to false to not reclassify the exception if different from the previous
* failure. If the changed exception is classified as retryable, the existing back off
* sequence is used; a new sequence is not started. Default true.
* failure. Default true. If the changed exception is classified as retryable, the
* existing back off sequence is used when resetStateOnExceptionChange is set to
* false; a new sequence is started when resetStateOnExceptionChange is set to true.
* @param reclassifyOnExceptionChange false to not reclassify.
* @since 2.9.7
*/
public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) {
this.reclassifyOnExceptionChange = reclassifyOnExceptionChange;
}

/**
* Set to false to keep the same back off sequence even when the exception changes
* in subsequent retries. Default false. If set to true, a new sequence is started
* when the exception changes.
* @param resetStateOnExceptionChange true to start a new back off sequence
* when the exception changes in subsequent retries
* @since 3.3.16, 4.0.6
*/
void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
this.resetStateOnExceptionChange = resetStateOnExceptionChange;
}

void setFailureTracker(FailedRecordTracker failedRecordTracker) {
this.failureTracker = failedRecordTracker;
}

@Override
public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
Expand All @@ -149,9 +172,17 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?,
}
this.retrying.put(Thread.currentThread(), true);
try {
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier(),
this.reclassifyOnExceptionChange);
BackOffExecution backOffExecution = null;
while (thrownException != null) {
if (backOffExecution == null || this.resetStateOnExceptionChange) {
BackOff backOffToUse = this.failureTracker == null ? this.backOff :
this.failureTracker.determineBackOff(records.iterator().next(), thrownException);
backOffExecution = backOffToUse.start();
}
thrownException = ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener,
backOffExecution, this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners,
getClassifier(), this.reclassifyOnExceptionChange, this.resetStateOnExceptionChange);
}
}
finally {
this.retrying.remove(Thread.currentThread());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,70 @@ void reclassifyUseSameBackOffOnExceptionChange() {
assertThat(retries.get()).isEqualTo(3);
}

@Test
void reclassifyResetBackOffOnExceptionChange() {
AtomicReference<Exception> thrown = new AtomicReference<>();
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
thrown.set(ex);
}, new FixedBackOff(0L, 3));
eh.setResetStateOnExceptionChange(true);
ConsumerRecords<String, String> records = new ConsumerRecords<>(
Map.of(new TopicPartition("foo", 0),
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))));
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);
AtomicInteger retries = new AtomicInteger();
eh.handleBatch(new IllegalStateException(), records, mock(Consumer.class), container,
() -> {
retries.incrementAndGet();
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
});
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
.extracting("cause")
.isInstanceOf(IllegalArgumentException.class);
assertThat(retries.get()).isEqualTo(4);
}

@Test
void usingBackOffFunction() {
AtomicReference<Exception> thrown = new AtomicReference<>();
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
thrown.set(ex);
}, new FixedBackOff(0L, 2));
eh.setBackOffFunction((cr, ex) -> ex instanceof UnsupportedOperationException ?
new FixedBackOff(0L, 1) : null);
ConsumerRecords<String, String> records = new ConsumerRecords<>(
Map.of(new TopicPartition("foo", 0),
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))));
MessageListenerContainer container = mock(MessageListenerContainer.class);
given(container.isRunning()).willReturn(true);

// first thrown exception is UnsupportedOperationException, so BackOffFunction should cause 1 retry
AtomicInteger retries = new AtomicInteger();
eh.handleBatch(new UnsupportedOperationException(), records, mock(Consumer.class), container,
() -> {
retries.incrementAndGet();
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
});
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
.extracting("cause")
.isInstanceOf(IllegalArgumentException.class);
assertThat(retries.get()).isEqualTo(1);

// first thrown exception is IllegalArgumentException, so BackOffFunction should return null
// which means that DefaultErrorHandler should use its the default FixedBackOff with 2 retries
retries.set(0);
eh.handleBatch(new IllegalArgumentException(), records, mock(Consumer.class), container,
() -> {
retries.incrementAndGet();
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
});
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
.extracting("cause")
.isInstanceOf(IllegalArgumentException.class);
assertThat(retries.get()).isEqualTo(2);
}

private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
ReflectionUtils.makeAccessible(field);
Expand Down