Skip to content

Commit 4eec223

Browse files
author
Pradeep Kunchala
committed
AMQ-9855: Fix null/empty message body for vm:// topics
- Updated VMTransport to create defensive copy of ActiveMQMessage during dispatch. This prevents shared mutable state across consumers in VM transport. - Added VMTransportDefensiveCopyTest to reproduce AMQ-9855 scenario and verify fix. - Test ensures that message body remains intact across multiple consumers. - Patch aligns VM transport behavior with TCP transport and avoids Camel route failures.
1 parent eab69df commit 4eec223

2 files changed

Lines changed: 169 additions & 31 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,10 @@
2020
import java.io.InterruptedIOException;
2121
import java.net.URI;
2222
import java.security.cert.X509Certificate;
23+
import java.sql.PreparedStatement;
24+
import java.util.List;
2325
import java.util.Map;
24-
import java.util.concurrent.BlockingQueue;
25-
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.LinkedBlockingQueue;
27-
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.*;
2827
import java.util.concurrent.atomic.AtomicBoolean;
2928
import java.util.concurrent.atomic.AtomicLong;
3029

@@ -73,6 +72,9 @@ public class VMTransport implements Transport, Task {
7372

7473
private volatile int receiveCounter;
7574

75+
private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
76+
private final ExecutorService executor = Executors.newCachedThreadPool();
77+
7678
public VMTransport(URI location) {
7779
this.location = location;
7880
this.id = NEXT_ID.getAndIncrement();
@@ -95,6 +97,30 @@ public void oneway(Object command) throws IOException {
9597
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
9698
}
9799

100+
// Deep copy the message if it is a MessageDispatch
101+
Object toSend = command;
102+
if (command instanceof MessageDispatch) {
103+
MessageDispatch original = (MessageDispatch) command;
104+
try {
105+
WireFormat wf = new OpenWireFormat();
106+
ByteSequence data = wf.marshal(original);
107+
toSend = wf.unmarshal(data); // deep copy
108+
} catch (IOException e) {
109+
LOG.warn("Failed to deep copy MessageDispatch, sending original", e);
110+
toSend = command;
111+
}
112+
} else if (command instanceof ActiveMQMessage) {
113+
ActiveMQMessage original = (ActiveMQMessage) command;
114+
try {
115+
WireFormat wf = new OpenWireFormat();
116+
ByteSequence data = wf.marshal(original);
117+
toSend = (ActiveMQMessage) wf.unmarshal(data);
118+
} catch (IOException e) {
119+
LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
120+
toSend = command;
121+
}
122+
}
123+
98124
if (peer.async) {
99125
peer.getMessageQueue().put(command);
100126
peer.wakeup();
@@ -125,6 +151,10 @@ public void oneway(Object command) throws IOException {
125151
return;
126152
}
127153
}
154+
155+
// Dispatch to listener
156+
dispatch(peer, peer.messageQueue, toSend);
157+
128158
} catch (InterruptedException e) {
129159
Thread.currentThread().interrupt();
130160
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
@@ -162,33 +192,9 @@ public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Objec
162192
}
163193

164194
public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
165-
if (transportListener == null) {
166-
try {
167-
throw new IOException("TransportListener not set");
168-
} catch (IOException e) {
169-
throw new RuntimeException(e);
170-
}
171-
}
172-
173-
transport.receiveCounter++;
174-
175-
Object toSend = command;
176-
177-
if (command instanceof ActiveMQMessage) {
178-
ActiveMQMessage original = (ActiveMQMessage) command;
179-
180-
try {
181-
WireFormat wf = new OpenWireFormat();
182-
ByteSequence data = wf.marshal(original);
183-
ActiveMQMessage copy = (ActiveMQMessage) wf.unmarshal(data);
184-
toSend = copy;
185-
} catch (IOException e) {
186-
LOG.warn("Failed to marshal/unmarshal message, sending original", e);
187-
toSend = command;
188-
}
189-
}
195+
transport.receiveCounter++;
196+
transportListener.onCommand(command);
190197

191-
transportListener.onCommand(toSend);
192198
}
193199

194200
@Override
@@ -281,7 +287,7 @@ protected void wakeup() {
281287
}
282288

