Skip to content

Commit 3199889

Browse files
committed
Use only ByteBuffer for basic.publish
byte[] is wrapped into a ByteBuffer.
1 parent ab48f63 commit 3199889

10 files changed

Lines changed: 127 additions & 382 deletions

File tree

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,7 @@
798798
<include>src/test/java/com/rabbitmq/client/test/TestUtils.java</include>
799799
<include>src/test/java/com/rabbitmq/client/test/RpcTopologyRecordingTest.java</include>
800800
<include>src/test/java/com/rabbitmq/client/test/PublishWithByteBufferTest.java</include>
801+
<include>src/test/java/com/rabbitmq/client/test/ByteBufferPublishTest.java</include>
801802
</includes>
802803
<googleJavaFormat>
803804
<version>${google-java-format.version}</version>

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 29 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -48,47 +48,26 @@ public class AMQCommand implements Command {
4848
private final CommandAssembler assembler;
4949
private final Lock assemblerLock = new ReentrantLock();
5050

51+
/** Construct a command for inbound frame assembly with a max body length. */
5152
AMQCommand(int maxBodyLength) {
52-
this(null, null, null, maxBodyLength);
53+
this.assembler = new CommandAssembler(null, null, maxBodyLength);
5354
}
5455

55-
/** Construct a command ready to fill in by reading frames */
56+
/** Construct a command ready to fill in by reading frames. */
5657
public AMQCommand() {
57-
this(null, null, null, Integer.MAX_VALUE);
58+
this(Integer.MAX_VALUE);
5859
}
5960

6061
/**
6162
* Construct a command with just a method, and without header or body.
6263
* @param method the wrapped method
6364
*/
6465
public AMQCommand(com.rabbitmq.client.Method method) {
65-
this(method, null, null, Integer.MAX_VALUE);
66+
this.assembler = new CommandAssembler((Method) method, null, Integer.MAX_VALUE);
6667
}
6768

6869
/**
69-
* Construct a command with a specified method, header and body.
70-
* @param method the wrapped method
71-
* @param contentHeader the wrapped content header
72-
* @param body the message body data
73-
*/
74-
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body) {
75-
this.assembler = new CommandAssembler((Method) method, contentHeader, body, Integer.MAX_VALUE);
76-
}
77-
78-
/**
79-
* Construct a command with a specified method, header and body.
80-
* @param method the wrapped method
81-
* @param contentHeader the wrapped content header
82-
* @param body the message body data
83-
* @param maxBodyLength the maximum size for an inbound message body
84-
*/
85-
public AMQCommand(com.rabbitmq.client.Method method, AMQContentHeader contentHeader, byte[] body,
86-
int maxBodyLength) {
87-
this.assembler = new CommandAssembler((Method) method, contentHeader, body, maxBodyLength);
88-
}
89-
90-
/**
91-
* Construct a command with a ByteBuffer body for zero-copy transmission.
70+
* Construct a command with a ByteBuffer body for transmission.
9271
* @param method the wrapped method
9372
* @param contentHeader the wrapped content header
9473
* @param body the message body as a ByteBuffer
@@ -133,11 +112,29 @@ public void transmit(AMQChannel channel) throws IOException {
133112
try {
134113
Method m = this.assembler.getMethod();
135114
if (m.hasContent()) {
136-
ByteBuffer bbBody = this.assembler.getByteBufferBody();
137-
if (bbBody != null) {
138-
transmitWithByteBuffer(m, bbBody, channelNumber, connection);
139-
} else {
140-
transmitWithByteArray(m, this.assembler.getContentBody(), channelNumber, connection);
115+
ByteBuffer body = this.assembler.getByteBufferBody();
116+
int bodySize = body == null ? 0 : body.remaining();
117+
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize);
118+
119+
int frameMax = connection.getFrameMax();
120+
boolean cappedFrameMax = frameMax > 0;
121+
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : bodySize;
122+
123+
if (cappedFrameMax && headerFrame.size() > frameMax) {
124+
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
125+
throw new IllegalArgumentException(msg);
126+
}
127+
connection.writeFrame(m.toFrame(channelNumber));
128+
connection.writeFrame(headerFrame);
129+
130+
if (body != null) {
131+
int bodyPosition = body.position();
132+
for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) {
133+
int remaining = bodySize - offset;
134+
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
135+
Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength);
136+
connection.writeFrame(frame);
137+
}
141138
}
142139
} else {
143140
connection.writeFrame(m.toFrame(channelNumber));
@@ -149,52 +146,6 @@ public void transmit(AMQChannel channel) throws IOException {
149146
connection.flush();
150147
}
151148

152-
private void transmitWithByteArray(Method m, byte[] body, int channelNumber, AMQConnection connection) throws IOException {
153-
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
154-
155-
int frameMax = connection.getFrameMax();
156-
boolean cappedFrameMax = frameMax > 0;
157-
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
158-
159-
if (cappedFrameMax && headerFrame.size() > frameMax) {
160-
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
161-
throw new IllegalArgumentException(msg);
162-
}
163-
connection.writeFrame(m.toFrame(channelNumber));
164-
connection.writeFrame(headerFrame);
165-
166-
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
167-
int remaining = body.length - offset;
168-
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
169-
Frame frame = Frame.fromBodyFragment(channelNumber, body, offset, fragmentLength);
170-
connection.writeFrame(frame);
171-
}
172-
}
173-
174-
private void transmitWithByteBuffer(Method m, ByteBuffer body, int channelNumber, AMQConnection connection) throws IOException {
175-
int bodySize = body.remaining();
176-
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, bodySize);
177-
178-
int frameMax = connection.getFrameMax();
179-
boolean cappedFrameMax = frameMax > 0;
180-
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : bodySize;
181-
182-
if (cappedFrameMax && headerFrame.size() > frameMax) {
183-
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
184-
throw new IllegalArgumentException(msg);
185-
}
186-
connection.writeFrame(m.toFrame(channelNumber));
187-
connection.writeFrame(headerFrame);
188-
189-
int bodyPosition = body.position();
190-
for (int offset = 0; offset < bodySize; offset += bodyPayloadMax) {
191-
int remaining = bodySize - offset;
192-
int fragmentLength = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
193-
Frame frame = Frame.fromBodyFragment(channelNumber, body, bodyPosition + offset, fragmentLength);
194-
connection.writeFrame(frame);
195-
}
196-
}
197-
198149
@Override public String toString() {
199150
return toString(false);
200151
}

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -716,34 +716,8 @@ public void basicPublish(String exchange, String routingKey,
716716
BasicProperties props, byte[] body)
717717
throws IOException
718718
{
719-
final long deliveryTag;
720-
if (nextPublishSeqNo > 0) {
721-
deliveryTag = getNextPublishSeqNo();
722-
unconfirmedSet.add(deliveryTag);
723-
nextPublishSeqNo++;
724-
} else {
725-
deliveryTag = 0;
726-
}
727-
if (props == null) {
728-
props = MessageProperties.MINIMAL_BASIC;
729-
}
730-
AMQP.Basic.Publish publish = new Basic.Publish.Builder()
731-
.exchange(exchange)
732-
.routingKey(routingKey)
733-
.mandatory(mandatory)
734-
.immediate(immediate)
735-
.build();
736-
try {
737-
ObservationCollector.PublishCall publishCall = properties -> {
738-
AMQCommand command = new AMQCommand(publish, properties, body);
739-
transmit(command);
740-
};
741-
observationCollector.publish(publishCall, publish, props, body, this.connectionInfo());
742-
} catch (IOException | AlreadyClosedException e) {
743-
metricsCollector.basicPublishFailure(this, e);
744-
throw e;
745-
}
746-
metricsCollector.basicPublish(this, deliveryTag);
719+
basicPublish(exchange, routingKey, mandatory, immediate, props,
720+
body == null ? null : ByteBuffer.wrap(body));
747721
}
748722

749723
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private enum CAState {
5959

6060
private final int maxBodyLength;
6161

62-
public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] body,
62+
public CommandAssembler(Method method, AMQContentHeader contentHeader,
6363
int maxBodyLength) {
6464
this.method = method;
6565
this.contentHeader = contentHeader;
@@ -68,7 +68,6 @@ public CommandAssembler(Method method, AMQContentHeader contentHeader, byte[] bo
6868
this.byteBufferBody = null;
6969
this.remainingBodyBytes = 0;
7070
this.maxBodyLength = maxBodyLength;
71-
appendBodyFragment(body);
7271
if (method == null) {
7372
this.state = CAState.EXPECTING_METHOD;
7473
} else if (contentHeader == null) {
@@ -183,7 +182,7 @@ public synchronized byte[] getContentBody() {
183182
return coalesceContentBody();
184183
}
185184

186-
public synchronized ByteBuffer getByteBufferBody() {
185+
public ByteBuffer getByteBufferBody() {
187186
return this.byteBufferBody;
188187
}
189188

src/main/java/com/rabbitmq/client/impl/Frame.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class Frame {
4545
/** Frame payload (for outbound frames) */
4646
private final ByteArrayOutputStream accumulator;
4747

