Skip to content

Commit 2caebb6

Browse files
committed
GH-3393: Fix RabbitAmqpListenerContainer for fatal errors
Fixes: #3393 If `ErrorHandler` is configured on the container to rethrow an exception, the final decision must be done in the listener container itself. * Use `ConditionalRejectingErrorHandler.DefaultExceptionStrategy` in the `SimpleMessageListenerContainer` and `RabbitAmqpListenerContainer` to determine if listener container has to be stopped due to fatal error, e.g. `NoSuchMethodException` * Fix `RabbitAmqpListenerContainer` to emit `ListenerContainerConsumerFailedEvent` in case of forced stop * Propagate `ErrorHandler` and `ApplicationEventPublisher` from the `RabbitAmqpListenerContainerFactory` * Use `log4j-slf4j-impl` dependency instead of `log4j-slf4j2-impl` to satisfy third-party libraries * Make `spotbugs-annotations` as `compileOnly` and `testImplementation` dependency **Auto-cherry-pick to `4.0.x`**
1 parent 7eebee6 commit 2caebb6

6 files changed

Lines changed: 122 additions & 23 deletions

File tree

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ configure(javaProjects) { subproject ->
180180
// dependencies that are common across all java projects
181181
dependencies {
182182
def spotBugs = 'com.github.spotbugs:spotbugs-annotations:4.9.8'
183-
runtimeOnly spotBugs
184-
testRuntimeOnly spotBugs
183+
compileOnly spotBugs
184+
testImplementation spotBugs
185185

186186
testImplementation 'org.apache.logging.log4j:log4j-core'
187187
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
@@ -198,7 +198,7 @@ configure(javaProjects) { subproject ->
198198
testImplementation "com.willowtreeapps.assertk:assertk-jvm:$assertkVersion"
199199

200200
testRuntimeOnly 'org.apache.logging.log4j:log4j-jcl'
201-
testRuntimeOnly 'org.apache.logging.log4j:log4j-slf4j2-impl'
201+
testRuntimeOnly 'org.apache.logging.log4j:log4j-slf4j-impl'
202202
}
203203

204204
// enable all compiler warnings; individual projects may customize further

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene
142142

143143
protected final AtomicBoolean stopNow = new AtomicBoolean(); // NOSONAR
144144

145+
protected final FatalExceptionStrategy exceptionStrategy =
146+
new ConditionalRejectingErrorHandler.DefaultExceptionStrategy();
147+
145148
private ContainerDelegate proxy = this.delegate;
146149

147150
private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1486,7 +1486,7 @@ private void mainLoop() throws Exception { // NOSONAR Exception
14861486
}
14871487
catch (ListenerExecutionFailedException ex) {
14881488
// Continue to process, otherwise re-throw
1489-
if (causeContainsNoSuchMethod(ex.getCause())) {
1489+
if (SimpleMessageListenerContainer.this.exceptionStrategy.isFatal(ex)) {
14901490
throw new FatalListenerExecutionException("Invalid listener", ex);
14911491
}
14921492
}
@@ -1651,22 +1651,6 @@ else if (logger.isWarnEnabled()) {
16511651
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
16521652
}
16531653

1654-
private static boolean causeContainsNoSuchMethod(@Nullable Throwable throwable) {
1655-
if (throwable instanceof NoSuchMethodException) {
1656-
return true;
1657-
}
1658-
1659-
if (throwable != null) {
1660-
Throwable cause = throwable.getCause();
1661-
if (cause == null || cause == throwable) {
1662-
return false;
1663-
}
1664-
return causeContainsNoSuchMethod(cause);
1665-
}
1666-
1667-
return false;
1668-
}
1669-
16701654
}
16711655

16721656
}

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/config/RabbitAmqpListenerContainerFactory.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@
3131
import org.springframework.amqp.rabbitmq.client.listener.RabbitAmqpMessageListenerAdapter;
3232
import org.springframework.amqp.support.converter.MessageConverter;
3333
import org.springframework.amqp.utils.JavaUtils;
34+
import org.springframework.context.ApplicationEventPublisher;
35+
import org.springframework.context.ApplicationEventPublisherAware;
3436
import org.springframework.scheduling.TaskScheduler;
37+
import org.springframework.util.ErrorHandler;
3538

3639
/**
3740
* Factory for {@link RabbitAmqpListenerContainer}.
@@ -44,7 +47,8 @@
4447
*
4548
*/
4649
public class RabbitAmqpListenerContainerFactory
47-
extends BaseRabbitListenerContainerFactory<RabbitAmqpListenerContainer> {
50+
extends BaseRabbitListenerContainerFactory<RabbitAmqpListenerContainer>
51+
implements ApplicationEventPublisherAware {
4852

4953
private final AmqpConnectionFactory connectionFactory;
5054

@@ -60,6 +64,10 @@ public class RabbitAmqpListenerContainerFactory
6064

6165
private @Nullable MessageConverter messageConverter;
6266

67+
private @Nullable ErrorHandler errorHandler;
68+
69+
private @Nullable ApplicationEventPublisher applicationEventPublisher;
70+
6371
/**
6472
* Construct an instance using the provided {@link AmqpConnectionFactory}.
6573
* @param connectionFactory the connection.
@@ -68,6 +76,11 @@ public RabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactor
6876
this.connectionFactory = connectionFactory;
6977
}
7078

79+
@Override
80+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
81+
this.applicationEventPublisher = applicationEventPublisher;
82+
}
83+
7184
/**
7285
* Set a {@link ContainerCustomizer} that is invoked after a container is created and
7386
* configured to enable further customization of the container.
@@ -130,6 +143,16 @@ public void setMessageConverter(MessageConverter messageConverter) {
130143
this.messageConverter = messageConverter;
131144
}
132145

146+
/**
147+
* Set the error handler for the container.
148+
* @param errorHandler The error handler.
149+
* @since 4.0.3
150+
* @see RabbitAmqpListenerContainer#setErrorHandler(ErrorHandler)
151+
*/
152+
public void setErrorHandler(ErrorHandler errorHandler) {
153+
this.errorHandler = errorHandler;
154+
}
155+
133156
@Override
134157
public RabbitAmqpListenerContainer createListenerContainer(@Nullable RabbitListenerEndpoint endpoint) {
135158
if (endpoint instanceof MethodRabbitListenerEndpoint methodRabbitListenerEndpoint) {
@@ -152,7 +175,9 @@ public RabbitAmqpListenerContainer createListenerContainer(@Nullable RabbitListe
152175
.acceptIfNotNull(this.afterReceivePostProcessors, container::setAfterReceivePostProcessors)
153176
.acceptIfNotNull(this.batchSize, container::setBatchSize)
154177
.acceptIfNotNull(this.batchReceiveTimeout, container::setBatchReceiveTimeout)
155-
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler);
178+
.acceptIfNotNull(this.taskScheduler, container::setTaskScheduler)
179+
.acceptIfNotNull(this.errorHandler, container::setErrorHandler)
180+
.acceptIfNotNull(this.applicationEventPublisher, container::setApplicationEventPublisher);
156181

157182
applyCommonOverrides(endpoint, container);
158183

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerContainer.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,19 @@
4343
import org.springframework.amqp.core.MessagePostProcessor;
4444
import org.springframework.amqp.listener.ConditionalRejectingErrorHandler;
4545
import org.springframework.amqp.listener.ContainerUtils;
46+
import org.springframework.amqp.listener.FatalExceptionStrategy;
47+
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
48+
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerTerminatedEvent;
4649
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
4750
import org.springframework.amqp.rabbitmq.client.RabbitAmqpUtils;
4851
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
4952
import org.springframework.aop.framework.ProxyFactory;
5053
import org.springframework.aop.support.DefaultPointcutAdvisor;
5154
import org.springframework.beans.factory.BeanNameAware;
5255
import org.springframework.beans.factory.DisposableBean;
56+
import org.springframework.context.ApplicationEvent;
57+
import org.springframework.context.ApplicationEventPublisher;
58+
import org.springframework.context.ApplicationEventPublisherAware;
5359
import org.springframework.core.log.LogAccessor;
5460
import org.springframework.scheduling.TaskScheduler;
5561
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -69,7 +75,8 @@
6975
*/
7076
@SuppressWarnings("removal")
7177
public class RabbitAmqpListenerContainer
72-
implements org.springframework.amqp.rabbit.listener.MessageListenerContainer, BeanNameAware, DisposableBean {
78+
implements org.springframework.amqp.rabbit.listener.MessageListenerContainer, BeanNameAware, DisposableBean,
79+
ApplicationEventPublisherAware {
7380

7481
private static final LogAccessor LOG = new LogAccessor(LogFactory.getLog(RabbitAmqpListenerContainer.class));
7582

@@ -79,6 +86,11 @@ public class RabbitAmqpListenerContainer
7986

8087
private final MultiValueMap<String, Consumer> queueToConsumers = new LinkedMultiValueMap<>();
8188

89+
private final FatalExceptionStrategy exceptionStrategy =
90+
new ConditionalRejectingErrorHandler.DefaultExceptionStrategy();
91+
92+
private @Nullable ApplicationEventPublisher applicationEventPublisher;
93+
8294
private String @Nullable [] queues;
8395

8496
private Advice @Nullable [] adviceChain;
@@ -129,6 +141,11 @@ public RabbitAmqpListenerContainer(AmqpConnectionFactory connectionFactory) {
129141
this.connectionFactory = connectionFactory;
130142
}
131143

144+
@Override
145+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
146+
this.applicationEventPublisher = applicationEventPublisher;
147+
}
148+
132149
@Override
133150
public void setQueueNames(String... queueNames) {
134151
this.queues = Arrays.copyOf(queueNames, queueNames.length);
@@ -446,6 +463,12 @@ private void handleListenerError(Exception ex, Consumer.Context context, Object
446463
}
447464

448465
private boolean handleSpecialErrors(Exception ex, Consumer.Context context) {
466+
if (this.exceptionStrategy.isFatal(ex)) {
467+
stop();
468+
context.requeue();
469+
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
470+
return true;
471+
}
449472
if (ContainerUtils.shouldRequeue(this.defaultRequeue, ex, LOG.getLog())) {
450473
context.requeue();
451474
return true;
@@ -461,6 +484,16 @@ private boolean handleSpecialErrors(Exception ex, Consumer.Context context) {
461484
return false;
462485
}
463486

487+
private void publishConsumerFailedEvent(@Nullable String reason, boolean fatal, @Nullable Throwable t) {
488+
if (this.applicationEventPublisher != null) {
489+
ApplicationEvent event =
490+
t == null
491+
? new ListenerContainerConsumerTerminatedEvent(this, reason) :
492+
new ListenerContainerConsumerFailedEvent(this, reason, t, fatal);
493+
this.applicationEventPublisher.publishEvent(event);
494+
}
495+
}
496+
464497
@Override
465498
public void stop() {
466499
stop(() -> {

spring-rabbitmq-client/src/test/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpListenerTests.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,15 @@
3434
import org.springframework.amqp.core.Binding;
3535
import org.springframework.amqp.core.BindingBuilder;
3636
import org.springframework.amqp.core.DirectExchange;
37+
import org.springframework.amqp.core.Message;
3738
import org.springframework.amqp.core.MessageListenerContainer;
3839
import org.springframework.amqp.core.Queue;
3940
import org.springframework.amqp.core.QueueBuilder;
4041
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
42+
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
4143
import org.springframework.amqp.rabbit.annotation.RabbitListener;
4244
import org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor;
45+
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
4346
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
4447
import org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory;
4548
import org.springframework.amqp.rabbitmq.client.RabbitAmqpTestBase;
@@ -49,14 +52,17 @@
4952
import org.springframework.amqp.support.converter.SimpleMessageConverter;
5053
import org.springframework.amqp.utils.test.TestUtils;
5154
import org.springframework.beans.factory.annotation.Autowired;
55+
import org.springframework.context.ApplicationListener;
5256
import org.springframework.context.annotation.Bean;
5357
import org.springframework.context.annotation.Configuration;
5458
import org.springframework.messaging.handler.annotation.SendTo;
5559
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
5660
import org.springframework.test.context.ContextConfiguration;
5761
import org.springframework.util.MultiValueMap;
62+
import org.springframework.util.ReflectionUtils;
5863

5964
import static org.assertj.core.api.Assertions.assertThat;
65+
import static org.awaitility.Awaitility.await;
6066

6167
/**
6268
* @author Artem Bilan
@@ -184,6 +190,16 @@ void verifyReplyOnAnotherQueue() {
184190
.isEqualTo("Reply for 'test data4' via 'e1' and 'k4'");
185191
}
186192

193+
@Test
194+
void noSuchMethodErrorStopsContainer(@Autowired NoSuchMethodService noSuchMethodService)
195+
throws InterruptedException {
196+
197+
this.template.convertAndSend("no.such.method", "test data");
198+
MessageListenerContainer container = this.rabbitListenerEndpointRegistry.getListenerContainer("no.such.method");
199+
assertThat(noSuchMethodService.eventLatch.await(10, TimeUnit.SECONDS)).isTrue();
200+
await().until(() -> !container.isRunning());
201+
}
202+
187203
@Configuration
188204
@EnableRabbit
189205
static class Config {
@@ -318,6 +334,44 @@ String replyViaSendTo(String data) {
318334
return "Reply for '%s' via 'e1' and 'k4'".formatted(data);
319335
}
320336

337+
@Bean
338+
public RabbitAmqpListenerContainerFactory noSuchMethodListenerContainerFactory(
339+
AmqpConnectionFactory connectionFactory) {
340+
341+
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory =
342+
new RabbitAmqpListenerContainerFactory(connectionFactory);
343+
rabbitAmqpListenerContainerFactory.setErrorHandler(ReflectionUtils::rethrowRuntimeException);
344+
return rabbitAmqpListenerContainerFactory;
345+
}
346+
347+
@Bean
348+
Queue noSuchMethodQueue() {
349+
return new Queue("no.such.method");
350+
}
351+
352+
@Bean
353+
NoSuchMethodService noSuchMethodService() {
354+
return new NoSuchMethodService();
355+
}
356+
357+
}
358+
359+
@RabbitListener(id = "no.such.method", queues = "no.such.method",
360+
containerFactory = "noSuchMethodListenerContainerFactory")
361+
static class NoSuchMethodService implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
362+
363+
CountDownLatch eventLatch = new CountDownLatch(1);
364+
365+
@RabbitHandler
366+
void notReachableHandle(Message message) {
367+
throw new RuntimeException("Should not be thrown");
368+
}
369+
370+
@Override
371+
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
372+
this.eventLatch.countDown();
373+
}
374+
321375
}
322376

323377
}

0 commit comments

Comments
 (0)