Skip to content

Commit 1b4c33b

Browse files
committed
This is a backport of the commit 7c9f5c0 to the branch 4.0.x
Signed-off-by: Leos Bitto <leos.bitto@gmail.com>
1 parent 80fa599 commit 1b4c33b

6 files changed

Lines changed: 164 additions & 18 deletions

File tree

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
* @author Antonio Tomac
5353
* @author Wang Zhiyang
5454
* @author Sanghyeok An
55+
* @author Leos Bitto
5556
*
5657
* @since 2.8
5758
*
@@ -79,7 +80,7 @@ private ErrorHandlingUtils() {
7980
* @param logLevel the log level.
8081
* @param retryListeners the retry listeners.
8182
* @param exceptionMatcher the exception matcher.
82-
* @param reClassifyOnExceptionChange true to reset the state if a different exception
83+
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
8384
* is thrown during retry.
8485
* @since 2.9.7
8586
*/
@@ -88,8 +89,40 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
8889
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
8990
KafkaException.Level logLevel, List<RetryListener> retryListeners, ExceptionMatcher exceptionMatcher,
9091
boolean reClassifyOnExceptionChange) {
92+
retryBatch(thrownException, records, consumer, container, invokeListener, backOff.start(), seeker, recoverer,
93+
logger, logLevel, retryListeners, exceptionMatcher, reClassifyOnExceptionChange, false);
94+
}
95+
96+
/**
97+
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
98+
* consumer, wait for the next back off, then call the listener. When retries are
99+
* exhausted, call the recoverer with the {@link ConsumerRecords}.
100+
* @param thrownException the exception.
101+
* @param records the records.
102+
* @param consumer the consumer.
103+
* @param container the container.
104+
* @param invokeListener the {@link Runnable} to run (call the listener).
105+
* @param execution the backOff execution to use for the retries.
106+
* @param seeker the common error handler that re-seeks the entire batch.
107+
* @param recoverer the recoverer.
108+
* @param logger the logger.
109+
* @param logLevel the log level.
110+
* @param retryListeners the retry listeners.
111+
* @param exceptionMatcher the exception matcher.
112+
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
113+
* is thrown during retry.
114+
* @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method.
115+
* @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred
116+
* (retry with a possibly different BackOffExecution is expected if non-null is returned)
117+
* @since 4.0.7
118+
*/
119+
@Nullable
120+
public static Exception retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
121+
MessageListenerContainer container, Runnable invokeListener, BackOffExecution execution,
122+
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
123+
KafkaException.Level logLevel, List<RetryListener> retryListeners, ExceptionMatcher exceptionMatcher,
124+
boolean reClassifyOnExceptionChange, boolean resetStateOnExceptionChange) {
91125

92-
BackOffExecution execution = backOff.start();
93126
long nextBackOff = execution.nextBackOff();
94127
String failed = null;
95128
Set<TopicPartition> assignment = consumer.assignment();
@@ -148,7 +181,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
148181
}
149182
try {
150183
invokeListener.run();
151-
return;
184+
return null;
152185
}
153186
catch (Exception ex) {
154187
listen(retryListeners, records, ex, attempt++);
@@ -165,6 +198,10 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
165198

166199
break;
167200
}
201+
if (resetStateOnExceptionChange && !Objects.requireNonNull(newException).getClass()
202+
.equals(Objects.requireNonNull(lastException).getClass())) {
203+
return newException;
204+
}
168205
}
169206
nextBackOff = execution.nextBackOff();
170207
}
@@ -186,6 +223,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
186223
consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2);
187224
}
188225
}
226+
return null;
189227
} // NOSONAR NCSS line count
190228

191229
private static void listen(List<RetryListener> listeners, ConsumerRecords<?, ?> records,

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Francois Rosiere
5151
* @author Wang Zhiyang
5252
* @author Artem Bilan
53+
* @author Leos Bitto
5354
* @since 2.8
5455
*
5556
*/
@@ -79,11 +80,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7980
* @param fallbackHandler the fallback handler.
8081
* @since 2.9
8182
*/
83+
@SuppressWarnings("this-escape") // getFailureTracker() returns an already initialized final field
8284
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
8385
@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
8486

8587
super(recoverer, backOff, backOffHandler);
8688
this.fallbackBatchHandler = fallbackHandler;
89+
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
90+
handler.setFailureTracker(getFailureTracker());
91+
}
8792
}
8893

