Skip to content

Commit 2fd9db2

Browse files
wbclaude
authored andcommitted
feat(net): optimize disconnectRandom by tracking block receive time per peer
- Add blockRcvTime/blockRcvTimeCmp fields to PeerConnection to track when a peer last delivered a valid block - Set blockRcvTime in BlockMsgHandler after each block is received - Fix lastInteractiveTime update in InventoryMsgHandler: only update for block inventories above current head block num, preventing attackers from forging activity via stale block hashes - Add getRandomDisconnectionPeers() to ResilienceService: narrows the disconnect candidate pool to the oldest half by blockRcvTime, so peers that recently delivered blocks are protected from random eviction Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 4f41f26 commit 2fd9db2

13 files changed

Lines changed: 332 additions & 20 deletions

File tree

framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pExc
150150

151151
try {
152152
tronNetDelegate.processBlock(block, false);
153+
peer.setBlockRcvTime(System.currentTimeMillis());
153154
witnessProductBlockService.validWitnessProductTwoBlock(block);
154155

155156
Item item = new Item(blockId, InventoryType.BLOCK);

framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.springframework.beans.factory.annotation.Autowired;
77
import org.springframework.stereotype.Component;
88
import org.tron.common.utils.Sha256Hash;
9+
import org.tron.core.capsule.BlockCapsule.BlockId;
910
import org.tron.core.config.args.Args;
1011
import org.tron.core.exception.P2pException;
1112
import org.tron.core.exception.P2pException.TypeEnum;
@@ -44,7 +45,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
4445
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
4546
advService.addInv(item);
4647
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
47-
peer.setLastInteractiveTime(System.currentTimeMillis());
48+
long headNum = tronNetDelegate.getHeadBlockId().getNum();
49+
if (new BlockId(id).getNum() > headNum) {
50+
peer.setLastInteractiveTime(System.currentTimeMillis());
51+
}
4852
}
4953
}
5054
}

