Skip to content

Commit fdc3540

Browse files
committed
Implement NATS KV-backed RqueueMessageMetadataService
NatsRqueueMessageMetadataService now persists MessageMetadata in a JetStream KV bucket (rqueue-message-metadata), keyed by metadata id which the Redis impl computes via RqueueMessageUtils.getMessageMetaId (queue + message id). Entries are serialized via Java serialization. Implemented: - get / save / delete / deleteAll / findAll - getByMessageId composes the meta-id like Redis does. - deleteMessage marks the deleted flag + deletedOn timestamp without physically removing — matches Redis impl semantics. - getOrCreateMessageMetadata returns the cached entry when present or a fresh ENQUEUED one otherwise. - saveReactive wraps save in Mono.fromCallable. - readMessageMetadataForQueue + saveMessageMetadataForQueue + deleteQueueMessages walk the bucket scoped by sanitized queue prefix; rqueue's typical metadata volume is small enough that a linear scan is acceptable for v1, an explicit reverse index is a follow-up. - KV keys sanitized to [A-Za-z0-9_=.-]. NatsRqueueMessageMetadataServiceIT covers save+get, getByMessageId, delete, deleteMessage flag semantics, getOrCreate (existing + new), and findAll. All 8 pass against a real JetStream. Local sanity: full :rqueue-nats:test (-DincludeTags=nats) and the NatsBackendEndToEndIT both green. Assisted-By: Claude Code
1 parent c1d6547 commit fdc3540

2 files changed

Lines changed: 333 additions & 19 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java

Lines changed: 210 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,77 +12,268 @@
1212

1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.core.RqueueMessage;
15+
import com.github.sonus21.rqueue.core.support.RqueueMessageUtils;
1516
import com.github.sonus21.rqueue.models.db.MessageMetadata;
1617
import com.github.sonus21.rqueue.models.enums.MessageStatus;
1718
import com.github.sonus21.rqueue.web.service.RqueueMessageMetadataService;
19+
import io.nats.client.Connection;
20+
import io.nats.client.JetStreamApiException;
21+
import io.nats.client.KeyValue;
22+
import io.nats.client.KeyValueManagement;
23+
import io.nats.client.api.KeyValueConfiguration;
24+
import io.nats.client.api.KeyValueEntry;
25+
import io.nats.client.api.KeyValueStatus;
26+
import java.io.ByteArrayInputStream;
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
30+
import java.io.ObjectOutputStream;
1831
import java.time.Duration;
32+
import java.util.ArrayList;
1933
import java.util.Collection;
2034
import java.util.Collections;
2135
import java.util.List;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
import java.util.logging.Level;
38+
import java.util.logging.Logger;
2239
import org.springframework.context.annotation.Conditional;
40+
import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
2341
import org.springframework.stereotype.Service;
2442
import reactor.core.publisher.Mono;
2543

