Skip to content

Commit 0581b72

Browse files
wbclaude
authored andcommitted
feat(net): optimize transaction rate limiting with accurate cache size check
1. Add Manager.getCachedTransactionSize() = pushTransactionQueue + pendingTransactions + rePushTransactions to expose the true cached transaction count across all three queues. 2. Fix isTooManyPending() to include pushTransactionQueue, which was previously omitted, causing the pending threshold to be underestimated. 3. Update TransactionsMsgHandler.isBusy() to factor in the Manager cache size via TronNetDelegate.getCachedTransactionSize(), so the node stops accepting TRX INV messages when the full pipeline is busy. 4. Make the busy threshold configurable via node.maxTrxCacheSize (default: 50000), replacing the hardcoded MAX_TRX_SIZE constant. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 980c707 commit 0581b72

10 files changed

Lines changed: 102 additions & 7 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,9 @@ public class CommonParameter {
468468
public long pendingTransactionTimeout;
469469
@Getter
470470
@Setter
471+
public int maxTrxCacheSize;
472+
@Getter
473+
@Setter
471474
public boolean nodeMetricsEnable = false;
472475
@Getter
473476
@Setter

common/src/main/java/org/tron/core/config/args/NodeConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class NodeConfig {
8080
private ChannelConfig channel = new ChannelConfig();
8181
private int maxTransactionPendingSize = 2000;
8282
private long pendingTransactionTimeout = 60000;
83+
private int maxTrxCacheSize = 50_000;
8384
private int agreeNodeCount = 0;
8485
private boolean openHistoryQueryWhenLiteFN = false;
8586
private boolean unsolidifiedBlockCheck = false;
@@ -472,6 +473,11 @@ private void postProcess() {
472473
if (dynamicConfig.checkInterval <= 0) {
473474
dynamicConfig.checkInterval = 600;
474475
}
476+
477+
// maxTrxCacheSize: minimum 2000
478+
if (maxTrxCacheSize < 2000) {
479+
maxTrxCacheSize = 2000;
480+
}
475481
}
476482

477483
// ===========================================================================

common/src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,8 @@ node {
351351
receiveTcpMinDataLength = 2048
352352
maxTransactionPendingSize = 2000
353353
pendingTransactionTimeout = 60000
354+
# total cached trx across handler queues + pending + rePush
355+
maxTrxCacheSize = 50000
354356

355357
# Consensus agreement
356358
agreeNodeCount = 0

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,7 @@ private static void applyNodeConfig(NodeConfig nc) {
643643

644644
PARAMETER.maxTransactionPendingSize = nc.getMaxTransactionPendingSize();
645645
PARAMETER.pendingTransactionTimeout = nc.getPendingTransactionTimeout();
646+
PARAMETER.maxTrxCacheSize = nc.getMaxTrxCacheSize();
646647

647648
PARAMETER.validContractProtoThreadNum = nc.getValidContractProtoThreads();
648649

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2065,9 +2065,13 @@ public NullifierStore getNullifierStore() {
20652065
return chainBaseManager.getNullifierStore();
20662066
}
20672067

2068+
public int getCachedTransactionSize() {
2069+
return pushTransactionQueue.size() + getPendingTransactions().size()
2070+
+ getRePushTransactions().size();
2071+
}
2072+
20682073
public boolean isTooManyPending() {
2069-
return getPendingTransactions().size() + getRePushTransactions().size()
2070-
> maxTransactionPendingSize;
2074+
return getCachedTransactionSize() > maxTransactionPendingSize;
20712075
}
20722076

20732077
private void preValidateTransactionSign(List<TransactionCapsule> txs)

framework/src/main/java/org/tron/core/net/TronNetDelegate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,4 +384,8 @@ public boolean isBlockUnsolidified() {
384384
return headNum - solidNum >= maxUnsolidifiedBlocks;
385385
}
386386

387+
public int getCachedTransactionSize() {
388+
return dbManager.getCachedTransactionSize();
389+
}
390+
387391
}

framework/src/main/java/org/tron/core/net/messagehandler/TransactionsMsgHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
@Component
3232
public class TransactionsMsgHandler implements TronMsgHandler {
3333

34-
private static int MAX_TRX_SIZE = 50_000;
3534
private static int MAX_SMART_CONTRACT_SUBMIT_SIZE = 100;
3635
@Autowired
3736
private TronNetDelegate tronNetDelegate;
@@ -40,7 +39,8 @@ public class TransactionsMsgHandler implements TronMsgHandler {
4039
@Autowired
4140
private ChainBaseManager chainBaseManager;
4241

43-
private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(MAX_TRX_SIZE);
42+
private BlockingQueue<TrxEvent> smartContractQueue = new LinkedBlockingQueue(
43+
Args.getInstance().getMaxTrxCacheSize());
4444

4545
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
4646

@@ -63,7 +63,8 @@ public void close() {
6363
}
6464

6565
public boolean isBusy() {
66-
return queue.size() + smartContractQueue.size() > MAX_TRX_SIZE;
66+
return queue.size() + smartContractQueue.size()
67+
+ tronNetDelegate.getCachedTransactionSize() > Args.getInstance().getMaxTrxCacheSize();
6768
}
6869

6970
@Override

framework/src/main/resources/config.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ node {
163163
fetchBlock.timeout = 200
164164
# syncFetchBatchNum = 2000
165165

166+
# Maximum total number of cached transactions (handler queues + pending + rePush).
167+
# When exceeded, the node stops accepting TRX INV messages from peers.
168+
# maxTrxCacheSize = 50000
169+
166170
# Number of validate sign thread, default availableProcessors
167171
# validateSignThreadNum = 16
168172

framework/src/test/java/org/tron/core/db/ManagerTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@
1616
import com.google.common.collect.Sets;
1717
import com.google.protobuf.Any;
1818
import com.google.protobuf.ByteString;
19+
import java.lang.reflect.Field;
1920
import java.nio.charset.StandardCharsets;
2021
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Set;
26+
import java.util.concurrent.BlockingQueue;
27+
import java.util.concurrent.LinkedBlockingQueue;
2528
import java.util.concurrent.atomic.AtomicInteger;
2629
import java.util.stream.Collectors;
2730
import java.util.stream.IntStream;
@@ -1292,6 +1295,56 @@ public void blockTrigger() {
12921295
Assert.assertEquals(TronError.ErrCode.EVENT_SUBSCRIBE_ERROR, thrown.getErrCode());
12931296
}
12941297

1298+
@Test
1299+
public void testGetCachedTransactionSize() throws Exception {
1300+
BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
1301+
pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1302+
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
1303+
pushField.setAccessible(true);
1304+
pushField.set(dbManager, pushQ);
1305+
1306+
dbManager.getPendingTransactions().clear();
1307+
dbManager.getPendingTransactions().add(
1308+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1309+
dbManager.getPendingTransactions().add(
1310+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1311+
1312+
dbManager.getRePushTransactions().clear();
1313+
1314+
// 1 (push) + 2 (pending) + 0 (rePush) = 3
1315+
Assert.assertEquals(3, dbManager.getCachedTransactionSize());
1316+
1317+
// cleanup
1318+
pushQ.clear();
1319+
dbManager.getPendingTransactions().clear();
1320+
}
1321+
1322+
@Test
1323+
public void testIsTooManyPendingIncludesPushQueue() throws Exception {
1324+
int threshold = Args.getInstance().getMaxTransactionPendingSize();
1325+
1326+
BlockingQueue<TransactionCapsule> pushQ = new LinkedBlockingQueue<>();
1327+
Field pushField = Manager.class.getDeclaredField("pushTransactionQueue");
1328+
pushField.setAccessible(true);
1329+
pushField.set(dbManager, pushQ);
1330+
1331+
dbManager.getPendingTransactions().clear();
1332+
dbManager.getRePushTransactions().clear();
1333+
1334+
for (int i = 0; i < threshold; i++) {
1335+
dbManager.getPendingTransactions().add(
1336+
new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1337+
}
1338+
Assert.assertFalse(dbManager.isTooManyPending());
1339+
1340+
pushQ.add(new TransactionCapsule(Protocol.Transaction.getDefaultInstance()));
1341+
Assert.assertTrue(dbManager.isTooManyPending());
1342+
1343+
// cleanup
1344+
dbManager.getPendingTransactions().clear();
1345+
pushQ.clear();
1346+
}
1347+
12951348
public void adjustBalance(AccountStore accountStore, byte[] accountAddress, long amount)
12961349
throws BalanceInsufficientException {
12971350
Commons.adjustBalance(accountStore, accountAddress, amount,

framework/src/test/java/org/tron/core/net/messagehandler/TransactionsMsgHandlerTest.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ public static void init() {
4242
public void testProcessMessage() {
4343
TransactionsMsgHandler transactionsMsgHandler = new TransactionsMsgHandler();
4444
try {
45-
Assert.assertFalse(transactionsMsgHandler.isBusy());
46-
4745
transactionsMsgHandler.init();
4846

4947
PeerConnection peer = Mockito.mock(PeerConnection.class);
@@ -54,6 +52,8 @@ public void testProcessMessage() {
5452
field.setAccessible(true);
5553
field.set(transactionsMsgHandler, tronNetDelegate);
5654

55+
Assert.assertFalse(transactionsMsgHandler.isBusy());
56+
5757
BalanceContract.TransferContract transferContract = BalanceContract.TransferContract
5858
.newBuilder()
5959
.setAmount(10)
@@ -132,6 +132,23 @@ public void testProcessMessage() {
132132
}
133133
}
134134

135+
@Test
136+
public void testIsBusyWithCachedTransactions() throws Exception {
137+
TransactionsMsgHandler handler = new TransactionsMsgHandler();
138+
139+
TronNetDelegate tronNetDelegateMock = Mockito.mock(TronNetDelegate.class);
140+
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(50_001);
141+
Field field = TransactionsMsgHandler.class.getDeclaredField("tronNetDelegate");
142+
field.setAccessible(true);
143+
field.set(handler, tronNetDelegateMock);
144+
145+
// queue and smartContractQueue are empty, but cached size > threshold
146+
Assert.assertTrue(handler.isBusy());
147+
148+
Mockito.when(tronNetDelegateMock.getCachedTransactionSize()).thenReturn(0);
149+
Assert.assertFalse(handler.isBusy());
150+
}
151+
135152
class TrxEvent {
136153

137154
@Getter

0 commit comments

Comments
 (0)