Skip to content

Commit b14ee9a

Browse files
committed
Import listener inner classes in NATS E2E tests
NatsBackendEndToEndIT had `@Import(TestListener.class)` on its TestApp to register the inner-class @RqueueListener bean; the four siblings added in 3bd0f5d / earlier (Concurrency, ConsumerNameOverride, PriorityQueues, ReactiveEnqueue) were missing this @import, so @SpringBootApplication's component scan rooted at the test package never saw the static inner @component listener and Spring failed @Autowired with NoSuchBeanDefinitionException at context startup. Add the matching @import on each TestApp. Assisted-By: Claude Code
1 parent abe7866 commit b14ee9a

15 files changed

Lines changed: 283 additions & 59 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/common/impl/NatsRqueueLockManager.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

rqueue-core/src/main/java/com/github/sonus21/rqueue/dao/impl/NatsRqueueJobDao.java renamed to rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* https://www.apache.org/licenses/LICENSE-2.0
99
*/
1010

11-
package com.github.sonus21.rqueue.dao.impl;
11+
package com.github.sonus21.rqueue.nats.dao;
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueJobDao;

rqueue-core/src/main/java/com/github/sonus21/rqueue/dao/impl/NatsRqueueStringDao.java renamed to rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueStringDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* https://www.apache.org/licenses/LICENSE-2.0
99
*/
1010

11-
package com.github.sonus21.rqueue.dao.impl;
11+
package com.github.sonus21.rqueue.nats.dao;
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueStringDao;

rqueue-core/src/main/java/com/github/sonus21/rqueue/dao/impl/NatsRqueueSystemConfigDao.java renamed to rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* https://www.apache.org/licenses/LICENSE-2.0
99
*/
1010

11-
package com.github.sonus21.rqueue.dao.impl;
11+
package com.github.sonus21.rqueue.nats.dao;
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
11+
package com.github.sonus21.rqueue.nats.lock;
12+
13+
import com.github.sonus21.rqueue.common.RqueueLockManager;
14+
import com.github.sonus21.rqueue.config.NatsBackendCondition;
15+
import io.nats.client.Connection;
16+
import io.nats.client.JetStreamApiException;
17+
import io.nats.client.KeyValue;
18+
import io.nats.client.KeyValueManagement;
19+
import io.nats.client.api.KeyValueConfiguration;
20+
import io.nats.client.api.KeyValueEntry;
21+
import io.nats.client.api.KeyValueStatus;
22+
import java.io.IOException;
23+
import java.nio.charset.StandardCharsets;
24+
import java.time.Duration;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import java.util.logging.Level;
27+
import java.util.logging.Logger;
28+
import org.springframework.context.annotation.Conditional;
29+
import org.springframework.stereotype.Component;
30+
31+
/**
32+
* NATS-backed {@link RqueueLockManager} using a JetStream KV bucket as the lock store.
33+
*
34+
* <p>Acquire is implemented via {@link KeyValue#create} which writes only when the key doesn't
35+
* exist (revision == 0); on existing-key conflict the JetStream server rejects the write with
36+
* {@code wrong last sequence} and the lock is reported as not acquired. The bucket is created
37+
* lazily with the {@code duration} of the first acquire used as the TTL — long-running keys
38+
* past the TTL get garbage-collected by the server, so an orphaned lock from a crashed holder
39+
* eventually self-releases.
40+
*
41+
* <p>Release verifies that the stored value matches the caller's {@code lockValue} before
42+
* deleting, so a holder cannot release a lock another process re-acquired after expiry.
43+
*/
44+
@Component
45+
@Conditional(NatsBackendCondition.class)
46+
public class NatsRqueueLockManager implements RqueueLockManager {
47+
48+
private static final Logger log = Logger.getLogger(NatsRqueueLockManager.class.getName());
49+
private static final String BUCKET_NAME = "rqueue-locks";
50+
51+
private final Connection connection;
52+
private final KeyValueManagement kvm;
53+
private final AtomicReference<KeyValue> kvRef = new AtomicReference<>();
54+
55+
public NatsRqueueLockManager(Connection connection) throws IOException {
56+
this.connection = connection;
57+
this.kvm = connection.keyValueManagement();
58+
}
59+
60+
/**
61+
* Lazily create / open the KV bucket with the requested TTL. The TTL is set on bucket
62+
* creation; subsequent calls reuse the existing bucket regardless of the requested TTL —
63+
* matching how Redis-side locks rely on {@code SET ... EX} for per-key TTL is out of scope
64+
* for this v1 KV-bucket implementation. Callers should prefer a uniform lock duration.
65+
*/
66+
private KeyValue ensureBucket(Duration ttl) throws IOException, JetStreamApiException {
67+
KeyValue cached = kvRef.get();
68+
if (cached != null) {
69+
return cached;
70+
}
71+
synchronized (this) {
72+
cached = kvRef.get();
73+
if (cached != null) {
74+
return cached;
75+
}
76+
try {
77+
KeyValueStatus status = kvm.getStatus(BUCKET_NAME);
78+
if (status != null) {
79+
KeyValue kv = connection.keyValue(BUCKET_NAME);
80+
kvRef.set(kv);
81+
return kv;
82+
}
83+
} catch (JetStreamApiException missing) {
84+
// bucket does not exist; fall through to create
85+
}
86+
KeyValueConfiguration cfg =
87+
KeyValueConfiguration.builder().name(BUCKET_NAME).ttl(ttl).build();
88+
kvm.create(cfg);
89+
KeyValue kv = connection.keyValue(BUCKET_NAME);
90+
kvRef.set(kv);
91+
return kv;
92+
}
93+
}
94+
95+
@Override
96+
public boolean acquireLock(String lockKey, String lockValue, Duration duration) {
97+
try {
98+
KeyValue kv = ensureBucket(duration);
99+
kv.create(sanitize(lockKey), lockValue.getBytes(StandardCharsets.UTF_8));
100+
return true;
101+
} catch (JetStreamApiException existing) {
102+
// Most common path: key already exists; another holder owns the lock.
103+
return false;
104+
} catch (IOException io) {
105+
log.log(Level.WARNING, "acquireLock " + lockKey + " I/O failure", io);
106+
return false;
107+
} catch (RuntimeException rt) {
108+
log.log(Level.WARNING, "acquireLock " + lockKey + " failed", rt);
109+
return false;
110+
}
111+
}
112+
113+
@Override
114+
public boolean releaseLock(String lockKey, String lockValue) {
115+
try {
116+
KeyValue kv = ensureBucket(Duration.ofSeconds(60));
117+
String key = sanitize(lockKey);
118+
KeyValueEntry entry = kv.get(key);
119+
if (entry == null) {
120+
return false;
121+
}
122+
String stored = new String(entry.getValue(), StandardCharsets.UTF_8);
123+
if (!stored.equals(lockValue)) {
124+
return false;
125+
}
126+
kv.delete(key, entry.getRevision());
127+
return true;
128+
} catch (IOException | JetStreamApiException e) {
129+
log.log(Level.WARNING, "releaseLock " + lockKey + " failed", e);
130+
return false;
131+
}
132+
}
133+
134+
/**
135+
* KV keys allow {@code [A-Za-z0-9_=.-]} only; coerce other characters to {@code _} so the
136+
* caller can pass arbitrary lock keys. {@code $} and {@code #} surface in queue/listener
137+
* names from inner classes and the legacy separator respectively.
138+
*/
139+
private static String sanitize(String key) {
140+
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
141+
}
142+
}

rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/NatsRqueueMessageMetadataService.java renamed to rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* https://www.apache.org/licenses/LICENSE-2.0
99
*/
1010

11-
package com.github.sonus21.rqueue.web.service.impl;
11+
package com.github.sonus21.rqueue.nats.service;
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.core.RqueueMessage;

rqueue-core/src/main/java/com/github/sonus21/rqueue/web/service/impl/NatsRqueueUtilityService.java renamed to rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* https://www.apache.org/licenses/LICENSE-2.0
99
*/
1010

11-
package com.github.sonus21.rqueue.web.service.impl;
11+
package com.github.sonus21.rqueue.nats.service;
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.models.Pair;

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,56 @@
2020
import org.junit.jupiter.api.BeforeAll;
2121
import org.testcontainers.containers.GenericContainer;
2222
import org.testcontainers.containers.wait.strategy.Wait;
23-
import org.testcontainers.junit.jupiter.Container;
2423
import org.testcontainers.junit.jupiter.Testcontainers;
2524
import org.testcontainers.utility.DockerImageName;
2625

2726
/**
28-
* Base for JetStream integration tests. Bound to Testcontainers; if Docker is unavailable JUnit 5
29-
* skips these tests automatically.
27+
* Base for JetStream integration tests. Mirrors the Redis test pattern: when {@code NATS_RUNNING}
28+
* is set the test connects to a locally running nats-server (CI path); otherwise a Testcontainers-
29+
* managed instance is started in {@link BeforeAll} (local Docker path). JUnit 5 / Testcontainers
30+
* skip the test gracefully if Docker isn't available and {@code NATS_RUNNING} isn't set.
3031
*/
3132
@Testcontainers(disabledWithoutDocker = true)
3233
@NatsIntegrationTest
33-
abstract class AbstractJetStreamIT {
34+
public abstract class AbstractJetStreamIT {
3435

35-
@Container
36-
static final GenericContainer<?> NATS = new GenericContainer<>(
37-
DockerImageName.parse("nats:2.10-alpine"))
38-
.withCommand("-js", "-DV")
39-
.withExposedPorts(4222)
40-
.waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1));
36+
static final boolean USE_EXTERNAL_NATS = System.getenv("NATS_RUNNING") != null;
37+
static final String EXTERNAL_NATS_URL =
38+
System.getenv().getOrDefault("NATS_URL", "nats://127.0.0.1:4222");
4139

