Skip to content

Commit 32d79ae

Browse files
committed
GH-3400: Expose ConditionalRejectingErrorHandler.stopListenerOnFatal
Fixes: #3400 The `ConditionalRejectingErrorHandler` throws an `AmqpRejectAndDontRequeueException` causing the message to be rejected while the real problem could be with listener misconfiguration. * Provide a `ConditionalRejectingErrorHandler.stopListenerOnFatal` option to treat a fatal error as fatal for the listener, not the message. The listener container must be stopped, and message requeued for future, valid consumption. * Throw `FatalListenerExecutionException` in case of `stopListenerOnFatal` * To satisfy such a logic, move `FatalListenerExecutionException` to the `spring-amqp` module (alongside with the `FatalListenerStartupException`) deprecating existing classes. * Fix `RabbitAmqpListenerContainer` to check for the `FatalListenerExecutionException` as an outcome from the `ConditionalRejectingErrorHandler`
1 parent fb3a79e commit 32d79ae

18 files changed

Lines changed: 208 additions & 71 deletions

File tree

spring-amqp-client/src/main/java/org/springframework/amqp/client/DefaultAmqpClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,7 @@ private Sender getSender() throws ClientException {
157157
senderToReturn = this.sender;
158158
if (senderToReturn == null) {
159159
senderToReturn =
160-
this.connectionFactory
161-
.getConnection()
160+
this.connectionFactory.getConnection()
162161
.openAnonymousSender(this.senderOptions);
163162
Future<Sender> openFuture = senderToReturn.openFuture();
164163
this.sender = ProtonUtils.toSupplier(openFuture, this.senderOptions.openTimeout()).get();

spring-amqp-client/src/main/java/org/springframework/amqp/client/listener/AmqpMessageListenerContainer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.locks.ReentrantLock;
3030

3131
import org.aopalliance.aop.Advice;
32-
import org.apache.qpid.protonj2.client.Connection;
3332
import org.apache.qpid.protonj2.client.Delivery;
3433
import org.apache.qpid.protonj2.client.Receiver;
3534
import org.apache.qpid.protonj2.client.ReceiverOptions;
@@ -273,7 +272,6 @@ public void start() {
273272
this.lock.lock();
274273
try {
275274
if (this.queueToConsumers.isEmpty()) {
276-
Connection connection = this.connectionFactory.getConnection();
277275
ReceiverOptions receiverOptions =
278276
new ReceiverOptions()
279277
// Since 'AmqpConsumer' implements pause/resume logic,

spring-amqp/src/main/java/org/springframework/amqp/listener/ConditionalRejectingErrorHandler.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
* <p>
4848
* The exception will not be wrapped if the {@code cause} chain already contains an
4949
* {@link AmqpRejectAndDontRequeueException}.
50+
* <p>
51+
* If {@link #setStopListenerOnFatal(boolean)} is true, a {@link FatalListenerExecutionException}
52+
* is thrown instead of an {@link AmqpRejectAndDontRequeueException}.
53+
* The listener container must be stopped due to such a fatal state,
54+
* and the message requeued for other consumers on the destination.
5055
*
5156
* @author Gary Russell
5257
* @author Ngoc Nhan
@@ -65,6 +70,8 @@ public class ConditionalRejectingErrorHandler implements ErrorHandler {
6570

6671
private boolean rejectManual = true;
6772

73+
private boolean stopListenerOnFatal;
74+
6875
/**
6976
* Create a handler with the {@link ConditionalRejectingErrorHandler.DefaultExceptionStrategy}.
7077
*/
@@ -125,10 +132,24 @@ protected FatalExceptionStrategy getExceptionStrategy() {
125132
return this.exceptionStrategy;
126133
}
127134

135+
/**
136+
* Set to {@code true} to throw a {@link FatalListenerExecutionException} on fatal error
137+
* instead of a {@link AmqpRejectAndDontRequeueException}.
138+
* @param stopListenerOnFatal true to throw a {@link FatalListenerExecutionException}
139+
* @since 4.1
140+
*/
141+
public void setStopListenerOnFatal(boolean stopListenerOnFatal) {
142+
this.stopListenerOnFatal = stopListenerOnFatal;
143+
}
144+
145+
protected boolean isStopListenerOnFatal() {
146+
return this.stopListenerOnFatal;
147+
}
148+
128149
@Override
129150
public void handleError(Throwable t) {
130151
log(t);
131-
if (!this.causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
152+
if (!causeChainContainsARADRE(t) && this.exceptionStrategy.isFatal(t)) {
132153
if (this.discardFatalsWithXDeath && t instanceof ListenerExecutionFailedException lefe) {
133154
Message failed = lefe.getFailedMessage();
134155
if (failed != null) {
@@ -141,8 +162,15 @@ public void handleError(Throwable t) {
141162
}
142163
}
143164
}
144-
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", this.rejectManual,
145-
t);
165+
166+
if (this.stopListenerOnFatal) {
167+
throw new FatalListenerExecutionException(
168+
"Fatal error for listener execution: container should be stopped", t);
169+
}
170+
else {
171+
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal",
172+
this.rejectManual, t);
173+
}
146174
}
147175
}
148176

@@ -224,9 +252,8 @@ private boolean isCauseFatal(@Nullable Throwable cause) {
224252
protected void logFatalException(ListenerExecutionFailedException exception, @Nullable Throwable cause) {
225253
if (this.logger.isWarnEnabled()) {
226254
this.logger.warn(
227-
"Fatal message conversion error; message rejected; "
228-
+ "it will be dropped or routed to a dead letter exchange, if so configured: "
229-
+ exception.getFailedMessage());
255+
"Fatal message conversion error; message will be reject or listener container stopped, " +
256+
"if so configured: " + exception.getFailedMessage());
230257
}
231258
}
232259

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.listener;
18+
19+
import java.io.Serial;
20+
21+
import org.springframework.amqp.AmqpException;
22+
23+
/**
24+
* Exception to be thrown when the execution of a listener method failed with an
25+
* irrecoverable problem as an alternative to the {@link org.springframework.amqp.AmqpRejectAndDontRequeueException}.
26+
* The listener container must requeue the message and stop its execution in case of this error.
27+
*
28+
* @author Dave Syer
29+
* @author Artem Bilan
30+
*
31+
* @since 4.1
32+
*/
33+
public class FatalListenerExecutionException extends AmqpException {
34+
35+
@Serial
36+
private static final long serialVersionUID = 1L;
37+
38+
/**
39+
* Constructor for ListenerExecutionFailedException.
40+
* @param msg the detail message
41+
* @param cause the exception thrown by the listener method
42+
*/
43+
public FatalListenerExecutionException(String msg, Throwable cause) {
44+
super(msg, cause);
45+
}
46+
47+
/**
48+
* Constructor for ListenerExecutionFailedException.
49+
* @param msg the detail message
50+
*/
51+
public FatalListenerExecutionException(String msg) {
52+
super(msg);
53+
}
54+
55+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2026-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.listener;
18+
19+
import java.io.Serial;
20+
21+
import org.springframework.amqp.AmqpException;
22+
23+
/**
24+
* Exception to be thrown when the execution of a listener method failed on startup.
25+
*
26+
* @author Dave Syer
27+
* @author Artem Bilan
28+
*
29+
* @since 4.1
30+
*/
31+
public class FatalListenerStartupException extends AmqpException {
32+
33+
@Serial
34+
private static final long serialVersionUID = 1L;
35+
36+
/**
37+
* Constructor for ListenerExecutionFailedException.
38+
* @param msg the detail message
39+
* @param cause the exception thrown by the listener method
40+
*/
41+
public FatalListenerStartupException(String msg, Throwable cause) {
42+
super(msg, cause);
43+
}
44+
45+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@
6868
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
6969
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
7070
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
71-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
72-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
7371
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
7472
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
7573
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
@@ -1570,7 +1568,7 @@ protected void executeListenerAndHandleException(Channel channel, Object data) {
15701568
private void checkStatefulRetry(RuntimeException ex, Message message) {
15711569
if (message.getMessageProperties().isFinalRetryForMessageWithNoId()) {
15721570
if (this.statefulRetryFatalWithNullMessageId) {
1573-
throw new FatalListenerExecutionException(
1571+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException(
15741572
"Illegal null id in message. Failed to manage retry for message: " + message, ex);
15751573
}
15761574
else {
@@ -1611,10 +1609,12 @@ protected void invokeListener(Channel channel, Object data) {
16111609
* @param data the received Rabbit Message or List of Message.
16121610
* @see #setMessageListener(MessageListener)
16131611
*/
1612+
@SuppressWarnings("removal")
16141613
protected void actualInvokeListener(Channel channel, Object data) {
16151614
MessageListener listener = getMessageListener();
16161615
if (listener == null) {
1617-
throw new FatalListenerStartupException("listener cannot be null", new NullPointerException());
1616+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException(
1617+
"listener cannot be null", new NullPointerException());
16181618
}
16191619
if (listener instanceof ChannelAwareMessageListener chaml) {
16201620
doInvokeListener(chaml, channel, data);
@@ -1859,6 +1859,7 @@ protected void configureAdminIfNeeded() {
18591859
}
18601860
}
18611861

1862+
@SuppressWarnings("removal")
18621863
protected void checkMismatchedQueues() {
18631864
if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
18641865
try {
@@ -1869,7 +1870,8 @@ protected void checkMismatchedQueues() {
18691870
}
18701871
catch (AmqpIOException e) {
18711872
if (RabbitUtils.isMismatchedQueueArgs(e)) {
1872-
throw new FatalListenerStartupException("Mismatched queues", e);
1873+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException(
1874+
"Mismatched queues", e);
18731875
}
18741876
else {
18751877
logger.info("Failed to get connection during start(): " + e);
@@ -1923,6 +1925,7 @@ else if (this.missingQueuesFatal) {
19231925
* the declarations are always attempted during restart so the listener will
19241926
* fail with a fatal error if mismatches occur.
19251927
*/
1928+
@SuppressWarnings("removal")
19261929
protected void redeclareElementsIfNecessary() {
19271930
this.lifecycleLock.lock();
19281931
try {
@@ -1934,7 +1937,8 @@ protected void redeclareElementsIfNecessary() {
19341937
}
19351938
catch (Exception e) {
19361939
if (RabbitUtils.isMismatchedQueueArgs(e)) {
1937-
throw new FatalListenerStartupException("Mismatched queues", e);
1940+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException(
1941+
"Mismatched queues", e);
19381942
}
19391943
if (this.logDeclarationException.getAndSet(false)) {
19401944
this.logger.error("Failed to check/redeclare auto-delete queue(s).", e);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
6464
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
6565
import org.springframework.amqp.rabbit.connection.RabbitUtils;
66-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
6766
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
6867
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
6968
import org.springframework.amqp.rabbit.support.Delivery;
@@ -619,7 +618,7 @@ private void checkMissingQueues() {
619618
}
620619
}
621620

622-
@SuppressWarnings("NullAway") // Dataflow analysis limitation
621+
@SuppressWarnings({ "NullAway", "removal" }) // Dataflow analysis limitation
623622
public void start() throws AmqpException {
624623
if (logger.isDebugEnabled()) {
625624
logger.debug("Starting consumer " + this);
@@ -634,7 +633,8 @@ public void start() throws AmqpException {
634633
ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
635634
}
636635
catch (AmqpAuthenticationException e) {
637-
throw new FatalListenerStartupException("Authentication failure", e);
636+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException(
637+
"Authentication failure", e);
638638
}
639639
this.deliveryTags.clear();
640640
this.activeObjectCounter.add(this);
@@ -748,6 +748,7 @@ private void consumeFromQueue(String queue) throws IOException {
748748
}
749749
}
750750

751+
@SuppressWarnings("removal")
751752
private void attemptPassiveDeclarations() {
752753
DeclarationException failures = null;
753754
for (String queueName : this.queues) {
@@ -764,7 +765,8 @@ private void attemptPassiveDeclarations() {
764765
catch (TimeoutException e1) {
765766
// Ignore
766767
}
767-
throw new FatalListenerStartupException("Illegal Argument on Queue Declaration", e);
768+
throw new org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException(
769+
"Illegal Argument on Queue Declaration", e);
768770
}
769771
}
770772
catch (IOException e) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/QueuesNotAvailableException.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,24 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
19+
import java.io.Serial;
2020

2121
/**
2222
* This exception indicates that a consumer could not be started because none of
2323
* its queues are available for listening.
2424
*
2525
* @author Gary Russell
26+
* @author Artem Bilan
27+
*
2628
* @since 1.3.5
2729
*
2830
*/
29-
@SuppressWarnings("serial")
30-
public class QueuesNotAvailableException extends FatalListenerStartupException {
31+
@SuppressWarnings("removal")
32+
public class QueuesNotAvailableException
33+
extends org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException {
34+
35+
@Serial
36+
private static final long serialVersionUID = 1L;
3137

3238
public QueuesNotAvailableException(String msg, Throwable cause) {
3339
super(msg, cause);

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.springframework.amqp.core.MessagePostProcessor;
4949
import org.springframework.amqp.core.MessageProperties;
5050
import org.springframework.amqp.core.Queue;
51+
import org.springframework.amqp.listener.FatalListenerExecutionException;
52+
import org.springframework.amqp.listener.FatalListenerStartupException;
5153
import org.springframework.amqp.listener.ListenerExecutionFailedException;
5254
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
5355
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -58,8 +60,6 @@
5860
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
5961
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
6062
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
61-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
62-
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
6363
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
6464
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
6565
import org.springframework.amqp.rabbit.support.ListenerContainerAware;
@@ -1456,7 +1456,7 @@ public void run() { // NOSONAR - line count
14561456
}
14571457
}
14581458

1459-
private void mainLoop() throws Exception { // NOSONAR Exception
1459+
private void mainLoop() throws Exception {
14601460
try {
14611461
if (SimpleMessageListenerContainer.this.stopNow.get()) {
14621462
this.consumer.forceCloseAndClearQueue();

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/exception/FatalListenerExecutionException.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.amqp.rabbit.listener.exception;
1818

19-
import org.springframework.amqp.AmqpException;
20-
2119
/**
2220
* Exception to be thrown when the execution of a listener method failed with an
2321
* irrecoverable problem.
@@ -26,7 +24,8 @@
2624
* @see org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
2725
*/
2826
@SuppressWarnings("serial")
29-
public class FatalListenerExecutionException extends AmqpException {
27+
@Deprecated(since = "4.1", forRemoval = true)
28+
public class FatalListenerExecutionException extends org.springframework.amqp.listener.FatalListenerExecutionException {
3029

3130
/**
3231
* Constructor for ListenerExecutionFailedException.

0 commit comments

Comments
 (0)