Skip to content

Commit 7b18e8c

Browse files
committed
Revert "AMQP Link pairing implementation"
This reverts commit 098c75d.
1 parent 2c997a4 commit 7b18e8c

11 files changed

Lines changed: 15 additions & 1011 deletions

File tree

documentation/src/main/docs/amqp/rabbitmq.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ the broker does not support anonymous sender (See
5050

5151
## Request/Reply with RabbitMQ
5252

53-
This connector supports the request-reply pattern with RabbitMQ, including
54-
RabbitMQ’s [Direct Reply-To](https://www.rabbitmq.com/docs/direct-reply-to) mechanism.
53+
This connector supports the request-reply pattern with RabbitMQ.
5554
See [AMQP Request/Reply](request-reply.md#using-with-rabbitmq) for details.
5655

5756
Alternatively, you can use the [RabbitMQ connector](../rabbitmq/rabbitmq.md).

documentation/src/main/docs/amqp/request-reply.md

Lines changed: 11 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -117,52 +117,6 @@ A sample reply error handler can lookup application properties and return the er
117117

118118
`null` return value indicates that no error has been found in the reply message, and it can be delivered to the application.
119119

120-
## Using Link Pairing
121-
122-
AMQP 1.0 supports the concept of _link pairing_ where sender and receiver links are created as a paired unit on the same connection.
123-
When link pairing is enabled, the sender link and the reply receiver link are paired at the broker level,
124-
allowing the sender to reference the receiver's address as the reply-to target using the special `$me` address.
125-
126-
To enable link pairing:
127-
128-
```properties
129-
mp.messaging.outgoing.my-request.connector=smallrye-amqp
130-
mp.messaging.outgoing.my-request.address=requests
131-
mp.messaging.outgoing.my-request.link-pairing=true
132-
mp.messaging.outgoing.my-request.container-id=my-request-client
133-
```
134-
135-
When `link-pairing` is enabled and no explicit `reply.address` is set, the reply address defaults to `$me`,
136-
meaning the reply receiver's address is resolved by the broker through the paired link.
137-
138-
You can still provide an explicit reply address with link pairing:
139-
140-
```properties
141-
mp.messaging.outgoing.my-request.link-pairing=true
142-
mp.messaging.outgoing.my-request.reply.address=my-reply-queue
143-
```
144-
145-
### Implementing a paired replier
146-
147-
The replier (server) can also use link pairing by configuring both the incoming and outgoing channels
148-
with the same `container-id` and `link-name`, and setting `link-pairing=true`:
149-
150-
```properties
151-
# Server incoming config
152-
mp.messaging.incoming.server-in.connector=smallrye-amqp
153-
mp.messaging.incoming.server-in.address=requests
154-
mp.messaging.incoming.server-in.container-id=reply-server
155-
mp.messaging.incoming.server-in.link-name=reply-server-link
156-
mp.messaging.incoming.server-in.link-pairing=true
157-
158-
# Server outgoing config
159-
mp.messaging.outgoing.server-out.connector=smallrye-amqp
160-
mp.messaging.outgoing.server-out.address=$me
161-
mp.messaging.outgoing.server-out.container-id=reply-server
162-
mp.messaging.outgoing.server-out.link-name=reply-server-link
163-
mp.messaging.outgoing.server-out.link-pairing=true
164-
```
165-
166120
## Connection Sharing
167121

168122
Multiple AMQP channels can share the same underlying connection when configured with the same `container-id`.
@@ -186,30 +140,24 @@ Both channels above will share the same AMQP connection because they use the sam
186140
Connection sharing requires all channels with the same `container-id` to have compatible connection settings (host, port, credentials, etc.).
187141
If the settings differ, the connector will detect the conflict and raise an error.
188142

189-
When using link pairing with request-reply, the outgoing channel and its reply receiver automatically share a connection
190-
when configured with the same `container-id`.
191-
192143
## Using with RabbitMQ
193144

194-
RabbitMQ 4.0+ with the native AMQP 1.0 support provides the `amq.rabbitmq.reply-to` pseudo-address for
195-
[Direct Reply-To](https://www.rabbitmq.com/docs/direct-reply-to),
196-
which dynamically creates volatile reply-to queues.
197-
198-
To use direct reply-to with request-reply:
145+
RabbitMQ 4.0+ with native AMQP 1.0 support is compatible with the request-reply pattern.
146+
When using RabbitMQ, remember to use `/queues/` prefixed addresses (v2 address format)
147+
and set `use-anonymous-sender=false` as anonymous senders are not supported.
199148

200149
```properties
201150
mp.messaging.outgoing.my-request.connector=smallrye-amqp
202151
mp.messaging.outgoing.my-request.address=/queues/requests
203-
mp.messaging.outgoing.my-request.reply.address=amq.rabbitmq.reply-to
204-
mp.messaging.outgoing.my-request.container-id=my-request
152+
mp.messaging.outgoing.my-request.reply.address=/queues/replies
205153
mp.messaging.outgoing.my-request.use-anonymous-sender=false
206-
```
207154

208-
When using `amq.rabbitmq.reply-to`, the broker creates a dynamic volatile queue for replies.
209-
The actual queue address is resolved at connection time and used as the `reply-to` property on request messages.
155+
mp.messaging.incoming.request.connector=smallrye-amqp
156+
mp.messaging.incoming.request.address=/queues/requests
210157

211-
!!! note
212-
With RabbitMQ, remember to use `/queues/` prefixed addresses (v2 address format)
213-
and set `use-anonymous-sender=false` as anonymous senders are not supported.
158+
mp.messaging.outgoing.reply.connector=smallrye-amqp
159+
mp.messaging.outgoing.reply.address=/queues/replies
160+
mp.messaging.outgoing.reply.use-anonymous-sender=false
161+
```
214162

215-
Link pairing is also supported with RabbitMQ, see [Using Link Pairing](#using-link-pairing) above for configuration details.
163+
See [Using RabbitMQ](rabbitmq.md) for more details on RabbitMQ-specific configuration.

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import jakarta.enterprise.inject.Instance;
2121
import jakarta.inject.Inject;
2222

23-
import org.apache.qpid.proton.amqp.Symbol;
2423
import org.eclipse.microprofile.config.Config;
2524
import org.eclipse.microprofile.reactive.messaging.Message;
2625
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
@@ -83,13 +82,11 @@
8382
@ConnectorAttribute(name = "cloud-events-insert-timestamp", type = "boolean", direction = OUTGOING, description = "Whether or not the connector should insert automatically the `time` attribute into the outgoing Cloud Event. Requires `cloud-events` to be set to `true`. This value is used if the message does not configure the `time` attribute itself", alias = "cloud-events-default-timestamp", defaultValue = "true")
8483
@ConnectorAttribute(name = "cloud-events-mode", type = "string", direction = OUTGOING, description = "The Cloud Event mode (`structured` or `binary` (default)). Indicates how are written the cloud events in the outgoing record", defaultValue = "binary")
8584
@ConnectorAttribute(name = "lazy-client", type = "boolean", direction = OUTGOING, description = "Whether to create the connection and sender at startup or at first send request", defaultValue = "true")
86-
@ConnectorAttribute(name = "link-pairing", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to use AMQP Link Pairing. When enabled, the sender and receiver links are paired according to the AMQP Link Pair specification (OASIS).", defaultValue = "false")
8785

8886
public class AmqpConnector implements InboundConnector, OutboundConnector, HealthReporter {
8987

9088
public static final String CONNECTOR_NAME = "smallrye-amqp";
9189

92-
public static final Symbol PAIRED_KEY = Symbol.valueOf("paired");
9390
public static final String ME_ADDRESS = "$me";
9491
public static final String DIRECT_REPLY_TO_ADDRESS = "amq.rabbitmq.reply-to";
9592

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpReceiverHelper.java

Lines changed: 0 additions & 150 deletions
This file was deleted.

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpSenderHelper.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/IncomingAmqpChannel.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.smallrye.reactive.messaging.amqp;
22

3-
import static io.smallrye.reactive.messaging.amqp.AmqpReceiverHelper.createReceiver;
43
import static io.smallrye.reactive.messaging.amqp.ChannelUtils.getClientCapabilities;
54
import static io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging.log;
65
import static java.time.Duration.ofSeconds;
@@ -74,8 +73,6 @@ public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, ConnectionHold
7473

7574
ic.getQos().ifPresent(options::setQos);
7675

77-
boolean linkPairing = ic.getLinkPairing();
78-
7976
if (ic.getTracingEnabled()) {
8077
amqpInstrumenter = AmqpOpenTelemetryInstrumenter.createForConnector(openTelemetryInstance);
8178
} else {
@@ -86,8 +83,7 @@ public IncomingAmqpChannel(AmqpConnectorIncomingConfiguration ic, ConnectionHold
8683
Integer interval = ic.getRetryOnFailInterval();
8784
Integer attempts = ic.getRetryOnFailAttempts();
8885
multi = holder.getOrEstablishConnection()
89-
.onItem()
90-
.transformToUni(connection -> createReceiver(holder, connection, address, options, linkPairing))
86+
.onItem().transformToUni(connection -> connection.createReceiver(address, options))
9187
.onItem().invoke(r -> {
9288
opened.set(true);
9389
resolvedAddress = r.address();

0 commit comments

Comments
 (0)