Skip to content

Commit d50fe72

Browse files
committed
Add E2E test for retry and DLQ on NATS
NatsRetryAndDlqE2EIT enqueues a message that always throws, expects JetStream to redeliver up to numRetries=2 times, and asserts the exhausted payload lands on the rqueue-failing-dlq stream. Currently @disabled with a rationale: RqueueNatsAutoConfig does not yet invoke JetStreamMessageBroker.installDeadLetterBridge per queue at container start, so max-deliveries advisories are never bridged onto the DLQ subject. Enable once that wiring is added. Assisted-By: Claude Code
1 parent 6229e7a commit d50fe72

1 file changed

Lines changed: 92 additions & 0 deletions

File tree

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
22+
import io.nats.client.JetStreamManagement;
23+
import io.nats.client.api.StreamInfo;
24+
import java.time.Duration;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
import org.awaitility.Awaitility;
27+
import org.junit.jupiter.api.Disabled;
28+
import org.junit.jupiter.api.Tag;
29+
import org.junit.jupiter.api.Test;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.boot.autoconfigure.SpringBootApplication;
32+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
33+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
34+
import org.springframework.boot.test.context.SpringBootTest;
35+
import org.springframework.stereotype.Component;
36+
37+
/**
38+
* After a handler exhausts {@code numRetries}, JetStream emits a max-deliveries advisory and the
39+
* broker's {@code installDeadLetterBridge} dispatcher republishes the payload onto the DLQ
40+
* stream. Currently disabled because {@link
41+
* com.github.sonus21.rqueue.spring.boot.RqueueNatsAutoConfig} does not yet invoke
42+
* {@code JetStreamMessageBroker.installDeadLetterBridge(...)} during container start, so dead-
43+
* lettered messages never reach the DLQ stream. Enable this test once that wiring is added.
44+
*/
45+
@SpringBootTest(
46+
classes = NatsRetryAndDlqE2EIT.TestApp.class,
47+
properties = {"rqueue.backend=nats"})
48+
@Tag("nats")
49+
@Disabled(
50+
"DLQ bridge wiring (JetStreamMessageBroker.installDeadLetterBridge) is not yet invoked by "
51+
+ "RqueueNatsAutoConfig; enable once the container start path provisions the advisory "
52+
+ "dispatcher per queue.")
53+
class NatsRetryAndDlqE2EIT extends AbstractNatsBootIT {
54+
55+
@Autowired RqueueMessageEnqueuer enqueuer;
56+
57+
@Autowired FailingListener listener;
58+
59+
@Autowired JetStreamManagement jsm;
60+
61+
@Test
62+
void exhaustedMessageLandsOnDlqStream() {
63+
enqueuer.enqueue("failing", "boom");
64+
65+
Awaitility.await()
66+
.atMost(Duration.ofSeconds(60))
67+
.until(() -> listener.attempts.get() >= 2);
68+
69+
Awaitility.await()
70+
.atMost(Duration.ofSeconds(30))
71+
.untilAsserted(
72+
() -> {
73+
StreamInfo dlq = jsm.getStreamInfo("rqueue-failing-dlq");
74+
assertThat(dlq.getStreamState().getMsgCount()).isGreaterThanOrEqualTo(1);
75+
});
76+
}
77+
78+
@SpringBootApplication(
79+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
80+
static class TestApp {}
81+
82+
@Component
83+
static class FailingListener {
84+
final AtomicInteger attempts = new AtomicInteger();
85+
86+
@RqueueListener(value = "failing", numRetries = "2")
87+
void onMessage(String payload) {
88+
attempts.incrementAndGet();
89+
throw new RuntimeException("simulated failure for payload=" + payload);
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)