Skip to content

Commit 83684a5

Browse files
committed
fix: three NATS integration test failures
1. Default retention WorkQueue (was Limits) Streams must use WorkQueue retention so that acked messages are removed from the stream and broker.size() drains to 0. The peek IT already sets Limits explicitly so it is unaffected. Also update RqueueNatsProperties so the Spring Boot auto-config matches. 2. provisionDlq always creates the DLQ stream autoCreateDlqStream gates automatic bootstrap provisioning, not explicit calls. provisionDlq() is opt-in by the caller; guard it with ensureStream directly instead of ensureDlqStream, which also checked the flag. 3. Priority stream names use underscore suffix (PriorityUtils convention) streamFor/subjectFor(q, priority) used a dash separator (pq-high) but the poller's expanded QueueDetail.name uses the PriorityUtils suffix (pq_high). Align both to PriorityUtils.getSuffix() so enqueue and pop target the same stream. Update NatsStreamValidator's priority loop and the unit test assertion accordingly. Assisted-By: Claude Code
1 parent fa57a6a commit 83684a5

5 files changed

Lines changed: 22 additions & 16 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/RqueueNatsConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public static RqueueNatsConfig defaults() {
5353
public static class StreamDefaults {
5454
private int replicas = 1;
5555
private StorageType storage = StorageType.File;
56-
private RetentionPolicy retention = RetentionPolicy.Limits;
56+
private RetentionPolicy retention = RetentionPolicy.WorkQueue;
5757
private Duration duplicateWindow = Duration.ofMinutes(2);
5858
private long maxMsgs = -1;
5959
private long maxBytes = -1;

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.github.sonus21.rqueue.core.spi.MessageBroker;
1717
import com.github.sonus21.rqueue.listener.QueueDetail;
1818
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
19+
import com.github.sonus21.rqueue.utils.PriorityUtils;
1920
import com.github.sonus21.rqueue.nats.RqueueNatsException;
2021
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
2122
import com.github.sonus21.rqueue.serdes.RqJacksonSerDes;
@@ -114,26 +115,27 @@ private String streamFor(QueueDetail q) {
114115
}
115116

116117
/**
117-
* Resolve the priority-specific subject. Returns the unsuffixed subject when {@code priority} is
118-
* null or empty; otherwise appends {@code "." + priority}. Mirrors the naming used by
119-
* {@link QueueDetail#resolvedNatsSubjectForPriority(String)}.
118+
* Resolve the priority-specific subject. Uses the same {@code "_priority"} suffix as
119+
* {@link com.github.sonus21.rqueue.utils.PriorityUtils#getSuffix(String)} so the subject
120+
* matches the expanded {@link QueueDetail#getName()} used by the poller (e.g. {@code "pq_high"}).
120121
*/
121122
private String subjectFor(QueueDetail q, String priority) {
122123
if (priority == null || priority.isEmpty()) {
123124
return subjectFor(q);
124125
}
125-
return subjectFor(q) + "." + priority;
126+
return config.getSubjectPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
126127
}
127128

128129
/**
129-
* Resolve the priority-specific stream. Returns the unsuffixed stream when {@code priority} is
130-
* null or empty; otherwise appends {@code "-" + priority}.
130+
* Resolve the priority-specific stream. Uses the same {@code "_priority"} suffix as
131+
* {@link com.github.sonus21.rqueue.utils.PriorityUtils#getSuffix(String)} so the stream name
132+
* matches what the poller derives from the expanded {@link QueueDetail#getName()}.
131133
*/
132134
private String streamFor(QueueDetail q, String priority) {
133135
if (priority == null || priority.isEmpty()) {
134136
return streamFor(q);
135137
}
136-
return streamFor(q) + "-" + priority;
138+
return config.getStreamPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
137139
}
138140

139141
private String dlqStreamFor(QueueDetail q) {
@@ -547,10 +549,9 @@ public void close() {
547549
* {@link #installDeadLetterBridge(QueueDetail, String)}.
548550
*/
549551
public void provisionDlq(QueueDetail q) {
550-
if (!config.isAutoCreateDlqStream()) {
551-
return;
552-
}
553-
provisioner.ensureDlqStream(dlqStreamFor(q), List.of(dlqSubjectFor(q)));
552+
// Explicit call — always provision, bypassing the autoCreateDlqStream flag.
553+
// That flag gates automatic provisioning at bootstrap; here the caller is explicitly opting in.
554+
provisioner.ensureStream(dlqStreamFor(q), List.of(dlqSubjectFor(q)));
554555
}
555556

556557
/**

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import com.github.sonus21.rqueue.core.EndpointRegistry;
1919
import com.github.sonus21.rqueue.listener.QueueDetail;
2020
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
21+
import com.github.sonus21.rqueue.utils.Constants;
22+
import com.github.sonus21.rqueue.utils.PriorityUtils;
2123
import com.github.sonus21.rqueue.nats.RqueueNatsConfig;
2224
import com.github.sonus21.rqueue.nats.RqueueNatsException;
2325
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
@@ -102,8 +104,11 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
102104

103105
if (q.getPriority() != null) {
104106
for (String priority : q.getPriority().keySet()) {
105-
String pStream = mainStream + "-" + priority;
106-
String pSubject = mainSubject + "." + priority;
107+
if (Constants.DEFAULT_PRIORITY_KEY.equals(priority)) {
108+
continue; // DEFAULT entry is the queue itself; already handled above
109+
}
110+
String pStream = config.getStreamPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
111+
String pSubject = config.getSubjectPrefix() + q.getName() + PriorityUtils.getSuffix(priority);
107112
total += tryEnsure(failures, pStream, pSubject);
108113
tryEnsureConsumer(failures, pStream, q.resolvedConsumerName(), cd);
109114
}

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception {
9191
"high",
9292
RqueueMessage.builder().id("m1").message("hi").build());
9393
verify(f.js, times(1))
94-
.publish(eq("rqueue.js.orders.high"), any(Headers.class), any(byte[].class));
94+
.publish(eq("rqueue.js.orders_high"), any(Headers.class), any(byte[].class));
9595
}
9696

9797
@Test

rqueue-spring-boot-starter/src/main/java/com/github/sonus21/rqueue/spring/boot/RqueueNatsProperties.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public static class Connection {
6666
public static class Stream {
6767
private int replicas = 1;
6868
private String storage = "FILE";
69-
private String retention = "LIMITS";
69+
private String retention = "WORKQUEUE";
7070
private Duration maxAge = Duration.ofDays(14);
7171
private long maxBytes = -1;
7272
private long maxMessages = -1;

0 commit comments

Comments
 (0)