2644
/**
27-
* NATS-backend stub {@link RqueueMessageMetadataService} for non-Redis backends. Reads return null/empty;
28-
* writes are silently ignored. The producer happy path on NATS short-circuits {@code
29-
* BaseMessageSender.storeMessageMetadata} via the broker capability flag, so metadata writes
30-
* never reach this stub. Admin/dashboard read methods get an empty view.
45+
* NATS-backed {@link RqueueMessageMetadataService} using a JetStream KV bucket as the metadata
46+
* store. Entries are keyed by metadata id (which the Redis impl computes via
47+
* {@link RqueueMessageUtils#getMessageMetaId}) and serialized via Java serialization.
48+
*
49+
* <p>Per-queue read methods ({@link #readMessageMetadataForQueue}) walk the bucket; rqueue
50+
* normally tracks recent metadata only so the volume is acceptable for v1. The Redis impl keeps
51+
* a per-queue ZSET as an explicit reverse index — that's a follow-up here.
52+
*
53+
* <p>{@link #saveReactive} wraps the synchronous {@code save} in a {@code Mono}; rqueue's
54+
* reactive enqueue path is the only caller and short-circuits storeMessageMetadata when the
55+
* broker has {@code !usesPrimaryHandlerDispatch}, so this method is rarely hit in practice.
3156
*/
3257
@Service
3358
@Conditional(NatsBackendCondition.class)
3459
public class NatsRqueueMessageMetadataService implements RqueueMessageMetadataService {
3560

61+
private static final Logger log =
62+
Logger.getLogger(NatsRqueueMessageMetadataService.class.getName());
63+
private static final String BUCKET_NAME = "rqueue-message-metadata";
64+
65+
private final Connection connection;
66+
private final KeyValueManagement kvm;
67+
private final AtomicReference<KeyValue> kvRef = new AtomicReference<>();
68+
69+
public NatsRqueueMessageMetadataService(Connection connection) throws IOException {
70+
this.connection = connection;
71+
this.kvm = connection.keyValueManagement();
72+
}
73+
74+
private KeyValue ensureBucket() throws IOException, JetStreamApiException {
75+
KeyValue cached = kvRef.get();
76+
if (cached != null) {
77+
return cached;
78+
}
79+
synchronized (this) {
80+
cached = kvRef.get();
81+
if (cached != null) {
82+
return cached;
83+
}
84+
try {
85+
KeyValueStatus status = kvm.getStatus(BUCKET_NAME);
86+
if (status != null) {
87+
KeyValue kv = connection.keyValue(BUCKET_NAME);
88+
kvRef.set(kv);
89+
return kv;
90+
}
91+
} catch (JetStreamApiException missing) {
92+
// fall through
93+
}
94+
kvm.create(KeyValueConfiguration.builder().name(BUCKET_NAME).build());
95+
KeyValue kv = connection.keyValue(BUCKET_NAME);
96+
kvRef.set(kv);
97+
return kv;
98+
}
99+
}
100+
36101
@Override
37102
public MessageMetadata get(String id) {
38-
return null;
103+
return loadByKey(sanitize(id));
39104
}
40105

41106
@Override
42-
public void delete(String id) {}
107+
public void delete(String id) {
108+
try {
109+
KeyValue kv = ensureBucket();
110+
kv.delete(sanitize(id));
111+
} catch (IOException | JetStreamApiException e) {
112+
log.log(Level.WARNING, "delete metadata " + id + " failed", e);
113+
}
114+
}
43115

44116
@Override
45-
public void deleteAll(Collection<String> ids) {}
117+
public void deleteAll(Collection<String> ids) {
118+
for (String id : ids) {
119+
delete(id);
120+
}
121+
}
46122

47123
@Override
48124
public List<MessageMetadata> findAll(Collection<String> ids) {
49-
return Collections.emptyList();
125+
List<MessageMetadata> out = new ArrayList<>(ids.size());
126+
for (String id : ids) {
127+
MessageMetadata m = get(id);
128+
if (m != null) {
129+
out.add(m);
130+
}
131+
}
132+
return out;
50133
}
51134

52135
@Override
53-
public void save(MessageMetadata messageMetadata, Duration ttl, boolean checkUnique) {}
136+
public void save(MessageMetadata messageMetadata, Duration ttl, boolean checkUnique) {
137+
try {
138+
KeyValue kv = ensureBucket();
139+
kv.put(sanitize(messageMetadata.getId()), serialize(messageMetadata));
140+
} catch (IOException | JetStreamApiException e) {
141+
log.log(Level.WARNING, "save metadata " + messageMetadata.getId() + " failed", e);
142+
}
143+
}
54144

55145
@Override
56146
public MessageMetadata getByMessageId(String queueName, String messageId) {
57-
return null;
147+
return get(RqueueMessageUtils.getMessageMetaId(queueName, messageId));
58148
}
59149

60150
@Override
61151
public boolean deleteMessage(String queueName, String messageId, Duration ttl) {
62-
return false;
152+
String metaId = RqueueMessageUtils.getMessageMetaId(queueName, messageId);
153+
MessageMetadata m = get(metaId);
154+
if (m == null) {
155+
return false;
156+
}
157+
m.setDeleted(true);
158+
m.setDeletedOn(System.currentTimeMillis());
159+
save(m, ttl, false);
160+
return true;
63161
}
64162

65163
@Override
66164
public MessageMetadata getOrCreateMessageMetadata(RqueueMessage rqueueMessage) {
67-
return new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
165+
String metaId =
166+
RqueueMessageUtils.getMessageMetaId(rqueueMessage.getQueueName(), rqueueMessage.getId());
167+
MessageMetadata existing = get(metaId);
168+
return existing != null ? existing : new MessageMetadata(rqueueMessage, MessageStatus.ENQUEUED);
68169
}
69170

70171
@Override
71172
public Mono<Boolean> saveReactive(MessageMetadata m, Duration ttl, boolean checkUnique) {
72-
return Mono.just(Boolean.TRUE);
173+
return Mono.fromCallable(
174+
() -> {
175+
save(m, ttl, checkUnique);
176+
return Boolean.TRUE;
177+
});
73178
}
74179

75180
@Override
76-
public void deleteQueueMessages(String queueName, long before) {}
181+
public List<TypedTuple<MessageMetadata>> readMessageMetadataForQueue(
182+
String queueName, long start, long end) {
183+
// The Redis impl uses a ZSET sorted by createdAt for pagination. We don't have a sorted
184+
// index here, so this returns all metadata for the queue and the caller paginates.
185+
try {
186+
KeyValue kv = ensureBucket();
187+
List<String> keys = new ArrayList<>(kv.keys());
188+
List<TypedTuple<MessageMetadata>> out = new ArrayList<>();
189+
String prefix = sanitize(queueName);
190+
for (String k : keys) {
191+
if (!k.startsWith(prefix)) {
192+
continue;
193+
}
194+
MessageMetadata m = loadByKey(k);
195+
if (m != null) {
196+
out.add(TypedTuple.of(m, (double) m.getUpdatedOn()));
197+
}
198+
}
199+
return out;
200+
} catch (IOException | JetStreamApiException | InterruptedException e) {
201+
log.log(Level.WARNING, "readMessageMetadataForQueue " + queueName + " failed", e);
202+
if (e instanceof InterruptedException) {
203+
Thread.currentThread().interrupt();
204+
}
205+
return Collections.emptyList();
206+
}
207+
}
77208

78209
@Override
79210
public void saveMessageMetadataForQueue(
80-
String queueName, MessageMetadata messageMetadata, Long ttlInMillisecond) {}
211+
String queueName, MessageMetadata messageMetadata, Long ttlInMillisecond) {
212+
save(
213+
messageMetadata,
214+
ttlInMillisecond == null ? null : Duration.ofMillis(ttlInMillisecond),
215+
false);
216+
}
81217

