Skip to content

Commit 82ef24e

Browse files
committed
This is a backport of the commit 7c9f5c0 to the branch 3.3.x
Signed-off-by: Leos Bitto <leos.bitto@gmail.com>
1 parent 893bf50 commit 82ef24e

5 files changed

Lines changed: 158 additions & 13 deletions

File tree

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
* @author Antonio Tomac
5252
* @author Wang Zhiyang
5353
* @author Sanghyeok An
54+
* @author Leos Bitto
5455
*
5556
* @since 2.8
5657
*
@@ -78,7 +79,7 @@ private ErrorHandlingUtils() {
7879
* @param logLevel the log level.
7980
* @param retryListeners the retry listeners.
8081
* @param classifier the exception classifier.
81-
* @param reClassifyOnExceptionChange true to reset the state if a different exception
82+
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
8283
* is thrown during retry.
8384
* @since 2.9.7
8485
*/
@@ -87,8 +88,40 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
8788
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
8889
KafkaException.Level logLevel, List<RetryListener> retryListeners, BinaryExceptionClassifier classifier,
8990
boolean reClassifyOnExceptionChange) {
91+
retryBatch(thrownException, records, consumer, container, invokeListener, backOff.start(), seeker, recoverer,
92+
logger, logLevel, retryListeners, classifier, reClassifyOnExceptionChange, false);
93+
}
94+
95+
/**
96+
* Retry a complete batch by pausing the consumer and then, in a loop, poll the
97+
* consumer, wait for the next back off, then call the listener. When retries are
98+
* exhausted, call the recoverer with the {@link ConsumerRecords}.
99+
* @param thrownException the exception.
100+
* @param records the records.
101+
* @param consumer the consumer.
102+
* @param container the container.
103+
* @param invokeListener the {@link Runnable} to run (call the listener).
104+
* @param execution the backOff execution to use for the retries.
105+
* @param seeker the common error handler that re-seeks the entire batch.
106+
* @param recoverer the recoverer.
107+
* @param logger the logger.
108+
* @param logLevel the log level.
109+
* @param retryListeners the retry listeners.
110+
* @param classifier the exception classifier.
111+
* @param reClassifyOnExceptionChange true to reclassify the exception if a different exception
112+
* is thrown during retry.
113+
* @param resetStateOnExceptionChange true if a different exception thrown during retry should end this method.
114+
* @return a new exception if resetStateOnExceptionChange was true and a different exception has occurred
115+
* (retry with a possibly different BackOffExecution is expected if non-null is returned)
116+
* @since 4.0.7
117+
*/
118+
@Nullable
119+
public static Exception retryBatch(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
120+
MessageListenerContainer container, Runnable invokeListener, BackOffExecution execution,
121+
CommonErrorHandler seeker, BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer, LogAccessor logger,
122+
KafkaException.Level logLevel, List<RetryListener> retryListeners, BinaryExceptionClassifier classifier,
123+
boolean reClassifyOnExceptionChange, boolean resetStateOnExceptionChange) {
90124

91-
BackOffExecution execution = backOff.start();
92125
long nextBackOff = execution.nextBackOff();
93126
String failed = null;
94127
Set<TopicPartition> assignment = consumer.assignment();
@@ -147,7 +180,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
147180
}
148181
try {
149182
invokeListener.run();
150-
return;
183+
return null;
151184
}
152185
catch (Exception ex) {
153186
listen(retryListeners, records, ex, attempt++);
@@ -163,6 +196,9 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
163196

164197
break;
165198
}
199+
if (resetStateOnExceptionChange && !newException.getClass().equals(lastException.getClass())) {
200+
return newException;
201+
}
166202
}
167203
nextBackOff = execution.nextBackOff();
168204
}
@@ -184,6 +220,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
184220
consumerPauseResumeEventPublisher.publishConsumerResumedEvent(assignment2);
185221
}
186222
}
223+
return null;
187224
} // NOSONAR NCSS line count
188225

189226
private static void listen(List<RetryListener> listeners, ConsumerRecords<?, ?> records,

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* @author Francois Rosiere
5151
* @author Wang Zhiyang
5252
* @author Artem Bilan
53+
* @author Leos Bitto
5354
* @since 2.8
5455
*
5556
*/
@@ -79,11 +80,15 @@ public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception
7980
* @param fallbackHandler the fallback handler.
8081
* @since 2.9
8182
*/
83+
@SuppressWarnings("this-escape") // getFailureTracker() returns an already initialized final field
8284
public FailedBatchProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
8385
@Nullable BackOffHandler backOffHandler, CommonErrorHandler fallbackHandler) {
8486

8587
super(recoverer, backOff, backOffHandler);
8688
this.fallbackBatchHandler = fallbackHandler;
89+
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
90+
handler.setFailureTracker(getFailureTracker());
91+
}
8792
}
8893

