Skip to content

Commit 337d04f

Browse files
committed
Revert "Return CompletableFuture in ByteBuffer basicPublish"
This reverts commit 5a14fc0.
1 parent 5a14fc0 commit 337d04f

17 files changed

Lines changed: 105 additions & 164 deletions

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -313,8 +313,8 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean
313313
* @param body the message body
314314
* @throws java.io.IOException if an error is encountered
315315
*/
316-
default CompletableFuture<Void> basicPublish(String exchange, String routingKey, BasicProperties props, ByteBuffer body) throws IOException {
317-
return basicPublish(exchange, routingKey, false, false, props, body);
316+
default void basicPublish(String exchange, String routingKey, BasicProperties props, ByteBuffer body) throws IOException {
317+
basicPublish(exchange, routingKey, false, false, props, body);
318318
}
319319

320320
/**
@@ -328,31 +328,29 @@ default CompletableFuture<Void> basicPublish(String exchange, String routingKey,
328328
* @param body the message body
329329
* @throws java.io.IOException if an error is encountered
330330
*/
331-
default CompletableFuture<Void> basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, ByteBuffer body)
331+
default void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, ByteBuffer body)
332332
throws IOException {
333-
return basicPublish(exchange, routingKey, mandatory, false, props, body);
333+
basicPublish(exchange, routingKey, mandatory, false, props, body);
334334
}
335335

336336
/**
337337
* Publish a message with a {@link ByteBuffer} body for zero-copy transmission.
338338
*
339-
* @param exchange the exchange to publish the message to
339+
* @see #basicPublish(String, String, boolean, boolean, BasicProperties, byte[])
340+
* @param exchange the exchange to publish the message to
340341
* @param routingKey the routing key
341-
* @param mandatory true if the 'mandatory' flag is to be set
342-
* @param immediate true if the 'immediate' flag is to be
343-
* set. Note that the RabbitMQ server does not support this flag.
344-
* @param props other properties for the message - routing headers etc
345-
* @param body the message body
346-
* @return
342+
* @param mandatory true if the 'mandatory' flag is to be set
343+
* @param immediate true if the 'immediate' flag is to be
344+
* set. Note that the RabbitMQ server does not support this flag.
345+
* @param props other properties for the message - routing headers etc
346+
* @param body the message body
347347
* @throws java.io.IOException if an error is encountered
348-
* @see #basicPublish(String, String, boolean, boolean, BasicProperties, byte[])
349348
*/
350-
default CompletableFuture<Void> basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body)
349+
default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body)
351350
throws IOException {
352351
byte[] bytes = new byte[body.remaining()];
353352
body.get(bytes);
354353
basicPublish(exchange, routingKey, mandatory, immediate, props, bytes);
355-
return null;
356354
}
357355

