Skip to content

Commit 416efc4

Browse files
wbwb
authored andcommitted
feat(net): optimize transaction rate limiting with accurate cache size check
1. Add Manager.getCachedTransactionSize() = pushTransactionQueue + pendingTransactions + rePushTransactions to expose the true cached transaction count across all three queues. 2. Fix isTooManyPending() to include pushTransactionQueue, which was previously omitted, causing the pending threshold to be underestimated. 3. Update TransactionsMsgHandler.isBusy() to factor in the Manager cache size via TronNetDelegate.getCachedTransactionSize(), so the node stops accepting TRX INV messages when the full pipeline is busy. 4. Make the busy threshold configurable via node.maxTrxCacheSize (default: 50000), replacing the hardcoded MAX_TRX_SIZE constant.
1 parent fe25380 commit 416efc4

10 files changed

Lines changed: 105 additions & 7 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,9 @@ public class CommonParameter {
492492
public long pendingTransactionTimeout;
493493
@Getter
494494
@Setter
495+
public int maxTrxCacheSize;
496+
@Getter
497+
@Setter
495498
public boolean nodeMetricsEnable = false;
496499
@Getter
497500
@Setter

common/src/main/java/org/tron/core/config/args/NodeConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class NodeConfig {
8585
private ChannelConfig channel = new ChannelConfig();
8686
private int maxTransactionPendingSize = 2000;
8787
private long pendingTransactionTimeout = 60000;
88+
private int maxTrxCacheSize = 50_000;
8889
private int agreeNodeCount = 0;
8990
private boolean openHistoryQueryWhenLiteFN = false;
9091
private boolean unsolidifiedBlockCheck = false;
@@ -496,6 +497,11 @@ private void postProcess() {
496497
if (dynamicConfig.checkInterval <= 0) {
497498
dynamicConfig.checkInterval = 600;
498499
}
500+
501+
// maxTrxCacheSize: minimum 2000
502+
if (maxTrxCacheSize < 2000) {
503+
maxTrxCacheSize = 2000;
504+
}
499505
}
500506

501507
// ===========================================================================

common/src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ node {
344344
receiveTcpMinDataLength = 2048
345345
maxTransactionPendingSize = 2000
346346
pendingTransactionTimeout = 60000
347+
# total cached trx across handler queues + pending + rePush
348+
maxTrxCacheSize = 50000
347349

348350
# Consensus agreement
349351
agreeNodeCount = 0

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,7 @@ private static void applyNodeConfig(NodeConfig nc) {
640640

641641
PARAMETER.maxTransactionPendingSize = nc.getMaxTransactionPendingSize();
642642
PARAMETER.pendingTransactionTimeout = nc.getPendingTransactionTimeout();
643+
PARAMETER.maxTrxCacheSize = nc.getMaxTrxCacheSize();
643644

644645
PARAMETER.validContractProtoThreadNum = nc.getValidContractProtoThreads();
645646

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2085,9 +2085,13 @@ public NullifierStore getNullifierStore() {
20852085
return chainBaseManager.getNullifierStore();
20862086
}
20872087

2088+
public int getCachedTransactionSize() {
2089+
return pushTransactionQueue.size() + getPendingTransactions().size()
2090+
+ getRePushTransactions().size();
2091+
}
2092+
20882093
public boolean isTooManyPending() {
2089-
return getPendingTransactions().size() + getRePushTransactions().size()
2090-
> maxTransactionPendingSize;
2094+
return getCachedTransactionSize() > maxTransactionPendingSize;
20912095
}
20922096

20932097
private void preValidateTransactionSign(List<TransactionCapsule> txs)

framework/src/main/java/org/tron/core/net/TronNetDelegate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,4 +384,8 @@ public boolean isBlockUnsolidified() {
384384
return headNum - solidNum >= maxUnsolidifiedBlocks;
385385
}
386386

387+
public int getCachedTransactionSize() {
388+
return dbManager.getCachedTransactionSize();
389+
}
390+
387391
}

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
@Component
3333
public class TransactionsMsgHandler implements TronMsgHandler {
3434

35-
private static int MAX_TRX_SIZE = 50_000;
3635
private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100;
3736
@Autowired
3837
private TronNetDelegate tronNetDelegate;
@@ -41,7 +40,8 @@ public class TransactionsMsgHandler implements TronMsgHandler {
4140
@Autowired
4241
private ChainBaseManager chainBaseManager;
4342

44-
private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(MAX_TRX_SIZE);
43+
private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(
44+
Args.getInstance().getMaxTrxCacheSize());
4545

4646
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
4747

@@ -71,7 +71,8 @@ public void close() {
7171
}
7272

7373
public boolean isBusy() {
74-
return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE;
74+
return queue.size() + smartContractQueue.size()
75+
+ tronNetDelegate.getCachedTransactionSize() > Args.getInstance().getMaxTrxCacheSize();
7576
}
7677

7778
@Override

framework/src/main/resources/config.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ node {
160160
# Range: [50, 2000], default: 500
161161
# maxPendingBlockSize = 500
162162

163+
# Maximum total number of cached transactions (handler queues + pending + rePush).
164+
# When exceeded, the node stops accepting TRX INV messages from peers.
165+
# maxTrxCacheSize = 50000
166+
163167
# Number of validate sign thread, default availableProcessors
164168
# validateSignThreadNum = 16
165169

framework/src/test/java/org/tron/core/db/ManagerTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.BlockingQueue;
2930
import java.util.concurrent.LinkedBlockingQueue;
3031
import java.util.concurrent.atomic.AtomicInteger;
3132
import java.util.stream.Collectors;
@@ -1419,6 +1420,56 @@ public void testClearSolidityContractTriggerCache() throws Exception {
14191420
}
14201421
}
14211422

1423+
@Test
1424+
public void testGetCachedTransactionSize() throws Exception {
1425+
BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
1426+
pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1427+
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
1428+
pushField.setAccessible(true);
1429+
pushField.set(dbManager, pushQ);
1430+
1431+
dbManager.getPendingTransactions().clear();
1432+
dbManager.getPendingTransactions().add(
1433+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1434+
dbManager.getPendingTransactions().add(
1435+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1436+
1437+
dbManager.getRePushTransactions().clear();
1438+
1439+
// 1 (push) + 2 (pending) + 0 (rePush) = 3
1440+
Assert.assertEquals(3, dbManager.getCachedTransactionSize());
1441+
1442+
// cleanup
1443+
pushQ.clear();
1444+
dbManager.getPendingTransactions().clear();
1445+
}
1446+
1447+
@Test
1448+
public void testIsTooManyPendingIncludesPushQueue() throws Exception {
1449+
int threshold = Args.getInstance().getMaxTransactionPendingSize();
1450+
1451+
BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
1452+
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
1453+
pushField.setAccessible(true);
1454+
pushField.set(dbManager, pushQ);
1455+
1456+
dbManager.getPendingTransactions().clear();
1457+
dbManager.getRePushTransactions().clear();
1458+
1459+
for (int i = 0; i < threshold; i++) {
1460+
dbManager.getPendingTransactions().add(
1461+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1462+
}
1463+
Assert.assertFalse(dbManager.isTooManyPending());
1464+
1465+
pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1466+
Assert.assertTrue(dbManager.isTooManyPending());
1467+
1468+
// cleanup
1469+
dbManager.getPendingTransactions().clear();
1470+
pushQ.clear();
1471+
}
1472+
14221473
public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long amount)
14231474
throws BalanceInsufficientException {
14241475
Commons.adjustBalance(accountStore, accountAddress, amount,

framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ public static void init() {
4848
public void testProcessMessage() {
4949
TransactionsMsgHandler transactionsMsgHandler = new TransactionsMsgHandler();
5050
try {
51-
Assert.assertFalse(transactionsMsgHandler.isBusy());
52-
5351
transactionsMsgHandler.init();
5452

5553
PeerConnection peer = Mockito.mock(PeerConnection.class);
@@ -60,6 +58,8 @@ public void testProcessMessage() {
6058
field.setAccessible(true);
6159
field.set(transactionsMsgHandler, tronNetDelegate);
6260

61+
Assert.assertFalse(transactionsMsgHandler.isBusy());
62+
6363
BalanceContract.TransferContract transferContract = BalanceContract.TransferContract
6464
.newBuilder()
6565
.setAmount(10)
@@ -290,6 +290,28 @@ public void testHandleTransaction() throws Exception {
290290
}
291291
}
292292

293+
@Test
294+
public void testIsBusyWithCachedTransactions() throws Exception {
295+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
296+
297+
int threshold = Args.getInstance().getMaxTrxCacheSize();
298+
TronNetDelegate tronNetDelegateMock = Mockito.mock(TronNetDelegate.class);
299+
Field field = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
300+
field.setAccessible(true);
301+
field.set(handler, tronNetDelegateMock);
302+
303+
// queue and smartContractQueue are empty, but cached size > threshold
304+
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold + 1);
305+
Assert.assertTrue(handler.isBusy());
306+
307+
// boundary: cached size == threshold, isBusy() uses strict >, so not busy
308+
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(threshold);
309+
Assert.assertFalse(handler.isBusy());
310+
311+
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(0);
312+
Assert.assertFalse(handler.isBusy());
313+
}
314+
293315
class TrxEvent {
294316

295317
@Getter

0 commit comments

Comments
 (0)