Skip to content

Commit c1d6547

Browse files
committed
Implement NATS KV-backed RqueueJobDao
NatsRqueueJobDao now persists RqueueJob in a JetStream KV bucket (rqueue-jobs) keyed by job id and serialized via Java serialization. - createJob / save: kv.put with optional TTL on first bucket creation. - findById: kv.get → deserialize. - findJobsByIdIn: per-id lookups. - finByMessageId / finByMessageIdIn: linear scan over bucket keys. Volumes are small (in-flight + recent retry history); a reverse index is a follow-up. - delete: kv.delete. - KV keys sanitized to [A-Za-z0-9_=.-] so weird ids round-trip. NatsRqueueJobDaoIT covers save+findById, missing key, multi-id lookup, findByMessageId (multiple jobs per message), findByMessageIdIn, delete, and sanitization. All 7 pass against a real JetStream. Assisted-By: Claude Code
1 parent 69f25ad commit c1d6547

2 files changed

Lines changed: 281 additions & 12 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java

Lines changed: 166 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,201 @@
1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueJobDao;
1515
import com.github.sonus21.rqueue.models.db.RqueueJob;
16+
import io.nats.client.Connection;
17+
import io.nats.client.JetStreamApiException;
18+
import io.nats.client.KeyValue;
19+
import io.nats.client.KeyValueManagement;
20+
import io.nats.client.api.KeyValueConfiguration;
21+
import io.nats.client.api.KeyValueEntry;
22+
import io.nats.client.api.KeyValueStatus;
23+
import java.io.ByteArrayInputStream;
24+
import java.io.ByteArrayOutputStream;
25+
import java.io.IOException;
26+
import java.io.ObjectInputStream;
27+
import java.io.ObjectOutputStream;
1628
import java.time.Duration;
29+
import java.util.ArrayList;
1730
import java.util.Collection;
1831
import java.util.Collections;
1932
import java.util.List;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
2036
import org.springframework.context.annotation.Conditional;
2137
import org.springframework.stereotype.Repository;
2238

2339
/**
24-
* NATS-backend stub for {@link RqueueJobDao}. Job tracking persistence is a Redis-only feature
25-
* in v1; this stub returns empty/null and silently drops writes so the bean graph stays
26-
* consistent. A NATS-native implementation can replace this in a follow-up.
40+
* NATS-backed {@link RqueueJobDao} using a JetStream KV bucket as the job store. Entries are
41+
* keyed by job id and serialized via Java serialization. Look-ups by message id walk the bucket
42+
* keys; for the volumes rqueue typically tracks (current in-flight + recent retry history) this
43+
* is acceptable for v1 — the Redis impl uses an explicit reverse index, that's a follow-up here.
44+
*
45+
* <p>The {@code expiry} on save / create is currently best-effort: the bucket is created on the
46+
* first call with the expiry as its TTL. Subsequent saves reuse the same bucket regardless of
47+
* the requested per-key expiry.
2748
*/
2849
@Repository
2950
@Conditional(NatsBackendCondition.class)
3051
public class NatsRqueueJobDao implements RqueueJobDao {
52+
53+
private static final Logger log = Logger.getLogger(NatsRqueueJobDao.class.getName());
54+
private static final String BUCKET_NAME = "rqueue-jobs";
55+
56+
private final Connection connection;
57+
private final KeyValueManagement kvm;
58+
private final AtomicReference<KeyValue> kvRef = new AtomicReference<>();
59+
60+
public NatsRqueueJobDao(Connection connection) throws IOException {
61+
this.connection = connection;
62+
this.kvm = connection.keyValueManagement();
63+
}
64+
65+
private KeyValue ensureBucket(Duration ttl) throws IOException, JetStreamApiException {
66+
KeyValue cached = kvRef.get();
67+
if (cached != null) {
68+
return cached;
69+
}
70+
synchronized (this) {
71+
cached = kvRef.get();
72+
if (cached != null) {
73+
return cached;
74+
}
75+
try {
76+
KeyValueStatus status = kvm.getStatus(BUCKET_NAME);
77+
if (status != null) {
78+
KeyValue kv = connection.keyValue(BUCKET_NAME);
79+
kvRef.set(kv);
80+
return kv;
81+
}
82+
} catch (JetStreamApiException missing) {
83+
// fall through
84+
}
85+
KeyValueConfiguration.Builder cfg = KeyValueConfiguration.builder().name(BUCKET_NAME);
86+
if (ttl != null && !ttl.isZero() && !ttl.isNegative()) {
87+
cfg.ttl(ttl);
88+
}
89+
kvm.create(cfg.build());
90+
KeyValue kv = connection.keyValue(BUCKET_NAME);
91+
kvRef.set(kv);
92+
return kv;
93+
}
94+
}
95+
3196
@Override
32-
public void createJob(RqueueJob rqueueJob, Duration expiry) {}
97+
public void createJob(RqueueJob rqueueJob, Duration expiry) {
98+
save(rqueueJob, expiry);
99+
}
33100

34101
@Override
35-
public void save(RqueueJob rqueueJob, Duration expiry) {}
102+
public void save(RqueueJob rqueueJob, Duration expiry) {
103+
try {
104+
KeyValue kv = ensureBucket(expiry);
105+
kv.put(sanitize(rqueueJob.getId()), serialize(rqueueJob));
106+
} catch (IOException | JetStreamApiException e) {
107+
log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e);
108+
}
109+
}
36110

