Skip to content

Commit 6fdee02

Browse files
committed
feat: QueueType (QUEUE/STREAM) mode for @RqueueListener + NATS stream provisioning
- Add QueueType enum (QUEUE = WorkQueue retention, STREAM = Limits/fan-out) - Add queueMode() attribute to @RqueueListener, default QUEUE - Thread QueueType through MappingInformation → QueueDetail builder - MessageBroker.onQueueRegistered() hook; JetStreamMessageBroker overrides it to provision the stream immediately on registerQueue() calls (producer-only mode) - NatsProvisioner.ensureStream(name, subjects, QueueType) picks WorkQueue vs Limits retention; warns and preserves existing config on mismatch - NatsStreamValidator and JetStreamMessageBroker pass q.getType() to provisioner - RqueueRedisConfigImportSelector: lazy-load RqueueRedisListenerConfig via ImportSelector so excluding rqueue-redis no longer crashes Spring at startup - Fix JetStreamMessageBrokerPeekIT and IndependentConsumersIT to use QueueType instead of the now-bypassed setRetention() config workaround - Add JetStreamQueueModeIT: 5 E2E contracts covering stream retention, consumer reuse/position preservation, competing-consumer delivery, and fan-out delivery Assisted-By: Claude Code
1 parent 2681414 commit 6fdee02

21 files changed

Lines changed: 649 additions & 34 deletions

File tree

