Skip to content

Commit 1a5af73

Browse files
committed
fix: max retry
1 parent 2c7f5f7 commit 1a5af73

11 files changed

Lines changed: 467 additions & 203 deletions

File tree

.github/workflows/java-ci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ jobs:
446446

447447
- name: Run NATS tests
448448
env:
449+
RQUEUE_TEST_BACKEND: nats
449450
NATS_RUNNING: "true"
450451
NATS_URL: nats://127.0.0.1:4222
451452
run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable
225225
}
226226
}
227227

228-
private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
229-
return rqueueMessage.getRetryCount() == null
230-
? queueDetail.getNumRetry()
231-
: rqueueMessage.getRetryCount();
232-
}
233-
234228
private void handleFailure(JobImpl job, int failureCount, Throwable throwable) {
235229
if (job.getQueueDetail().isDoNotRetryError(throwable)) {
236230
handleRetryExceededMessage(job, failureCount, throwable);
237231
} else {
238-
int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
239-
if (failureCount < maxRetryCount) {
232+
if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) {
240233
long delay = taskExecutionBackoff.nextBackOff(
241234
job.getMessage(), job.getRqueueMessage(), failureCount, throwable);
242235
if (delay == TaskExecutionBackOff.STOP) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.listener;
18+
19+
import com.github.sonus21.rqueue.config.RqueueConfig;
20+
import com.github.sonus21.rqueue.core.RqueueMessage;
21+
import lombok.AccessLevel;
22+
import lombok.NoArgsConstructor;
23+
24+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
25+
final class RetryPolicy {
26+
27+
static final int UNLIMITED_RETRY_LIMIT = 100_000;
28+
29+
static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
30+
int maxRetryCount = rqueueMessage.getRetryCount() == null
31+
? queueDetail.getNumRetry()
32+
: rqueueMessage.getRetryCount();
33+
if (maxRetryCount == Integer.MAX_VALUE) {
34+
return UNLIMITED_RETRY_LIMIT;
35+
}
36+
return maxRetryCount;
37+
}
38+
39+
static int remainingRetryCount(
40+
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
41+
int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail);
42+
return Math.max(0, maxRetryCount - failureCount);
43+
}
44+
45+
static int retryCountForPoll(
46+
RqueueConfig rqueueConfig,
47+
RqueueMessage rqueueMessage,
48+
QueueDetail queueDetail,
49+
int failureCount) {
50+
int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount);
51+
if (rqueueConfig.getRetryPerPoll() == -1) {
52+
return remainingRetryCount;
53+
}
54+
return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount);
55+
}
56+
57+
static boolean isExhausted(
58+
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
59+
return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0;
60+
}
61+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,6 @@ private void init() {
111111
this.failureCount = job.getRqueueMessage().getFailureCount();
112112
}
113113

114-
private int getMaxRetryCount() {
115-
return Objects.isNull(job.getRqueueMessage().getRetryCount())
116-
? job.getQueueDetail().getNumRetry()
117-
: job.getRqueueMessage().getRetryCount();
118-
}
119-
120114
private void updateCounter(boolean fail) {
121115
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter();
122116
if (Objects.isNull(counter)) {
@@ -179,11 +173,8 @@ private boolean isOldMessage() {
179173
}
180174

181175
private int getRetryCount() {
182-
int maxRetry = getMaxRetryCount();
183-
if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) {
184-
return maxRetry;
185-
}
186-
return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry);
176+
return RetryPolicy.retryCountForPoll(
177+
beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount);
187178
}
188179

