Skip to content

Commit 6278b97

Browse files
authored
get pending/schedule/processing message count (#71)
1 parent 6469f9f commit 6278b97

4 files changed

Lines changed: 121 additions & 6 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
2929
import com.github.sonus21.rqueue.dao.RqueueStringDao;
3030
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
31+
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
3132
import com.github.sonus21.rqueue.utils.RedisUtils;
3233
import com.github.sonus21.rqueue.web.view.DateTimeFunction;
3334
import com.github.sonus21.rqueue.web.view.DeadLetterQueuesFunction;
@@ -186,4 +187,10 @@ public JtwigViewResolver rqueueViewResolver() {
186187
viewResolver.setSuffix(".html");
187188
return viewResolver;
188189
}
190+
191+
@Bean
192+
public RqueueQueueMetrics rqueueQueueMetrics(
193+
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
194+
return new RqueueQueueMetrics(stringRqueueRedisTemplate);
195+
}
189196
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2021 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
17+
package com.github.sonus21.rqueue.metrics;
18+
19+
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
20+
import com.github.sonus21.rqueue.core.EndpointRegistry;
21+
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
22+
import com.github.sonus21.rqueue.listener.QueueDetail;
23+
24+
/**
25+
* This class reports queue message counter.
26+
*
27+
* <p>Count can be sent to some monitoring tool like Prometheus, influx db etc
28+
*/
29+
public class RqueueQueueMetrics {
30+
31+
private final RqueueRedisTemplate<String> redisTemplate;
32+
33+
public RqueueQueueMetrics(RqueueRedisTemplate<String> redisTemplate) {
34+
this.redisTemplate = redisTemplate;
35+
}
36+
37+
/**
38+
* Get number of messages waiting for consumption
39+
*
40+
* @param queue queue name
41+
* @return -1 if queue is not registered otherwise message count
42+
*/
43+
public long getPendingMessageCount(String queue) {
44+
try {
45+
QueueDetail queueDetail = EndpointRegistry.get(queue);
46+
return redisTemplate.getListSize(queueDetail.getQueueName());
47+
} catch (QueueDoesNotExist e) {
48+
return -1;
49+
}
50+
}
51+
52+
/**
53+
* Get number of messages waiting in delayed queue, these messages would move to pending queue as
54+
* soon as the scheduled time is reach.
55+
*
56+
* @param queue queue name
57+
* @return -1 if queue is not registered otherwise message count
58+
*/
59+
public long getScheduledMessageCount(String queue) {
60+
try {
61+
QueueDetail queueDetail = EndpointRegistry.get(queue);
62+
return redisTemplate.getZsetSize(queueDetail.getDelayedQueueName());
63+
} catch (QueueDoesNotExist e) {
64+
return -1;
65+
}
66+
}
67+
68+
/**
69+
* Get number of messages those are currently being processed
70+
*
71+
* @param queue queue name
72+
* @return -1 if queue is not registered otherwise message count
73+
*/
74+
public long getProcessingMessageCount(String queue) {
75+
try {
76+
QueueDetail queueDetail = EndpointRegistry.get(queue);
77+
return redisTemplate.getZsetSize(queueDetail.getProcessingQueueName());
78+
} catch (QueueDoesNotExist e) {
79+
return -1;
80+
}
81+
}
82+
}

rqueue-spring-boot-starter/src/test/java/com/github/sonus21/rqueue/spring/boot/tests/integration/BootMetricsTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,9 @@ void metricStatus() throws TimedOutException {
5757
void countStatusTest() throws TimedOutException {
5858
this.verifyCountStatus();
5959
}
60+
61+
@Test
62+
void messageCount() throws TimedOutException {
63+
verifyQueueMessageCount();
64+
}
6065
}

rqueue-spring-common-test/src/main/java/com/github/sonus21/rqueue/test/tests/MetricTest.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,26 @@
1919
import static com.github.sonus21.rqueue.utils.TimeoutUtils.waitFor;
2020
import static com.google.common.collect.Lists.newArrayList;
2121

22+
import com.github.sonus21.rqueue.core.EndpointRegistry;
2223
import com.github.sonus21.rqueue.exception.TimedOutException;
24+
import com.github.sonus21.rqueue.listener.QueueDetail;
25+
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
2326
import com.github.sonus21.rqueue.test.common.SpringTestBase;
2427
import com.github.sonus21.rqueue.test.dto.Email;
2528
import com.github.sonus21.rqueue.test.dto.Job;
2629
import com.github.sonus21.rqueue.test.dto.Notification;
2730
import com.github.sonus21.rqueue.utils.Constants;
31+
import com.github.sonus21.rqueue.utils.TimeoutUtils;
2832
import io.micrometer.core.instrument.MeterRegistry;
2933
import java.time.Instant;
3034
import org.springframework.beans.factory.annotation.Autowired;
3135

3236
public abstract class MetricTest extends SpringTestBase {
33-
@Autowired protected MeterRegistry meterRegistry;
37+
38+
@Autowired
39+
protected MeterRegistry meterRegistry;
40+
@Autowired
41+
protected RqueueQueueMetrics rqueueQueueMetrics;
3442

3543
protected void verifyDelayedQueueStatus() throws TimedOutException {
3644
long maxDelay = 0;
@@ -141,12 +149,25 @@ protected void verifyCountStatus() throws TimedOutException {
141149
waitFor(
142150
() ->
143151
meterRegistry
144-
.get("failure.count")
145-
.tags("rqueue", "test")
146-
.tags("queue", emailQueue)
147-
.counter()
148-
.count()
152+
.get("failure.count")
153+
.tags("rqueue", "test")
154+
.tags("queue", emailQueue)
155+
.counter()
156+
.count()
149157
== 0,
150158
"stats collection");
151159
}
160+
161+
protected void verifyQueueMessageCount() throws TimedOutException {
162+
QueueDetail queueDetail = EndpointRegistry.get(notificationQueue);
163+
enqueue(queueDetail.getQueueName(), i -> Notification.newInstance(), 1000);
164+
enqueueIn(queueDetail.getDelayedQueueName(), i -> Notification.newInstance(), i -> 30_000L,
165+
100);
166+
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getProcessingMessageCount(notificationQueue) > 0,
167+
"at least one message in processing");
168+
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getScheduledMessageCount(notificationQueue) > 0,
169+
"at least one message in scheduled queue");
170+
TimeoutUtils.waitFor(() -> rqueueQueueMetrics.getPendingMessageCount(notificationQueue) > 0,
171+
"at least one message in pending queue");
172+
}
152173
}

0 commit comments

Comments
 (0)