Skip to content

Commit 406c8b1

Browse files
Codex/remove nats key tags (#301)
* Add global retry limit Assisted-By: Codex * Apply Palantir Java Format * Bump version to RC9 Assisted-By: Codex * Remove Redis key tags from NATS KV keys Assisted-By: Codex * Apply Palantir Java Format --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 5a90da0 commit 406c8b1

9 files changed

Lines changed: 99 additions & 53 deletions

File tree

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.github.sonus21.rqueue.models.db.RqueueJob;
1616
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
1717
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
18+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
1819
import io.nats.client.JetStreamApiException;
1920
import io.nats.client.KeyValue;
2021
import io.nats.client.api.KeyValueEntry;
@@ -69,15 +70,15 @@ public void createJob(RqueueJob rqueueJob, Duration expiry) {
6970
@Override
7071
public void save(RqueueJob rqueueJob, Duration expiry) {
7172
try {
72-
kv(expiry).put(sanitize(rqueueJob.getId()), serialize(rqueueJob));
73+
kv(expiry).put(NatsKvKeys.sanitize(rqueueJob.getId()), serialize(rqueueJob));
7374
} catch (IOException | JetStreamApiException e) {
7475
log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e);
7576
}
7677
}
7778

7879
@Override
7980
public RqueueJob findById(String jobId) {
80-
return loadByKey(sanitize(jobId));
81+
return loadByKey(NatsKvKeys.sanitize(jobId));
8182
}
8283

8384
@Override
@@ -111,7 +112,7 @@ public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
111112
@Override
112113
public void delete(String jobId) {
113114
try {
114-
kv(null).delete(sanitize(jobId));
115+
kv(null).delete(NatsKvKeys.sanitize(jobId));
115116
} catch (IOException | JetStreamApiException e) {
116117
log.log(Level.WARNING, "delete job " + jobId + " failed", e);
117118
}
@@ -164,9 +165,4 @@ private RqueueJob deserialize(byte[] bytes) {
164165
return null;
165166
}
166167
}
167-
168-
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
169-
private static String sanitize(String key) {
170-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
171-
}
172168
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.github.sonus21.rqueue.models.db.QueueStatistics;
2121
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
2222
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
23+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
2324
import io.nats.client.JetStreamApiException;
2425
import io.nats.client.KeyValue;
2526
import io.nats.client.api.KeyValueEntry;
@@ -72,7 +73,7 @@ public QueueStatistics findById(String id) {
7273
return null;
7374
}
7475
try {
75-
KeyValueEntry entry = kv().get(sanitize(id));
76+
KeyValueEntry entry = kv().get(NatsKvKeys.sanitize(id));
7677
if (entry == null || entry.getValue() == null) {
7778
return null;
7879
}
@@ -104,7 +105,7 @@ public void save(QueueStatistics queueStatistics) {
104105
throw new IllegalArgumentException("id cannot be null: " + queueStatistics);
105106
}
106107
try {
107-
kv().put(sanitize(queueStatistics.getId()), serialize(queueStatistics));
108+
kv().put(NatsKvKeys.sanitize(queueStatistics.getId()), serialize(queueStatistics));
108109
} catch (IOException | JetStreamApiException e) {
109110
log.log(Level.WARNING, "save id=" + queueStatistics.getId() + " failed", e);
110111
}
@@ -124,9 +125,4 @@ private QueueStatistics deserialize(byte[] bytes) {
124125
return null;
125126
}
126127
}
127-
128-
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
129-
private static String sanitize(String key) {
130-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
131-
}
132128
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueSystemConfigDao.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.github.sonus21.rqueue.models.db.QueueConfig;
1616
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
1717
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
18+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
1819
import io.nats.client.JetStreamApiException;
1920
import io.nats.client.KeyValue;
2021
import io.nats.client.api.KeyValueEntry;
@@ -74,7 +75,7 @@ public QueueConfig getConfigByName(String name, boolean cached) {
7475
return hit;
7576
}
7677
}
77-
QueueConfig loaded = loadByKey(sanitize(name));
78+
QueueConfig loaded = loadByKey(NatsKvKeys.sanitize(name));
7879
if (loaded != null) {
7980
cache.put(name, loaded);
8081
}
@@ -120,7 +121,7 @@ public List<QueueConfig> findAllQConfig(Collection<String> ids) {
120121
@Override
121122
public void saveQConfig(QueueConfig queueConfig) {
122123
try {
123-
kv().put(sanitize(queueConfig.getName()), serialize(queueConfig));
124+
kv().put(NatsKvKeys.sanitize(queueConfig.getName()), serialize(queueConfig));
124125
cache.put(queueConfig.getName(), queueConfig);
125126
} catch (IOException | JetStreamApiException e) {
126127
log.log(Level.WARNING, "saveQConfig " + queueConfig.getName() + " failed", e);
@@ -188,9 +189,4 @@ private QueueConfig deserialize(byte[] bytes) {
188189
return null;
189190
}
190191
}
191-
192-
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
193-
private static String sanitize(String key) {
194-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
195-
}
196192
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats.kv;
11+
12+
/** Utility methods for converting rqueue keys into NATS JetStream KV keys. */
13+
public final class NatsKvKeys {
14+
15+
private NatsKvKeys() {}
16+
17+
/**
18+
* NATS KV keys do not need Redis Cluster hash tags. Strip any {@code {name}} tag first, then
19+
* coerce the remaining key into the conservative KV key character set used by this module.
20+
*/
21+
public static String sanitize(String key) {
22+
if (key == null) {
23+
return "_";
24+
}
25+
String sanitized = key.replaceAll("\\{([^{}]*)}", "$1").replaceAll("[^A-Za-z0-9_=.-]", "_");
26+
return sanitized.isEmpty() ? "_" : sanitized;
27+
}
28+
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/lock/NatsRqueueLockManager.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1515
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
1616
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
17+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
1718
import io.nats.client.JetStreamApiException;
1819
import io.nats.client.KeyValue;
1920
import io.nats.client.api.KeyValueEntry;
@@ -57,7 +58,7 @@ public NatsRqueueLockManager(NatsProvisioner provisioner) {
5758
public boolean acquireLock(String lockKey, String lockValue, Duration duration) {
5859
try {
5960
KeyValue kv = provisioner.ensureKv(BUCKET_NAME, duration);
60-
kv.create(sanitize(lockKey), lockValue.getBytes(StandardCharsets.UTF_8));
61+
kv.create(NatsKvKeys.sanitize(lockKey), lockValue.getBytes(StandardCharsets.UTF_8));
6162
return true;
6263
} catch (JetStreamApiException existing) {
6364
// Most common path: key already exists; another holder owns the lock.
@@ -75,7 +76,7 @@ public boolean acquireLock(String lockKey, String lockValue, Duration duration)
7576
public boolean releaseLock(String lockKey, String lockValue) {
7677
try {
7778
KeyValue kv = provisioner.ensureKv(BUCKET_NAME, Duration.ofSeconds(60));
78-
String key = sanitize(lockKey);
79+
String key = NatsKvKeys.sanitize(lockKey);
7980
KeyValueEntry entry = kv.get(key);
8081
if (entry == null) {
8182
return false;
@@ -91,13 +92,4 @@ public boolean releaseLock(String lockKey, String lockValue) {
9192
return false;
9293
}
9394
}
94-
95-
/**
96-
* KV keys allow {@code [A-Za-z0-9_=.-]} only; coerce other characters to {@code _} so the
97-
* caller can pass arbitrary lock keys. {@code $} and {@code #} surface in queue/listener
98-
* names from inner classes and the legacy separator respectively.
99-
*/
100-
private static String sanitize(String key) {
101-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
102-
}
10395
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.github.sonus21.rqueue.models.enums.MessageStatus;
1818
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
1919
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
20+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
2021
import com.github.sonus21.rqueue.service.RqueueMessageMetadataService;
2122
import io.nats.client.JetStreamApiException;
2223
import io.nats.client.KeyValue;
@@ -72,13 +73,13 @@ private KeyValue kv() throws IOException, JetStreamApiException {
7273

7374
@Override
7475
public MessageMetadata get(String id) {
75-
return loadByKey(sanitize(id));
76+
return loadByKey(NatsKvKeys.sanitize(id));
7677
}
7778

7879
@Override
7980
public void delete(String id) {
8081
try {
81-
kv().delete(sanitize(id));
82+
kv().delete(NatsKvKeys.sanitize(id));
8283
} catch (IOException | JetStreamApiException e) {
8384
log.log(Level.WARNING, "delete metadata " + id + " failed", e);
8485
}
@@ -106,7 +107,7 @@ public List<MessageMetadata> findAll(Collection<String> ids) {
106107
@Override
107108
public void save(MessageMetadata messageMetadata, Duration ttl, boolean checkUnique) {
108109
try {
109-
kv().put(sanitize(messageMetadata.getId()), serialize(messageMetadata));
110+
kv().put(NatsKvKeys.sanitize(messageMetadata.getId()), serialize(messageMetadata));
110111
} catch (IOException | JetStreamApiException e) {
111112
log.log(Level.WARNING, "save metadata " + messageMetadata.getId() + " failed", e);
112113
}
@@ -160,7 +161,7 @@ public List<TypedTuple<MessageMetadata>> readMessageMetadataForQueue(
160161
try {
161162
List<String> keys = new ArrayList<>(kv().keys());
162163
List<TypedTuple<MessageMetadata>> out = new ArrayList<>();
163-
String prefix = sanitize(queueName);
164+
String prefix = NatsKvKeys.sanitize(queueName);
164165
for (String k : keys) {
165166
if (!k.startsWith(prefix)) {
166167
continue;
@@ -193,7 +194,7 @@ public void saveMessageMetadataForQueue(
193194
public void deleteQueueMessages(String queueName, long before) {
194195
try {
195196
List<String> keys = new ArrayList<>(kv().keys());
196-
String prefix = sanitize(queueName);
197+
String prefix = NatsKvKeys.sanitize(queueName);
197198
for (String k : keys) {
198199
if (!k.startsWith(prefix)) {
199200
continue;
@@ -238,9 +239,4 @@ private MessageMetadata deserialize(byte[] bytes) {
238239
return null;
239240
}
240241
}
241-
242-
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
243-
private static String sanitize(String key) {
244-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
245-
}
246242
}

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStore.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.github.sonus21.rqueue.models.registry.RqueueWorkerInfo;
2121
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
2222
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
23+
import com.github.sonus21.rqueue.nats.kv.NatsKvKeys;
2324
import com.github.sonus21.rqueue.worker.WorkerRegistryStore;
2425
import io.nats.client.JetStreamApiException;
2526
import io.nats.client.KeyValue;
@@ -86,7 +87,7 @@ public void putWorkerInfo(String workerKey, RqueueWorkerInfo info, Duration ttl)
8687
}
8788
try {
8889
KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
89-
kv.put(sanitize(workerKey), serialize(info));
90+
kv.put(NatsKvKeys.sanitize(workerKey), serialize(info));
9091
} catch (IOException | JetStreamApiException e) {
9192
log.log(Level.WARNING, "putWorkerInfo " + workerKey + " failed", e);
9293
}
@@ -96,7 +97,7 @@ public void putWorkerInfo(String workerKey, RqueueWorkerInfo info, Duration ttl)
9697
public void deleteWorkerInfo(String workerKey) {
9798
try {
9899
KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
99-
kv.delete(sanitize(workerKey));
100+
kv.delete(NatsKvKeys.sanitize(workerKey));
100101
} catch (IOException | JetStreamApiException e) {
101102
log.log(Level.WARNING, "deleteWorkerInfo " + workerKey + " failed", e);
102103
}
@@ -111,7 +112,7 @@ public Map<String, RqueueWorkerInfo> getWorkerInfos(Collection<String> workerKey
111112
try {
112113
KeyValue kv = provisioner.ensureKv(WORKER_BUCKET, workerBucketTtl);
113114
for (String key : workerKeys) {
114-
KeyValueEntry entry = kv.get(sanitize(key));
115+
KeyValueEntry entry = kv.get(NatsKvKeys.sanitize(key));
115116
if (entry == null || entry.getValue() == null) {
116117
continue;
117118
}
@@ -146,7 +147,7 @@ public Map<String, String> getQueueHeartbeats(String queueKey) {
146147
Map<String, String> out = new LinkedHashMap<>();
147148
try {
148149
KeyValue kv = provisioner.ensureKv(HEARTBEAT_BUCKET, heartbeatBucketTtl);
149-
String prefix = sanitize(queueKey) + SEP;
150+
String prefix = NatsKvKeys.sanitize(queueKey) + SEP;
150151
List<String> keys = new ArrayList<>(kv.keys());
151152
for (String k : keys) {
152153
if (!k.startsWith(prefix)) {
@@ -197,12 +198,7 @@ public void refreshQueueTtl(String queueKey, Duration ttl) {
197198
// ---- helpers ----------------------------------------------------------
198199

199200
private static String compositeKey(String queueKey, String workerId) {
200-
return sanitize(queueKey) + SEP + sanitize(workerId);
201-
}
202-
203-
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
204-
private static String sanitize(String key) {
205-
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
201+
return NatsKvKeys.sanitize(queueKey) + SEP + NatsKvKeys.sanitize(workerId);
206202
}
207203

208204
private byte[] serialize(RqueueWorkerInfo info) throws IOException {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats.kv;
11+
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
14+
import com.github.sonus21.rqueue.nats.NatsUnitTest;
15+
import org.junit.jupiter.api.Test;
16+
17+
@NatsUnitTest
18+
class NatsKvKeysTest {
19+
20+
@Test
21+
void sanitizeStripsRedisHashTagsBeforeReplacingUnsupportedCharacters() {
22+
assertEquals(
23+
"__rq__q-pollers__email-notification-dispatch-queue",
24+
NatsKvKeys.sanitize("__rq::q-pollers::{email-notification-dispatch-queue}"));
25+
}
26+
27+
@Test
28+
void sanitizeReplacesUnsupportedCharacters() {
29+
assertEquals("orders_2__w_1", NatsKvKeys.sanitize("orders$2::w#1"));
30+
}
31+
32+
@Test
33+
void sanitizeReturnsFallbackForNullOrEmptyResult() {
34+
assertEquals("_", NatsKvKeys.sanitize(null));
35+
assertEquals("_", NatsKvKeys.sanitize("{}"));
36+
}
37+
}

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/worker/NatsWorkerRegistryStoreTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,15 @@ void putQueueHeartbeat_sanitisesSpecialCharsInQueueKey()
176176
verify(heartbeatKv).put(eq("orders_2__w_1"), any(byte[].class));
177177
}
178178

179+
@Test
180+
void putQueueHeartbeat_stripsRedisHashTagsFromQueueKey()
181+
throws IOException, JetStreamApiException {
182+
String queueKey = "__rq::q-pollers::{email-notification-dispatch-queue}";
183+
store.putQueueHeartbeat(queueKey, "w1", "{}");
184+
verify(heartbeatKv)
185+
.put(eq("__rq__q-pollers__email-notification-dispatch-queue__w1"), any(byte[].class));
186+
}
187+
179188
@Test
180189
void putQueueHeartbeat_ioException_swallowed() throws IOException, JetStreamApiException {
181190
doThrow(new IOException("bucket gone")).when(heartbeatKv).put(anyString(), any(byte[].class));

0 commit comments

Comments
 (0)