Skip to content

Commit 2afd2fd

Browse files
committed
Fix consumer binding: return actual consumer name from ensureConsumer
When recovering stale consumers (due to naming scheme changes), ensureConsumer now returns the actual consumer name rather than the preferred name. This allows JetStreamMessageBroker to bind to the correct consumer even when it was recovered with a different name. Previously: ensureConsumer was void, so broker always tried to bind using the preferred name. If a stale consumer with a different name was recovered, the bind would fail with "Consumer not found, required in bind mode" (SUB-90017). Now: ensureConsumer returns the actual consumer name (either created or recovered), and broker uses that for binding. Assisted-By: Claude Code
1 parent 483a22b commit 2afd2fd

2 files changed

Lines changed: 19 additions & 10 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,14 @@ public void ensureStream(String streamName, List<String> subjects) {
8282
* Ensure a durable pull consumer exists. If existing config differs from desired, logs WARN and
8383
* leaves it alone (so users can hand-tune consumers in production).
8484
*/
85-
public void ensureConsumer(
85+
/**
86+
* Ensure a consumer exists on a stream with the given configuration, returning the actual
87+
* consumer name that will be used for binding (may differ from {@code consumerName} if a
88+
* stale/recovered consumer was reused).
89+
*
90+
* @return the actual consumer name to use for binding
91+
*/
92+
public String ensureConsumer(
8693
String streamName,
8794
String consumerName,
8895
Duration ackWait,
@@ -119,7 +126,7 @@ public void ensureConsumer(
119126
+ maxDeliver
120127
+ ") - leaving existing config in place.");
121128
}
122-
return;
129+
return consumerName;
123130
}
124131
if (!config.isAutoCreateConsumers()) {
125132
throw new RqueueNatsException("Consumer '"
@@ -139,14 +146,14 @@ public void ensureConsumer(
139146
cb.filterSubject(filterSubject);
140147
}
141148
jsm.addOrUpdateConsumer(streamName, cb.build());
149+
return consumerName;
142150
} catch (JetStreamApiException e) {
143151
// Error 10100 = "filtered consumer not unique" — a consumer with the same filter
144152
// already exists on the stream (stale from a previous naming scheme or crashed run).
145153
// List all consumers and check if one matches our filter; reuse it rather than
146154
// failing the startup.
147155
if (e.getApiErrorCode() == 10100 && filterSubject != null) {
148-
tryFindAndBindStaleConsumer(streamName, filterSubject, consumerName);
149-
return;
156+
return tryFindAndBindStaleConsumer(streamName, filterSubject, consumerName);
150157
}
151158
throw new RqueueNatsException(
152159
"Failed to ensure consumer '" + consumerName + "' on stream '" + streamName + "'", e);
@@ -162,12 +169,12 @@ public void ensureConsumer(
162169
* - Stale consumers from a previous consumer naming scheme still exist
163170
* - Multiple instances crashed mid-initialization and left orphaned consumers
164171
*
165-
* <p>When found, the broker will bind to the existing consumer (which will work fine as long as
166-
* it has a compatible configuration).
172+
* <p>When found, returns the actual consumer name so the broker can bind to it correctly.
167173
*
174+
* @return the actual consumer name to use for binding
168175
* @throws RqueueNatsException if recovery fails
169176
*/
170-
private void tryFindAndBindStaleConsumer(
177+
private String tryFindAndBindStaleConsumer(
171178
String streamName, String filterSubject, String preferredConsumerName) {
172179
try {
173180
// List all consumers on the stream and find one matching our filter.
@@ -181,7 +188,7 @@ private void tryFindAndBindStaleConsumer(
181188
Level.INFO,
182189
"Reusing existing consumer '" + name + "' (filter=" + filterSubject + ")"
183190
+ " instead of creating '" + preferredConsumerName + "'");
184-
return; // Bind will use the existing consumer
191+
return name; // Return the actual consumer name for binding
185192
}
186193
}
187194
// No matching consumer found; this is unexpected, so fail.

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,14 +289,16 @@ private List<RqueueMessage> popInternal(
289289
// (one getStreamInfo round-trip per subscription, not per message).
290290
try {
291291
provisioner.ensureStream(stream, List.of(subject));
292-
provisioner.ensureConsumer(
292+
// ensureConsumer returns the actual consumer name to use (may differ if a stale
293+
// consumer was recovered/reused due to naming scheme changes).
294+
String actualConsumerName = provisioner.ensureConsumer(
293295
stream,
294296
consumerName,
295297
config.getConsumerDefaults().getAckWait(),
296298
config.getConsumerDefaults().getMaxDeliver(),
297299
config.getConsumerDefaults().getMaxAckPending(),
298300
subject);
299-
PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, consumerName);
301+
PullSubscribeOptions opts = PullSubscribeOptions.bind(stream, actualConsumerName);
300302
return js.subscribe(subject, opts);
301303
} catch (IOException | JetStreamApiException e) {
302304
throw new RqueueNatsException(

0 commit comments

Comments
 (0)