Skip to content

Commit 2e3164c

Browse files
committed
fix(net): fix RejectedExecutionException during shutdown trxHandlePoo
1 parent 039821c commit 2e3164c

2 files changed

Lines changed: 183 additions & 3 deletions

File tree

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.concurrent.BlockingQueue;
44
import java.util.concurrent.ExecutorService;
55
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.RejectedExecutionException;
67
import java.util.concurrent.ScheduledExecutorService;
78
import java.util.concurrent.TimeUnit;
89
import lombok.Getter;
@@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {
4445

4546
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
4647

48+
private volatile boolean isClosed = false;
4749
private int threadNum = Args.getInstance().getValidateSignThreadNum();
4850
private final String trxEsName = "trx-msg-handler";
4951
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
@@ -58,8 +60,14 @@ public void init() {
5860
}
5961

6062
public void close() {
61-
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
63+
isClosed = true;
64+
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
6265
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
66+
// Then shutdown the worker pool to finish already-submitted tasks.
67+
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
68+
// Discard any remaining items and release references.
69+
smartContractQueue.clear();
70+
queue.clear();
6371
}
6472

6573
public boolean isBusy() {
@@ -68,6 +76,10 @@ public boolean isBusy() {
6876

6977
@Override
7078
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
79+
if (isClosed) {
80+
logger.warn("TransactionsMsgHandler is closed, drop message");
81+
return;
82+
}
7183
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
7284
check(peer, transactionsMessage);
7385
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
@@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
7890
int trxHandlePoolQueueSize = 0;
7991
int dropSmartContractCount = 0;
8092
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
93+
if (isClosed) {
94+
logger.warn("TransactionsMsgHandler is closed during processing, stop submit");
95+
break;
96+
}
8197
int type = trx.getRawData().getContract(0).getType().getNumber();
8298
if (type == ContractType.TriggerSmartContract_VALUE
8399
|| type == ContractType.CreateSmartContract_VALUE) {
@@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
87103
dropSmartContractCount++;
88104
}
89105
} else {
90-
ExecutorServiceManager.submit(
91-
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
106+
try {
107+
ExecutorServiceManager.submit(
108+
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
109+
} catch (RejectedExecutionException e) {
110+
logger.warn("Submit task to {} failed", trxEsName);
111+
break;
112+
}
92113
}
93114
}
94115

framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
import com.google.protobuf.Any;
44
import com.google.protobuf.ByteString;
55
import java.lang.reflect.Field;
6+
import java.lang.reflect.Method;
67
import java.util.ArrayList;
78
import java.util.List;
89
import java.util.Map;
910
import java.util.concurrent.BlockingQueue;
1011
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.ExecutorService;
1113
import java.util.concurrent.LinkedBlockingQueue;
14+
import java.util.concurrent.RejectedExecutionException;
1215

1316
import lombok.Getter;
1417
import org.joda.time.DateTime;
@@ -20,7 +23,10 @@
2023
import org.tron.common.TestConstants;
2124
import org.tron.common.runtime.TvmTestUtils;
2225
import org.tron.common.utils.ByteArray;
26+
import org.tron.core.ChainBaseManager;
2327
import org.tron.core.config.args.Args;
28+
import org.tron.core.exception.P2pException;
29+
import org.tron.core.exception.P2pException.TypeEnum;
2430
import org.tron.core.net.TronNetDelegate;
2531
import org.tron.core.net.message.adv.TransactionMessage;
2632
import org.tron.core.net.message.adv.TransactionsMessage;
@@ -132,6 +138,159 @@ public void testProcessMessage() {
132138
}
133139
}
134140

141+
@Test
142+
public void testProcessMessageAfterClose() throws Exception {
143+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
144+
handler.init();
145+
handler.close();
146+
147+
PeerConnection peer = Mockito.mock(PeerConnection.class);
148+
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);
149+
150+
handler.processMessage(peer, msg);
151+
152+
Mockito.verify(msg, Mockito.never()).getTransactions();
153+
Mockito.verifyNoInteractions(peer);
154+
}
155+
156+
@Test
157+
public void testRejectedExecution() throws Exception {
158+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
159+
try {
160+
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
161+
Mockito.when(mockPool.submit(Mockito.any(Runnable.class)))
162+
.thenThrow(new RejectedExecutionException("pool closed"));
163+
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
164+
poolField.setAccessible(true);
165+
poolField.set(handler, mockPool);
166+
167+
PeerConnection peer = Mockito.mock(PeerConnection.class);
168+
TransactionsMessage msg = buildTransferMessage(2);
169+
stubAdvInvRequest(peer, msg);
170+
// 2 transfer transactions, submit throws on the first → catch + break, only called once
171+
handler.processMessage(peer, msg);
172+
173+
Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
174+
} finally {
175+
handler.close();
176+
}
177+
}
178+
179+
@Test
180+
public void testCloseDuringProcessing() throws Exception {
181+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
182+
try {
183+
Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed");
184+
closedField.setAccessible(true);
185+
186+
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
187+
// on the first submit, flip isClosed to true so the second iteration breaks
188+
Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> {
189+
closedField.set(handler, true);
190+
return null;
191+
});
192+
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
193+
poolField.setAccessible(true);
194+
poolField.set(handler, mockPool);
195+
196+
PeerConnection peer = Mockito.mock(PeerConnection.class);
197+
TransactionsMessage msg = buildTransferMessage(2);
198+
stubAdvInvRequest(peer, msg);
199+
handler.processMessage(peer, msg);
200+
201+
Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
202+
} finally {
203+
handler.close();
204+
}
205+
}
206+
207+
private TransactionsMessage buildTransferMessage(int count) {
208+
List<Protocol.Transaction> txs = new ArrayList<>();
209+
for (int i = 0; i < count; i++) {
210+
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
211+
.setAmount(10 + i)
212+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
213+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
214+
.build();
215+
txs.add(Protocol.Transaction.newBuilder().setRawData(
216+
Protocol.Transaction.raw.newBuilder()
217+
.setTimestamp(1_700_000_000_000L + i)
218+
.setRefBlockNum(1)
219+
.addContract(Protocol.Transaction.Contract.newBuilder()
220+
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
221+
.setParameter(Any.pack(tc)).build()).build())
222+
.build());
223+
}
224+
return new TransactionsMessage(txs);
225+
}
226+
227+
private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) {
228+
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
229+
for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) {
230+
Item item = new Item(new TransactionMessage(trx).getMessageId(),
231+
Protocol.Inventory.InventoryType.TRX);
232+
advInvRequest.put(item, 0L);
233+
}
234+
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);
235+
}
236+
237+
@Test
238+
public void testHandleTransaction() throws Exception {
239+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
240+
try {
241+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
242+
AdvService advService = Mockito.mock(AdvService.class);
243+
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);
244+
245+
Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
246+
f1.setAccessible(true);
247+
f1.set(handler, tronNetDelegate);
248+
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
249+
f2.setAccessible(true);
250+
f2.set(handler, advService);
251+
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
252+
f3.setAccessible(true);
253+
f3.set(handler, chainBaseManager);
254+
255+
PeerConnection peer = Mockito.mock(PeerConnection.class);
256+
257+
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
258+
.setAmount(10)
259+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
260+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
261+
.build();
262+
long now = System.currentTimeMillis();
263+
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
264+
Protocol.Transaction.raw.newBuilder()
265+
.setTimestamp(now)
266+
.setExpiration(now + 60_000)
267+
.setRefBlockNum(1)
268+
.addContract(Protocol.Transaction.Contract.newBuilder()
269+
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
270+
.setParameter(Any.pack(tc)).build()).build())
271+
.build();
272+
TransactionMessage trxMsg = new TransactionMessage(trx);
273+
274+
Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
275+
"handleTransaction", PeerConnection.class, TransactionMessage.class);
276+
handleTx.setAccessible(true);
277+
278+
// happy path → push and broadcast
279+
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
280+
handleTx.invoke(handler, peer, trxMsg);
281+
Mockito.verify(advService).broadcast(trxMsg);
282+
283+
// P2pException BAD_TRX → disconnect
284+
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
285+
.when(tronNetDelegate).pushTransaction(Mockito.any());
286+
handleTx.invoke(handler, peer, trxMsg);
287+
Mockito.verify(peer).setBadPeer(true);
288+
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
289+
} finally {
290+
handler.close();
291+
}
292+
}
293+
135294
class TrxEvent {
136295

137296
@Getter

0 commit comments

Comments
 (0)