48-
/** Frame payload as a ByteBuffer (for zero-copy outbound body frames) */
48+
/** Frame payload as a ByteBuffer (for basic.publish body frames) */
4949
private final ByteBuffer byteBufferPayload;
5050

5151
private static final int NON_BODY_SIZE = 1 /* type */ + 2 /* channel */ + 4 /* payload size */ + 1 /* end character */;
@@ -74,6 +74,12 @@ public Frame(int type, int channel, byte[] payload) {
7474
this.byteBufferPayload = null;
7575
}
7676

77+
/**
78+
* For basic.publish frames.
79+
* @param type
80+
* @param channel
81+
* @param byteBufferPayload
82+
*/
7783
private Frame(int type, int channel, ByteBuffer byteBufferPayload) {
7884
this.type = type;
7985
this.channel = channel;
@@ -82,17 +88,8 @@ private Frame(int type, int channel, ByteBuffer byteBufferPayload) {
8288
this.byteBufferPayload = byteBufferPayload;
8389
}
8490

85-
public static Frame fromBodyFragment(int channelNumber, byte[] body, int offset, int length)
86-
throws IOException
87-
{
88-
Frame frame = new Frame(AMQP.FRAME_BODY, channelNumber);
89-
DataOutputStream bodyOut = frame.getOutputStream();
90-
bodyOut.write(body, offset, length);
91-
return frame;
92-
}
93-
9491
/**
95-
* Zero-copy body frame from a ByteBuffer slice. The buffer is not copied;
92+
* Body frame from a ByteBuffer slice. The buffer is not copied;
9693
* the caller must not modify it after this call.
9794
*/
9895
public static Frame fromBodyFragment(int channelNumber, ByteBuffer body, int offset, int length) {

src/main/java/com/rabbitmq/client/observation/NoOpObservationCollector.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,6 @@
2323

2424
final class NoOpObservationCollector implements ObservationCollector {
2525

26-
@Override
27-
public void publish(
28-
PublishCall call,
29-
AMQP.Basic.Publish publish,
30-
AMQP.BasicProperties properties,
31-
byte[] body,
32-
ConnectionInfo connectionInfo)
33-
throws IOException {
34-
call.publish(properties);
35-
}
36-
3726
@Override
3827
public void publish(
3928
PublishCall call,
@@ -45,17 +34,6 @@ public void publish(
4534
call.publish(properties);
4635
}
4736

48-
@Override
49-
public void publish(
50-
PublishCall call,
51-
AMQP.Basic.Publish publish,
52-
AMQP.BasicProperties properties,
53-
int bodySize,
54-
ConnectionInfo connectionInfo)
55-
throws IOException {
56-
call.publish(properties);
57-
}
58-
5937
@Override
6038
public Consumer basicConsume(String queue, String consumerTag, Consumer consumer) {
6139
return consumer;

src/main/java/com/rabbitmq/client/observation/ObservationCollector.java

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ public interface ObservationCollector {
4141
/**
4242
* Decorate message publishing.
4343
*
44-
* <p>Implementations are expected to call {@link PublishCall#publish( PublishCall,
45-
* AMQP.Basic.Publish, AMQP.BasicProperties, byte[], ConnectionInfo)} to make sure the message is
46-
* actually sent.
44+
* <p>Implementations are expected to call {@link PublishCall#publish(AMQP.BasicProperties)} to
45+
* make sure the message is actually sent.
4746
*
4847
* @param call
4948
* @param publish
@@ -53,55 +52,12 @@ public interface ObservationCollector {
5352
* @throws IOException
5453
*/
5554
void publish(
56-
PublishCall call,
57-
AMQP.Basic.Publish publish,
58-
AMQP.BasicProperties properties,
59-
byte[] body,
60-
ConnectionInfo connectionInfo)
61-
throws IOException;
62-
63-
/**
64-
* Decorate message publishing with a ByteBuffer body.
65-
*
66-
* <p>The default implementation delegates to the byte[]-based overload using the body size.
67-
* Implementations that need access to the actual body bytes should override this method.
68-
*
69-
* @param call
70-
* @param publish
71-
* @param properties
72-
* @param body
73-
* @param connectionInfo
74-
* @throws IOException
75-
*/
76-
default void publish(
7755
PublishCall call,
7856
AMQP.Basic.Publish publish,
7957
AMQP.BasicProperties properties,
8058
ByteBuffer body,
8159
ConnectionInfo connectionInfo)
82-
throws IOException {
83-
publish(call, publish, properties, body == null ? 0 : body.remaining(), connectionInfo);
84-
}
85-
86-
/**
87-
* Decorate message publishing using only the body size.
88-
*
89-
* @param call
90-
* @param publish
91-
* @param properties
92-
* @param bodySize
93-
* @param connectionInfo
94-
* @throws IOException
95-
*/
96-
default void publish(
97-
PublishCall call,
98-
AMQP.Basic.Publish publish,
99-
AMQP.BasicProperties properties,
100-
int bodySize,
101-
ConnectionInfo connectionInfo)
102-
throws IOException {
103-
publish(call, publish, properties, new byte[0], connectionInfo);
104-
}
60+
throws IOException;
10561

10662
/**
10763
* Decorate consumer registration.

0 commit comments

Comments
 (0)