Skip to content

Commit 3bd0f5d

Browse files
committed
Add E2E tests for concurrency, consumerName override, reactive enqueue
Adds three NATS-tagged Spring Boot end-to-end integration tests plus a shared AbstractNatsBootIT base class that lifts the Testcontainers + @DynamicPropertySource boilerplate. - NatsConcurrencyE2EIT: proves @RqueueListener(concurrency="3") yields >1 parallel handler invocations on JetStream pull subscribers. - NatsConsumerNameOverrideE2EIT: confirms an explicit consumerName attribute lands on the JetStream stream as a durable consumer with that exact name (queried via JetStreamManagement). - NatsReactiveEnqueueE2EIT: enqueues 5 messages via ReactiveRqueueMessageEnqueuer (Flux.merge) and verifies a sync listener receives all 5; requires rqueue.reactive.enabled=true. Assisted-By: Claude Code
1 parent b94956d commit 3bd0f5d

4 files changed

Lines changed: 293 additions & 0 deletions

File tree

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import org.springframework.test.context.DynamicPropertyRegistry;
19+
import org.springframework.test.context.DynamicPropertySource;
20+
import org.testcontainers.containers.GenericContainer;
21+
import org.testcontainers.containers.wait.strategy.Wait;
22+
import org.testcontainers.junit.jupiter.Container;
23+
import org.testcontainers.junit.jupiter.Testcontainers;
24+
import org.testcontainers.utility.DockerImageName;
25+
26+
/**
27+
* Common Testcontainers + dynamic-property boilerplate for NATS-backed end-to-end tests.
28+
*
29+
* <p>Subclasses declare their own {@code @SpringBootApplication} test config (typically excluding
30+
* Redis auto-config, see {@link NatsBackendEndToEndIT} for the reference pattern) and any
31+
* {@code @RqueueListener} beans they need. The container is lifecycle-managed by the
32+
* {@link Testcontainers} extension and shared across all tests in a single subclass.
33+
*/
34+
@Testcontainers(disabledWithoutDocker = true)
35+
abstract class AbstractNatsBootIT {
36+
37+
@Container
38+
static final GenericContainer<?> NATS = new GenericContainer<>(
39+
DockerImageName.parse("nats:2.10-alpine"))
40+
.withCommand("-js")
41+
.withExposedPorts(4222)
42+
.waitingFor(Wait.forLogMessage(".*Server is ready.*\\n", 1));
43+
44+
@DynamicPropertySource
45+
static void natsProps(DynamicPropertyRegistry r) {
46+
r.add(
47+
"rqueue.nats.connection.url",
48+
() -> "nats://" + NATS.getHost() + ":" + NATS.getMappedPort(4222));
49+
}
50+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import org.junit.jupiter.api.Tag;
26+
import org.junit.jupiter.api.Test;
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.boot.autoconfigure.SpringBootApplication;
29+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
30+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
31+
import org.springframework.boot.test.context.SpringBootTest;
32+
import org.springframework.stereotype.Component;
33+
34+
/**
35+
* End-to-end test confirming that {@code @RqueueListener(concurrency=...)} actually runs more
36+
* than one handler invocation in parallel against the NATS backend. We don't assert an exact
37+
* parallelism value because JetStream prefetch + thread scheduling makes that flaky; observing
38+
* any parallelism &gt; 1 is enough proof the concurrency knob is wired through to a pull
39+
* subscription with multiple poller threads.
40+
*/
41+
@SpringBootTest(
42+
classes = NatsConcurrencyE2EIT.TestApp.class,
43+
properties = {"rqueue.backend=nats"})
44+
@Tag("nats")
45+
class NatsConcurrencyE2EIT extends AbstractNatsBootIT {
46+
47+
@Autowired RqueueMessageEnqueuer enqueuer;
48+
49+
@Autowired ConcurrencyListener listener;
50+
51+
@Test
52+
void parallelInvocationsAreObserved() throws Exception {
53+
for (int i = 0; i < 30; i++) {
54+
enqueuer.enqueue("conc-e2e", "msg-" + i);
55+
}
56+
assertThat(listener.latch.await(45, TimeUnit.SECONDS)).isTrue();
57+
assertThat(listener.maxParallel.get())
58+
.as("at least 2 concurrent invocations should have been observed")
59+
.isGreaterThanOrEqualTo(2);
60+
}
61+
62+
@SpringBootApplication(
63+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
64+
static class TestApp {}
65+
66+
@Component
67+
static class ConcurrencyListener {
68+
final CountDownLatch latch = new CountDownLatch(30);
69+
final AtomicInteger active = new AtomicInteger();
70+
final AtomicInteger maxParallel = new AtomicInteger();
71+
72+
@RqueueListener(value = "conc-e2e", concurrency = "3")
73+
void onMessage(String payload) throws InterruptedException {
74+
int now = active.incrementAndGet();
75+
maxParallel.updateAndGet(curr -> Math.max(curr, now));
76+
try {
77+
Thread.sleep(200L);
78+
} finally {
79+
active.decrementAndGet();
80+
latch.countDown();
81+
}
82+
}
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
22+
import io.nats.client.JetStreamManagement;
23+
import io.nats.client.api.ConsumerInfo;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
import org.junit.jupiter.api.Tag;
27+
import org.junit.jupiter.api.Test;
28+
import org.springframework.beans.factory.annotation.Autowired;
29+
import org.springframework.boot.autoconfigure.SpringBootApplication;
30+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
31+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
32+
import org.springframework.boot.test.context.SpringBootTest;
33+
import org.springframework.stereotype.Component;
34+
35+
/**
36+
* Verifies that {@code @RqueueListener(consumerName="...")} causes the JetStream durable consumer
37+
* to be created with that exact name (rather than the auto-derived
38+
* {@code rqueue-<queue>-<bean>#<method>} form). After a message round-trips successfully, we
39+
* query the broker via {@link JetStreamManagement#getConsumerInfo} and assert the override is
40+
* present.
41+
*/
42+
@SpringBootTest(
43+
classes = NatsConsumerNameOverrideE2EIT.TestApp.class,
44+
properties = {"rqueue.backend=nats"})
45+
@Tag("nats")
46+
class NatsConsumerNameOverrideE2EIT extends AbstractNatsBootIT {
47+
48+
@Autowired RqueueMessageEnqueuer enqueuer;
49+
50+
@Autowired CustomConsumerListener listener;
51+
52+
@Autowired JetStreamManagement jsm;
53+
54+
@Test
55+
void overriddenConsumerNameIsRegisteredOnTheStream() throws Exception {
56+
enqueuer.enqueue("custom-consumer", "hello");
57+
assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue();
58+
59+
ConsumerInfo info = jsm.getConsumerInfo("rqueue-custom-consumer", "my-custom-consumer");
60+
assertThat(info).isNotNull();
61+
assertThat(info.getName()).isEqualTo("my-custom-consumer");
62+
}
63+
64+
@SpringBootApplication(
65+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
66+
static class TestApp {}
67+
68+
@Component
69+
static class CustomConsumerListener {
70+
final CountDownLatch latch = new CountDownLatch(1);
71+
72+
@RqueueListener(value = "custom-consumer", consumerName = "my-custom-consumer")
73+
void onMessage(String payload) {
74+
latch.countDown();
75+
}
76+
}
77+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (c) 2024-2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.ReactiveRqueueMessageEnqueuer;
22+
import java.time.Duration;
23+
import java.util.ArrayList;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
import org.junit.jupiter.api.Tag;
29+
import org.junit.jupiter.api.Test;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.boot.autoconfigure.SpringBootApplication;
32+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
33+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
34+
import org.springframework.boot.test.context.SpringBootTest;
35+
import org.springframework.stereotype.Component;
36+
import reactor.core.publisher.Flux;
37+
38+
/**
39+
* Verifies the reactive producer path on the NATS backend: enqueueing 5 messages via
40+
* {@link ReactiveRqueueMessageEnqueuer} (subscribed via {@link Flux#merge}) and confirming a
41+
* synchronous {@code @RqueueListener} on the same queue receives all 5.
42+
*/
43+
@SpringBootTest(
44+
classes = NatsReactiveEnqueueE2EIT.TestApp.class,
45+
properties = {"rqueue.backend=nats", "rqueue.reactive.enabled=true"})
46+
@Tag("nats")
47+
class NatsReactiveEnqueueE2EIT extends AbstractNatsBootIT {
48+
49+
@Autowired ReactiveRqueueMessageEnqueuer reactiveEnqueuer;
50+
51+
@Autowired ReactiveListener listener;
52+
53+
@Test
54+
void reactivelyEnqueuedMessagesAreReceivedByListener() throws Exception {
55+
List<reactor.core.publisher.Mono<String>> publishers = new ArrayList<>();
56+
for (int i = 0; i < 5; i++) {
57+
publishers.add(reactiveEnqueuer.enqueue("reactive-e2e", "rx-" + i));
58+
}
59+
List<String> ids = Flux.merge(publishers).collectList().block(Duration.ofSeconds(15));
60+
assertThat(ids).hasSize(5);
61+
62+
assertThat(listener.latch.await(20, TimeUnit.SECONDS)).isTrue();
63+
assertThat(listener.received)
64+
.containsExactlyInAnyOrder("rx-0", "rx-1", "rx-2", "rx-3", "rx-4");
65+
}
66+
67+
@SpringBootApplication(
68+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
69+
static class TestApp {}
70+
71+
@Component
72+
static class ReactiveListener {
73+
final CountDownLatch latch = new CountDownLatch(5);
74+
final List<String> received = Collections.synchronizedList(new ArrayList<>());
75+
76+
@RqueueListener(value = "reactive-e2e")
77+
void onMessage(String payload) {
78+
received.add(payload);
79+
latch.countDown();
80+
}
81+
}
82+
}

0 commit comments

Comments
 (0)