Skip to content

Commit 3fa8f87

Browse files
committed
Add callback parameter to ByteBuffer basicPublish
To be notified when the ByteBuffer can be released.
1 parent 3199889 commit 3fa8f87

10 files changed

Lines changed: 221 additions & 14 deletions

File tree

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean
314314
* @throws java.io.IOException if an error is encountered
315315
*/
316316
default void basicPublish(String exchange, String routingKey, BasicProperties props, ByteBuffer body) throws IOException {
317-
basicPublish(exchange, routingKey, false, false, props, body);
317+
basicPublish(exchange, routingKey, false, false, props, body, null);
318318
}
319319

320320
/**
@@ -330,7 +330,7 @@ default void basicPublish(String exchange, String routingKey, BasicProperties pr
330330
*/
331331
default void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, ByteBuffer body)
332332
throws IOException {
333-
basicPublish(exchange, routingKey, mandatory, false, props, body);
333+
basicPublish(exchange, routingKey, mandatory, false, props, body, null);
334334
}
335335

336336
/**
@@ -348,9 +348,70 @@ default void basicPublish(String exchange, String routingKey, boolean mandatory,
348348
*/
349349
default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body)
350350
throws IOException {
351+
basicPublish(exchange, routingKey, mandatory, immediate, props, body, null);
352+
}
353+
354+
/**
355+
* Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for
356+
* write completion notification.
357+
*
358+
* <p>The listener is called when the network layer has finished writing the body to the
359+
* socket. This is useful for off-heap buffers that must be released after the write completes.
360+
*
361+
* <p><strong>Threading:</strong> the listener may be called on the Netty event loop thread.
362+
* It must not perform blocking operations.
363+
*
364+
* @param exchange the exchange to publish the message to
365+
* @param routingKey the routing key
366+
* @param props other properties for the message - routing headers etc
367+
* @param body the message body
368+
* @param listener called when the write completes, may be {@code null}
369+
* @throws java.io.IOException if an error is encountered
370+
*/
371+
default void basicPublish(String exchange, String routingKey, BasicProperties props, ByteBuffer body, WriteListener listener)
372+
throws IOException {
373+
basicPublish(exchange, routingKey, false, false, props, body, listener);
374+
}
375+
376+
/**
377+
* Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for
378+
* write completion notification.
379+
*
380+
* @param exchange the exchange to publish the message to
381+
* @param routingKey the routing key
382+
* @param mandatory true if the 'mandatory' flag is to be set
383+
* @param props other properties for the message - routing headers etc
384+
* @param body the message body
385+
* @param listener called when the write completes, may be {@code null}
386+
* @throws java.io.IOException if an error is encountered
387+
*/
388+
default void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, ByteBuffer body, WriteListener listener)
389+
throws IOException {
390+
basicPublish(exchange, routingKey, mandatory, false, props, body, listener);
391+
}
392+
393+
/**
394+
* Publish a message with a {@link ByteBuffer} body and a {@link WriteListener} for
395+
* write completion notification.
396+
*
397+
* @param exchange the exchange to publish the message to
398+
* @param routingKey the routing key
399+
* @param mandatory true if the 'mandatory' flag is to be set
400+
* @param immediate true if the 'immediate' flag is to be
401+
* set. Note that the RabbitMQ server does not support this flag.
402+
* @param props other properties for the message - routing headers etc
403+
* @param body the message body
404+
* @param listener called when the write completes, may be {@code null}
405+
* @throws java.io.IOException if an error is encountered
406+
*/
407+
default void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, ByteBuffer body, WriteListener listener)
408+
throws IOException {
351409
byte[] bytes = new byte[body.remaining()];
352410
body.get(bytes);
353411
basicPublish(exchange, routingKey, mandatory, immediate, props, bytes);
412+
if (listener != null) {
413+
listener.done(true, null);
414+
}
354415
}
355416

356417
/**
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
* Listener notified when the network layer has finished writing a message body.
20+
*
21+
* <p>This is primarily useful when publishing with an off-heap {@link java.nio.ByteBuffer}:
22+
* the callback tells the caller when the buffer is no longer in use and can be safely released.
23+
*
24+
* <p><strong>Threading:</strong> the callback may be invoked on the Netty event loop thread.
25+
* Implementations must not perform blocking operations.
26+
*
27+
* @since 5.25.0
28+
*/
29+
@FunctionalInterface
30+
public interface WriteListener {
31+
32+
/**
33+
* Called when the write operation completes.
34+
*
35+
* @param success {@code true} if the data was written to the socket successfully
36+
* @param cause the exception if the write failed, {@code null} on success
37+
*/
38+
void done(boolean success, Throwable cause);
39+
}

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import com.rabbitmq.client.AMQP;
2626
import com.rabbitmq.client.Command;
27+
import com.rabbitmq.client.WriteListener;
2728