8994
@Override
@@ -104,10 +109,10 @@ public void setLogLevel(Level logLevel) {
104109

105110
/**
106111
* Set to {@code false} to not reclassify the exception if different from the previous
107-
* failure. If the changed exception is classified as retryable, the existing back off
108-
* sequence is used; a new sequence is not started. Default true. Only applies when
109-
* the fallback batch error handler (for exceptions other than
110-
* {@link BatchListenerFailedException}) is the default.
112+
* failure. Default true. If the changed exception is classified as retryable, the existing
113+
* back off sequence is used if resetStateOnExceptionChange is false; a new sequence is started
114+
* if resetStateOnExceptionChange is true. Only applies when the fallback batch error handler
115+
* (for exceptions other than {@link BatchListenerFailedException}) is the default.
111116
* @param reclassifyOnExceptionChange false to not reclassify.
112117
* @since 2.9.7
113118
*/
@@ -117,6 +122,14 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
117122
}
118123
}
119124

125+
@Override
126+
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
127+
super.setResetStateOnExceptionChange(resetStateOnExceptionChange);
128+
if (this.fallbackBatchHandler instanceof FallbackBatchErrorHandler handler) {
129+
handler.setResetStateOnExceptionChange(resetStateOnExceptionChange);
130+
}
131+
}
132+
120133
@Override
121134
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
122135
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Except
213213
return failedRecord;
214214
}
215215

216-
private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
216+
BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
217217
if (this.backOffFunction == null) {
218218
return this.backOff;
219219
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FallbackBatchErrorHandler.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.lang.Nullable;
3737
import org.springframework.util.Assert;
3838
import org.springframework.util.backoff.BackOff;
39+
import org.springframework.util.backoff.BackOffExecution;
3940
import org.springframework.util.backoff.FixedBackOff;
4041

4142
/**
@@ -47,6 +48,7 @@
4748
* performed so that the batch will be redelivered on the next poll.
4849
*
4950
* @author Gary Russell
51+
* @author Leos Bitto
5052
* @since 2.3.7
5153
*
5254
*/
@@ -68,6 +70,10 @@ class FallbackBatchErrorHandler extends ExceptionClassifier implements CommonErr
6870

6971
private boolean reclassifyOnExceptionChange = true;
7072

73+
private boolean resetStateOnExceptionChange = false;
74+
75+
private @Nullable FailedRecordTracker failureTracker;
76+
7177
/**
7278
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
7379
* a 5 second back off).
@@ -130,15 +136,32 @@ protected boolean isReclassifyOnExceptionChange() {
130136

131137
/**
132138
* Set to false to not reclassify the exception if different from the previous
133-
* failure. If the changed exception is classified as retryable, the existing back off
134-
* sequence is used; a new sequence is not started. Default true.
139+
* failure. Default true. If the changed exception is classified as retryable, the
140+
* existing back off sequence is used when resetStateOnExceptionChange is set to
141+
* false; a new sequence is started when resetStateOnExceptionChange is set to true.
135142
* @param reclassifyOnExceptionChange false to not reclassify.
136143
* @since 2.9.7
137144
*/
138145
public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange) {
139146
this.reclassifyOnExceptionChange = reclassifyOnExceptionChange;
140147
}
141148