82218
@Override
83-
public java.util.List<
84-
org.springframework.data.redis.core.ZSetOperations.TypedTuple<MessageMetadata>>
85-
readMessageMetadataForQueue(String queueName, long start, long end) {
86-
return Collections.emptyList();
219+
public void deleteQueueMessages(String queueName, long before) {
220+
try {
221+
KeyValue kv = ensureBucket();
222+
List<String> keys = new ArrayList<>(kv.keys());
223+
String prefix = sanitize(queueName);
224+
for (String k : keys) {
225+
if (!k.startsWith(prefix)) {
226+
continue;
227+
}
228+
MessageMetadata m = loadByKey(k);
229+
if (m != null && m.getUpdatedOn() < before) {
230+
kv.delete(k);
231+
}
232+
}
233+
} catch (IOException | JetStreamApiException | InterruptedException e) {
234+
log.log(Level.WARNING, "deleteQueueMessages " + queueName + " failed", e);
235+
if (e instanceof InterruptedException) {
236+
Thread.currentThread().interrupt();
237+
}
238+
}
239+
}
240+
241+
// ---- helpers ----------------------------------------------------------
242+
243+
private MessageMetadata loadByKey(String key) {
244+
try {
245+
KeyValue kv = ensureBucket();
246+
KeyValueEntry entry = kv.get(key);
247+
if (entry == null || entry.getValue() == null) {
248+
return null;
249+
}
250+
return deserialize(entry.getValue());
251+
} catch (IOException | JetStreamApiException e) {
252+
log.log(Level.WARNING, "loadByKey " + key + " failed", e);
253+
return null;
254+
}
255+
}
256+
257+
private static byte[] serialize(MessageMetadata m) throws IOException {
258+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
259+
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
260+
oos.writeObject(m);
261+
}
262+
return baos.toByteArray();
263+
}
264+
265+
private static MessageMetadata deserialize(byte[] bytes) {
266+
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
267+
Object o = ois.readObject();
268+
return o instanceof MessageMetadata ? (MessageMetadata) o : null;
269+
} catch (IOException | ClassNotFoundException e) {
270+
log.log(Level.WARNING, "deserialize MessageMetadata failed", e);
271+
return null;
272+
}
273+
}
274+
275+
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
276+
private static String sanitize(String key) {
277+
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
87278
}
88279
}

0 commit comments

Comments
 (0)