Skip to content

Commit 7793c3b

Browse files
wbclaude
authored andcommitted
feat(net): optimize disconnectRandom by tracking block receive time per peer
- Add blockRcvTime/blockRcvTimeCmp fields to PeerConnection to track when a peer last delivered a valid block - Set blockRcvTime in BlockMsgHandler after each block is received - Fix lastInteractiveTime update in InventoryMsgHandler: only update for block inventories above current head block num, preventing attackers from forging activity via stale block hashes - Add getRandomDisconnectionPeers() to ResilienceService: narrows the disconnect candidate pool to the oldest half by blockRcvTime, so peers that recently delivered blocks are protected from random eviction Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 4f41f26 commit 7793c3b

9 files changed

Lines changed: 311 additions & 8 deletions

File tree

framework/src/main/java/org/tron/core/net/messagehandler/BlockMsgHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ private void processBlock(PeerConnection peer, BlockCapsule block) throws P2pExc
150150

151151
try {
152152
tronNetDelegate.processBlock(block, false);
153+
peer.setBlockRcvTime(System.currentTimeMillis());
153154
witnessProductBlockService.validWitnessProductTwoBlock(block);
154155

155156
Item item = new Item(blockId, InventoryType.BLOCK);

framework/src/main/java/org/tron/core/net/messagehandler/InventoryMsgHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.springframework.beans.factory.annotation.Autowired;
77
import org.springframework.stereotype.Component;
88
import org.tron.common.utils.Sha256Hash;
9+
import org.tron.core.capsule.BlockCapsule.BlockId;
910
import org.tron.core.config.args.Args;
1011
import org.tron.core.exception.P2pException;
1112
import org.tron.core.exception.P2pException.TypeEnum;
@@ -44,7 +45,10 @@ public void processMessage(PeerConnection peer, TronMessage msg) throws P2pExcep
4445
peer.getAdvInvReceive().put(item, System.currentTimeMillis());
4546
advService.addInv(item);
4647
if (type.equals(InventoryType.BLOCK) && peer.getAdvInvSpread().getIfPresent(item) == null) {
47-
peer.setLastInteractiveTime(System.currentTimeMillis());
48+
long headNum = tronNetDelegate.getHeadBlockId().getNum();
49+
if (new BlockId(id).getNum() > headNum) {
50+
peer.setLastInteractiveTime(System.currentTimeMillis());
51+
}
4852
}
4953
}
5054
}