framework/src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public class PeerConnection {
8888
@Setter
8989
private volatile long lastInteractiveTime;
9090

91+
@Setter
92+
@Getter
93+
private volatile long blockRcvTime;
94+
9195
@Getter
9296
@Setter
9397
private volatile TronState tronState = TronState.INIT;

framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import static org.tron.common.math.Maths.ceil;
44
import static org.tron.common.math.Maths.max;
55

6+
import java.util.ArrayList;
67
import java.util.Comparator;
78
import java.util.HashMap;
9+
import java.util.IdentityHashMap;
810
import java.util.List;
911
import java.util.Map;
1012
import java.util.Optional;
@@ -44,7 +46,7 @@ public class ResilienceService {
4446

4547
@Autowired
4648
private ChainBaseManager chainBaseManager;
47-
49+
4850
public void init() {
4951
if (Args.getInstance().isOpenFullTcpDisconnect) {
5052
executor.scheduleWithFixedDelay(() -> {
@@ -86,6 +88,7 @@ private void disconnectRandom() {
8688
.collect(Collectors.toList());
8789

8890
if (peers.size() >= minBroadcastPeerSize) {
91+
peers = getRandomDisconnectionPeers(peers);
8992
long now = System.currentTimeMillis();
9093
Map<Object, Integer> weights = new HashMap<>();
9194
peers.forEach(peer -> {
@@ -121,6 +124,14 @@ private void disconnectRandom() {
121124
}
122125

123126

127+
private List<PeerConnection> getRandomDisconnectionPeers(List<PeerConnection> peers) {
128+
Map<PeerConnection, Long> snapshot = new IdentityHashMap<>(peers.size());
129+
peers.forEach(p -> snapshot.put(p, p.getBlockRcvTime()));
130+
List<PeerConnection> sorted = new ArrayList<>(peers);
131+
sorted.sort(Comparator.comparingLong(snapshot::get));
132+
return sorted.subList(0, sorted.size() / 2);
133+
}
134+
124135
private void disconnectLan() {
125136
if (!isLanNode()) {
126137
return;

framework/src/main/java/org/tron/core/net/service/sync/SyncService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ private void processSyncBlock(BlockCapsule block, PeerConnection peerConnection)
337337
try {
338338
tronNetDelegate.validSignature(block);
339339
tronNetDelegate.processBlock(block, true);
340+
peerConnection.setBlockRcvTime(System.currentTimeMillis());
340341
pbftDataSyncHandler.processPBFTCommitData(block);
341342
} catch (P2pException p2pException) {
342343
logger.error("Process sync block {} failed, type: {}",

framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import lombok.extern.slf4j.Slf4j;
1616
import org.junit.Assert;
1717
import org.junit.Before;
18-
import org.junit.BeforeClass;
1918
import org.junit.Test;
2019
import org.mockito.Mockito;
2120
import org.tron.common.BaseTest;
@@ -39,20 +38,16 @@
3938
@Slf4j
4039
public class BlockMsgHandlerTest extends BaseTest {
4140

41+
static {
42+
Args.setParam(new String[] {"--output-directory", dbPath(), "--debug"},
43+
TestConstants.TEST_CONF);
44+
}
45+
4246
@Resource
4347
private BlockMsgHandler handler;
4448
@Resource
4549
private PeerConnection peer;
4650

47-
/**
48-
* init context.
49-
*/
50-
@BeforeClass
51-
public static void init() {
52-
Args.setParam(new String[] {"--output-directory", dbPath(), "--debug"},
53-
TestConstants.TEST_CONF);
54-
}
55-
5651
@Before
5752
public void before() throws Exception {
5853
Channel c1 = new Channel();
@@ -122,6 +117,7 @@ public void testProcessMessage() {
122117
try {
123118
blockCapsule = new BlockCapsule(1, Sha256Hash.ZERO_HASH,
124119
System.currentTimeMillis() + 1000, Sha256Hash.ZERO_HASH.getByteString());
120+
blockCapsule.setMerkleRoot();
125121
msg = new BlockMessage(blockCapsule);
126122
peer.getAdvInvRequest()
127123
.put(new Item(msg.getBlockId(), InventoryType.BLOCK), System.currentTimeMillis());
@@ -132,12 +128,12 @@ public void testProcessMessage() {
132128
}
133129

134130
@Test
135-
public void testProcessBlock() {
131+
public void testProcessBlock() throws Exception {
136132
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
137-
133+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
134+
field.setAccessible(true);
135+
Object origin = field.get(handler);
138136
try {
139-
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
140-
field.setAccessible(true);
141137
field.set(handler, tronNetDelegate);
142138

143139
BlockCapsule blockCapsule0 = new BlockCapsule(1,
@@ -164,8 +160,74 @@ public void testProcessBlock() {
164160
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
165161
method.setAccessible(true);
166162
method.invoke(handler, peer, blockCapsule0);
167-
} catch (Exception e) {
168-
Assert.fail();
163+
} finally {
164+
field.set(handler, origin);
165+
}
166+
}
167+
168+
@Test
169+
public void testBlockRcvTimeSetAfterProcessBlockSuccess() throws Exception {
170+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
171+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
172+
field.setAccessible(true);
173+
Object origin = field.get(handler);
174+
try {
175+
field.set(handler, tronNetDelegate);
176+
177+
BlockCapsule blockCapsule = new BlockCapsule(1,
178+
Sha256Hash.wrap(ByteString.copyFrom(ByteArray.fromHexString(
179+
"9938a342238077182498b464ac0292229938a342238077182498b464ac029222"))),
180+
1234, ByteString.copyFrom("1234567".getBytes()));
181+
182+
Mockito.doReturn(true).when(tronNetDelegate).validBlock(any(BlockCapsule.class));
183+
Mockito.doReturn(true).when(tronNetDelegate).containBlock(any(BlockId.class));
184+
Mockito.doReturn(blockCapsule.getBlockId()).when(tronNetDelegate).getHeadBlockId();
185+
Mockito.doNothing().when(tronNetDelegate).processBlock(any(BlockCapsule.class), anyBoolean());
186+
Mockito.doReturn(new ArrayList<PeerConnection>()).when(tronNetDelegate).getActivePeer();
187+
188+
peer.setBlockRcvTime(0L);
189+
Method method = handler.getClass()
190+
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
191+
method.setAccessible(true);
192+
193+
long before = System.currentTimeMillis();
194+
method.invoke(handler, peer, blockCapsule);
195+
long after = System.currentTimeMillis();
196+
197+
Assert.assertTrue("blockRcvTime should be set after successful processBlock",
198+
peer.getBlockRcvTime() >= before && peer.getBlockRcvTime() <= after);
199+
} finally {
200+
field.set(handler, origin);
201+
}
202+
}
203+
204+
@Test
205+
public void testBlockRcvTimeNotSetWhenValidationFails() throws Exception {
206+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
207+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
208+
field.setAccessible(true);
209+
Object origin = field.get(handler);
210+
try {
211+
field.set(handler, tronNetDelegate);
212+
213+
BlockCapsule blockCapsule = new BlockCapsule(1,
214+
Sha256Hash.wrap(ByteString.copyFrom(ByteArray.fromHexString(
215+
"9938a342238077182498b464ac0292229938a342238077182498b464ac029222"))),
216+
1234, ByteString.copyFrom("1234567".getBytes()));
217+
218+
// validBlock returns false → processBlock short-circuits before setBlockRcvTime
219+
Mockito.doReturn(false).when(tronNetDelegate).validBlock(any(BlockCapsule.class));
220+
221+
peer.setBlockRcvTime(0L);
222+
Method method = handler.getClass()
223+
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
224+
method.setAccessible(true);
225+
method.invoke(handler, peer, blockCapsule);
226+
227+
Assert.assertEquals("blockRcvTime must stay 0 when block fails validation",
228+
0L, peer.getBlockRcvTime());
229+
} finally {
230+
field.set(handler, origin);
169231
}
170232
}
171233
}

framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.tron.core.net.messagehandler;
22

3+
import static org.mockito.ArgumentMatchers.any;
34
import static org.mockito.Mockito.mock;
45

56
import java.lang.reflect.Field;
@@ -12,20 +13,25 @@
1213
import org.mockito.Mockito;
1314
import org.tron.common.TestConstants;
1415
import org.tron.common.utils.Sha256Hash;
16+
import org.tron.core.capsule.BlockCapsule.BlockId;
1517
import org.tron.core.config.args.Args;
1618
import org.tron.core.exception.P2pException;
1719
import org.tron.core.net.TronNetDelegate;
1820
import org.tron.core.net.message.adv.InventoryMessage;
1921
import org.tron.core.net.peer.PeerConnection;
22+
import org.tron.core.net.service.adv.AdvService;
2023
import org.tron.p2p.connection.Channel;
2124
import org.tron.protos.Protocol.Inventory.InventoryType;
2225

2326
public class InventoryMsgHandlerTest {
2427

28+
static {
29+
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
30+
}
31+
2532
@Test
2633
public void testProcessMessage() throws Exception {
2734
InventoryMsgHandler handler = new InventoryMsgHandler();
28-
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
2935
Args.logConfig();
3036

3137
InventoryMessage msg = new InventoryMessage(new ArrayList<>(), InventoryType.TRX);
@@ -59,7 +65,6 @@ public void testProcessMessage() throws Exception {
5965
@Test
6066
public void testDuplicateHashesRejected() throws Exception {
6167
InventoryMsgHandler handler = new InventoryMsgHandler();
62-
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
6368

6469
Sha256Hash hash = Sha256Hash.wrap(new byte[32]);
6570
InventoryMessage msg = new InventoryMessage(Arrays.asList(hash, hash), InventoryType.TRX);
@@ -74,6 +79,83 @@ public void testDuplicateHashesRejected() throws Exception {
7479
}
7580
}
7681

82+
@Test
83+
public void testLastInteractiveTimeNotUpdatedForSolidifiedBlock() throws Exception {
84+
InventoryMsgHandler handler = new InventoryMsgHandler();
85+
86+
TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
87+
AdvService advService = mock(AdvService.class);
88+
Mockito.when(advService.addInv(any())).thenReturn(true);
89+
// block num 100 is at head boundary — should NOT update
90+
Mockito.when(tronNetDelegate.getHeadBlockId())
91+
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 100L));
92+
93+
Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
94+
delegateField.setAccessible(true);
95+
delegateField.set(handler, tronNetDelegate);
96+
Field advField = handler.getClass().getDeclaredField("advService");
97+
advField.setAccessible(true);
98+
advField.set(handler, advService);
99+
100+
PeerConnection peer = new PeerConnection();
101+
peer.setChannel(getChannel("1.0.0.4", 1001));
102+
peer.setNeedSyncFromPeer(false);
103+
peer.setNeedSyncFromUs(false);
104+
peer.setLastInteractiveTime(0L);
105+
106+
// Block hash encodes num=100 (at solidified boundary — should NOT update)
107+
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
108+
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);
109+
handler.processMessage(peer, msg);
110+
111+
Assert.assertEquals("lastInteractiveTime should NOT be updated for solidified block",
112+
0L, peer.getLastInteractiveTime());
113+
}
114+
115+
@Test
116+
public void testLastInteractiveTimeUpdatedForBothPeersWithSameAboveSolidifiedBlock()
117+
throws Exception {
118+
InventoryMsgHandler handler = new InventoryMsgHandler();
119+
120+
TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
121+
AdvService advService = mock(AdvService.class);
122+
// First call returns true (peer1), second call returns false (peer2 — already in cache)
123+
Mockito.when(advService.addInv(any())).thenReturn(true).thenReturn(false);
124+
Mockito.when(tronNetDelegate.getHeadBlockId())
125+
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 99L));
126+
127+
Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
128+
delegateField.setAccessible(true);
129+
delegateField.set(handler, tronNetDelegate);
130+
Field advField = handler.getClass().getDeclaredField("advService");
131+
advField.setAccessible(true);
132+
advField.set(handler, advService);
133+
134+
PeerConnection peer1 = new PeerConnection();
135+
peer1.setChannel(getChannel("1.0.0.5", 1002));
136+
peer1.setNeedSyncFromPeer(false);
137+
peer1.setNeedSyncFromUs(false);
138+
peer1.setLastInteractiveTime(0L);
139+
140+
PeerConnection peer2 = new PeerConnection();
141+
peer2.setChannel(getChannel("1.0.0.6", 1003));
142+
peer2.setNeedSyncFromPeer(false);
143+
peer2.setNeedSyncFromUs(false);
144+
peer2.setLastInteractiveTime(0L);
145+
146+
// block num 100 > solidified 99 — both peers should update
147+
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
148+
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);
149+
150+
handler.processMessage(peer1, msg);
151+
handler.processMessage(peer2, msg);
152+
153+
Assert.assertTrue("peer1 lastInteractiveTime should be updated",
154+
peer1.getLastInteractiveTime() > 0L);
155+
Assert.assertTrue("peer2 lastInteractiveTime should be updated even when addInv returns false",
156+
peer2.getLastInteractiveTime() > 0L);
157+
}
158+
77159
private Channel getChannel(String host, int port) throws Exception {
78160
Channel channel = new Channel();
79161
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);

framework/src/test/java/org/tron/core/net/messagehandler/MessageHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class MessageHandlerTest {
4242
@ClassRule
4343
public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
4444

45+
static {
46+
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
47+
}
4548

4649
@BeforeClass
4750
public static void init() throws Exception {

framework/src/test/java/org/tron/core/net/messagehandler/PbftMsgHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ public class PbftMsgHandlerTest {
4343
private PeerConnection peer;
4444
private static String dbPath = "output-pbft-message-handler-test";
4545

46+
static {
47+
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
48+
}
4649

4750
@BeforeClass
4851
public static void init() {

framework/src/test/java/org/tron/core/net/messagehandler/SyncBlockChainMsgHandlerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public static String dbPath() {
4646
return null;
4747
}
4848

49+
static {
50+
Args.setParam(new String[] {}, TestConstants.TEST_CONF);
51+
}
52+
4953
@BeforeClass
5054
public static void before() {
5155
Args.setParam(new String[] {"--output-directory", dbPath()}, TestConstants.TEST_CONF);

0 commit comments

Comments
 (0)