8994
@Override
@@ -104,10 +109,10 @@ public void setLogLevel(Level logLevel) {
104109

105110
/**
106111
* Set to {@code false} to not reclassify the exception if different from the previous
107-
* failure. If the changed exception is classified as retryable, the existing back off
108-
* sequence is used; a new sequence is not started. Default true. Only applies when
109-
* the fallback batch error handler (for exceptions other than
110-
* {@link BatchListenerFailedException}) is the default.
112+
* failure. Default true. If the changed exception is classified as retryable, the existing
113+
* back off sequence is used if resetStateOnExceptionChange is false; a new sequence is started
114+
* if resetStateOnExceptionChange is true. Only applies when the fallback batch error handler
115+
* (for exceptions other than {@link BatchListenerFailedException}) is the default.
111116
* @param reclassifyOnExceptionChange false to not reclassify.
112117
* @since 2.9.7
113118
*/
@@ -117,6 +122,14 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
117122
}
118123
}
119124

125+
@Override
126+
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
127+
super.setResetStateOnExceptionChange(resetStateOnExceptionChange);
128+
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
129+
handler.setResetStateOnExceptionChange(resetStateOnExceptionChange);
130+
}
131+
}
132+
120133
@Override
121134
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
122135
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
4141

4242
private static final BackOff NO_RETRIES_OR_DELAY_BACKOFF = new FixedBackOff(0L, 0L);
4343

