diff --git a/documentation/mkdocs.yml b/documentation/mkdocs.yml index cd79160370..399b859c97 100644 --- a/documentation/mkdocs.yml +++ b/documentation/mkdocs.yml @@ -55,6 +55,7 @@ nav: - amqp/amqp.md - 'Receiving messages': amqp/receiving-amqp-messages.md - 'Sending messages': amqp/sending-amqp-messages.md + - 'AMQP Request/Reply': amqp/request-reply.md - 'Health Checks': amqp/health.md - 'Client Customization': amqp/client-customization.md - 'Using RabbitMQ': amqp/rabbitmq.md diff --git a/documentation/src/main/docs/amqp/amqp.md b/documentation/src/main/docs/amqp/amqp.md index c7f27e118c..f50180dbec 100644 --- a/documentation/src/main/docs/amqp/amqp.md +++ b/documentation/src/main/docs/amqp/amqp.md @@ -9,6 +9,7 @@ With this connector, your application can: - receive messages from an AMQP Broker or Router. - send `Message` to an AMQP *address* +- implement [request-reply](request-reply.md) messaging patterns The AMQP connector is based on the [Vert.x AMQP Client](https://vertx.io/docs/vertx-amqp-client/java/). diff --git a/documentation/src/main/docs/amqp/rabbitmq.md b/documentation/src/main/docs/amqp/rabbitmq.md index 57eb09d7dd..2e7d8c5b59 100644 --- a/documentation/src/main/docs/amqp/rabbitmq.md +++ b/documentation/src/main/docs/amqp/rabbitmq.md @@ -1,12 +1,27 @@ # Using RabbitMQ -This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ -does not provide AMQP 1.0 by default, but there is a plugin for it. To -use RabbitMQ with this connector, enable and configure the [AMQP 1.0 -plugin](https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/v3.8.x/README.md). +RabbitMQ 4.0+ provides native AMQP 1.0 support, making it fully compatible with this connector. +For older versions (3.x), RabbitMQ implements AMQP 0.9.1 and requires the +[AMQP 1.0 plugin](https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/v3.8.x/README.md) to be enabled. -Despite the plugin, a few features won’t work with RabbitMQ. Thus, we -recommend the following configurations. +## Address format + +RabbitMQ 4.0+ uses the v2 address format with `/queues/` and `/exchanges/` prefixes. +When using this connector with RabbitMQ, prefix your addresses accordingly: + +``` properties +mp.messaging.incoming.prices.connector=smallrye-amqp +mp.messaging.incoming.prices.address=/queues/prices + +mp.messaging.outgoing.generated-price.connector=smallrye-amqp +mp.messaging.outgoing.generated-price.address=/queues/prices +mp.messaging.outgoing.generated-price.use-anonymous-sender=false +``` + +## Recommendations + +RabbitMQ support for AMQP 1.0 may differ from other AMQP 1.0 supporting brokers. +We recommend the following configurations. To receive messages from RabbitMQ: @@ -19,11 +34,13 @@ mp.messaging.incoming.prices.durable=false To send messages to RabbitMQ: -- set the destination `address` (anonymous sender are not supported) +- set the destination `address` (anonymous senders are not supported) +- set `use-anonymous-sender` to `false` ``` properties mp.messaging.outgoing.generated-price.connector=smallrye-amqp mp.messaging.outgoing.generated-price.address=prices +mp.messaging.outgoing.generated-price.use-anonymous-sender=false ``` It’s not possible to change the destination dynamically (using message @@ -31,4 +48,9 @@ metadata) when using RabbitMQ. The connector automatically detects that the broker does not support anonymous sender (See ). +## Request/Reply with RabbitMQ + +This connector supports the request-reply pattern with RabbitMQ. +See [AMQP Request/Reply](request-reply.md#using-with-rabbitmq) for details. + Alternatively, you can use the [RabbitMQ connector](../rabbitmq/rabbitmq.md). diff --git a/documentation/src/main/docs/amqp/request-reply.md b/documentation/src/main/docs/amqp/request-reply.md new file mode 100644 index 0000000000..763c701fae --- /dev/null +++ b/documentation/src/main/docs/amqp/request-reply.md @@ -0,0 +1,163 @@ +# AMQP Request/Reply + +!!!warning "Experimental" + AMQP Request Reply Emitter is an experimental feature. + +The AMQP [Request-Reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) pattern allows you to publish a message to an AMQP address and then await for a reply message that responds to the initial request. + +The `AmqpRequestReply` emitter implements the requestor (or the client) of the request-reply pattern for AMQP 1.0 outbound channels: + +``` java +{{ insert('amqp/outbound/AmqpRequestReplyEmitter.java') }} +``` + +The `request` method publishes the request message to the configured target address of the outgoing channel, +and listens on a reply address (by default, the channel name with `-reply` suffix) for a reply message. +When the reply is received the returned `Uni` is completed with the message payload. + +The request send operation generates a correlation id and sets the AMQP `message-id` property, +which it expects to be sent back in the reply message's `correlation-id` property. + +The replier (or the server) can be implemented using a Reactive Messaging processor: + +``` java +{{ insert('amqp/outbound/AmqpReplier.java') }} +``` + +When you need more control over the reply message, such as setting the reply-to address and correlation id explicitly, you can use the `Message` type: + +``` java +{{ insert('amqp/outbound/AmqpReplierWithMetadata.java') }} +``` + +Given the following configuration example: + +```properties +mp.messaging.outgoing.my-request.connector=smallrye-amqp +mp.messaging.outgoing.my-request.address=requests +mp.messaging.outgoing.my-request.reply.address=my-request-reply + +mp.messaging.incoming.request.connector=smallrye-amqp +mp.messaging.incoming.request.address=requests + +mp.messaging.outgoing.reply.connector=smallrye-amqp +mp.messaging.outgoing.reply.address=my-request-reply +mp.messaging.outgoing.reply.use-anonymous-sender=false +``` + +## Requesting with `Message` types + +Like the core Emitter's `send` methods, `request` method also can receive a `Message` type and return a message: + +``` java +{{ insert('amqp/outbound/AmqpRequestReplyMessageEmitter.java') }} +``` + +!!! note + The ingested reply type of the `AmqpRequestReply` is discovered at runtime, + in order to configure a `MessageConverter` to be applied on the incoming message before returning the `Uni` result. + +## Requesting multiple replies + +You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type. + +For example this can be used to aggregate multiple replies to a single request. + +``` java +{{ insert('amqp/outbound/AmqpRequestReplyMultiEmitter.java') }} +``` +Like the other `request` you can also request `Message` types. + +!!! note + The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will + fail. + +## Pending replies and reply timeout + +By default, the `Uni` returned from the `request` method is configured to fail with timeout exception if no reply is received after 5 seconds. +This timeout is configurable with the channel attribute `reply.timeout` (in milliseconds). + +```properties +mp.messaging.outgoing.my-request.reply.timeout=10000 +``` + +A snapshot of the list of pending replies is available through the `AmqpRequestReply#getPendingReplies` method. + +## Scaling Request/Reply + +If multiple requestor instances are configured on the same outgoing address, and the same reply address, +each requestor instance will receive replies of all instances. If an observed correlation id doesn't match +the id of any pending replies, the reply is simply discarded. +With the additional network traffic this allows scaling requestors, (and repliers) dynamically. + +## Correlation Ids + +The AMQP Request/Reply allows configuring the correlation id mechanism completely through a `CorrelationIdHandler` implementation. +The default handler is based on randomly generated UUID strings, set as the AMQP `message-id` on the request message. +The reply message is expected to carry the same value in the `correlation-id` property. + +The correlation id handler implementation can be configured using the `reply.correlation-id.handler` attribute. +The default configuration is `uuid`, +and an alternative `bytes` implementation can be used to generate random binary correlation ids. + +Custom handlers can be implemented by proposing a CDI-managed bean with `@Identifier` qualifier. + +## Reply Error Handling + +If the reply server produces an error, it can propagate the error back to the requestor, failing the returned `Uni`. + +If configured using the `reply.failure.handler` channel attribute, +the `ReplyFailureHandler` implementations are discovered through CDI, matching the `@Identifier` qualifier. + +A sample reply error handler can lookup application properties and return the error to be thrown by the reply: + +``` java +{{ insert('amqp/outbound/MyAmqpReplyFailureHandler.java') }} +``` + +`null` return value indicates that no error has been found in the reply message, and it can be delivered to the application. + +## Connection Sharing + +Multiple AMQP channels can share the same underlying connection when configured with the same `container-id`. +This reduces resource consumption and is particularly useful for request-reply patterns +where the sender and reply receiver can share a single connection. + +```properties +mp.messaging.outgoing.my-request.connector=smallrye-amqp +mp.messaging.outgoing.my-request.address=requests +mp.messaging.outgoing.my-request.container-id=my-connection +mp.messaging.outgoing.my-request.reply.address=replies + +mp.messaging.incoming.other-channel.connector=smallrye-amqp +mp.messaging.incoming.other-channel.address=other +mp.messaging.incoming.other-channel.container-id=my-connection +``` + +Both channels above will share the same AMQP connection because they use the same `container-id`. + +!!! note + Connection sharing requires all channels with the same `container-id` to have compatible connection settings (host, port, credentials, etc.). + If the settings differ, the connector will detect the conflict and raise an error. + +## Using with RabbitMQ + +RabbitMQ 4.0+ with native AMQP 1.0 support is compatible with the request-reply pattern. +When using RabbitMQ, remember to use `/queues/` prefixed addresses (v2 address format) +and set `use-anonymous-sender=false` as anonymous senders are not supported. + +```properties +mp.messaging.outgoing.my-request.connector=smallrye-amqp +mp.messaging.outgoing.my-request.address=/queues/requests +mp.messaging.outgoing.my-request.reply.address=/queues/replies +mp.messaging.outgoing.my-request.use-anonymous-sender=false + +mp.messaging.incoming.request.connector=smallrye-amqp +mp.messaging.incoming.request.address=/queues/requests + +mp.messaging.outgoing.reply.connector=smallrye-amqp +mp.messaging.outgoing.reply.address=/queues/replies +mp.messaging.outgoing.reply.use-anonymous-sender=false +``` + +See [Using RabbitMQ](rabbitmq.md) for more details on RabbitMQ-specific configuration. diff --git a/documentation/src/main/java/amqp/outbound/AmqpReplier.java b/documentation/src/main/java/amqp/outbound/AmqpReplier.java new file mode 100644 index 0000000000..7f5236abaa --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/AmqpReplier.java @@ -0,0 +1,20 @@ +package amqp.outbound; + +import java.util.Random; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +@ApplicationScoped +public class AmqpReplier { + + Random rand = new Random(); + + @Incoming("request") + @Outgoing("reply") + String handleRequest(String request) { + return request + "-" + rand.nextInt(100); + } +} diff --git a/documentation/src/main/java/amqp/outbound/AmqpReplierWithMetadata.java b/documentation/src/main/java/amqp/outbound/AmqpReplierWithMetadata.java new file mode 100644 index 0000000000..c798cd1361 --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/AmqpReplierWithMetadata.java @@ -0,0 +1,31 @@ +package amqp.outbound; + +import java.util.Random; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata; +import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata; + +@ApplicationScoped +public class AmqpReplierWithMetadata { + + Random rand = new Random(); + + @Incoming("request") + @Outgoing("reply") + Message handleRequest(Message request) { + IncomingAmqpMetadata incoming = request.getMetadata(IncomingAmqpMetadata.class) + .orElseThrow(); + OutgoingAmqpMetadata outMeta = OutgoingAmqpMetadata.builder() + .withAddress(incoming.getReplyTo()) + .withCorrelationId(incoming.getId()) + .build(); + String reply = request.getPayload() + "-" + rand.nextInt(100); + return request.withPayload(reply).addMetadata(outMeta); + } +} diff --git a/documentation/src/main/java/amqp/outbound/AmqpRequestReplyEmitter.java b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyEmitter.java new file mode 100644 index 0000000000..f1d5285c8d --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyEmitter.java @@ -0,0 +1,21 @@ +package amqp.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply; + +@ApplicationScoped +public class AmqpRequestReplyEmitter { + + @Inject + @Channel("my-request") + AmqpRequestReply quoteRequest; + + public Uni requestQuote(String request) { + return quoteRequest.request(request); + } +} diff --git a/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMessageEmitter.java b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMessageEmitter.java new file mode 100644 index 0000000000..de5f1bbaf3 --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMessageEmitter.java @@ -0,0 +1,22 @@ +package amqp.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply; + +@ApplicationScoped +public class AmqpRequestReplyMessageEmitter { + + @Inject + @Channel("my-request") + AmqpRequestReply quoteRequest; + + public Uni> requestMessage(String request) { + return quoteRequest.request(Message.of(request)); + } +} diff --git a/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMultiEmitter.java b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMultiEmitter.java new file mode 100644 index 0000000000..25d7766061 --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/AmqpRequestReplyMultiEmitter.java @@ -0,0 +1,21 @@ +package amqp.outbound; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply; + +@ApplicationScoped +public class AmqpRequestReplyMultiEmitter { + + @Inject + @Channel("my-request") + AmqpRequestReply quoteRequest; + + public Multi requestQuote(String request) { + return quoteRequest.requestMulti(request).select().first(5); + } +} diff --git a/documentation/src/main/java/amqp/outbound/MyAmqpReplyFailureHandler.java b/documentation/src/main/java/amqp/outbound/MyAmqpReplyFailureHandler.java new file mode 100644 index 0000000000..77b1d2d149 --- /dev/null +++ b/documentation/src/main/java/amqp/outbound/MyAmqpReplyFailureHandler.java @@ -0,0 +1,24 @@ +package amqp.outbound; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.reactive.messaging.amqp.AmqpMessage; +import io.smallrye.reactive.messaging.amqp.reply.ReplyFailureHandler; + +@ApplicationScoped +@Identifier("my-reply-error") +public class MyAmqpReplyFailureHandler implements ReplyFailureHandler { + + @Override + public Throwable handleReply(AmqpMessage replyMessage) { + var properties = replyMessage.getApplicationProperties(); + if (properties != null) { + String error = properties.getString("REPLY_ERROR"); + if (error != null) { + return new IllegalArgumentException(error); + } + } + return null; + } +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHelper.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHelper.java index a1056bccd7..5c6f486a17 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHelper.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHelper.java @@ -4,18 +4,19 @@ import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log; import static io.vertx.core.net.ClientOptionsBase.DEFAULT_METRICS_NAME; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Optional; +import java.util.UUID; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.literal.NamedLiteral; import io.smallrye.common.annotation.Identifier; -import io.smallrye.reactive.messaging.ClientCustomizer; import io.smallrye.reactive.messaging.providers.helpers.ConfigUtils; import io.smallrye.reactive.messaging.providers.i18n.ProviderLogging; import io.vertx.amqp.AmqpClientOptions; -import io.vertx.mutiny.amqp.AmqpClient; -import io.vertx.mutiny.core.Vertx; public class AmqpClientHelper { @@ -23,36 +24,6 @@ private AmqpClientHelper() { // avoid direct instantiation. } - static AmqpClient createClient(AmqpConnector connector, AmqpConnectorCommonConfiguration config, - Instance amqpClientOptions, - Instance> configCustomizers) { - Optional clientOptionsName = config.getClientOptionsName(); - AmqpClientOptions options; - if (clientOptionsName.isPresent()) { - options = createClientFromClientOptionsBean(amqpClientOptions, clientOptionsName.get(), config); - } else { - options = getOptionsFromChannel(config); - } - AmqpClientOptions clientOptions = ConfigUtils.customize(config.config(), configCustomizers, options); - AmqpClient client = AmqpClient.create(connector.getVertx(), clientOptions); - connector.addClient(client); - return client; - } - - static AmqpClient createClient(Vertx vertx, AmqpConnectorCommonConfiguration config, - Instance amqpClientOptions, - Instance> configCustomizers) { - Optional clientOptionsName = config.getClientOptionsName(); - AmqpClientOptions options; - if (clientOptionsName.isPresent()) { - options = createClientFromClientOptionsBean(amqpClientOptions, clientOptionsName.get(), config); - } else { - options = getOptionsFromChannel(config); - } - AmqpClientOptions clientOptions = ConfigUtils.customize(config.config(), configCustomizers, options); - return AmqpClient.create(vertx, clientOptions); - } - static AmqpClientOptions createClientFromClientOptionsBean(Instance instance, String optionsBeanName, AmqpConnectorCommonConfiguration config) { Instance options = instance.select(Identifier.Literal.of(optionsBeanName)); @@ -181,11 +152,43 @@ static AmqpClientOptions getOptionsFromChannel(AmqpConnectorCommonConfiguration .setReconnectInterval(reconnectInterval) .setConnectTimeout(connectTimeout); - options.setMetricsName("amqp|" + config.getChannel()); - config.getSniServerName().ifPresent(options::setSniServerName); config.getVirtualHost().ifPresent(options::setVirtualHost); return options; } + static AmqpClientOptions buildClientOptions(AmqpConnector connector, AmqpConnectorCommonConfiguration config) { + Optional clientOptionsName = config.getClientOptionsName(); + AmqpClientOptions options; + if (clientOptionsName.isPresent()) { + options = createClientFromClientOptionsBean(connector.getClientOptions(), clientOptionsName.get(), config); + } else { + options = getOptionsFromChannel(config); + } + if (options.getContainerId() == null) { + options.setContainerId("vert.x-" + UUID.randomUUID()); + } else { + options.setMetricsName("amqp|" + options.getContainerId()); + } + return ConfigUtils.customize(config.config(), connector.getConfigCustomizers(), options); + } + + public static String computeConnectionFingerprint(AmqpClientOptions options) { + return sha256(options.toJson().encode()); + } + + private static String sha256(String input) { + try { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); + StringBuilder sb = new StringBuilder(); + for (byte b : hash) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHolder.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHolder.java new file mode 100644 index 0000000000..8c5b3c1e18 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpClientHolder.java @@ -0,0 +1,38 @@ +package io.smallrye.reactive.messaging.amqp; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import io.vertx.mutiny.amqp.AmqpClient; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; + +public class AmqpClientHolder { + + private final AmqpClient client; + private final Set channels = ConcurrentHashMap.newKeySet(); + private volatile ConnectionHolder connectionHolder; + + public AmqpClientHolder(AmqpClient client) { + this.client = client; + } + + public AmqpClient client() { + return client; + } + + public AmqpClientHolder retain(String channel) { + channels.add(channel); + return this; + } + + public synchronized ConnectionHolder getOrCreateConnectionHolder( + AmqpConnectorCommonConfiguration config, + Vertx vertx, + Context rootContext) { + if (connectionHolder == null) { + connectionHolder = new ConnectionHolder(client, config, vertx, rootContext); + } + return connectionHolder; + } +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java index 63535490e6..ce630659f6 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java @@ -1,15 +1,14 @@ package io.smallrye.reactive.messaging.amqp; +import static io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions.ex; import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Flow; import jakarta.annotation.Priority; @@ -50,7 +49,7 @@ @ConnectorAttribute(name = "reconnect-attempts", direction = INCOMING_AND_OUTGOING, description = "The number of reconnection attempts", type = "int", alias = "amqp-reconnect-attempts", defaultValue = "100") @ConnectorAttribute(name = "reconnect-interval", direction = INCOMING_AND_OUTGOING, description = "The interval in second between two reconnection attempts", type = "int", alias = "amqp-reconnect-interval", defaultValue = "10") @ConnectorAttribute(name = "connect-timeout", direction = INCOMING_AND_OUTGOING, description = "The connection timeout in milliseconds", type = "int", alias = "amqp-connect-timeout", defaultValue = "1000") -@ConnectorAttribute(name = "container-id", direction = INCOMING_AND_OUTGOING, description = "The AMQP container id", type = "string") +@ConnectorAttribute(name = "container-id", direction = INCOMING_AND_OUTGOING, description = "The AMQP container id. Channels with the same container id share the same AMQP connection.", type = "string") @ConnectorAttribute(name = "address", direction = INCOMING_AND_OUTGOING, description = "The AMQP address. If not set, the channel name is used", type = "string") @ConnectorAttribute(name = "link-name", direction = INCOMING_AND_OUTGOING, description = "The name of the link. If not set, the channel name is used.", type = "string") @ConnectorAttribute(name = "client-options-name", direction = INCOMING_AND_OUTGOING, description = "The name of the AMQP Client Option bean used to customize the AMQP client configuration", type = "string", alias = "amqp-client-options-name") @@ -62,12 +61,12 @@ @ConnectorAttribute(name = "capabilities", type = "string", direction = INCOMING_AND_OUTGOING, description = " A comma-separated list of capabilities proposed by the sender or receiver client.") @ConnectorAttribute(name = "retry-on-fail-attempts", direction = INCOMING_AND_OUTGOING, description = "The number of tentative to retry on failure", type = "int", defaultValue = "6") @ConnectorAttribute(name = "retry-on-fail-interval", direction = INCOMING_AND_OUTGOING, description = "The interval (in seconds) between two sending attempts", type = "int", defaultValue = "5") - @ConnectorAttribute(name = "broadcast", direction = INCOMING, description = "Whether the received AMQP messages must be dispatched to multiple _subscribers_", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "durable", direction = INCOMING, description = "Whether AMQP subscription is durable", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "auto-acknowledgement", direction = INCOMING, description = "Whether the received AMQP messages must be acknowledged when received", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message produced from an AMQP message is nacked. Accepted values are `fail` (default), `accept`, `release`, `reject`, `modified-failed`, `modified-failed-undeliverable-here`", defaultValue = "fail") @ConnectorAttribute(name = "selector", direction = INCOMING, description = "Sets a message selector. This attribute is used to define an `apache.org:selector-filter:string` filter on the source terminus, using SQL-based syntax to request the server filters which messages are delivered to the receiver (if supported by the server in question). Precise functionality supported and syntax needed can vary depending on the server.", type = "string") +@ConnectorAttribute(name = "qos", direction = INCOMING, description = "Sets the local QOS config, values can be {@code null}, {@code AT_MOST_ONCE} or {@code AT_LEAST_ONCE}.", type = "string") @ConnectorAttribute(name = "durable", direction = OUTGOING, description = "Whether sent AMQP messages are marked durable", type = "boolean", defaultValue = "false") @ConnectorAttribute(name = "ttl", direction = OUTGOING, description = "The time-to-live of the send AMQP messages. 0 to disable the TTL", type = "long", defaultValue = "0") @@ -88,6 +87,9 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt public static final String CONNECTOR_NAME = "smallrye-amqp"; + public static final String ME_ADDRESS = "$me"; + public static final String DIRECT_REPLY_TO_ADDRESS = "amq.rabbitmq.reply-to"; + @Inject private ExecutionHolder executionHolder; @@ -102,7 +104,8 @@ public class AmqpConnector implements InboundConnector, OutboundConnector, Healt @Inject private Instance openTelemetryInstance; - private final List clients = new CopyOnWriteArrayList<>(); + private final Map clients = new ConcurrentHashMap<>(); + private final Map containerIdFingerprints = new ConcurrentHashMap<>(); /** * Tracks the consumer connection holder. @@ -127,35 +130,44 @@ void setup(ExecutionHolder executionHolder) { @Override public Flow.Publisher> getPublisher(Config config) { AmqpConnectorIncomingConfiguration ic = new AmqpConnectorIncomingConfiguration(config); - AmqpClient client = AmqpClientHelper.createClient(executionHolder.vertx(), ic, clientOptions, configCustomizers); - addClient(client); - - IncomingAmqpChannel incoming = new IncomingAmqpChannel(ic, client, executionHolder.vertx(), + IncomingAmqpChannel incoming = new IncomingAmqpChannel(ic, getClientHolder(ic), executionHolder.vertx(), openTelemetryInstance, this::reportFailure); incomingChannels.put(ic.getChannel(), incoming); - return incoming.getPublisher(); } @Override public Flow.Subscriber> getSubscriber(Config config) { AmqpConnectorOutgoingConfiguration oc = new AmqpConnectorOutgoingConfiguration(config); - AmqpClient client = AmqpClientHelper.createClient(executionHolder.vertx(), oc, clientOptions, configCustomizers); - addClient(client); - OutgoingAmqpChannel outgoing = new OutgoingAmqpChannel(oc, client, executionHolder.vertx(), + OutgoingAmqpChannel outgoing = new OutgoingAmqpChannel(oc, getClientHolder(oc), executionHolder.vertx(), openTelemetryInstance, this::reportFailure); outgoingChannels.put(oc.getChannel(), outgoing); return outgoing.getSubscriber(); } + private AmqpClientHolder getClientHolder(AmqpConnectorCommonConfiguration config) { + String channel = config.getChannel(); + AmqpClientOptions options = AmqpClientHelper.buildClientOptions(this, config); + String containerId = options.getContainerId(); + String fingerprint = AmqpClientHelper.computeConnectionFingerprint(options); + String existing = containerIdFingerprints.putIfAbsent(containerId, fingerprint); + if (existing != null && !existing.equals(fingerprint)) { + throw ex.illegalStateContainerIdConfigMismatch(containerId); + } + return clients.compute(fingerprint, (key, current) -> { + if (current == null) + return new AmqpClientHolder(AmqpClient.create(executionHolder.vertx(), options)).retain(channel); + return current.retain(channel); + }); + } + public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) { outgoingChannels.values().forEach(OutgoingAmqpChannel::close); incomingChannels.values().forEach(IncomingAmqpChannel::close); - clients.forEach(c -> { - // We cannot use andForget as it could report an error is the broker is not available. + clients.values().forEach(holder -> { //noinspection ResultOfMethodCallIgnored - c.close().subscribeAsCompletionStage(); + holder.client().close().subscribeAsCompletionStage(); }); clients.clear(); } @@ -164,12 +176,16 @@ public Vertx getVertx() { return executionHolder.vertx(); } - public void addClient(AmqpClient client) { - clients.add(client); + public Map getClients() { + return clients; } - public List getClients() { - return clients; + public Instance getClientOptions() { + return clientOptions; + } + + public Instance> getConfigCustomizers() { + return configCustomizers; } /** @@ -239,4 +255,12 @@ public void reportFailure(String channel, Throwable reason) { terminate(null); } + public String getIncomingAddress(String channel) { + IncomingAmqpChannel incoming = incomingChannels.get(channel); + if (incoming == null) { + return null; + } + return incoming.getResolvedAddress(); + } + } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java index 561cc92742..f2ee5e5245 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpCreditBasedSender.java @@ -1,5 +1,6 @@ package io.smallrye.reactive.messaging.amqp; +import static io.smallrye.reactive.messaging.amqp.AmqpConnector.ME_ADDRESS; import static io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions.ex; import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log; import static java.time.Duration.ofSeconds; @@ -340,7 +341,7 @@ private io.vertx.mutiny.amqp.AmqpMessage getMessage(Message msg, boolean dura return null; } - if (!actualAddress.equals(amqp.address())) { + if (!ME_ADDRESS.equals(actualAddress) && !actualAddress.equals(amqp.address())) { amqp.getDelegate().unwrap().setAddress(actualAddress); } @@ -369,6 +370,8 @@ private String getActualAddress(Message message, io.vertx.mutiny.amqp.AmqpMes } return Optional.ofNullable(addressFromMessage); }) + .or(() -> message.getMetadata(IncomingAmqpMetadata.class) + .map(IncomingAmqpMetadata::getReplyTo)) .orElse(configuredAddress); } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessageConverter.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessageConverter.java index f4290ace9c..5a94dfac49 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessageConverter.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpMessageConverter.java @@ -69,7 +69,13 @@ static io.vertx.mutiny.amqp.AmqpMessage convertToAmqpMessage(Message message, output.setAddress(metadata.getAddress()); output.setSubject(metadata.getSubject()); output.setReplyTo(metadata.getReplyTo()); - output.setCorrelationId(metadata.getCorrelationId()); + if (metadata.getCorrelationId() != null) { + output.setCorrelationId(metadata.getCorrelationId()); + } else { + message.getMetadata(IncomingAmqpMetadata.class) + .filter(incoming -> incoming.getReplyTo() != null) + .ifPresent(incoming -> output.setCorrelationId(incoming.getId())); + } output.setContentType(metadata.getContentType()); output.setContentEncoding(metadata.getContentEncoding()); output.setExpiryTime(metadata.getExpiryTime()); diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/ConnectionHolder.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/ConnectionHolder.java index f2107e3de8..514da69229 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/ConnectionHolder.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/ConnectionHolder.java @@ -6,6 +6,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -16,7 +17,6 @@ import io.smallrye.common.annotation.CheckReturnValue; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; -import io.vertx.amqp.impl.AmqpConnectionImpl; import io.vertx.mutiny.amqp.AmqpClient; import io.vertx.mutiny.amqp.AmqpConnection; import io.vertx.mutiny.core.Context; @@ -27,6 +27,7 @@ public class ConnectionHolder { private final AmqpClient client; private final AmqpConnectorCommonConfiguration configuration; private final AtomicReference holder = new AtomicReference<>(); + private final AtomicReference> ongoingConnection = new AtomicReference<>(); private final Vertx vertx; private final Context root; @@ -73,7 +74,7 @@ public Uni isConnected() { * @return the list of capability */ public static List capabilities(AmqpConnection connection) { - Symbol[] capabilities = ((AmqpConnectionImpl) connection.getDelegate()).unwrap().getRemoteOfferedCapabilities(); + Symbol[] capabilities = connection.getDelegate().unwrap().getRemoteOfferedCapabilities(); return Arrays.stream(capabilities).map(Symbol::toString).collect(Collectors.toList()); } @@ -96,14 +97,7 @@ public int getHealthTimeout() { return configuration.getHealthTimeout(); } - private static class CurrentConnection { - final AmqpConnection connection; - final Context context; - - private CurrentConnection(AmqpConnection connection, Context context) { - this.connection = connection; - this.context = context; - } + private record CurrentConnection(AmqpConnection connection, Context context) { } public synchronized void onFailure(Consumer callback) { @@ -112,61 +106,73 @@ public synchronized void onFailure(Consumer callback) { @CheckReturnValue public Uni getOrEstablishConnection() { - return Uni.createFrom().item(() -> { - CurrentConnection connection = holder.get(); - if (connection != null && connection.connection != null && !connection.connection.isDisconnected()) { - return connection.connection; - } else { - return null; + return Uni.createFrom().deferred(this::establishConnection); + } + + private Uni establishConnection() { + CurrentConnection current = holder.get(); + if (current != null && current.connection != null + && !current.connection.isDisconnected()) { + return Uni.createFrom().item(current.connection); + } + + CompletableFuture existing = ongoingConnection.get(); + if (existing != null) { + if (!existing.isDone() || (current != null && !current.connection.isDisconnected())) { + return Uni.createFrom().completionStage(existing); } - }) - .onItem().ifNull().switchTo(() -> { - // we don't have a connection, try to connect. - Integer retryInterval = configuration.getReconnectInterval(); - Integer retryAttempts = configuration.getReconnectAttempts(); + ongoingConnection.compareAndSet(existing, null); + } - CurrentConnection reference = holder.get(); + CompletableFuture placeholder = new CompletableFuture<>(); + CompletableFuture prev = ongoingConnection.compareAndExchange(null, placeholder); + if (prev != null) { + return Uni.createFrom().completionStage(prev); + } - if (reference != null && reference.connection != null && !reference.connection.isDisconnected()) { - AmqpConnection connection = reference.connection; - return Uni.createFrom().item(connection); - } + Integer retryInterval = configuration.getReconnectInterval(); + Integer retryAttempts = configuration.getReconnectAttempts(); + + client.connect() + .runSubscriptionOn(root::runOnContext) + .onSubscription().invoke(s -> log.establishingConnection()) + .onFailure().invoke(log::unableToConnectToBroker) + .onItem().invoke(log::connectionEstablished) + .onItem().transform(conn -> new CurrentConnection(conn, root)) + .onItem().transform(currentConn -> { + holder.set(currentConn); + AmqpConnection conn = currentConn.connection; + conn + .exceptionHandler(t -> { + holder.set(null); + log.connectionFailure(t); - return client.connect() - .onSubscription().invoke(s -> log.establishingConnection()) - .onItem().transform(conn -> { - log.connectionEstablished(); - holder.set(new CurrentConnection(conn, root == null ? Vertx.currentContext() : root)); - conn - .exceptionHandler(t -> { - holder.set(null); - log.connectionFailure(t); - - // The callback failure allows propagating the failure downstream, - // as we are disconnected from the flow. - Consumer c; - synchronized (this) { - c = onFailure; - } - if (c != null) { - c.accept(t); - } - }); - // handle the case we are already disconnected. - if (conn.isDisconnected() || holder.get() == null) { - // Throwing the exception would trigger a retry. - holder.set(null); - throw ex.illegalStateConnectionDisconnected(); + Consumer c; + synchronized (this) { + c = onFailure; + } + if (c != null) { + c.accept(t); } - return conn; - }) - .onFailure().invoke(log::unableToConnectToBroker) - .onFailure().retry().withBackOff(ofSeconds(1), ofSeconds(retryInterval)).atMost(retryAttempts) - .onFailure().invoke(t -> { - holder.set(null); - log.unableToRecoverFromConnectionDisruption(t); }); + if (conn.isDisconnected() || holder.get() == null) { + holder.set(null); + throw ex.illegalStateConnectionDisconnected(); + } + return conn; + }) + .onFailure().invoke(log::unableToConnectToBroker) + .onFailure().retry().withBackOff(ofSeconds(1), ofSeconds(retryInterval)).atMost(retryAttempts) + .onFailure().invoke(t -> { + holder.set(null); + log.unableToRecoverFromConnectionDisruption(t); + }) + .subscribe().with(placeholder::complete, t -> { + ongoingConnection.compareAndSet(placeholder, null); + placeholder.completeExceptionally(t); }); + + return Uni.createFrom().completionStage(placeholder); } public static CompletionStage runOnContext(Context context, AmqpMessage msg, @@ -184,4 +190,13 @@ public static CompletionStage runOnContextAndReportFailure(Context context msg.runOnMessageContext(() -> f.completeExceptionally(reason)); }); } + + void runWithTrampoline(Runnable action) { + Context context = getContext(); + if (context != null) { + VertxContext.runOnContext(context.getDelegate(), action); + } else { + root.runOnContext(action); + } + } } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java index 26dc04d55f..cd79c55fb5 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java @@ -27,7 +27,6 @@ import io.smallrye.reactive.messaging.providers.helpers.VertxContext; import io.vertx.amqp.AmqpReceiverOptions; import io.vertx.core.impl.VertxInternal; -import io.vertx.mutiny.amqp.AmqpClient; import io.vertx.mutiny.amqp.AmqpReceiver; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.Vertx; @@ -40,8 +39,16 @@ public class IncomingAmqpChannel { private final BiConsumer reportFailure; private final ConnectionHolder holder; private final boolean healthEnabled; + private volatile String resolvedAddress; - public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClient client, Vertx vertx, + public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClientHolder clientHolder, Vertx vertx, + Instance openTelemetryInstance, BiConsumer reportFailure) { + this(ic, clientHolder.getOrCreateConnectionHolder(ic, vertx, + Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext())), + openTelemetryInstance, reportFailure); + } + + public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, ConnectionHolder holder, Instance openTelemetryInstance, BiConsumer reportFailure) { this.reportFailure = reportFailure; this.opened = new AtomicBoolean(false); @@ -53,8 +60,9 @@ public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClient cli String link = ic.getLinkName().orElse(channel); boolean cloudEvents = ic.getCloudEvents(); boolean tracing = ic.getTracingEnabled(); - Context root = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); - this.holder = new ConnectionHolder(client, ic, vertx, root); + this.holder = holder; + + this.resolvedAddress = address; AmqpReceiverOptions options = new AmqpReceiverOptions() .setAutoAcknowledgement(ic.getAutoAcknowledgement()) @@ -63,6 +71,8 @@ public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClient cli .setCapabilities(getClientCapabilities(ic)) .setSelector(ic.getSelector().orElse(null)); + ic.getQos().ifPresent(options::setQos); + if (ic.getTracingEnabled()) { amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance); } else { @@ -74,8 +84,11 @@ public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, AmqpClient cli Integer attempts = ic.getRetryOnFailAttempts(); multi = holder.getOrEstablishConnection() .onItem().transformToUni(connection -> connection.createReceiver(address, options)) - .onItem().invoke(r -> opened.set(true)) - .onItem().transformToMulti(r -> getStreamOfMessages(r, holder, address, channel, onNack, + .onItem().invoke(r -> { + opened.set(true); + resolvedAddress = r.address(); + }) + .onItem().transformToMulti(r -> getStreamOfMessages(r, holder, channel, onNack, cloudEvents, tracing)) // Retry on failure. .onFailure().invoke(log::retrieveMessagesRetrying) @@ -99,15 +112,18 @@ public Flow.Publisher> getPublisher() { return multi; } + public String getResolvedAddress() { + return resolvedAddress; + } + @SuppressWarnings({ "rawtypes", "unchecked" }) private Multi> getStreamOfMessages(AmqpReceiver receiver, ConnectionHolder holder, - String address, String channel, AmqpFailureHandler onNack, boolean cloudEventEnabled, Boolean tracingEnabled) { - log.receiverListeningAddress(address); + log.receiverListeningAddress(resolvedAddress); // The processor is used to inject AMQP Connection failure in the stream and trigger a retry. BroadcastProcessor processor = BroadcastProcessor.create(); diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/OutgoingAmqpChannel.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/OutgoingAmqpChannel.java index e56f1fcf69..1cdd1f7dd0 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/OutgoingAmqpChannel.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/OutgoingAmqpChannel.java @@ -16,8 +16,9 @@ import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.vertx.amqp.AmqpSenderOptions; -import io.vertx.mutiny.amqp.AmqpClient; +import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.amqp.AmqpSender; +import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.Vertx; import io.vertx.proton.ProtonSender; @@ -28,7 +29,14 @@ public class OutgoingAmqpChannel { private final AmqpCreditBasedSender processor; private final boolean healthEnabled; - public OutgoingAmqpChannel(AmqpConnectorOutgoingConfiguration oc, AmqpClient client, Vertx vertx, + public OutgoingAmqpChannel(AmqpConnectorOutgoingConfiguration oc, AmqpClientHolder clientHolder, Vertx vertx, + Instance openTelemetryInstance, BiConsumer reportFailure) { + this(oc, clientHolder.getOrCreateConnectionHolder(oc, vertx, + Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext())), + openTelemetryInstance, reportFailure); + } + + public OutgoingAmqpChannel(AmqpConnectorOutgoingConfiguration oc, ConnectionHolder holder, Instance openTelemetryInstance, BiConsumer reportFailure) { String configuredAddress = oc.getAddress().orElseGet(oc::getChannel); @@ -37,7 +45,6 @@ public OutgoingAmqpChannel(AmqpConnectorOutgoingConfiguration oc, AmqpClient cli AtomicReference sender = new AtomicReference<>(); String link = oc.getLinkName().orElseGet(oc::getChannel); - ConnectionHolder holder = new ConnectionHolder(client, oc, vertx, null); Uni getSender = Uni.createFrom().deferred(() -> { diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPExceptions.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPExceptions.java index a34b9f2501..2b1326248a 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPExceptions.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPExceptions.java @@ -32,4 +32,6 @@ public interface AMQPExceptions { @Message(id = 16005, value = "Only one subscriber allowed") IllegalStateException illegalStateOnlyOneSubscriberAllowed(); + @Message(id = 16006, value = "Container ID '%s' does not match the configuration") + IllegalStateException illegalStateContainerIdConfigMismatch(String containerId); } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPLogging.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPLogging.java index d127fd3e5c..ba3de0f534 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPLogging.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/i18n/AMQPLogging.java @@ -139,4 +139,12 @@ public interface AMQPLogging extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 16230, value = "No credits for channel %s, checking periodically for credits") void stillNoMoreCreditsForChannel(String channel); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 16231, value = "Received reply message on channel `%s` with unknown correlation ID `%s`, ignoring") + void requestReplyMessageIgnored(String channel, String correlationId); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 16232, value = "Reply consumer failure on channel `%s`") + void requestReplyConsumerFailure(String channel, @Cause Throwable t); } diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReply.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReply.java new file mode 100644 index 0000000000..66680b0af2 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReply.java @@ -0,0 +1,77 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.util.Map; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.EmitterType; + +/** + * AmqpRequestReply provides functionality for sending requests and receiving + * responses over AMQP 1.0. + * + * @param the type of the request value + * @param the type of the response value + */ +@Experimental("Experimental API") +public interface AmqpRequestReply extends EmitterType { + + String REPLY_TIMEOUT_KEY = "reply.timeout"; + + String REPLY_CORRELATION_ID_HANDLER_KEY = "reply.correlation-id.handler"; + + String DEFAULT_CORRELATION_ID_HANDLER = "uuid"; + + String REPLY_FAILURE_HANDLER_KEY = "reply.failure.handler"; + + String REPLY_ADDRESS_KEY = "reply.address"; + + /** + * Sends a request and receives a response. + * + * @param request the request object to be sent + * @return a Uni object representing the result of the send and receive operation + */ + Uni request(Req request); + + /** + * Sends a request and receives a response. + * + * @param request the request message to be sent + * @return a Uni object representing the result of the send and receive operation + */ + Uni> request(Message request); + + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi requestMulti(Req request); + + /** + * Sends a request and receives responses. + * + * @param request the request message to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi> requestMulti(Message request); + + /** + * Retrieves the pending replies. + * + * @return a map containing the pending replies. The map's keys are the correlation IDs and the values + * are instances of PendingReply. + */ + Map getPendingReplies(); + + /** + * Sends the completion event to the channel indicating that no other events will be sent afterward. + */ + void complete(); + +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyFactory.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyFactory.java new file mode 100644 index 0000000000..9580d0ac75 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyFactory.java @@ -0,0 +1,80 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Any; +import jakarta.enterprise.inject.Instance; +import jakarta.enterprise.inject.Produces; +import jakarta.enterprise.inject.Typed; +import jakarta.enterprise.inject.spi.InjectionPoint; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.reactive.messaging.ChannelRegistry; +import io.smallrye.reactive.messaging.EmitterConfiguration; +import io.smallrye.reactive.messaging.EmitterFactory; +import io.smallrye.reactive.messaging.MessageConverter; +import io.smallrye.reactive.messaging.amqp.AmqpConnector; +import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor; +import io.smallrye.reactive.messaging.providers.extension.ChannelProducer; +import io.smallrye.reactive.messaging.providers.helpers.ConverterUtils; + +@EmitterFactoryFor(AmqpRequestReply.class) +@ApplicationScoped +public class AmqpRequestReplyFactory implements EmitterFactory> { + + @Inject + ChannelRegistry channelRegistry; + + @Inject + @Any + AmqpConnector connector; + + @Inject + Instance converters; + + @Inject + @Any + Instance correlationIdHandlers; + + @Inject + @Any + Instance replyFailureHandlers; + + @Inject + Instance config; + + @Override + public AmqpRequestReplyImpl createEmitter(EmitterConfiguration configuration, long defaultBufferSize) { + return new AmqpRequestReplyImpl<>(configuration, defaultBufferSize, config.get(), connector, correlationIdHandlers, + replyFailureHandlers); + } + + @Produces + @Typed(AmqpRequestReply.class) + @Channel("") + AmqpRequestReply produceEmitter(InjectionPoint injectionPoint) { + String channelName = ChannelProducer.getChannelName(injectionPoint); + AmqpRequestReply emitter = channelRegistry.getEmitter(channelName, AmqpRequestReply.class); + Type replyType = getReplyPayloadType(injectionPoint); + if (replyType != null) { + ((AmqpRequestReplyImpl) emitter).setReplyConverter(ConverterUtils.convertFunction(converters, replyType)); + } + return emitter; + } + + private Type getReplyPayloadType(InjectionPoint injectionPoint) { + if (injectionPoint.getType() instanceof ParameterizedType) { + Type[] typeArguments = ((ParameterizedType) injectionPoint.getType()).getActualTypeArguments(); + if (typeArguments.length == 2) { + return typeArguments[1]; + } + } + return null; + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyImpl.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyImpl.java new file mode 100644 index 0000000000..3be728b672 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyImpl.java @@ -0,0 +1,236 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import static io.smallrye.reactive.messaging.amqp.AmqpConnector.DIRECT_REPLY_TO_ADDRESS; +import static io.smallrye.reactive.messaging.amqp.AmqpConnector.ME_ADDRESS; +import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Flow; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import jakarta.enterprise.inject.Instance; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.subscription.MultiEmitter; +import io.smallrye.mutiny.subscription.MultiSubscriber; +import io.smallrye.reactive.messaging.EmitterConfiguration; +import io.smallrye.reactive.messaging.amqp.AmqpConnector; +import io.smallrye.reactive.messaging.amqp.AmqpMessage; +import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata; +import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl; +import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; +import io.smallrye.reactive.messaging.providers.impl.Configs; +import io.smallrye.reactive.messaging.providers.impl.OverrideConfig; +import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; + +@Experimental("Experimental API") +public class AmqpRequestReplyImpl extends MutinyEmitterImpl + implements AmqpRequestReply, MultiSubscriber> { + + private final String channel; + private final AmqpConnector connector; + private final String replyToAddress; + private final Duration replyTimeout; + + private final CorrelationIdHandler correlationIdHandler; + private final ReplyFailureHandler replyFailureHandler; + private Function, Uni>> replyConverter; + + private final Map> pendingReplies = new ConcurrentHashMap<>(); + private final AtomicReference subscription = new AtomicReference<>(); + + @SuppressWarnings("unchecked") + public AmqpRequestReplyImpl(EmitterConfiguration config, + long defaultBufferSize, Config channelConfig, + AmqpConnector connector, Instance correlationIdHandlers, + Instance replyFailureHandlers) { + super(config, defaultBufferSize); + this.channel = config.name(); + this.connector = connector; + Config connectorConfig = Configs.outgoing(channelConfig, AmqpConnector.CONNECTOR_NAME, channel); + this.replyTimeout = connectorConfig.getOptionalValue(REPLY_TIMEOUT_KEY, Long.class) + .map(Duration::ofMillis) + .orElse(Duration.ofSeconds(5)); + String correlationIdHandlerIdentifier = connectorConfig + .getOptionalValue(AmqpRequestReply.REPLY_CORRELATION_ID_HANDLER_KEY, String.class) + .orElse(AmqpRequestReply.DEFAULT_CORRELATION_ID_HANDLER); + this.correlationIdHandler = CDIUtils.getInstanceById(correlationIdHandlers, + correlationIdHandlerIdentifier).get(); + this.replyFailureHandler = connectorConfig + .getOptionalValue(AmqpRequestReply.REPLY_FAILURE_HANDLER_KEY, String.class) + .map(id -> CDIUtils.getInstanceById(replyFailureHandlers, id, () -> null)) + .orElse(null); + + Map> override = new HashMap<>(); + override.put("auto-acknowledgement", x -> true); + connectorConfig.getOptionalValue("container-id", String.class) + .ifPresent(s -> override.put("container-id", x -> s)); + + this.replyToAddress = connectorConfig.getOptionalValue(REPLY_ADDRESS_KEY, String.class) + .orElse(channel + "-reply"); + override.put("address", x -> replyToAddress); + + Config incomingConfig = Configs.prefixOverride(connectorConfig, "reply", override); + ((Publisher>) connector.getPublisher(incomingConfig)).subscribe(this); + } + + @Override + public Flow.Publisher> getPublisher() { + return this.publisher.onTermination().invoke(this::complete); + } + + @Override + public void complete() { + super.complete(); + Subscriptions.cancel(subscription); + for (CorrelationId correlationId : pendingReplies.keySet()) { + PendingReplyImpl reply = pendingReplies.remove(correlationId); + if (reply != null) { + reply.complete(); + } + } + } + + @Override + public Uni request(Req request) { + return requestMulti(request).toUni(); + } + + @Override + public Uni> request(Message request) { + return requestMulti(request).toUni(); + } + + @Override + public Multi requestMulti(Req request) { + return requestMulti(ContextAwareMessage.of(request)) + .map(Message::getPayload); + } + + @Override + public Multi> requestMulti(Message request) { + var builder = request.getMetadata(OutgoingAmqpMetadata.class) + .map(OutgoingAmqpMetadata::from) + .orElseGet(OutgoingAmqpMetadata::builder); + CorrelationId correlationId = correlationIdHandler.generate(request); + // if using direct-reply-to reply-to address is assigned by broker when the receiver attaches + String replyTo = DIRECT_REPLY_TO_ADDRESS.equals(replyToAddress) ? connector.getIncomingAddress(channel) + : replyToAddress; + builder.withMessageId(correlationId.toString()).withReplyTo(replyTo); + OutgoingAmqpMetadata outMetadata = builder.build(); + // Register pending reply before sending to avoid race where a fast reply + // arrives before the pending reply is registered and gets silently dropped + return Multi.createFrom().> emitter(emitter -> { + pendingReplies.put(correlationId, + new PendingReplyImpl<>(outMetadata, + (MultiEmitter>) emitter)); + sendMessage(request.addMetadata(outMetadata)) + .subscribe().with( + unused -> subscription.get().request(1), + failure -> { + pendingReplies.remove(correlationId); + emitter.fail(failure); + }); + }) + .ifNoItem().after(replyTimeout) + .failWith(() -> new AmqpRequestReplyTimeoutException(correlationId)) + .onItem().transformToUniAndConcatenate(m -> { + if (replyFailureHandler != null) { + Throwable failure = replyFailureHandler.handleReply( + (AmqpMessage) m); + if (failure != null) { + return Uni.createFrom().failure(failure); + } + } + return Uni.createFrom().item(m); + }) + .onTermination().invoke(() -> pendingReplies.remove(correlationId)) + .plug(multi -> replyConverter != null ? multi + .onItem().transformToUniAndConcatenate(f -> replyConverter.apply(f)) + : multi); + } + + public void setReplyConverter(Function, Uni>> converterFunction) { + this.replyConverter = converterFunction; + } + + @Override + public Map getPendingReplies() { + return new HashMap<>(pendingReplies); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + if (Subscriptions.setIfEmpty(this.subscription, subscription)) { + subscription.request(1); + } + } + + @Override + public void onItem(AmqpMessage item) { + Object rawCorrelationId = item.getCorrelationId(); + + CorrelationId correlationId = rawCorrelationId != null + ? correlationIdHandler.parse(rawCorrelationId.toString()) + : null; + if (correlationId != null) { + PendingReplyImpl reply = pendingReplies.get(correlationId); + if (reply != null) { + reply.emitter.emit(item); + } else { + log.requestReplyMessageIgnored(channel, correlationId.toString()); + } + } + subscription.get().request(1); + } + + @Override + public void onFailure(Throwable failure) { + log.requestReplyConsumerFailure(channel, failure); + } + + @Override + public void onCompletion() { + + } + + private static class PendingReplyImpl implements PendingReply { + + private final OutgoingAmqpMetadata metadata; + private final MultiEmitter> emitter; + + public PendingReplyImpl(OutgoingAmqpMetadata metadata, + MultiEmitter> emitter) { + this.metadata = metadata; + this.emitter = emitter; + } + + @Override + public OutgoingAmqpMetadata metadata() { + return metadata; + } + + @Override + public void complete() { + emitter.complete(); + } + + @Override + public boolean isCancelled() { + return emitter.isCancelled(); + } + + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyTimeoutException.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyTimeoutException.java new file mode 100644 index 0000000000..a63ced8ccd --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyTimeoutException.java @@ -0,0 +1,11 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +public class AmqpRequestReplyTimeoutException extends RuntimeException { + public AmqpRequestReplyTimeoutException(String message) { + super(message); + } + + public AmqpRequestReplyTimeoutException(CorrelationId correlationId) { + super("Timeout waiting for a reply for request with correlation ID: " + correlationId); + } +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationId.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationId.java new file mode 100644 index 0000000000..0d9cc9138b --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationId.java @@ -0,0 +1,39 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.util.Arrays; +import java.util.Base64; + +public class BytesCorrelationId extends CorrelationId { + + private final byte[] bytes; + + public BytesCorrelationId(byte[] bytes) { + this.bytes = bytes; + } + + public static BytesCorrelationId fromString(String string) { + return new BytesCorrelationId(Base64.getDecoder().decode(string)); + } + + @Override + public String toString() { + return Base64.getEncoder().encodeToString(bytes); + } + + @Override + public int hashCode() { + return Arrays.hashCode(bytes); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BytesCorrelationId that = (BytesCorrelationId) o; + return Arrays.equals(bytes, that.bytes); + } +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationIdHandler.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationIdHandler.java new file mode 100644 index 0000000000..7860c80823 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/BytesCorrelationIdHandler.java @@ -0,0 +1,34 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.security.SecureRandom; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +@Identifier("bytes") +public class BytesCorrelationIdHandler implements CorrelationIdHandler { + + @Inject + @ConfigProperty(name = "smallrye.amqp.request-reply.correlation-id.bytes.length", defaultValue = "12") + int bytesLength; + + private final SecureRandom random = new SecureRandom(); + + @Override + public CorrelationId generate(Message request) { + byte[] bytes = new byte[bytesLength]; + random.nextBytes(bytes); + return new BytesCorrelationId(bytes); + } + + @Override + public CorrelationId parse(String string) { + return BytesCorrelationId.fromString(string); + } +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationId.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationId.java new file mode 100644 index 0000000000..bb2b88f5ae --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationId.java @@ -0,0 +1,13 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +/** + * Represents an identifier to correlate a request message and reply message. + */ +public abstract class CorrelationId { + + public abstract String toString(); + + public abstract int hashCode(); + + public abstract boolean equals(Object o); +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationIdHandler.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationIdHandler.java new file mode 100644 index 0000000000..17232677d3 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/CorrelationIdHandler.java @@ -0,0 +1,31 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import org.eclipse.microprofile.reactive.messaging.Message; + +/** + * Generates and parses correlation IDs for request messages. + *

+ * CDI-managed beans that implement this interface are discovered using + * the {@link io.smallrye.common.annotation.Identifier} qualifier to be configured. + * + * @see AmqpRequestReply + */ +public interface CorrelationIdHandler { + + /** + * Generates a correlation ID for the given request message. + * + * @param request the request message to generate the correlation ID for + * @return the generated correlation ID + */ + CorrelationId generate(Message request); + + /** + * Parses a correlation ID from the given string. + * + * @param string the string from which to parse the correlation ID + * @return the parsed correlation ID + */ + CorrelationId parse(String string); + +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/PendingReply.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/PendingReply.java new file mode 100644 index 0000000000..ff04e4725f --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/PendingReply.java @@ -0,0 +1,24 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata; + +/** + * A pending reply for a request. + */ +public interface PendingReply { + + /** + * @return the metadata of the request + */ + OutgoingAmqpMetadata metadata(); + + /** + * Complete the pending reply. + */ + void complete(); + + /** + * @return whether the pending reply was terminated (with a completion or failure). + */ + boolean isCancelled(); +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/ReplyFailureHandler.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/ReplyFailureHandler.java new file mode 100644 index 0000000000..82fba5bf4f --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/ReplyFailureHandler.java @@ -0,0 +1,24 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import io.smallrye.reactive.messaging.amqp.AmqpMessage; + +/** + * Handles failure cases in reply messages and extracts the throwable. + *

+ * CDI-managed beans that implement this interface are discovered using + * the {@link io.smallrye.common.annotation.Identifier} qualifier to be configured. + * + * @see AmqpRequestReply + */ +public interface ReplyFailureHandler { + + /** + * Handles a reply received from AMQP to extract errors, if any. + * Returned throwable will be used to fail the reply {@link io.smallrye.mutiny.Uni}. + * If reply message contains no error, returns {@code null} in order for the message to be delivered. + * + * @param replyMessage the reply message + * @return The throwable representing any error encountered during the reply. + */ + Throwable handleReply(AmqpMessage replyMessage); +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/StringCorrelationId.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/StringCorrelationId.java new file mode 100644 index 0000000000..3f8ad28fb0 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/StringCorrelationId.java @@ -0,0 +1,33 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.util.Objects; + +public class StringCorrelationId extends CorrelationId { + + private final String id; + + public StringCorrelationId(String id) { + this.id = id; + } + + @Override + public String toString() { + return id; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + StringCorrelationId that = (StringCorrelationId) o; + return Objects.equals(id, that.id); + } + + @Override + public int hashCode() { + return Objects.hash(id); + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/UUIDCorrelationIdHandler.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/UUIDCorrelationIdHandler.java new file mode 100644 index 0000000000..d1c383e1da --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/reply/UUIDCorrelationIdHandler.java @@ -0,0 +1,24 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import java.util.UUID; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +@Identifier("uuid") +public class UUIDCorrelationIdHandler implements CorrelationIdHandler { + @Override + public CorrelationId generate(Message request) { + return new StringCorrelationId(UUID.randomUUID().toString()); + } + + @Override + public CorrelationId parse(String string) { + return new StringCorrelationId(string); + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpConnectionSharingTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpConnectionSharingTest.java new file mode 100644 index 0000000000..c2883480d3 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpConnectionSharingTest.java @@ -0,0 +1,331 @@ +package io.smallrye.reactive.messaging.amqp; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.amqp.AmqpMessage; +import io.vertx.mutiny.amqp.AmqpReceiver; + +public class AmqpConnectionSharingTest extends AmqpBrokerTestBase { + + private WeldContainer container; + + @AfterEach + public void cleanup() { + if (container != null) { + container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)) + .get().terminate(null); + container.shutdown(); + } + + MapBasedConfig.cleanup(); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + } + + @Test + public void testContainerIdIncomingAndOutgoing() { + Weld weld = new Weld(); + weld.addBeanClasses(SharedConnectionProcessor.class); + new MapBasedConfig() + .with("mp.messaging.incoming.in-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.in-channel.address", "shared-test-in") + .with("mp.messaging.incoming.in-channel.host", host) + .with("mp.messaging.incoming.in-channel.port", port) + .with("mp.messaging.incoming.in-channel.username", username) + .with("mp.messaging.incoming.in-channel.password", password) + .with("mp.messaging.incoming.in-channel.tracing-enabled", false) + .with("mp.messaging.incoming.in-channel.container-id", "my-shared-connection") + .with("mp.messaging.outgoing.out-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.out-channel.address", "shared-test-out") + .with("mp.messaging.outgoing.out-channel.host", host) + .with("mp.messaging.outgoing.out-channel.port", port) + .with("mp.messaging.outgoing.out-channel.username", username) + .with("mp.messaging.outgoing.out-channel.password", password) + .with("mp.messaging.outgoing.out-channel.tracing-enabled", false) + .with("mp.messaging.outgoing.out-channel.container-id", "my-shared-connection") + .write(); + container = weld.initialize(); + + AmqpConnector connector = container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)).get(); + await().until(() -> isAmqpConnectorReady(connector)); + + assertThat(connector.getClients()).hasSize(1); + } + + @Test + public void testWithoutContainerIdSeparateConnections() { + Weld weld = new Weld(); + weld.addBeanClasses(SharedConnectionProcessor.class); + new MapBasedConfig() + .with("mp.messaging.incoming.in-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.in-channel.address", "no-share-in") + .with("mp.messaging.incoming.in-channel.host", host) + .with("mp.messaging.incoming.in-channel.port", port) + .with("mp.messaging.incoming.in-channel.username", username) + .with("mp.messaging.incoming.in-channel.password", password) + .with("mp.messaging.incoming.in-channel.tracing-enabled", false) + .with("mp.messaging.outgoing.out-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.out-channel.address", "no-share-out") + .with("mp.messaging.outgoing.out-channel.host", host) + .with("mp.messaging.outgoing.out-channel.port", port) + .with("mp.messaging.outgoing.out-channel.username", username) + .with("mp.messaging.outgoing.out-channel.password", password) + .with("mp.messaging.outgoing.out-channel.tracing-enabled", false) + .write(); + container = weld.initialize(); + + AmqpConnector connector = container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)).get(); + await().until(() -> isAmqpConnectorReady(connector)); + + assertThat(connector.getClients()).hasSize(2); + } + + @Test + public void testSameContainerIdDifferentConfigFails() { + Weld weld = new Weld(); + weld.addBeanClasses(SharedConnectionProcessor.class); + new MapBasedConfig() + .with("mp.messaging.incoming.in-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.in-channel.address", "mismatch-in") + .with("mp.messaging.incoming.in-channel.host", host) + .with("mp.messaging.incoming.in-channel.port", port) + .with("mp.messaging.incoming.in-channel.username", username) + .with("mp.messaging.incoming.in-channel.password", password) + .with("mp.messaging.incoming.in-channel.tracing-enabled", false) + .with("mp.messaging.incoming.in-channel.container-id", "same-id") + .with("mp.messaging.outgoing.out-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.out-channel.address", "mismatch-out") + .with("mp.messaging.outgoing.out-channel.host", host) + .with("mp.messaging.outgoing.out-channel.port", port) + .with("mp.messaging.outgoing.out-channel.username", "different-user") + .with("mp.messaging.outgoing.out-channel.password", password) + .with("mp.messaging.outgoing.out-channel.tracing-enabled", false) + .with("mp.messaging.outgoing.out-channel.container-id", "same-id") + .write(); + + assertThatThrownBy(weld::initialize) + .isInstanceOf(Exception.class); + } + + @Test + public void testContainerIdMultipleIncomingChannels() { + Weld weld = new Weld(); + weld.addBeanClasses(DualIncomingBean.class); + new MapBasedConfig() + .with("mp.messaging.incoming.data1.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data1.address", "multi-share-1") + .with("mp.messaging.incoming.data1.host", host) + .with("mp.messaging.incoming.data1.port", port) + .with("mp.messaging.incoming.data1.username", username) + .with("mp.messaging.incoming.data1.password", password) + .with("mp.messaging.incoming.data1.tracing-enabled", false) + .with("mp.messaging.incoming.data1.container-id", "dual-incoming") + .with("mp.messaging.incoming.data2.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data2.address", "multi-share-2") + .with("mp.messaging.incoming.data2.host", host) + .with("mp.messaging.incoming.data2.port", port) + .with("mp.messaging.incoming.data2.username", username) + .with("mp.messaging.incoming.data2.password", password) + .with("mp.messaging.incoming.data2.tracing-enabled", false) + .with("mp.messaging.incoming.data2.container-id", "dual-incoming") + .write(); + container = weld.initialize(); + + AmqpConnector connector = container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)).get(); + await().until(() -> isAmqpConnectorReady(connector)); + + assertThat(connector.getClients()).hasSize(1); + + usage.produceTenIntegers("multi-share-1", () -> 1); + usage.produceTenIntegers("multi-share-2", () -> 2); + + DualIncomingBean bean = container.getBeanManager().createInstance() + .select(DualIncomingBean.class).get(); + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> { + assertThat(bean.list1()).hasSizeGreaterThanOrEqualTo(10); + assertThat(bean.list2()).hasSizeGreaterThanOrEqualTo(10); + }); + } + + @Test + public void testRequestReplySharesConnection() { + startReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyBean.class); + new MapBasedConfig() + .with("mp.messaging.outgoing.rr-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.rr-channel.address", "rr-share-requests") + .with("mp.messaging.outgoing.rr-channel.host", host) + .with("mp.messaging.outgoing.rr-channel.port", port) + .with("mp.messaging.outgoing.rr-channel.username", username) + .with("mp.messaging.outgoing.rr-channel.password", password) + .with("mp.messaging.outgoing.rr-channel.tracing-enabled", false) + .with("mp.messaging.outgoing.rr-channel.container-id", "rr-shared") + .with("mp.messaging.outgoing.rr-channel.reply.address", "rr-share-replies") + .write(); + container = weld.initialize(); + + AmqpConnector connector = container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)).get(); + await().until(() -> isAmqpConnectorReady(connector)); + + assertThat(connector.getClients()).hasSize(1); + + RequestReplyBean bean = container.getBeanManager().createInstance() + .select(RequestReplyBean.class).get(); + + List replies = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 5; i++) { + bean.requestReply().request(i).subscribe().with(replies::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + } + + @Test + public void testSharedConnectionExchangesMessages() { + Weld weld = new Weld(); + weld.addBeanClasses(SharedConnectionProcessor.class); + new MapBasedConfig() + .with("mp.messaging.incoming.in-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.in-channel.address", "shared-exchange-in") + .with("mp.messaging.incoming.in-channel.host", host) + .with("mp.messaging.incoming.in-channel.port", port) + .with("mp.messaging.incoming.in-channel.username", username) + .with("mp.messaging.incoming.in-channel.password", password) + .with("mp.messaging.incoming.in-channel.tracing-enabled", false) + .with("mp.messaging.incoming.in-channel.container-id", "exchange-test") + .with("mp.messaging.outgoing.out-channel.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.out-channel.address", "shared-exchange-out") + .with("mp.messaging.outgoing.out-channel.host", host) + .with("mp.messaging.outgoing.out-channel.port", port) + .with("mp.messaging.outgoing.out-channel.username", username) + .with("mp.messaging.outgoing.out-channel.password", password) + .with("mp.messaging.outgoing.out-channel.tracing-enabled", false) + .with("mp.messaging.outgoing.out-channel.container-id", "exchange-test") + .write(); + container = weld.initialize(); + + AmqpConnector connector = container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)).get(); + await().until(() -> isAmqpConnectorReady(connector)); + + assertThat(connector.getClients()).hasSize(1); + + List received = new CopyOnWriteArrayList<>(); + usage.client.createReceiver("shared-exchange-out") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(m -> received.add(m.bodyAsString())); + + usage.produceTenIntegers("shared-exchange-in", () -> 42); + + SharedConnectionProcessor processor = container.getBeanManager().createInstance() + .select(SharedConnectionProcessor.class).get(); + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(processor.processed()).hasSizeGreaterThanOrEqualTo(10)); + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(received).hasSizeGreaterThanOrEqualTo(10)); + } + + private void startReplyServer() { + usage.client.createReceiver("rr-share-requests") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + usage.client.createSender(replyTo) + .onItem().transformToUni(sender -> sender.sendWithAck(AmqpMessage.create() + .correlationId(correlationId) + .withBody(String.valueOf(request.bodyAsInteger())) + .build())) + .subscribe().with(x -> { + }); + } + }); + } + + @ApplicationScoped + public static class SharedConnectionProcessor { + + private final List processed = new CopyOnWriteArrayList<>(); + + @Incoming("in-channel") + @Outgoing("out-channel") + public Message process(Message msg) { + String value = "processed-" + msg.getPayload(); + processed.add(value); + return msg.withPayload(value); + } + + public List processed() { + return processed; + } + } + + @ApplicationScoped + public static class DualIncomingBean { + + private final List list1 = new CopyOnWriteArrayList<>(); + private final List list2 = new CopyOnWriteArrayList<>(); + + @Incoming("data1") + public void consume1(int value) { + list1.add(value); + } + + @Incoming("data2") + public void consume2(int value) { + list2.add(value); + } + + public List list1() { + return list1; + } + + public List list2() { + return list2; + } + } + + @ApplicationScoped + public static class RequestReplyBean { + + @Inject + @Channel("rr-channel") + AmqpRequestReply requestReply; + + public AmqpRequestReply requestReply() { + return requestReply; + } + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpRabbitMQSinkTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpRabbitMQSinkTest.java index 669f341ef5..bb3cba6750 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpRabbitMQSinkTest.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpRabbitMQSinkTest.java @@ -55,6 +55,7 @@ public void testSinkUsingIntegerUsingDefaultAnonymousSender() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -69,7 +70,7 @@ public void testSinkUsingIntegerUsingDefaultAnonymousSender() { assertThat(messagesReceived).allSatisfy(msg -> { assertThat(msg.getBody()).isInstanceOf(AmqpValue.class); - assertThat(msg.getAddress()).isEqualTo(topic); + assertThat(msg.getAddress()).isEqualTo("/queues/" + topic); }).extracting(msg -> ((AmqpValue) msg.getBody()).getValue()) .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); } @@ -79,6 +80,7 @@ public void testSinkWhenConnectionIsDown() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -104,7 +106,7 @@ public void testSinkWhenConnectionIsDown() { assertThat(messagesReceived).allSatisfy(msg -> { assertThat(msg.getBody()).isInstanceOf(AmqpValue.class); - assertThat(msg.getAddress()).isEqualTo(topic); + assertThat(msg.getAddress()).isEqualTo("/queues/" + topic); }).extracting(msg -> ((AmqpValue) msg.getBody()).getValue()) .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); } @@ -114,6 +116,7 @@ public void testSinkUsingIntegerUsingNonAnonymousSender() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -129,7 +132,7 @@ public void testSinkUsingIntegerUsingNonAnonymousSender() { // Should have used a fixed-address sender link, vertify target address assertThat(messagesReceived).allSatisfy(msg -> { assertThat(msg.getBody()).isInstanceOf(AmqpValue.class); - assertThat(msg.getAddress()).isEqualTo(topic); + assertThat(msg.getAddress()).isEqualTo("/queues/" + topic); }).extracting(msg -> ((AmqpValue) msg.getBody()).getValue()) .containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); } @@ -139,6 +142,7 @@ public void testSinkUsingString() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -153,7 +157,7 @@ public void testSinkUsingString() { assertThat(messagesReceived).allSatisfy(msg -> { assertThat(msg.getBody()).isInstanceOf(AmqpValue.class); - assertThat(msg.getAddress()).isEqualTo(topic); + assertThat(msg.getAddress()).isEqualTo("/queues/" + topic); }).extracting(msg -> ((AmqpValue) msg.getBody()).getValue()) .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); } @@ -175,6 +179,7 @@ public void testSinkUsingObject() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -223,6 +228,7 @@ public void testSinkUsingObjectThatCannotBeSerialized() { String topic = UUID.randomUUID().toString(); AtomicInteger nack = new AtomicInteger(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -265,6 +271,7 @@ public void testSinkUsingJsonObject() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -292,6 +299,7 @@ public void testSinkUsingJsonArray() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -328,6 +336,7 @@ public void testSinkUsingVertxBuffer() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -362,6 +371,7 @@ public void testSinkUsingMutinyBuffer() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -396,6 +406,7 @@ public void testSinkUsingByteArray() { int msgCount = 10; String topic = UUID.randomUUID().toString(); List messagesReceived = new CopyOnWriteArrayList<>(); + createQueue(topic); usage.consume(topic, new AmqpReceiverOptions().setDurable(false), msg -> messagesReceived.add(msg.getDelegate().unwrap())); @@ -444,14 +455,14 @@ private Map createBaseConfig(String topic) { private Flow.Subscriber> createProviderAndSink(String topic) { Map config = createBaseConfig(topic); - config.put("address", topic); + config.put("address", "/queues/" + topic); return getSubscriberBuilder(config); } private Flow.Subscriber> createProviderAndSinkOrdered(String topic) { Map config = createBaseConfig(topic); - config.put("address", topic); + config.put("address", "/queues/" + topic); config.put("max-inflight-messages", 1L); return getSubscriberBuilder(config); @@ -459,7 +470,7 @@ private Flow.Subscriber> createProviderAndSinkOrdered(Strin private Flow.Subscriber> createProviderAndNonAnonymousSink(String topic) { Map config = createBaseConfig(topic); - config.put("address", topic); + config.put("address", "/queues/" + topic); config.put("use-anonymous-sender", false); return getSubscriberBuilder(config); diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpTestBase.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpTestBase.java index b32e2c974a..4093cbd003 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpTestBase.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpTestBase.java @@ -17,7 +17,7 @@ public class AmqpTestBase { private final Logger logger = Logger.getLogger(getClass()); - ExecutionHolder executionHolder; + protected ExecutionHolder executionHolder; @BeforeAll public static void setupClass(TestInfo testInfo) { diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpUsage.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpUsage.java index 23ee3177d0..9e329514cc 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpUsage.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/AmqpUsage.java @@ -39,7 +39,7 @@ public class AmqpUsage { private final static Logger LOGGER = Logger.getLogger(AmqpUsage.class); - private final AmqpClient client; + public final AmqpClient client; private final Vertx vertx; public AmqpUsage(Vertx vertx, String host, int port, String user, String pwd) { diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQBrokerTestBase.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQBrokerTestBase.java index 5c8886c835..9a6cabd8e8 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQBrokerTestBase.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQBrokerTestBase.java @@ -2,7 +2,12 @@ import static org.awaitility.Awaitility.await; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.time.Duration; +import java.util.Base64; import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.weld.environment.se.WeldContainer; @@ -15,7 +20,6 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.MountableFile; import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -33,10 +37,11 @@ public class RabbitMQBrokerTestBase { protected static String host; protected static int port; - final static String username = "guest"; - final static String password = "guest"; - AmqpUsage usage; - ExecutionHolder executionHolder; + protected static int managementPort; + protected final static String username = "guest"; + protected final static String password = "guest"; + protected AmqpUsage usage; + protected ExecutionHolder executionHolder; @BeforeAll public static void setupMutiny() { @@ -45,14 +50,27 @@ public static void setupMutiny() { @BeforeAll public static void startBroker() { - try { - RABBIT.start(); - } catch (Exception e) { - LOGGER.error("Failed to start RabbitMQ container, trying again...", e); - RABBIT.start(); + int maxAttempts = 5; + for (int attempt = 1; attempt <= maxAttempts; attempt++) { + try { + RABBIT.start(); + break; + } catch (Exception e) { + if (attempt == maxAttempts) { + throw e; + } + LOGGER.error("Failed to start RabbitMQ container (attempt {}), retrying...", attempt, e); + try { + Thread.sleep(2000L * attempt); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } } port = RABBIT.getPort(); + managementPort = RABBIT.getManagementPort(); host = RABBIT.getHost(); System.setProperty("amqp-host", host); @@ -92,6 +110,22 @@ public void tearDown() { MapBasedConfig.cleanup(); } + protected static void createQueue(String name) { + try { + String auth = Base64.getEncoder().encodeToString((username + ":" + password).getBytes()); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create("http://" + host + ":" + managementPort + "/api/queues/%2f/" + name)) + .header("Authorization", "Basic " + auth) + .header("Content-Type", "application/json") + .PUT(HttpRequest.BodyPublishers.ofString("{\"durable\":false,\"auto_delete\":true}")) + .build(); + HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build() + .send(request, HttpResponse.BodyHandlers.ofString()); + } catch (Exception e) { + throw new RuntimeException("Failed to create queue: " + name, e); + } + } + public boolean isAmqpConnectorReady(WeldContainer container) { HealthCenter health = container.getBeanManager().createInstance().select(HealthCenter.class).get(); return health.getReadiness().isOk(); @@ -105,12 +139,11 @@ public boolean isAmqpConnectorAlive(WeldContainer container) { public static class RabbitMQContainer extends GenericContainer { public RabbitMQContainer() { - super(DockerImageName.parse("docker.io/rabbitmq:3.12-management")); + super(DockerImageName.parse("docker.io/rabbitmq:4.2.5-management")); withExposedPorts(5672, 15672); - waitingFor(Wait.forLogMessage(".*Server startup complete; 4 plugins started.*\\n", 1) + waitingFor(Wait.forHttp("/api/overview").forPort(15672) + .withBasicCredentials("guest", "guest") .withStartupTimeout(Duration.ofSeconds(30))); - withCopyFileToContainer(MountableFile.forClasspathResource("rabbitmq/enabled_plugins"), - "/etc/rabbitmq/enabled_plugins"); } public RabbitMQContainer(int port) { @@ -122,6 +155,10 @@ public int getPort() { return getMappedPort(5672); } + public int getManagementPort() { + return getMappedPort(15672); + } + } } diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQTest.java index ca5eaf57f6..d47c93f26a 100644 --- a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQTest.java +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/RabbitMQTest.java @@ -4,6 +4,7 @@ import static org.awaitility.Awaitility.await; import java.util.List; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -35,15 +36,17 @@ public void cleanup() { @Test public void testSendingMessagesToRabbitMQ() throws InterruptedException { + String queue = UUID.randomUUID().toString(); + createQueue(queue); CountDownLatch latch = new CountDownLatch(10); - usage.consumeIntegers("sink", + usage.consumeIntegers("/queues/" + queue, v -> latch.countDown()); weld.addBeanClass(ProducingBean.class); new MapBasedConfig() - .with("mp.messaging.outgoing.sink.address", "sink") + .with("mp.messaging.outgoing.sink.address", "/queues/" + queue) .with("mp.messaging.outgoing.sink.connector", AmqpConnector.CONNECTOR_NAME) .with("mp.messaging.outgoing.sink.host", host) .with("mp.messaging.outgoing.sink.port", port) @@ -62,16 +65,18 @@ public void testSendingMessagesToRabbitMQ() throws InterruptedException { } @Test - public void testSendingMessagesToRabbitMQWithNotAnonymousDetection() throws InterruptedException { + public void testSendingMessagesToRabbitMQWithAnonymousDetection() throws InterruptedException { + String queue = UUID.randomUUID().toString(); + createQueue(queue); CountDownLatch latch = new CountDownLatch(10); - usage.consumeIntegers("sink-not-anonymous", + usage.consumeIntegers("/queues/" + queue, v -> latch.countDown()); weld.addBeanClass(ProducingBean.class); new MapBasedConfig() - .with("mp.messaging.outgoing.sink.address", "sink-not-anonymous") + .with("mp.messaging.outgoing.sink.address", "/queues/" + queue) .with("mp.messaging.outgoing.sink.connector", AmqpConnector.CONNECTOR_NAME) .with("mp.messaging.outgoing.sink.host", host) .with("mp.messaging.outgoing.sink.port", port) @@ -90,8 +95,11 @@ public void testSendingMessagesToRabbitMQWithNotAnonymousDetection() throws Inte @Test public void testReceivingMessagesFromRabbitMQ() { + String queue = UUID.randomUUID().toString(); + createQueue(queue); + new MapBasedConfig() - .put("mp.messaging.incoming.data.address", "data") + .put("mp.messaging.incoming.data.address", "/queues/" + queue) .put("mp.messaging.incoming.data.connector", AmqpConnector.CONNECTOR_NAME) .put("mp.messaging.incoming.data.host", host) .put("mp.messaging.incoming.data.port", port) @@ -114,7 +122,7 @@ public void testReceivingMessagesFromRabbitMQ() { assertThat(list).isEmpty(); AtomicInteger counter = new AtomicInteger(); - usage.produceTenIntegers("data", counter::getAndIncrement); + usage.produceTenIntegers("/queues/" + queue, counter::getAndIncrement); await().atMost(2, TimeUnit.MINUTES).until(() -> list.size() >= 10); assertThat(list).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRabbitMQRequestReplyTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRabbitMQRequestReplyTest.java new file mode 100644 index 0000000000..447491ad85 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRabbitMQRequestReplyTest.java @@ -0,0 +1,272 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import io.smallrye.reactive.messaging.amqp.AmqpConnector; +import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata; +import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata; +import io.smallrye.reactive.messaging.amqp.RabbitMQBrokerTestBase; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.amqp.AmqpMessage; +import io.vertx.mutiny.amqp.AmqpReceiver; + +public class AmqpRabbitMQRequestReplyTest extends RabbitMQBrokerTestBase { + + private WeldContainer container; + + @AfterEach + public void cleanup() { + if (container != null) { + container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)) + .get().terminate(null); + container.shutdown(); + } + + MapBasedConfig.cleanup(); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + } + + private MapBasedConfig commonConfig() { + return new MapBasedConfig() + .with("mp.messaging.connector.smallrye-amqp.host", host) + .with("mp.messaging.connector.smallrye-amqp.port", port) + .with("mp.messaging.connector.smallrye-amqp.username", username) + .with("mp.messaging.connector.smallrye-amqp.password", password) + .with("mp.messaging.connector.smallrye-amqp.tracing-enabled", false); + } + + private MapBasedConfig requestReplyConfig(String requestQueue, String replyQueue) { + return commonConfig() + .with("mp.messaging.outgoing.request-reply.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.request-reply.address", "/queues/" + requestQueue) + .with("mp.messaging.outgoing.request-reply.use-anonymous-sender", false) + .with("mp.messaging.outgoing.request-reply.reply.address", "/queues/" + replyQueue); + } + + private void startReplyServer(String requestQueue, String replyQueue) { + usage.client.createReceiver("/queues/" + requestQueue) + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + usage.client.createSender(replyTo) + .onItem().transformToUni(sender -> sender.sendWithAck(AmqpMessage.create() + .correlationId(correlationId) + .withBody(String.valueOf(request.bodyAsInteger())) + .build())) + .subscribe().with(x -> { + }); + } + }); + } + + @Test + public void testReply() { + String id = UUID.randomUUID().toString().substring(0, 8); + String requestQueue = "requests-" + id; + String replyQueue = "replies-" + id; + createQueue(requestQueue); + createQueue(replyQueue); + startReplyServer(requestQueue, replyQueue); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + requestReplyConfig(requestQueue, replyQueue).write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + for (int i = 0; i < 5; i++) { + producer.requestReply().request(i).subscribe().with(replies::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyMessage() { + String id = UUID.randomUUID().toString().substring(0, 8); + String requestQueue = "requests-msg-" + id; + String replyQueue = "replies-msg-" + id; + createQueue(requestQueue); + createQueue(replyQueue); + startReplyServer(requestQueue, replyQueue); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + requestReplyConfig(requestQueue, replyQueue).write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + for (int i = 0; i < 5; i++) { + producer.requestReply().request(Message.of(i)).subscribe().with(m -> replies.add(m.getPayload())); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyTimeout() { + String id = UUID.randomUUID().toString().substring(0, 8); + String requestQueue = "requests-timeout-" + id; + String replyQueue = "replies-timeout-" + id; + createQueue(requestQueue); + createQueue(replyQueue); + // No reply server — requests will time out + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + requestReplyConfig(requestQueue, replyQueue) + .with("mp.messaging.outgoing.request-reply." + AmqpRequestReply.REPLY_TIMEOUT_KEY, "1000") + .write(); + container = weld.initialize(); + + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + producer.requestReply().request(1) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitFailure(Duration.ofSeconds(5)).assertFailedWith(AmqpRequestReplyTimeoutException.class); + } + + @Test + public void testReplyWithReactiveServer() { + String id = UUID.randomUUID().toString().substring(0, 8); + String requestQueue = "requests-reactive-" + id; + String replyQueue = "replies-reactive-" + id; + createQueue(requestQueue); + createQueue(replyQueue); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class, ReactiveReplyServer.class); + requestReplyConfig(requestQueue, replyQueue) + .with("mp.messaging.incoming.server-in.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.server-in.address", "/queues/" + requestQueue) + .with("mp.messaging.outgoing.server-out.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.server-out.address", "/queues/" + replyQueue) + .with("mp.messaging.outgoing.server-out.use-anonymous-sender", false) + .write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + for (int i = 0; i < 5; i++) { + producer.requestReply().request(i).subscribe().with(replies::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("reply-0", "reply-1", "reply-2", "reply-3", "reply-4"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyWithReactiveMessageServer() { + String id = UUID.randomUUID().toString().substring(0, 8); + String requestQueue = "requests-reactive-msg-" + id; + String replyQueue = "replies-reactive-msg-" + id; + createQueue(requestQueue); + createQueue(replyQueue); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class, ReactiveReplyMessageServer.class); + requestReplyConfig(requestQueue, replyQueue) + .with("mp.messaging.incoming.server-in.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.server-in.address", "/queues/" + requestQueue) + .with("mp.messaging.outgoing.server-out.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.server-out.address", "/queues/" + replyQueue) + .with("mp.messaging.outgoing.server-out.use-anonymous-sender", false) + .write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + for (int i = 0; i < 5; i++) { + producer.requestReply().request(i).subscribe().with(replies::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("reply-0", "reply-1", "reply-2", "reply-3", "reply-4"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @ApplicationScoped + public static class RequestReplyProducer { + + @Inject + @Channel("request-reply") + AmqpRequestReply requestReply; + + public AmqpRequestReply requestReply() { + return requestReply; + } + } + + @ApplicationScoped + public static class ReactiveReplyServer { + + @Incoming("server-in") + @Outgoing("server-out") + public String process(int request) { + return "reply-" + request; + } + } + + @ApplicationScoped + public static class ReactiveReplyMessageServer { + + @Incoming("server-in") + @Outgoing("server-out") + public Message process(Message request) { + IncomingAmqpMetadata incoming = request.getMetadata(IncomingAmqpMetadata.class).orElse(null); + if (incoming != null) { + OutgoingAmqpMetadata outMeta = OutgoingAmqpMetadata.builder() + .withAddress(incoming.getReplyTo()) + .withCorrelationId(incoming.getId()) + .build(); + return request.withPayload("reply-" + request.getPayload()).addMetadata(outMeta); + } + return request.withPayload(String.valueOf(request.getPayload())); + } + } +} diff --git a/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyEmitterTest.java b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyEmitterTest.java new file mode 100644 index 0000000000..e6db64c443 --- /dev/null +++ b/smallrye-reactive-messaging-amqp/src/test/java/io/smallrye/reactive/messaging/amqp/reply/AmqpRequestReplyEmitterTest.java @@ -0,0 +1,492 @@ +package io.smallrye.reactive.messaging.amqp.reply; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.config.ConfigProvider; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.common.annotation.Identifier; +import io.smallrye.config.SmallRyeConfigProviderResolver; +import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; +import io.smallrye.reactive.messaging.amqp.AmqpBrokerTestBase; +import io.smallrye.reactive.messaging.amqp.AmqpConnector; +import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata; +import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.amqp.AmqpMessage; +import io.vertx.mutiny.amqp.AmqpReceiver; + +public class AmqpRequestReplyEmitterTest extends AmqpBrokerTestBase { + + private WeldContainer container; + + @AfterEach + public void cleanup() { + if (container != null) { + container.getBeanManager().createInstance() + .select(AmqpConnector.class, ConnectorLiteral.of(AmqpConnector.CONNECTOR_NAME)) + .get().terminate(null); + container.shutdown(); + } + + MapBasedConfig.cleanup(); + SmallRyeConfigProviderResolver.instance().releaseConfig(ConfigProvider.getConfig()); + } + + private MapBasedConfig config() { + return new MapBasedConfig() + .with("mp.messaging.outgoing.request-reply.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.request-reply.address", "requests") + .with("mp.messaging.outgoing.request-reply.host", host) + .with("mp.messaging.outgoing.request-reply.port", port) + .with("mp.messaging.outgoing.request-reply.username", username) + .with("mp.messaging.outgoing.request-reply.password", password) + .with("mp.messaging.outgoing.request-reply.tracing-enabled", false) + .with("mp.messaging.outgoing.request-reply.reply.address", "request-reply-reply"); + } + + private void startReplyServer() { + startReplyServer("requests"); + } + + private void startReplyServer(String address) { + usage.client.createReceiver(address) + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + usage.client.createSender(replyTo) + .onItem().transformToUni(sender -> sender.sendWithAck(AmqpMessage.create() + .correlationId(correlationId) + .withBody(String.valueOf(request.bodyAsInteger())) + .build())) + .subscribe().with(x -> { + }); + } + }); + } + + private void startSlowReplyServer() { + usage.client.createReceiver("requests") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + usage.client.createSender(replyTo) + .onItem().transformToUni(sender -> sender.sendWithAck(AmqpMessage.create() + .correlationId(correlationId) + .withBody(String.valueOf(request.bodyAsInteger())) + .build())) + .subscribe().with(x -> { + }); + } + }); + } + + @Test + public void testReply() { + startReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config().write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + for (int i = 0; i < 10; i++) { + producer.requestReply().request(i).subscribe().with(replies::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(10)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyMessage() { + startReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config().write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + for (int i = 0; i < 10; i++) { + producer.requestReply().request(Message.of(i)).subscribe().with(m -> replies.add(m.getPayload())); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(10)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyTimeout() { + startSlowReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config() + .with("mp.messaging.outgoing.request-reply." + AmqpRequestReply.REPLY_TIMEOUT_KEY, "1000") + .write(); + container = weld.initialize(); + + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + producer.requestReply().request(1) + .subscribe().withSubscriber(UniAssertSubscriber.create()) + .awaitFailure().assertFailedWith(AmqpRequestReplyTimeoutException.class); + } + + @Test + public void testReplyMessageMulti() { + startMultiReplyServer(10); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config().write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + List expected = new ArrayList<>(); + int sent = 5; + for (int i = 0; i < sent; i++) { + producer.requestReply().requestMulti(i) + .subscribe() + .with(replies::add); + for (int j = 0; j < 10; j++) { + expected.add(i + ": " + j); + } + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(10 * sent)); + assertThat(replies).containsAll(expected); + Map pendingReplies = producer.requestReply().getPendingReplies(); + for (PendingReply pending : pendingReplies.values()) { + pending.complete(); + } + await().untilAsserted(() -> assertThat(producer.requestReply().getPendingReplies()).isEmpty()); + } + + @Test + public void testReplyMessageMultiLimit() { + startMultiReplyServer(10); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config().write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + producer.requestReply().requestMulti(0) + .select().first(5) + .subscribe() + .with(replies::add); + + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyMultipleEmittersSameRequestAddress() { + startReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducerSecond.class); + config() + .with("mp.messaging.outgoing.request-reply2.connector", AmqpConnector.CONNECTOR_NAME) + .with("mp.messaging.outgoing.request-reply2.address", "requests") + .with("mp.messaging.outgoing.request-reply2.host", host) + .with("mp.messaging.outgoing.request-reply2.port", port) + .with("mp.messaging.outgoing.request-reply2.username", username) + .with("mp.messaging.outgoing.request-reply2.password", password) + .with("mp.messaging.outgoing.request-reply2.tracing-enabled", false) + .with("mp.messaging.outgoing.request-reply2.reply.address", "request-reply2-reply") + .write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducerSecond app = container.getBeanManager().createInstance() + .select(RequestReplyProducerSecond.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + for (int i = 0; i < 20; i++) { + AmqpRequestReply requestReply = (i % 2 == 0) ? app.requestReply() : app.requestReply2(); + requestReply.request(Message.of(i)).subscribe().with(m -> replies.add(m.getPayload())); + } + + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(20)); + assertThat(replies) + .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", + "10", "11", "12", "13", "14", "15", "16", "17", "18", "19"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); + assertThat(app.requestReply2().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyMessageBytesCorrelationId() { + startReplyServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config() + .with("mp.messaging.outgoing.request-reply." + AmqpRequestReply.REPLY_CORRELATION_ID_HANDLER_KEY, + "bytes") + .write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + for (int i = 0; i < 10; i++) { + producer.requestReply().request(Message.of(i)).subscribe().with(m -> replies.add(m.getPayload())); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(10)); + assertThat(replies).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyFailureHandler() { + startReplyServerWithFailure(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class, MyReplyFailureHandler.class); + config() + .with("mp.messaging.outgoing.request-reply." + AmqpRequestReply.REPLY_FAILURE_HANDLER_KEY, + "my-reply-error") + .write(); + container = weld.initialize(); + + List replies = new CopyOnWriteArrayList<>(); + List errors = new CopyOnWriteArrayList<>(); + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + for (int i = 0; i < 10; i++) { + producer.requestReply().request(i) + .subscribe().with(replies::add, errors::add); + } + await().atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertThat(replies).hasSize(6)); + assertThat(replies) + .containsExactlyInAnyOrder("1", "2", "4", "5", "7", "8"); + await().untilAsserted(() -> assertThat(errors).hasSize(4)); + assertThat(errors) + .extracting(Throwable::getMessage) + .allSatisfy(message -> assertThat(message).containsAnyOf("0", "3", "6", "9") + .contains("Cannot reply to")); + assertThat(producer.requestReply().getPendingReplies()).isEmpty(); + } + + @Test + public void testReplyGracefulShutdown() { + startNonReplyingServer(); + + Weld weld = new Weld(); + weld.addBeanClasses(RequestReplyProducer.class); + config() + .with("mp.messaging.outgoing.request-reply." + AmqpRequestReply.REPLY_TIMEOUT_KEY, "30000") + .write(); + container = weld.initialize(); + + RequestReplyProducer producer = container.getBeanManager().createInstance() + .select(RequestReplyProducer.class).get(); + await().until(() -> isAmqpConnectorReady(container)); + + for (int i = 0; i < 5; i++) { + producer.requestReply().request(i) + .subscribe().with(r -> { + }, e -> { + }); + } + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(producer.requestReply().getPendingReplies()).hasSize(5)); + + producer.requestReply().complete(); + + await().atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(producer.requestReply().getPendingReplies()).isEmpty()); + } + + private void startNonReplyingServer() { + usage.client.createReceiver("requests") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + }); + } + + private void startMultiReplyServer(int replyCount) { + usage.client.createReceiver("requests") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + usage.client.createSender(replyTo) + .subscribe().with(sender -> { + String payload = String.valueOf(request.bodyAsInteger()); + for (int i = 0; i < replyCount; i++) { + sender.sendWithAck(AmqpMessage.create() + .correlationId(correlationId) + .withBody(payload + ": " + i) + .build()).subscribe().with(x -> { + }); + } + }); + } + }); + } + + private void startReplyServerWithFailure() { + usage.client.createReceiver("requests") + .onItem().transformToMulti(AmqpReceiver::toMulti) + .subscribe().with(request -> { + String replyTo = request.replyTo(); + String correlationId = request.id(); + if (replyTo != null && correlationId != null) { + int value = request.bodyAsInteger(); + io.vertx.mutiny.amqp.AmqpMessageBuilder builder = AmqpMessage.create() + .correlationId(correlationId) + .withBody(String.valueOf(value)); + if (value % 3 == 0) { + builder.applicationProperties( + io.vertx.core.json.JsonObject.of("REPLY_ERROR", "Cannot reply to " + value)); + } + io.vertx.mutiny.amqp.AmqpMessage reply = builder.build(); + usage.client.createSender(replyTo) + .onItem().transformToUni(sender -> sender.sendWithAck(reply)) + .subscribe().with(x -> { + }); + } + }); + } + + @ApplicationScoped + public static class RequestReplyProducer { + + @Inject + @Channel("request-reply") + AmqpRequestReply requestReply; + + public AmqpRequestReply requestReply() { + return requestReply; + } + } + + @ApplicationScoped + public static class RequestReplyProducerSecond { + + @Inject + @Channel("request-reply") + AmqpRequestReply requestReply; + + @Inject + @Channel("request-reply2") + AmqpRequestReply requestReply2; + + public AmqpRequestReply requestReply() { + return requestReply; + } + + public AmqpRequestReply requestReply2() { + return requestReply2; + } + } + + @ApplicationScoped + public static class ReactiveReplyMessageServer { + + @Incoming("server-in") + @Outgoing("server-out") + public Message process(Message request) { + IncomingAmqpMetadata incoming = request.getMetadata(IncomingAmqpMetadata.class).orElse(null); + if (incoming != null) { + OutgoingAmqpMetadata outMeta = OutgoingAmqpMetadata.builder() + .withAddress(incoming.getReplyTo()) + .withCorrelationId(incoming.getId()) + .build(); + return request.withPayload("reply-" + request.getPayload()).addMetadata(outMeta); + } + return request.withPayload(String.valueOf(request.getPayload())); + } + } + + @ApplicationScoped + public static class ReactiveReplyServer { + + @Incoming("server-in") + @Outgoing("server-out") + public String process(int request) { + return "reply-" + request; + } + } + + @ApplicationScoped + @Identifier("my-reply-error") + public static class MyReplyFailureHandler implements ReplyFailureHandler { + + @Override + public Throwable handleReply(io.smallrye.reactive.messaging.amqp.AmqpMessage replyMessage) { + io.vertx.core.json.JsonObject properties = replyMessage.getApplicationProperties(); + if (properties != null) { + String error = properties.getString("REPLY_ERROR"); + if (error != null) { + return new IllegalArgumentException(error); + } + } + return null; + } + } + +} diff --git a/smallrye-reactive-messaging-amqp/src/test/resources/rabbitmq/enabled_plugins b/smallrye-reactive-messaging-amqp/src/test/resources/rabbitmq/enabled_plugins deleted file mode 100644 index e7160fc837..0000000000 --- a/smallrye-reactive-messaging-amqp/src/test/resources/rabbitmq/enabled_plugins +++ /dev/null @@ -1,2 +0,0 @@ -[rabbitmq_management,rabbitmq_management_agent,rabbitmq_amqp1_0,rabbitmq_web_dispatch]. -