Skip to content

Commit b94956d

Browse files
committed
Add unit tests for JetStreamMessageBroker subject derivation and dispatchers
Assisted-By: Claude Code
1 parent a0fffc3 commit b94956d

1 file changed

Lines changed: 216 additions & 0 deletions

File tree

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*/
10+
package com.github.sonus21.rqueue.nats;
11+
12+
import static java.nio.charset.StandardCharsets.UTF_8;
13+
import static org.junit.jupiter.api.Assertions.assertNotNull;
14+
import static org.junit.jupiter.api.Assertions.assertThrows;
15+
import static org.mockito.ArgumentMatchers.any;
16+
import static org.mockito.ArgumentMatchers.eq;
17+
import static org.mockito.Mockito.mock;
18+
import static org.mockito.Mockito.never;
19+
import static org.mockito.Mockito.times;
20+
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.when;
22+
23+
import com.github.sonus21.rqueue.core.RqueueMessage;
24+
import com.github.sonus21.rqueue.listener.QueueDetail;
25+
import io.nats.client.Connection;
26+
import io.nats.client.Dispatcher;
27+
import io.nats.client.JetStream;
28+
import io.nats.client.JetStreamApiException;
29+
import io.nats.client.JetStreamManagement;
30+
import io.nats.client.MessageHandler;
31+
import io.nats.client.api.PublishAck;
32+
import io.nats.client.api.StreamInfo;
33+
import io.nats.client.impl.Headers;
34+
import java.io.IOException;
35+
import java.util.concurrent.CompletableFuture;
36+
import org.junit.jupiter.api.Test;
37+
import reactor.test.StepVerifier;
38+
import tools.jackson.databind.ObjectMapper;
39+
40+
/**
41+
* Non-container unit tests for {@link JetStreamMessageBroker} that mock the underlying NATS
42+
* primitives. These tests target subject naming, pub/sub plumbing, and exception wrapping —
43+
* end-to-end JetStream behavior is covered by the Docker-gated ITs.
44+
*/
45+
@NatsUnitTest
46+
class JetStreamMessageBrokerUnitTest {
47+
48+
private static QueueDetail queueNamed(String name) {
49+
QueueDetail q = mock(QueueDetail.class);
50+
when(q.getName()).thenReturn(name);
51+
return q;
52+
}
53+
54+
/** Build a broker with all NATS primitives mocked and stream provisioning short-circuited. */
55+
private static Fixture newFixture(RqueueNatsConfig config) {
56+
Connection conn = mock(Connection.class);
57+
JetStream js = mock(JetStream.class);
58+
JetStreamManagement jsm = mock(JetStreamManagement.class);
59+
StreamInfo info = mock(StreamInfo.class);
60+
try {
61+
// Pretend every stream already exists so provisioner returns early without addStream().
62+
when(jsm.getStreamInfo(any(String.class))).thenReturn(info);
63+
} catch (IOException | JetStreamApiException unreachable) {
64+
throw new AssertionError(unreachable);
65+
}
66+
JetStreamMessageBroker broker =
67+
new JetStreamMessageBroker(conn, js, jsm, config, new ObjectMapper());
68+
return new Fixture(conn, js, jsm, broker);
69+
}
70+
71+
@Test
72+
void enqueue_publishesToPrefixedSubject() throws Exception {
73+
Fixture f = newFixture(RqueueNatsConfig.defaults());
74+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
75+
.thenReturn(mock(PublishAck.class));
76+
f.broker.enqueue(queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build());
77+
verify(f.js, times(1)).publish(eq("rqueue.orders"), any(Headers.class), any(byte[].class));
78+
}
79+
80+
@Test
81+
void enqueueWithPriority_appendsPrioritySuffixToSubject() throws Exception {
82+
Fixture f = newFixture(RqueueNatsConfig.defaults());
83+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
84+
.thenReturn(mock(PublishAck.class));
85+
f.broker.enqueue(
86+
queueNamed("orders"), "high", RqueueMessage.builder().id("m1").message("hi").build());
87+
verify(f.js, times(1)).publish(eq("rqueue.orders.high"), any(Headers.class), any(byte[].class));
88+
}
89+
90+
@Test
91+
void enqueueWithEmptyPriority_fallsBackToUnsuffixedSubject() throws Exception {
92+
Fixture f = newFixture(RqueueNatsConfig.defaults());
93+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
94+
.thenReturn(mock(PublishAck.class));
95+
f.broker.enqueue(
96+
queueNamed("orders"), "", RqueueMessage.builder().id("m1").message("hi").build());
97+
verify(f.js, times(1)).publish(eq("rqueue.orders"), any(Headers.class), any(byte[].class));
98+
}
99+
100+
@Test
101+
void enqueue_honorsCustomSubjectPrefix() throws Exception {
102+
RqueueNatsConfig cfg = RqueueNatsConfig.defaults().setSubjectPrefix("custom.");
103+
Fixture f = newFixture(cfg);
104+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
105+
.thenReturn(mock(PublishAck.class));
106+
f.broker.enqueue(queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build());
107+
verify(f.js, times(1)).publish(eq("custom.orders"), any(Headers.class), any(byte[].class));
108+
}
109+
110+
@Test
111+
void enqueue_wrapsIoExceptionInRqueueNatsException() throws Exception {
112+
Fixture f = newFixture(RqueueNatsConfig.defaults());
113+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
114+
.thenThrow(new IOException("boom"));
115+
RqueueNatsException ex =
116+
assertThrows(
117+
RqueueNatsException.class,
118+
() ->
119+
f.broker.enqueue(
120+
queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build()));
121+
assertNotNull(ex.getCause());
122+
}
123+
124+
@Test
125+
void enqueue_wrapsJetStreamApiExceptionInRqueueNatsException() throws Exception {
126+
Fixture f = newFixture(RqueueNatsConfig.defaults());
127+
when(f.js.publish(any(String.class), any(Headers.class), any(byte[].class)))
128+
.thenThrow(mock(JetStreamApiException.class));
129+
assertThrows(
130+
RqueueNatsException.class,
131+
() ->
132+
f.broker.enqueue(
133+
queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build()));
134+
}
135+
136+
@Test
137+
void publish_writesUtf8BytesToConnection() {
138+
Fixture f = newFixture(RqueueNatsConfig.defaults());
139+
f.broker.publish("chan-1", "hello");
140+
verify(f.conn, times(1)).publish("chan-1", "hello".getBytes(UTF_8));
141+
}
142+
143+
@Test
144+
void subscribe_createsDispatcherAndSubscribesChannel_closeReleasesIt() throws Exception {
145+
Fixture f = newFixture(RqueueNatsConfig.defaults());
146+
Dispatcher d = mock(Dispatcher.class);
147+
when(f.conn.createDispatcher(any(MessageHandler.class))).thenReturn(d);
148+
when(d.subscribe(any(String.class))).thenReturn(d);
149+
150+
AutoCloseable closer = f.broker.subscribe("chan-1", payload -> {});
151+
verify(f.conn, times(1)).createDispatcher(any(MessageHandler.class));
152+
verify(d, times(1)).subscribe("chan-1");
153+
verify(f.conn, never()).closeDispatcher(any());
154+
155+
closer.close();
156+
verify(f.conn, times(1)).closeDispatcher(d);
157+
}
158+
159+
@Test
160+
void enqueueReactive_completesWhenPublishFutureCompletes() {
161+
Fixture f = newFixture(RqueueNatsConfig.defaults());
162+
PublishAck ack = mock(PublishAck.class);
163+
CompletableFuture<PublishAck> done = CompletableFuture.completedFuture(ack);
164+
when(f.js.publishAsync(any(String.class), any(Headers.class), any(byte[].class)))
165+
.thenReturn(done);
166+
167+
StepVerifier.create(
168+
f.broker.enqueueReactive(
169+
queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build()))
170+
.verifyComplete();
171+
verify(f.js, times(1)).publishAsync(eq("rqueue.orders"), any(Headers.class), any(byte[].class));
172+
}
173+
174+
@Test
175+
void enqueueReactive_wrapsAsyncFailureInRqueueNatsException() {
176+
Fixture f = newFixture(RqueueNatsConfig.defaults());
177+
CompletableFuture<PublishAck> failed = new CompletableFuture<>();
178+
failed.completeExceptionally(new IOException("network down"));
179+
when(f.js.publishAsync(any(String.class), any(Headers.class), any(byte[].class)))
180+
.thenReturn(failed);
181+
182+
StepVerifier.create(
183+
f.broker.enqueueReactive(
184+
queueNamed("orders"), RqueueMessage.builder().id("m1").message("hi").build()))
185+
.expectError(RqueueNatsException.class)
186+
.verify();
187+
}
188+
189+
@Test
190+
void enqueueWithDelayReactive_returnsErrorMonoOfUOE() {
191+
Fixture f = newFixture(RqueueNatsConfig.defaults());
192+
StepVerifier.create(
193+
f.broker.enqueueWithDelayReactive(
194+
queueNamed("orders"),
195+
RqueueMessage.builder().id("m1").message("hi").build(),
196+
100))
197+
.expectError(UnsupportedOperationException.class)
198+
.verify();
199+
}
200+
201+
// ---- helper -----------------------------------------------------------
202+
203+
private static final class Fixture {
204+
final Connection conn;
205+
final JetStream js;
206+
final JetStreamManagement jsm;
207+
final JetStreamMessageBroker broker;
208+
209+
Fixture(Connection conn, JetStream js, JetStreamManagement jsm, JetStreamMessageBroker broker) {
210+
this.conn = conn;
211+
this.js = js;
212+
this.jsm = jsm;
213+
this.broker = broker;
214+
}
215+
}
216+
}

0 commit comments

Comments
 (0)