Skip to content

Commit 73d79fe

Browse files
[codex] Delay Rqueue startup for Boot web apps (#303)
* Delay Rqueue startup for Boot web apps * Apply Palantir Java Format * Make Boot Rqueue auto-start delay explicit * Use NATS fetch wait for listener long polling * Add MsgPack listener E2E coverage * Stabilize NATS priority queue E2E test * Use MessagePack dependency in listener E2E test * Use MessagePack ObjectMapper in listener E2E test * Remove unsupported MessagePack listener E2E test * fix: max retry * Apply Palantir Java Format * Consolidate backend contract E2E tests * Apply Palantir Java Format * Increase broker unit coverage * chore: bump version * Update RC10 and RC11 release notes --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 0511a8c commit 73d79fe

25 files changed

Lines changed: 1402 additions & 635 deletions

File tree

.github/workflows/java-ci.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ jobs:
446446

447447
- name: Run NATS tests
448448
env:
449+
RQUEUE_TEST_BACKEND: nats
449450
NATS_RUNNING: "true"
450451
NATS_URL: nats://127.0.0.1:4222
451452
run: ./gradlew :rqueue-nats:test :rqueue-spring-boot-starter:test :rqueue-spring:test -DincludeTags=nats

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ ext {
8484

8585
subprojects {
8686
group = "com.github.sonus21"
87-
version = "4.0.0-RC10"
87+
version = "4.0.0-RC11"
8888

8989
dependencies {
9090
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

docs/CHANGELOG.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,52 @@ foundational Spring Boot 4 and Jackson 3 migration notes; RC3 for the Java 17
1818
baseline change; RC4–RC6 below for the NATS backend, broker SPI, dashboard
1919
work, and middleware additions that build on top.
2020

21+
## Release [4.0.0.RC11] 2026-05-24
22+
23+
{: .highlight}
24+
Release candidate.
25+
26+
### Fixes
27+
* **Delayed listener startup for Spring Boot web apps** — Rqueue listener
28+
containers in servlet and reactive Spring Boot web applications now wait until
29+
`ApplicationReadyEvent` before consuming work. Non-web worker applications keep
30+
the existing `SmartLifecycle` startup behavior.
31+
* **Idempotent listener container startup** — repeated `start()` calls no longer
32+
re-run queue startup, and the container marks itself running only after
33+
startup succeeds.
34+
* **Global retry cap enforcement**`rqueue.retry.max` now caps the remaining
35+
retry budget even when `rqueue.retry.per.poll` is low or high. The retry logic
36+
is centralized in `RetryPolicy`, preserving explicit message/listener retry
37+
counts while preventing implicit retry-forever jobs from bypassing the global
38+
max.
39+
* **NATS listener polling wait** — NATS pollers now use the backend-configured
40+
fetch wait via the broker SPI, reducing short-poll churn while keeping Redis
41+
behavior unchanged.
42+
43+
### Build
44+
* **Shared backend contract E2E tests** — Redis and NATS now run the same backend
45+
contract E2E coverage through environment-selected bootstrapping, replacing
46+
duplicated NATS-only E2E classes.
47+
* **Broker coverage** — added focused unit coverage for broker defaults and NATS
48+
JetStream pop, in-flight, size, subscriber, and dashboard-label paths.
49+
50+
## Release [4.0.0.RC10] 2026-05-21
51+
52+
{: .highlight}
53+
Release candidate.
54+
55+
### Fixes
56+
* **Spring Boot 3.x to 4.x message compatibility** — restored Jackson 2.x
57+
property ordering compatibility in `RqueueRedisSerializer` so messages written
58+
by Rqueue 3.x can be acknowledged or parked for retry after upgrading to
59+
Rqueue 4.x. This prevents stale processing-queue entries caused by byte-exact
60+
Redis `ZSCORE` / `ZREM` lookups using a different serialized property order.
61+
62+
### Docs
63+
* **Migration guidance** — clarified the 3.x to 4.x upgrade notes around
64+
`rqueue.serialization.property.order` so applications can choose the
65+
compatibility mode intentionally during rolling upgrades.
66+
2167
## Release [4.0.0.RC9] 2026-05-13
2268

2369
{: .highlight}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,19 @@ default Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
8383
return Mono.fromRunnable(() -> enqueueWithDelay(q, m, delayMs));
8484
}
8585

86+
/**
87+
* Resolve the wait duration that listener pollers should pass to {@link #pop}. The default uses
88+
* the listener container's polling interval, preserving existing Redis behavior where that value
89+
* also controls idle sleeps. Backends with native long-poll controls can override this so their
90+
* fetch wait can be tuned independently.
91+
*
92+
* @param pollingInterval listener container polling interval
93+
* @return wait duration for listener-driven pop calls
94+
*/
95+
default Duration getPollWait(Duration pollingInterval) {
96+
return pollingInterval;
97+
}
98+
8699
List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait);
87100

88101
/**

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/PostProcessingHandler.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,18 +225,11 @@ private void handleRetryExceededMessage(JobImpl job, int failureCount, Throwable
225225
}
226226
}
227227

228-
private int getMaxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
229-
return rqueueMessage.getRetryCount() == null
230-
? queueDetail.getNumRetry()
231-
: rqueueMessage.getRetryCount();
232-
}
233-
234228
private void handleFailure(JobImpl job, int failureCount, Throwable throwable) {
235229
if (job.getQueueDetail().isDoNotRetryError(throwable)) {
236230
handleRetryExceededMessage(job, failureCount, throwable);
237231
} else {
238-
int maxRetryCount = getMaxRetryCount(job.getRqueueMessage(), job.getQueueDetail());
239-
if (failureCount < maxRetryCount) {
232+
if (!RetryPolicy.isExhausted(job.getRqueueMessage(), job.getQueueDetail(), failureCount)) {
240233
long delay = taskExecutionBackoff.nextBackOff(
241234
job.getMessage(), job.getRqueueMessage(), failureCount, throwable);
242235
if (delay == TaskExecutionBackOff.STOP) {
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.listener;
18+
19+
import com.github.sonus21.rqueue.config.RqueueConfig;
20+
import com.github.sonus21.rqueue.core.RqueueMessage;
21+
import lombok.AccessLevel;
22+
import lombok.NoArgsConstructor;
23+
24+
@NoArgsConstructor(access = AccessLevel.PRIVATE)
25+
final class RetryPolicy {
26+
27+
static final int UNLIMITED_RETRY_LIMIT = 100_000;
28+
29+
static int maxRetryCount(RqueueMessage rqueueMessage, QueueDetail queueDetail) {
30+
int maxRetryCount = rqueueMessage.getRetryCount() == null
31+
? queueDetail.getNumRetry()
32+
: rqueueMessage.getRetryCount();
33+
if (maxRetryCount == Integer.MAX_VALUE) {
34+
return UNLIMITED_RETRY_LIMIT;
35+
}
36+
return maxRetryCount;
37+
}
38+
39+
static int remainingRetryCount(
40+
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
41+
int maxRetryCount = maxRetryCount(rqueueMessage, queueDetail);
42+
return Math.max(0, maxRetryCount - failureCount);
43+
}
44+
45+
static int retryCountForPoll(
46+
RqueueConfig rqueueConfig,
47+
RqueueMessage rqueueMessage,
48+
QueueDetail queueDetail,
49+
int failureCount) {
50+
int remainingRetryCount = remainingRetryCount(rqueueMessage, queueDetail, failureCount);
51+
if (rqueueConfig.getRetryPerPoll() == -1) {
52+
return remainingRetryCount;
53+
}
54+
return Math.min(rqueueConfig.getRetryPerPoll(), remainingRetryCount);
55+
}
56+
57+
static boolean isExhausted(
58+
RqueueMessage rqueueMessage, QueueDetail queueDetail, int failureCount) {
59+
return remainingRetryCount(rqueueMessage, queueDetail, failureCount) == 0;
60+
}
61+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueExecutor.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,6 @@ private void init() {
111111
this.failureCount = job.getRqueueMessage().getFailureCount();
112112
}
113113

114-
private int getMaxRetryCount() {
115-
return Objects.isNull(job.getRqueueMessage().getRetryCount())
116-
? job.getQueueDetail().getNumRetry()
117-
: job.getRqueueMessage().getRetryCount();
118-
}
119-
120114
private void updateCounter(boolean fail) {
121115
RqueueMetricsCounter counter = beanProvider.getRqueueMetricsCounter();
122116
if (Objects.isNull(counter)) {
@@ -179,11 +173,8 @@ private boolean isOldMessage() {
179173
}
180174

181175
private int getRetryCount() {
182-
int maxRetry = getMaxRetryCount();
183-
if (beanProvider.getRqueueConfig().getRetryPerPoll() == -1) {
184-
return maxRetry;
185-
}
186-
return Math.min(beanProvider.getRqueueConfig().getRetryPerPoll(), maxRetry);
176+
return RetryPolicy.retryCountForPoll(
177+
beanProvider.getRqueueConfig(), job.getRqueueMessage(), job.getQueueDetail(), failureCount);
187178
}
188179

189180
private boolean queueInactive() {
@@ -283,6 +274,7 @@ private void execute() {
283274
private boolean shouldRetry(long maxProcessingTime, int retryCount, int failureCount) {
284275
if (retryCount > 0
285276
&& ExecutionStatus.FAILED.equals(status)
277+
&& !RetryPolicy.isExhausted(rqueueMessage, queueDetail, failureCount)
286278
&& System.currentTimeMillis() < maxProcessingTime) {
287279
boolean doNoRetry = queueDetail.isDoNotRetryError(error);
288280
// it should not be retried based on the exception list

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,10 +517,14 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin
517517

518518
@Override
519519
public void start() {
520-
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
521520
synchronized (lifecycleMgr) {
522-
running = true;
521+
if (running) {
522+
log.debug("Rqueue Message container {} is already running", RqueueConfig.getBrokerId());
523+
return;
524+
}
525+
log.info("Starting Rqueue Message container {}", RqueueConfig.getBrokerId());
523526
doStart();
527+
running = true;
524528
rqueueBeanProvider
525529
.getApplicationEventPublisher()
526530
.publishEvent(new RqueueBootstrapEvent(EVENT_SOURCE, true));

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessagePoller.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
2020
import com.github.sonus21.rqueue.core.RqueueMessage;
2121
import com.github.sonus21.rqueue.core.middleware.Middleware;
22+
import com.github.sonus21.rqueue.core.spi.MessageBroker;
2223
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
2324
import com.github.sonus21.rqueue.utils.Constants;
2425
import com.github.sonus21.rqueue.utils.QueueThreadPool;
@@ -59,13 +60,9 @@ abstract class RqueueMessagePoller extends MessageContainerBase {
5960
}
6061

6162
private List<RqueueMessage> getMessages(QueueDetail queueDetail, int count) {
62-
return rqueueBeanProvider
63-
.getMessageBroker()
64-
.pop(
65-
queueDetail,
66-
queueDetail.resolvedConsumerName(),
67-
count,
68-
Duration.ofMillis(pollingInterval));
63+
MessageBroker broker = rqueueBeanProvider.getMessageBroker();
64+
Duration wait = broker.getPollWait(Duration.ofMillis(pollingInterval));
65+
return broker.pop(queueDetail, queueDetail.resolvedConsumerName(), count, wait);
6966
}
7067

7168
private void execute(

0 commit comments

Comments
 (0)