Skip to content

Commit 2cb5bdf

Browse files
committed
Cherry pick #3242.
1 parent ee17b51 commit 2cb5bdf

24 files changed

Lines changed: 624 additions & 87 deletions

File tree

api/src/main/java/io/smallrye/reactive/messaging/json/JsonMapping.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.smallrye.reactive.messaging.json;
22

3+
import java.lang.reflect.Type;
4+
35
/**
46
* Interface to abstract json serialization to/from string.
57
*/
@@ -29,4 +31,16 @@ public interface JsonMapping {
2931
* @return object of requested class
3032
*/
3133
<T> T fromJson(String str, Class<T> type);
34+
35+
/**
36+
* Deserialize an object from it's JSON string representation.
37+
*
38+
* @param str JSON string
39+
* @param type type of object
40+
* @param <T> generic parametrization class
41+
* @return object of requested class
42+
*/
43+
default <T> T fromJson(String str, Type type) {
44+
throw new UnsupportedOperationException("Deserialization for a java.lang.reflect.Type is not supported");
45+
}
3246
}

documentation/src/main/docs/rabbitmq/receiving-messages-from-rabbitmq.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,11 @@ value of the RabbitMQ received message Envelope `content_type` and
7979

8080
| content_encoding | content_type | Type |
8181
|------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
82-
| *Value present* | *n/a* | `byte[]` |
82+
| *Value present* | *n/a* | `io.vertx.core.buffer.Buffer` |
8383
| *No value* | `text/plain` | `String` |
8484
| *No value* | `application/json` | a JSON element which can be a [`JsonArray`](https://vertx.io/docs/apidocs/io/vertx/core/json/JsonArray.html), [`JsonObject`](https://vertx.io/docs/apidocs/io/vertx/core/json/JsonObject.html), `String`, ... if the buffer contains an array, object, string,... |
85-
| *No value* | *Anything else* | `byte[]` |
85+
| *No value* | `application/json` | Anything else (if one of the `JsonMapping` modules like `smallrye-reactive-messaging-jackson`, `smallrye-reactive-messaging-jsonb` is present on the classpath) |
86+
| *No value* | *Anything else* | `io.vertx.core.buffer.Buffer` |
8687

8788
If you send objects with this RabbitMQ connector (outbound connector),
8889
they are encoded as JSON and sent with `content_type` set to

examples/rabbitmq-quickstart/src/main/java/acme/Receiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,9 @@ public void consume(JsonObject p) {
2020
System.out.println("received jsonobject price: " + price.price);
2121
}
2222

23+
@Incoming("from-rabbitmq-type")
24+
public void consume(Price price) {
25+
System.out.println("received price: " + price.price);
26+
}
27+
2328
}

examples/rabbitmq-quickstart/src/main/resources/META-INF/microprofile-config.properties

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,10 @@ mp.messaging.outgoing.to-rabbitmq-string-emitter.connector=smallrye-rabbitmq
3434
mp.messaging.outgoing.to-rabbitmq-string-emitter.exchange.name=amq.topic
3535

3636
mp.messaging.outgoing.to-rabbitmq-string-emitter.default-routing-key=stringexample1
37+
38+
# Incoming
39+
mp.messaging.incoming.from-rabbitmq-type.connector=smallrye-rabbitmq
40+
mp.messaging.incoming.from-rabbitmq-type.exchange.name=amq.topic
41+
42+
mp.messaging.incoming.from-rabbitmq-type.queue.name=my-queue3
43+
mp.messaging.incoming.from-rabbitmq-type.routing-keys=typeexample1

smallrye-reactive-messaging-jackson/src/main/java/io/smallrye/reactive/messaging/json/jackson/JacksonMapping.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.smallrye.reactive.messaging.json.jackson;
22