nats-task-v2.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# NATS backend — v2 task tracker
2+
3+
## v1 status: COMPLETE
4+
5+
All v1 items are done and 360 unit tests pass. Branch `nats-backend` is ready to merge.
6+
7+
---
8+
9+
## v2 pending items
10+
11+
### 1. Web dashboard — NATS gaps
12+
13+
Controllers are no longer Redis-gated but several operations throw `BackendCapabilityException` (HTTP 501) on NATS. The front-end should hide unsupported panels proactively instead of relying on 501s.
14+
15+
- Expose `GET /rqueue/api/capabilities` returning the `Capabilities` record so the UI can conditionally hide panels.
16+
- Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove`.
17+
- Wire the flags into Pebble templates (scheduled panel, cron jobs panel, chart panel already have `hideScheduledPanel` / `hideCronJobs` hooks in `DataViewResponse`).
18+
19+
Affected services that throw on NATS today:
20+
- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream)
21+
- `RqueueUtilityServiceImpl` — move/enqueue admin ops
22+
- `NatsMessageBrowsingRepository.viewData` — positional message browse
23+
24+
### 2. Reactive listener container
25+
26+
Only the enqueue side is reactive in v1. The listener/pop side still uses blocking `BrokerMessagePoller` threads.
27+
28+
- Implement a `ReactiveMessagePoller` using `js.publishAsync` + Project Reactor for the NATS path.
29+
- Gate behind `@Conditional(ReactiveEnabled.class)` + `NatsBackendCondition`.
30+
31+
### 3. Delayed / scheduled / cron messages on NATS
32+
33+
`enqueueWithDelay` throws `UnsupportedOperationException` in v1. Options:
34+
35+
- Use NATS JetStream `MaxAge` + a separate "delay bucket" stream per delay tier (coarse buckets: 1s, 5s, 30s, 5m, 1h).
36+
- Or implement a lightweight delay-scheduler sidecar using KV TTL expiry events.
37+
38+
### 4. `priorityGroup` weighting on NATS
39+
40+
In v1, cross-queue `priorityGroup` weighting logs a boot WARN and is not honored. `BrokerMessagePoller` spawns one thread per `(queue, consumerName, priority)` triple at fixed weight.
41+
42+
- Implement weighted round-robin across pollers sharing the same `priorityGroup`.
43+
44+
### 5. Elastic concurrency (`@RqueueListener.concurrency min < max`)
45+
46+
Falls back to fixed `max` on NATS in v1. Implement auto-scaling poller count based on queue depth via `MessageBroker.size()`.
47+
48+
### 6. `@RqueueHandler(primary)` on NATS
49+
50+
Ignored in v1 with a single boot WARN. On NATS all handler methods are dispatched independently (one consumer per method). `primary` could select the default handler when message type is ambiguous.
51+
52+
### 7. Spring Boot configuration metadata (source annotation processor)
53+
54+
`rqueue.nats.auto-create-kv-buckets` and sibling properties appear in the built metadata JSON but only from the compiled artifact. Add `spring-boot-configuration-processor` to the starter's `annotationProcessor` deps so IDEs pick up descriptions and defaults without a pre-build step.
55+
56+
### 8. `RqueueStringDao` javadoc
57+
58+
Mark as Redis-internal in the interface javadoc so future contributors don't try to add a NATS impl.
59+
60+
---
61+
62+
## Local verification commands
63+
64+
```bash
65+
./gradlew :rqueue-core:test :rqueue-redis:test :rqueue-web:test :rqueue-nats:test -DincludeTags=unit
66+
./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT"
67+
./gradlew :rqueue-nats:test -DincludeTags=nats
68+
```
69+
70+
## Commit-rule reminder
71+
72+
`CLAUDE.md` forbids `Co-Authored-By:` for any AI tool. Use `Assisted-By: Claude Code` only.

rqueue-core/src/main/java/com/github/sonus21/rqueue/annotation/RqueueListener.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.github.sonus21.rqueue.annotation;
1818

19+
import com.github.sonus21.rqueue.enums.QueueType;
1920
import com.github.sonus21.rqueue.utils.Constants;
2021
import java.lang.annotation.Documented;
2122
import java.lang.annotation.ElementType;
@@ -204,4 +205,23 @@
204205
* @return durable consumer name override; empty for "auto-generate"
205206
*/
206207
String consumerName() default "";
208+
209+
/**
210+
* Delivery mode for this queue.
211+
*
212+
* <ul>
213+
* <li>{@link QueueType#QUEUE} (default) — competing-consumer semantics: each
214+
* message is delivered to exactly one listener instance. Maps to a JetStream
215+
* {@code WorkQueue} stream. Multiple listeners share the load.
216+
* <li>{@link QueueType#STREAM} — fan-out semantics: every independent listener
217+
* group receives a copy of each message. Maps to a JetStream {@code Limits} stream.
218+
* Each listener must use a distinct {@link #consumerName()} so JetStream tracks its
219+
* own position independently.
220+
* </ul>
221+
*
222+
* <p>The Redis backend ignores this attribute (Redis queue names already encode the routing).
223+
*
224+
* @return queue delivery mode
225+
*/
226+
QueueType queueMode() default QueueType.QUEUE;
207227
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
3030
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
3131
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
32+
import com.github.sonus21.rqueue.enums.QueueType;
3233
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
3334
import com.github.sonus21.rqueue.listener.QueueDetail;
3435
import com.github.sonus21.rqueue.models.db.MessageMetadata;
@@ -99,12 +100,13 @@ protected Object enqueue(
99100
}
100101

101102
/**
102-
* Priority-aware enqueue. When a non-Redis {@link com.github.sonus21.rqueue.core.spi.MessageBroker}
103-
* is set on the underlying {@link RqueueMessageTemplate} (i.e. capabilities advertise
104-
* {@code !usesPrimaryHandlerDispatch}) this routes the publish through
103+
* Priority-aware enqueue. When a non-Redis
104+
* {@link com.github.sonus21.rqueue.core.spi.MessageBroker} is set on the underlying
105+
* {@link RqueueMessageTemplate} (i.e. capabilities advertise {@code !usesPrimaryHandlerDispatch})
106+
* this routes the publish through
105107
* {@link com.github.sonus21.rqueue.core.spi.MessageBroker#enqueue(QueueDetail, String,
106-
* RqueueMessage)} so backends like NATS can publish to a priority-specific subject. Otherwise
107-
* the existing Redis-shaped path is used; Redis already encodes priority in the queue name so
108+
* RqueueMessage)} so backends like NATS can publish to a priority-specific subject. Otherwise the
109+
* existing Redis-shaped path is used; Redis already encodes priority in the queue name so
108110
* {@code priority} is ignored.
109111
*/
110112
protected Object enqueue(
@@ -218,7 +220,8 @@ protected Object deleteAllMessages(QueueDetail queueDetail) {
218220
MessageDeleteRequest.builder().queueDetail(queueDetail).build());
219221
}
220222

221-
protected void registerQueueInternal(String queueName, String... priorities) {
223+
protected void registerQueueInternal(String queueName, QueueType type,
224+
String... priorities) {
222225
validateQueue(queueName);
223226
notNull(priorities, "priorities cannot be null");
224227
Map<String, Integer> priorityMap = new HashMap<>();
@@ -236,8 +239,10 @@ protected void registerQueueInternal(String queueName, String... priorities) {
236239
.processingQueueName(rqueueConfig.getProcessingQueueName(queueName))
237240
.processingQueueChannelName(rqueueConfig.getProcessingQueueChannelName(queueName))
238241
.priority(priorityMap)
242+
.type(type)
239243
.build();
240244
EndpointRegistry.register(queueDetail);
245+
notifyBrokerQueueRegistered(queueDetail);
241246
for (String priority : priorities) {
242247
String suffix = PriorityUtils.getSuffix(priority);
243248
queueDetail = QueueDetail.builder()
@@ -252,6 +257,14 @@ protected void registerQueueInternal(String queueName, String... priorities) {
252257
.priority(Collections.singletonMap(DEFAULT_PRIORITY_KEY, 1))
253258
.build();
254259
EndpointRegistry.register(queueDetail);
260+
notifyBrokerQueueRegistered(queueDetail);
261+
}
262+
}
263+
264+
private void notifyBrokerQueueRegistered(QueueDetail queueDetail) {
265+
com.github.sonus21.rqueue.core.spi.MessageBroker broker = messageTemplate.getMessageBroker();
266+
if (broker != null) {
267+
broker.onQueueRegistered(queueDetail);
255268
}
256269
}
257270
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ default void enqueue(QueueDetail q, String priority, RqueueMessage m) {
4646

4747
void enqueueWithDelay(QueueDetail q, RqueueMessage m, long delayMs);
4848

49+
/**
50+
* Called by {@code RqueueEndpointManager.registerQueue} after a queue is added to the registry.
51+
* Backends that need to provision resources (e.g. JetStream streams) at registration time should
52+
* override this. The default is a no-op so Redis and other backends are unaffected.
53+
*/
54+
default void onQueueRegistered(QueueDetail q) {}
55+
4956
/**
5057
* Reactive variant of {@link #enqueue(QueueDetail, RqueueMessage)}. The default falls back to the
5158
* blocking implementation wrapped in {@code Mono.fromRunnable}; backends with native async
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.github.sonus21.rqueue.enums;
2+
3+
public enum QueueType {
4+
/**
5+
* WorkQueue semantics: competing consumers, each message delivered to exactly one listener.
6+
*/
7+
QUEUE,
8+
/**
9+
* Stream/fan-out semantics: every independent listener group receives all messages.
10+
*/
11+
STREAM
12+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.github.sonus21.rqueue.utils.Constants.DELTA_BETWEEN_RE_ENQUEUE_TIME;
2020
import static com.github.sonus21.rqueue.utils.Constants.MIN_EXECUTION_TIME;
2121

22+
import com.github.sonus21.rqueue.enums.QueueType;
2223
import com.github.sonus21.rqueue.models.Concurrency;
2324
import java.util.Map;
2425
import java.util.Set;
@@ -50,6 +51,9 @@ class MappingInformation implements Comparable<MappingInformation> {
5051
private final int batchSize;
5152
private final Set<Class<? extends Throwable>> doNotRetry;
5253

54+
@Builder.Default
55+
private final QueueType queueType = QueueType.QUEUE;
56+
5357
@Override
5458
public String toString() {
5559
return String.join(", ", queueNames);

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.github.sonus21.rqueue.listener;
1818

1919
import com.fasterxml.jackson.annotation.JsonIgnore;
20+
import com.github.sonus21.rqueue.enums.QueueType;
2021
import com.github.sonus21.rqueue.models.Concurrency;
2122
import com.github.sonus21.rqueue.models.SerializableBase;
2223
import com.github.sonus21.rqueue.models.db.DeadLetterQueue;
@@ -260,11 +261,6 @@ public int resolvedMaxDeliver(int fallback) {
260261
return natsMaxDeliverOverride != null ? natsMaxDeliverOverride : fallback;
261262
}
262263

263-
public enum QueueType {
264-
QUEUE,
265-
STREAM
266-
}
267-
268264
public boolean isDoNotRetryError(Throwable throwable) {
269265
if (Objects.isNull(throwable)) {
270266
return false;

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ private MappingInformation getMappingInformation(RqueueListener rqueueListener)
334334
.batchSize(batchSize)
335335
.doNotRetry(new HashSet<>(Arrays.asList(rqueueListener.doNotRetry())))
336336
.consumerName(natsConsumerName.isEmpty() ? null : natsConsumerName)
337+
.queueType(rqueueListener.queueMode())
337338
.build();
338339
if (mappingInformation.isValid()) {
339340
return mappingInformation;

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ private List<QueueDetail> getQueueDetail(String queue, MappingInformation mappin
503503
.priorityGroup(priorityGroup)
504504
.doNotRetry(mappingInformation.getDoNotRetry())
505505
.consumerName(mappingInformation.getConsumerName())
506+
.type(mappingInformation.getQueueType())
506507
.build();
507508
List<QueueDetail> queueDetails;
508509
if (queueDetail.getPriority().size() <= 1) {

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010
package com.github.sonus21.rqueue.nats.internal;
1111

12+
import com.github.sonus21.rqueue.enums.QueueType;
1213
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
1314
import com.github.sonus21.rqueue.nats.RqueueNatsException;
1415
import io.nats.client.Connection;
@@ -23,6 +24,7 @@
2324
import io.nats.client.api.DeliverPolicy;
2425
import io.nats.client.api.KeyValueConfiguration;
2526
import io.nats.client.api.KeyValueStatus;
27+
import io.nats.client.api.RetentionPolicy;
2628
import io.nats.client.api.StreamConfiguration;
2729
import io.nats.client.api.StreamInfo;
2830
import java.io.IOException;
@@ -118,11 +120,29 @@ public KeyValue ensureKv(String bucketName, Duration ttl)
118120
// ---- Stream provisioning ----------------------------------------------
119121

120122
/**
121-
* Ensure a JetStream stream exists with the given subjects. Hits the NATS backend at most once
122-
* per stream name per process lifetime; subsequent calls return immediately from the in-process
123-
* cache.
123+
* Ensure a JetStream stream exists with the given subjects, using {@link QueueType#QUEUE}
124+
* (WorkQueue retention) as the default. Callers that have a {@link QueueType} available should
125+
* use {@link #ensureStream(String, List, QueueType)} instead.
124126
*/
125127
public void ensureStream(String streamName, List<String> subjects) {
128+
ensureStream(streamName, subjects, QueueType.QUEUE);
129+
}
130+
131+
/**
132+
* Ensure a JetStream stream exists with the given subjects and retention policy derived from
133+
* {@code queueType}:
134+
* <ul>
135+
* <li>{@link QueueType#QUEUE} — {@link io.nats.client.api.RetentionPolicy#WorkQueue}: each
136+
* message is delivered to exactly one consumer; competing-consumer semantics.
137+
* <li>{@link QueueType#STREAM} — {@link io.nats.client.api.RetentionPolicy#Limits}: every
138+
* independent durable consumer group receives all messages; stream/fan-out semantics.
139+
* </ul>
140+
*
141+
* <p>Hits the NATS backend at most once per stream name per process lifetime; subsequent calls
142+
* return immediately from the in-process cache. If the stream already exists with a different
143+
* retention policy, a WARNING is logged and the existing config is left untouched.
144+
*/
145+
public void ensureStream(String streamName, List<String> subjects, QueueType queueType) {
126146
if (streamsDone.contains(streamName)) {
127147
return;
128148
}
@@ -133,6 +153,9 @@ public void ensureStream(String streamName, List<String> subjects) {
133153
}
134154
try {
135155
StreamInfo existing = safeGetStreamInfo(streamName);
156+
RetentionPolicy desired = queueType == QueueType.STREAM
157+
? RetentionPolicy.Limits
158+
: RetentionPolicy.WorkQueue;
136159
if (existing == null) {
137160
if (!config.isAutoCreateStreams()) {
138161
throw new RqueueNatsException(
@@ -144,7 +167,7 @@ public void ensureStream(String streamName, List<String> subjects) {
144167
.subjects(subjects)
145168
.replicas(sd.getReplicas())
146169
.storageType(sd.getStorage())
147-
.retentionPolicy(sd.getRetention())
170+
.retentionPolicy(desired)
148171
.duplicateWindow(sd.getDuplicateWindow())
149172
.compressionOption(CompressionOption.S2);
150173
if (sd.getMaxMsgs() > 0) {
@@ -159,6 +182,15 @@ public void ensureStream(String streamName, List<String> subjects) {
159182
b.maxAge(sd.getMaxAge());
160183
}
161184
jsm.addStream(b.build());
185+
} else {
186+
RetentionPolicy actual = existing.getConfiguration().getRetentionPolicy();
187+
if (actual != desired) {
188+
log.log(
189+
Level.WARNING,
190+
"Stream ''{0}'' exists with retention={1} but queueMode requires retention={2}"
191+
+ " — leaving existing config in place.",
192+
new Object[] {streamName, actual, desired});
193+
}
162194
}
163195
} catch (IOException | JetStreamApiException e) {
164196
throw new RqueueNatsException(

0 commit comments

Comments
 (0)