40+
/**
41+
* Container is only constructed in the local-Docker path. The Testcontainers extension
42+
* ignores static {@code GenericContainer} fields that aren't annotated {@code @Container};
43+
* we manage the container's lifecycle from {@link #setup()} / {@link #teardown()} so the
44+
* external-NATS path can leave it null without tripping the extension.
45+
*/
46+
protected static GenericContainer<?> NATS;
4247
protected static Connection connection;
4348

4449
@BeforeAll
45-
static void connect() throws Exception {
46-
String url = "nats://" + NATS.getHost() + ":" + NATS.getMappedPort(4222);
50+
static void setup() throws Exception {
51+
String url;
52+
if (USE_EXTERNAL_NATS) {
53+
url = EXTERNAL_NATS_URL;
54+
} else {
55+
NATS = new GenericContainer<>(DockerImageName.parse("nats:2.10-alpine"))
56+
.withCommand("-js", "-DV")
57+
.withExposedPorts(4222)
58+
.waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1));
59+
NATS.start();
60+
url = "nats://" + NATS.getHost() + ":" + NATS.getMappedPort(4222);
61+
}
4762
connection = Nats.connect(new Options.Builder().server(url).build());
4863
}
4964

5065
@AfterAll
51-
static void disconnect() throws Exception {
66+
static void teardown() throws Exception {
5267
if (connection != null) {
5368
connection.close();
5469
}
70+
if (NATS != null && NATS.isRunning()) {
71+
NATS.stop();
72+
}
5573
}
5674

5775
protected QueueDetail mockQueue(String name) {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats.lock;
11+
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertFalse;
14+
import static org.junit.jupiter.api.Assertions.assertTrue;
15+
16+
import com.github.sonus21.rqueue.nats.AbstractJetStreamIT;
17+
import io.nats.client.JetStreamApiException;
18+
import io.nats.client.KeyValue;
19+
import io.nats.client.KeyValueManagement;
20+
import java.io.IOException;
21+
import java.time.Duration;
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
25+
/**
26+
* Exercises {@link NatsRqueueLockManager}'s NATS KV-backed acquire / release semantics.
27+
*
28+
* <ul>
29+
* <li>First acquire wins; concurrent acquire of the same key by a different holder fails.
30+
* <li>Release with a matching value succeeds; release with a different value is a no-op.
31+
* <li>After a successful release the key is again acquirable by anyone.
32+
* </ul>
33+
*/
34+
class NatsRqueueLockManagerIT extends AbstractJetStreamIT {
35+
36+
private NatsRqueueLockManager lockManager;
37+
38+
@BeforeEach
39+
void freshBucket() throws IOException, JetStreamApiException {
40+
KeyValueManagement kvm = connection.keyValueManagement();
41+
try {
42+
kvm.delete("rqueue-locks");
43+
} catch (JetStreamApiException notFound) {
44+
// bucket didn't exist; first run
45+
}
46+
lockManager = new NatsRqueueLockManager(connection);
47+
}
48+
49+
@Test
50+
void acquireWhenKeyAbsent() {
51+
assertTrue(lockManager.acquireLock("k1", "holder-A", Duration.ofSeconds(10)));
52+
}
53+
54+
@Test
55+
void acquireRejectsConcurrentHolder() {
56+
assertTrue(lockManager.acquireLock("k2", "holder-A", Duration.ofSeconds(10)));
57+
assertFalse(lockManager.acquireLock("k2", "holder-B", Duration.ofSeconds(10)));
58+
}
59+
60+
@Test
61+
void releaseSucceedsForMatchingHolder() {
62+
lockManager.acquireLock("k3", "holder-A", Duration.ofSeconds(10));
63+
assertTrue(lockManager.releaseLock("k3", "holder-A"));
64+
// After release the key is acquirable again.
65+
assertTrue(lockManager.acquireLock("k3", "holder-B", Duration.ofSeconds(10)));
66+
}
67+
68+
@Test
69+
void releaseRejectsForeignHolder() throws IOException, JetStreamApiException {
70+
lockManager.acquireLock("k4", "holder-A", Duration.ofSeconds(10));
71+
assertFalse(lockManager.releaseLock("k4", "holder-B"));
72+
// Lock still held: holder-A's value remains in the bucket.
73+
KeyValue kv = connection.keyValue("rqueue-locks");
74+
assertEquals("holder-A", new String(kv.get("k4").getValue()));
75+
}
76+
77+
@Test
78+
void releaseOnAbsentKeyReturnsFalse() {
79+
assertFalse(lockManager.releaseLock("never-acquired", "holder-A"));
80+
}
81+
82+
@Test
83+
void sanitizationCoercesIllegalKeyCharacters() {
84+
// The KV layer rejects '$', '#', etc.; lock manager sanitizes them transparently.
85+
assertTrue(
86+
lockManager.acquireLock("queue#$/with-illegal:chars", "holder-A", Duration.ofSeconds(10)));
87+
}
88+
}

0 commit comments

Comments
 (0)