Skip to content

Commit 3c378e4

Browse files
committed
add worker heart beat
1 parent a752acc commit 3c378e4

File tree

71 files changed

+4137
-388
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+4137
-388
lines changed

rqueue-core/src/main/java/com/github/sonus21/rqueue/common/RqueueRedisTemplate.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public Long rpush(String listName, V val) {
6666
return redisTemplate.opsForList().rightPush(listName, val);
6767
}
6868

69+
public Long lpush(String listName, V val) {
70+
return redisTemplate.opsForList().leftPush(listName, val);
71+
}
72+
6973
public Long addToSet(String setName, V... values) {
7074
return redisTemplate.opsForSet().add(setName, values);
7175
}
@@ -98,6 +102,23 @@ public void set(String key, V val, Duration duration) {
98102
redisTemplate.opsForValue().set(key, val, duration.toMillis(), TimeUnit.MILLISECONDS);
99103
}
100104

105+
public void putHashValue(String key, String hashKey, V val) {
106+
redisTemplate.opsForHash().put(key, hashKey, val);
107+
}
108+
109+
@SuppressWarnings("unchecked")
110+
public Map<String, V> getHashEntries(String key) {
111+
return (Map<String, V>) (Map<?, ?>) redisTemplate.opsForHash().entries(key);
112+
}
113+
114+
public Long deleteHashValues(String key, String... hashKeys) {
115+
return redisTemplate.opsForHash().delete(key, (Object[]) hashKeys);
116+
}
117+
118+
public Boolean expire(String key, Duration duration) {
119+
return redisTemplate.expire(key, duration.toMillis(), TimeUnit.MILLISECONDS);
120+
}
121+
101122
public Boolean setIfAbsent(String lockKey, V val, Duration duration) {
102123
boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, val);
103124
if (result) {

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueConfig.java

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
@Configuration
4444
public class RqueueConfig {
4545

46+
@Getter
4647
private static final String brokerId = UUID.randomUUID().toString();
4748
private static final AtomicLong counter = new AtomicLong(1);
4849
private final RedisConnectionFactory connectionFactory;
@@ -139,9 +140,26 @@ public class RqueueConfig {
139140
@Value("${rqueue.completed.job.cleanup.interval:30000}")
140141
private long completedJobCleanupIntervalInMs;
141142

142-
public static String getBrokerId() {
143-
return brokerId;
144-
}
143+
@Value("${rqueue.worker.registry.enabled:true}")
144+
private boolean workerRegistryEnabled;
145+
146+
@Value("${rqueue.worker.registry.worker.ttl:300}")
147+
private long workerRegistryWorkerTtlInSeconds;
148+
149+
@Value("${rqueue.worker.registry.worker.heartbeat.interval:60}")
150+
private long workerRegistryWorkerHeartbeatIntervalInSeconds;
151+
152+
@Value("${rqueue.worker.registry.queue.ttl:3600}")
153+
private long workerRegistryQueueTtlInSeconds;
154+
155+
@Value("${rqueue.worker.registry.queue.heartbeat.interval:15}")
156+
private long workerRegistryQueueHeartbeatIntervalInSeconds;
157+
158+
@Value("${rqueue.worker.registry.key.prefix:worker::}")
159+
private String workerRegistryKeyPrefix;
160+
161+
@Value("${rqueue.worker.registry.queue.key.prefix:q-pollers::}")
162+
private String workerRegistryQueueKeyPrefix;
145163

146164
public boolean messageInTerminalStateShouldBeStored() {
147165
return getMessageDurabilityInTerminalStateInSecond() > 0;
@@ -294,6 +312,14 @@ public String getJobsKey(String messageId) {
294312
return prefix + jobsCollectionNamePrefix + messageId;
295313
}
296314

315+
public String getWorkerRegistryKey(String workerId) {
316+
return prefix + workerRegistryKeyPrefix + workerId;
317+
}
318+
319+
public String getWorkerRegistryQueueKey(String queueName) {
320+
return prefix + workerRegistryQueueKeyPrefix + getTaggedName(queueName);
321+
}
322+
297323
public String getDelDataName(String queueName) {
298324
return prefix
299325
+ delPrefix
@@ -307,6 +333,22 @@ public Duration getJobDurabilityInTerminalState() {
307333
return Duration.ofSeconds(jobDurabilityInTerminalStateInSecond);
308334
}
309335

336+
public Duration getWorkerRegistryWorkerTtl() {
337+
return Duration.ofSeconds(workerRegistryWorkerTtlInSeconds);
338+
}
339+
340+
public Duration getWorkerRegistryWorkerHeartbeatInterval() {
341+
return Duration.ofSeconds(workerRegistryWorkerHeartbeatIntervalInSeconds);
342+
}
343+
344+
public Duration getWorkerRegistryQueueTtl() {
345+
return Duration.ofSeconds(workerRegistryQueueTtlInSeconds);
346+
}
347+
348+
public Duration getWorkerRegistryQueueHeartbeatInterval() {
349+
return Duration.ofSeconds(workerRegistryQueueHeartbeatIntervalInSeconds);
350+
}
351+
310352
public String getLibVersion() {
311353
if (StringUtils.isEmpty(version)) {
312354
ClassPathResource resource =

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,20 @@
2525
import com.github.sonus21.rqueue.core.ProcessingQueueMessageScheduler;
2626
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
2727
import com.github.sonus21.rqueue.core.RqueueInternalPubSubChannel;
28+
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2829
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2930
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
3031
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
3132
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
33+
import com.github.sonus21.rqueue.core.impl.UuidV4RqueueMessageIdGenerator;
3234
import com.github.sonus21.rqueue.dao.RqueueStringDao;
3335
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
3436
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3537
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
38+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
39+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistryImpl;
3640
import com.github.sonus21.rqueue.utils.RedisUtils;
41+
import com.github.sonus21.rqueue.utils.condition.MissingRqueueMessageIdGenerator;
3742
import com.github.sonus21.rqueue.utils.condition.ReactiveEnabled;
3843
import com.github.sonus21.rqueue.utils.pebble.ResourceLoader;
3944
import com.github.sonus21.rqueue.utils.pebble.RqueuePebbleExtension;
@@ -151,6 +156,12 @@ public RqueueWebConfig rqueueWebConfig() {
151156
return new RqueueWebConfig();
152157
}
153158

159+
@Bean
160+
@Conditional(MissingRqueueMessageIdGenerator.class)
161+
public RqueueMessageIdGenerator rqueueMessageIdGenerator() {
162+
return new UuidV4RqueueMessageIdGenerator();
163+
}
164+
154165
@Bean
155166
public RqueueSchedulerConfig rqueueSchedulerConfig() {
156167
return new RqueueSchedulerConfig();
@@ -215,6 +226,11 @@ public RqueueStringDao rqueueStringDao(RqueueConfig rqueueConfig) {
215226
return new RqueueStringDaoImpl(rqueueConfig);
216227
}
217228

229+
@Bean
230+
public RqueueWorkerRegistry rqueueWorkerRegistry(RqueueConfig rqueueConfig) {
231+
return new RqueueWorkerRegistryImpl(rqueueConfig);
232+
}
233+
218234
@Bean
219235
public RqueueLockManager rqueueLockManager(RqueueStringDao rqueueStringDao) {
220236
return new RqueueLockManagerImpl(rqueueStringDao);

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueWebConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public class RqueueWebConfig {
4848
@Value("${rqueue.web.max.message.move.count:1000}")
4949
private int maxMessageMoveCount;
5050

51+
@Value("${rqueue.web.queue.page.size:12}")
52+
private int queuePageSize;
53+
54+
@Value("${rqueue.web.worker.page.size:10}")
55+
private int workerPageSize;
56+
5157
/**
5258
* Whether queue stats should be collected or not. When this flag is disabled, metric data won't
5359
* be available in the dashboard.

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
2525
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
2626
import com.github.sonus21.rqueue.metrics.RqueueMetricsCounter;
27+
import com.github.sonus21.rqueue.worker.RqueueWorkerRegistry;
2728
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
2829
import lombok.Getter;
2930
import lombok.Setter;
@@ -55,6 +56,9 @@ public class RqueueBeanProvider {
5556
@Autowired(required = false)
5657
private RqueueMetricsCounter rqueueMetricsCounter;
5758

59+
@Autowired(required = false)
60+
private RqueueWorkerRegistry rqueueWorkerRegistry;
61+
5862
@Autowired
5963
private RqueueMessageHandler rqueueMessageHandler;
6064

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.core;
18+
19+
@FunctionalInterface
20+
public interface RqueueMessageIdGenerator {
21+
22+
String generate();
23+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ void moveMessageWithDelay(
4949

5050
Long addMessage(String queueName, RqueueMessage rqueueMessage);
5151

52+
Long addMessageAtFront(String queueName, RqueueMessage rqueueMessage);
53+
5254
Boolean addToZset(String zsetName, RqueueMessage rqueueMessage, long score);
5355

5456
List<RqueueMessage> getAllMessages(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.github.sonus21.rqueue.config.RqueueConfig;
2727
import com.github.sonus21.rqueue.core.EndpointRegistry;
2828
import com.github.sonus21.rqueue.core.RqueueMessage;
29+
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2930
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
3031
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
3132
import com.github.sonus21.rqueue.dao.RqueueStringDao;
@@ -51,25 +52,23 @@ abstract class BaseMessageSender {
5152
protected final MessageHeaders messageHeaders;
5253
protected final MessageConverter messageConverter;
5354
protected final RqueueMessageTemplate messageTemplate;
54-
55-
@Autowired
56-
protected RqueueStringDao rqueueStringDao;
57-
58-
@Autowired
59-
protected RqueueConfig rqueueConfig;
60-
61-
@Autowired
62-
protected RqueueMessageMetadataService rqueueMessageMetadataService;
55+
protected final RqueueMessageIdGenerator messageIdGenerator;
56+
@Autowired protected RqueueStringDao rqueueStringDao;
57+
@Autowired protected RqueueConfig rqueueConfig;
58+
@Autowired protected RqueueMessageMetadataService rqueueMessageMetadataService;
6359

6460
BaseMessageSender(
6561
RqueueMessageTemplate messageTemplate,
6662
MessageConverter messageConverter,
67-
MessageHeaders messageHeaders) {
63+
MessageHeaders messageHeaders,
64+
RqueueMessageIdGenerator messageIdGenerator) {
6865
notNull(messageTemplate, "messageTemplate cannot be null");
6966
notNull(messageConverter, "messageConverter cannot be null");
67+
notNull(messageIdGenerator, "messageIdGenerator cannot be null");
7068
this.messageTemplate = messageTemplate;
7169
this.messageConverter = messageConverter;
7270
this.messageHeaders = messageHeaders;
71+
this.messageIdGenerator = messageIdGenerator;
7372
}
7473

7574
protected Object storeMessageMetadata(
@@ -119,14 +118,16 @@ protected String pushMessage(
119118
Long delayInMilliSecs,
120119
boolean isUnique) {
121120
QueueDetail queueDetail = EndpointRegistry.get(queueName);
122-
RqueueMessage rqueueMessage = buildMessage(
123-
messageConverter,
124-
queueName,
125-
messageId,
126-
message,
127-
retryCount,
128-
delayInMilliSecs,
129-
messageHeaders);
121+
RqueueMessage rqueueMessage =
122+
buildMessage(
123+
messageIdGenerator,
124+
messageConverter,
125+
queueName,
126+
messageId,
127+
message,
128+
retryCount,
129+
delayInMilliSecs,
130+
messageHeaders);
130131
try {
131132
storeMessageMetadata(rqueueMessage, delayInMilliSecs, false, isUnique);
132133
enqueue(queueDetail, rqueueMessage, delayInMilliSecs, false);
@@ -146,14 +147,16 @@ protected String pushMessage(
146147
protected String pushPeriodicMessage(
147148
String queueName, String messageId, Object message, long periodInMilliSeconds) {
148149
QueueDetail queueDetail = EndpointRegistry.get(queueName);
149-
RqueueMessage rqueueMessage = buildPeriodicMessage(
150-
messageConverter,
151-
queueName,
152-
messageId,
153-
message,
154-
null,
155-
periodInMilliSeconds,
156-
messageHeaders);
150+
RqueueMessage rqueueMessage =
151+
buildPeriodicMessage(
152+
messageIdGenerator,
153+
messageConverter,
154+
queueName,
155+
messageId,
156+
message,
157+
null,
158+
periodInMilliSeconds,
159+
messageHeaders);
157160
try {
158161
storeMessageMetadata(rqueueMessage, periodInMilliSeconds, false, false);
159162
enqueue(queueDetail, rqueueMessage, periodInMilliSeconds, false);

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/MessageSweeper.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,7 @@
4242

4343
@Slf4j
4444
public class MessageSweeper {
45-
4645
private static MessageSweeper messageSweeper;
47-
4846
private final ExecutorService executorService;
4947
private final RqueueMessageTemplate messageTemplate;
5048
private final RqueueMessageMetadataService rqueueMessageMetadataService;

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/ReactiveRqueueMessageEnqueuerImpl.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.sonus21.rqueue.core.EndpointRegistry;
2222
import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer;
2323
import com.github.sonus21.rqueue.core.RqueueMessage;
24+
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2425
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2526
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
2627
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
@@ -41,7 +42,19 @@ public ReactiveRqueueMessageEnqueuerImpl(
4142
RqueueMessageTemplate messageTemplate,
4243
MessageConverter messageConverter,
4344
MessageHeaders messageHeaders) {
44-
super(messageTemplate, messageConverter, messageHeaders);
45+
this(
46+
messageTemplate,
47+
messageConverter,
48+
messageHeaders,
49+
new UuidV4RqueueMessageIdGenerator());
50+
}
51+
52+
public ReactiveRqueueMessageEnqueuerImpl(
53+
RqueueMessageTemplate messageTemplate,
54+
MessageConverter messageConverter,
55+
MessageHeaders messageHeaders,
56+
RqueueMessageIdGenerator messageIdGenerator) {
57+
super(messageTemplate, messageConverter, messageHeaders, messageIdGenerator);
4558
}
4659

4760
@SuppressWarnings("unchecked")
@@ -55,14 +68,16 @@ private <T> Mono<T> pushReactiveMessage(
5568
boolean isUnique,
5669
Function<RqueueMessage, Mono<T>> monoConverter) {
5770
QueueDetail queueDetail = EndpointRegistry.get(queueName);
58-
RqueueMessage rqueueMessage = builder.build(
59-
messageConverter,
60-
queueName,
61-
messageId,
62-
message,
63-
retryCount,
64-
delayInMilliSecs,
65-
messageHeaders);
71+
RqueueMessage rqueueMessage =
72+
builder.build(
73+
messageIdGenerator,
74+
messageConverter,
75+
queueName,
76+
messageId,
77+
message,
78+
retryCount,
79+
delayInMilliSecs,
80+
messageHeaders);
6681
try {
6782
Mono<Boolean> storeResult =
6883
(Mono<Boolean>) storeMessageMetadata(rqueueMessage, delayInMilliSecs, true, isUnique);
@@ -280,6 +295,7 @@ private interface MonoConverter<T> {
280295
@FunctionalInterface
281296
private interface MessageBuilder {
282297
RqueueMessage build(
298+
RqueueMessageIdGenerator messageIdGenerator,
283299
MessageConverter converter,
284300
String queueName,
285301
String messageId,

0 commit comments

Comments
 (0)