2829
/**
2930
* AMQP 0-9-1-specific implementation of {@link Command} which accumulates
@@ -47,10 +48,12 @@ public class AMQCommand implements Command {
4748
/** The assembler for this command - synchronised on - contains all the state */
4849
private final CommandAssembler assembler;
4950
private final Lock assemblerLock = new ReentrantLock();
51+
private final WriteListener writeListener;
5052

5153
/** Construct a command for inbound frame assembly with a max body length. */
5254
AMQCommand(int maxBodyLength) {
5355
this.assembler = new CommandAssembler(null, null, maxBodyLength);
56+
this.writeListener = null;
5457
}
5558

5659
/** Construct a command ready to fill in by reading frames. */
@@ -64,6 +67,7 @@ public AMQCommand() {
6467
*/
6568
public AMQCommand(com.rabbitmq.client.Method method) {
6669
this.assembler = new CommandAssembler((Method) method, null, Integer.MAX_VALUE);
70+
this.writeListener = null;
6771
}
6872

6973
/**
@@ -73,7 +77,20 @@ public AMQCommand(com.rabbitmq.client.Method method) {
7377
* @param body the message body as a ByteBuffer
7478
*/
7579
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body) {
80+
this(method, contentHeader, body, null);
81+
}
82+
83+
/**
84+
* Construct a command with a ByteBuffer body and a write completion listener.
85+
* @param method the wrapped method
86+
* @param contentHeader the wrapped content header
87+
* @param body the message body as a ByteBuffer
88+
* @param writeListener called when the network layer finishes writing, may be {@code null}
89+
*/
90+
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, ByteBuffer body,
91+
WriteListener writeListener) {
7692
this.assembler = new CommandAssembler((Method) method, contentHeader, body);
93+
this.writeListener = writeListener;
7794
}
7895

7996
/** Public API - {@inheritDoc} */
@@ -143,7 +160,7 @@ public void transmit(AMQChannel channel) throws IOException {
143160
assemblerLock.unlock();
144161
}
145162

146-
connection.flush();
163+
connection.flush(this.writeListener);
147164
}
148165

