Skip to content

Commit 28fc887

Browse files
committed
feat: add QueueType-aware registerQueue, ObjectMapper constructor, and version bump
- RqueueEndpointManager: add registerQueue(name, QueueType, priorities) overload; old no-arg signature preserved as default method for backward compat - RqueueEndpointManagerImpl: implements new QueueType-aware registration - GenericMessageConverter: add constructor accepting a custom ObjectMapper - JetStreamMessageBrokerProducerOnlyIT: refactor to domain event POJOs - build.gradle: bump version to 4.0.0-SK1 Assisted-By: Claude Code
1 parent 2122d66 commit 28fc887

5 files changed

Lines changed: 143 additions & 63 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ ext {
8585

8686
subprojects {
8787
group = "com.github.sonus21"
88-
version = "4.0.0-LC"
88+
version = "4.0.0-SK1"
8989

9090
dependencies {
9191
// https://mvnrepository.com/artifact/org.springframework/spring-messaging

rqueue-core/src/main/java/com/github/sonus21/rqueue/converter/GenericMessageConverter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static org.springframework.util.Assert.notNull;
2020

21+
import com.github.sonus21.rqueue.serdes.RqJacksonSerDes;
22+
import com.github.sonus21.rqueue.serdes.RqJacksonTypeFactory;
2123
import com.github.sonus21.rqueue.serdes.RqueueSerDes;
2224
import com.github.sonus21.rqueue.serdes.RqueueTypeFactory;
2325
import com.github.sonus21.rqueue.serdes.SerializationUtils;
@@ -40,6 +42,7 @@
4042
import tools.jackson.core.JsonGenerator;
4143
import tools.jackson.core.JsonParser;
4244
import tools.jackson.databind.DeserializationContext;
45+
import tools.jackson.databind.ObjectMapper;
4346
import tools.jackson.databind.SerializationContext;
4447
import tools.jackson.databind.annotation.JsonDeserialize;
4548
import tools.jackson.databind.annotation.JsonSerialize;
@@ -61,6 +64,12 @@ public GenericMessageConverter() {
6164
new SmartMessageSerDes(SerializationUtils.getSerDes(), SerializationUtils.getTypeFactory());
6265
}
6366

67+
public GenericMessageConverter(ObjectMapper objectMapper) {
68+
notNull(objectMapper, "objectMapper cannot be null");
69+
this.smartMessageSerDes = new SmartMessageSerDes(new RqJacksonSerDes(objectMapper),
70+
new RqJacksonTypeFactory(objectMapper));
71+
}
72+
6473
public GenericMessageConverter(RqueueSerDes serDes, RqueueTypeFactory typeFactory) {
6574
notNull(serDes, "serDes cannot be null");
6675
notNull(typeFactory, "typeFactory cannot be null");

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueEndpointManager.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.github.sonus21.rqueue.core;
1818

19+
import com.github.sonus21.rqueue.enums.QueueType;
1920
import com.github.sonus21.rqueue.listener.QueueDetail;
2021
import com.github.sonus21.rqueue.utils.PriorityUtils;
2122
import java.util.List;
@@ -35,7 +36,19 @@ public interface RqueueEndpointManager {
3536
* @param name name of the queue
3637
* @param priorities list of priorities to be used while sending message on this queue.
3738
*/
38-
void registerQueue(String name, String... priorities);
39+
default void registerQueue(String name, String... priorities) {
40+
registerQueue(name, QueueType.QUEUE, priorities);
41+
}
42+
43+
44+
/**
45+
* Use this method to register any queue, that's only used for sending message.
46+
*
47+
* @param name name of the queue
48+
* @param type queue type
49+
* @param priorities list of priorities to be used while sending message on this queue.
50+
*/
51+
void registerQueue(String name, QueueType type, String... priorities);
3952

4053
/**
4154
* Check if a queue is registered.

rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueEndpointManagerImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.sonus21.rqueue.core.RqueueMessageIdGenerator;
2222
import com.github.sonus21.rqueue.core.RqueueMessageTemplate;
2323
import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao;
24+
import com.github.sonus21.rqueue.enums.QueueType;
2425
import com.github.sonus21.rqueue.exception.QueueDoesNotExist;
2526
import com.github.sonus21.rqueue.listener.QueueDetail;
2627
import com.github.sonus21.rqueue.models.db.QueueConfig;
@@ -64,8 +65,8 @@ public RqueueEndpointManagerImpl(
6465
}
6566

6667
@Override
67-
public void registerQueue(String name, String... priorities) {
68-
registerQueueInternal(name, priorities);
68+
public void registerQueue(String name, QueueType type, String... priorities) {
69+
registerQueueInternal(name, type, priorities);
6970
}
7071

7172
@Override

rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerProducerOnlyIT.java

Lines changed: 116 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -14,112 +14,169 @@
1414
import com.github.sonus21.rqueue.core.RqueueMessage;
1515
import com.github.sonus21.rqueue.listener.QueueDetail;
1616
import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker;
17+
import com.github.sonus21.rqueue.serdes.SerializationUtils;
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.UUID;
21+
import lombok.AllArgsConstructor;
22+
import lombok.Data;
23+
import lombok.NoArgsConstructor;
1724
import org.junit.jupiter.api.Test;
1825
import reactor.core.publisher.Flux;
1926
import reactor.test.StepVerifier;
27+
import tools.jackson.databind.ObjectMapper;
2028

2129
/**
22-
* End-to-end producer-only smoke test: the broker enqueues messages but never pops or acks them.
23-
* Covers plain enqueue, priority enqueue, and reactive enqueue — verifying that all variants land
24-
* in JetStream and are reflected by {@link JetStreamMessageBroker#size}.
30+
* End-to-end producer-only smoke test: the broker enqueues typed domain events but never pops or
31+
* acks them, mirroring the producer-only application mode where the process only publishes work.
32+
*
33+
* <p>Covers plain enqueue, priority enqueue, and reactive enqueue — verifying that all variants
34+
* land in JetStream and are reflected by {@link JetStreamMessageBroker#size}.
2535
*/
2636
@NatsIntegrationTest
2737
class JetStreamMessageBrokerProducerOnlyIT extends AbstractJetStreamIT {
2838

39+
private static final ObjectMapper MAPPER = SerializationUtils.objectMapper;
40+
41+
// ---- minimal domain events used as message payloads --------------------
42+
43+
@Data
44+
@NoArgsConstructor
45+
@AllArgsConstructor
46+
static class EmailEvent {
47+
private String id;
48+
private String to;
49+
private String subject;
50+
}
51+
52+
@Data
53+
@NoArgsConstructor
54+
@AllArgsConstructor
55+
static class JobEvent {
56+
private String id;
57+
private String type;
58+
}
59+
60+
@Data
61+
@NoArgsConstructor
62+
@AllArgsConstructor
63+
static class NotificationEvent {
64+
private String id;
65+
private String message;
66+
}
67+
68+
// ---- helpers -----------------------------------------------------------
69+
70+
private static String serialize(Object event) throws Exception {
71+
return MAPPER.writeValueAsString(event);
72+
}
73+
74+
private static RqueueMessage rqueueMessage(String id, Object event) throws Exception {
75+
return RqueueMessage.builder().id(id).message(serialize(event)).build();
76+
}
77+
78+
// ---- tests -------------------------------------------------------------
79+
2980
@Test
30-
void enqueue_messagesAccumulateInStream() throws Exception {
31-
QueueDetail q = mockQueue("po-plain-" + System.nanoTime());
81+
void enqueueEmailEvents_accumulateInStream() throws Exception {
82+
QueueDetail emailQueue = mockQueue("email-queue-" + System.nanoTime());
3283
try (JetStreamMessageBroker broker =
3384
JetStreamMessageBroker.builder().connection(connection).build()) {
34-
int count = 10;
85+
int count = 5;
3586
for (int i = 0; i < count; i++) {
36-
broker.enqueue(
37-
q, RqueueMessage.builder().id("m-" + i).message("payload-" + i).build());
87+
EmailEvent event = new EmailEvent(UUID.randomUUID().toString(),
88+
"user" + i + "@example.com", "Subject " + i);
89+
broker.enqueue(emailQueue, rqueueMessage("email-" + i, event));
3890
}
39-
assertEquals(count, broker.size(q), "all enqueued messages should be visible in the stream");
91+
assertEquals(count, broker.size(emailQueue),
92+
"all email events should be visible in the stream");
4093
}
4194
}
4295

4396
@Test
44-
void enqueueWithPriority_messagesAccumulateInPriorityStreams() throws Exception {
45-
QueueDetail q = mockQueue("po-prio-" + System.nanoTime());
97+
void enqueueJobEvents_accumulateInStream() throws Exception {
98+
QueueDetail jobQueue = mockQueue("job-queue-" + System.nanoTime());
4699
try (JetStreamMessageBroker broker =
47100
JetStreamMessageBroker.builder().connection(connection).build()) {
48-
String[] priorities = {"high", "low", "critical"};
49-
int perPriority = 5;
101+
String[] types = {"FULL_TIME", "PART_TIME", "CONTRACT"};
102+
for (int i = 0; i < types.length; i++) {
103+
JobEvent event = new JobEvent(UUID.randomUUID().toString(), types[i]);
104+
broker.enqueue(jobQueue, rqueueMessage("job-" + i, event));
105+
}
106+
assertEquals(types.length, broker.size(jobQueue),
107+
"all job events should be visible in the stream");
108+
}
109+
}
110+
111+
@Test
112+
void enqueueWithPriority_notificationEvents_accumulateInPriorityStreams() throws Exception {
113+
QueueDetail notifQueue = mockQueue("notif-queue-" + System.nanoTime());
114+
try (JetStreamMessageBroker broker =
115+
JetStreamMessageBroker.builder().connection(connection).build()) {
116+
String[] priorities = {"high", "low"};
117+
int perPriority = 4;
50118
for (String priority : priorities) {
51119
for (int i = 0; i < perPriority; i++) {
52-
broker.enqueue(
53-
q,
54-
priority,
55-
RqueueMessage.builder()
56-
.id(priority + "-m-" + i)
57-
.message("payload-" + i)
58-
.build());
120+
NotificationEvent event = new NotificationEvent(
121+
UUID.randomUUID().toString(), priority + "-notification-" + i);
122+
broker.enqueue(notifQueue, priority, rqueueMessage(priority + "-notif-" + i, event));
59123
}
60124
}
61-
// Each priority maps to its own JetStream stream; verify each independently.
62-
// subjectFor(q, priority) = prefix + q.getName() + "_" + priority, so size(pq) where
63-
// pq.getName() = q.getName() + "_" + priority resolves to the same stream.
64125
for (String priority : priorities) {
65-
QueueDetail pq = mockQueue(q.getName() + "_" + priority);
66-
assertEquals(
67-
perPriority,
68-
broker.size(pq),
69-
"priority=" + priority + " stream should hold " + perPriority + " messages");
126+
QueueDetail pq = mockQueue(notifQueue.getName() + "_" + priority);
127+
assertEquals(perPriority, broker.size(pq),
128+
"priority=" + priority + " stream should hold " + perPriority + " notification events");
70129
}
71130
}
72131
}
73132

74133
@Test
75-
void enqueueReactive_messagesAccumulateInStream() {
76-
QueueDetail q = mockQueue("po-reactive-" + System.nanoTime());
134+
void enqueueReactive_emailEvents_accumulateInStream() throws Exception {
135+
QueueDetail emailQueue = mockQueue("email-reactive-" + System.nanoTime());
77136
try (JetStreamMessageBroker broker =
78137
JetStreamMessageBroker.builder().connection(connection).build()) {
79-
int count = 8;
80-
Flux<Void> publishes = Flux.range(0, count)
81-
.flatMap(i -> broker.enqueueReactive(
82-
q,
83-
RqueueMessage.builder()
84-
.id("rm-" + i)
85-
.message("reactive-payload-" + i)
86-
.build()));
87-
138+
int count = 6;
139+
List<RqueueMessage> messages = new ArrayList<>();
140+
for (int i = 0; i < count; i++) {
141+
EmailEvent event = new EmailEvent(
142+
UUID.randomUUID().toString(), "user" + i + "@example.com", "RE: item " + i);
143+
messages.add(rqueueMessage("re-email-" + i, event));
144+
}
145+
Flux<Void> publishes = Flux.fromIterable(messages)
146+
.flatMap(m -> broker.enqueueReactive(emailQueue, m));
88147
StepVerifier.create(publishes).verifyComplete();
89-
90-
assertEquals(
91-
count, broker.size(q), "all reactively enqueued messages should be in the stream");
148+
assertEquals(count, broker.size(emailQueue),
149+
"all reactively enqueued email events should be in the stream");
92150
}
93151
}
94152

95153
@Test
96-
void mixedEnqueue_allVariantsLandInCorrectStreams() {
97-
String base = "po-mixed-" + System.nanoTime();
98-
QueueDetail mainQ = mockQueue(base);
99-
QueueDetail highQ = mockQueue(base + "_high");
154+
void mixedEvents_allVariantsLandInCorrectStreams() throws Exception {
155+
String base = "mixed-events-" + System.nanoTime();
156+
QueueDetail mainQueue = mockQueue(base);
157+
QueueDetail highQueue = mockQueue(base + "_high");
100158

101159
try (JetStreamMessageBroker broker =
102160
JetStreamMessageBroker.builder().connection(connection).build()) {
103161

104-
// 3 plain messages on the main queue
162+
// 3 email events on the main queue
105163
for (int i = 0; i < 3; i++) {
106-
broker.enqueue(
107-
mainQ, RqueueMessage.builder().id("plain-" + i).message("p" + i).build());
164+
EmailEvent email = new EmailEvent(UUID.randomUUID().toString(),
165+
"to" + i + "@example.com", "Hello " + i);
166+
broker.enqueue(mainQueue, rqueueMessage("email-" + i, email));
108167
}
109-
// 2 priority messages on the "high" sub-queue
168+
// 2 job events on the "high" priority sub-queue
110169
for (int i = 0; i < 2; i++) {
111-
broker.enqueue(
112-
mainQ,
113-
"high",
114-
RqueueMessage.builder().id("high-" + i).message("h" + i).build());
170+
JobEvent job = new JobEvent(UUID.randomUUID().toString(), "CONTRACT");
171+
broker.enqueue(mainQueue, "high", rqueueMessage("job-high-" + i, job));
115172
}
116-
// 1 reactive message on the main queue
117-
StepVerifier.create(broker.enqueueReactive(
118-
mainQ, RqueueMessage.builder().id("react-0").message("r0").build()))
173+
// 1 notification reactively on the main queue
174+
NotificationEvent notif = new NotificationEvent(UUID.randomUUID().toString(), "reactive notif");
175+
StepVerifier.create(broker.enqueueReactive(mainQueue, rqueueMessage("notif-0", notif)))
119176
.verifyComplete();
120177

121-
assertEquals(4L, broker.size(mainQ), "main stream: 3 plain + 1 reactive");
122-
assertEquals(2L, broker.size(highQ), "high-priority stream: 2 messages");
178+
assertEquals(4L, broker.size(mainQueue), "main stream: 3 email + 1 reactive notification");
179+
assertEquals(2L, broker.size(highQueue), "high-priority stream: 2 job events");
123180
}
124181
}
125182
}

0 commit comments

Comments
 (0)