37111
@Override
38112
public RqueueJob findById(String jobId) {
39-
return null;
113+
return loadByKey(sanitize(jobId));
40114
}
41115

42116
@Override
43117
public List<RqueueJob> findJobsByIdIn(Collection<String> jobIds) {
44-
return Collections.emptyList();
118+
List<RqueueJob> out = new ArrayList<>(jobIds.size());
119+
for (String id : jobIds) {
120+
RqueueJob j = findById(id);
121+
if (j != null) {
122+
out.add(j);
123+
}
124+
}
125+
return out;
45126
}
46127

47128
@Override
48-
public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
49-
return Collections.emptyList();
129+
public List<RqueueJob> finByMessageId(String messageId) {
130+
if (messageId == null) {
131+
return Collections.emptyList();
132+
}
133+
return scanForMessageIds(Collections.singletonList(messageId));
50134
}
51135

52136
@Override
53-
public List<RqueueJob> finByMessageId(String messageId) {
54-
return Collections.emptyList();
137+
public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
138+
if (messageIds == null || messageIds.isEmpty()) {
139+
return Collections.emptyList();
140+
}
141+
return scanForMessageIds(messageIds);
55142
}
56143

57144
@Override
58-
public void delete(String jobId) {}
145+
public void delete(String jobId) {
146+
try {
147+
KeyValue kv = ensureBucket(null);
148+
kv.delete(sanitize(jobId));
149+
} catch (IOException | JetStreamApiException e) {
150+
log.log(Level.WARNING, "delete job " + jobId + " failed", e);
151+
}
152+
}
153+
154+
// ---- helpers ----------------------------------------------------------
155+
156+
private RqueueJob loadByKey(String key) {
157+
try {
158+
KeyValue kv = ensureBucket(null);
159+
KeyValueEntry entry = kv.get(key);
160+
if (entry == null || entry.getValue() == null) {
161+
return null;
162+
}
163+
return deserialize(entry.getValue());
164+
} catch (IOException | JetStreamApiException e) {
165+
log.log(Level.WARNING, "loadByKey " + key + " failed", e);
166+
return null;
167+
}
168+
}
169+
170+
private List<RqueueJob> scanForMessageIds(Collection<String> messageIds) {
171+
try {
172+
KeyValue kv = ensureBucket(null);
173+
List<String> keys = new ArrayList<>(kv.keys());
174+
List<RqueueJob> out = new ArrayList<>();
175+
for (String k : keys) {
176+
RqueueJob j = loadByKey(k);
177+
if (j != null && messageIds.contains(j.getMessageId())) {
178+
out.add(j);
179+
}
180+
}
181+
return out;
182+
} catch (IOException | JetStreamApiException | InterruptedException e) {
183+
log.log(Level.WARNING, "scanForMessageIds failed", e);
184+
if (e instanceof InterruptedException) {
185+
Thread.currentThread().interrupt();
186+
}
187+
return Collections.emptyList();
188+
}
189+
}
190+
191+
private static byte[] serialize(RqueueJob job) throws IOException {
192+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
193+
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
194+
oos.writeObject(job);
195+
}
196+
return baos.toByteArray();
197+
}
198+
199+
private static RqueueJob deserialize(byte[] bytes) {
200+
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
201+
Object o = ois.readObject();
202+
return o instanceof RqueueJob ? (RqueueJob) o : null;
203+
} catch (IOException | ClassNotFoundException e) {
204+
log.log(Level.WARNING, "deserialize RqueueJob failed", e);
205+
return null;
206+
}
207+
}
208+
209+
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
210+
private static String sanitize(String key) {
211+
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
212+
}
59213
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats.dao;
11+
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertNotNull;
14+
import static org.junit.jupiter.api.Assertions.assertNull;
15+
import static org.junit.jupiter.api.Assertions.assertTrue;
16+
17+
import com.github.sonus21.rqueue.models.db.RqueueJob;
18+
import com.github.sonus21.rqueue.models.enums.JobStatus;
19+
import com.github.sonus21.rqueue.nats.AbstractJetStreamIT;
20+
import io.nats.client.JetStreamApiException;
21+
import io.nats.client.KeyValueManagement;
22+
import java.io.IOException;
23+
import java.time.Duration;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import org.junit.jupiter.api.BeforeEach;
27+
import org.junit.jupiter.api.Test;
28+
29+
/** Round-trip exercise for {@link NatsRqueueJobDao} against a real JetStream KV. */
30+
class NatsRqueueJobDaoIT extends AbstractJetStreamIT {
31+
32+
private NatsRqueueJobDao dao;
33+
34+
@BeforeEach
35+
void freshBucket() throws IOException, JetStreamApiException {
36+
KeyValueManagement kvm = connection.keyValueManagement();
37+
try {
38+
kvm.delete("rqueue-jobs");
39+
} catch (JetStreamApiException notFound) {
40+
// first run
41+
}
42+
dao = new NatsRqueueJobDao(connection);
43+
}
44+
45+
private RqueueJob job(String id, String messageId) {
46+
RqueueJob j = new RqueueJob();
47+
j.setId(id);
48+
j.setMessageId(messageId);
49+
j.setStatus(JobStatus.CREATED);
50+
j.setCreatedAt(System.currentTimeMillis());
51+
return j;
52+
}
53+
54+
@Test
55+
void saveAndFindById() {
56+
RqueueJob j = job("j1", "m1");
57+
dao.save(j, Duration.ofMinutes(1));
58+
59+
RqueueJob back = dao.findById("j1");
60+
assertNotNull(back);
61+
assertEquals("j1", back.getId());
62+
assertEquals("m1", back.getMessageId());
63+
assertEquals(JobStatus.CREATED, back.getStatus());
64+
}
65+
66+
@Test
67+
void findByIdMissingReturnsNull() {
68+
assertNull(dao.findById("never-saved"));
69+
}
70+
71+
@Test
72+
void findJobsByIdIn() {
73+
dao.save(job("a", "ma"), Duration.ofMinutes(1));
74+
dao.save(job("b", "mb"), Duration.ofMinutes(1));
75+
dao.save(job("c", "mc"), Duration.ofMinutes(1));
76+
List<RqueueJob> got = dao.findJobsByIdIn(Arrays.asList("a", "b", "missing"));
77+
assertEquals(2, got.size());
78+
}
79+
80+
@Test
81+
void findByMessageId() {
82+
dao.save(job("j1", "msg-X"), Duration.ofMinutes(1));
83+
dao.save(job("j2", "msg-X"), Duration.ofMinutes(1)); // same message, different attempt
84+
dao.save(job("j3", "msg-Y"), Duration.ofMinutes(1));
85+
List<RqueueJob> jobsForX = dao.finByMessageId("msg-X");
86+
assertEquals(2, jobsForX.size());
87+
assertTrue(jobsForX.stream().allMatch(j -> "msg-X".equals(j.getMessageId())));
88+
}
89+
90+
@Test
91+
void findByMessageIdIn() {
92+
dao.save(job("j1", "m1"), Duration.ofMinutes(1));
93+
dao.save(job("j2", "m2"), Duration.ofMinutes(1));
94+
dao.save(job("j3", "m3"), Duration.ofMinutes(1));
95+
List<RqueueJob> got = dao.finByMessageIdIn(Arrays.asList("m1", "m3"));
96+
assertEquals(2, got.size());
97+
}
98+
99+
@Test
100+
void deleteRemovesEntry() {
101+
dao.save(job("to-delete", "mx"), Duration.ofMinutes(1));
102+
assertNotNull(dao.findById("to-delete"));
103+
dao.delete("to-delete");
104+
assertNull(dao.findById("to-delete"));
105+
}
106+
107+
@Test
108+
void sanitizationLetsIllegalIdsRoundTrip() {
109+
RqueueJob j = job("job#with$weird:chars", "msg-1");
110+
dao.save(j, Duration.ofMinutes(1));
111+
RqueueJob back = dao.findById("job#with$weird:chars");
112+
assertNotNull(back);
113+
assertEquals("msg-1", back.getMessageId());
114+
}
115+
}

0 commit comments

Comments
 (0)