Skip to content

Commit cd71554

Browse files
committed
Move Redis-only @beans from rqueue-core to rqueue-redis
Four Redis-shaped @bean factories that lived in RqueueListenerBaseConfig (rqueue-core) move into RqueueRedisListenerConfig (rqueue-redis): - rqueueRedisLongTemplate (RedisTemplate<String,Long>) - rqueueRedisListenerContainerFactory - stringRqueueRedisTemplate (RqueueRedisTemplate<String>) - rqueueInternalPubSubChannel rqueue-core stays backend-neutral; rqueue-redis becomes the actual home of these Redis-shaped beans, mirroring the role RqueueNatsAutoConfig plays for the NATS backend. The Spring-Boot starter and the non-Boot RqueueListenerConfig already @import RqueueRedisListenerConfig (added with the @bean rqueueStringDao move earlier), so there is no consumer-side change — beans land in the same context they did before, just from a different @configuration class. Removed corresponding imports from RqueueListenerBaseConfig: the static import of RedisUtils.getRedisTemplate and the RqueueInternalPubSubChannel / RqueueRedisListenerContainerFactory imports. Verified locally: :rqueue-core:test :rqueue-nats:test (-DincludeTags=unit) and the e2e NatsBackendEndToEndIT all pass. Assisted-By: Claude Code
1 parent c467d80 commit cd71554

25 files changed

Lines changed: 530 additions & 233 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/config/RqueueListenerBaseConfig.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,18 @@
1616

1717
package com.github.sonus21.rqueue.config;
1818

19-
import static com.github.sonus21.rqueue.utils.RedisUtils.getRedisTemplate;
20-
2119
import com.github.sonus21.rqueue.common.RqueueLockManager;
2220
import com.github.sonus21.rqueue.common.RqueueRedisTemplate;
2321
import com.github.sonus21.rqueue.common.impl.RqueueLockManagerImpl;
2422
import com.github.sonus21.rqueue.converter.MessageConverterProvider;
2523
import com.github.sonus21.rqueue.core.ProcessingQueueMessageScheduler;
2624
import com.github.sonus21.rqueue.core.RqueueBeanProvider;
27-
import com.github.sonus21.rqueue.core.RqueueInternalPubSubChannel;
2825
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2926
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
30-
import com.github.sonus21.rqueue.core.RqueueRedisListenerContainerFactory;
3127
import com.github.sonus21.rqueue.core.ScheduledQueueMessageScheduler;
3228
import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl;
3329
import com.github.sonus21.rqueue.core.impl.UuidV4RqueueMessageIdGenerator;
3430
import com.github.sonus21.rqueue.dao.RqueueStringDao;
35-
import com.github.sonus21.rqueue.dao.impl.RqueueStringDaoImpl;
3631
import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer;
3732
import com.github.sonus21.rqueue.metrics.RqueueQueueMetrics;
3833
import com.github.sonus21.rqueue.utils.RedisUtils;
@@ -206,18 +201,6 @@ protected RqueueMessageTemplate getMessageTemplate(RqueueConfig rqueueConfig) {
206201
return simpleRqueueListenerContainerFactory.getRqueueMessageTemplate();
207202
}
208203

