Skip to content

Commit c6459d3

Browse files
igorjava2025sonus21
authored andcommitted
add fixes after review
(cherry picked from commit 21d6d86)
1 parent 3f5b0a0 commit c6459d3

5 files changed

Lines changed: 107 additions & 12 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.github.sonus21.rqueue.models.MessageMoveResult;
2020
import java.util.List;
21+
import java.util.Optional;
2122
import org.springframework.data.redis.core.RedisTemplate;
2223
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
2324
import reactor.core.publisher.Flux;
@@ -92,4 +93,10 @@ Long scheduleMessage(
9293

9394
Flux<Long> addReactiveMessageWithDelay(
9495
String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage);
96+
97+
Optional<RqueueMessage> findFirstElementFromList(String name);
98+
99+
Optional<RqueueMessage> findFirstElementFromZset(String name);
100+
101+
Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name);
95102
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Arrays;
3131
import java.util.Collections;
3232
import java.util.List;
33+
import java.util.Optional;
3334
import java.util.Set;
3435
import lombok.extern.slf4j.Slf4j;
3536
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
@@ -343,4 +344,19 @@ public RedisTemplate<String, RqueueMessage> getTemplate() {
343344
public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) {
344345
return super.removeFromZset(zsetName, rqueueMessage);
345346
}
347+
348+
@Override
349+
public Optional<RqueueMessage> findFirstElementFromList(String name) {
350+
return readFromList(name, 0, 0).stream().findFirst();
351+
}
352+
353+
@Override
354+
public Optional<RqueueMessage> findFirstElementFromZset(String name) {
355+
return readFromZset(name, 0, 0).stream().findFirst();
356+
}
357+
358+
@Override
359+
public Optional<TypedTuple<RqueueMessage>> findFirstElementFromZsetWithScore(String name) {
360+
return readFromZsetWithScore(name, 0, 0).stream().findFirst();
361+
}
346362
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -151,23 +151,37 @@ boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, Li
151151
}
152152

153153
protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) {
154-
List<?> readyMessages = rqueueBeanProvider
155-
.getRqueueMessageTemplate()
156-
.readFromList(queueDetail.getQueueName(), 0, 0);
157-
158-
if (readyMessages != null && !readyMessages.isEmpty()) {
159-
log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
154+
boolean readyMessagesExists =
155+
rqueueBeanProvider
156+
.getRqueueMessageTemplate()
157+
.findFirstElementFromList(queueDetail.getQueueName())
158+
.isPresent();
159+
if (readyMessagesExists) {
160+
log(
161+
Level.TRACE,
162+
"readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.",
163+
null,
164+
queueDetail.getName());
160165
return true;
161166
}
162167

163168
// Only check delayed messages with score <= current time
164169
long currentTime = System.currentTimeMillis();
165-
List<?> delayedMessages = rqueueBeanProvider
166-
.getRqueueMessageTemplate()
167-
.readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime);
168-
169-
if (delayedMessages != null && !delayedMessages.isEmpty()) {
170-
log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName());
170+
boolean delayedMessagesExists =
171+
rqueueBeanProvider
172+
.getRqueueMessageTemplate()
173+
.findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName())
174+
.filter(element -> element.getScore() <= currentTime)
175+
.isPresent();
176+
177+
if (delayedMessagesExists) {
178+
log(
179+
Level.TRACE,
180+
"delayedMessages exists for scheduled queue '{}' and currentTime '{}',"
181+
+ " existAvailableMessagesForPoll = true.",
182+
null,
183+
queueDetail.getScheduledQueueName(),
184+
currentTime);
171185
return true;
172186
}
173187

rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public Long getAfterPollSleepInterval() {
1111
}
1212

1313
public void setAfterPollSleepInterval(Long afterPollSleepInterval) {
14+
validateTimeInterval(afterPollSleepInterval);
1415
this.afterPollSleepInterval = afterPollSleepInterval;
1516
}
1617

@@ -19,6 +20,14 @@ public Long getSemaphoreWaitTime() {
1920
}
2021

2122
public void setSemaphoreWaitTime(Long semaphoreWaitTime) {
23+
validateTimeInterval(semaphoreWaitTime);
2224
this.semaphoreWaitTime = semaphoreWaitTime;
2325
}
26+
27+
private void validateTimeInterval(Long value) {
28+
if (value == null || value > 0) {
29+
return;
30+
}
31+
throw new IllegalArgumentException("Value must be positive: " + value);
32+
}
2433
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.github.sonus21.rqueue.listener;
2+
3+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
4+
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
6+
import java.util.stream.LongStream;
7+
import org.junit.jupiter.params.ParameterizedTest;
8+
import org.junit.jupiter.params.provider.MethodSource;
9+
import org.junit.jupiter.params.provider.NullSource;
10+
11+
class HardStrictPriorityPollerPropertiesTest {
12+
13+
private final HardStrictPriorityPollerProperties properties =
14+
new HardStrictPriorityPollerProperties();
15+
16+
static LongStream invalidValues() {
17+
return LongStream.of(0L, -1L, -100L, Long.MIN_VALUE);
18+
}
19+
20+
static LongStream validValues() {
21+
return LongStream.of(1L, Long.MAX_VALUE);
22+
}
23+
24+
@ParameterizedTest
25+
@MethodSource("invalidValues")
26+
void setAfterPollSleepIntervalInvalidValues(Long value) {
27+
assertThrows(IllegalArgumentException.class, () -> properties.setAfterPollSleepInterval(value));
28+
}
29+
30+
@ParameterizedTest
31+
@NullSource
32+
@MethodSource("validValues")
33+
void setAfterPollSleepIntervalValidValues(Long value) {
34+
assertDoesNotThrow(() -> properties.setAfterPollSleepInterval(value));
35+
}
36+
37+
@ParameterizedTest
38+
@MethodSource("invalidValues")
39+
void setSemaphoreWaitTimeInvalidValues(Long value) {
40+
assertThrows(IllegalArgumentException.class, () -> properties.setSemaphoreWaitTime(value));
41+
}
42+
43+
@ParameterizedTest
44+
@NullSource
45+
@MethodSource("validValues")
46+
void setSemaphoreWaitTimeValidValues(Long value) {
47+
assertDoesNotThrow(() -> properties.setSemaphoreWaitTime(value));
48+
}
49+
}

0 commit comments

Comments
 (0)