Skip to content

Commit eab69df

Browse files
author
Pradeep Kunchala
committed
[AMQ-9855] VMTransport: Defensive copy of messages to prevent mutation
- Messages are now copied via OpenWireFormat marshal/unmarshal - Prevents consumers from mutating original message - Includes fallback if marshal/unmarshal fails - Verified with unit tests: original message remains intact
1 parent 940ea35 commit eab69df

1 file changed

Lines changed: 32 additions & 6 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020
import java.io.InterruptedIOException;
2121
import java.net.URI;
2222
import java.security.cert.X509Certificate;
23+
import java.util.Map;
2324
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.ConcurrentHashMap;
2426
import java.util.concurrent.LinkedBlockingQueue;
2527
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729
import java.util.concurrent.atomic.AtomicLong;
2830

29-
import org.apache.activemq.command.ShutdownInfo;
31+
import jakarta.jms.JMSException;
32+
import org.apache.activemq.command.*;
33+
import org.apache.activemq.openwire.OpenWireFormat;
3034
import org.apache.activemq.thread.Task;
3135
import org.apache.activemq.thread.TaskRunner;
3236
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -35,6 +39,7 @@
3539
import org.apache.activemq.transport.Transport;
3640
import org.apache.activemq.transport.TransportDisposedIOException;
3741
import org.apache.activemq.transport.TransportListener;
42+
import org.apache.activemq.util.ByteSequence;
3843
import org.apache.activemq.util.IOExceptionSupport;
3944
import org.apache.activemq.wireformat.WireFormat;
4045
import org.slf4j.Logger;
@@ -80,10 +85,6 @@ public void setPeer(VMTransport peer) {
8085
@Override
8186
public void oneway(Object command) throws IOException {
8287

83-
if (disposed.get()) {
84-
throw new TransportDisposedIOException("Transport disposed.");
85-
}
86-
8788
if (peer == null) {
8889
throw new IOException("Peer not connected.");
8990
}
@@ -161,8 +162,33 @@ public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Objec
161162
}
162163

163164
public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
165+
if (transportListener == null) {
166+
try {
167+
throw new IOException("TransportListener not set");
168+
} catch (IOException e) {
169+
throw new RuntimeException(e);
170+
}
171+
}
172+
164173
transport.receiveCounter++;
165-
transportListener.onCommand(command);
174+
175+
Object toSend = command;
176+
177+
if (command instanceof ActiveMQMessage) {
178+
ActiveMQMessage original = (ActiveMQMessage) command;
179+
180+
try {
181+
WireFormat wf = new OpenWireFormat();
182+
ByteSequence data = wf.marshal(original);
183+
ActiveMQMessage copy = (ActiveMQMessage) wf.unmarshal(data);
184+
toSend = copy;
185+
} catch (IOException e) {
186+
LOG.warn("Failed to marshal/unmarshal message, sending original", e);
187+
toSend = command;
188+
}
189+
}
190+
191+
transportListener.onCommand(toSend);
166192
}
167193

168194
@Override

0 commit comments

Comments
 (0)