Skip to content

Commit 483a22b

Browse files
committed
Fix consumer naming conflict: handle stale consumers on startup
When routing NATS through DefaultRqueuePoller, consumer names changed from "rqueue-<queue>-<bean>_<method>" to "rqueue-<queue>". Stale consumers from the previous naming scheme can remain on the NATS server, causing "filtered consumer not unique" errors when the new code tries to create a consumer with the same filter subject. Fix: when ensureConsumer encounters error 10100 (filtered consumer not unique), list all consumers on the stream and find one with the matching filter subject. Reuse the existing consumer instead of failing startup. This allows the app to recover from unclean shutdowns and naming scheme changes without requiring manual NATS cleanup. Assisted-By: Claude Code
1 parent 270c3c2 commit 483a22b

1 file changed

Lines changed: 52 additions & 1 deletion

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/internal/NatsProvisioner.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,60 @@ public void ensureConsumer(
139139
cb.filterSubject(filterSubject);
140140
}
141141
jsm.addOrUpdateConsumer(streamName, cb.build());
142-
} catch (IOException | JetStreamApiException e) {
142+
} catch (JetStreamApiException e) {
143+
// Error 10100 = "filtered consumer not unique" — a consumer with the same filter
144+
// already exists on the stream (stale from a previous naming scheme or crashed run).
145+
// List all consumers and check if one matches our filter; reuse it rather than
146+
// failing the startup.
147+
if (e.getApiErrorCode() == 10100 && filterSubject != null) {
148+
tryFindAndBindStaleConsumer(streamName, filterSubject, consumerName);
149+
return;
150+
}
143151
throw new RqueueNatsException(
144152
"Failed to ensure consumer '" + consumerName + "' on stream '" + streamName + "'", e);
153+
} catch (IOException e) {
154+
throw new RqueueNatsException(
155+
"Failed to ensure consumer '" + consumerName + "' on stream '" + streamName + "'", e);
156+
}
157+
}
158+
159+
/**
160+
* Handle the "filtered consumer not unique" error by finding an existing consumer on the stream
161+
* that matches the desired filter subject. This can happen when:
162+
* - Stale consumers from a previous consumer naming scheme still exist
163+
* - Multiple instances crashed mid-initialization and left orphaned consumers
164+
*
165+
* <p>When found, the broker will bind to the existing consumer (which will work fine as long as
166+
* it has a compatible configuration).
167+
*
168+
* @throws RqueueNatsException if recovery fails
169+
*/
170+
private void tryFindAndBindStaleConsumer(
171+
String streamName, String filterSubject, String preferredConsumerName) {
172+
try {
173+
// List all consumers on the stream and find one matching our filter.
174+
List<String> consumerNames = jsm.getConsumerNames(streamName);
175+
for (String name : consumerNames) {
176+
ConsumerInfo ci = jsm.getConsumerInfo(streamName, name);
177+
ConsumerConfiguration cc = ci.getConsumerConfiguration();
178+
// Check if this consumer has the same filter we're looking for.
179+
if (filterSubject.equals(cc.getFilterSubject())) {
180+
log.log(
181+
Level.INFO,
182+
"Reusing existing consumer '" + name + "' (filter=" + filterSubject + ")"
183+
+ " instead of creating '" + preferredConsumerName + "'");
184+
return; // Bind will use the existing consumer
185+
}
186+
}
187+
// No matching consumer found; this is unexpected, so fail.
188+
throw new RqueueNatsException(
189+
"Filtered consumer with filter '" + filterSubject + "' not found on stream '"
190+
+ streamName + "' despite 'filtered consumer not unique' error");
191+
} catch (IOException | JetStreamApiException e) {
192+
throw new RqueueNatsException(
193+
"Failed to recover from 'filtered consumer not unique' error on stream '" + streamName
194+
+ "'",
195+
e);
145196
}
146197
}
147198

0 commit comments

Comments
 (0)