Skip to content

Commit 7eebee6

Browse files
committed
Wait for openFuture on AMQP links before usage
1 parent 1f8724c commit 7eebee6

3 files changed

Lines changed: 21 additions & 4 deletions

File tree

spring-amqp-client/src/main/java/org/springframework/amqp/client/DefaultAmqpClient.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ private Sender getSender() throws ClientException {
160160
this.connectionFactory
161161
.getConnection()
162162
.openAnonymousSender(this.senderOptions);
163-
this.sender = senderToReturn;
163+
Future<Sender> openFuture = senderToReturn.openFuture();
164+
this.sender = ProtonUtils.toSupplier(openFuture, this.senderOptions.openTimeout()).get();
164165
}
165166
}
166167
finally {
@@ -172,9 +173,12 @@ private Sender getSender() throws ClientException {
172173

173174
private <M extends Message<?>> CompletableFuture<M> receive(String fromAddress, @Nullable Duration receiveTimeout) {
174175
try {
175-
Receiver receiver =
176+
Future<Receiver> openFuture =
176177
this.connectionFactory.getConnection()
177178
.openReceiver(fromAddress, this.receiverOptions)
179+
.openFuture();
180+
Receiver receiver =
181+
ProtonUtils.toSupplier(openFuture, this.receiverOptions.openTimeout()).get()
178182
// Since this 'Receiver' is volatile and only about one message to consume,
179183
// therefore only one credit is permitted without renewing.
180184
.addCredit(1);

spring-amqp-client/src/main/java/org/springframework/amqp/client/listener/AmqpMessageListenerContainer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
2525
import java.util.concurrent.Executor;
26+
import java.util.concurrent.Future;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.locks.Lock;
2829
import java.util.concurrent.locks.ReentrantLock;
@@ -283,8 +284,13 @@ public void start() {
283284
for (String queue : this.queues) {
284285
for (int i = 0; i < this.consumersPerQueue; i++) {
285286
try {
287+
Future<Receiver> openFuture =
288+
this.connectionFactory.getConnection()
289+
.openReceiver(queue, receiverOptions)
290+
.openFuture();
286291
ClientReceiver receiver =
287-
(ClientReceiver) connection.openReceiver(queue, receiverOptions)
292+
(ClientReceiver) ProtonUtils.toSupplier(openFuture, receiverOptions.openTimeout())
293+
.get()
288294
.addCredit(this.initialCredits);
289295
AmqpConsumer consumer = new AmqpConsumer(receiver);
290296
this.queueToConsumers.add(queue, consumer);

spring-amqp-client/src/main/java/org/springframework/amqp/client/listener/AmqpMessagingListenerAdapter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@
2121
import java.lang.reflect.WildcardType;
2222
import java.util.Objects;
2323
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.Future;
2425

26+
import org.apache.qpid.protonj2.client.ConnectionOptions;
2527
import org.apache.qpid.protonj2.client.Delivery;
2628
import org.apache.qpid.protonj2.client.DeliveryState;
29+
import org.apache.qpid.protonj2.client.Sender;
2730
import org.apache.qpid.protonj2.client.Tracker;
2831
import org.apache.qpid.protonj2.client.exceptions.ClientException;
2932
import org.jspecify.annotations.Nullable;
@@ -566,10 +569,14 @@ private static void settleRequest(Delivery delivery, Message request, AmqpAcknow
566569

567570
private static void sendResponse(Delivery delivery, String replyTo, Message response) throws Exception {
568571
org.apache.qpid.protonj2.client.Message<?> protonMessage = ProtonUtils.toProtonMessage(response);
569-
Tracker tracker =
572+
Future<Sender> openFuture =
570573
delivery.receiver()
571574
.connection()
572575
.openSender(replyTo)
576+
.openFuture();
577+
Tracker tracker =
578+
ProtonUtils.toSupplier(openFuture, ConnectionOptions.DEFAULT_OPEN_TIMEOUT)
579+
.get()
573580
.send(protonMessage)
574581
.settlementFuture()
575582
.get();

0 commit comments

Comments
 (0)