Skip to content

Commit 107490c

Browse files
authored
fix(net): fix RejectedExecutionException during shutdown trxHandlePool (#6692)
1 parent 512ce0c commit 107490c

2 files changed

Lines changed: 183 additions & 4 deletions

File tree

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.util.concurrent.BlockingQueue;
44
import java.util.concurrent.ExecutorService;
55
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.RejectedExecutionException;
67
import java.util.concurrent.ScheduledExecutorService;
78
import java.util.concurrent.TimeUnit;
89
import lombok.Getter;
@@ -44,6 +45,7 @@ public class TransactionsMsgHandler implements TronMsgHandler {
4445

4546
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
4647

48+
private volatile boolean isClosed = false;
4749
private int threadNum = Args.getInstance().getValidateSignThreadNum();
4850
private final String trxEsName = "trx-msg-handler";
4951
private ExecutorService trxHandlePool = ExecutorServiceManager.newThreadPoolExecutor(
@@ -58,8 +60,14 @@ public void init() {
5860
}
5961

6062
public void close() {
61-
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
63+
isClosed = true;
64+
// Stop the scheduler first so no new tasks are drained from smartContractQueue.
6265
ExecutorServiceManager.shutdownAndAwaitTermination(smartContractExecutor, smartEsName);
66+
// Then shutdown the worker pool to finish already-submitted tasks.
67+
ExecutorServiceManager.shutdownAndAwaitTermination(trxHandlePool, trxEsName);
68+
// Discard any remaining items and release references.
69+
smartContractQueue.clear();
70+
queue.clear();
6371
}
6472

6573
public boolean isBusy() {
@@ -68,6 +76,10 @@ public boolean isBusy() {
6876

6977
@Override
7078
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
79+
if (isClosed) {
80+
logger.info("TransactionsMsgHandler is closed, drop message");
81+
return;
82+
}
7183
TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
7284
check(peer, transactionsMessage);
7385
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
@@ -78,6 +90,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
7890
int trxHandlePoolQueueSize = 0;
7991
int dropSmartContractCount = 0;
8092
for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
93+
if (isClosed) {
94+
logger.info("TransactionsMsgHandler is closed during processing, stop submit");
95+
break;
96+
}
8197
int type = trx.getRawData().getContract(0).getType().getNumber();
8298
if (type == ContractType.TriggerSmartContract_VALUE
8399
|| type == ContractType.CreateSmartContract_VALUE) {
@@ -87,8 +103,13 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
87103
dropSmartContractCount++;
88104
}
89105
} else {
90-
ExecutorServiceManager.submit(
91-
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
106+
try {
107+
ExecutorServiceManager.submit(
108+
trxHandlePool, () -> handleTransaction(peer, new TransactionMessage(trx)));
109+
} catch (RejectedExecutionException e) {
110+
logger.warn("Submit task to {} failed", trxEsName);
111+
break;
112+
}
92113
}
93114
}
94115

framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java

Lines changed: 159 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
import com.google.protobuf.Any;
44
import com.google.protobuf.ByteString;
55
import java.lang.reflect.Field;
6+
import java.lang.reflect.Method;
67
import java.util.ArrayList;
78
import java.util.List;
89
import java.util.Map;
910
import java.util.concurrent.BlockingQueue;
1011
import java.util.concurrent.ConcurrentHashMap;
12+
import java.util.concurrent.ExecutorService;
1113
import java.util.concurrent.LinkedBlockingQueue;
14+
import java.util.concurrent.RejectedExecutionException;
1215

1316
import lombok.Getter;
1417
import org.joda.time.DateTime;
@@ -20,7 +23,10 @@
2023
import org.tron.common.TestConstants;
2124
import org.tron.common.runtime.TvmTestUtils;
2225
import org.tron.common.utils.ByteArray;
26+
import org.tron.core.ChainBaseManager;
2327
import org.tron.core.config.args.Args;
28+
import org.tron.core.exception.P2pException;
29+
import org.tron.core.exception.P2pException.TypeEnum;
2430
import org.tron.core.net.TronNetDelegate;
2531
import org.tron.core.net.message.adv.TransactionMessage;
2632
import org.tron.core.net.message.adv.TransactionsMessage;
@@ -80,7 +86,6 @@ public void testProcessMessage() {
8086
transactionsMsgHandler.processMessage(peer, new TransactionsMessage(transactionList));
8187
Assert.assertNull(advInvRequest.get(item));
8288
//Thread.sleep(10);
83-
transactionsMsgHandler.close();
8489
BlockingQueue<TrxEvent> smartContractQueue =
8590
new LinkedBlockingQueue(2);
8691
smartContractQueue.offer(new TrxEvent(null, null));
@@ -132,6 +137,159 @@ public void testProcessMessage() {
132137
}
133138
}
134139

140+
@Test
141+
public void testProcessMessageAfterClose() throws Exception {
142+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
143+
handler.init();
144+
handler.close();
145+
146+
PeerConnection peer = Mockito.mock(PeerConnection.class);
147+
TransactionsMessage msg = Mockito.mock(TransactionsMessage.class);
148+
149+
handler.processMessage(peer, msg);
150+
151+
Mockito.verify(msg, Mockito.never()).getTransactions();
152+
Mockito.verifyNoInteractions(peer);
153+
}
154+
155+
@Test
156+
public void testRejectedExecution() throws Exception {
157+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
158+
try {
159+
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
160+
Mockito.when(mockPool.submit(Mockito.any(Runnable.class)))
161+
.thenThrow(new RejectedExecutionException("pool closed"));
162+
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
163+
poolField.setAccessible(true);
164+
poolField.set(handler, mockPool);
165+
166+
PeerConnection peer = Mockito.mock(PeerConnection.class);
167+
TransactionsMessage msg = buildTransferMessage(2);
168+
stubAdvInvRequest(peer, msg);
169+
// 2 transfer transactions, submit throws on the first → catch + break, only called once
170+
handler.processMessage(peer, msg);
171+
172+
Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
173+
} finally {
174+
handler.close();
175+
}
176+
}
177+
178+
@Test
179+
public void testCloseDuringProcessing() throws Exception {
180+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
181+
try {
182+
Field closedField = TransactionsMsgHandler.class.getDeclaredField("isClosed");
183+
closedField.setAccessible(true);
184+
185+
ExecutorService mockPool = Mockito.mock(ExecutorService.class);
186+
// on the first submit, flip isClosed to true so the second iteration breaks
187+
Mockito.when(mockPool.submit(Mockito.any(Runnable.class))).thenAnswer(inv -> {
188+
closedField.set(handler, true);
189+
return null;
190+
});
191+
Field poolField = TransactionsMsgHandler.class.getDeclaredField("trxHandlePool");
192+
poolField.setAccessible(true);
193+
poolField.set(handler, mockPool);
194+
195+
PeerConnection peer = Mockito.mock(PeerConnection.class);
196+
TransactionsMessage msg = buildTransferMessage(2);
197+
stubAdvInvRequest(peer, msg);
198+
handler.processMessage(peer, msg);
199+
200+
Mockito.verify(mockPool, Mockito.times(1)).submit(Mockito.any(Runnable.class));
201+
} finally {
202+
handler.close();
203+
}
204+
}
205+
206+
private TransactionsMessage buildTransferMessage(int count) {
207+
List<Protocol.Transaction> txs = new ArrayList<>();
208+
for (int i = 0; i < count; i++) {
209+
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
210+
.setAmount(10 + i)
211+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
212+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
213+
.build();
214+
txs.add(Protocol.Transaction.newBuilder().setRawData(
215+
Protocol.Transaction.raw.newBuilder()
216+
.setTimestamp(1_700_000_000_000L + i)
217+
.setRefBlockNum(1)
218+
.addContract(Protocol.Transaction.Contract.newBuilder()
219+
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
220+
.setParameter(Any.pack(tc)).build()).build())
221+
.build());
222+
}
223+
return new TransactionsMessage(txs);
224+
}
225+
226+
private void stubAdvInvRequest(PeerConnection peer, TransactionsMessage msg) {
227+
Map<Item, Long> advInvRequest = new ConcurrentHashMap<>();
228+
for (Protocol.Transaction trx : msg.getTransactions().getTransactionsList()) {
229+
Item item = new Item(new TransactionMessage(trx).getMessageId(),
230+
Protocol.Inventory.InventoryType.TRX);
231+
advInvRequest.put(item, 0L);
232+
}
233+
Mockito.when(peer.getAdvInvRequest()).thenReturn(advInvRequest);
234+
}
235+
236+
@Test
237+
public void testHandleTransaction() throws Exception {
238+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
239+
try {
240+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
241+
AdvService advService = Mockito.mock(AdvService.class);
242+
ChainBaseManager chainBaseManager = Mockito.mock(ChainBaseManager.class);
243+
244+
Field f1 = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
245+
f1.setAccessible(true);
246+
f1.set(handler, tronNetDelegate);
247+
Field f2 = TransactionsMsgHandler.class.getDeclaredField("advService");
248+
f2.setAccessible(true);
249+
f2.set(handler, advService);
250+
Field f3 = TransactionsMsgHandler.class.getDeclaredField("chainBaseManager");
251+
f3.setAccessible(true);
252+
f3.set(handler, chainBaseManager);
253+
254+
PeerConnection peer = Mockito.mock(PeerConnection.class);
255+
256+
BalanceContract.TransferContract tc = BalanceContract.TransferContract.newBuilder()
257+
.setAmount(10)
258+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString("121212a9cf")))
259+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString("232323a9cf")))
260+
.build();
261+
long now = System.currentTimeMillis();
262+
Protocol.Transaction trx = Protocol.Transaction.newBuilder().setRawData(
263+
Protocol.Transaction.raw.newBuilder()
264+
.setTimestamp(now)
265+
.setExpiration(now + 60_000)
266+
.setRefBlockNum(1)
267+
.addContract(Protocol.Transaction.Contract.newBuilder()
268+
.setType(Protocol.Transaction.Contract.ContractType.TransferContract)
269+
.setParameter(Any.pack(tc)).build()).build())
270+
.build();
271+
TransactionMessage trxMsg = new TransactionMessage(trx);
272+
273+
Method handleTx = TransactionsMsgHandler.class.getDeclaredMethod(
274+
"handleTransaction", PeerConnection.class, TransactionMessage.class);
275+
handleTx.setAccessible(true);
276+
277+
// happy path → push and broadcast
278+
Mockito.when(chainBaseManager.getNextBlockSlotTime()).thenReturn(now);
279+
handleTx.invoke(handler, peer, trxMsg);
280+
Mockito.verify(advService).broadcast(trxMsg);
281+
282+
// P2pException BAD_TRX → disconnect
283+
Mockito.doThrow(new P2pException(TypeEnum.BAD_TRX, "bad"))
284+
.when(tronNetDelegate).pushTransaction(Mockito.any());
285+
handleTx.invoke(handler, peer, trxMsg);
286+
Mockito.verify(peer).setBadPeer(true);
287+
Mockito.verify(peer).disconnect(Protocol.ReasonCode.BAD_TX);
288+
} finally {
289+
handler.close();
290+
}
291+
}
292+
135293
class TrxEvent {
136294

137295
@Getter

0 commit comments

Comments
 (0)