358356
/**

src/main/java/com/rabbitmq/client/impl/AMQChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -461,11 +461,11 @@ public void transmit(Method m) throws IOException {
461461
}
462462
}
463463

464-
public CompletableFuture<Void> transmit(AMQCommand c) throws IOException {
464+
public void transmit(AMQCommand c) throws IOException {
465465
_channelLock.lock();
466466
try {
467467
ensureIsOpen();
468-
return quiescingTransmit(c);
468+
quiescingTransmit(c);
469469
} finally {
470470
_channelLock.unlock();
471471
}
@@ -480,7 +480,7 @@ public void quiescingTransmit(Method m) throws IOException {
480480
}
481481
}
482482

483-
public CompletableFuture<Void> quiescingTransmit(AMQCommand c) throws IOException {
483+
public void quiescingTransmit(AMQCommand c) throws IOException {
484484
_channelLock.lock();
485485
try {
486486
if (c.getMethod().hasContent()) {
@@ -498,7 +498,7 @@ public CompletableFuture<Void> quiescingTransmit(AMQCommand c) throws IOExceptio
498498
}
499499
}
500500
this._trafficListener.write(c);
501-
return c.transmit(this);
501+
c.transmit(this);
502502
} finally {
503503
_channelLock.unlock();
504504
}

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.io.DataOutputStream;
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
22-
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.locks.Lock;
2423
import java.util.concurrent.locks.ReentrantLock;
2524

@@ -126,30 +125,28 @@ public boolean handleFrame(Frame f) throws IOException {
126125
* @param channel the channel on which to transmit the command
127126
* @throws IOException if an error is encountered
128127
*/
129-
public CompletableFuture<Void> transmit(AMQChannel channel) throws IOException {
128+
public void transmit(AMQChannel channel) throws IOException {
130129
int channelNumber = channel.getChannelNumber();
131130
AMQConnection connection = channel.getConnection();
132131

133-
CompletableFuture<Void> future = null;
134132
assemblerLock.lock();
135133
try {
136134
Method m = this.assembler.getMethod();
137135
if (m.hasContent()) {
138136
ByteBuffer bbBody = this.assembler.getByteBufferBody();
139137
if (bbBody != null) {
140-
future = transmitWithByteBuffer(m, bbBody, channelNumber, connection);
138+
transmitWithByteBuffer(m, bbBody, channelNumber, connection);
141139
} else {
142140
transmitWithByteArray(m, this.assembler.getContentBody(), channelNumber, connection);
143141
}
144142
} else {
145-
connection.writeFrame(m.toFrame(channelNumber), null);
143+
connection.writeFrame(m.toFrame(channelNumber));
146144
}
147145
} finally {
148146
assemblerLock.unlock();
149147
}
150148

151149
connection.flush();
152-
return future;
153150
}
154151

155152
private void transmitWithByteArray(Method m, byte[] body, int channelNumber, AMQConnection connection) throws IOException {
@@ -163,18 +160,18 @@ private void transmitWithByteArray(Method m, byte[] body, int channelNumber, AMQ
163160
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
164161
throw new IllegalArgumentException(msg);
165162
}
166-
connection.writeFrame(m.toFrame(channelNumber), null);
167-
connection.writeFrame(headerFrame, null);
163+
connection.writeFrame(m.toFrame(channelNumber));
164+
connection.writeFrame(headerFrame);
168165

169166
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
170167
int remaining = body.length - offset;
171168
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
172169
Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
173-
connection.writeFrame(frame, null);
170+
connection.writeFrame(frame);
174171
}
175172
}
176173

177-
private CompletableFuture<Void> transmitWithByteBuffer(Method m, ByteBuffer body, int channelNumber, AMQConnection connection) throws IOException {
174+
private void transmitWithByteBuffer(Method m, ByteBuffer body, int channelNumber, AMQConnection connection) throws IOException {
178175
int bodySize = body.remaining();
179176
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize);
180177

@@ -186,23 +183,16 @@ private CompletableFuture<Void> transmitWithByteBuffer(Method m, ByteBuffer body
186183
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
187184
throw new IllegalArgumentException(msg);
188185
}
189-
connection.writeFrame(m.toFrame(channelNumber), null);
190-
connection.writeFrame(headerFrame, null);
186+
connection.writeFrame(m.toFrame(channelNumber));
187+
connection.writeFrame(headerFrame);
191188

192-
// TODO should complete the future in case of error
193-
CompletableFuture<Void> future = new CompletableFuture<>();
194189
int bodyPosition = body.position();
195190
for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) {
196191
int remaining = bodySize - offset;
197192
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
198193
Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength);
199-
if (offset + bodyPayloadMax >= bodySize) {
200-
connection.writeFrame(frame, future);
201-
} else {
202-
connection.writeFrame(frame, null);
203-
}
194+
connection.writeFrame(frame);
204195
}
205-
return future;
206196
}
207197