3+
import java.lang.reflect.Type;
4+
35
import jakarta.annotation.Priority;
46
import jakarta.enterprise.context.ApplicationScoped;
57
import jakarta.inject.Inject;
@@ -33,4 +35,13 @@ public <T> T fromJson(String str, Class<T> type) {
3335
throw new RuntimeException(e);
3436
}
3537
}
38+
39+
@Override
40+
public <T> T fromJson(String str, Type type) {
41+
try {
42+
return objectMapper.readValue(str, objectMapper.constructType(type));
43+
} catch (JsonProcessingException e) {
44+
throw new RuntimeException(e);
45+
}
46+
}
3647
}

smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/TestMapping.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.smallrye.reactive.messaging.jms;
22

3+
import java.lang.reflect.Type;
4+
35
import jakarta.enterprise.context.ApplicationScoped;
46
import jakarta.json.bind.Jsonb;
57
import jakarta.json.bind.JsonbBuilder;
@@ -20,4 +22,9 @@ public String toJson(Object object) {
2022
public <T> T fromJson(String str, Class<T> type) {
2123
return jsonb.fromJson(str, type);
2224
}
25+
26+
@Override
27+
public <T> T fromJson(String str, Type type) {
28+
return jsonb.fromJson(str, type);
29+
}
2330
}

smallrye-reactive-messaging-jsonb/src/main/java/io/smallrye/reactive/messaging/json/jsonb/JsonBMapping.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.smallrye.reactive.messaging.json.jsonb;
22

3+
import java.lang.reflect.Type;
4+
35
import jakarta.annotation.Priority;
46
import jakarta.enterprise.context.ApplicationScoped;
57
import jakarta.inject.Inject;
@@ -22,4 +24,9 @@ public String toJson(Object object) {
2224
public <T> T fromJson(String str, Class<T> type) {
2325
return jsonb.fromJson(str, type);
2426
}
27+
28+
@Override
29+
public <T> T fromJson(String str, Type type) {
30+
return jsonb.fromJson(str, type);
31+
}
2532
}

smallrye-reactive-messaging-rabbitmq/pom.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,23 @@
8181
<artifactId>toxiproxy</artifactId>
8282
<scope>test</scope>
8383
</dependency>
84+
85+
<dependency>
86+
<groupId>jakarta.json</groupId>
87+
<artifactId>jakarta.json-api</artifactId>
88+
<scope>test</scope>
89+
</dependency>
90+
<dependency>
91+
<groupId>jakarta.json.bind</groupId>
92+
<artifactId>jakarta.json.bind-api</artifactId>
93+
<scope>test</scope>
94+
</dependency>
95+
<dependency>
96+
<groupId>org.eclipse</groupId>
97+
<artifactId>yasson</artifactId>
98+
<version>${yasson.version}</version>
99+
<scope>test</scope>
100+
</dependency>
84101
</dependencies>
85102

86103
<build>

