Skip to content

Commit ecc377e

Browse files
committed
feat: multi-consumer support with per-consumer poller tracking and worker registry
Rename natsConsumerName -> consumerName to make the concept broker-agnostic. Introduce pollerKey (queue##consumer) in RqueueMessageListenerContainer to prevent the second consumer of a multi-consumer queue being skipped by the bare-name dedup check. Wire worker registry heartbeats and capacity-exhausted tracking through consumerTrackingKey so each consumer is reported separately. Add NATS stream retention policy and a 14-day maxAge default; wire the message broker onto RqueueMessageTemplateImpl at container start for non-Redis publish paths. Extract BaseListener/JobListener in the spring-boot example. Assisted-By: Claude Code
1 parent bb1ef19 commit ecc377e

30 files changed

Lines changed: 373 additions & 206 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,17 @@
4444
public class RqueueConfig {
4545

4646
@Getter
47-
private static final String brokerId = UUID.randomUUID().toString();
47+
private static final String brokerId = generateBrokerId();
48+
49+
private static String generateBrokerId() {
50+
String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
51+
java.util.Random random = new java.util.Random();
52+
StringBuilder sb = new StringBuilder(8);
53+
for (int i = 0; i < 8; i++) {
54+
sb.append(chars.charAt(random.nextInt(chars.length())));
55+
}
56+
return sb.toString();
57+
}
4858

4959
private static final AtomicLong counter = new AtomicLong(1);
5060
private final RedisConnectionFactory connectionFactory;

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,15 @@
3737
public final class EndpointRegistry {
3838

3939
private static final Object lock = new Object();
40-
private static final Map<String, QueueDetail> queueNameToDetail = new HashMap<>();
40+
// composite key: "queueName" for single-consumer, "queueName##consumerName" for multi-consumer
41+
private static final Map<String, QueueDetail> registry = new HashMap<>();
42+
// first registered QueueDetail per queue name — used by enqueue / management operations
43+
private static final Map<String, QueueDetail> primaryByName = new HashMap<>();
44+
45+
private static String compositeKey(QueueDetail qd) {
46+
String cn = qd.getConsumerName();
47+
return (cn != null && !cn.isEmpty()) ? qd.getName() + "##" + cn : qd.getName();
48+
}
4149

4250
private EndpointRegistry() {}
4351

@@ -51,7 +59,7 @@ private EndpointRegistry() {}
5159
* @see #get(String, String)
5260
*/
5361
public static QueueDetail get(String queueName) {
54-
QueueDetail queueDetail = queueNameToDetail.get(queueName);
62+
QueueDetail queueDetail = primaryByName.get(queueName);
5563
if (queueDetail == null) {
5664
throw new QueueDoesNotExist(queueName);
5765
}
@@ -68,7 +76,7 @@ public static QueueDetail get(String queueName) {
6876
*/
6977
public static QueueDetail get(String queueName, String priority) {
7078
QueueDetail queueDetail =
71-
queueNameToDetail.get(PriorityUtils.getQueueNameForPriority(queueName, priority));
79+
primaryByName.get(PriorityUtils.getQueueNameForPriority(queueName, priority));
7280
if (queueDetail == null) {
7381
throw new QueueDoesNotExist(queueName);
7482
}
@@ -77,24 +85,27 @@ public static QueueDetail get(String queueName, String priority) {
7785

7886
public static void register(QueueDetail queueDetail) {
7987
synchronized (lock) {
80-
if (queueNameToDetail.containsKey(queueDetail.getName())) {
88+
String ck = compositeKey(queueDetail);
89+
if (registry.containsKey(ck)) {
8190
throw new OverrideException(queueDetail.getName());
8291
}
83-
queueNameToDetail.put(queueDetail.getName(), queueDetail);
92+
registry.put(ck, queueDetail);
93+
primaryByName.putIfAbsent(queueDetail.getName(), queueDetail);
8494
lock.notifyAll();
8595
}
8696
}
8797

8898
public static void delete() {
8999
synchronized (lock) {
90-
queueNameToDetail.clear();
100+
registry.clear();
101+
primaryByName.clear();
91102
lock.notifyAll();
92103
}
93104
}
94105

95106
public static List<String> getActiveQueues() {
96107
synchronized (lock) {
97-
List<String> queues = queueNameToDetail.values().stream()
108+
List<String> queues = primaryByName.values().stream()
98109
.filter(QueueDetail::isActive)
99110
.map(QueueDetail::getName)
100111
.collect(Collectors.toList());
@@ -105,7 +116,7 @@ public static List<String> getActiveQueues() {
105116

106117
public static List<QueueDetail> getActiveQueueDetails() {
107118
synchronized (lock) {
108-
List<QueueDetail> queueDetails = queueNameToDetail.values().stream()
119+
List<QueueDetail> queueDetails = registry.values().stream()
109120
.filter(QueueDetail::isActive)
110121
.collect(Collectors.toList());
111122
lock.notifyAll();
@@ -115,7 +126,7 @@ public static List<QueueDetail> getActiveQueueDetails() {
115126

116127
public static Map<String, QueueDetail> getActiveQueueMap() {
117128
synchronized (lock) {
118-
Map<String, QueueDetail> queueDetails = queueNameToDetail.values().stream()
129+
Map<String, QueueDetail> queueDetails = primaryByName.values().stream()
119130
.filter(QueueDetail::isActive)
120131
.collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
121132
lock.notifyAll();
@@ -126,7 +137,7 @@ public static Map<String, QueueDetail> getActiveQueueMap() {
126137
public static String toStr() {
127138
StringBuilder builder = new StringBuilder();
128139
synchronized (lock) {
129-
List<QueueDetail> queueDetails = new ArrayList<>(queueNameToDetail.values());
140+
List<QueueDetail> queueDetails = new ArrayList<>(registry.values());
130141
queueDetails.sort(Comparator.comparing(QueueDetail::getName));
131142
for (QueueDetail q : queueDetails) {
132143
builder.append(q.toString());
@@ -142,6 +153,6 @@ public static int getActiveQueueCount() {
142153
}
143154

144155
public static int getRegisteredQueueCount() {
145-
return queueNameToDetail.size();
156+
return registry.size();
146157
}
147158
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/middleware/HandlerMiddleware.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void handle(Job job, Callable<Void> next) throws Exception {
4848
payload,
4949
buildMessageHeaders(
5050
job.getQueueDetail().getName(),
51+
job.getQueueDetail().getConsumerName(),
5152
rqueueMessage,
5253
job,
5354
execution,

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/DefaultRqueuePoller.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class DefaultRqueuePoller extends RqueueMessagePoller {
3838
private final QueueThreadPool queueThreadPool;
3939

4040
DefaultRqueuePoller(
41+
String pollerKey,
4142
QueueDetail queueDetail,
4243
QueueThreadPool queueThreadPool,
4344
RqueueBeanProvider rqueueBeanProvider,
@@ -96,6 +97,8 @@ void poll() {
9697
logNotAvailable();
9798
TimeoutUtils.sleepLog(pollingInterval, false);
9899
} else {
100+
// Pass the pollerKey (queues[0]) so pollAndExecute's isQueueActive check uses the same
101+
// key that queueRunningState was keyed with (may be "queueName##consumerName").
99102
super.poll(-1, queueDetail.getName(), queueDetail, queueThreadPool);
100103
lastNotAvailableAt = null;
101104
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/MappingInformation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class MappingInformation implements Comparable<MappingInformation> {
3535
@EqualsAndHashCode.Include
3636
private final Set<String> queueNames;
3737

38+
@EqualsAndHashCode.Include
39+
private final String consumerName;
40+
3841
private final int numRetry;
3942
private final String deadLetterQueueName;
4043
private final boolean deadLetterConsumerEnabled;
@@ -46,7 +49,6 @@ class MappingInformation implements Comparable<MappingInformation> {
4649
private final boolean primary;
4750
private final int batchSize;
4851
private final Set<Class<? extends Throwable>> doNotRetry;
49-
private final String natsConsumerName;
5052

5153
@Override
5254
public String toString() {

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class QueueDetail extends SerializableBase {
8484
private final Duration natsAckWaitOverride;
8585
private final Integer natsMaxDeliverOverride;
8686
private final Duration natsDedupWindow;
87-
private final String natsConsumerName;
87+
private final String consumerName;
8888

8989
public boolean isDlqSet() {
9090
return !StringUtils.isEmpty(deadLetterQueueName);
@@ -165,14 +165,30 @@ private QueueDetail cloneQueueDetail(
165165
.concurrency(concurrency)
166166
.priority(Collections.singletonMap(Constants.DEFAULT_PRIORITY_KEY, priority))
167167
.doNotRetry(doNotRetry)
168-
.natsConsumerName(natsConsumerName)
168+
.consumerName(consumerName)
169169
.build();
170170
}
171171

172172
public Duration visibilityDuration() {
173173
return Duration.ofMillis(visibilityTimeout);
174174
}
175175

176+
/**
177+
* Returns the effective JetStream consumer name for this queue. When {@link #consumerName} is
178+
* explicitly set it is returned as-is. Otherwise a default is derived from the queue name:
179+
* primary (non-system-generated) queues get {@code {name}-consumer-primary}; system-generated
180+
* priority sub-queues get {@code {name}-consumer}. The name is sanitized so that characters
181+
* outside {@code [A-Za-z0-9_-]} (e.g. the {@code ::} priority suffix separator) are replaced
182+
* with {@code -}, producing a valid NATS consumer name in all cases.
183+
*/
184+
public String resolvedConsumerName() {
185+
if (consumerName != null && !consumerName.isEmpty()) {
186+
return consumerName;
187+
}
188+
String sanitized = name.replaceAll("[^A-Za-z0-9_-]", "-");
189+
return systemGenerated ? sanitized + "-consumer" : sanitized + "-consumer-primary";
190+
}
191+
176192
/**
177193
* Resolves the JetStream stream name. When {@link #natsStream} is null the default
178194
* derivation {@code "rqueue-" + queueName} is used so existing queue configs keep

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHandler.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,15 @@ protected void registerHandlerMethod(Object handler, Method method, MappingInfor
252252

253253
@Override
254254
protected void handleMessageInternal(Message<?> message, String lookupDestination) {
255-
MappingInformation mapping = this.destinationLookup.get(lookupDestination);
255+
String consumerName = (String) message.getHeaders().get(RqueueMessageHeaders.CONSUMER_NAME);
256+
String lookupKey = (consumerName != null && !consumerName.isEmpty())
257+
? consumerLookupKey(lookupDestination, consumerName)
258+
: lookupDestination;
259+
MappingInformation mapping = this.destinationLookup.get(lookupKey);
256260
Set<Match> matches = new HashSet<>();
257-
addMatchesToCollection(mapping, message, matches);
261+
if (mapping != null) {
262+
addMatchesToCollection(mapping, message, matches);
263+
}
258264
if (matches.isEmpty()) {
259265
handleNoMatch(this.getHandlerMethodMap().keySet(), lookupDestination, message);
260266
return;
@@ -327,7 +333,7 @@ private MappingInformation getMappingInformation(RqueueListener rqueueListener)
327333
.priority(priorityMap)
328334
.batchSize(batchSize)
329335
.doNotRetry(new HashSet<>(Arrays.asList(rqueueListener.doNotRetry())))
330-
.natsConsumerName(natsConsumerName.isEmpty() ? null : natsConsumerName)
336+
.consumerName(natsConsumerName.isEmpty() ? null : natsConsumerName)
331337
.build();
332338
if (mappingInformation.isValid()) {
333339
return mappingInformation;
@@ -523,13 +529,25 @@ private Set<String> resolveQueueNames(RqueueListener rqueueListener) {
523529

524530
@Override
525531
protected Set<String> getDirectLookupDestinations(MappingInformation mapping) {
526-
Set<String> destinations = new HashSet<>(mapping.getQueueNames());
532+
String consumerName = mapping.getConsumerName();
533+
Set<String> destinations = new HashSet<>();
527534
for (String queueName : mapping.getQueueNames()) {
528-
destinations.addAll(PriorityUtils.getNamesFromPriority(queueName, mapping.getPriority()));
535+
if (consumerName != null && !consumerName.isEmpty()) {
536+
// Consumer-qualified key so two @RqueueListener methods with different consumerNames
537+
// on the same queue get independent destinationLookup entries.
538+
destinations.add(consumerLookupKey(queueName, consumerName));
539+
} else {
540+
destinations.add(queueName);
541+
destinations.addAll(PriorityUtils.getNamesFromPriority(queueName, mapping.getPriority()));
542+
}
529543
}
530544
return destinations;
531545
}
532546

547+
static String consumerLookupKey(String queueName, String consumerName) {
548+
return queueName + "##" + consumerName;
549+
}
550+
533551
@Override
534552
protected String getDestination(Message<?> message) {
535553
return (String) message.getHeaders().get(RqueueMessageHeaders.DESTINATION);

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageHeaders.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ public final class RqueueMessageHeaders {
5656
*/
5757
public static final String EXECUTION = "execution";
5858

59+
/**
60+
* NATS JetStream consumer name — set when a message was fetched for a specific named consumer so
61+
* that {@link RqueueMessageHandler} can route it to the correct {@code @RqueueListener} method.
62+
* Absent (null) for single-consumer queues and all Redis-backed queues.
63+
*/
64+
public static final String CONSUMER_NAME = "consumerName";
65+
5966
private static final MessageHeaders emptyMessageHeaders =
6067
new MessageHeaders(Collections.emptyMap());
6168

@@ -71,8 +78,21 @@ public static MessageHeaders buildMessageHeaders(
7178
Job job,
7279
Execution execution,
7380
MessageHeaders messageHeaders) {
74-
Map<String, Object> headers = new HashMap<>(9);
81+
return buildMessageHeaders(destination, null, rqueueMessage, job, execution, messageHeaders);
82+
}
83+
84+
public static MessageHeaders buildMessageHeaders(
85+
String destination,
86+
String consumerName,
87+
RqueueMessage rqueueMessage,
88+
Job job,
89+
Execution execution,
90+
MessageHeaders messageHeaders) {
91+
Map<String, Object> headers = new HashMap<>(10);
7592
headers.put(DESTINATION, destination);
93+
if (consumerName != null && !consumerName.isEmpty()) {
94+
headers.put(CONSUMER_NAME, consumerName);
95+
}
7696
headers.put(ID, rqueueMessage.getId());
7797
headers.put(MESSAGE, rqueueMessage);
7898
if (job != null) {

0 commit comments

Comments
 (0)