Skip to content

Commit 69f25ad

Browse files
committed
Implement NATS KV-backed RqueueSystemConfigDao
NatsRqueueSystemConfigDao now persists QueueConfig in a JetStream KV bucket (rqueue-queue-config) keyed by queue name. Entries are serialized via standard Java serialization, matching the Redis impl that also relies on SerializableBase. - getConfigByName / getConfigByNames: KV lookups with an in-process cache mirroring the Redis byCachedXxx behavior. - getQConfig(id): linear scan over bucket keys (admin path; queue counts are small). - saveQConfig / saveAllQConfig: kv.put + cache update. - clearCacheByName: cache eviction. - KV keys sanitized to [A-Za-z0-9_=.-] so queue names with '#' / '$' round-trip transparently. NatsRqueueSystemConfigDaoIT covers save+get, cache hits, getByNames, scan-by-id, and sanitization. Assisted-By: Claude Code
1 parent 477357c commit 69f25ad

3 files changed

Lines changed: 306 additions & 15 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java

Lines changed: 186 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,220 @@
1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
1515
import com.github.sonus21.rqueue.models.db.QueueConfig;
16+
import io.nats.client.Connection;
17+
import io.nats.client.JetStreamApiException;
18+
import io.nats.client.KeyValue;
19+
import io.nats.client.KeyValueManagement;
20+
import io.nats.client.api.KeyValueConfiguration;
21+
import io.nats.client.api.KeyValueEntry;
22+
import io.nats.client.api.KeyValueStatus;
23+
import java.io.ByteArrayInputStream;
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.io.ObjectInputStream;
27+
import java.io.ObjectOutputStream;
28+
import java.util.ArrayList;
1629
import java.util.Collection;
17-
import java.util.Collections;
1830
import java.util.List;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.logging.Level;
34+
import java.util.logging.Logger;
1935
import org.springframework.context.annotation.Conditional;
2036
import org.springframework.stereotype.Repository;
2137