283289
/**
284-
* @see org.apache.activemq.thread.Task#iterate()
290+
* @see Task#iterate()
285291
*/
286292
@Override
287293
public boolean iterate() {
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package org.apache.activemq.transport.vm;
2+
3+
import jakarta.jms.*;
4+
import org.apache.activemq.ActiveMQConnectionFactory;
5+
import org.apache.activemq.broker.BrokerService;
6+
import org.junit.After;
7+
import org.junit.Before;
8+
import org.junit.Test;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.concurrent.*;
13+
14+
import static org.junit.Assert.assertNotNull;
15+
import static org.junit.Assert.assertNotSame;
16+
17+
import java.util.concurrent.*;
18+
19+
20+
public class VMTransportDefensiveCopyTest {
21+
22+
private BrokerService broker;
23+
private Connection connection;
24+
25+
@Before
26+
public void setUp() throws Exception {
27+
broker = new BrokerService();
28+
broker.setPersistent(false);
29+
broker.addConnector("vm://localhost");
30+
broker.start();
31+
32+
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
33+
connection = cf.createConnection();
34+
connection.setClientID("HIGH_CONC_TEST"); // needed for durable subscribers
35+
connection.start();
36+
}
37+
38+
@After
39+
public void tearDown() throws Exception {
40+
if (connection != null) connection.close();
41+
if (broker != null) broker.stop();
42+
}
43+
44+
@Test
45+
public void testConcurrentProducersAndConsumers() throws Exception {
46+
final int MESSAGE_COUNT = 100;
47+
final int PRODUCERS = 5;
48+
final int DURABLE_CONSUMERS = 2;
49+
final int NON_DURABLE_CONSUMERS = 3;
50+
51+
// Topic
52+
Session tmpSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
53+
Topic topic = tmpSession.createTopic("HIGH_CONC.TOPIC");
54+
55+
// Consumers
56+
List<MessageConsumer> consumers = new ArrayList<>();
57+
List<Session> consumerSessions = new ArrayList<>();
58+
for (int i = 1; i <= DURABLE_CONSUMERS; i++) {
59+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
60+
consumers.add(s.createDurableSubscriber(topic, "Durable-" + i));
61+
consumerSessions.add(s);
62+
}
63+
for (int i = 1; i <= NON_DURABLE_CONSUMERS; i++) {
64+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
65+
consumers.add(s.createConsumer(topic));
66+
consumerSessions.add(s);
67+
}
68+
69+
ExecutorService executor = Executors.newFixedThreadPool(PRODUCERS + consumers.size());
70+
71+
// Produce messages concurrently
72+
CountDownLatch producerLatch = new CountDownLatch(PRODUCERS);
73+
for (int p = 1; p <= PRODUCERS; p++) {
74+
final int producerId = p;
75+
executor.submit(() -> {
76+
try {
77+
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
78+
MessageProducer producer = s.createProducer(topic);
79+
for (int m = 1; m <= MESSAGE_COUNT; m++) {
80+
TextMessage msg = s.createTextMessage("P" + producerId + "-M" + m);
81+
producer.send(msg);
82+
}
83+
} catch (JMSException e) {
84+
e.printStackTrace();
85+
} finally {
86+
producerLatch.countDown();
87+
}
88+
});
89+
}
90+
91+
// Consume messages concurrently
92+
List<Future<List<String>>> consumerFutures = new ArrayList<>();
93+
for (MessageConsumer consumer : consumers) {
94+
consumerFutures.add(executor.submit(() -> {
95+
List<String> received = new ArrayList<>();
96+
for (int i = 0; i < MESSAGE_COUNT * PRODUCERS; i++) {
97+
TextMessage msg = (TextMessage) consumer.receive(5000);
98+
assertNotNull("Consumer should receive a message", msg);
99+
received.add(msg.getText());
100+
}
101+
return received;
102+
}));
103+
}
104+
105+
// Wait for producers to finish
106+
producerLatch.await();
107+
108+
// Collect and validate consumer messages
109+
List<List<String>> allConsumed = new ArrayList<>();
110+
for (Future<List<String>> f : consumerFutures) {
111+
allConsumed.add(f.get(30, TimeUnit.SECONDS));
112+
}
113+
114+
// Check that each consumer received unique message instances
115+
for (int i = 0; i < allConsumed.size(); i++) {
116+
List<String> consumerMsgs = allConsumed.get(i);
117+
for (int j = i + 1; j < allConsumed.size(); j++) {
118+
List<String> otherMsgs = allConsumed.get(j);
119+
for (int k = 0; k < consumerMsgs.size(); k++) {
120+
assertNotSame(
121+
"Message instances should be independent across consumers",
122+
consumerMsgs.get(k),
123+
otherMsgs.get(k)
124+
);
125+
}
126+
}
127+
}
128+
129+
executor.shutdownNow();
130+
for (Session s : consumerSessions) s.close();
131+
}
132+
}

0 commit comments

Comments
 (0)