Skip to content

Commit c68143d

Browse files
authored
Add hard strict priority mode (#279)
* Add HARD_STRICT mode See details there #276 * format code via google-java-format (version 1.35.0) * fix comment * add fixes after review --------- Co-authored-by: igorjava2025 <->
1 parent 0e0fa2a commit c68143d

File tree

9 files changed

+559
-1
lines changed

9 files changed

+559
-1
lines changed

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2828
import com.github.sonus21.rqueue.core.middleware.Middleware;
2929
import com.github.sonus21.rqueue.core.support.MessageProcessor;
30+
import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties;
3031
import com.github.sonus21.rqueue.listener.RqueueMessageHandler;
3132
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3233
import com.github.sonus21.rqueue.models.enums.PriorityMode;
@@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory {
9293

9394
// Set priority mode for the pollers
9495
private PriorityMode priorityMode = PriorityMode.WEIGHTED;
96+
// Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller
97+
private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
9598

9699
/**
97100
* Whether all beans of spring application should be inspected to find methods annotated with
@@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() {
348351
if (messageHeaders != null) {
349352
messageListenerContainer.setMessageHeaders(messageHeaders);
350353
}
354+
if (hardStrictPriorityPollerProperties != null) {
355+
messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties);
356+
}
351357
return messageListenerContainer;
352358
}
353359

@@ -486,6 +492,15 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
486492
this.messageHeaders = messageHeaders;
487493
}
488494

495+
public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
496+
return this.hardStrictPriorityPollerProperties;
497+
}
498+
499+
public void setHardStrictPriorityPollerProperties(
500+
HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
501+
this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
502+
}
503+
489504
/**
490505
* Rqueue scans all beans to find method annotated with {@link RqueueListener}.
491506
*

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.github.sonus21.rqueue.models.MessageMoveResult;
2020
import java.util.List;
21+
import java.util.Optional;
2122
import org.springframework.data.redis.core.RedisTemplate;
2223
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
2324
import reactor.core.publisher.Flux;
@@ -92,4 +93,10 @@ Long scheduleMessage(
9293

9394
Flux<Long> addReactiveMessageWithDelay(
9495
String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage);
96+
97+
Optional<RqueueMessage> findFirstElementFromList(String name);
98+
99+
Optional<RqueueMessage> findFirstElementFromZset(String name);
100+
101+
Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name);
95102
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Arrays;
3131
import java.util.Collections;
3232
import java.util.List;
33+
import java.util.Optional;
3334
import java.util.Set;
3435
import lombok.extern.slf4j.Slf4j;
3536
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -343,4 +344,19 @@ public RedisTemplate<String, RqueueMessage> getTemplate() {
343344
public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
344345
return super.removeFromZset(zsetName, rqueueMessage);
345346
}
347+
348+
@Override
349+
public Optional<RqueueMessage> findFirstElementFromList(String name) {
350+
return readFromList(name, 0, 0).stream().findFirst();
351+
}
352+
353+
@Override
354+
public Optional<RqueueMessage> findFirstElementFromZset(String name) {
355+
return readFromZset(name, 0, 0).stream().findFirst();
356+
}
357+
358+
@Override
359+
public Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name) {
360+
return readFromZsetWithScore(name, 0, 0).stream().findFirst();
361+
}
346362
}
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.listener;
18+
19+
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
20+
import com.github.sonus21.rqueue.core.middleware.Middleware;
21+
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr;
22+
import com.github.sonus21.rqueue.utils.Constants;
23+
import com.github.sonus21.rqueue.utils.QueueThreadPool;
24+
import com.github.sonus21.rqueue.utils.TimeoutUtils;
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
31+
import org.slf4j.event.Level;
32+
import org.springframework.messaging.MessageHeaders;
33+
34+
/**
35+
* Use it only with priority queues. Message processing can be slow. The hard strict priority
36+
* algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller More details see in
37+
* <a href="https://github.com/sonus21/rqueue/issues/276">GitHub project issue</a>
38+
*/
39+
class HardStrictPriorityPoller extends RqueueMessagePoller {
40+
41+
private final RqueueBeanProvider rqueueBeanProvider;
42+
43+
private final Map<String, QueueDetail> queueNameToDetail;
44+
private final Map<String, QueueThreadPool> queueNameToThread;
45+
private final Map<String, Long> queueDeactivationTime = new HashMap<>();
46+
private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
47+
48+
HardStrictPriorityPoller(
49+
String groupName,
50+
final List<QueueDetail> queueDetails,
51+
final Map<String, QueueThreadPool> queueNameToThread,
52+
RqueueBeanProvider rqueueBeanProvider,
53+
QueueStateMgr queueStateMgr,
54+
List<Middleware> middlewares,
55+
long pollingInterval,
56+
long backoffTime,
57+
PostProcessingHandler postProcessingHandler,
58+
MessageHeaders messageHeaders,
59+
HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
60+
super(
61+
"HardStrict-" + groupName,
62+
rqueueBeanProvider,
63+
queueStateMgr,
64+
middlewares,
65+
pollingInterval,
66+
backoffTime,
67+
postProcessingHandler,
68+
messageHeaders);
69+
70+
this.rqueueBeanProvider = rqueueBeanProvider;
71+
// Sort queues by priority once during initialization
72+
List<QueueDetail> queueDetailList = new ArrayList<>(queueDetails);
73+
queueDetailList.sort(
74+
(o1, o2) ->
75+
o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)
76+
- o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY));
77+
78+
this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList());
79+
this.queueNameToDetail =
80+
queueDetailList.stream()
81+
.collect(Collectors.toMap(QueueDetail::getName, Function.identity()));
82+
this.queueNameToThread = queueNameToThread;
83+
this.hardStrictPriorityPollerProperties =
84+
hardStrictPriorityPollerProperties != null
85+
? hardStrictPriorityPollerProperties
86+
: new HardStrictPriorityPollerProperties();
87+
}
88+
89+
@Override
90+
public void start() {
91+
log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues);
92+
while (true) {
93+
if (shouldExit()) {
94+
return;
95+
}
96+
97+
boolean messageFoundInAnyQueue = false;
98+
99+
try {
100+
for (String queue : queues) {
101+
if (eligibleForPolling(queue) && !isDeactivated(queue)) {
102+
QueueThreadPool queueThreadPool = queueNameToThread.get(queue);
103+
QueueDetail queueDetail = queueNameToDetail.get(queue);
104+
poll(-1, queue, queueDetail, queueThreadPool);
105+
106+
if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) {
107+
TimeoutUtils.sleepLog(
108+
hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false);
109+
}
110+
111+
if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) {
112+
// break current cycle and start new cycle
113+
// it allow to process queue with the higher priority
114+
messageFoundInAnyQueue = true;
115+
break;
116+
}
117+
}
118+
}
119+
120+
// If no messages were found across all queues, sleep for the polling interval
121+
if (!messageFoundInAnyQueue) {
122+
TimeoutUtils.sleepLog(pollingInterval, false);
123+
}
124+
125+
} catch (Throwable e) {
126+
log(Level.ERROR, "Exception in the poller {}", e, e.getMessage());
127+
if (shouldExit()) {
128+
return;
129+
}
130+
TimeoutUtils.sleepLog(backoffTime, false);
131+
}
132+
}
133+
}
134+
135+
boolean existMessagesInCurrentQueueOrHigherPriorityQueue(
136+
String currentQueue, List<String> queues) {
137+
for (String queue : queues) {
138+
if (eligibleForPolling(queue) && !isDeactivated(queue)) {
139+
QueueDetail queueDetail = queueNameToDetail.get(queue);
140+
if (existAvailableMessagesForPoll(queueDetail)) {
141+
// the current or higher priority queue contains messages that need to be processed.
142+
return true;
143+
}
144+
}
145+
// we check all queues from the highest priority to current queue
146+
if (queue.equals(currentQueue)) {
147+
return false;
148+
}
149+
}
150+
// unexpected behavior, need more details if it occurs
151+
log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues);
152+
return false;
153+
}
154+
155+
protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) {
156+
boolean readyMessagesExists =
157+
rqueueBeanProvider
158+
.getRqueueMessageTemplate()
159+
.findFirstElementFromList(queueDetail.getQueueName())
160+
.isPresent();
161+
if (readyMessagesExists) {
162+
log(
163+
Level.TRACE,
164+
"readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.",
165+
null,
166+
queueDetail.getName());
167+
return true;
168+
}
169+
170+
// Only check delayed messages with score <= current time
171+
long currentTime = System.currentTimeMillis();
172+
boolean delayedMessagesExists =
173+
rqueueBeanProvider
174+
.getRqueueMessageTemplate()
175+
.findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName())
176+
.filter(element -> element.getScore() <= currentTime)
177+
.isPresent();
178+
179+
if (delayedMessagesExists) {
180+
log(
181+
Level.TRACE,
182+
"delayedMessages exists for scheduled queue '{}' and currentTime '{}',"
183+
+ " existAvailableMessagesForPoll = true.",
184+
null,
185+
queueDetail.getScheduledQueueName(),
186+
currentTime);
187+
return true;
188+
}
189+
190+
return false;
191+
}
192+
193+
private boolean isDeactivated(String queue) {
194+
Long deactivationTime = queueDeactivationTime.get(queue);
195+
if (deactivationTime == null) {
196+
return false;
197+
}
198+
if (System.currentTimeMillis() - deactivationTime > pollingInterval) {
199+
queueDeactivationTime.remove(queue);
200+
return false;
201+
}
202+
return true;
203+
}
204+
205+
@Override
206+
long getSemaphoreWaitTime() {
207+
return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null
208+
? hardStrictPriorityPollerProperties.getSemaphoreWaitTime()
209+
: 20L;
210+
}
211+
212+
@Override
213+
void deactivate(int index, String queue, DeactivateType deactivateType) {
214+
if (deactivateType == DeactivateType.POLL_FAILED) {
215+
// Pause in case of connection errors or polling failures
216+
TimeoutUtils.sleepLog(backoffTime, false);
217+
} else {
218+
// Mark deactivation time if the queue is empty
219+
queueDeactivationTime.put(queue, System.currentTimeMillis());
220+
}
221+
}
222+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.github.sonus21.rqueue.listener;
2+
3+
public class HardStrictPriorityPollerProperties {
4+
// set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime"
5+
// local load tests have correct strict priority algorithm work and good performance with them
6+
private Long afterPollSleepInterval = 30L;
7+
private Long semaphoreWaitTime = 15L;
8+
9+
public Long getAfterPollSleepInterval() {
10+
return afterPollSleepInterval;
11+
}
12+
13+
public void setAfterPollSleepInterval(Long afterPollSleepInterval) {
14+
validateTimeInterval(afterPollSleepInterval);
15+
this.afterPollSleepInterval = afterPollSleepInterval;
16+
}
17+
18+
public Long getSemaphoreWaitTime() {
19+
return this.semaphoreWaitTime;
20+
}
21+
22+
public void setSemaphoreWaitTime(Long semaphoreWaitTime) {
23+
validateTimeInterval(semaphoreWaitTime);
24+
this.semaphoreWaitTime = semaphoreWaitTime;
25+
}
26+
27+
private void validateTimeInterval(Long value) {
28+
if (value == null || value > 0) {
29+
return;
30+
}
31+
throw new IllegalArgumentException("Value must be positive: " + value);
32+
}
33+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer
106106
private int phase = Integer.MAX_VALUE;
107107
private PriorityMode priorityMode;
108108
private MessageHeaders messageHeaders;
109+
private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties;
109110

110111
public RqueueMessageListenerContainer(
111112
RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) {
@@ -515,6 +516,21 @@ protected void startGroup(String groupName, List<QueueDetail> queueDetails) {
515516
backOffTime,
516517
postProcessingHandler,
517518
getMessageHeaders()));
519+
} else if (getPriorityMode() == PriorityMode.HARD_STRICT) {
520+
future =
521+
taskExecutor.submit(
522+
new HardStrictPriorityPoller(
523+
StringUtils.groupName(groupName),
524+
queueDetails,
525+
queueThread,
526+
rqueueBeanProvider,
527+
queueStateMgr,
528+
getMiddleWares(),
529+
pollingInterval,
530+
backOffTime,
531+
postProcessingHandler,
532+
getMessageHeaders(),
533+
getHardStrictPriorityPollerProperties()));
518534
} else {
519535
future =
520536
taskExecutor.submit(
@@ -712,6 +728,15 @@ public void setMessageHeaders(MessageHeaders messageHeaders) {
712728
this.messageHeaders = messageHeaders;
713729
}
714730

731+
public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() {
732+
return this.hardStrictPriorityPollerProperties;
733+
}
734+
735+
public void setHardStrictPriorityPollerProperties(
736+
HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) {
737+
this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties;
738+
}
739+
715740
class QueueStateMgr {
716741

717742
Set<String> pausedQueues = ConcurrentHashMap.newKeySet();

0 commit comments

Comments
 (0)