Skip to content

Commit 8c36fec

Browse files
committed
Rewrite OutgoingRabbitMQChannel to use SenderProcessor for graceful shutdown
- Fix RabbitMQMessageConverter to serialize complex objects as JSON - Re-enable disabled tests, replace Thread.sleep with Awaitility
1 parent 5269911 commit 8c36fec

8 files changed

Lines changed: 314 additions & 484 deletions

File tree

smallrye-reactive-messaging-rabbitmq-og/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@
9292
<scope>test</scope>
9393
</dependency>
9494

95+
<!-- Jackson Databind for JSON serialization of complex payloads -->
96+
<dependency>
97+
<groupId>com.fasterxml.jackson.core</groupId>
98+
<artifactId>jackson-databind</artifactId>
99+
<scope>test</scope>
100+
</dependency>
101+
95102
<dependency>
96103
<groupId>io.smallrye.reactive</groupId>
97104
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>

smallrye-reactive-messaging-rabbitmq-og/src/main/java/io/smallrye/reactive/messaging/rabbitmq/og/RabbitMQConnector.java

Lines changed: 2 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -203,38 +203,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
203203

204204
outgoings.add(channel);
205205

206-
// Wrap Reactive Streams Subscriber as Flow.Subscriber
207-
return new Flow.Subscriber<Message<?>>() {
208-
@Override
209-
public void onSubscribe(Flow.Subscription subscription) {
210-
channel.onSubscribe(new org.reactivestreams.Subscription() {
211-
@Override
212-
public void request(long n) {
213-
subscription.request(n);
214-
}
215-
216-
@Override
217-
public void cancel() {
218-
subscription.cancel();
219-
}
220-
});
221-
}
222-
223-
@Override
224-
public void onNext(Message<?> message) {
225-
channel.onNext(message);
226-
}
227-
228-
@Override
229-
public void onError(Throwable throwable) {
230-
channel.onError(throwable);
231-
}
232-
233-
@Override
234-
public void onComplete() {
235-
channel.onComplete();
236-
}
237-
};
206+
return channel.getSink();
238207
}
239208

240209
@Override
@@ -286,12 +255,7 @@ public void terminate(
286255

287256
// Clean up all outgoing channels
288257
for (io.smallrye.reactive.messaging.rabbitmq.og.internals.OutgoingRabbitMQChannel outgoing : outgoings) {
289-
try {
290-
outgoing.onComplete();
291-
} catch (Exception e) {
292-
// Log but continue cleanup
293-
e.printStackTrace();
294-
}
258+
outgoing.closeQuietly();
295259
}
296260
outgoings.clear();
297261

smallrye-reactive-messaging-rabbitmq-og/src/main/java/io/smallrye/reactive/messaging/rabbitmq/og/RabbitMQMessageConverter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
import com.rabbitmq.client.AMQP;
1212

13+
import io.vertx.core.json.JsonObject;
14+
1315
/**
1416
* Utility class for converting between Reactive Messaging Message and RabbitMQ message format.
1517
* Handles different payload types and content-type determination.
@@ -166,9 +168,8 @@ private static byte[] getBodyFromPayload(Object payload) {
166168
return payload.toString().getBytes(StandardCharsets.UTF_8);
167169
}
168170

169-
// For other types, convert to string representation
170-
// In a full implementation, you might want to use Jackson for JSON serialization
171-
return payload.toString().getBytes(StandardCharsets.UTF_8);
171+
// For complex objects, serialize to JSON
172+
return JsonObject.mapFrom(payload).encode().getBytes(StandardCharsets.UTF_8);
172173
}
173174

174175
/**

0 commit comments

Comments
 (0)