Skip to content

Commit 9b1939d

Browse files
committed
Add E2E tests for priority queues and multi-listener fan-out on NATS
- NatsPriorityQueuesE2EIT: a single @RqueueListener with priority="high=10,low=1" consumes 5 high + 5 low messages enqueued via RqueueMessageEnqueuer.enqueueWithPriority and we assert per- priority counts come out exact. - NatsMultipleListenersOnSameQueueE2EIT: documents the desired fan-out semantics across two @RqueueListener methods on the same queue, but is @disabled because the default WorkQueue stream retention only allows a single filter-overlapping consumer; enable once the broker supports Limits/Interest retention per queue. Assisted-By: Claude Code
1 parent 3bd0f5d commit 9b1939d

2 files changed

Lines changed: 181 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.jupiter.api.Disabled;
28+
import org.junit.jupiter.api.Tag;
29+
import org.junit.jupiter.api.Test;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.boot.autoconfigure.SpringBootApplication;
32+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
33+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
34+
import org.springframework.boot.test.context.SpringBootTest;
35+
import org.springframework.stereotype.Component;
36+
37+
/**
38+
* Two {@code @RqueueListener} methods on the same queue should each receive every message
39+
* published to it (independent durable consumers, fan-out semantics). Currently disabled because
40+
* the default JetStream stream retention is {@code WorkQueue}, which permits only one
41+
* filter-overlapping consumer at a time and deletes a message after the first ack — so true
42+
* fan-out is not possible with the v1 broker defaults. To enable this test, the broker would
43+
* need to switch to {@code Limits} or {@code Interest} retention for queues with multiple
44+
* listeners.
45+
*/
46+
@SpringBootTest(
47+
classes = NatsMultipleListenersOnSameQueueE2EIT.TestApp.class,
48+
properties = {"rqueue.backend=nats"})
49+
@Tag("nats")
50+
@Disabled(
51+
"Default JetStream retention=WorkQueue prevents true fan-out across multiple consumers; "
52+
+ "enable once retention is configurable per queue or defaulted to Limits/Interest.")
53+
class NatsMultipleListenersOnSameQueueE2EIT extends AbstractNatsBootIT {
54+
55+
@Autowired RqueueMessageEnqueuer enqueuer;
56+
57+
@Autowired ListenerOne one;
58+
59+
@Autowired ListenerTwo two;
60+
61+
@Test
62+
void bothListenersReceiveAllMessages() throws Exception {
63+
for (int i = 0; i < 5; i++) {
64+
enqueuer.enqueue("multi", "fan-" + i);
65+
}
66+
assertThat(one.latch.await(20, TimeUnit.SECONDS)).isTrue();
67+
assertThat(two.latch.await(20, TimeUnit.SECONDS)).isTrue();
68+
assertThat(one.received).hasSize(5);
69+
assertThat(two.received).hasSize(5);
70+
}
71+
72+
@SpringBootApplication(
73+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
74+
static class TestApp {}
75+
76+
@Component
77+
static class ListenerOne {
78+
final CountDownLatch latch = new CountDownLatch(5);
79+
final List<String> received = Collections.synchronizedList(new ArrayList<>());
80+
81+
@RqueueListener(value = "multi")
82+
void onMessage(String payload) {
83+
received.add(payload);
84+
latch.countDown();
85+
}
86+
}
87+
88+
@Component
89+
static class ListenerTwo {
90+
final CountDownLatch latch = new CountDownLatch(5);
91+
final List<String> received = Collections.synchronizedList(new ArrayList<>());
92+
93+
@RqueueListener(value = "multi")
94+
void onMessage(String payload) {
95+
received.add(payload);
96+
latch.countDown();
97+
}
98+
}
99+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.jupiter.api.Tag;
28+
import org.junit.jupiter.api.Test;
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.boot.autoconfigure.SpringBootApplication;
31+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
32+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
33+
import org.springframework.boot.test.context.SpringBootTest;
34+
import org.springframework.stereotype.Component;
35+
36+
/**
37+
* Verifies queue-level priority on the NATS backend: a single listener with
38+
* {@code priority="high=10,low=1"} consumes from two internal sub-queues
39+
* ({@code pq_high} and {@code pq_low}) and the producer sends to each via
40+
* {@link RqueueMessageEnqueuer#enqueueWithPriority}. We assert all 10 messages are
41+
* received and that 5 messages with payload prefix "high-" and 5 with "low-" arrive.
42+
*/
43+
@SpringBootTest(
44+
classes = NatsPriorityQueuesE2EIT.TestApp.class,
45+
properties = {"rqueue.backend=nats"})
46+
@Tag("nats")
47+
class NatsPriorityQueuesE2EIT extends AbstractNatsBootIT {
48+
49+
@Autowired RqueueMessageEnqueuer enqueuer;
50+
51+
@Autowired PriorityListener listener;
52+
53+
@Test
54+
void messagesEnqueuedAtBothPrioritiesAreReceived() throws Exception {
55+
for (int i = 0; i < 5; i++) {
56+
enqueuer.enqueueWithPriority("pq", "high", "high-" + i);
57+
enqueuer.enqueueWithPriority("pq", "low", "low-" + i);
58+
}
59+
assertThat(listener.latch.await(30, TimeUnit.SECONDS)).isTrue();
60+
61+
long highCount = listener.received.stream().filter(s -> s.startsWith("high-")).count();
62+
long lowCount = listener.received.stream().filter(s -> s.startsWith("low-")).count();
63+
assertThat(highCount).isEqualTo(5);
64+
assertThat(lowCount).isEqualTo(5);
65+
}
66+
67+
@SpringBootApplication(
68+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
69+
static class TestApp {}
70+
71+
@Component
72+
static class PriorityListener {
73+
final CountDownLatch latch = new CountDownLatch(10);
74+
final List<String> received = Collections.synchronizedList(new ArrayList<>());
75+
76+
@RqueueListener(value = "pq", priority = "high=10,low=1")
77+
void onMessage(String payload) {
78+
received.add(payload);
79+
latch.countDown();
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)