Skip to content

Commit 1cf19cb

Browse files
authored
Merge pull request #1960 from rabbitmq/mergify/bp/v5.x/pr-1959
Implement a TODO in CommandAssembler (backport #1959)
2 parents 1f79fc9 + abd6d60 commit 1cf19cb

3 files changed

Lines changed: 66 additions & 15 deletions

File tree

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler,
252252

253253
this.credentialsRefreshService = params.getCredentialsRefreshService();
254254

255-
255+
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
256256
this._channel0 = createChannel0();
257257

258258
this._channelManager = null;
@@ -267,7 +267,6 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler,
267267
this.errorOnWriteListener = params.getErrorOnWriteListener() != null ? params.getErrorOnWriteListener() :
268268
(connection, exception) -> { throw exception; }; // we just propagate the exception for non-recoverable connections
269269
this.workPoolTimeout = params.getWorkPoolTimeout();
270-
this.maxInboundMessageBodySize = params.getMaxInboundMessageBodySize();
271270
}
272271

273272
AMQChannel createChannel0() {

src/main/java/com/rabbitmq/client/impl/CommandAssembler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222

2323
import com.rabbitmq.client.AMQP;
24+
import com.rabbitmq.client.MalformedFrameException;
2425
import com.rabbitmq.client.UnexpectedFrameError;
2526
import static java.lang.String.format;
2627

@@ -142,14 +143,16 @@ private void consumeHeaderFrame(Frame f) throws IOException {
142143
}
143144
}
144145

