Skip to content

Commit fbf88ad

Browse files
authored
feat: Remove reactor-rabbitmq dependencie (#167)
* refactor: update RabbitMQ integration * feat: add Reactor RabbitMQ integration and related tests * refactor: replace volatile FluxSink with AtomicReference in ReactiveMessageSender * refactor: replace volatile fields with AtomicReference in message listeners * refactor: replace MyOutboundMessage with OutboundMessage in message handling * refactor: update CloudEventContextWriter to handle numeric values according to spec * refactor: simplify CloudEventDeserializer by consolidating data reading and extension handling * docs: update README to include Reactor RabbitMQ module and original library details * chore: delete starter async-rabbit-standalone * refactor: update resource specifications to use reactor.rabbitmq package * Revert "refactor: replace volatile FluxSink with AtomicReference in ReactiveMessageSender" This reverts commit e69f58c. * Revert "refactor: replace volatile fields with AtomicReference in message listeners" This reverts commit 2be7014 * refactor: replace AtomicBoolean with volatile boolean for published state * docs: update README to reflect removal of volatile fields and other enhancements * refactor: update imports to use reactor.rabbitmq specifications
1 parent 3a65e7a commit fbf88ad

96 files changed

Lines changed: 6198 additions & 358 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,39 @@
1-
![](https://github.com/reactive-commons/reactive-commons-java/workflows/reactive-commons-ci-cd/badge.svg)
2-
[![Reactor RabbitMQ](https://maven-badges.herokuapp.com/maven-central/org.reactivecommons/async-commons-rabbit-starter/badge.svg)](https://mvnrepository.com/artifact/org.reactivecommons/async-commons-rabbit-starter)
1+
![CI/CD](https://github.com/reactive-commons/reactive-commons-java/workflows/reactive-commons-ci-cd/badge.svg)
2+
[![Maven Central](https://img.shields.io/maven-central/v/org.reactivecommons/async-commons-rabbit-starter)](https://central.sonatype.com/artifact/org.reactivecommons/async-commons-rabbit-starter)
3+
34
# reactive-commons-java
5+
46
The purpose of reactive-commons is to provide a set of abstractions and implementations over different patterns and practices that make the foundation of a reactive microservices architecture.
57

6-
Docs: [https://bancolombia.github.io/reactive-commons-java/](https://bancolombia.github.io/reactive-commons-java)
8+
Even though the main purpose is to provide such abstractions in a mostly generic way, they would be of little use without a concrete implementation. So we provide implementations in a best-effort manner that aim to be easy to change, personalize, and extend.
9+
10+
The first approach to this work was to release simple abstractions and a corresponding implementation over asynchronous message-driven communication between microservices, built on top of Project Reactor and Spring Boot.
11+
12+
---
13+
14+
## Documentation
15+
16+
**Full documentation is available at:**
17+
18+
> ### 👉 [https://bancolombia.github.io/reactive-commons-java/](https://bancolombia.github.io/reactive-commons-java)
19+
20+
---
21+
22+
## Related
23+
24+
> - **Other projects:** [https://github.com/bancolombia](https://github.com/bancolombia)
25+
> - **Sponsored by:** [Bancolombia Tech](https://medium.com/bancolombia-tech)
26+
27+
---
728

8-
Other projects: https://github.com/bancolombia
29+
## Third-Party Code Credits
930

10-
Sponsor by: https://medium.com/bancolombia-tech
31+
This project includes source code internalized from the following open-source libraries:
1132

12-
Even though the main purpose is to provide such abstractions in a mostly generic way such abstractions would be of little use without a concrete implementation so we provide some implementations in a best effors maner that aim to be easy to change, personalize and extend.
33+
### reactor-rabbitmq
34+
- **Repository:** [https://github.com/spring-attic/reactor-rabbitmq](https://github.com/spring-attic/reactor-rabbitmq)
35+
- **License:** [Apache License 2.0](https://github.com/spring-attic/reactor-rabbitmq/blob/main/LICENSE)
1336

14-
The first approach to this work was to release a very simple abstractions and a corresponding implementation over asyncronous message driven communication between microservices build on top of project-reactor and spring boot.
37+
### CloudEvents JSON Jackson
38+
- **Repository:** [https://github.com/cloudevents/sdk-java](https://github.com/cloudevents/sdk-java)
39+
- **License:** [Apache License 2.0](https://github.com/cloudevents/sdk-java/blob/main/LICENSE)

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,110 @@ void serveQueryDelegateWithLambda() {
296296
.hasSize(1);
297297
}
298298

299+
@Test
300+
void handleCommandWithDomain() {
301+
SomeDomainCommandHandler<SomeDataClass> handler = new SomeDomainCommandHandler<>();
302+
registry.handleCommand(domain, name, handler, SomeDataClass.class);
303+
assertThat(registry.getCommandHandlers().get(domain))
304+
.anySatisfy(registered -> assertThat(registered)
305+
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
306+
.containsExactly(name, SomeDataClass.class)).hasSize(1);
307+
}
308+
309+
@Test
310+
void handleCloudEventCommandWithDomain() {
311+
SomeCloudCommandHandler handler = new SomeCloudCommandHandler();
312+
registry.handleCloudEventCommand(domain, name, handler);
313+
assertThat(registry.getCommandHandlers().get(domain))
314+
.anySatisfy(registered -> assertThat(registered)
315+
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
316+
.containsExactly(name, CloudEvent.class)).hasSize(1);
317+
}
318+
319+
@Test
320+
void handleRawCommandWithDomain() {
321+
SomeRawCommandEventHandler handler = new SomeRawCommandEventHandler();
322+
registry.handleRawCommand(domain, handler);
323+
assertThat(registry.getCommandHandlers().get(domain))
324+
.anySatisfy(registered -> assertThat(registered)
325+
.extracting(RegisteredCommandHandler::path, RegisteredCommandHandler::inputClass)
326+
.containsExactly("", RawMessage.class)).hasSize(1);
327+
}
328+
329+
@Test
330+
void listenDomainRawEvent() {
331+
SomeRawEventHandler handler = new SomeRawEventHandler();
332+
registry.listenDomainRawEvent(domain, name, handler);
333+
assertThat(registry.getDomainEventListeners().get(domain))
334+
.anySatisfy(registered -> assertThat(registered)
335+
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
336+
.containsExactly(name, RawMessage.class)).hasSize(1);
337+
}
338+
339+
@Test
340+
void listenNotificationEventWithDomain() {
341+
registry.listenNotificationEvent(domain, name, evt -> Mono.empty(), SomeDataClass.class);
342+
assertThat(registry.getEventNotificationListener().get(domain))
343+
.anySatisfy(listener -> assertThat(listener.path()).isEqualTo(name)).hasSize(1);
344+
}
345+
346+
@Test
347+
void listenNotificationCloudEventWithDomain() {
348+
registry.listenNotificationCloudEvent(domain, name, ce -> Mono.empty());
349+
assertThat(registry.getEventNotificationListener().get(domain))
350+
.anySatisfy(listener -> assertThat(listener)
351+
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
352+
.containsExactly(name, CloudEvent.class)).hasSize(1);
353+
}
354+
355+
@Test
356+
void listenNotificationRawEventWithDomain() {
357+
SomeRawEventHandler handler = new SomeRawEventHandler();
358+
registry.listenNotificationRawEvent(domain, name, handler);
359+
assertThat(registry.getEventNotificationListener().get(domain))
360+
.anySatisfy(registered -> assertThat(registered)
361+
.extracting(RegisteredEventListener::path, RegisteredEventListener::inputClass)
362+
.containsExactly(name, RawMessage.class)).hasSize(1);
363+
}
364+
365+
@Test
366+
void serveCloudEventQueryDelegate() {
367+
QueryHandlerDelegate<Void, CloudEvent> delegate = (from, ce) -> Mono.empty();
368+
registry.serveCloudEventQuery(name, delegate);
369+
assertThat(registry.getHandlers().get(DEFAULT_DOMAIN))
370+
.anySatisfy(registered -> assertThat(registered)
371+
.extracting(RegisteredQueryHandler::path, RegisteredQueryHandler::queryClass)
372+
.containsExactly(name, CloudEvent.class)).hasSize(1);
373+
}
374+
375+
@Test
376+
void listenQueueSimple() {
377+
registry.listenQueue("myQueue", msg -> Mono.empty());
378+
assertThat(registry.getQueueHandlers().get(DEFAULT_DOMAIN))
379+
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
380+
}
381+
382+
@Test
383+
void listenQueueWithDomain() {
384+
registry.listenQueue(domain, "myQueue", msg -> Mono.empty());
385+
assertThat(registry.getQueueHandlers().get(domain))
386+
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
387+
}
388+
389+
@Test
390+
void listenQueueWithTopology() {
391+
registry.listenQueue("myQueue", msg -> Mono.empty(), creator -> Mono.empty());
392+
assertThat(registry.getQueueHandlers().get(DEFAULT_DOMAIN))
393+
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
394+
}
395+
396+
@Test
397+
void listenQueueWithDomainAndTopology() {
398+
registry.listenQueue(domain, "myQueue", msg -> Mono.empty(), creator -> Mono.empty());
399+
assertThat(registry.getQueueHandlers().get(domain))
400+
.anySatisfy(q -> assertThat(q.queueName()).isEqualTo("myQueue")).hasSize(1);
401+
}
402+
299403
private static class SomeQueryHandlerDelegate implements QueryHandlerDelegate<Void, SomeDataClass> {
300404
@Override
301405
public Mono<Void> handle(From from, SomeDataClass message) {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.reactivecommons.async.commons;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.reactivecommons.async.api.handlers.CommandHandler;
5+
import org.reactivecommons.async.commons.communications.Message;
6+
import reactor.core.publisher.Mono;
7+
import reactor.test.StepVerifier;
8+
9+
import java.util.Map;
10+
11+
import static org.mockito.ArgumentMatchers.any;
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.when;
14+
15+
class CommandExecutorTest {
16+
17+
@SuppressWarnings("unchecked")
18+
@Test
19+
void executeCallsHandlerWithConvertedMessage() {
20+
CommandHandler<String> handler = mock(CommandHandler.class);
21+
when(handler.handle(any())).thenReturn(Mono.empty());
22+
23+
CommandExecutor<String> executor = new CommandExecutor<>(handler, msg -> "converted");
24+
StepVerifier.create(executor.execute(createMessage()))
25+
.verifyComplete();
26+
}
27+
28+
private Message createMessage() {
29+
return new Message() {
30+
@Override
31+
public String getType() { return "test"; }
32+
@Override
33+
public byte[] getBody() { return new byte[0]; }
34+
@Override
35+
public Properties getProperties() {
36+
return new Properties() {
37+
@Override
38+
public String getContentType() { return "application/json"; }
39+
@Override
40+
public long getContentLength() { return 0; }
41+
@Override
42+
public Map<String, Object> getHeaders() { return Map.of(); }
43+
};
44+
}
45+
};
46+
}
47+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package org.reactivecommons.async.commons;
2+
3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.builder.CloudEventBuilder;
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Test;
7+
import org.reactivecommons.api.domain.DomainEvent;
8+
import org.reactivecommons.api.domain.DomainEventBus;
9+
import org.reactivecommons.async.commons.communications.Message;
10+
import org.reactivecommons.async.commons.converters.MessageConverter;
11+
import org.reactivecommons.async.commons.exceptions.MessageConversionException;
12+
import reactor.core.publisher.Mono;
13+
14+
import java.net.URI;
15+
import java.util.Map;
16+
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.Mockito.*;
20+
21+
@SuppressWarnings("unchecked")
22+
class DLQDiscardNotifierTest {
23+
24+
DomainEventBus eventBus;
25+
MessageConverter messageConverter;
26+
DLQDiscardNotifier notifier;
27+
28+
@BeforeEach
29+
void setUp() {
30+
eventBus = mock(DomainEventBus.class);
31+
messageConverter = mock(MessageConverter.class);
32+
notifier = new DLQDiscardNotifier(eventBus, messageConverter);
33+
when(eventBus.emit(any(DomainEvent.class))).thenReturn(Mono.empty());
34+
when(eventBus.emit(any(CloudEvent.class))).thenReturn(Mono.empty());
35+
}
36+
37+
@Test
38+
void notifyDiscardWithCloudEvent() {
39+
var message = createMessage("body".getBytes(), "application/cloudevents+json");
40+
var cloudEvent = CloudEventBuilder.v1()
41+
.withId("123")
42+
.withType("test.type")
43+
.withSource(URI.create("/test"))
44+
.build();
45+
when(messageConverter.readCloudEvent(message)).thenReturn(cloudEvent);
46+
47+
notifier.notifyDiscard(message).block();
48+
49+
verify(eventBus).emit(argThat((CloudEvent ce) -> ce.getType().equals("test.type.dlq")));
50+
}
51+
52+
@Test
53+
void notifyDiscardWithCommandJson() {
54+
var message = createMessage(
55+
"{\"name\":\"myCmd\",\"commandId\":\"cid\",\"data\":{}}".getBytes(),
56+
"application/json");
57+
58+
// The notifier reads the message as JsonSkeleton via messageConverter.readValue
59+
// We mock readValue to throw for CloudEvent check (not cloud event), then succeed for skeleton
60+
when(messageConverter.readCloudEvent(message)).thenThrow(new RuntimeException("not a CE"));
61+
62+
// Since message content-type is not cloud event, it reads JsonSkeleton
63+
// We need to mock readValue for the private JsonSkeleton class
64+
// Instead, use doAnswer to return a command-like object
65+
doAnswer(inv -> {
66+
Class<?> cls = inv.getArgument(1);
67+
var mapper = new tools.jackson.databind.json.JsonMapper();
68+
return mapper.readValue(message.getBody(), cls);
69+
}).when(messageConverter).readValue(eq(message), any(Class.class));
70+
71+
notifier.notifyDiscard(message).block();
72+
73+
verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("myCmd.dlq")));
74+
}
75+
76+
@Test
77+
void notifyDiscardWithEventJson() {
78+
var message = createMessage(
79+
"{\"name\":\"myEvt\",\"eventId\":\"eid\",\"data\":{}}".getBytes(),
80+
"application/json");
81+
82+
doAnswer(inv -> {
83+
Class<?> cls = inv.getArgument(1);
84+
var mapper = new tools.jackson.databind.json.JsonMapper();
85+
return mapper.readValue(message.getBody(), cls);
86+
}).when(messageConverter).readValue(eq(message), any(Class.class));
87+
88+
notifier.notifyDiscard(message).block();
89+
90+
verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("myEvt.dlq")));
91+
}
92+
93+
@Test
94+
void notifyDiscardWithQueryJson() {
95+
var message = createMessage(
96+
"{\"resource\":\"query.test\",\"queryData\":{}}".getBytes(),
97+
"application/json");
98+
99+
doAnswer(inv -> {
100+
Class<?> cls = inv.getArgument(1);
101+
var mapper = new tools.jackson.databind.json.JsonMapper();
102+
return mapper.readValue(message.getBody(), cls);
103+
}).when(messageConverter).readValue(eq(message), any(Class.class));
104+
105+
notifier.notifyDiscard(message).block();
106+
107+
verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("query.test.dlq")));
108+
}
109+
110+
@Test
111+
void notifyDiscardWithUnreadableMessage() {
112+
var message = createMessage("garbage".getBytes(), "application/json");
113+
114+
when(messageConverter.readValue(eq(message), any(Class.class)))
115+
.thenThrow(new MessageConversionException("cannot read"));
116+
117+
notifier.notifyDiscard(message).block();
118+
119+
verify(eventBus).emit(argThat((DomainEvent<?> ev) -> ev.getName().equals("corruptData.dlq")));
120+
}
121+
122+
@Test
123+
void notifyDiscardHandlesEmitError() {
124+
var message = createMessage("body".getBytes(), "application/json");
125+
when(messageConverter.readValue(eq(message), any(Class.class)))
126+
.thenThrow(new MessageConversionException("bad"));
127+
when(eventBus.emit(any(DomainEvent.class))).thenReturn(Mono.error(new RuntimeException("bus error")));
128+
129+
// Should not throw, returns empty
130+
var result = notifier.notifyDiscard(message).block();
131+
assertThat(result).isNull();
132+
}
133+
134+
private Message createMessage(byte[] body, String contentType) {
135+
return new Message() {
136+
@Override
137+
public String getType() {
138+
return "test";
139+
}
140+
141+
@Override
142+
public byte[] getBody() {
143+
return body;
144+
}
145+
146+
@Override
147+
public Properties getProperties() {
148+
return new Properties() {
149+
@Override
150+
public String getContentType() {
151+
return contentType;
152+
}
153+
154+
@Override
155+
public long getContentLength() {
156+
return body.length;
157+
}
158+
159+
@Override
160+
public Map<String, Object> getHeaders() {
161+
return Map.of();
162+
}
163+
};
164+
}
165+
};
166+
}
167+
}

0 commit comments

Comments
 (0)