Skip to content

Commit a3f3ba5

Browse files
committed
Merge branch '7.0.x'
2 parents 79f4f76 + 1e45ad8 commit a3f3ba5

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

spring-jms/src/main/java/org/springframework/jms/listener/DefaultMessageListenerContainer.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1296,6 +1296,8 @@ private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
12961296

12971297
private @Nullable Object lastRecoveryMarker;
12981298

1299+
private @Nullable BackOffExecution invokerBackOff;
1300+
12991301
private int idleTaskExecutionCount = 0;
13001302

13011303
private volatile boolean idle = true;
@@ -1338,6 +1340,15 @@ public void run() {
13381340
}
13391341
catch (Throwable ex) {
13401342
clearResources();
1343+
if (this.invokerBackOff != null) {
1344+
// We failed more than once in a row, probably due to a specific failure
1345+
// from the JMS receive call even after a successful connection recovery:
1346+
// locally wait before a further recovery attempt to avoid a burst retry.
1347+
applyBackOffTime(this.invokerBackOff);
1348+
}
1349+
else {
1350+
this.invokerBackOff = DefaultMessageListenerContainer.this.backOff.start();
1351+
}
13411352
boolean alreadyRecovered = false;
13421353
recoveryLock.lock();
13431354
try {
@@ -1449,7 +1460,9 @@ private boolean invokeListener() throws JMSException {
14491460
this.currentReceiveThread = Thread.currentThread();
14501461
try {
14511462
initResourcesIfNecessary();
1452-
return receiveAndExecute(this, this.session, this.consumer);
1463+
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
1464+
this.invokerBackOff = null; // successful receive attempt without exception
1465+
return messageReceived;
14531466
}
14541467
finally {
14551468
this.currentReceiveThread = null;
@@ -1471,7 +1484,6 @@ private void decreaseActiveInvokerCount() {
14711484
}
14721485
}
14731486

1474-
@SuppressWarnings("NullAway") // Dataflow analysis limitation
14751487
private void initResourcesIfNecessary() throws JMSException {
14761488
if (getCacheLevel() <= CACHE_CONNECTION) {
14771489
updateRecoveryMarker();
@@ -1482,6 +1494,7 @@ private void initResourcesIfNecessary() throws JMSException {
14821494
this.session = createSession(getSharedConnection());
14831495
}
14841496
if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
1497+
Assert.state(this.session != null, "No cached Session");
14851498
this.consumer = createListenerConsumer(this.session);
14861499
lifecycleLock.lock();
14871500
try {

0 commit comments

Comments
 (0)