Skip to content

Commit 0dbdda3

Browse files
committed
feat(nats): honour visibilityTimeout/numRetry on JetStream consumer
Map QueueDetail.visibilityTimeout to JetStream ackWait and numRetry to maxDeliver (numRetry + 1, with Integer.MAX_VALUE as the unlimited sentinel) when the durable pull consumer is provisioned. Falls back to RqueueNatsConfig.consumerDefaults when the per-queue value is unset. Resolution lives in JetStreamMessageBroker.resolveAckWait / resolveMaxDeliver and is used by both popInternal and the bootstrap NatsStreamValidator so the consumer is created with the right config upfront (provisioner only logs a warning on existing-config drift). Also drops dead, never-set NATS-related fields from QueueDetail (natsStream, natsSubject, natsDlqStream, natsDlqSubject, natsMaxDeliverOverride, natsDedupWindow) and their resolved* helpers, plus the corresponding test. Broker DLQ name helpers now compute directly from prefix + suffix. Assisted-By: Claude Code
1 parent 7eaac48 commit 0dbdda3

6 files changed

Lines changed: 299 additions & 248 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,6 @@ public class QueueDetail extends SerializableBase {
7373
private String priorityGroup;
7474
private Set<Class<? extends Throwable>> doNotRetry;
7575

76-
// ---------------------------------------------------------------------------
77-
// NATS / JetStream-related fields. All nullable and additive: when null the
78-
// resolved* helpers below derive sensible defaults from queueName /
79-
// visibilityTimeout / numRetry. Existing Redis-shaped behavior is unchanged.
80-
// ---------------------------------------------------------------------------
81-
private final String natsStream;
82-
private final String natsSubject;
83-
private final String natsDlqStream;
84-
private final String natsDlqSubject;
85-
private final Duration natsAckWaitOverride;
86-
private final Integer natsMaxDeliverOverride;
87-
private final Duration natsDedupWindow;
8876
private final String consumerName;
8977

9078
public boolean isDlqSet() {
@@ -190,77 +178,6 @@ public String resolvedConsumerName() {
190178
return systemGenerated ? sanitized + "-consumer" : sanitized + "-consumer-primary";
191179
}
192180

193-
/**
194-
* Resolves the JetStream stream name. When {@link #natsStream} is null the default
195-
* derivation {@code "rqueue-" + queueName} is used so existing queue configs keep
196-
* working with a NATS broker without explicit overrides.
197-
*/
198-
public String resolvedNatsStream() {
199-
return natsStream != null ? natsStream : "rqueue-" + queueName;
200-
}
201-
202-
/**
203-
* Resolves the JetStream subject. Falls back to {@code "rqueue." + queueName}.
204-
*/
205-
public String resolvedNatsSubject() {
206-
return natsSubject != null ? natsSubject : "rqueue." + queueName;
207-
}
208-
209-
/**
210-
* Resolves the JetStream stream name for a specific priority bucket. When {@code priority} is
211-
* null or empty falls back to {@link #resolvedNatsStream()}; otherwise appends {@code "-" +
212-
* priority} to the resolved base stream name. Used by the NATS broker when a queue declares
213-
* per-priority sub-streams.
214-
*/
215-
public String resolvedNatsStreamForPriority(String priority) {
216-
if (priority == null || priority.isEmpty()) {
217-
return resolvedNatsStream();
218-
}
219-
return resolvedNatsStream() + "-" + priority;
220-
}
221-
222-
/**
223-
* Resolves the JetStream subject for a specific priority bucket. Falls back to
224-
* {@link #resolvedNatsSubject()} when {@code priority} is null/empty; otherwise appends
225-
* {@code "." + priority}.
226-
*/
227-
public String resolvedNatsSubjectForPriority(String priority) {
228-
if (priority == null || priority.isEmpty()) {
229-
return resolvedNatsSubject();
230-
}
231-
return resolvedNatsSubject() + "." + priority;
232-
}
233-
234-
/**
235-
* Resolves the dead-letter stream name. Falls back to {@code resolvedNatsStream() + "-dlq"}.
236-
*/
237-
public String resolvedNatsDlqStream() {
238-
return natsDlqStream != null ? natsDlqStream : resolvedNatsStream() + "-dlq";
239-
}
240-
241-
/**
242-
* Resolves the dead-letter subject. Falls back to {@code resolvedNatsSubject() + ".dlq"}.
243-
*/
244-
public String resolvedNatsDlqSubject() {
245-
return natsDlqSubject != null ? natsDlqSubject : resolvedNatsSubject() + ".dlq";
246-
}
247-
248-
/**
249-
* Returns the JetStream ack-wait, falling back to the supplied {@code fallback}
250-
* (typically the {@link #visibilityDuration()}) when no explicit override is set.
251-
*/
252-
public Duration resolvedAckWait(Duration fallback) {
253-
return natsAckWaitOverride != null ? natsAckWaitOverride : fallback;
254-
}
255-
256-
/**
257-
* Returns the JetStream max-deliver count, falling back to {@code fallback}
258-
* (typically {@code numRetry + 1}) when no explicit override is set.
259-
*/
260-
public int resolvedMaxDeliver(int fallback) {
261-
return natsMaxDeliverOverride != null ? natsMaxDeliverOverride : fallback;
262-
}
263-
264181
public boolean isDoNotRetryError(Throwable throwable) {
265182
if (Objects.isNull(throwable)) {
266183
return false;

rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/QueueDetailNatsFieldsTest.java

Lines changed: 0 additions & 135 deletions
This file was deleted.

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024-2026 Sonu Kumar
2+
* Copyright (c) 2026 Sonu Kumar
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* You may not use this file except in compliance with the License.
@@ -65,13 +65,13 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable {
6565
private static final Capabilities CAPS = new Capabilities(false, false, false, false);
6666

6767
/**
68-
* Translation of {@link Duration#ZERO} (the Redis "non-blocking" pop convention used by
69-
* {@code RqueueMessagePoller}) into the smallest positive duration JetStream will accept on a
70-
* pull fetch. Long enough that messages already buffered for the consumer come back in the same
71-
* call, short enough that an empty sub-queue inside a priority group does not stall the poll
72-
* cycle.
68+
* Lower bound for fetch wait when the caller passes a non-positive duration. JetStream rejects
69+
* zero on a pull fetch, so any zero/negative wait is rounded up to this minimum. Callers that
70+
* want long-poll semantics should pass the desired wait explicitly (e.g. the listener
71+
* container's {@code pollingInterval}); this constant only guards against accidental zero waits
72+
* from non-listener callers.
7373
*/
74-
private static final Duration NON_BLOCKING_FETCH_WAIT = Duration.ofMillis(50);
74+
private static final Duration MIN_FETCH_WAIT = Duration.ofMillis(50);
7575

7676
private final Connection connection;
7777
private final JetStream js;
@@ -115,7 +115,6 @@ public static Builder builder() {
115115

116116
// ---- subject / stream naming -------------------------------------------
117117

118-
// TODO: once Phase 1 lands, read additive QueueDetail.getNatsSubject() / getNatsStream() if set.
119118
private String subjectFor(QueueDetail q) {
120119
return config.getSubjectPrefix() + q.getName();
121120
}
@@ -149,15 +148,11 @@ private String streamFor(QueueDetail q, String priority) {
149148
}
150149

151150
private String dlqStreamFor(QueueDetail q) {
152-
return q.getNatsDlqStream() != null
153-
? q.getNatsDlqStream()
154-
: streamFor(q) + config.getDlqStreamSuffix();
151+
return streamFor(q) + config.getDlqStreamSuffix();
155152
}
156153

157154
private String dlqSubjectFor(QueueDetail q) {
158-
return q.getNatsDlqSubject() != null
159-
? q.getNatsDlqSubject()
160-
: subjectFor(q) + config.getDlqSubjectSuffix();
155+
return subjectFor(q) + config.getDlqSubjectSuffix();
161156
}
162157

163158
/** Stream description shown in {@code nats stream info} so operators can map back to rqueue. */
@@ -310,7 +305,13 @@ public Mono<Void> enqueueWithDelayReactive(QueueDetail q, RqueueMessage m, long
310305
@Override
311306
public List<RqueueMessage> pop(QueueDetail q, String consumerName, int batch, Duration wait) {
312307
return popInternal(
313-
streamFor(q), subjectFor(q), resolveConsumerName(q.getName(), consumerName), batch, wait);
308+
streamFor(q),
309+
subjectFor(q),
310+
resolveConsumerName(q.getName(), consumerName),
311+
batch,
312+
wait,
313+
resolveAckWait(q, config),
314+
resolveMaxDeliver(q, config));
314315
}
315316

316317
@Override
@@ -321,26 +322,64 @@ public List<RqueueMessage> pop(
321322
subjectFor(q, priority),
322323
resolveConsumerName(q.getName(), consumerName),
323324
batch,
324-
wait);
325+
wait,
326+
resolveAckWait(q, config),
327+
resolveMaxDeliver(q, config));
325328
}
326329

327330
private static String resolveConsumerName(String queueName, String consumerName) {
328331
return (consumerName != null && !consumerName.isEmpty()) ? consumerName : "rqueue-" + queueName;
329332
}
330333

334+
/**
335+
* Resolve the JetStream {@code ackWait} for this queue's pull consumer: per-queue
336+
* {@link QueueDetail#getVisibilityTimeout()} (when positive), else the global
337+
* {@link RqueueNatsConfig.ConsumerDefaults#getAckWait()}. Honouring visibilityTimeout makes the
338+
* NATS backend match the contract every other rqueue backend exposes: a message stays invisible
339+
* to other consumers for that window and is redelivered if not acked in time.
340+
*/
341+
public static Duration resolveAckWait(QueueDetail q, RqueueNatsConfig config) {
342+
long vt = q.getVisibilityTimeout();
343+
if (vt > 0) {
344+
return Duration.ofMillis(vt);
345+
}
346+
return config.getConsumerDefaults().getAckWait();
347+
}
348+
349+
/**
350+
* Resolve the JetStream {@code maxDeliver} from per-queue {@link QueueDetail#getNumRetry()}
351+
* (counted as initial delivery + N retries = numRetry + 1). The {@link Integer#MAX_VALUE}
352+
* "retry forever" sentinel maps to JetStream's unlimited value ({@code -1}); non-positive
353+
* numRetry falls back to {@link RqueueNatsConfig.ConsumerDefaults#getMaxDeliver()}.
354+
*/
355+
public static long resolveMaxDeliver(QueueDetail q, RqueueNatsConfig config) {
356+
int numRetry = q.getNumRetry();
357+
if (numRetry == Integer.MAX_VALUE) {
358+
return -1L;
359+
}
360+
if (numRetry > 0) {
361+
return numRetry + 1L;
362+
}
363+
return config.getConsumerDefaults().getMaxDeliver();
364+
}
365+
331366
private List<RqueueMessage> popInternal(
332-
String stream, String subject, String consumerName, int batch, Duration wait) {
333-
// Use default fetch wait if none provided. If zero duration is passed (Redis "non-blocking"
334-
// pop convention used by RqueueMessagePoller), translate to a short but positive duration:
335-
// JetStream rejects zero, but using the multi-second defaultFetchWait blocks empty pulls long
336-
// enough that under Weighted/Strict priority polling the cycle starves real queues — a single
337-
// empty sub-queue in a priority group can absorb the whole 30s test budget. Use the smallest
338-
// value JetStream still tolerates so empty sub-queues yield back to polling immediately.
367+
String stream,
368+
String subject,
369+
String consumerName,
370+
int batch,
371+
Duration wait,
372+
Duration ackWait,
373+
long maxDeliver) {
374+
// Honour the caller-supplied wait — this is the listener container's pollingInterval for
375+
// RqueueMessagePoller, and lets JetStream long-poll instead of the broker firing a steady
376+
// stream of $JS.API.CONSUMER.MSG.NEXT requests. Only fall back when the caller didn't
377+
// express a preference; zero/negative waits are rounded up to the JetStream minimum.
339378
Duration fetchWait;
340379
if (wait == null) {
341380
fetchWait = config.getDefaultFetchWait();
342-
} else if (wait.isZero()) {
343-
fetchWait = NON_BLOCKING_FETCH_WAIT;
381+
} else if (wait.isZero() || wait.isNegative()) {
382+
fetchWait = MIN_FETCH_WAIT;
344383
} else {
345384
fetchWait = wait;
346385
}
@@ -352,8 +391,8 @@ private List<RqueueMessage> popInternal(
352391
String actualConsumerName = provisioner.ensureConsumer(
353392
stream,
354393
consumerName,
355-
config.getConsumerDefaults().getAckWait(),
356-
config.getConsumerDefaults().getMaxDeliver(),
394+
ackWait,
395+
maxDeliver,
357396
config.getConsumerDefaults().getMaxAckPending());
358397
PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, actualConsumerName);
359398
// Consumer has no filter subject; pass null so the NATS client doesn't validate

0 commit comments

Comments
 (0)