Skip to content

Commit 3b71d42

Browse files
author
Pradeep Kunchala
committed
Made all requested changes in VMTransport.java and cleaned up imports as per feedback
1 parent 35818ee commit 3b71d42

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020
import java.io.InterruptedIOException;
2121
import java.net.URI;
2222
import java.security.cert.X509Certificate;
23-
import java.sql.PreparedStatement;
24-
import java.util.List;
25-
import java.util.Map;
26-
import java.util.concurrent.*;
23+
import java.util.concurrent.BlockingQueue;
24+
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.concurrent.TimeUnit;
2726
import java.util.concurrent.atomic.AtomicBoolean;
2827
import java.util.concurrent.atomic.AtomicLong;
2928

30-
import jakarta.jms.JMSException;
3129
import org.apache.activemq.command.*;
3230
import org.apache.activemq.openwire.OpenWireFormat;
3331
import org.apache.activemq.thread.Task;
@@ -72,9 +70,6 @@ public class VMTransport implements Transport, Task {
7270

7371
private volatile int receiveCounter;
7472

75-
private final List<TransportListener> listeners = new CopyOnWriteArrayList<>();
76-
private final ExecutorService executor = Executors.newCachedThreadPool();
77-
7873
public VMTransport(URI location) {
7974
this.location = location;
8075
this.id = NEXT_ID.getAndIncrement();
@@ -116,8 +111,7 @@ public void oneway(Object command) throws IOException {
116111
ByteSequence data = wf.marshal(original);
117112
toSend = (ActiveMQMessage) wf.unmarshal(data);
118113
} catch (IOException e) {
119-
LOG.warn("Failed to marshal/unmarshal ActiveMQMessage, sending original", e);
120-
toSend = command;
114+
throw new RuntimeException("Failed to deep copy MessageDispatch in VM transport", e);
121115
}
122116
}
123117

@@ -154,6 +148,7 @@ public void oneway(Object command) throws IOException {
154148

155149
// Dispatch to listener
156150
dispatch(peer, peer.messageQueue, toSend);
151+
return;
157152

158153
} catch (InterruptedException e) {
159154
Thread.currentThread().interrupt();
@@ -162,7 +157,6 @@ public void oneway(Object command) throws IOException {
162157
throw iioe;
163158
}
164159

165-
dispatch(peer, peer.messageQueue, command);
166160
}
167161

168162
public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {

activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportDefensiveCopyTest.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,46 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.activemq.transport.vm;
218

3-
import jakarta.jms.*;
19+
import jakarta.jms.Connection;
20+
import jakarta.jms.MessageConsumer;
21+
import jakarta.jms.Session;
22+
import jakarta.jms.Topic;
23+
import jakarta.jms.MessageProducer;
24+
import jakarta.jms.TextMessage;
25+
import jakarta.jms.JMSException;
26+
427
import org.apache.activemq.ActiveMQConnectionFactory;
528
import org.apache.activemq.broker.BrokerService;
629
import org.junit.After;
730
import org.junit.Before;
831
import org.junit.Test;
932

10-
import java.util.ArrayList;
1133
import java.util.List;
12-
import java.util.concurrent.*;
34+
import java.util.ArrayList;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Future;
39+
import java.util.concurrent.TimeUnit;
1340

1441
import static org.junit.Assert.assertNotNull;
1542
import static org.junit.Assert.assertNotSame;
1643

17-
import java.util.concurrent.*;
18-
1944

2045
public class VMTransportDefensiveCopyTest {
2146

0 commit comments

Comments
 (0)