44-
private final BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, BackOff> noRetriesForClassified =
44+
private final BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, @Nullable BackOff> noRetriesForClassified =
4545
(rec, ex) -> {
4646
Exception theEx = ErrorHandlingUtils.unwrapIfNeeded(ex);
4747
if (!getExceptionMatcher().match(theEx) || theEx instanceof KafkaBackoffException) {
@@ -56,7 +56,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
5656

5757
private boolean commitRecovered;
5858

59-
private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> userBackOffFunction = (rec, ex) -> null;
59+
private BiFunction<ConsumerRecord<?, ?>, Exception, @Nullable BackOff> userBackOffFunction = (rec, ex) -> null;
6060

6161
private boolean seekAfterError = true;
6262

@@ -94,7 +94,7 @@ public void setCommitRecovered(boolean commitRecovered) {
9494
* @param backOffFunction the function.
9595
* @since 2.6
9696
*/
97-
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
97+
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, @Nullable BackOff> backOffFunction) {
9898
Assert.notNull(backOffFunction, "'backOffFunction' cannot be null");
9999
this.userBackOffFunction = backOffFunction;
100100
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class FailedRecordTracker implements RecoveryStrategy {
5858

5959
private final BackOff backOff;
6060

61-
private @Nullable BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, BackOff> backOffFunction;
61+
private @Nullable BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, @Nullable BackOff> backOffFunction;
6262

6363
private final BackOffHandler backOffHandler;
6464

@@ -113,7 +113,7 @@ class FailedRecordTracker implements RecoveryStrategy {
113113
* @param backOffFunction the function.
114114
* @since 2.6
115115
*/
116-
public void setBackOffFunction(@Nullable BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, BackOff> backOffFunction) {
116+
public void setBackOffFunction(@Nullable BiFunction<ConsumerRecord<?, ?>, @Nullable Exception, @Nullable BackOff> backOffFunction) {
117117
this.backOffFunction = backOffFunction;
118118
}
119119

@@ -215,7 +215,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, @Nulla
215215
return failedRecord;
216216
}
217217

218-
private BackOff determineBackOff(ConsumerRecord<?, ?> record, @Nullable Exception exception) {
218+
BackOff determineBackOff(ConsumerRecord<?, ?> record, @Nullable Exception exception) {
219219
if (this.backOffFunction == null) {
220220
return this.backOff;
221221
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.kafka.KafkaException;
3737
import org.springframework.util.Assert;
3838
import org.springframework.util.backoff.BackOff;
39+
import org.springframework.util.backoff.BackOffExecution;
3940
import org.springframework.util.backoff.FixedBackOff;
4041

4142
/**
@@ -47,6 +48,7 @@
4748
* performed so that the batch will be redelivered on the next poll.
4849
*
4950
* @author Gary Russell
51+
* @author Leos Bitto
5052
* @since 2.3.7
5153
*
5254
*/
@@ -68,6 +70,10 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr
6870

6971
private boolean reclassifyOnExceptionChange = true;
7072

73+
private boolean resetStateOnExceptionChange = false;
74+
75+
private @Nullable FailedRecordTracker failureTracker;
76+
7177
/**
7278
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
7379
* a 5 second back off).
@@ -130,15 +136,32 @@ protected boolean isReclassifyOnExceptionChange() {
130136

131137
/**
132138
* Set to false to not reclassify the exception if different from the previous
133-
* failure. If the changed exception is classified as retryable, the existing back off
134-
* sequence is used; a new sequence is not started. Default true.
139+
* failure. Default true. If the changed exception is classified as retryable, the
140+
* existing back off sequence is used when resetStateOnExceptionChange is set to
141+
* false; a new sequence is started when resetStateOnExceptionChange is set to true.
135142
* @param reclassifyOnExceptionChange false to not reclassify.
136143
* @since 2.9.7
137144
*/
138145
public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) {
139146
this.reclassifyOnExceptionChange = reclassifyOnExceptionChange;
140147
}
141148

149+
/**
150+
* Set to false to keep the same back off sequence even when the exception changes
151+
* in subsequent retries. Default false. If set to true, a new sequence is started
152+
* when the exception changes.
153+
* @param resetStateOnExceptionChange true to start a new back off sequence
154+
* when the exception changes in subsequent retries
155+
* @since 4.0.7
156+
*/
157+
void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
158+
this.resetStateOnExceptionChange = resetStateOnExceptionChange;
159+
}
160+
161+
void setFailureTracker(FailedRecordTracker failedRecordTracker) {
162+
this.failureTracker = failedRecordTracker;
163+
}
164+
142165
@Override
143166
public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
144167
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
@@ -149,9 +172,17 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?,
149172
}
150173
this.retrying.put(Thread.currentThread(), true);
151174
try {
152-
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
153-
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getExceptionMatcher(),
154-
this.reclassifyOnExceptionChange);
175+
BackOffExecution backOffExecution = null;
176+
while (thrownException != null) {
177+
if (backOffExecution == null || this.resetStateOnExceptionChange) {
178+
BackOff backOffToUse = this.failureTracker == null ? this.backOff :
179+
this.failureTracker.determineBackOff(records.iterator().next(), thrownException);
180+
backOffExecution = backOffToUse.start();
181+
}
182+
thrownException = ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener,
183+
backOffExecution, this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners,
184+
getExceptionMatcher(), this.reclassifyOnExceptionChange, this.resetStateOnExceptionChange);
185+
}
155186
}
156187
finally {
157188
this.retrying.remove(Thread.currentThread());

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,70 @@ void reclassifyUseSameBackOffOnExceptionChange() {
289289
assertThat(retries.get()).isEqualTo(3);
290290
}
291291

292+
@Test
293+
void reclassifyResetBackOffOnExceptionChange() {
294+
AtomicReference<Exception> thrown = new AtomicReference<>();
295+
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
296+
thrown.set(ex);
297+
}, new FixedBackOff(0L, 3));
298+
eh.setResetStateOnExceptionChange(true);
299+
ConsumerRecords<String, String> records = new ConsumerRecords<>(
300+
Map.of(new TopicPartition("foo", 0),
301+
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))), Map.of());
302+
MessageListenerContainer container = mock(MessageListenerContainer.class);
303+
given(container.isRunning()).willReturn(true);
304+
AtomicInteger retries = new AtomicInteger();
305+
eh.handleBatch(new IllegalStateException(), records, mock(Consumer.class), container,
306+
() -> {
307+
retries.incrementAndGet();
308+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
309+
});
310+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
311+
.extracting("cause")
312+
.isInstanceOf(IllegalArgumentException.class);
313+
assertThat(retries.get()).isEqualTo(4);
314+
}
315+
316+
@Test
317+
void usingBackOffFunction() {
318+
AtomicReference<Exception> thrown = new AtomicReference<>();
319+
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
320+
thrown.set(ex);
321+
}, new FixedBackOff(0L, 2));
322+
eh.setBackOffFunction((cr, ex) -> ex instanceof UnsupportedOperationException ?
323+
new FixedBackOff(0L, 1) : null);
324+
ConsumerRecords<String, String> records = new ConsumerRecords<>(
325+
Map.of(new TopicPartition("foo", 0),
326+
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))), Map.of());
327+
MessageListenerContainer container = mock(MessageListenerContainer.class);
328+
given(container.isRunning()).willReturn(true);
329+
330+
// first thrown exception is UnsupportedOperationException, so BackOffFunction should cause 1 retry
331+
AtomicInteger retries = new AtomicInteger();
332+
eh.handleBatch(new UnsupportedOperationException(), records, mock(Consumer.class), container,
333+
() -> {
334+
retries.incrementAndGet();
335+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
336+
});
337+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
338+
.extracting("cause")
339+
.isInstanceOf(IllegalArgumentException.class);
340+
assertThat(retries.get()).isEqualTo(1);
341+
342+
// first thrown exception is IllegalArgumentException, so BackOffFunction should return null
343+
// which means that DefaultErrorHandler should use its the default FixedBackOff with 2 retries
344+
retries.set(0);
345+
eh.handleBatch(new IllegalArgumentException(), records, mock(Consumer.class), container,
346+
() -> {
347+
retries.incrementAndGet();
348+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
349+
});
350+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
351+
.extracting("cause")
352+
.isInstanceOf(IllegalArgumentException.class);
353+
assertThat(retries.get()).isEqualTo(2);
354+
}
355+
292356
private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
293357
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
294358
ReflectionUtils.makeAccessible(field);

0 commit comments

Comments
 (0)