189180
private boolean queueInactive() {
@@ -283,6 +274,7 @@ private void execute() {
283274
private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) {
284275
if (retryCount > 0
285276
&& ExecutionStatus.FAILED.equals(status)
277+
&& !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount)
286278
&& System.currentTimeMillis() < maxProcessingTime) {
287279
boolean doNoRetry = queueDetail.isDoNotRetryError(error);
288280
// it should not be retried based on the exception list
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.listener;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
22+
import static org.mockito.Mockito.doReturn;
23+
24+
import com.github.sonus21.rqueue.CoreUnitTest;
25+
import com.github.sonus21.rqueue.config.RqueueConfig;
26+
import com.github.sonus21.rqueue.core.RqueueMessage;
27+
import com.github.sonus21.rqueue.utils.TestUtils;
28+
import org.junit.jupiter.api.Test;
29+
import org.mockito.Mockito;
30+
31+
@CoreUnitTest
32+
class RetryPolicyTest {
33+
34+
private final QueueDetail queueDetail = TestUtils.createQueueDetail("queue", 2, 900000L, null);
35+
36+
@Test
37+
void retryCountForPollUsesRemainingRetryBudget() {
38+
RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class);
39+
doReturn(100).when(rqueueConfig).getRetryPerPoll();
40+
RqueueMessage rqueueMessage = new RqueueMessage();
41+
42+
assertEquals(1, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1));
43+
}
44+
45+
@Test
46+
void retryCountForPollKeepsExplicitMessageRetryCount() {
47+
RqueueConfig rqueueConfig = Mockito.mock(RqueueConfig.class);
48+
doReturn(-1).when(rqueueConfig).getRetryPerPoll();
49+
RqueueMessage rqueueMessage = RqueueMessage.builder().retryCount(1000).build();
50+
51+
assertEquals(
52+
999, RetryPolicy.retryCountForPoll(rqueueConfig, rqueueMessage, queueDetail, 1));
53+
}
54+
55+
@Test
56+
void isExhaustedUsesEffectiveRetryCount() {
57+
RqueueMessage rqueueMessage = new RqueueMessage();
58+
59+
assertFalse(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 1));
60+
assertTrue(RetryPolicy.isExhausted(rqueueMessage, queueDetail, 2));
61+
}
62+
63+
@Test
64+
void retryForeverSentinelUsesFiniteLimit() {
65+
QueueDetail retryForeverQueue =
66+
TestUtils.createQueueDetail("queue", Integer.MAX_VALUE, 900000L, null);
67+
RqueueMessage rqueueMessage = new RqueueMessage();
68+
69+
assertEquals(RetryPolicy.UNLIMITED_RETRY_LIMIT, RetryPolicy.maxRetryCount(
70+
rqueueMessage, retryForeverQueue));
71+
assertEquals(
72+
1,
73+
RetryPolicy.remainingRetryCount(
74+
rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT - 1));
75+
assertTrue(RetryPolicy.isExhausted(
76+
rqueueMessage, retryForeverQueue, RetryPolicy.UNLIMITED_RETRY_LIMIT));
77+
}
78+
}

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainerBrokerBranchTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,24 @@ void redisDefaultsBrokerAlsoUsesNormalStartQueuePath() throws Exception {
228228
}
229229
}
230230

231+
@Test
232+
void startDoesNotStartQueuesAgainWhenContainerIsAlreadyRunning() throws Exception {
233+
EndpointRegistry.delete();
234+
CountingBroker broker = new CountingBroker(Capabilities.REDIS_DEFAULTS);
235+
TrackingContainer container = new TrackingContainer(messageHandler);
236+
container.setMessageBroker(broker);
237+
container.afterPropertiesSet();
238+
try {
239+
container.start();
240+
container.start();
241+
242+
assertEquals(1, container.startQueueCalls.get() + container.startGroupCalls.get());
243+
} finally {
244+
container.stop();
245+
container.destroy();
246+
}
247+
}
248+
231249
@Test
232250
void pollerForwardsPollingIntervalAsBrokerFetchWait() throws Exception {
233251
EndpointRegistry.delete();
@@ -288,6 +306,8 @@ private class TrackingContainer extends RqueueMessageListenerContainer {
288306
final AtomicBoolean startBrokerPollersCalled = new AtomicBoolean();
289307
final AtomicBoolean startQueueCalled = new AtomicBoolean();
290308
final AtomicBoolean startGroupCalled = new AtomicBoolean();
309+
final AtomicInteger startQueueCalls = new AtomicInteger();
310+
final AtomicInteger startGroupCalls = new AtomicInteger();
291311

292312
TrackingContainer(RqueueMessageHandler handler) {
293313
super(handler, rqueueMessageTemplate);
@@ -297,12 +317,14 @@ private class TrackingContainer extends RqueueMessageListenerContainer {
297317
@Override
298318
protected void startQueue(String pollerKey, QueueDetail queueDetail) {
299319
startQueueCalled.set(true);
320+
startQueueCalls.incrementAndGet();
300321
// Do not actually start the poller; it would need a real broker.
301322
}
302323

303324
@Override
304325
protected void startGroup(String groupName, List<QueueDetail> queueDetails) {
305326
startGroupCalled.set(true);
327+
startGroupCalls.incrementAndGet();
306328
}
307329
}
308330
}

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ void getPollWait_usesConfiguredFetchWait() {
101101
assertEquals(Duration.ofSeconds(9), f.broker.getPollWait(Duration.ofMillis(137)));
102102
}
103103

104+
@Test
105+
void getPollWait_fallsBackToPollingIntervalWhenFetchWaitIsUnset() {
106+
RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setDefaultFetchWait(null);
107+
Fixture f = newFixture(cfg);
108+
Duration pollingInterval = Duration.ofMillis(137);
109+
110+
assertEquals(pollingInterval, f.broker.getPollWait(pollingInterval));
111+
}
112+
104113
@Test
105114
void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception {
106115
Fixture f = newFixture(RqueueNatsConfig.defaults());

rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/integration/NatsGlobalRetryLimitE2EIT.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)