149166
@Override public String toString() {

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,8 +655,12 @@ void writeFrame(Frame f) throws IOException {
655655
* Public API - flush the output buffers
656656
*/
657657
public void flush() throws IOException {
658+
flush(null);
659+
}
660+
661+
void flush(WriteListener listener) throws IOException {
658662
try {
659-
_frameHandler.flush();
663+
_frameHandler.flush(listener);
660664
} catch (IOException ioe) {
661665
this.errorOnWriteListener.handle(this, ioe);
662666
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,7 @@ public void basicPublish(String exchange, String routingKey,
717717
throws IOException
718718
{
719719
basicPublish(exchange, routingKey, mandatory, immediate, props,
720-
body == null ? null : ByteBuffer.wrap(body));
720+
body == null ? null : ByteBuffer.wrap(body), null);
721721
}
722722

723723
/** Public API - {@inheritDoc} */
@@ -726,7 +726,7 @@ public void basicPublish(String exchange, String routingKey,
726726
BasicProperties props, ByteBuffer body)
727727
throws IOException
728728
{
729-
basicPublish(exchange, routingKey, false, false, props, body);
729+
basicPublish(exchange, routingKey, false, false, props, body, null);
730730
}
731731

732732
/** Public API - {@inheritDoc} */
@@ -736,7 +736,7 @@ public void basicPublish(String exchange, String routingKey,
736736
BasicProperties props, ByteBuffer body)
737737
throws IOException
738738
{
739-
basicPublish(exchange, routingKey, mandatory, false, props, body);
739+
basicPublish(exchange, routingKey, mandatory, false, props, body, null);
740740
}
741741

742742
/** Public API - {@inheritDoc} */
@@ -745,6 +745,38 @@ public void basicPublish(String exchange, String routingKey,
745745
boolean mandatory, boolean immediate,
746746
BasicProperties props, ByteBuffer body)
747747
throws IOException
748+
{
749+
basicPublish(exchange, routingKey, mandatory, immediate, props, body, null);
750+
}
751+
752+
/** Public API - {@inheritDoc} */
753+
@Override
754+
public void basicPublish(String exchange, String routingKey,
755+
BasicProperties props, ByteBuffer body,
756+
WriteListener listener)
757+
throws IOException
758+
{
759+
basicPublish(exchange, routingKey, false, false, props, body, listener);
760+
}
761+
762+
/** Public API - {@inheritDoc} */
763+
@Override
764+
public void basicPublish(String exchange, String routingKey,
765+
boolean mandatory,
766+
BasicProperties props, ByteBuffer body,
767+
WriteListener listener)
768+
throws IOException
769+
{
770+
basicPublish(exchange, routingKey, mandatory, false, props, body, listener);
771+
}
772+
773+
/** Public API - {@inheritDoc} */
774+
@Override
775+
public void basicPublish(String exchange, String routingKey,
776+
boolean mandatory, boolean immediate,
777+
BasicProperties props, ByteBuffer body,
778+
WriteListener listener)
779+
throws IOException
748780
{
749781
final long deliveryTag;
750782
if (nextPublishSeqNo > 0) {
@@ -765,7 +797,7 @@ public void basicPublish(String exchange, String routingKey,
765797
.build();
766798
try {
767799
ObservationCollector.PublishCall publishCall = properties -> {
768-
AMQCommand command = new AMQCommand(publish, properties, body);
800+
AMQCommand command = new AMQCommand(publish, properties, body, listener);
769801
transmit(command);
770802
};
771803
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());

src/main/java/com/rabbitmq/client/impl/FrameHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18+
import com.rabbitmq.client.WriteListener;
1819
import java.io.IOException;
1920
import java.net.SocketException;
2021
import java.net.SocketTimeoutException;
@@ -79,6 +80,19 @@ default void finishConnectionNegotiation() {
7980
*/
8081
void flush() throws IOException;
8182

83+
/**
84+
* Flush the underlying data connection and notify the listener when the write completes.
85+
* The default implementation flushes synchronously and calls the listener immediately.
86+
* @param listener called when the flush completes, may be {@code null}
87+
* @throws IOException if there is a problem accessing the connection
88+
*/
89+
default void flush(WriteListener listener) throws IOException {
90+
flush();
91+
if (listener != null) {
92+
listener.done(true, null);
93+
}
94+
}
95+
8296
/** Close the underlying data connection (complaint not permitted). */
8397
void close();
8498

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.rabbitmq.client.MalformedFrameException;
2626
import com.rabbitmq.client.ShutdownSignalException;
2727
import com.rabbitmq.client.SocketConfigurator;
28+
import com.rabbitmq.client.WriteListener;
2829
import io.netty.bootstrap.Bootstrap;
2930
import io.netty.buffer.ByteBuf;
3031
import io.netty.buffer.Unpooled;
@@ -34,6 +35,7 @@
3435
import io.netty.channel.ChannelInboundHandlerAdapter;
3536
import io.netty.channel.ChannelInitializer;
3637
import io.netty.channel.ChannelOption;
38+
import io.netty.channel.ChannelPromise;
3739
import io.netty.channel.EventLoopGroup;
3840
import io.netty.channel.socket.SocketChannel;
3941
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -408,6 +410,17 @@ public void flush() {
408410
this.channel.flush();
409411
}
410412

413+
@Override
414+
public void flush(WriteListener listener) {
415+
if (listener == null) {
416+
this.channel.flush();
417+
} else {
418+
ChannelPromise promise = this.channel.newPromise();
419+
promise.addListener(future -> listener.done(future.isSuccess(), future.cause()));
420+
this.channel.writeAndFlush(Unpooled.EMPTY_BUFFER, promise);
421+
}
422+
}
423+
411424
@Override
412425
public void close() {
413426
if (this.closed.compareAndSet(false, true)) {

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,21 @@ public void basicPublish(String exchange, String routingKey, boolean mandatory,
237237
delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body);
238238
}
239239

240+
@Override
241+
public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, ByteBuffer body, WriteListener listener) throws IOException {
242+
delegate.basicPublish(exchange, routingKey, props, body, listener);
243+
}
244+
245+
@Override
246+
public void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, ByteBuffer body, WriteListener listener) throws IOException {
247+
delegate.basicPublish(exchange, routingKey, mandatory, props, body, listener);
248+
}
249+
250+
@Override
251+
public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, ByteBuffer body, WriteListener listener) throws IOException {
252+
delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body, listener);
253+
}
254+
240255
@Override
241256
public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException {
242257
return exchangeDeclare(exchange, type, false, false, null);

src/test/java/com/rabbitmq/client/test/ClientTestSuite.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@
7474
BlockedConnectionTest.class,
7575
NettyTest.class,
7676
IoDeadlockOnConnectionClosing.class,
77-
ProtocolVersionMismatch.class
77+
ProtocolVersionMismatch.class,
78+
ByteBufferPublishTest.class,
79+
PublishWithByteBufferTest.class
7880
})
7981
public class ClientTestSuite {
8082

0 commit comments

Comments
 (0)