Skip to content

Commit 3839668

Browse files
committed
Add MsgPack listener E2E coverage
1 parent d3e3e51 commit 3839668

2 files changed

Lines changed: 409 additions & 0 deletions

File tree

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Copyright (c) 2026 Sonu Kumar
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* You may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and limitations under the License.
14+
*
15+
*/
16+
package com.github.sonus21.rqueue.spring.boot.integration;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
import com.github.sonus21.rqueue.annotation.RqueueListener;
21+
import com.github.sonus21.rqueue.core.RqueueMessage;
22+
import com.github.sonus21.rqueue.core.RqueueMessageEnqueuer;
23+
import com.github.sonus21.rqueue.listener.RqueueMessageHeaders;
24+
import com.github.sonus21.rqueue.test.application.BaseApplication;
25+
import java.util.Objects;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
import org.junit.jupiter.api.Tag;
30+
import org.junit.jupiter.params.ParameterizedTest;
31+
import org.junit.jupiter.params.provider.EnumSource;
32+
import org.springframework.boot.WebApplicationType;
33+
import org.springframework.boot.autoconfigure.SpringBootApplication;
34+
import org.springframework.boot.builder.SpringApplicationBuilder;
35+
import org.springframework.boot.data.redis.autoconfigure.DataRedisAutoConfiguration;
36+
import org.springframework.boot.data.redis.autoconfigure.DataRedisReactiveAutoConfiguration;
37+
import org.springframework.context.ConfigurableApplicationContext;
38+
import org.springframework.context.annotation.Import;
39+
import org.springframework.messaging.Message;
40+
import org.springframework.messaging.handler.annotation.Header;
41+
import org.springframework.stereotype.Component;
42+
43+
@Tag("springBootIntegration")
44+
@Tag("integration")
45+
@Tag("springBoot")
46+
class MessagePackageListenerTest {
47+
48+
private static final String MESSAGE_PACKAGE_QUEUE = "message-package-listener-package";
49+
private static final String MESSAGE_QUEUE = "message-package-listener-message";
50+
private static final String NATS_STREAM_PREFIX = "rqueue-js-messagePackageListener-";
51+
private static final String NATS_SUBJECT_PREFIX = "rqueue.js.messagePackageListener.";
52+
private static final int REDIS_PORT = 8032;
53+
54+
@ParameterizedTest(name = "{0}")
55+
@EnumSource(BackendUnderTest.class)
56+
void listenerReceivesMsgPackPayloadInsideSpringMessage(BackendUnderTest backend)
57+
throws Exception {
58+
try (ConfigurableApplicationContext context = startContext(backend)) {
59+
TestListener listener = context.getBean(TestListener.class);
60+
ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message-package");
61+
62+
String messageId =
63+
context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_PACKAGE_QUEUE, payload);
64+
65+
assertThat(listener.messagePackageLatch.await(20, TimeUnit.SECONDS)).isTrue();
66+
assertThat(listener.messagePackage.get()).isNotNull();
67+
assertThat(listener.messagePackage.get().getPayload()).isEqualTo(payload);
68+
assertRqueueMessage(
69+
listener.messagePackageRqueueMessage.get(), messageId, MESSAGE_PACKAGE_QUEUE, payload);
70+
}
71+
}
72+
73+
@ParameterizedTest(name = "{0}")
74+
@EnumSource(BackendUnderTest.class)
75+
void listenerReceivesMsgPackPayload(BackendUnderTest backend) throws Exception {
76+
try (ConfigurableApplicationContext context = startContext(backend)) {
77+
TestListener listener = context.getBean(TestListener.class);
78+
ListenerPayload payload = new ListenerPayload(backend.name(), "msgpack-message");
79+
80+
String messageId =
81+
context.getBean(RqueueMessageEnqueuer.class).enqueue(MESSAGE_QUEUE, payload);
82+
83+
assertThat(listener.messageLatch.await(20, TimeUnit.SECONDS)).isTrue();
84+
assertThat(listener.message.get()).isEqualTo(payload);
85+
assertRqueueMessage(listener.messageRqueueMessage.get(), messageId, MESSAGE_QUEUE, payload);
86+
}
87+
}
88+
89+
private ConfigurableApplicationContext startContext(BackendUnderTest backend) {
90+
if (backend.isNats()) {
91+
AbstractNatsBootIT.startNats();
92+
AbstractNatsBootIT.deleteStreamsWithPrefix(NATS_STREAM_PREFIX);
93+
}
94+
return new SpringApplicationBuilder(backend.applicationClass())
95+
.web(WebApplicationType.NONE)
96+
.properties(backend.properties())
97+
.run();
98+
}
99+
100+
private static void assertRqueueMessage(
101+
RqueueMessage rqueueMessage,
102+
String messageId,
103+
String queueName,
104+
ListenerPayload expectedPayload) {
105+
assertThat(rqueueMessage).isNotNull();
106+
assertThat(rqueueMessage.getId()).isEqualTo(messageId);
107+
assertThat(rqueueMessage.getQueueName()).isEqualTo(queueName);
108+
assertThat(MsgPackMessageConverterProvider.isMsgPack(rqueueMessage.getMessage()))
109+
.isTrue();
110+
assertThat(MsgPackMessageConverterProvider.decode(rqueueMessage.getMessage()))
111+
.isEqualTo(expectedPayload);
112+
}
113+
114+
enum BackendUnderTest {
115+
REDIS(RedisTestApp.class, new String[] {
116+
"rqueue.backend=redis",
117+
"spring.data.redis.host=127.0.0.1",
118+
"spring.data.redis.port=" + REDIS_PORT,
119+
"mysql.db.name=MessagePackageListenerTestRedis",
120+
"use.system.redis=false"
121+
}),
122+
NATS(NatsTestApp.class, new String[] {});
123+
124+
private final Class<?> applicationClass;
125+
private final String[] properties;
126+
127+
BackendUnderTest(Class<?> applicationClass, String[] properties) {
128+
this.applicationClass = applicationClass;
129+
this.properties = properties;
130+
}
131+
132+
Class<?> applicationClass() {
133+
return applicationClass;
134+
}
135+
136+
String[] properties() {
137+
String[] common = new String[] {
138+
"rqueue.message.converter.provider.class="
139+
+ MsgPackMessageConverterProvider.class.getName(),
140+
};
141+
String[] backendProperties = isNats()
142+
? new String[] {
143+
"rqueue.backend=nats",
144+
"rqueue.nats.naming.stream-prefix=" + NATS_STREAM_PREFIX,
145+
"rqueue.nats.naming.subject-prefix=" + NATS_SUBJECT_PREFIX,
146+
"rqueue.nats.connection.url=" + AbstractNatsBootIT.activeNatsUrl()
147+
}
148+
: properties;
149+
String[] merged = new String[common.length + backendProperties.length];
150+
System.arraycopy(common, 0, merged, 0, common.length);
151+
System.arraycopy(backendProperties, 0, merged, common.length, backendProperties.length);
152+
return merged;
153+
}
154+
155+
boolean isNats() {
156+
return this == NATS;
157+
}
158+
}
159+
160+
@SpringBootApplication
161+
@Import(TestListener.class)
162+
static class RedisTestApp extends BaseApplication {}
163+
164+
@SpringBootApplication(
165+
exclude = {DataRedisAutoConfiguration.class, DataRedisReactiveAutoConfiguration.class})
166+
@Import(TestListener.class)
167+
static class NatsTestApp {}
168+
169+
@Component
170+
static class TestListener {
171+
172+
final CountDownLatch messagePackageLatch = new CountDownLatch(1);
173+
final CountDownLatch messageLatch = new CountDownLatch(1);
174+
final AtomicReference<Message<ListenerPayload>> messagePackage = new AtomicReference<>();
175+
final AtomicReference<RqueueMessage> messagePackageRqueueMessage = new AtomicReference<>();
176+
final AtomicReference<ListenerPayload> message = new AtomicReference<>();
177+
final AtomicReference<RqueueMessage> messageRqueueMessage = new AtomicReference<>();
178+
179+
@RqueueListener(value = MESSAGE_PACKAGE_QUEUE)
180+
void onMessagePackage(
181+
Message<ListenerPayload> message,
182+
@Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) {
183+
messagePackage.set(message);
184+
messagePackageRqueueMessage.set(rqueueMessage);
185+
messagePackageLatch.countDown();
186+
}
187+
188+
@RqueueListener(value = MESSAGE_QUEUE)
189+
void onMessage(
190+
ListenerPayload message,
191+
@Header(RqueueMessageHeaders.MESSAGE) RqueueMessage rqueueMessage) {
192+
this.message.set(message);
193+
messageRqueueMessage.set(rqueueMessage);
194+
messageLatch.countDown();
195+
}
196+
}
197+
198+
static class ListenerPayload {
199+
200+
private String backend;
201+
private String body;
202+
203+
ListenerPayload() {}
204+
205+
ListenerPayload(String backend, String body) {
206+
this.backend = backend;
207+
this.body = body;
208+
}
209+
210+
public String getBackend() {
211+
return backend;
212+
}
213+
214+
public void setBackend(String backend) {
215+
this.backend = backend;
216+
}
217+
218+
public String getBody() {
219+
return body;
220+
}
221+
222+
public void setBody(String body) {
223+
this.body = body;
224+
}
225+
226+
@Override
227+
public boolean equals(Object other) {
228+
if (this == other) {
229+
return true;
230+
}
231+
if (!(other instanceof ListenerPayload)) {
232+
return false;
233+
}
234+
ListenerPayload that = (ListenerPayload) other;
235+
return Objects.equals(backend, that.backend) && Objects.equals(body, that.body);
236+
}
237+
238+
@Override
239+
public int hashCode() {
240+
return Objects.hash(backend, body);
241+
}
242+
}
243+
}

0 commit comments

Comments
 (0)