Skip to content

Commit fff4936

Browse files
committed
Cache stream/consumer provisioning in NatsProvisioner; provision at bootstrap
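NatsProvisioner.ensureStream and ensureConsumer previously hit the NATS backend on every call. Add double-checked in-process caches (streamsDone set, consumerCache map) matching the existing kvCache pattern so each stream and consumer is checked/created at most once per process lifetime; only successful results are cached, so a failed attempt is retried on the next call. NatsStreamValidator now provisions consumers at bootstrap alongside streams so the NatsProvisioner caches are warm before any poller fires its first pop. JetStreamMessageBroker.popInternal drops the redundant ensureStream call entirely; the ensureConsumer call remains only to resolve the actual consumer name (stale-rebind path), and is a pure in-process cache lookup once bootstrap has provisioned that (stream, consumer) pair. Also in this commit: ensureKv applies the configured stream-default replicas and storage type when creating a KV bucket, popInternal derives failureCount from the JetStream delivered count (deliveredCount - 1), and the parkForRetry override is removed from JetStreamMessageBroker. Also fix a pre-existing type error: nm.metaData() returns NatsJetStreamMetaData, not MessageInfo, and the method is deliveredCount() not deliveryCount(). Assisted-By: Claude Code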
1 parent f342ced commit fff4936

3 files changed

Lines changed: 124 additions & 108 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java

Lines changed: 86 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.io.IOException;
2929
import java.time.Duration;
3030
import java.util.List;
31+
import java.util.Set;
3132
import java.util.concurrent.ConcurrentHashMap;
3233
import java.util.logging.Level;
3334
import java.util.logging.Logger;
@@ -50,6 +51,14 @@ public class NatsProvisioner {
5051
private final ConcurrentHashMap<String, KeyValue> kvCache = new ConcurrentHashMap<>();
5152
private final ConcurrentHashMap<String, Object> kvLocks = new ConcurrentHashMap<>();
5253

54+
// stream name → provisioned (set membership acts as the boolean flag)
55+
private final Set<String> streamsDone = ConcurrentHashMap.newKeySet();
56+
private final ConcurrentHashMap<String, Object> streamLocks = new ConcurrentHashMap<>();
57+
58+
// "streamName/requestedConsumerName" → actual consumer name (may differ for stale-rebind)
59+
private final ConcurrentHashMap<String, String> consumerCache = new ConcurrentHashMap<>();
60+
private final ConcurrentHashMap<String, Object> consumerLocks = new ConcurrentHashMap<>();
61+
5362
public NatsProvisioner(Connection connection, JetStreamManagement jsm, RqueueNatsConfig config)
5463
throws IOException {
5564
this.connection = connection;
@@ -90,8 +99,12 @@ public KeyValue ensureKv(String bucketName, Duration ttl)
9099
} catch (JetStreamApiException missing) {
91100
// bucket absent — fall through to create
92101
}
93-
KeyValueConfiguration.Builder cfg =
94-
KeyValueConfiguration.builder().name(bucketName).compression(true);
102+
RqueueNatsConfig.StreamDefaults sd = config.getStreamDefaults();
103+
KeyValueConfiguration.Builder cfg = KeyValueConfiguration.builder()
104+
.name(bucketName)
105+
.compression(true)
106+
.replicas(sd.getReplicas())
107+
.storageType(sd.getStorage());
95108
if (ttl != null && !ttl.isZero() && !ttl.isNegative()) {
96109
cfg.ttl(ttl);
97110
}
@@ -105,43 +118,54 @@ public KeyValue ensureKv(String bucketName, Duration ttl)
105118
// ---- Stream provisioning ----------------------------------------------
106119

107120
/**
108-
* Ensure a JetStream stream exists with the given subjects. If absent and {@code
109-
* autoCreateStreams=true}, creates one using {@link RqueueNatsConfig.StreamDefaults}.
121+
* Ensure a JetStream stream exists with the given subjects. Hits the NATS backend at most once
122+
* per stream name per process lifetime; subsequent calls return immediately from the in-process
123+
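* cache. If the stream is absent it is created from {@link RqueueNatsConfig.StreamDefaults}, or a
* {@link RqueueNatsException} is thrown when {@code autoCreateStreams=false}. A failed attempt is
* not cached, so the next call retries against the backend.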
110124
*/
111125
public void ensureStream(String streamName, List<String> subjects) {
112-
try {
113-
StreamInfo existing = safeGetStreamInfo(streamName);
114-
if (existing != null) {
126+
if (streamsDone.contains(streamName)) {
127+
return;
128+
}
129+
Object lock = streamLocks.computeIfAbsent(streamName, k -> new Object());
130+
synchronized (lock) {
131+
if (streamsDone.contains(streamName)) {
115132
return;
116133
}
117-
if (!config.isAutoCreateStreams()) {
134+
try {
135+
StreamInfo existing = safeGetStreamInfo(streamName);
136+
if (existing == null) {
137+
if (!config.isAutoCreateStreams()) {
138+
throw new RqueueNatsException(
139+
"Stream '" + streamName + "' does not exist and autoCreateStreams=false");
140+
}
141+
RqueueNatsConfig.StreamDefaults sd = config.getStreamDefaults();
142+
StreamConfiguration.Builder b = StreamConfiguration.builder()
143+
.name(streamName)
144+
.subjects(subjects)
145+
.replicas(sd.getReplicas())
146+
.storageType(sd.getStorage())
147+
.retentionPolicy(sd.getRetention())
148+
.duplicateWindow(sd.getDuplicateWindow())
149+
.compressionOption(CompressionOption.S2);
150+
if (sd.getMaxMsgs() > 0) {
151+
b.maxMessages(sd.getMaxMsgs());
152+
}
153+
if (sd.getMaxBytes() > 0) {
154+
b.maxBytes(sd.getMaxBytes());
155+
}
156+
jsm.addStream(b.build());
157+
}
158+
} catch (IOException | JetStreamApiException e) {
118159
throw new RqueueNatsException(
119-
"Stream '" + streamName + "' does not exist and autoCreateStreams=false");
120-
}
121-
RqueueNatsConfig.StreamDefaults sd = config.getStreamDefaults();
122-
StreamConfiguration.Builder b = StreamConfiguration.builder()
123-
.name(streamName)
124-
.subjects(subjects)
125-
.replicas(sd.getReplicas())
126-
.storageType(sd.getStorage())
127-
.retentionPolicy(sd.getRetention())
128-
.duplicateWindow(sd.getDuplicateWindow())
129-
.compressionOption(CompressionOption.S2);
130-
if (sd.getMaxMsgs() > 0) {
131-
b.maxMessages(sd.getMaxMsgs());
160+
"Failed to ensure stream '" + streamName + "' for subjects " + subjects, e);
132161
}
133-
if (sd.getMaxBytes() > 0) {
134-
b.maxBytes(sd.getMaxBytes());
135-
}
136-
jsm.addStream(b.build());
137-
} catch (IOException | JetStreamApiException e) {
138-
throw new RqueueNatsException(
139-
"Failed to ensure stream '" + streamName + "' for subjects " + subjects, e);
162+
streamsDone.add(streamName);
140163
}
141164
}
142165

143166
/**
144167
* Ensure a durable pull consumer exists, returning the actual consumer name to use for binding.
168+
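* After the first successful call for a (stream, requested consumer name) pair, later calls return
* the cached consumer name without contacting the NATS backend; failed calls are not cached.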
145169
*/
146170
public String ensureConsumer(
147171
String streamName,
@@ -150,43 +174,51 @@ public String ensureConsumer(
150174
long maxDeliver,
151175
long maxAckPending,
152176
String filterSubject) {
177+
String cacheKey = streamName + "/" + consumerName;
178+
String cached = consumerCache.get(cacheKey);
179+
if (cached != null) {
180+
return cached;
181+
}
182+
Object lock = consumerLocks.computeIfAbsent(cacheKey, k -> new Object());
183+
synchronized (lock) {
184+
cached = consumerCache.get(cacheKey);
185+
if (cached != null) {
186+
return cached;
187+
}
188+
String actual = doEnsureConsumer(
189+
streamName, consumerName, ackWait, maxDeliver, maxAckPending, filterSubject);
190+
consumerCache.put(cacheKey, actual);
191+
return actual;
192+
}
193+
}
194+
195+
private String doEnsureConsumer(
196+
String streamName,
197+
String consumerName,
198+
Duration ackWait,
199+
long maxDeliver,
200+
long maxAckPending,
201+
String filterSubject) {
153202
try {
154203
ConsumerInfo info = safeGetConsumerInfo(streamName, consumerName);
155204
if (info != null) {
156205
ConsumerConfiguration cc = info.getConsumerConfiguration();
157206
if (cc.getAckWait() != null && !cc.getAckWait().equals(ackWait)) {
158-
log.log(
159-
Level.WARNING,
160-
"Consumer "
161-
+ streamName
162-
+ "/"
163-
+ consumerName
164-
+ " ackWait differs (existing="
165-
+ cc.getAckWait()
166-
+ ", desired="
167-
+ ackWait
168-
+ ") - leaving existing config in place.");
207+
log.log(Level.WARNING,
208+
"Consumer " + streamName + "/" + consumerName
209+
+ " ackWait differs (existing=" + cc.getAckWait()
210+
+ ", desired=" + ackWait + ") - leaving existing config in place.");
169211
}
170212
if (cc.getMaxDeliver() != maxDeliver) {
171-
log.log(
172-
Level.WARNING,
173-
"Consumer "
174-
+ streamName
175-
+ "/"
176-
+ consumerName
177-
+ " maxDeliver differs (existing="
178-
+ cc.getMaxDeliver()
179-
+ ", desired="
180-
+ maxDeliver
181-
+ ") - leaving existing config in place.");
213+
log.log(Level.WARNING,
214+
"Consumer " + streamName + "/" + consumerName
215+
+ " maxDeliver differs (existing=" + cc.getMaxDeliver()
216+
+ ", desired=" + maxDeliver + ") - leaving existing config in place.");
182217
}
183218
return consumerName;
184219
}
185220
if (!config.isAutoCreateConsumers()) {
186-
throw new RqueueNatsException("Consumer '"
187-
+ consumerName
188-
+ "' on stream '"
189-
+ streamName
221+
throw new RqueueNatsException("Consumer '" + consumerName + "' on stream '" + streamName
190222
+ "' does not exist and autoCreateConsumers=false");
191223
}
192224
ConsumerConfiguration.Builder cb = ConsumerConfiguration.builder()
@@ -204,8 +236,6 @@ public String ensureConsumer(
204236
} catch (JetStreamApiException e) {
205237
// Error 10100 = "filtered consumer not unique" — a consumer with the same filter
206238
// already exists on the stream (stale from a previous naming scheme or crashed run).
207-
// List all consumers and check if one matches our filter; reuse it rather than
208-
// failing the startup.
209239
if (e.getApiErrorCode() == 10100 && filterSubject != null) {
210240
return tryFindAndBindStaleConsumer(streamName, filterSubject, consumerName);
211241
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -292,18 +292,10 @@ private List<RqueueMessage> popInternal(
292292
Duration fetchWait = (wait != null && !wait.isZero()) ? wait : config.getDefaultFetchWait();
293293
String key = stream + "/" + consumerName;
294294
JetStreamSubscription sub = subscriptionCache.computeIfAbsent(key, k -> {
295-
// computeIfAbsent runs at most once per (stream, consumer) per JVM, so both the stream
296-
// and the durable consumer are ensured exactly once on the cold path — not on every pop.
297-
//
298-
// ensureStream here guards against a start-order race: RqueueBootstrapEvent (which drives
299-
// NatsStreamValidator) fires *after* doStart() has already launched the broker pollers.
300-
// The validator is still the authoritative boot-time check for the "autoCreateStreams=false"
301-
// case, but ensureStream() here is idempotent and free if the stream already exists
302-
// (one getStreamInfo round-trip per subscription, not per message).
295+
// NatsStreamValidator provisions the stream and consumer at bootstrap (RqueueBootstrapEvent).
296+
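// NatsProvisioner caches both, so once this (stream, consumer) pair has been provisioned,
// ensureConsumer here is a map lookup with no backend call; on a cache miss it still goes to the backend.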
297+
// We still call it to resolve the actual consumer name (may differ for stale-rebind).
303298
try {
304-
provisioner.ensureStream(stream, List.of(subject));
305-
// ensureConsumer returns the actual consumer name to use (may differ if a stale
306-
// consumer was recovered/reused due to naming scheme changes).
307299
String actualConsumerName = provisioner.ensureConsumer(
308300
stream,
309301
consumerName,
@@ -324,6 +316,13 @@ private List<RqueueMessage> popInternal(
324316
for (Message nm : msgs) {
325317
try {
326318
RqueueMessage rm = serdes.deserialize(nm.getData(), RqueueMessage.class);
319+
// derive failure count from JetStream redelivery metadata
320+
try {
321+
long deliveredCount = nm.metaData().deliveredCount();
322+
rm.setFailureCount((int) Math.max(0, deliveredCount - 1));
323+
} catch (Exception ignored) {
324+
// defensive: metadata unavailable on non-JetStream messages
325+
}
327326
if (rm.getId() != null) {
328327
inFlight.put(rm.getId(), nm);
329328
}
@@ -371,46 +370,6 @@ public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) {
371370
return true;
372371
}
373372

374-
/**
375-
* NATS redelivery (nak) replays the original payload bytes, so {@code failureCount} embedded in
376-
* the message never increments across deliveries. We ack the original and re-publish the
377-
* {@code updated} message (which already carries the incremented count) so the next delivery sees
378-
* the correct counter. The retry delay is not honoured — NATS JetStream has no server-side
379-
* delayed publish in this version.
380-
*
381-
* <p>The {@code Nats-Msg-Id} header is suffixed with the failure count so the stream's
382-
* deduplication window does not drop the re-published message (same base id, different attempt).
383-
*/
384-
@Override
385-
public void parkForRetry(QueueDetail q, RqueueMessage old, RqueueMessage updated, long delayMs) {
386-
if (old.getId() != null) {
387-
Message nm = inFlight.remove(old.getId());
388-
if (nm != null) {
389-
nm.ack();
390-
}
391-
}
392-
String subject = subjectFor(q);
393-
Headers headers = new Headers();
394-
if (updated.getId() != null) {
395-
headers.add("Nats-Msg-Id", updated.getId() + "-r" + updated.getFailureCount());
396-
}
397-
try {
398-
byte[] payload = serdes.serialize(updated);
399-
js.publish(subject, headers, payload);
400-
} catch (IOException | JetStreamApiException e) {
401-
throw new RqueueNatsException(
402-
"Failed to re-enqueue message id=" + old.getId() + " for retry on queue=" + q.getName(),
403-
e);
404-
} catch (RuntimeException e) {
405-
throw new RqueueNatsException(
406-
"Failed to serialize/re-enqueue message id="
407-
+ old.getId()
408-
+ " for retry on queue="
409-
+ q.getName(),
410-
e);
411-
}
412-
}
413-
414373
@Override
415374
public void moveToDlq(
416375
QueueDetail source,
@@ -624,6 +583,7 @@ public AutoCloseable installDeadLetterBridge(QueueDetail q, String consumerName)
624583
// ---- builder -----------------------------------------------------------
625584

626585
public static class Builder {
586+
627587
private Connection connection;
628588
private JetStream jetStream;
629589
private JetStreamManagement management;

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/NatsStreamValidator.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,21 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
9191
log.log(Level.FINE, "NatsStreamValidator: no active queues registered; nothing to do");
9292
return;
9393
}
94+
RqueueNatsConfig.ConsumerDefaults cd = config.getConsumerDefaults();
9495
List<String> failures = new ArrayList<>();
9596
int total = 0;
9697
for (QueueDetail q : queues) {
9798
String mainStream = config.getStreamPrefix() + q.getName();
9899
String mainSubject = config.getSubjectPrefix() + q.getName();
99100
total += tryEnsure(failures, mainStream, mainSubject);
101+
tryEnsureConsumer(failures, mainStream, consumerName(q.getName()), cd, mainSubject);
100102

101103
if (q.getPriority() != null) {
102104
for (String priority : q.getPriority().keySet()) {
103-
total += tryEnsure(failures, mainStream + "-" + priority, mainSubject + "." + priority);
105+
String pStream = mainStream + "-" + priority;
106+
String pSubject = mainSubject + "." + priority;
107+
total += tryEnsure(failures, pStream, pSubject);
108+
tryEnsureConsumer(failures, pStream, consumerName(q.getName()), cd, pSubject);
104109
}
105110
}
106111

@@ -112,6 +117,7 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
112117
String dlqQueueStream = config.getStreamPrefix() + q.getDeadLetterQueueName();
113118
String dlqQueueSubject = config.getSubjectPrefix() + q.getDeadLetterQueueName();
114119
total += tryEnsure(failures, dlqQueueStream, dlqQueueSubject);
120+
// No consumer needed for the DLQ stream here — the DLQ queue registers its own listener.
115121
}
116122
}
117123
if (!failures.isEmpty()) {
@@ -139,6 +145,26 @@ public void onApplicationEvent(RqueueBootstrapEvent event) {
139145
new Object[] {total, queues.size()});
140146
}
141147

148+
/** Mirrors {@code JetStreamMessageBroker.resolveConsumerName} for a null caller-supplied name. */
149+
static String consumerName(String queueName) {
150+
return "rqueue-" + queueName;
151+
}
152+
153+
private void tryEnsureConsumer(
154+
List<String> failures,
155+
String streamName,
156+
String consumerName,
157+
RqueueNatsConfig.ConsumerDefaults cd,
158+
String filterSubject) {
159+
try {
160+
provisioner.ensureConsumer(
161+
streamName, consumerName, cd.getAckWait(), cd.getMaxDeliver(), cd.getMaxAckPending(),
162+
filterSubject);
163+
} catch (RqueueNatsException e) {
164+
failures.add("consumer " + consumerName + " on " + streamName + ": " + rootCause(e));
165+
}
166+
}
167+
142168
private int tryEnsure(List<String> failures, String streamName, String subject) {
143169
try {
144170
provisioner.ensureStream(streamName, List.of(subject));

0 commit comments

Comments
 (0)