Skip to content

Commit d130a01

Browse files
committed
Integrate zero-copy code
References #1885
1 parent 3f1f004 commit d130a01

7 files changed

Lines changed: 41 additions & 29 deletions

File tree

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -745,10 +745,10 @@ public boolean hasBrokerInitiatedShutdown() {
745745
private void readFrame(Frame frame) throws IOException {
746746
if (frame != null) {
747747
_missedHeartbeats = 0;
748-
if (frame.type == AMQP.FRAME_HEARTBEAT) {
748+
if (frame.getType() == AMQP.FRAME_HEARTBEAT) {
749749
// Ignore it: we've already just reset the heartbeat counter.
750750
} else {
751-
if (frame.channel == 0) { // the special channel
751+
if (frame.getChannel() == 0) { // the special channel
752752
_channel0.handleFrame(frame);
753753
} else {
754754
if (isOpen()) {
@@ -761,7 +761,7 @@ private void readFrame(Frame frame) throws IOException {
761761
if (cm != null) {
762762
ChannelN channel;
763763
try {
764-
channel = cm.getChannel(frame.channel);
764+
channel = cm.getChannel(frame.getChannel());
765765
} catch(UnknownChannelException e) {
766766
// this can happen if channel has been closed,
767767
// but there was e.g. an in-flight delivery.

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private void updateContentBodyState() {
115115
}
116116

117117
private void consumeMethodFrame(Frame f) throws IOException {
118-
if (f.type == AMQP.FRAME_METHOD) {
118+
if (f.getType() == AMQP.FRAME_METHOD) {
119119
this.method = AMQImpl.readMethodFrom(f.getInputStream());
120120
this.state = this.method.hasContent() ? CAState.EXPECTING_CONTENT_HEADER : CAState.COMPLETE;
121121
} else {
@@ -124,7 +124,7 @@ private void consumeMethodFrame(Frame f) throws IOException {
124124
}
125125

126126
private void consumeHeaderFrame(Frame f) throws IOException {
127-
if (f.type == AMQP.FRAME_HEADER) {
127+
if (f.getType() == AMQP.FRAME_HEADER) {
128128
this.contentHeader = AMQImpl.readContentHeaderFrom(f.getInputStream());
129129
long bodySize = this.contentHeader.getBodySize();
130130
if (bodySize >= this.maxBodyLength) {
@@ -143,7 +143,7 @@ private void consumeHeaderFrame(Frame f) throws IOException {
143143
}
144144

145145
private void consumeBodyFrame(Frame f) {
146-
if (f.type == AMQP.FRAME_BODY) {
146+
if (f.getType() == AMQP.FRAME_BODY) {
147147
byte[] fragment = f.getPayload();
148148
this.remainingBodyBytes -= fragment.length;
149149
updateContentBodyState();

src/main/java/com/rabbitmq/client/impl/Frame.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.rabbitmq.client.impl;
1717

1818
import com.rabbitmq.client.AMQP;
19+
import com.rabbitmq.client.Channel;
1920
import com.rabbitmq.client.LongString;
2021
import com.rabbitmq.client.MalformedFrameException;
2122
import io.netty.buffer.ByteBuf;
@@ -35,10 +36,10 @@
3536
*/
3637
public class Frame {
3738
/** Frame type code */
38-
public final int type;
39+
private final int type;
3940

4041
/** Frame channel number, 0-65535 */
41-
public final int channel;
42+
private final int channel;
4243

4344
/** Frame payload bytes (for inbound frames) */
4445
private final byte[] payload;
@@ -427,4 +428,12 @@ private static int shortStrSize(String str)
427428
{
428429
return str.getBytes("utf-8").length + 1;
429430
}
431+
432+
public int getType() {
433+
return this.type;
434+
}
435+
436+
public int getChannel() {
437+
return this.channel;
438+
}
430439
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.rabbitmq.client.impl.nio;
1717

18+
import com.rabbitmq.client.WriteListener;
1819
import com.rabbitmq.client.impl.AMQConnection;
1920
import com.rabbitmq.client.impl.Frame;
2021
import com.rabbitmq.client.impl.FrameHandler;
@@ -93,8 +94,10 @@ public void writeFrame(Frame frame) throws IOException {
9394
}
9495

9596
@Override
96-
public void flush() throws IOException {
97-
97+
public void flush(WriteListener listener) throws IOException {
98+
if (listener != null) {
99+
listener.done(true, null);
100+
}
98101
}
99102

100103
@Override

src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class BrokenFramesTest {
6464
} catch (IOException e) {
6565
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
6666
assertNotNull(unexpectedFrameError);
67-
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getReceivedFrame().type);
67+
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getReceivedFrame().getType());
6868
assertEquals(AMQP.FRAME_METHOD, unexpectedFrameError.getExpectedFrameType());
6969
return;
7070
}
@@ -90,7 +90,7 @@ public class BrokenFramesTest {
9090
} catch (IOException e) {
9191
UnexpectedFrameError unexpectedFrameError = findUnexpectedFrameError(e);
9292
assertNotNull(unexpectedFrameError);
93-
assertEquals(AMQP.FRAME_BODY, unexpectedFrameError.getReceivedFrame().type);
93+
assertEquals(AMQP.FRAME_BODY, unexpectedFrameError.getReceivedFrame().getType());
9494
assertEquals(AMQP.FRAME_HEADER, unexpectedFrameError.getExpectedFrameType());
9595
return;
9696
}

src/test/java/com/rabbitmq/client/test/FrameBuilderTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public void buildFrameInOneGo() throws IOException {
6262
builder = new FrameBuilder(channel, buffer, Integer.MAX_VALUE);
6363
Frame frame = builder.readFrame();
6464
assertThat(frame).isNotNull();
65-
assertThat(frame.type).isEqualTo(1);
66-
assertThat(frame.channel).isEqualTo(0);
65+
assertThat(frame.getType()).isEqualTo(1);
66+
assertThat(frame.getChannel()).isEqualTo(0);
6767
assertThat(frame.getPayload()).hasSize(3);
6868
}
6969

@@ -83,8 +83,8 @@ public void buildFramesInOneGo() throws IOException {
8383
Frame frame;
8484
while ((frame = builder.readFrame()) != null) {
8585
assertThat(frame).isNotNull();
86-
assertThat(frame.type).isEqualTo(1);
87-
assertThat(frame.channel).isEqualTo(0);
86+
assertThat(frame.getType()).isEqualTo(1);
87+
assertThat(frame.getChannel()).isEqualTo(0);
8888
assertThat(frame.getPayload()).hasSize(3);
8989
frameCount++;
9090
}
@@ -104,8 +104,8 @@ public void buildFrameInSeveralCalls() throws IOException {
104104

105105
frame = builder.readFrame();
106106
assertThat(frame).isNotNull();
107-
assertThat(frame.type).isEqualTo(1);
108-
assertThat(frame.channel).isEqualTo(0);
107+
assertThat(frame.getType()).isEqualTo(1);
108+
assertThat(frame.getChannel()).isEqualTo(0);
109109
assertThat(frame.getPayload()).hasSize(3);
110110
}
111111

src/test/java/com/rabbitmq/client/test/functional/UnexpectedFrames.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public UnexpectedFrames() {
9898
@Test public void missingHeader() throws IOException {
9999
expectUnexpectedFrameError(new Confuser() {
100100
public Frame confuse(Frame frame) {
101-
if (frame.type == AMQP.FRAME_HEADER) {
101+
if (frame.getType() == AMQP.FRAME_HEADER) {
102102
return null;
103103
}
104104
return frame;
@@ -109,11 +109,11 @@ public Frame confuse(Frame frame) {
109109
@Test public void missingMethod() throws IOException {
110110
expectUnexpectedFrameError(new Confuser() {
111111
public Frame confuse(Frame frame) {
112-
if (frame.type == AMQP.FRAME_METHOD) {
112+
if (frame.getType() == AMQP.FRAME_METHOD) {
113113
// We can't just skip the method as that will lead us to
114114
// send 0 bytes and hang waiting for a response.
115115
return new Frame(AMQP.FRAME_HEADER,
116-
frame.channel, ByteBuffer.wrap(frame.getPayload()));
116+
frame.getChannel(), ByteBuffer.wrap(frame.getPayload()));
117117
}
118118
return frame;
119119
}
@@ -123,7 +123,7 @@ public Frame confuse(Frame frame) {
123123
@Test public void missingBody() throws IOException {
124124
expectUnexpectedFrameError(new Confuser() {
125125
public Frame confuse(Frame frame) {
126-
if (frame.type == AMQP.FRAME_BODY) {
126+
if (frame.getType() == AMQP.FRAME_BODY) {
127127
return null;
128128
}
129129
return frame;
@@ -134,10 +134,10 @@ public Frame confuse(Frame frame) {
134134
@Test public void wrongClassInHeader() throws IOException {
135135
expectUnexpectedFrameError(new Confuser() {
136136
public Frame confuse(Frame frame) {
137-
if (frame.type == AMQP.FRAME_HEADER) {
137+
if (frame.getType() == AMQP.FRAME_HEADER) {
138138
byte[] payload = frame.getPayload();
139139
Frame confusedFrame = new Frame(AMQP.FRAME_HEADER,
140-
frame.channel, ByteBuffer.wrap(payload));
140+
frame.getChannel(), ByteBuffer.wrap(payload));
141141
// First two bytes = class ID, must match class ID from
142142
// method.
143143
payload[0] = 12;
@@ -152,8 +152,8 @@ public Frame confuse(Frame frame) {
152152
@Test public void heartbeatOnChannel() throws IOException {
153153
expectUnexpectedFrameError(new Confuser() {
154154
public Frame confuse(Frame frame) {
155-
if (frame.type == AMQP.FRAME_METHOD) {
156-
return new Frame(AMQP.FRAME_HEARTBEAT, frame.channel);
155+
if (frame.getType() == AMQP.FRAME_METHOD) {
156+
return new Frame(AMQP.FRAME_HEARTBEAT, frame.getChannel());
157157
}
158158
return frame;
159159
}
@@ -163,9 +163,9 @@ public Frame confuse(Frame frame) {
163163
@Test public void unknownFrameType() throws IOException {
164164
expectError(AMQP.FRAME_ERROR, new Confuser() {
165165
public Frame confuse(Frame frame) {
166-
if (frame.type == AMQP.FRAME_METHOD) {
167-
return new Frame(0, frame.channel,
168-
ByteBuffer.wrap("1234567890\0001234567890".getBytes()));
166+
if (frame.getType() == AMQP.FRAME_METHOD) {
167+
return new Frame(0, frame.getChannel(),
168+
ByteBuffer.wrap("1234567890\0001234567890".getBytes()));
169169
}
170170
return frame;
171171
}

0 commit comments

Comments
 (0)