Skip to content

Commit c046782

Browse files
authored
Fixing bug #170 #169 (#171)
Fixing bug #170 #169
1 parent a433394 commit c046782

File tree

8 files changed

+140
-28
lines changed

8 files changed

+140
-28
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
# [Rqueue] New and Notable Changes
2+
### [2.11] - 07-Nov-2022
3+
4+
### Fixes
5+
6+
* Message mover unreliability, scheduled message were not getting consumed once redis connection error occurs
7+
* Upgraded Jquery version
8+
29

310
### [2.10.2] - 16-Jul-2022
411

@@ -295,4 +302,6 @@ Fixes:
295302

296303
[2.10.2]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.10.2-RELEASE
297304

305+
[2.11]: https://repo1.maven.org/maven2/com/github/sonus21/rqueue-core/2.11-RELEASE
306+
298307
[122]: https://github.com/sonus21/rqueue/issues/122

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
7171
* Add dependency
7272
* Gradle
7373
```groovy
74-
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.10.2-RELEASE'
74+
implementation 'com.github.sonus21:rqueue-spring-boot-starter:2.11-RELEASE'
7575
```
7676
* Maven
7777
```xml
7878
<dependency>
7979
<groupId>com.github.sonus21</groupId>
8080
<artifactId>rqueue-spring-boot-starter</artifactId>
81-
<version>2.10.2-RELEASE</version>
81+
<version>2.11-RELEASE</version>
8282
</dependency>
8383
```
8484
@@ -91,14 +91,14 @@ Release Version: [Maven central](https://search.maven.org/search?q=g:com.github.
9191
* Add Dependency
9292
* Gradle
9393
```groovy
94-
implementation 'com.github.sonus21:rqueue-spring:2.10.2-RELEASE'
94+
implementation 'com.github.sonus21:rqueue-spring:2.11-RELEASE'
9595
```
9696
* Maven
9797
```xml
9898
<dependency>
9999
<groupId>com.github.sonus21</groupId>
100100
<artifactId>rqueue-spring</artifactId>
101-
<version>2.10.2-RELEASE</version>
101+
<version>2.11-RELEASE</version>
102102
</dependency>
103103
```
104104

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ ext {
7070

7171
subprojects {
7272
group = 'com.github.sonus21'
73-
version = '2.10.2-RELEASE'
73+
version = '2.11-RELEASE'
7474

7575
dependencies {
7676
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueSchedulerConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public class RqueueSchedulerConfig {
6565
private int processingMessageThreadPoolSize;
6666

6767
// How frequently messages should be moved from scheduled queues to source queue
68-
@Value("${rqueue.scheduler.scheduled.message.time.interval:5000}")
68+
@Value("${rqueue.scheduler.scheduled.message.time.interval:2000}")
6969
private long scheduledMessageTimeIntervalInMilli;
70+
71+
72+
// Maximum delay for message mover task due to failure
73+
@Value("${rqueue.scheduler.max.message.mover.delay:60000}")
74+
private long maxMessageMoverDelay;
7075
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/MessageScheduler.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public abstract class MessageScheduler
7373
private RedisTemplate<String, Long> redisTemplate;
7474

7575
private Map<String, QueueScheduler> queueSchedulers;
76+
private Map<String, Integer> errorCount;
7677

7778
protected abstract Logger getLogger();
7879

@@ -212,6 +213,7 @@ protected void initialize() {
212213
channelNameToQueueName = new ConcurrentHashMap<>(queueNames.size());
213214
queueNameToLastMessageScheduleTime = new ConcurrentHashMap<>(queueNames.size());
214215
queueSchedulers = new ConcurrentHashMap<>(queueNames.size());
216+
errorCount = new ConcurrentHashMap<>(queueNames.size());
215217
createScheduler(queueNames.size());
216218
if (isRedisEnabled()) {
217219
messageSchedulerListener = new MessageSchedulerListener();
@@ -362,26 +364,50 @@ private class MessageMoverTask implements Runnable {
362364
private final String zsetName;
363365
private final boolean processingQueue;
364366

367+
private long getNextScheduleTimeInternal(Long value, Exception e) {
368+
int errCount = 0;
369+
long nextTime;
370+
if (null != e) {
371+
errCount = errorCount.getOrDefault(queueName, 0) + 1;
372+
if (errCount % 3 == 0) {
373+
getLogger().error("Message mover task is failing continuously queue: {}", name, e);
374+
}
375+
long delay = (long) (100 * Math.pow(1.5, errCount));
376+
delay = Math.min(delay, rqueueSchedulerConfig.getMaxMessageMoverDelay());
377+
nextTime = System.currentTimeMillis() + delay;
378+
}else{
379+
nextTime = getNextScheduleTime(queueName, value);
380+
}
381+
errorCount.put(queueName, errCount);
382+
return nextTime;
383+
}
384+
365385
@Override
366386
public void run() {
367387
getLogger().debug("Running {}", this);
388+
Long value = null;
389+
Exception e = null;
368390
try {
369391
if (isQueueActive(name)) {
370392
long currentTime = System.currentTimeMillis();
371-
Long value =
393+
value =
372394
defaultScriptExecutor.execute(
373395
redisScript,
374396
Arrays.asList(queueName, zsetName),
375397
currentTime,
376398
MAX_MESSAGES,
377399
processingQueue ? 1 : 0);
378-
long nextExecutionTime = getNextScheduleTime(name, value);
400+
}
401+
} catch (RedisSystemException ex) {
402+
e = ex;
403+
} catch (Exception ex) {
404+
e = ex;
405+
getLogger().warn("Task execution failed for the queue: {}", getName(), e);
406+
} finally {
407+
if (isQueueActive(name)) {
408+
long nextExecutionTime = getNextScheduleTimeInternal(value, e);
379409
schedule(name, nextExecutionTime, true);
380410
}
381-
} catch (RedisSystemException e) {
382-
// no op
383-
} catch (Exception e) {
384-
getLogger().warn("Task execution failed for the queue: {}", name, e);
385411
}
386412
}
387413

rqueue-core/src/main/resources/public/rqueue/vendor/jquery/jquery.min.js

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rqueue-core/src/main/resources/public/rqueue/vendor/jquery/jquery.min.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rqueue-core/src/test/java/com/github/sonus21/rqueue/core/ScheduledQueueMessageSchedulerTest.java

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,13 @@
5252
import org.mockito.MockedStatic;
5353
import org.mockito.Mockito;
5454
import org.mockito.MockitoAnnotations;
55+
import org.springframework.data.redis.ClusterRedirectException;
56+
import org.springframework.data.redis.RedisConnectionFailureException;
57+
import org.springframework.data.redis.RedisSystemException;
58+
import org.springframework.data.redis.TooManyClusterRedirectionsException;
5559
import org.springframework.data.redis.connection.DefaultMessage;
5660
import org.springframework.data.redis.connection.MessageListener;
61+
import org.springframework.data.redis.connection.RedisInvalidSubscriptionException;
5762
import org.springframework.data.redis.core.RedisCallback;
5863
import org.springframework.data.redis.core.RedisTemplate;
5964
import org.springframework.data.redis.listener.ChannelTopic;
@@ -62,15 +67,20 @@
6267
@SuppressWarnings("unchecked")
6368
class ScheduledQueueMessageSchedulerTest extends TestBase {
6469

65-
@InjectMocks private final TestMessageScheduler messageScheduler = new TestMessageScheduler();
70+
@InjectMocks
71+
private final TestMessageScheduler messageScheduler = new TestMessageScheduler();
6672
private final String slowQueue = "slow-queue";
6773
private final String fastQueue = "fast-queue";
6874
private final QueueDetail slowQueueDetail = TestUtils.createQueueDetail(slowQueue);
6975
private final QueueDetail fastQueueDetail = TestUtils.createQueueDetail(fastQueue);
70-
@Mock private RqueueSchedulerConfig rqueueSchedulerConfig;
71-
@Mock private RqueueConfig rqueueConfig;
72-
@Mock private RedisTemplate<String, Long> redisTemplate;
73-
@Mock private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
76+
@Mock
77+
private RqueueSchedulerConfig rqueueSchedulerConfig;
78+
@Mock
79+
private RqueueConfig rqueueConfig;
80+
@Mock
81+
private RedisTemplate<String, Long> redisTemplate;
82+
@Mock
83+
private RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory;
7484

7585
@BeforeEach
7686
public void init() {
@@ -230,10 +240,10 @@ void startSubmitsTaskAndThatGetsExecuted() throws Exception {
230240
doReturn(1000L).when(rqueueSchedulerConfig).getScheduledMessageTimeIntervalInMilli();
231241
AtomicInteger counter = new AtomicInteger(0);
232242
doAnswer(
233-
invocation -> {
234-
counter.incrementAndGet();
235-
return null;
236-
})
243+
invocation -> {
244+
counter.incrementAndGet();
245+
return null;
246+
})
237247
.when(redisTemplate)
238248
.execute(any(RedisCallback.class));
239249
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
@@ -251,10 +261,10 @@ void onCompletionOfExistingTaskNewTaskShouldBeSubmitted() throws Exception {
251261
doReturn(1000L).when(rqueueSchedulerConfig).getScheduledMessageTimeIntervalInMilli();
252262
AtomicInteger counter = new AtomicInteger(0);
253263
doAnswer(
254-
invocation -> {
255-
counter.incrementAndGet();
256-
return null;
257-
})
264+
invocation -> {
265+
counter.incrementAndGet();
266+
return null;
267+
})
258268
.when(redisTemplate)
259269
.execute(any(RedisCallback.class));
260270
TestTaskScheduler scheduler = new TestTaskScheduler();
@@ -269,6 +279,68 @@ void onCompletionOfExistingTaskNewTaskShouldBeSubmitted() throws Exception {
269279
}
270280
}
271281

282+
@Test
283+
void taskShouldBeScheduledOnFailure() throws Exception {
284+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
285+
doReturn(1).when(rqueueSchedulerConfig).getScheduledMessageThreadPoolSize();
286+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
287+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
288+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
289+
doReturn(10000L).when(rqueueSchedulerConfig).getMaxMessageMoverDelay();
290+
AtomicInteger counter = new AtomicInteger(0);
291+
doAnswer(
292+
invocation -> {
293+
counter.incrementAndGet();
294+
throw new RedisSystemException("Something is not correct", new NullPointerException("oops!"));
295+
})
296+
.when(redisTemplate)
297+
.execute(any(RedisCallback.class));
298+
TestTaskScheduler scheduler = new TestTaskScheduler();
299+
threadUtils
300+
.when(() -> ThreadUtils.createTaskScheduler(1, "scheduledQueueMsgScheduler-", 60))
301+
.thenReturn(scheduler);
302+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
303+
waitFor(() -> counter.get() >= 1, "scripts are getting executed");
304+
sleep(10);
305+
messageScheduler.destroy();
306+
assertTrue(scheduler.submittedTasks() >= 2);
307+
}
308+
}
309+
310+
@Test
311+
void continuousTaskFailTask() throws Exception{
312+
try (MockedStatic<ThreadUtils> threadUtils = Mockito.mockStatic(ThreadUtils.class)) {
313+
doReturn(1).when(rqueueSchedulerConfig).getScheduledMessageThreadPoolSize();
314+
doReturn(true).when(rqueueSchedulerConfig).isAutoStart();
315+
doReturn(true).when(rqueueSchedulerConfig).isEnabled();
316+
doReturn(true).when(rqueueSchedulerConfig).isRedisEnabled();
317+
doReturn(100L).when(rqueueSchedulerConfig).getMaxMessageMoverDelay();
318+
AtomicInteger counter = new AtomicInteger(0);
319+
doAnswer(
320+
invocation -> {
321+
int count = counter.incrementAndGet();
322+
if(count % 3 == 0){
323+
throw new RedisSystemException("Something is not correct", new NullPointerException("oops!"));
324+
}
325+
if(count % 3 == 1){
326+
throw new RedisConnectionFailureException("Unknown host");
327+
}
328+
throw new ClusterRedirectException(3, "localhost", 9004, new TooManyClusterRedirectionsException("too many redirects") );
329+
})
330+
.when(redisTemplate)
331+
.execute(any(RedisCallback.class));
332+
TestTaskScheduler scheduler = new TestTaskScheduler();
333+
threadUtils
334+
.when(() -> ThreadUtils.createTaskScheduler(1, "scheduledQueueMsgScheduler-", 60))
335+
.thenReturn(scheduler);
336+
messageScheduler.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
337+
waitFor(() -> counter.get() >= 10, "scripts are getting executed");
338+
sleep(10);
339+
messageScheduler.destroy();
340+
assertTrue(scheduler.submittedTasks() >= 11);
341+
}
342+
}
343+
272344
@Test
273345
void onMessageListenerTest() throws Exception {
274346
doReturn(true).when(rqueueSchedulerConfig).isEnabled();

0 commit comments

Comments
 (0)