208198
@Override public String toString() {

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -646,8 +646,8 @@ public Channel createChannel() throws IOException {
646646
/**
647647
* Public API - sends a frame directly to the broker.
648648
*/
649-
void writeFrame(Frame f, CompletableFuture<Void> future) throws IOException {
650-
_frameHandler.writeFrame(f, future);
649+
void writeFrame(Frame f) throws IOException {
650+
_frameHandler.writeFrame(f);
651651
_heartbeatSender.signalActivity();
652652
}
653653

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -734,9 +734,9 @@ public void basicPublish(String exchange, String routingKey,
734734
.immediate(immediate)
735735
.build();
736736
try {
737-
ObservationCollector.PublishCall<CompletableFuture<Void>> publishCall = properties -> {
737+
ObservationCollector.PublishCall publishCall = properties -> {
738738
AMQCommand command = new AMQCommand(publish, properties, body);
739-
return transmit(command);
739+
transmit(command);
740740
};
741741
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());
742742
} catch (IOException | AlreadyClosedException e) {
@@ -746,42 +746,30 @@ public void basicPublish(String exchange, String routingKey,
746746
metricsCollector.basicPublish(this, deliveryTag);
747747
}
748748

749-
/**
750-
* Public API - {@inheritDoc}
751-
*
752-
* @return
753-
*/
749+
/** Public API - {@inheritDoc} */
754750
@Override
755-
public CompletableFuture<Void> basicPublish(String exchange, String routingKey,
756-
BasicProperties props, ByteBuffer body)
751+
public void basicPublish(String exchange, String routingKey,
752+
BasicProperties props, ByteBuffer body)
757753
throws IOException
758754
{
759-
return basicPublish(exchange, routingKey, false, false, props, body);
755+
basicPublish(exchange, routingKey, false, false, props, body);
760756
}
761757

762-
/**
763-
* Public API - {@inheritDoc}
764-
*
765-
* @return
766-
*/
758+
/** Public API - {@inheritDoc} */
767759
@Override
768-
public CompletableFuture<Void> basicPublish(String exchange, String routingKey,
769-
boolean mandatory,
770-
BasicProperties props, ByteBuffer body)
760+
public void basicPublish(String exchange, String routingKey,
761+
boolean mandatory,
762+
BasicProperties props, ByteBuffer body)
771763
throws IOException
772764
{
773-
return basicPublish(exchange, routingKey, mandatory, false, props, body);
765+
basicPublish(exchange, routingKey, mandatory, false, props, body);
774766
}
775767

776-
/**
777-
* Public API - {@inheritDoc}
778-
*
779-
* @return
780-
*/
768+
/** Public API - {@inheritDoc} */
781769
@Override
782-
public CompletableFuture<Void> basicPublish(String exchange, String routingKey,
783-
boolean mandatory, boolean immediate,
784-
BasicProperties props, ByteBuffer body)
770+
public void basicPublish(String exchange, String routingKey,
771+
boolean mandatory, boolean immediate,
772+
BasicProperties props, ByteBuffer body)
785773
throws IOException
786774
{
787775
final long deliveryTag;
@@ -801,19 +789,17 @@ public CompletableFuture<Void> basicPublish(String exchange, String routingKey,
801789
.mandatory(mandatory)
802790
.immediate(immediate)
803791
.build();
804-
CompletableFuture<Void> future;
805792
try {
806-
ObservationCollector.PublishCall<CompletableFuture<Void>> publishCall = properties -> {
793+
ObservationCollector.PublishCall publishCall = properties -> {
807794
AMQCommand command = new AMQCommand(publish, properties, body);
808-
return transmit(command);
795+
transmit(command);
809796
};
810-
future = observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());
797+
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());
811798
} catch (IOException | AlreadyClosedException e) {
812799
metricsCollector.basicPublishFailure(this, e);
813800
throw e;
814801
}
815802
metricsCollector.basicPublish(this, deliveryTag);
816-
return future;
817803
}
818804

819805
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/DefaultHeartbeatSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public void run() {
136136
long now = System.nanoTime();
137137

138138
if (now > (lastActivityTime + this.heartbeatNanos)) {
139-
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0), null);
139+
frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
140140
frameHandler.flush();
141141
}
142142
} catch (IOException e) {

src/main/java/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.io.IOException;
1919
import java.net.SocketException;
2020
import java.net.SocketTimeoutException;
21-
import java.util.concurrent.CompletableFuture;
2221

2322
/**
2423
* Interface to a frame handler.
@@ -72,7 +71,7 @@ default void finishConnectionNegotiation() {
7271
* @param frame the Frame to transmit
7372
* @throws IOException if there is a problem accessing the connection
7473
*/
75-
void writeFrame(Frame frame, CompletableFuture<Void> future) throws IOException;
74+
void writeFrame(Frame frame) throws IOException;
7675

7776
/**
7877
* Flush the underlying data connection.

0 commit comments

Comments
 (0)