Skip to content

Commit 81c6528

Browse files
committed
feat(net):Optimize rate limiting logic and add tests
1 parent 2454a92 commit 81c6528

4 files changed

Lines changed: 24 additions & 12 deletions

File tree

framework/src/main/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandler.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public class FetchInvDataMsgHandler implements TronMsgHandler {
4141
.maximumSize(1000).expireAfterWrite(1, TimeUnit.HOURS).build();
4242

4343
private static final int MAX_SIZE = 1_000_000;
44-
private static final int MAX_FETCH_SIZE = 100;
4544
@Autowired
4645
private TronNetDelegate tronNetDelegate;
4746
@Autowired
@@ -56,13 +55,6 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
5655

5756
FetchInvDataMessage fetchInvDataMsg = (FetchInvDataMessage) msg;
5857

59-
if (peer.isNeedSyncFromUs() && !peer.getP2pRateLimiter().tryAcquire(msg.getType().asByte())) {
60-
// Discard messages that exceed the rate limit
61-
logger.warn("{} message from peer {} exceeds the rate limit",
62-
msg.getType(), peer.getInetSocketAddress());
63-
return;
64-
}
65-
6658
check(peer, fetchInvDataMsg);
6759

6860
InventoryType type = fetchInvDataMsg.getInventoryType();
@@ -164,7 +156,11 @@ private void check(PeerConnection peer, FetchInvDataMessage fetchInvDataMsg) thr
164156
if (!peer.isNeedSyncFromUs()) {
165157
throw new P2pException(TypeEnum.BAD_MESSAGE, "no need sync");
166158
}
167-
if (fetchInvDataMsg.getHashList().size() > MAX_FETCH_SIZE) {
159+
if (!peer.getP2pRateLimiter().tryAcquire(fetchInvDataMsg.getType().asByte())) {
160+
throw new P2pException(TypeEnum.BAD_MESSAGE, fetchInvDataMsg.getType()
161+
+ " message exceeds the rate limit");
162+
}
163+
if (fetchInvDataMsg.getHashList().size() > NetConstants.MAX_BLOCK_FETCH_PER_PEER) {
168164
throw new P2pException(TypeEnum.BAD_MESSAGE, "fetch too more blocks, size:"
169165
+ fetchInvDataMsg.getHashList().size());
170166
}

framework/src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ public class PeerConnection {
5959

6060
private static List<InetSocketAddress> relayNodes = Args.getInstance().getFastForwardNodes();
6161

62+
private static final double SYNC_BLOCK_CHAIN_RATE = 3.0;
63+
private static final double FETCH_INV_DATA_RATE = 3.0;
64+
private static final double P2P_DISCONNECT_RATE = 1.0;
65+
6266
@Getter
6367
private PeerStatistics peerStatistics = new PeerStatistics();
6468

@@ -171,9 +175,9 @@ public void setChannel(Channel channel) {
171175
}
172176
this.nodeStatistics = TronStatsManager.getNodeStatistics(channel.getInetAddress());
173177
lastInteractiveTime = System.currentTimeMillis();
174-
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), 2);
175-
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
176-
p2pRateLimiter.register(P2P_DISCONNECT.asByte(), 1);
178+
p2pRateLimiter.register(SYNC_BLOCK_CHAIN.asByte(), SYNC_BLOCK_CHAIN_RATE);
179+
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), FETCH_INV_DATA_RATE);
180+
p2pRateLimiter.register(P2P_DISCONNECT.asByte(), P2P_DISCONNECT_RATE);
177181
}
178182

179183
public void setBlockBothHave(BlockId blockId) {

framework/src/test/java/org/tron/core/net/messagehandler/FetchInvDataMsgHandlerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public void testProcessMessage() throws Exception {
5858
Mockito.when(advService.getMessage(new Item(blockId, Protocol.Inventory.InventoryType.BLOCK)))
5959
.thenReturn(new BlockMessage(blockCapsule));
6060
ReflectUtils.setFieldValue(fetchInvDataMsgHandler, "advService", advService);
61+
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
62+
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
63+
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);
6164

6265
fetchInvDataMsgHandler.processMessage(peer,
6366
new FetchInvDataMessage(blockIds, Protocol.Inventory.InventoryType.BLOCK));
@@ -77,6 +80,9 @@ public void testSyncFetchCheck() {
7780
Cache<Item, Long> advInvSpread = CacheBuilder.newBuilder().maximumSize(100)
7881
.expireAfterWrite(1, TimeUnit.HOURS).recordStats().build();
7982
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
83+
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
84+
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 2);
85+
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);
8086

8187
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();
8288

@@ -113,6 +119,7 @@ public void testRateLimiter() {
113119
Mockito.when(peer.getAdvInvSpread()).thenReturn(advInvSpread);
114120
P2pRateLimiter p2pRateLimiter = new P2pRateLimiter();
115121
p2pRateLimiter.register(FETCH_INV_DATA.asByte(), 1);
122+
p2pRateLimiter.acquire(FETCH_INV_DATA.asByte());
116123
Mockito.when(peer.getP2pRateLimiter()).thenReturn(p2pRateLimiter);
117124
FetchInvDataMsgHandler fetchInvDataMsgHandler = new FetchInvDataMsgHandler();
118125

framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public void init() throws Exception {
5555
@Test
5656
public void testProcessMessage() throws Exception {
5757
try {
58+
peer.setRemainNum(1);
5859
handler.processMessage(peer, new SyncBlockChainMessage(new ArrayList<>()));
5960
} catch (P2pException e) {
6061
Assert.assertEquals("SyncBlockChain blockIds is empty", e.getMessage());
@@ -71,6 +72,10 @@ public void testProcessMessage() throws Exception {
7172
Assert.assertNotNull(message.toString());
7273
Assert.assertNotNull(((BlockInventoryMessage) message).getAnswerMessage());
7374
Assert.assertFalse(f);
75+
method.invoke(handler, peer, message);
76+
method.invoke(handler, peer, message);
77+
f = (boolean)method.invoke(handler, peer, message);
78+
Assert.assertFalse(f);
7479

7580
Method method1 = handler.getClass().getDeclaredMethod(
7681
"getLostBlockIds", List.class, BlockId.class);

0 commit comments

Comments
 (0)