209-
@Bean
210-
@Conditional(RedisBackendCondition.class)
211-
public RedisTemplate<String, Long> rqueueRedisLongTemplate(RqueueConfig rqueueConfig) {
212-
return getRedisTemplate(rqueueConfig.getConnectionFactory());
213-
}
214-
215-
@Bean
216-
@Conditional(RedisBackendCondition.class)
217-
public RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory() {
218-
return new RqueueRedisListenerContainerFactory();
219-
}
220-
221204
/**
222205
* This scheduler is used to pull messages from a scheduled queue to their respective queue.
223206
* Internally it moves messages from ZSET to LIST based on the priority and current time.
@@ -242,18 +225,6 @@ public ProcessingQueueMessageScheduler processingMessageScheduler() {
242225
return new ProcessingQueueMessageScheduler();
243226
}
244227

245-
@Bean
246-
@Conditional(RedisBackendCondition.class)
247-
public RqueueRedisTemplate<String> stringRqueueRedisTemplate(RqueueConfig rqueueConfig) {
248-
return new RqueueRedisTemplate<>(rqueueConfig.getConnectionFactory());
249-
}
250-
251-
@Bean
252-
@Conditional(RedisBackendCondition.class)
253-
public RqueueStringDao rqueueStringDao(RqueueConfig rqueueConfig) {
254-
return new RqueueStringDaoImpl(rqueueConfig);
255-
}
256-
257228
@Bean
258229
@Conditional(RedisBackendCondition.class)
259230
public RqueueWorkerRegistry rqueueWorkerRegistry(RqueueConfig rqueueConfig) {
@@ -305,20 +276,4 @@ public RqueueBeanProvider rqueueBeanProvider() {
305276
return new RqueueBeanProvider();
306277
}
307278

308-
@Bean
309-
@Conditional(RedisBackendCondition.class)
310-
public RqueueInternalPubSubChannel rqueueInternalPubSubChannel(
311-
RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory,
312-
RqueueMessageListenerContainer rqueueMessageListenerContainer,
313-
RqueueConfig rqueueConfig,
314-
RqueueBeanProvider rqueueBeanProvider,
315-
@Qualifier("stringRqueueRedisTemplate")
316-
RqueueRedisTemplate<String> stringRqueueRedisTemplate) {
317-
return new RqueueInternalPubSubChannel(
318-
rqueueRedisListenerContainerFactory,
319-
rqueueMessageListenerContainer,
320-
rqueueConfig,
321-
stringRqueueRedisTemplate,
322-
rqueueBeanProvider);
323-
}
324279
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
3030
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
3131
import com.github.sonus21.rqueue.core.impl.MessageSweeper.MessageDeleteRequest;
32-
import com.github.sonus21.rqueue.dao.RqueueStringDao;
3332
import com.github.sonus21.rqueue.exception.DuplicateMessageException;
3433
import com.github.sonus21.rqueue.listener.QueueDetail;
3534
import com.github.sonus21.rqueue.models.db.MessageMetadata;
@@ -54,9 +53,6 @@ abstract class BaseMessageSender {
5453
protected final RqueueMessageTemplate messageTemplate;
5554
protected final RqueueMessageIdGenerator messageIdGenerator;
5655

57-
@Autowired
58-
protected RqueueStringDao rqueueStringDao;
59-
6056
@Autowired
6157
protected RqueueConfig rqueueConfig;
6258

rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.github.sonus21.rqueue.config.MetricsProperties;
2020
import com.github.sonus21.rqueue.core.EndpointRegistry;
21-
import com.github.sonus21.rqueue.dao.RqueueStringDao;
2221
import com.github.sonus21.rqueue.listener.QueueDetail;
2322
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
2423
import io.micrometer.core.instrument.Gauge;
@@ -49,47 +48,42 @@ public class RqueueMetrics implements RqueueMetricsRegistry {
4948
private MeterRegistry meterRegistry;
5049

5150
@Autowired
52-
private RqueueStringDao rqueueStringDao;
51+
private RqueueQueueMetricsProvider queueMetricsProvider;
5352

5453
public RqueueMetrics(QueueCounter queueCounter) {
5554
this.queueCounter = queueCounter;
5655
}
5756

58-
private long size(String name, boolean isZset) {
59-
Long val = isZset ? rqueueStringDao.getSortedSetSize(name) : rqueueStringDao.getListSize(name);
60-
return val == null ? 0 : val;
61-
}
62-
6357
private void monitor() {
6458
for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
6559
Tags queueTags =
6660
Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
6761
Gauge.builder(
6862
metricsProperties.getMetricName(QUEUE_SIZE),
6963
queueDetail,
70-
c -> size(queueDetail.getQueueName(), false))
64+
c -> queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
7165
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
7266
.description("The number of entries in this queue")
7367
.register(meterRegistry);
7468
Gauge.builder(
7569
metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE),
7670
queueDetail,
77-
c -> size(queueDetail.getProcessingQueueName(), true))
71+
c -> queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
7872
.tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
7973
.description("The number of entries in the processing queue")
8074
.register(meterRegistry);
8175
Gauge.builder(
8276
metricsProperties.getMetricName(SCHEDULED_QUEUE_SIZE),
8377
queueDetail,
84-
c -> size(queueDetail.getScheduledQueueName(), true))
78+
c -> queueMetricsProvider.getScheduledMessageCount(queueDetail.getName()))
8579
.tags(queueTags.and(QUEUE_KEY, queueDetail.getScheduledQueueName()))
8680
.description("The number of entries waiting in the scheduled queue")
8781
.register(meterRegistry);
8882
if (queueDetail.isDlqSet()) {
8983
Builder<QueueDetail> builder = Gauge.builder(
9084
metricsProperties.getMetricName(DEAD_LETTER_QUEUE_SIZE),
9185
queueDetail,
92-
c -> size(queueDetail.getDeadLetterQueueName(), false));
86+
c -> queueMetricsProvider.getDeadLetterMessageCount(queueDetail.getName()));
9387
builder.tags(queueTags);
9488
builder.description("The number of entries in the dead letter queue");
9589
builder.register(meterRegistry);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.metrics;
17+
18+
/**
19+
* Backend-agnostic provider of queue-depth metrics. Each backend (Redis, NATS, ...) supplies its
20+
* own implementation; consumers like {@link RqueueMetrics} read sizes through this interface
21+
* instead of reaching into a Redis-shaped DAO.
22+
*
23+
* <p>The {@code queueName} argument is the user-facing queue name (the value bound on
24+
* {@code @RqueueListener(value="...")}), not an internal storage key. Implementations are
25+
* responsible for mapping it to the appropriate backend-specific key(s).
26+
*
27+
* <p>Implementations must return {@code 0} when a queue has no messages of the requested kind
28+
* (rather than throwing) so callers can use the values directly as gauge readings.
29+
*/
30+
public interface RqueueQueueMetricsProvider {
31+
32+
/**
33+
* Number of messages waiting to be consumed from {@code queueName} — i.e. enqueued and ready for
34+
* a worker to pick up, excluding messages already in-flight (processing) or scheduled for a
35+
* future delivery time.
36+
*/
37+
long getPendingMessageCount(String queueName);
38+
39+
/**
40+
* Number of messages enqueued to {@code queueName} with a future delivery time that has not yet
41+
* elapsed. Backends that don't support delayed delivery return {@code 0}.
42+
*/
43+
long getScheduledMessageCount(String queueName);
44+
45+
/**
46+
* Number of messages currently in-flight for {@code queueName} — handed to a worker but not yet
47+
* acked or nacked. Backends without an explicit in-flight set return {@code 0}.
48+
*/
49+
long getProcessingMessageCount(String queueName);
50+
51+
/**
52+
* Number of messages in the dead-letter queue associated with {@code queueName}. Returns
53+
* {@code 0} when no DLQ is configured for the queue or the backend does not surface DLQ depth.
54+
*/
55+
long getDeadLetterMessageCount(String queueName);
56+
}

