Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ nav:
- amqp/amqp.md
- 'Receiving messages': amqp/receiving-amqp-messages.md
- 'Sending messages': amqp/sending-amqp-messages.md
- 'AMQP Request/Reply': amqp/request-reply.md
- 'Health Checks': amqp/health.md
- 'Client Customization': amqp/client-customization.md
- 'Using RabbitMQ': amqp/rabbitmq.md
Expand Down
1 change: 1 addition & 0 deletions documentation/src/main/docs/amqp/amqp.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ With this connector, your application can:

- receive messages from an AMQP Broker or Router.
- send `Message` to an AMQP *address*
- implement [request-reply](request-reply.md) messaging patterns

The AMQP connector is based on the [Vert.x AMQP Client](https://vertx.io/docs/vertx-amqp-client/java/).

Expand Down
36 changes: 29 additions & 7 deletions documentation/src/main/docs/amqp/rabbitmq.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
# Using RabbitMQ

This connector is for AMQP 1.0. RabbitMQ implements AMQP 0.9.1. RabbitMQ
does not provide AMQP 1.0 by default, but there is a plugin for it. To
use RabbitMQ with this connector, enable and configure the [AMQP 1.0
plugin](https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/v3.8.x/README.md).
RabbitMQ 4.0+ provides native AMQP 1.0 support, making it fully compatible with this connector.
For older versions (3.x), RabbitMQ implements AMQP 0.9.1 and requires the
[AMQP 1.0 plugin](https://github.com/rabbitmq/rabbitmq-amqp1.0/blob/v3.8.x/README.md) to be enabled.

Despite the plugin, a few features won’t work with RabbitMQ. Thus, we
recommend the following configurations.
## Address format

RabbitMQ 4.0+ uses the v2 address format with `/queues/` and `/exchanges/` prefixes.
When using this connector with RabbitMQ, prefix your addresses accordingly:

``` properties
mp.messaging.incoming.prices.connector=smallrye-amqp
mp.messaging.incoming.prices.address=/queues/prices

mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=/queues/prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false
```

## Recommendations

RabbitMQ support for AMQP 1.0 may differ from other AMQP 1.0 supporting brokers.
We recommend the following configurations.

To receive messages from RabbitMQ:

Expand All @@ -19,16 +34,23 @@ mp.messaging.incoming.prices.durable=false

To send messages to RabbitMQ:

- set the destination `address` (anonymous sender are not supported)
- set the destination `address` (anonymous senders are not supported)
- set `use-anonymous-sender` to `false`

``` properties
mp.messaging.outgoing.generated-price.connector=smallrye-amqp
mp.messaging.outgoing.generated-price.address=prices
mp.messaging.outgoing.generated-price.use-anonymous-sender=false
```

It’s not possible to change the destination dynamically (using message
metadata) when using RabbitMQ. The connector automatically detects that
the broker does not support anonymous sender (See
<http://docs.oasis-open.org/amqp/anonterm/v1.0/anonterm-v1.0.html>).

## Request/Reply with RabbitMQ

This connector supports the request-reply pattern with RabbitMQ.
See [AMQP Request/Reply](request-reply.md#using-with-rabbitmq) for details.

Alternatively, you can use the [RabbitMQ connector](../rabbitmq/rabbitmq.md).
163 changes: 163 additions & 0 deletions documentation/src/main/docs/amqp/request-reply.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# AMQP Request/Reply

!!!warning "Experimental"
AMQP Request Reply Emitter is an experimental feature.

The AMQP [Request-Reply](https://www.enterpriseintegrationpatterns.com/patterns/messaging/RequestReply.html) pattern allows you to publish a message to an AMQP address and then await for a reply message that responds to the initial request.

The `AmqpRequestReply` emitter implements the requestor (or the client) of the request-reply pattern for AMQP 1.0 outbound channels:

``` java
{{ insert('amqp/outbound/AmqpRequestReplyEmitter.java') }}
```

The `request` method publishes the request message to the configured target address of the outgoing channel,
and listens on a reply address (by default, the channel name with `-reply` suffix) for a reply message.
When the reply is received the returned `Uni` is completed with the message payload.

The request send operation generates a correlation id and sets the AMQP `message-id` property,
which it expects to be sent back in the reply message's `correlation-id` property.

The replier (or the server) can be implemented using a Reactive Messaging processor:

``` java
{{ insert('amqp/outbound/AmqpReplier.java') }}
```

When you need more control over the reply message, such as setting the reply-to address and correlation id explicitly, you can use the `Message` type:

``` java
{{ insert('amqp/outbound/AmqpReplierWithMetadata.java') }}
```

Given the following configuration example:

```properties
mp.messaging.outgoing.my-request.connector=smallrye-amqp
mp.messaging.outgoing.my-request.address=requests
mp.messaging.outgoing.my-request.reply.address=my-request-reply

mp.messaging.incoming.request.connector=smallrye-amqp
mp.messaging.incoming.request.address=requests

mp.messaging.outgoing.reply.connector=smallrye-amqp
mp.messaging.outgoing.reply.address=my-request-reply
mp.messaging.outgoing.reply.use-anonymous-sender=false
```

## Requesting with `Message` types

Like the core Emitter's `send` methods, `request` method also can receive a `Message` type and return a message:

``` java
{{ insert('amqp/outbound/AmqpRequestReplyMessageEmitter.java') }}
```

!!! note
The ingested reply type of the `AmqpRequestReply` is discovered at runtime,
in order to configure a `MessageConverter` to be applied on the incoming message before returning the `Uni` result.

## Requesting multiple replies

You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type.

For example this can be used to aggregate multiple replies to a single request.

``` java
{{ insert('amqp/outbound/AmqpRequestReplyMultiEmitter.java') }}
```
Like the other `request` you can also request `Message` types.

!!! note
The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will
fail.

## Pending replies and reply timeout

By default, the `Uni` returned from the `request` method is configured to fail with timeout exception if no reply is received after 5 seconds.
This timeout is configurable with the channel attribute `reply.timeout` (in milliseconds).

```properties
mp.messaging.outgoing.my-request.reply.timeout=10000
```

A snapshot of the list of pending replies is available through the `AmqpRequestReply#getPendingReplies` method.

## Scaling Request/Reply

If multiple requestor instances are configured on the same outgoing address, and the same reply address,
each requestor instance will receive replies of all instances. If an observed correlation id doesn't match
the id of any pending replies, the reply is simply discarded.
With the additional network traffic this allows scaling requestors, (and repliers) dynamically.

## Correlation Ids

The AMQP Request/Reply allows configuring the correlation id mechanism completely through a `CorrelationIdHandler` implementation.
The default handler is based on randomly generated UUID strings, set as the AMQP `message-id` on the request message.
The reply message is expected to carry the same value in the `correlation-id` property.

The correlation id handler implementation can be configured using the `reply.correlation-id.handler` attribute.
The default configuration is `uuid`,
and an alternative `bytes` implementation can be used to generate random binary correlation ids.

Custom handlers can be implemented by proposing a CDI-managed bean with `@Identifier` qualifier.

## Reply Error Handling

If the reply server produces an error, it can propagate the error back to the requestor, failing the returned `Uni`.

If configured using the `reply.failure.handler` channel attribute,
the `ReplyFailureHandler` implementations are discovered through CDI, matching the `@Identifier` qualifier.

A sample reply error handler can lookup application properties and return the error to be thrown by the reply:

``` java
{{ insert('amqp/outbound/MyAmqpReplyFailureHandler.java') }}
```

`null` return value indicates that no error has been found in the reply message, and it can be delivered to the application.

## Connection Sharing

Multiple AMQP channels can share the same underlying connection when configured with the same `container-id`.
This reduces resource consumption and is particularly useful for request-reply patterns
where the sender and reply receiver can share a single connection.

```properties
mp.messaging.outgoing.my-request.connector=smallrye-amqp
mp.messaging.outgoing.my-request.address=requests
mp.messaging.outgoing.my-request.container-id=my-connection
mp.messaging.outgoing.my-request.reply.address=replies

mp.messaging.incoming.other-channel.connector=smallrye-amqp
mp.messaging.incoming.other-channel.address=other
mp.messaging.incoming.other-channel.container-id=my-connection
```

Both channels above will share the same AMQP connection because they use the same `container-id`.

!!! note
Connection sharing requires all channels with the same `container-id` to have compatible connection settings (host, port, credentials, etc.).
If the settings differ, the connector will detect the conflict and raise an error.

## Using with RabbitMQ

RabbitMQ 4.0+ with native AMQP 1.0 support is compatible with the request-reply pattern.
When using RabbitMQ, remember to use `/queues/` prefixed addresses (v2 address format)
and set `use-anonymous-sender=false` as anonymous senders are not supported.

```properties
mp.messaging.outgoing.my-request.connector=smallrye-amqp
mp.messaging.outgoing.my-request.address=/queues/requests
mp.messaging.outgoing.my-request.reply.address=/queues/replies
mp.messaging.outgoing.my-request.use-anonymous-sender=false

mp.messaging.incoming.request.connector=smallrye-amqp
mp.messaging.incoming.request.address=/queues/requests

mp.messaging.outgoing.reply.connector=smallrye-amqp
mp.messaging.outgoing.reply.address=/queues/replies
mp.messaging.outgoing.reply.use-anonymous-sender=false
```

See [Using RabbitMQ](rabbitmq.md) for more details on RabbitMQ-specific configuration.
20 changes: 20 additions & 0 deletions documentation/src/main/java/amqp/outbound/AmqpReplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package amqp.outbound;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class AmqpReplier {

Random rand = new Random();

@Incoming("request")
@Outgoing("reply")
String handleRequest(String request) {
return request + "-" + rand.nextInt(100);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package amqp.outbound;

import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.reactive.messaging.amqp.IncomingAmqpMetadata;
import io.smallrye.reactive.messaging.amqp.OutgoingAmqpMetadata;

@ApplicationScoped
public class AmqpReplierWithMetadata {

Random rand = new Random();

@Incoming("request")
@Outgoing("reply")
Message<String> handleRequest(Message<String> request) {
IncomingAmqpMetadata incoming = request.getMetadata(IncomingAmqpMetadata.class)
.orElseThrow();
OutgoingAmqpMetadata outMeta = OutgoingAmqpMetadata.builder()
.withAddress(incoming.getReplyTo())
.withCorrelationId(incoming.getId())
.build();
String reply = request.getPayload() + "-" + rand.nextInt(100);
return request.withPayload(reply).addMetadata(outMeta);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package amqp.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply;

@ApplicationScoped
public class AmqpRequestReplyEmitter {

@Inject
@Channel("my-request")
AmqpRequestReply<String, Integer> quoteRequest;

public Uni<Integer> requestQuote(String request) {
return quoteRequest.request(request);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package amqp.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply;

@ApplicationScoped
public class AmqpRequestReplyMessageEmitter {

@Inject
@Channel("my-request")
AmqpRequestReply<String, Integer> quoteRequest;

public Uni<Message<Integer>> requestMessage(String request) {
return quoteRequest.request(Message.of(request));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package amqp.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.amqp.reply.AmqpRequestReply;

@ApplicationScoped
public class AmqpRequestReplyMultiEmitter {

@Inject
@Channel("my-request")
AmqpRequestReply<String, Integer> quoteRequest;

public Multi<Integer> requestQuote(String request) {
return quoteRequest.requestMulti(request).select().first(5);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package amqp.outbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.amqp.AmqpMessage;
import io.smallrye.reactive.messaging.amqp.reply.ReplyFailureHandler;

@ApplicationScoped
@Identifier("my-reply-error")
public class MyAmqpReplyFailureHandler implements ReplyFailureHandler {

@Override
public Throwable handleReply(AmqpMessage<?> replyMessage) {
var properties = replyMessage.getApplicationProperties();
if (properties != null) {
String error = properties.getString("REPLY_ERROR");
if (error != null) {
return new IllegalArgumentException(error);
}
}
return null;
}
}
Loading