22-
/** NATS-backend stub {@link RqueueSystemConfigDao} for non-Redis backends. */
38+
/**
39+
* NATS-backed {@link RqueueSystemConfigDao} using a JetStream KV bucket as the queue-config
40+
* store. Entries are keyed by {@link QueueConfig#getName()} and serialized via standard Java
41+
* serialization, matching the Redis impl which also relies on
42+
* {@link com.github.sonus21.rqueue.models.SerializableBase}.
43+
*
44+
* <p>An in-process cache mirrors the Redis impl's {@code byCachedXxx} methods for parity.
45+
* {@link #clearCacheByName(String)} evicts; {@link #saveQConfig(QueueConfig)} keeps the cache
46+
* in sync.
47+
*/
2348
@Repository
2449
@Conditional(NatsBackendCondition.class)
2550
public class NatsRqueueSystemConfigDao implements RqueueSystemConfigDao {
26-
@Override
27-
public QueueConfig getConfigByName(String name) {
28-
return null;
51+
52+
private static final Logger log = Logger.getLogger(NatsRqueueSystemConfigDao.class.getName());
53+
private static final String BUCKET_NAME = "rqueue-queue-config";
54+
55+
private final Connection connection;
56+
private final KeyValueManagement kvm;
57+
private final AtomicReference<KeyValue> kvRef = new AtomicReference<>();
58+
private final ConcurrentHashMap<String, QueueConfig> cache = new ConcurrentHashMap<>();
59+
60+
public NatsRqueueSystemConfigDao(Connection connection) throws IOException {
61+
this.connection = connection;
62+
this.kvm = connection.keyValueManagement();
63+
}
64+
65+
private KeyValue ensureBucket() throws IOException, JetStreamApiException {
66+
KeyValue cached = kvRef.get();
67+
if (cached != null) {
68+
return cached;
69+
}
70+
synchronized (this) {
71+
cached = kvRef.get();
72+
if (cached != null) {
73+
return cached;
74+
}
75+
try {
76+
KeyValueStatus status = kvm.getStatus(BUCKET_NAME);
77+
if (status != null) {
78+
KeyValue kv = connection.keyValue(BUCKET_NAME);
79+
kvRef.set(kv);
80+
return kv;
81+
}
82+
} catch (JetStreamApiException missing) {
83+
// fall through to create
84+
}
85+
kvm.create(KeyValueConfiguration.builder().name(BUCKET_NAME).build());
86+
KeyValue kv = connection.keyValue(BUCKET_NAME);
87+
kvRef.set(kv);
88+
return kv;
89+
}
2990
}
3091

3192
@Override
32-
public List<QueueConfig> getConfigByNames(Collection<String> names) {
33-
return Collections.emptyList();
93+
public QueueConfig getConfigByName(String name) {
94+
return getConfigByName(name, true);
3495
}
3596

3697
@Override
3798
public QueueConfig getConfigByName(String name, boolean cached) {
38-
return null;
99+
if (cached) {
100+
QueueConfig hit = cache.get(name);
101+
if (hit != null) {
102+
return hit;
103+
}
104+
}
105+
QueueConfig loaded = loadByKey(sanitize(name));
106+
if (loaded != null) {
107+
cache.put(name, loaded);
108+
}
109+
return loaded;
39110
}
40111

41112
@Override
42113
public QueueConfig getQConfig(String id, boolean cached) {
43-
return null;
114+
if (cached) {
115+
for (QueueConfig hit : cache.values()) {
116+
if (id != null && id.equals(hit.getId())) {
117+
return hit;
118+
}
119+
}
120+
}
121+
return scanForId(id);
122+
}
123+
124+
@Override
125+
public List<QueueConfig> getConfigByNames(Collection<String> names) {
126+
List<QueueConfig> out = new ArrayList<>(names.size());
127+
for (String n : names) {
128+
QueueConfig c = getConfigByName(n);
129+
if (c != null) {
130+
out.add(c);
131+
}
132+
}
133+
return out;
44134
}
45135

46136
@Override
47137
public List<QueueConfig> findAllQConfig(Collection<String> ids) {
48-
return Collections.emptyList();
138+
List<QueueConfig> out = new ArrayList<>(ids.size());
139+
for (String id : ids) {
140+
QueueConfig c = getQConfig(id, true);
141+
if (c != null) {
142+
out.add(c);
143+
}
144+
}
145+
return out;
49146
}
50147

51148
@Override
52-
public void saveQConfig(QueueConfig queueConfig) {}
149+
public void saveQConfig(QueueConfig queueConfig) {
150+
try {
151+
KeyValue kv = ensureBucket();
152+
kv.put(sanitize(queueConfig.getName()), serialize(queueConfig));
153+
cache.put(queueConfig.getName(), queueConfig);
154+
} catch (IOException | JetStreamApiException e) {
155+
log.log(Level.WARNING, "saveQConfig " + queueConfig.getName() + " failed", e);
156+
}
157+
}
53158

54159
@Override
55-
public void saveAllQConfig(List<QueueConfig> newConfigs) {}
160+
public void saveAllQConfig(List<QueueConfig> newConfigs) {
161+
for (QueueConfig c : newConfigs) {
162+
saveQConfig(c);
163+
}
164+
}
56165

57166
@Override
58-
public void clearCacheByName(String name) {}
167+
public void clearCacheByName(String name) {
168+
cache.remove(name);
169+
}
170+
171+
// ---- helpers ----------------------------------------------------------
172+
173+
private QueueConfig loadByKey(String key) {
174+
try {
175+
KeyValue kv = ensureBucket();
176+
KeyValueEntry entry = kv.get(key);
177+
if (entry == null || entry.getValue() == null) {
178+
return null;
179+
}
180+
return deserialize(entry.getValue());
181+
} catch (IOException | JetStreamApiException e) {
182+
log.log(Level.WARNING, "loadByKey " + key + " failed", e);
183+
return null;
184+
}
185+
}
186+
187+
private QueueConfig scanForId(String id) {
188+
if (id == null) {
189+
return null;
190+
}
191+
try {
192+
KeyValue kv = ensureBucket();
193+
List<String> keys = new ArrayList<>(kv.keys());
194+
for (String k : keys) {
195+
QueueConfig c = loadByKey(k);
196+
if (c != null && id.equals(c.getId())) {
197+
return c;
198+
}
199+
}
200+
return null;
201+
} catch (IOException | JetStreamApiException | InterruptedException e) {
202+
log.log(Level.WARNING, "scanForId " + id + " failed", e);
203+
if (e instanceof InterruptedException) {
204+
Thread.currentThread().interrupt();
205+
}
206+
return null;
207+
}
208+
}
209+
210+
private static byte[] serialize(QueueConfig c) throws IOException {
211+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
212+
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
213+
oos.writeObject(c);
214+
}
215+
return baos.toByteArray();
216+
}
217+
218+
private static QueueConfig deserialize(byte[] bytes) {
219+
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
220+
Object o = ois.readObject();
221+
return o instanceof QueueConfig ? (QueueConfig) o : null;
222+
} catch (IOException | ClassNotFoundException e) {
223+
log.log(Level.WARNING, "deserialize QueueConfig failed", e);
224+
return null;
225+
}
226+
}
227+
228+
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
229+
private static String sanitize(String key) {
230+
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
231+
}
59232
}

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerEnqueueAckIT.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,16 @@ void enqueuePopAck_drainsStream() throws Exception {
4545
}
4646
}
4747
assertEquals(10, received);
48-
// WorkQueue retention removes acked msgs from stream
49-
assertEquals(0L, broker.size(q));
48+
// WorkQueue retention removes acked msgs from the stream, but JetStream processes ACKs
49+
// asynchronously: nm.ack() is fire-and-forget, so size() can briefly observe non-zero
50+
// until the server applies the deletion. Poll for drain instead of asserting strictly.
51+
long deadlineNanos = System.nanoTime() + Duration.ofSeconds(5).toNanos();
52+
long size = broker.size(q);
53+
while (size != 0L && System.nanoTime() < deadlineNanos) {
54+
Thread.sleep(50L);
55+
size = broker.size(q);
56+
}
57+
assertEquals(0L, size);
5058
}
5159
}
5260
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats.dao;
11+
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertNotNull;
14+
import static org.junit.jupiter.api.Assertions.assertNull;
15+
16+
import com.github.sonus21.rqueue.models.db.QueueConfig;
17+
import com.github.sonus21.rqueue.nats.AbstractJetStreamIT;
18+
import io.nats.client.JetStreamApiException;
19+
import io.nats.client.KeyValueManagement;
20+
import java.io.IOException;
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import org.junit.jupiter.api.BeforeEach;
24+
import org.junit.jupiter.api.Test;
25+
26+
/** Round-trip exercise for {@link NatsRqueueSystemConfigDao} against a real JetStream KV. */
27+
class NatsRqueueSystemConfigDaoIT extends AbstractJetStreamIT {
28+
29+
private NatsRqueueSystemConfigDao dao;
30+
31+
@BeforeEach
32+
void freshBucket() throws IOException, JetStreamApiException {
33+
KeyValueManagement kvm = connection.keyValueManagement();
34+
try {
35+
kvm.delete("rqueue-queue-config");
36+
} catch (JetStreamApiException notFound) {
37+
// first run
38+
}
39+
dao = new NatsRqueueSystemConfigDao(connection);
40+
}
41+
42+
private QueueConfig sample(String id, String name) {
43+
QueueConfig c = new QueueConfig();
44+
c.setId(id);
45+
c.setName(name);
46+
c.setQueueName(name);
47+
c.setNumRetry(3);
48+
c.setVisibilityTimeout(30_000L);
49+
return c;
50+
}
51+
52+
@Test
53+
void saveAndGetByName() {
54+
QueueConfig c = sample("id-1", "orders");
55+
dao.saveQConfig(c);
56+
57+
QueueConfig back = dao.getConfigByName("orders", false);
58+
assertNotNull(back);
59+
assertEquals("id-1", back.getId());
60+
assertEquals("orders", back.getName());
61+
assertEquals(3, back.getNumRetry());
62+
assertEquals(30_000L, back.getVisibilityTimeout());
63+
}
64+
65+
@Test
66+
void getByNameMissingReturnsNull() {
67+
assertNull(dao.getConfigByName("never-saved", false));
68+
}
69+
70+
@Test
71+
void cachedReadDoesNotRoundtripJetStream() {
72+
QueueConfig c = sample("id-2", "cached-q");
73+
dao.saveQConfig(c);
74+
// First read primes the cache; second hits cache.
75+
QueueConfig first = dao.getConfigByName("cached-q");
76+
QueueConfig second = dao.getConfigByName("cached-q");
77+
assertNotNull(first);
78+
assertEquals(first, second);
79+
80+
dao.clearCacheByName("cached-q");
81+
QueueConfig third = dao.getConfigByName("cached-q");
82+
assertNotNull(third);
83+
assertEquals("cached-q", third.getName());
84+
}
85+
86+
@Test
87+
void saveAllAndGetByNames() {
88+
dao.saveAllQConfig(Arrays.asList(sample("a", "qa"), sample("b", "qb"), sample("c", "qc")));
89+
List<QueueConfig> got = dao.getConfigByNames(Arrays.asList("qa", "qb", "missing"));
90+
assertEquals(2, got.size());
91+
}
92+
93+
@Test
94+
void getQConfigByIdScan() {
95+
dao.saveAllQConfig(Arrays.asList(sample("alpha", "q1"), sample("beta", "q2")));
96+
QueueConfig found = dao.getQConfig("beta", false);
97+
assertNotNull(found);
98+
assertEquals("q2", found.getName());
99+
}
100+
101+
@Test
102+
void sanitizationLetsNamesWithIllegalCharsRoundTrip() {
103+
// Note: name is preserved on the QueueConfig itself; only the KV key is sanitized.
104+
QueueConfig c = sample("id-x", "queue#with$weird:chars");
105+
dao.saveQConfig(c);
106+
QueueConfig back = dao.getConfigByName("queue#with$weird:chars", false);
107+
assertNotNull(back);
108+
assertEquals("id-x", back.getId());
109+
}
110+
}

0 commit comments

Comments
 (0)