rqueue-core/src/test/java/com/github/sonus21/rqueue/metrics/RqueueMetricsTest.java

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
2020
import static org.mockito.ArgumentMatchers.any;
21-
import static org.mockito.ArgumentMatchers.anyString;
22-
import static org.mockito.Mockito.doAnswer;
2321
import static org.mockito.Mockito.times;
2422
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
2524

2625
import com.github.sonus21.TestBase;
2726
import com.github.sonus21.rqueue.CoreUnitTest;
2827
import com.github.sonus21.rqueue.config.MetricsProperties;
2928
import com.github.sonus21.rqueue.core.EndpointRegistry;
30-
import com.github.sonus21.rqueue.dao.RqueueStringDao;
3129
import com.github.sonus21.rqueue.listener.QueueDetail;
3230
import com.github.sonus21.rqueue.models.event.RqueueBootstrapEvent;
3331
import com.github.sonus21.rqueue.utils.TestUtils;
@@ -54,7 +52,7 @@ class RqueueMetricsTest extends TestBase {
5452
TestUtils.createQueueDetail(simpleQueue, deadLetterQueue);
5553

5654
@Mock
57-
private RqueueStringDao rqueueStringDao;
55+
private RqueueQueueMetricsProvider queueMetricsProvider;
5856

5957
@Mock
6058
private QueueCounter queueCounter;
@@ -120,45 +118,24 @@ private RqueueMetrics rqueueMetrics(
120118
RqueueMetrics metrics = new RqueueMetrics(queueCounter);
121119
FieldUtils.writeField(metrics, "meterRegistry", meterRegistry, true);
122120
FieldUtils.writeField(metrics, "metricsProperties", metricsProperties, true);
121+
FieldUtils.writeField(metrics, "queueMetricsProvider", queueMetricsProvider, true);
123122
return metrics;
124123
}
125124

126125
@Test
127126
void queueStatistics() throws IllegalAccessException {
128-
doAnswer(invocation -> {
129-
String zsetName = invocation.getArgument(0);
130-
if (zsetName.equals(scheduledQueueDetail.getScheduledQueueName())) {
131-
return 5L;
132-
}
133-
if (zsetName.equals(simpleQueueDetail.getProcessingQueueName())) {
134-
return 10L;
135-
}
136-
if (zsetName.equals(scheduledQueueDetail.getProcessingQueueName())) {
137-
return 15L;
138-
}
139-
return null;
140-
})
141-
.when(rqueueStringDao)
142-
.getSortedSetSize(anyString());
127+
// All four gauges read through RqueueQueueMetricsProvider, keyed by user-facing queue name.
128+
when(queueMetricsProvider.getPendingMessageCount(simpleQueue)).thenReturn(100L);
129+
when(queueMetricsProvider.getPendingMessageCount(scheduledQueue)).thenReturn(200L);
130+
when(queueMetricsProvider.getProcessingMessageCount(simpleQueue)).thenReturn(10L);
131+
when(queueMetricsProvider.getProcessingMessageCount(scheduledQueue)).thenReturn(15L);
132+
when(queueMetricsProvider.getScheduledMessageCount(simpleQueue)).thenReturn(0L);
133+
when(queueMetricsProvider.getScheduledMessageCount(scheduledQueue)).thenReturn(5L);
134+
// Only simpleQueue has a DLQ configured; the scheduledQueue gauge is never registered.
135+
when(queueMetricsProvider.getDeadLetterMessageCount(simpleQueue)).thenReturn(300L);
143136

144-
doAnswer(invocation -> {
145-
String listName = invocation.getArgument(0);
146-
if (listName.equals(simpleQueueDetail.getQueueName())) {
147-
return 100L;
148-
}
149-
if (listName.equals(scheduledQueueDetail.getQueueName())) {
150-
return 200L;
151-
}
152-
if (listName.equals(deadLetterQueue)) {
153-
return 300L;
154-
}
155-
return null;
156-
})
157-
.when(rqueueStringDao)
158-
.getListSize(anyString());
159137
MeterRegistry meterRegistry = new SimpleMeterRegistry();
160138
RqueueMetrics metrics = rqueueMetrics(meterRegistry, metricsProperties);
161-
FieldUtils.writeField(metrics, "rqueueStringDao", rqueueStringDao, true);
162139
metrics.onApplicationEvent(new RqueueBootstrapEvent("Test", true));
163140
verifyQueueStatistics(meterRegistry, simpleQueue, 100, 10, 300, 0);
164141
verifyQueueStatistics(meterRegistry, scheduledQueue, 200, 15, 0, 5);

0 commit comments

Comments
 (0)