Skip to content

Commit d0d72a7

Browse files
committed
Fix fetch wait duration validation for NATS backend
RqueueMessagePoller passes Duration.ZERO when calling pop(), which works for Redis (used as 'no timeout'). However, JetStreamMessageBroker forwards this to JetStream's fetch() API, which requires a positive duration and rejects ZERO. Fix: Treat ZERO duration the same as null - use config.getDefaultFetchWait() instead. This maintains compatibility with the Redis poller interface while ensuring JetStream gets a valid fetch duration. Solves: "Fetch wait duration must be supplied and greater than 0" errors during polling. Assisted-By: Claude Code
1 parent 2afd2fd commit d0d72a7

1 file changed

Lines changed: 3 additions & 1 deletion

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,9 @@ private static String resolveConsumerName(String queueName, String consumerName)
276276

277277
private List<RqueueMessage> popInternal(
278278
String stream, String subject, String consumerName, int batch, Duration wait) {
279-
Duration fetchWait = wait != null ? wait : config.getDefaultFetchWait();
279+
// Use default fetch wait if none provided OR if zero duration is passed (Redis compatibility).
280+
// JetStream requires a positive duration for fetch().
281+
Duration fetchWait = (wait != null && !wait.isZero()) ? wait : config.getDefaultFetchWait();
280282
String key = stream + "/" + consumerName;
281283
JetStreamSubscription sub = subscriptionCache.computeIfAbsent(key, k -> {
282284
// computeIfAbsent runs at most once per (stream, consumer) per JVM, so both the stream

0 commit comments

Comments
 (0)