Skip to content

Commit 7649666

Browse files
committed
Fix NATS IT init when NATS_RUNNING=true; NATS backend WIP
AbstractNatsBootIT used `@Container` on a static field that was set to null when NATS_RUNNING=true. The Testcontainers JUnit extension rejects null-valued @container fields with "Container NATS needs to be initialized", causing every Nats*IT to fail at extension-init time — before Spring booted or even attempted to talk to the externally-started nats-server. Drop @container; start the container lazily in @BeforeAll (and in the DynamicPropertySource callback, which fires earlier) only when not using external NATS. CI path stays Docker-free; local fallback still uses Testcontainers. Also includes in-progress NATS-backend work: NatsBackendCondition, NATS-backed DAO / lock-manager / message-metadata / utility-service stubs, and the corresponding gating in RqueueBeanProvider, sender / endpoint / message-manager impls, RqueueMetrics, and the listener auto- configs. Assisted-By: Claude Code
1 parent e0a87b7 commit 7649666

15 files changed

Lines changed: 500 additions & 43 deletions

File tree

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.common.impl;
12+
13+
import com.github.sonus21.rqueue.common.RqueueLockManager;
14+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
15+
import java.time.Duration;
16+
import org.springframework.context.annotation.Conditional;
17+
import org.springframework.stereotype.Component;
18+
19+
/**
20+
* NATS-backend stub {@link RqueueLockManager} for non-Redis backends. Redis-only callers (admin endpoints,
21+
* the scheduler) are themselves gated; on the NATS path the lock primitive isn't needed because
22+
* JetStream's durable consumers serialize delivery. This impl returns {@code true} for
23+
* acquire/release so any inadvertent caller proceeds without blocking; if a caller really
24+
* depends on mutual exclusion through this manager it will need a backend-specific path.
25+
*/
26+
@Component
27+
@Conditional(NatsBackendCondition.class)
28+
public class NatsRqueueLockManager implements RqueueLockManager {
29+
@Override
30+
public boolean acquireLock(String lockKey, String lockValue, Duration duration) {
31+
return true;
32+
}
33+
34+
@Override
35+
public boolean releaseLock(String lockKey, String lockValue) {
36+
return true;
37+
}
38+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.config;
12+
13+
import org.springframework.context.annotation.Condition;
14+
import org.springframework.context.annotation.ConditionContext;
15+
import org.springframework.core.type.AnnotatedTypeMetadata;
16+
17+
/**
18+
* Spring {@link Condition} that matches when the active rqueue backend is {@link Backend#NATS}.
19+
* Companion to {@link RedisBackendCondition}; together they ensure every Redis-shaped interface
20+
* gets exactly one bean (Redis impl when backend=redis, no-op stub when backend=nats) so
21+
* consumers don't need {@code @Autowired(required = false)} or null guards.
22+
*/
23+
public class NatsBackendCondition implements Condition {
24+
@Override
25+
public boolean matches(ConditionContext ctx, AnnotatedTypeMetadata md) {
26+
return RedisBackendCondition.resolveBackend(ctx) == Backend.NATS;
27+
}
28+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueBeanProvider.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,13 @@
3535
@Setter
3636
public class RqueueBeanProvider {
3737

38-
// Redis-only collaborators — absent on the NATS backend path; consumers must null-check.
39-
@Autowired(required = false)
38+
@Autowired
4039
private RqueueMessageMetadataService rqueueMessageMetadataService;
4140

42-
@Autowired(required = false)
41+
@Autowired
4342
private RqueueSystemConfigDao rqueueSystemConfigDao;
4443

45-
@Autowired(required = false)
44+
@Autowired
4645
private RqueueJobDao rqueueJobDao;
4746

4847
@Autowired
@@ -51,7 +50,7 @@ public class RqueueBeanProvider {
5150
@Autowired
5251
private ApplicationEventPublisher applicationEventPublisher;
5352

54-
@Autowired(required = false)
53+
@Autowired
5554
private RqueueLockManager rqueueLockManager;
5655

5756
@Autowired(required = false)

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/BaseMessageSender.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,13 @@ abstract class BaseMessageSender {
5454
protected final RqueueMessageTemplate messageTemplate;
5555
protected final RqueueMessageIdGenerator messageIdGenerator;
5656

57-
// Redis-only collaborators — absent when rqueue.backend=nats. Methods that touch them
58-
// already short-circuit on the broker capability flag, so a null is safe.
59-
@Autowired(required = false)
57+
@Autowired
6058
protected RqueueStringDao rqueueStringDao;
6159

6260
@Autowired
6361
protected RqueueConfig rqueueConfig;
6462

65-
@Autowired(required = false)
63+
@Autowired
6664
protected RqueueMessageMetadataService rqueueMessageMetadataService;
6765

6866
BaseMessageSender(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,10 @@
4242

4343
public class RqueueEndpointManagerImpl extends BaseMessageSender implements RqueueEndpointManager {
4444

45-
// Both Redis-only — absent on the NATS backend; the dashboard / system-config endpoints
46-
// they back are gated by RedisBackendCondition and won't be invoked when these are null.
47-
@Autowired(required = false)
45+
@Autowired
4846
private RqueueUtilityService rqueueUtilityService;
4947

50-
@Autowired(required = false)
48+
@Autowired
5149
private RqueueSystemConfigDao rqueueSystemConfigDao;
5250

5351
public RqueueEndpointManagerImpl(

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageManagerImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@
4444
@Slf4j
4545
public class RqueueMessageManagerImpl extends BaseMessageSender implements RqueueMessageManager {
4646

47-
// Redis-only — null when rqueue.backend=nats; deletion APIs that need it are no-op
48-
// on the NATS path (the broker manages its own delivery state via JetStream).
49-
@Autowired(required = false)
47+
@Autowired
5048
private RqueueLockManager rqueueLockManager;
5149

5250
public RqueueMessageManagerImpl(
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.dao.impl;
12+
13+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14+
import com.github.sonus21.rqueue.dao.RqueueJobDao;
15+
import com.github.sonus21.rqueue.models.db.RqueueJob;
16+
import java.time.Duration;
17+
import java.util.Collection;
18+
import java.util.Collections;
19+
import java.util.List;
20+
import org.springframework.context.annotation.Conditional;
21+
import org.springframework.stereotype.Repository;
22+
23+
/**
24+
* NATS-backend stub for {@link RqueueJobDao}. Job tracking persistence is a Redis-only feature
25+
* in v1; this stub returns empty/null and silently drops writes so the bean graph stays
26+
* consistent. A NATS-native implementation can replace this in a follow-up.
27+
*/
28+
@Repository
29+
@Conditional(NatsBackendCondition.class)
30+
public class NatsRqueueJobDao implements RqueueJobDao {
31+
@Override public void createJob(RqueueJob rqueueJob, Duration expiry) {}
32+
@Override public void save(RqueueJob rqueueJob, Duration expiry) {}
33+
@Override public RqueueJob findById(String jobId) { return null; }
34+
@Override public List<RqueueJob> findJobsByIdIn(Collection<String> jobIds) { return Collections.emptyList(); }
35+
@Override public List<RqueueJob> finByMessageIdIn(List<String> messageIds) { return Collections.emptyList(); }
36+
@Override public List<RqueueJob> finByMessageId(String messageId) { return Collections.emptyList(); }
37+
@Override public void delete(String jobId) {}
38+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.dao.impl;
12+
13+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14+
import com.github.sonus21.rqueue.dao.RqueueStringDao;
15+
import java.time.Duration;
16+
import java.util.Collection;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import java.util.Map;
20+
import org.springframework.context.annotation.Conditional;
21+
import org.springframework.data.redis.connection.DataType;
22+
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
23+
import org.springframework.stereotype.Component;
24+
25+
/**
26+
* NATS-backend stub {@link RqueueStringDao} for non-Redis backends. Reads return empty/zero/null so
27+
* dashboard gauges and metric collectors register zero values cleanly; writes are silently
28+
* ignored. The runtime produce-and-consume path on NATS does not invoke this DAO.
29+
*/
30+
@Component
31+
@Conditional(NatsBackendCondition.class)
32+
public class NatsRqueueStringDao implements RqueueStringDao {
33+
34+
@Override
35+
public Map<String, List<Object>> readFromLists(List<String> keys) {
36+
return Collections.emptyMap();
37+
}
38+
39+
@Override
40+
public List<Object> readFromList(String key) {
41+
return Collections.emptyList();
42+
}
43+
44+
@Override
45+
public void appendToListWithListExpiry(String listName, String data, Duration duration) {}
46+
47+
@Override
48+
public void appendToSet(String setName, String... data) {}
49+
50+
@Override
51+
public List<String> readFromSet(String setName) {
52+
return Collections.emptyList();
53+
}
54+
55+
@Override
56+
public Boolean delete(String key) {
57+
return Boolean.FALSE;
58+
}
59+
60+
@Override
61+
public void set(String key, Object data) {}
62+
63+
@Override
64+
public Object get(String key) {
65+
return null;
66+
}
67+
68+
@Override
69+
public Object delete(Collection<String> keys) {
70+
return null;
71+
}
72+
73+
@Override
74+
public Object deleteAndSet(
75+
Collection<String> keysToBeRemoved, Map<String, Object> objectsToBeStored) {
76+
return null;
77+
}
78+
79+
@Override
80+
public Boolean setIfAbsent(String key, String value, Duration duration) {
81+
return Boolean.TRUE;
82+
}
83+
84+
@Override
85+
public Long getListSize(String name) {
86+
return 0L;
87+
}
88+
89+
@Override
90+
public Long getSortedSetSize(String name) {
91+
return 0L;
92+
}
93+
94+
@Override
95+
public DataType type(String key) {
96+
return DataType.NONE;
97+
}
98+
99+
@Override
100+
public Boolean deleteIfSame(String key, String value) {
101+
return Boolean.FALSE;
102+
}
103+
104+
@Override
105+
public void addToOrderedSetWithScore(String key, String value, long score) {}
106+
107+
@Override
108+
public List<TypedTuple<String>> readFromOrderedSetWithScoreBetween(
109+
String key, long start, long end) {
110+
return Collections.emptyList();
111+
}
112+
113+
@Override
114+
public void deleteAll(String key, long min, long max) {}
115+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.dao.impl;
12+
13+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
14+
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
15+
import com.github.sonus21.rqueue.models.db.QueueConfig;
16+
import java.util.Collection;
17+
import java.util.Collections;
18+
import java.util.List;
19+
import org.springframework.context.annotation.Conditional;
20+
import org.springframework.stereotype.Repository;
21+
22+
/** NATS-backend stub {@link RqueueSystemConfigDao} for non-Redis backends. */
23+
@Repository
24+
@Conditional(NatsBackendCondition.class)
25+
public class NatsRqueueSystemConfigDao implements RqueueSystemConfigDao {
26+
@Override public QueueConfig getConfigByName(String name) { return null; }
27+
@Override public List<QueueConfig> getConfigByNames(Collection<String> names) { return Collections.emptyList(); }
28+
@Override public QueueConfig getConfigByName(String name, boolean cached) { return null; }
29+
@Override public QueueConfig getQConfig(String id, boolean cached) { return null; }
30+
@Override public List<QueueConfig> findAllQConfig(Collection<String> ids) { return Collections.emptyList(); }
31+
@Override public void saveQConfig(QueueConfig queueConfig) {}
32+
@Override public void saveAllQConfig(List<QueueConfig> newConfigs) {}
33+
@Override public void clearCacheByName(String name) {}
34+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/metrics/RqueueMetrics.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,29 +48,16 @@ public class RqueueMetrics implements RqueueMetricsRegistry {
4848
@Autowired
4949
private MeterRegistry meterRegistry;
5050

51-
// Redis-only — null when rqueue.backend=nats; size() returns 0 in that case.
52-
@Autowired(required = false)
51+
@Autowired
5352
private RqueueStringDao rqueueStringDao;
5453

5554
public RqueueMetrics(QueueCounter queueCounter) {
5655
this.queueCounter = queueCounter;
5756
}
5857

5958
private long size(String name, boolean isZset) {
60-
if (rqueueStringDao == null) {
61-
// No Redis available (e.g. rqueue.backend=nats); these metrics are Redis-shaped.
62-
return 0;
63-
}
64-
Long val;
65-
if (!isZset) {
66-
val = rqueueStringDao.getListSize(name);
67-
} else {
68-
val = rqueueStringDao.getSortedSetSize(name);
69-
}
70-
if (val == null) {
71-
return 0;
72-
}
73-
return val;
59+
Long val = isZset ? rqueueStringDao.getSortedSetSize(name) : rqueueStringDao.getListSize(name);
60+
return val == null ? 0 : val;
7461
}
7562

7663
private void monitor() {

0 commit comments

Comments
 (0)