149+
/**
150+
* Set to false to keep the same back off sequence even when the exception changes
151+
* in subsequent retries. Default false. If set to true, a new sequence is started
152+
* when the exception changes.
153+
* @param resetStateOnExceptionChange true to start a new back off sequence
154+
* when the exception changes in subsequent retries
155+
* @since 3.3.16
156+
*/
157+
void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
158+
this.resetStateOnExceptionChange = resetStateOnExceptionChange;
159+
}
160+
161+
void setFailureTracker(FailedRecordTracker failedRecordTracker) {
162+
this.failureTracker = failedRecordTracker;
163+
}
164+
142165
@Override
143166
public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
144167
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
@@ -149,9 +172,17 @@ public void handleBatch(Exception thrownException, @Nullable ConsumerRecords<?,
149172
}
150173
this.retrying.put(Thread.currentThread(), true);
151174
try {
152-
ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener, this.backOff,
153-
this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners, getClassifier(),
154-
this.reclassifyOnExceptionChange);
175+
BackOffExecution backOffExecution = null;
176+
while (thrownException != null) {
177+
if (backOffExecution == null || this.resetStateOnExceptionChange) {
178+
BackOff backOffToUse = this.failureTracker == null ? this.backOff :
179+
this.failureTracker.determineBackOff(records.iterator().next(), thrownException);
180+
backOffExecution = backOffToUse.start();
181+
}
182+
thrownException = ErrorHandlingUtils.retryBatch(thrownException, records, consumer, container, invokeListener,
183+
backOffExecution, this.seeker, this.recoverer, this.logger, getLogLevel(), this.retryListeners,
184+
getClassifier(), this.reclassifyOnExceptionChange, this.resetStateOnExceptionChange);
185+
}
155186
}
156187
finally {
157188
this.retrying.remove(Thread.currentThread());

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,70 @@ void reclassifyUseSameBackOffOnExceptionChange() {
287287
assertThat(retries.get()).isEqualTo(3);
288288
}
289289

290+
@Test
291+
void reclassifyResetBackOffOnExceptionChange() {
292+
AtomicReference<Exception> thrown = new AtomicReference<>();
293+
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
294+
thrown.set(ex);
295+
}, new FixedBackOff(0L, 3));
296+
eh.setResetStateOnExceptionChange(true);
297+
ConsumerRecords<String, String> records = new ConsumerRecords<>(
298+
Map.of(new TopicPartition("foo", 0),
299+
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))));
300+
MessageListenerContainer container = mock(MessageListenerContainer.class);
301+
given(container.isRunning()).willReturn(true);
302+
AtomicInteger retries = new AtomicInteger();
303+
eh.handleBatch(new IllegalStateException(), records, mock(Consumer.class), container,
304+
() -> {
305+
retries.incrementAndGet();
306+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
307+
});
308+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
309+
.extracting("cause")
310+
.isInstanceOf(IllegalArgumentException.class);
311+
assertThat(retries.get()).isEqualTo(4);
312+
}
313+
314+
@Test
315+
void usingBackOffFunction() {
316+
AtomicReference<Exception> thrown = new AtomicReference<>();
317+
DefaultErrorHandler eh = new DefaultErrorHandler((cr, ex) -> {
318+
thrown.set(ex);
319+
}, new FixedBackOff(0L, 2));
320+
eh.setBackOffFunction((cr, ex) -> ex instanceof UnsupportedOperationException ?
321+
new FixedBackOff(0L, 1) : null);
322+
ConsumerRecords<String, String> records = new ConsumerRecords<>(
323+
Map.of(new TopicPartition("foo", 0),
324+
List.of(new ConsumerRecord<>("foo", 0, 0L, null, "bar"))));
325+
MessageListenerContainer container = mock(MessageListenerContainer.class);
326+
given(container.isRunning()).willReturn(true);
327+
328+
// first thrown exception is UnsupportedOperationException, so BackOffFunction should cause 1 retry
329+
AtomicInteger retries = new AtomicInteger();
330+
eh.handleBatch(new UnsupportedOperationException(), records, mock(Consumer.class), container,
331+
() -> {
332+
retries.incrementAndGet();
333+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
334+
});
335+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
336+
.extracting("cause")
337+
.isInstanceOf(IllegalArgumentException.class);
338+
assertThat(retries.get()).isEqualTo(1);
339+
340+
// first thrown exception is IllegalArgumentException, so BackOffFunction should return null
341+
// which means that DefaultErrorHandler should use its the default FixedBackOff with 2 retries
342+
retries.set(0);
343+
eh.handleBatch(new IllegalArgumentException(), records, mock(Consumer.class), container,
344+
() -> {
345+
retries.incrementAndGet();
346+
throw new ListenerExecutionFailedException("", new IllegalArgumentException());
347+
});
348+
assertThat(thrown.get()).isInstanceOf(ListenerExecutionFailedException.class)
349+
.extracting("cause")
350+
.isInstanceOf(IllegalArgumentException.class);
351+
assertThat(retries.get()).isEqualTo(2);
352+
}
353+
290354
private boolean getRetryingFieldValue(FallbackBatchErrorHandler errorHandler) {
291355
Field field = ReflectionUtils.findField(FallbackBatchErrorHandler.class, "retrying");
292356
ReflectionUtils.makeAccessible(field);

0 commit comments

Comments
 (0)