framework/src/main/java/org/tron/core/net/peer/PeerConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public class PeerConnection {
8888
@Setter
8989
private volatile long lastInteractiveTime;
9090

91+
@Setter
92+
@Getter
93+
private volatile long blockRcvTime;
94+
9195
@Getter
9296
@Setter
9397
private volatile TronState tronState = TronState.INIT;

framework/src/main/java/org/tron/core/net/service/effective/ResilienceService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import static org.tron.common.math.Maths.ceil;
44
import static org.tron.common.math.Maths.max;
55

6+
import java.util.ArrayList;
67
import java.util.Comparator;
78
import java.util.HashMap;
9+
import java.util.IdentityHashMap;
810
import java.util.List;
911
import java.util.Map;
1012
import java.util.Optional;
@@ -44,7 +46,7 @@ public class ResilienceService {
4446

4547
@Autowired
4648
private ChainBaseManager chainBaseManager;
47-
49+
4850
public void init() {
4951
if (Args.getInstance().isOpenFullTcpDisconnect) {
5052
executor.scheduleWithFixedDelay(() -> {
@@ -86,6 +88,7 @@ private void disconnectRandom() {
8688
.collect(Collectors.toList());
8789

8890
if (peers.size() >= minBroadcastPeerSize) {
91+
peers = getRandomDisconnectionPeers(peers);
8992
long now = System.currentTimeMillis();
9093
Map<Object, Integer> weights = new HashMap<>();
9194
peers.forEach(peer -> {
@@ -121,6 +124,14 @@ private void disconnectRandom() {
121124
}
122125

123126

127+
private List<PeerConnection> getRandomDisconnectionPeers(List<PeerConnection> peers) {
128+
Map<PeerConnection, Long> snapshot = new IdentityHashMap<>(peers.size());
129+
peers.forEach(p -> snapshot.put(p, p.getBlockRcvTime()));
130+
List<PeerConnection> sorted = new ArrayList<>(peers);
131+
sorted.sort(Comparator.comparingLong(snapshot::get));
132+
return sorted.subList(0, sorted.size() / 2);
133+
}
134+
124135
private void disconnectLan() {
125136
if (!isLanNode()) {
126137
return;

framework/src/main/java/org/tron/core/net/service/sync/SyncService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ private void processSyncBlock(BlockCapsule block, PeerConnection peerConnection)
337337
try {
338338
tronNetDelegate.validSignature(block);
339339
tronNetDelegate.processBlock(block, true);
340+
peerConnection.setBlockRcvTime(System.currentTimeMillis());
340341
pbftDataSyncHandler.processPBFTCommitData(block);
341342
} catch (P2pException p2pException) {
342343
logger.error("Process sync block {} failed, type: {}",

framework/src/test/java/org/tron/core/net/messagehandler/BlockMsgHandlerTest.java

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public void testProcessMessage() {
122122
try {
123123
blockCapsule = new BlockCapsule(1, Sha256Hash.ZERO_HASH,
124124
System.currentTimeMillis() + 1000, Sha256Hash.ZERO_HASH.getByteString());
125+
blockCapsule.setMerkleRoot();
125126
msg = new BlockMessage(blockCapsule);
126127
peer.getAdvInvRequest()
127128
.put(new Item(msg.getBlockId(), InventoryType.BLOCK), System.currentTimeMillis());
@@ -132,12 +133,12 @@ public void testProcessMessage() {
132133
}
133134

134135
@Test
135-
public void testProcessBlock() {
136+
public void testProcessBlock() throws Exception {
136137
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
137-
138+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
139+
field.setAccessible(true);
140+
Object origin = field.get(handler);
138141
try {
139-
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
140-
field.setAccessible(true);
141142
field.set(handler, tronNetDelegate);
142143

143144
BlockCapsule blockCapsule0 = new BlockCapsule(1,
@@ -164,8 +165,74 @@ public void testProcessBlock() {
164165
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
165166
method.setAccessible(true);
166167
method.invoke(handler, peer, blockCapsule0);
167-
} catch (Exception e) {
168-
Assert.fail();
168+
} finally {
169+
field.set(handler, origin);
170+
}
171+
}
172+
173+
@Test
174+
public void testBlockRcvTimeSetAfterProcessBlockSuccess() throws Exception {
175+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
176+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
177+
field.setAccessible(true);
178+
Object origin = field.get(handler);
179+
try {
180+
field.set(handler, tronNetDelegate);
181+
182+
BlockCapsule blockCapsule = new BlockCapsule(1,
183+
Sha256Hash.wrap(ByteString.copyFrom(ByteArray.fromHexString(
184+
"9938a342238077182498b464ac0292229938a342238077182498b464ac029222"))),
185+
1234, ByteString.copyFrom("1234567".getBytes()));
186+
187+
Mockito.doReturn(true).when(tronNetDelegate).validBlock(any(BlockCapsule.class));
188+
Mockito.doReturn(true).when(tronNetDelegate).containBlock(any(BlockId.class));
189+
Mockito.doReturn(blockCapsule.getBlockId()).when(tronNetDelegate).getHeadBlockId();
190+
Mockito.doNothing().when(tronNetDelegate).processBlock(any(BlockCapsule.class), anyBoolean());
191+
Mockito.doReturn(new ArrayList<PeerConnection>()).when(tronNetDelegate).getActivePeer();
192+
193+
peer.setBlockRcvTime(0L);
194+
Method method = handler.getClass()
195+
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
196+
method.setAccessible(true);
197+
198+
long before = System.currentTimeMillis();
199+
method.invoke(handler, peer, blockCapsule);
200+
long after = System.currentTimeMillis();
201+
202+
Assert.assertTrue("blockRcvTime should be set after successful processBlock",
203+
peer.getBlockRcvTime() >= before && peer.getBlockRcvTime() <= after);
204+
} finally {
205+
field.set(handler, origin);
206+
}
207+
}
208+
209+
@Test
210+
public void testBlockRcvTimeNotSetWhenValidationFails() throws Exception {
211+
TronNetDelegate tronNetDelegate = Mockito.mock(TronNetDelegate.class);
212+
Field field = handler.getClass().getDeclaredField("tronNetDelegate");
213+
field.setAccessible(true);
214+
Object origin = field.get(handler);
215+
try {
216+
field.set(handler, tronNetDelegate);
217+
218+
BlockCapsule blockCapsule = new BlockCapsule(1,
219+
Sha256Hash.wrap(ByteString.copyFrom(ByteArray.fromHexString(
220+
"9938a342238077182498b464ac0292229938a342238077182498b464ac029222"))),
221+
1234, ByteString.copyFrom("1234567".getBytes()));
222+
223+
// validBlock returns false → processBlock short-circuits before setBlockRcvTime
224+
Mockito.doReturn(false).when(tronNetDelegate).validBlock(any(BlockCapsule.class));
225+
226+
peer.setBlockRcvTime(0L);
227+
Method method = handler.getClass()
228+
.getDeclaredMethod("processBlock", PeerConnection.class, BlockCapsule.class);
229+
method.setAccessible(true);
230+
method.invoke(handler, peer, blockCapsule);
231+
232+
Assert.assertEquals("blockRcvTime must stay 0 when block fails validation",
233+
0L, peer.getBlockRcvTime());
234+
} finally {
235+
field.set(handler, origin);
169236
}
170237
}
171238
}

framework/src/test/java/org/tron/core/net/messagehandler/InventoryMsgHandlerTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.tron.core.net.messagehandler;
22

3+
import static org.mockito.ArgumentMatchers.any;
34
import static org.mockito.Mockito.mock;
45

56
import java.lang.reflect.Field;
@@ -12,11 +13,13 @@
1213
import org.mockito.Mockito;
1314
import org.tron.common.TestConstants;
1415
import org.tron.common.utils.Sha256Hash;
16+
import org.tron.core.capsule.BlockCapsule.BlockId;
1517
import org.tron.core.config.args.Args;
1618
import org.tron.core.exception.P2pException;
1719
import org.tron.core.net.TronNetDelegate;
1820
import org.tron.core.net.message.adv.InventoryMessage;
1921
import org.tron.core.net.peer.PeerConnection;
22+
import org.tron.core.net.service.adv.AdvService;
2023
import org.tron.p2p.connection.Channel;
2124
import org.tron.protos.Protocol.Inventory.InventoryType;
2225

@@ -74,6 +77,85 @@ public void testDuplicateHashesRejected() throws Exception {
7477
}
7578
}
7679

80+
@Test
81+
public void testLastInteractiveTimeNotUpdatedForSolidifiedBlock() throws Exception {
82+
InventoryMsgHandler handler = new InventoryMsgHandler();
83+
Args.setParam(new String[]{}, TestConstants.TEST_CONF);
84+
85+
TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
86+
AdvService advService = mock(AdvService.class);
87+
Mockito.when(advService.addInv(any())).thenReturn(true);
88+
// block num 100 is at head boundary — should NOT update
89+
Mockito.when(tronNetDelegate.getHeadBlockId())
90+
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 100L));
91+
92+
Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
93+
delegateField.setAccessible(true);
94+
delegateField.set(handler, tronNetDelegate);
95+
Field advField = handler.getClass().getDeclaredField("advService");
96+
advField.setAccessible(true);
97+
advField.set(handler, advService);
98+
99+
PeerConnection peer = new PeerConnection();
100+
peer.setChannel(getChannel("1.0.0.4", 1001));
101+
peer.setNeedSyncFromPeer(false);
102+
peer.setNeedSyncFromUs(false);
103+
peer.setLastInteractiveTime(0L);
104+
105+
// Block hash encodes num=100 (at solidified boundary — should NOT update)
106+
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
107+
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);
108+
handler.processMessage(peer, msg);
109+
110+
Assert.assertEquals("lastInteractiveTime should NOT be updated for solidified block",
111+
0L, peer.getLastInteractiveTime());
112+
}
113+
114+
@Test
115+
public void testLastInteractiveTimeUpdatedForBothPeersWithSameAboveSolidifiedBlock()
116+
throws Exception {
117+
InventoryMsgHandler handler = new InventoryMsgHandler();
118+
Args.setParam(new String[]{}, TestConstants.TEST_CONF);
119+
120+
TronNetDelegate tronNetDelegate = mock(TronNetDelegate.class);
121+
AdvService advService = mock(AdvService.class);
122+
// First call returns true (peer1), second call returns false (peer2 — already in cache)
123+
Mockito.when(advService.addInv(any())).thenReturn(true).thenReturn(false);
124+
Mockito.when(tronNetDelegate.getHeadBlockId())
125+
.thenReturn(new BlockId(Sha256Hash.ZERO_HASH, 99L));
126+
127+
Field delegateField = handler.getClass().getDeclaredField("tronNetDelegate");
128+
delegateField.setAccessible(true);
129+
delegateField.set(handler, tronNetDelegate);
130+
Field advField = handler.getClass().getDeclaredField("advService");
131+
advField.setAccessible(true);
132+
advField.set(handler, advService);
133+
134+
PeerConnection peer1 = new PeerConnection();
135+
peer1.setChannel(getChannel("1.0.0.5", 1002));
136+
peer1.setNeedSyncFromPeer(false);
137+
peer1.setNeedSyncFromUs(false);
138+
peer1.setLastInteractiveTime(0L);
139+
140+
PeerConnection peer2 = new PeerConnection();
141+
peer2.setChannel(getChannel("1.0.0.6", 1003));
142+
peer2.setNeedSyncFromPeer(false);
143+
peer2.setNeedSyncFromUs(false);
144+
peer2.setLastInteractiveTime(0L);
145+
146+
// block num 100 > solidified 99 — both peers should update
147+
Sha256Hash blockHash = new BlockId(Sha256Hash.ZERO_HASH, 100L);
148+
InventoryMessage msg = new InventoryMessage(Arrays.asList(blockHash), InventoryType.BLOCK);
149+
150+
handler.processMessage(peer1, msg);
151+
handler.processMessage(peer2, msg);
152+
153+
Assert.assertTrue("peer1 lastInteractiveTime should be updated",
154+
peer1.getLastInteractiveTime() > 0L);
155+
Assert.assertTrue("peer2 lastInteractiveTime should be updated even when addInv returns false",
156+
peer2.getLastInteractiveTime() > 0L);
157+
}
158+
77159
private Channel getChannel(String host, int port) throws Exception {
78160
Channel channel = new Channel();
79161
InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);

framework/src/test/java/org/tron/core/net/services/ResilienceServiceTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.io.IOException;
88
import java.net.InetSocketAddress;
99
import java.util.HashSet;
10+
import java.util.List;
1011
import java.util.Set;
1112
import javax.annotation.Resource;
1213
import org.junit.After;
@@ -97,6 +98,57 @@ public void testDisconnectRandom() {
9798
Assert.assertEquals(maxConnection - 1, PeerManager.getPeers().size());
9899
}
99100

101+
@Test
102+
public void testDisconnectRandomPreservesRecentBlockRcvTimePeer() {
103+
int maxConnection = 30;
104+
Assert.assertEquals(0, PeerManager.getPeers().size());
105+
106+
ApplicationContext ctx = (ApplicationContext) ReflectUtils.getFieldObject(p2pEventHandler,
107+
"ctx");
108+
109+
// Create maxConnection + 1 peers (triggers disconnectRandom)
110+
for (int i = 0; i < maxConnection + 1; i++) {
111+
InetSocketAddress inetSocketAddress = new InetSocketAddress("202.0.0." + i, 10001);
112+
Channel c1 = spy(Channel.class);
113+
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
114+
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
115+
ReflectUtils.setFieldValue(c1, "ctx", spy(ChannelHandlerContext.class));
116+
Mockito.doNothing().when(c1).send((byte[]) any());
117+
PeerManager.add(ctx, c1);
118+
}
119+
120+
// Set first minBroadcastPeerSize peers as broadcast-state
121+
List<PeerConnection> peers = PeerManager.getPeers();
122+
for (PeerConnection peer : peers.subList(0, ResilienceService.minBroadcastPeerSize)) {
123+
peer.setNeedSyncFromPeer(false);
124+
peer.setNeedSyncFromUs(false);
125+
peer.setLastInteractiveTime(System.currentTimeMillis() - 1000);
126+
}
127+
for (PeerConnection peer : peers.subList(ResilienceService.minBroadcastPeerSize,
128+
maxConnection + 1)) {
129+
peer.setNeedSyncFromPeer(false);
130+
peer.setNeedSyncFromUs(true);
131+
}
132+
133+
// Give the LAST broadcast peer a very recent blockRcvTime — it must NOT be disconnected
134+
PeerConnection bestPeer = peers.stream()
135+
.filter(p -> !p.isNeedSyncFromUs() && !p.isNeedSyncFromPeer())
136+
.reduce((a, b) -> b) // last broadcast peer
137+
.orElseThrow(() -> new AssertionError("no broadcast peer"));
138+
bestPeer.setBlockRcvTime(System.currentTimeMillis());
139+
140+
InetSocketAddress bestPeerAddress = bestPeer.getChannel().getInetSocketAddress();
141+
142+
// With minBroadcastPeerSize=3 broadcast peers, getRandomDisconnectionPeers returns
143+
// the 1 peer with oldest blockRcvTime (0). bestPeer has most recent time → exempt.
144+
ReflectUtils.invokeMethod(service, "disconnectRandom");
145+
146+
boolean bestPeerStillConnected = PeerManager.getPeers().stream()
147+
.anyMatch(p -> p.getChannel().getInetSocketAddress().equals(bestPeerAddress));
148+
Assert.assertTrue("Peer with most recent blockRcvTime should not be disconnected",
149+
bestPeerStillConnected);
150+
}
151+
100152
@Test
101153
public void testDisconnectLan() {
102154
int minConnection = 8;

0 commit comments

Comments
 (0)