Skip to content

Commit 4b33d58

Browse files
committed
Fix NATS retry and DLQ routing
Three bugs prevented messages from retrying and reaching the DLQ: 1. Missing parkForRetry override: default nack() replays original payload bytes so failureCount embedded in RqueueMessage never accumulated across deliveries. Override acks the original and re-publishes the updated message (with incremented failureCount). Nats-Msg-Id is suffixed with the failure count to bypass the stream deduplication window. 2. Missing moveToDlq override: default re-enqueued to the source queue. Override acks the original and publishes to the targetQueue's NATS stream and subject (streamPrefix+targetQueue / subjectPrefix+targetQueue), the same naming convention used for every other queue. 3. autoCreateDlqStream defaulted to true; changed to false so advisory-bridge DLQ streams are opt-in rather than created automatically for every queue. Assisted-By: Claude Code
1 parent d0d72a7 commit 4b33d58

22 files changed

Lines changed: 596 additions & 340 deletions

File tree

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,38 @@ default boolean extendVisibilityTimeout(QueueDetail q, RqueueMessage m, long del
147147

148148
long size(QueueDetail q);
149149

150+
/**
151+
* Short label for the storage backend shown in the dashboard "Queue Storage Footprint" section
152+
* header (e.g. "Redis", "NATS"). Defaults to "Redis".
153+
*/
154+
default String storageKicker() {
155+
return "Redis";
156+
}
157+
158+
/**
159+
* One-line description for the storage backend shown below the footprint section heading.
160+
* Defaults to the Redis description.
161+
*/
162+
default String storageDescription() {
163+
return "Underlying Redis structures for the queues visible on this page.";
164+
}
165+
166+
/**
167+
* Display name for the primary storage unit backing the given queue's messages (pending,
168+
* in-flight, and completed). Returns {@code null} to fall back to the Redis key name.
169+
*/
170+
default String storageDisplayName(QueueDetail q) {
171+
return null;
172+
}
173+
174+
/**
175+
* Display name for the dead-letter storage unit of the given queue. Returns {@code null} to
176+
* fall back to the DLQ key name stored in {@code DeadLetterQueue}.
177+
*/
178+
default String dlqStorageDisplayName(QueueDetail q) {
179+
return null;
180+
}
181+
150182
AutoCloseable subscribe(String channel, Consumer<String> handler);
151183

152184
void publish(String channel, String payload);

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/RqueueNatsConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class RqueueNatsConfig {
3333

3434
private boolean autoCreateStreams = true;
3535
private boolean autoCreateConsumers = true;
36-
private boolean autoCreateDlqStream = true;
36+
private boolean autoCreateDlqStream = false;
3737

3838
private StreamDefaults streamDefaults = new StreamDefaults();
3939
private ConsumerDefaults consumerDefaults = new ConsumerDefaults();

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueJobDao.java

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,11 @@
1313
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1414
import com.github.sonus21.rqueue.dao.RqueueJobDao;
1515
import com.github.sonus21.rqueue.models.db.RqueueJob;
16+
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
1617
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
17-
import io.nats.client.Connection;
1818
import io.nats.client.JetStreamApiException;
1919
import io.nats.client.KeyValue;
20-
import io.nats.client.KeyValueManagement;
21-
import io.nats.client.api.KeyValueConfiguration;
2220
import io.nats.client.api.KeyValueEntry;
23-
import io.nats.client.api.KeyValueStatus;
2421
import java.io.ByteArrayInputStream;
2522
import java.io.ByteArrayOutputStream;
2623
import java.io.IOException;
@@ -31,7 +28,6 @@
3128
import java.util.Collection;
3229
import java.util.Collections;
3330
import java.util.List;
34-
import java.util.concurrent.atomic.AtomicReference;
3531
import java.util.logging.Level;
3632
import java.util.logging.Logger;
3733
import org.springframework.context.annotation.Conditional;
@@ -56,44 +52,14 @@ public class NatsRqueueJobDao implements RqueueJobDao {
5652
private static final Logger log = Logger.getLogger(NatsRqueueJobDao.class.getName());
5753
private static final String BUCKET_NAME = NatsKvBuckets.JOBS;
5854

59-
private final Connection connection;
60-
private final KeyValueManagement kvm;
61-
private final AtomicReference<KeyValue> kvRef = new AtomicReference<>();
55+
private final NatsProvisioner provisioner;
6256

63-
public NatsRqueueJobDao(Connection connection) throws IOException {
64-
this.connection = connection;
65-
this.kvm = connection.keyValueManagement();
57+
public NatsRqueueJobDao(NatsProvisioner provisioner) {
58+
this.provisioner = provisioner;
6659
}
6760

68-
private KeyValue ensureBucket(Duration ttl) throws IOException, JetStreamApiException {
69-
KeyValue cached = kvRef.get();
70-
if (cached != null) {
71-
return cached;
72-
}
73-
synchronized (this) {
74-
cached = kvRef.get();
75-
if (cached != null) {
76-
return cached;
77-
}
78-
try {
79-
KeyValueStatus status = kvm.getStatus(BUCKET_NAME);
80-
if (status != null) {
81-
KeyValue kv = connection.keyValue(BUCKET_NAME);
82-
kvRef.set(kv);
83-
return kv;
84-
}
85-
} catch (JetStreamApiException missing) {
86-
// fall through
87-
}
88-
KeyValueConfiguration.Builder cfg = KeyValueConfiguration.builder().name(BUCKET_NAME);
89-
if (ttl != null && !ttl.isZero() && !ttl.isNegative()) {
90-
cfg.ttl(ttl);
91-
}
92-
kvm.create(cfg.build());
93-
KeyValue kv = connection.keyValue(BUCKET_NAME);
94-
kvRef.set(kv);
95-
return kv;
96-
}
61+
private KeyValue kv(Duration ttl) throws IOException, JetStreamApiException {
62+
return provisioner.ensureKv(BUCKET_NAME, ttl);
9763
}
9864

9965
@Override
@@ -104,8 +70,7 @@ public void createJob(RqueueJob rqueueJob, Duration expiry) {
10470
@Override
10571
public void save(RqueueJob rqueueJob, Duration expiry) {
10672
try {
107-
KeyValue kv = ensureBucket(expiry);
108-
kv.put(sanitize(rqueueJob.getId()), serialize(rqueueJob));
73+
kv(expiry).put(sanitize(rqueueJob.getId()), serialize(rqueueJob));
10974
} catch (IOException | JetStreamApiException e) {
11075
log.log(Level.WARNING, "save job " + rqueueJob.getId() + " failed", e);
11176
}
@@ -147,8 +112,7 @@ public List<RqueueJob> finByMessageIdIn(List<String> messageIds) {
147112
@Override
148113
public void delete(String jobId) {
149114
try {
150-
KeyValue kv = ensureBucket(null);
151-
kv.delete(sanitize(jobId));
115+
kv(null).delete(sanitize(jobId));
152116
} catch (IOException | JetStreamApiException e) {
153117
log.log(Level.WARNING, "delete job " + jobId + " failed", e);
154118
}
@@ -158,8 +122,7 @@ public void delete(String jobId) {
158122

159123
private RqueueJob loadByKey(String key) {
160124
try {
161-
KeyValue kv = ensureBucket(null);
162-
KeyValueEntry entry = kv.get(key);
125+
KeyValueEntry entry = kv(null).get(key);
163126
if (entry == null || entry.getValue() == null) {
164127
return null;
165128
}
@@ -172,8 +135,7 @@ private RqueueJob loadByKey(String key) {
172135

173136
private List<RqueueJob> scanForMessageIds(Collection<String> messageIds) {
174137
try {
175-
KeyValue kv = ensureBucket(null);
176-
List<String> keys = new ArrayList<>(kv.keys());
138+
List<String> keys = new ArrayList<>(kv(null).keys());
177139
List<RqueueJob> out = new ArrayList<>();
178140
for (String k : keys) {
179141
RqueueJob j = loadByKey(k);

rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/dao/NatsRqueueQStatsDao.java

Lines changed: 99 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,133 @@
66
* You may obtain a copy of the License at
77
*
88
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
914
*/
1015

1116
package com.github.sonus21.rqueue.nats.dao;
1217

1318
import com.github.sonus21.rqueue.config.NatsBackendCondition;
1419
import com.github.sonus21.rqueue.dao.RqueueQStatsDao;
1520
import com.github.sonus21.rqueue.models.db.QueueStatistics;
21+
import com.github.sonus21.rqueue.nats.internal.NatsProvisioner;
22+
import com.github.sonus21.rqueue.nats.kv.NatsKvBuckets;
23+
import io.nats.client.JetStreamApiException;
24+
import io.nats.client.KeyValue;
25+
import io.nats.client.api.KeyValueEntry;
26+
import java.io.ByteArrayInputStream;
27+
import java.io.ByteArrayOutputStream;
28+
import java.io.IOException;
29+
import java.io.ObjectInputStream;
30+
import java.io.ObjectOutputStream;
31+
import java.util.ArrayList;
1632
import java.util.Collection;
17-
import java.util.Collections;
1833
import java.util.List;
34+
import java.util.logging.Level;
35+
import java.util.logging.Logger;
1936
import org.springframework.context.annotation.Conditional;
37+
import org.springframework.context.annotation.DependsOn;
2038
import org.springframework.stereotype.Repository;
2139

2240
/**
23-
* NATS-backend stub for {@link RqueueQStatsDao}. The Redis impl persists per-queue daily
24-
* aggregates as serialized {@link QueueStatistics} objects driving the dashboard charts; on
25-
* NATS we no-op writes and return empty reads in v1 so that
26-
* {@code RqueueJobMetricsAggregatorService} can boot without a missing-bean failure even
27-
* though the chart panel is empty.
41+
* NATS-backed {@link RqueueQStatsDao} using the {@code rqueue-queue-stats} JetStream KV bucket.
42+
*
43+
* <p>Each {@link QueueStatistics} entry is stored as a single KV record keyed by its
44+
* {@link QueueStatistics#getId()} (e.g. {@code __rq::q-stat::job-morgue}), serialized via Java
45+
* serialization to match the other NATS DAO implementations. The key is sanitized to the NATS KV
46+
* character set before storage.
2847
*
29-
* <p>Replace with a NATS-native impl (a dedicated {@code rqueue-queue-stats} KV bucket
30-
* mirroring the pattern of {@code NatsRqueueSystemConfigDao}) when chart support lands for
31-
* NATS.
48+
* <p>No bucket-level TTL is set: the aggregator service calls
49+
* {@link QueueStatistics#pruneStats} before each {@link #save}, so stale per-day entries inside
50+
* the object are trimmed to {@code rqueue.web.statistic.history.day} days automatically.
3251
*/
3352
@Repository
3453
@Conditional(NatsBackendCondition.class)
54+
@DependsOn("natsKvBucketValidator")
3555
public class NatsRqueueQStatsDao implements RqueueQStatsDao {
3656

57+
private static final Logger log = Logger.getLogger(NatsRqueueQStatsDao.class.getName());
58+
private static final String BUCKET_NAME = NatsKvBuckets.QUEUE_STATS;
59+
60+
private final NatsProvisioner provisioner;
61+
62+
public NatsRqueueQStatsDao(NatsProvisioner provisioner) {
63+
this.provisioner = provisioner;
64+
}
65+
66+
private KeyValue kv() throws IOException, JetStreamApiException {
67+
return provisioner.ensureKv(BUCKET_NAME, null);
68+
}
69+
3770
@Override
3871
public QueueStatistics findById(String id) {
39-
return null;
72+
if (id == null) {
73+
return null;
74+
}
75+
try {
76+
KeyValueEntry entry = kv().get(sanitize(id));
77+
if (entry == null || entry.getValue() == null) {
78+
return null;
79+
}
80+
return deserialize(entry.getValue());
81+
} catch (IOException | JetStreamApiException e) {
82+
log.log(Level.WARNING, "findById id=" + id + " failed", e);
83+
return null;
84+
}
4085
}
4186

4287
@Override
4388
public List<QueueStatistics> findAll(Collection<String> ids) {
44-
return Collections.emptyList();
89+
List<QueueStatistics> out = new ArrayList<>(ids.size());
90+
for (String id : ids) {
91+
QueueStatistics stat = findById(id);
92+
if (stat != null) {
93+
out.add(stat);
94+
}
95+
}
96+
return out;
4597
}
4698

4799
@Override
48100
public void save(QueueStatistics queueStatistics) {
49-
// intentionally no-op until a NATS-native chart store lands
101+
if (queueStatistics == null) {
102+
throw new IllegalArgumentException("queueStatistics cannot be null");
103+
}
104+
if (queueStatistics.getId() == null) {
105+
throw new IllegalArgumentException("id cannot be null: " + queueStatistics);
106+
}
107+
try {
108+
kv().put(sanitize(queueStatistics.getId()), serialize(queueStatistics));
109+
} catch (IOException | JetStreamApiException e) {
110+
log.log(Level.WARNING, "save id=" + queueStatistics.getId() + " failed", e);
111+
}
112+
}
113+
114+
// ---- helpers ----------------------------------------------------------
115+
116+
private static byte[] serialize(QueueStatistics stat) throws IOException {
117+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
118+
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
119+
oos.writeObject(stat);
120+
}
121+
return baos.toByteArray();
122+
}
123+
124+
private static QueueStatistics deserialize(byte[] bytes) {
125+
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
126+
Object o = ois.readObject();
127+
return o instanceof QueueStatistics ? (QueueStatistics) o : null;
128+
} catch (IOException | ClassNotFoundException e) {
129+
log.log(Level.WARNING, "deserialize QueueStatistics failed", e);
130+
return null;
131+
}
132+
}
133+
134+
/** KV keys allow {@code [A-Za-z0-9_=.-]} only. */
135+
private static String sanitize(String key) {
136+
return key == null ? "_" : key.replaceAll("[^A-Za-z0-9_=.-]", "_");
50137
}
51138
}

0 commit comments

Comments
 (0)