Skip to content

Commit 9b2b043

Browse files
authored
NPE fix #172 (#173)
* NPE fix #172 * released
1 parent c046782 commit 9b2b043

File tree

6 files changed

+205
-7
lines changed

6 files changed

+205
-7
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# [Rqueue] New and Notable Changes
2+
### [2.11.1] - 18-Nov-2022
3+
### Fixes
4+
Bug introduced by 2.11
5+
26
### [2.11] - 07-Nov-2022
37

48
### Fixes

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
7878
<dependency>
7979
<groupId>com.github.sonus21</groupId>
8080
<artifactId>rqueue-spring-boot-starter</artifactId>
81-
<version>2.11-RELEASE</version>
81+
<version>2.11.1-RELEASE</version>
8282
</dependency>
8383
```
8484
@@ -91,14 +91,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
9191
* Add Dependency
9292
* Gradle
9393
```groovy
94-
implementation 'com.github.sonus21:rqueue-spring:2.11-RELEASE'
94+
implementation 'com.github.sonus21:rqueue-spring:2.11.1-RELEASE'
9595
```
9696
* Maven
9797
```xml
9898
<dependency>
9999
<groupId>com.github.sonus21</groupId>
100100
<artifactId>rqueue-spring</artifactId>
101-
<version>2.11-RELEASE</version>
101+
<version>2.11.1-RELEASE</version>
102102
</dependency>
103103
```
104104

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ext {
7070

7171
subprojects {
7272
group = 'com.github.sonus21'
73-
version = '2.11-RELEASE'
73+
version = '2.11.1-RELEASE'
7474

7575
dependencies {
7676
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,17 +368,17 @@ private long getNextScheduleTimeInternal(Long value, Exception e) {
368368
int errCount = 0;
369369
long nextTime;
370370
if (null != e) {
371-
errCount = errorCount.getOrDefault(queueName, 0) + 1;
371+
errCount = errorCount.getOrDefault(name, 0) + 1;
372372
if (errCount % 3 == 0) {
373373
getLogger().error("Message mover task is failing continuously queue: {}", name, e);
374374
}
375375
long delay = (long) (100 * Math.pow(1.5, errCount));
376376
delay = Math.min(delay, rqueueSchedulerConfig.getMaxMessageMoverDelay());
377377
nextTime = System.currentTimeMillis() + delay;
378378
}else{
379-
nextTime = getNextScheduleTime(queueName, value);
379+
nextTime = getNextScheduleTime(name, value);
380380
}
381-
errorCount.put(queueName, errCount);
381+
errorCount.put(name, errCount);
382382
return nextTime;
383383
}
384384

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.github.sonus21.rqueue.core;
2+
3+
import static com.github.sonus21.rqueue.utils.TimeoutUtils.sleep;
4+
import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.Mockito.doAnswer;
8+
import static org.mockito.Mockito.doReturn;
9+
10+
import com.github.sonus21.TestBase;
11+
import com.github.sonus21.rqueue.CoreUnitTest;
12+
import com.github.sonus21.rqueue.config.RqueueConfig;
13+
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
14+
import com.github.sonus21.rqueue.core.ProcessingQueueMessageSchedulerTest.ProcessingQTestMessageScheduler;
15+
import com.github.sonus21.rqueue.listener.QueueDetail;
16+
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
17+
import com.github.sonus21.rqueue.utils.TestUtils;
18+
import com.github.sonus21.rqueue.utils.ThreadUtils;
19+
import com.github.sonus21.test.TestTaskScheduler;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.mockito.InjectMocks;
24+
import org.mockito.Mock;
25+
import org.mockito.MockedStatic;
26+
import org.mockito.Mockito;
27+
import org.mockito.MockitoAnnotations;
28+
import org.springframework.data.redis.ClusterRedirectException;
29+
import org.springframework.data.redis.RedisConnectionFailureException;
30+
import org.springframework.data.redis.RedisSystemException;
31+
import org.springframework.data.redis.TooManyClusterRedirectionsException;
32+
import org.springframework.data.redis.core.RedisCallback;
33+
import org.springframework.data.redis.core.RedisTemplate;
34+
35+
@CoreUnitTest
36+
@SuppressWarnings("unchecked")
37+
38+
class MessageScheduleTest extends TestBase {
39+
@InjectMocks
40+
private final ProcessingQTestMessageScheduler messageScheduler = new ProcessingQTestMessageScheduler();
41+
private final String queue = "queue";
42+
private final QueueDetail queueDetail = TestUtils.createQueueDetail(queue);
43+
@Mock
44+
private RqueueSchedulerConfig rqueueSchedulerConfig;
45+
@Mock
46+
private RqueueConfig rqueueConfig;
47+
@Mock
48+
private RedisTemplate<String, Long> redisTemplate;
49+
@Mock
50+
private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
51+
52+
@BeforeEach
53+
public void init() {
54+
MockitoAnnotations.openMocks(this);
55+
EndpointRegistry.delete();
56+
EndpointRegistry.register(queueDetail);
57+
}
58+
59+
@Test
60+
void onCompletionOfExistingTaskNewTaskShouldBeSubmitted() throws Exception {
61+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
62+
doReturn(1).when(rqueueSchedulerConfig).getProcessingMessageThreadPoolSize();
63+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
64+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
65+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
66+
AtomicInteger counter = new AtomicInteger(0);
67+
doAnswer(
68+
invocation -> {
69+
counter.incrementAndGet();
70+
return null;
71+
})
72+
.when(redisTemplate)
73+
.execute(any(RedisCallback.class));
74+
TestTaskScheduler scheduler = new TestTaskScheduler();
75+
threadUtils
76+
.when(() -> ThreadUtils.createTaskScheduler(1, "processingQueueMsgScheduler-", 60))
77+
.thenReturn(scheduler);
78+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
79+
waitFor(() -> counter.get() >= 1, "scripts are getting executed");
80+
sleep(10);
81+
messageScheduler.destroy();
82+
assertTrue(scheduler.submittedTasks() >= 2);
83+
}
84+
}
85+
86+
87+
@Test
88+
void multipleTasksAreRunningForTheSameQueue() throws Exception {
89+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
90+
doReturn(1).when(rqueueSchedulerConfig).getProcessingMessageThreadPoolSize();
91+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
92+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
93+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
94+
AtomicInteger counter = new AtomicInteger(0);
95+
doAnswer(
96+
invocation -> {
97+
counter.incrementAndGet();
98+
return System.currentTimeMillis();
99+
})
100+
.when(redisTemplate)
101+
.execute(any(RedisCallback.class));
102+
TestTaskScheduler scheduler = new TestTaskScheduler();
103+
threadUtils
104+
.when(() -> ThreadUtils.createTaskScheduler(1, "processingQueueMsgScheduler-", 60))
105+
.thenReturn(scheduler);
106+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
107+
waitFor(() -> counter.get() >= 2, "scripts are getting executed");
108+
sleep(10);
109+
messageScheduler.destroy();
110+
assertTrue(scheduler.submittedTasks() >= 3);
111+
}
112+
}
113+
114+
@Test
115+
void taskShouldBeScheduledOnFailure() throws Exception {
116+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
117+
doReturn(1).when(rqueueSchedulerConfig).getProcessingMessageThreadPoolSize();
118+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
119+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
120+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
121+
doReturn(10000L).when(rqueueSchedulerConfig).getMaxMessageMoverDelay();
122+
AtomicInteger counter = new AtomicInteger(0);
123+
doAnswer(
124+
invocation -> {
125+
counter.incrementAndGet();
126+
throw new RedisSystemException("Something is not correct", new NullPointerException("oops!"));
127+
})
128+
.when(redisTemplate)
129+
.execute(any(RedisCallback.class));
130+
TestTaskScheduler scheduler = new TestTaskScheduler();
131+
threadUtils
132+
.when(() -> ThreadUtils.createTaskScheduler(1, "processingQueueMsgScheduler-", 60))
133+
.thenReturn(scheduler);
134+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
135+
waitFor(() -> counter.get() >= 2, "scripts are getting executed");
136+
sleep(10);
137+
messageScheduler.destroy();
138+
assertTrue(scheduler.submittedTasks() >= 3);
139+
}
140+
}
141+
142+
@Test
143+
void continuousTaskFailTask() throws Exception{
144+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
145+
doReturn(1).when(rqueueSchedulerConfig).getProcessingMessageThreadPoolSize();
146+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
147+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
148+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
149+
doReturn(100L).when(rqueueSchedulerConfig).getMaxMessageMoverDelay();
150+
AtomicInteger counter = new AtomicInteger(0);
151+
doAnswer(
152+
invocation -> {
153+
int count = counter.incrementAndGet();
154+
if(count % 3 == 0){
155+
throw new RedisSystemException("Something is not correct", new NullPointerException("oops!"));
156+
}
157+
if(count % 3 == 1){
158+
throw new RedisConnectionFailureException("Unknown host");
159+
}
160+
throw new ClusterRedirectException(3, "localhost", 9004, new TooManyClusterRedirectionsException("too many redirects") );
161+
})
162+
.when(redisTemplate)
163+
.execute(any(RedisCallback.class));
164+
TestTaskScheduler scheduler = new TestTaskScheduler();
165+
threadUtils
166+
.when(() -> ThreadUtils.createTaskScheduler(1, "processingQueueMsgScheduler-", 60))
167+
.thenReturn(scheduler);
168+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
169+
waitFor(() -> counter.get() >= 5, "scripts are getting executed");
170+
sleep(10);
171+
messageScheduler.destroy();
172+
assertTrue(scheduler.submittedTasks() >= 6);
173+
}
174+
}
175+
176+
}

rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ProcessingQueueMessageSchedulerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.github.sonus21.rqueue.config.RqueueSchedulerConfig;
2727
import com.github.sonus21.rqueue.listener.QueueDetail;
2828
import com.github.sonus21.rqueue.utils.TestUtils;
29+
import java.util.List;
30+
import java.util.Vector;
2931
import org.junit.jupiter.api.BeforeEach;
3032
import org.junit.jupiter.api.Test;
3133
import org.mockito.InjectMocks;
@@ -87,4 +89,20 @@ void getNextScheduleTimeFastQueue() {
8789
assertEquals(
8890
currentTime + 1000L, messageScheduler.getNextScheduleTime(fastQueue, currentTime + 1000L));
8991
}
92+
93+
94+
static class ProcessingQTestMessageScheduler extends ProcessingQueueMessageScheduler {
95+
96+
List<Boolean> scheduleList;
97+
98+
ProcessingQTestMessageScheduler() {
99+
this.scheduleList = new Vector<>();
100+
}
101+
102+
@Override
103+
protected synchronized void schedule(String queueName, Long startTime, boolean forceSchedule) {
104+
super.schedule(queueName, startTime, forceSchedule);
105+
this.scheduleList.add(forceSchedule);
106+
}
107+
}
90108
}

0 commit comments

Comments
 (0)