145-
private void consumeBodyFrame(Frame f) {
146+
private void consumeBodyFrame(Frame f) throws MalformedFrameException {
146147
if (f.getType() == AMQP.FRAME_BODY) {
147148
byte[] fragment = f.getPayload();
149+
if (fragment.length > this.remainingBodyBytes) {
150+
throw new MalformedFrameException(format(
151+
"Body frame payload (%d bytes) exceeds remaining expected body size (%d bytes)",
152+
fragment.length, this.remainingBodyBytes));
153+
}
148154
this.remainingBodyBytes -= fragment.length;
149155
updateContentBodyState();
150-
if (this.remainingBodyBytes < 0) {
151-
throw new UnsupportedOperationException("%%%%%% FIXME unimplemented");
152-
}
153156
appendBodyFragment(fragment);
154157
} else {
155158
throw new UnexpectedFrameError(f, AMQP.FRAME_BODY);

src/test/java/com/rabbitmq/client/test/BrokenFramesTest.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
package com.rabbitmq.client.test;
1818

1919
import com.rabbitmq.client.AMQP;
20+
import com.rabbitmq.client.Connection;
2021
import com.rabbitmq.client.ConnectionFactory;
22+
import com.rabbitmq.client.MalformedFrameException;
2123
import com.rabbitmq.client.UnexpectedFrameError;
2224
import com.rabbitmq.client.WriteListener;
2325
import com.rabbitmq.client.impl.AMQConnection;
26+
import com.rabbitmq.client.impl.AMQImpl;
2427
import com.rabbitmq.client.impl.AMQImpl.Basic.Publish;
28+
import com.rabbitmq.client.impl.DefaultExceptionHandler;
2529
import com.rabbitmq.client.impl.Frame;
2630
import com.rabbitmq.client.impl.FrameHandler;
2731
import org.junit.jupiter.api.AfterEach;
@@ -30,13 +34,14 @@
3034

3135
import java.io.IOException;
3236
import java.net.InetAddress;
33-
import java.net.SocketException;
3437
import java.nio.ByteBuffer;
3538
import java.util.ArrayList;
3639
import java.util.Iterator;
3740
import java.util.List;
3841
import java.util.concurrent.Executors;
42+
import java.util.concurrent.atomic.AtomicReference;
3943

44+
import static org.assertj.core.api.Assertions.assertThat;
4045
import static org.junit.jupiter.api.Assertions.*;
4146

4247
public class BrokenFramesTest {
@@ -98,6 +103,50 @@ public class BrokenFramesTest {
98103
fail("No UnexpectedFrameError thrown");
99104
}
100105

106+
@Test
107+
public void bodyOverrunTriggersConnectionClosure() throws Exception {
108+
List<Frame> frames = new ArrayList<>();
109+
int channelNumber = 0;
110+
factory.setMaxInboundMessageBodySize(1024);
111+
AtomicReference<Throwable> exception = new AtomicReference<>();
112+
factory.setExceptionHandler(
113+
new DefaultExceptionHandler() {
114+
@Override
115+
public void handleUnexpectedConnectionDriverException(
116+
Connection conn, Throwable ex) {
117+
exception.set(ex);
118+
}
119+
});
120+
121+
// 1. Setup a valid method frame (Basic.Deliver expects content)
122+
AMQImpl.Basic.Deliver method = new AMQImpl.Basic.Deliver("ctag", 1L, false, "", "rk");
123+
frames.add(method.toFrame(channelNumber));
124+
125+
// 2. Declares exactly 1 byte of body size
126+
AMQP.BasicProperties props = new AMQP.BasicProperties();
127+
frames.add(props.toFrame(channelNumber, 1L));
128+
129+
// 3. Payload is 2 bytes (0x41, 0x42), exceeding the declared size
130+
frames.add(new Frame(AMQP.FRAME_BODY, channelNumber, new byte[] { 0x41, 0x42 }));
131+
132+
myFrameHandler.setFrames(frames.iterator());
133+
134+
AMQConnection connection = null;
135+
try {
136+
connection = new AMQConnection(factory.params(Executors.newFixedThreadPool(1)), myFrameHandler);
137+
connection.start();
138+
fail("Expected an exception to be thrown due to body overrun, but none was.");
139+
} catch (IOException e) {
140+
assertThat(e).hasRootCauseInstanceOf(MalformedFrameException.class);
141+
assertThat(exception.get()).isNotNull().isInstanceOf(MalformedFrameException.class);
142+
} finally {
143+
assertTrue(myFrameHandler.closeCalled, "The underlying FrameHandler (socket) should have been closed.");
144+
if (connection != null) {
145+
assertFalse(connection.isOpen(), "The AMQConnection should be marked as closed.");
146+
}
147+
}
148+
}
149+
101150
private UnexpectedFrameError findUnexpectedFrameError(Exception e) {
102151
Throwable t = e;
103152
while ((t = t.getCause()) != null) {
@@ -106,42 +155,42 @@ private UnexpectedFrameError findUnexpectedFrameError(Exception e) {
106155
return (UnexpectedFrameError) t;
107156
}
108157
}
109-
110158
return null;
111159
}
112160

113161
private static class MyFrameHandler implements FrameHandler {
114162
private Iterator<Frame> frames;
163+
public boolean closeCalled = false;
115164

116165
public void setFrames(Iterator<Frame> frames) {
117166
this.frames = frames;
118167
}
119168

120-
public Frame readFrame() throws IOException {
169+
public Frame readFrame() {
121170
return frames.next();
122171
}
123172

124-
public void sendHeader() throws IOException {
173+
public void sendHeader() {
125174
}
126175

127176
@Override
128177
public void initialize(AMQConnection connection) {
129178
connection.startMainLoop();
130179
}
131180

132-
public void setTimeout(int timeoutMs) throws SocketException {
181+
public void setTimeout(int timeoutMs) {
133182
// no need to implement this: don't bother changing the timeout
134183
}
135184

136-
public void writeFrame(Frame frame) throws IOException {
185+
public void writeFrame(Frame frame) {
137186
// no need to implement this: don't bother writing the frame
138187
}
139188

140189
public void close() {
141-
// nothing to do
190+
this.closeCalled = true;
142191
}
143192

144-
public int getTimeout() throws SocketException {
193+
public int getTimeout() {
145194
return 0;
146195
}
147196

@@ -153,7 +202,7 @@ public int getPort() {
153202
return -1;
154203
}
155204

156-
public void flush(WriteListener listener) throws IOException {
205+
public void flush(WriteListener listener) {
157206
// no need to implement this: don't bother writing the frame
158207
}
159208

0 commit comments

Comments
 (0)