smallrye-reactive-messaging-rabbitmq/revapi.json

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,39 @@
2929
"new": "method <T> java.util.Optional<T> io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::getHeader(java.lang.String, java.lang.Class<T>)",
3030
"annotation": "@io.smallrye.common.constraint.Nullable",
3131
"justification": "Switch to the SmallRye Common Nullable annotation instead of abusing the one from Vert.x Codegen"
32-
}
33-
]
32+
},
33+
{
34+
"code": "java.method.visibilityIncreased",
35+
"old": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage)",
36+
"new": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage)",
37+
"oldVisibility": "package",
38+
"newVisibility": "public",
39+
"justification": "Incoming Metadata constructor visibility increased to public"
40+
},
41+
{
42+
"code": "java.method.visibilityIncreased",
43+
"old": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(com.rabbitmq.client.BasicProperties, com.rabbitmq.client.Envelope)",
44+
"new": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage, java.lang.String)",
45+
"oldVisibility": "package",
46+
"newVisibility": "public",
47+
"justification": "Incoming Metadata constructor visibility increased to public"
48+
}, {
49+
"code": "java.method.visibilityIncreased",
50+
"old": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage)",
51+
"new": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage)",
52+
"oldVisibility": "package",
53+
"newVisibility": "public",
54+
"justification": "Incoming Metadata constructor visibility increased to public"
55+
},
56+
{
57+
"code": "java.method.visibilityIncreased",
58+
"old": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(com.rabbitmq.client.BasicProperties, com.rabbitmq.client.Envelope)",
59+
"new": "method void io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMetadata::<init>(io.vertx.rabbitmq.RabbitMQMessage, java.lang.String)",
60+
"oldVisibility": "package",
61+
"newVisibility": "public",
62+
"justification": "Incoming Metadata constructor visibility increased to public"
63+
64+
}]
3465
}
3566
}, {
3667
"extension" : "revapi.reporter.json",

smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/IncomingRabbitMQMessage.java

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
2121
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
2222
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
23-
import io.vertx.core.buffer.Buffer;
2423
import io.vertx.mutiny.core.Context;
2524
import io.vertx.mutiny.rabbitmq.RabbitMQMessage;
2625

@@ -74,12 +73,20 @@ public IncomingRabbitMQMessage(RabbitMQMessage delegate, ClientHolder holder,
7473
this.holder = holder;
7574
this.context = holder.getContext();
7675
this.contentTypeOverride = contentTypeOverride;
77-
this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message);
76+
this.rabbitMQMetadata = new IncomingRabbitMQMetadata(this.message, contentTypeOverride);
7877
this.onNack = onNack;
7978
this.onAck = onAck;
8079
this.metadata = captureContextMetadata(rabbitMQMetadata);
80+
final String contentType = getEffectiveContentType().orElse(null);
81+
final String contentEncoding = msg.properties().getContentEncoding();
82+
if (contentEncoding != null) {
83+
// Just silence the warning if we have a binary message
84+
if (!HttpHeaderValues.APPLICATION_OCTET_STREAM.toString().equalsIgnoreCase(contentType)) {
85+
log.typeConversionFallback();
86+
}
87+
}
8188
//noinspection unchecked
82-
this.payload = (T) convertPayload(message);
89+
this.payload = (T) msg.body();
8390
}
8491

8592
@Override
@@ -164,39 +171,6 @@ public Metadata getMetadata() {
164171
return metadata;
165172
}
166173

167-
private Object convertPayload(io.vertx.rabbitmq.RabbitMQMessage msg) {
168-
// Neither of these are guaranteed to be non-null
169-
String contentType = msg.properties().getContentType();
170-
final String contentEncoding = msg.properties().getContentEncoding();
171-
final Buffer body = msg.body();
172-
173-
if (this.contentTypeOverride != null) {
174-
contentType = contentTypeOverride;
175-
}
176-
177-
// If there is a content encoding specified, we don't try to unwrap
178-
if (contentEncoding == null) {
179-
try {
180-
// Do our best with text and json
181-
if (HttpHeaderValues.APPLICATION_JSON.toString().equalsIgnoreCase(contentType)) {
182-
// This could be JsonArray, JsonObject, String etc. depending on buffer contents
183-
return body.toJson();
184-
} else if (HttpHeaderValues.TEXT_PLAIN.toString().equalsIgnoreCase(contentType)) {
185-
return body.toString();
186-
}
187-
} catch (Throwable t) {
188-
log.typeConversionFallback();
189-
}
190-
// Otherwise fall back to raw byte array
191-
} else {
192-
// Just silence the warning if we have a binary message
193-
if (!HttpHeaderValues.APPLICATION_OCTET_STREAM.toString().equalsIgnoreCase(contentType)) {
194-
log.typeConversionFallback();
195-
}
196-
}
197-
return body.getBytes();
198-
}
199-
200174
public Map<String, Object> getHeaders() {
201175
return rabbitMQMetadata.getHeaders();
202176
}
@@ -205,6 +179,10 @@ public Optional<String> getContentType() {
205179
return rabbitMQMetadata.getContentType();
206180
}
207181

182+
public Optional<String> getEffectiveContentType() {
183+
return Optional.ofNullable(contentTypeOverride).or(rabbitMQMetadata::getContentType);
184+
}
185+
208186
public Optional<String> getContentEncoding() {
209187
return rabbitMQMetadata.getContentEncoding();
210188
}

0 commit comments

Comments
 (0)