Skip to content

Commit fe25380

Browse files
authored
feat(sync): reduce memory pressure by deferring block deserialization and throttling in-flight requests (#6717)
1 parent 55da98e commit fe25380

8 files changed

Lines changed: 177 additions & 33 deletions

File tree

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ public class CommonParameter {
162162
@Getter
163163
@Setter
164164
public long syncFetchBatchNum; // clearParam: 2000
165+
@Getter
166+
@Setter
167+
public int maxPendingBlockSize;
165168

166169
// If you are running a solidity node for java tron,
167170
// this flag is set to true

common/src/main/java/org/tron/core/config/args/NodeConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class NodeConfig {
2828
private String trustNode = "";
2929
private boolean walletExtensionApi = false;
3030
private int syncFetchBatchNum = 2000;
31+
private int maxPendingBlockSize = 500;
3132
private int validateSignThreadNum = 0; // 0 = auto (availableProcessors)
3233
private int maxConnections = 30;
3334
private int minConnections = 8;
@@ -449,6 +450,14 @@ private void postProcess() {
449450
syncFetchBatchNum = 100;
450451
}
451452

453+
// maxPendingBlockSize: clamp to [50, 2000]
454+
if (maxPendingBlockSize > 2000) {
455+
maxPendingBlockSize = 2000;
456+
}
457+
if (maxPendingBlockSize < 50) {
458+
maxPendingBlockSize = 50;
459+
}
460+
452461
// blockProducedTimeOut: clamp to [30, 100]
453462
if (blockProducedTimeOut < 30) {
454463
blockProducedTimeOut = 30;

common/src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ node {
188188

189189
# Number of blocks to fetch in one batch during sync. Range: [100, 2000].
190190
syncFetchBatchNum = 2000
191+
# Max in-flight (requested but not yet processed) blocks during sync. Range: [50, 2000].
192+
maxPendingBlockSize = 500
191193

192194
# Number of validate sign threads, default availableProcessors
193195
# Number of validate sign threads, 0 = auto (availableProcessors)

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -616,6 +616,7 @@ private static void applyNodeConfig(NodeConfig nc) {
616616
PARAMETER.nodeEnableIpv6 = nc.isEnableIpv6();
617617

618618
PARAMETER.syncFetchBatchNum = nc.getSyncFetchBatchNum();
619+
PARAMETER.maxPendingBlockSize = nc.getMaxPendingBlockSize();
619620
PARAMETER.solidityThreads = nc.getSolidityThreads();
620621
PARAMETER.blockProducedTimeOut = nc.getBlockProducedTimeOut();
621622

framework/src/main/java/org/tron/core/net/service/sync/SyncService.java

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.google.common.cache.Cache;
66
import com.google.common.cache.CacheBuilder;
77
import java.util.ArrayList;
8+
import java.util.Collection;
89
import java.util.Collections;
910
import java.util.HashMap;
1011
import java.util.LinkedList;
@@ -44,9 +45,9 @@ public class SyncService {
4445
@Autowired
4546
private PbftDataSyncHandler pbftDataSyncHandler;
4647

47-
private Map<BlockMessage, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>();
48+
private Map<UnparsedBlock, PeerConnection> blockWaitToProcess = new ConcurrentHashMap<>();
4849

49-
private Map<BlockMessage, PeerConnection> blockJustReceived = new ConcurrentHashMap<>();
50+
private Map<UnparsedBlock, PeerConnection> blockJustReceived = new ConcurrentHashMap<>();
5051

5152
private long blockCacheTimeout = Args.getInstance().getBlockCacheTimeout();
5253
private Cache<BlockId, PeerConnection> requestBlockIds = CacheBuilder.newBuilder()
@@ -69,6 +70,10 @@ public class SyncService {
6970

7071
private final long syncFetchBatchNum = Args.getInstance().getSyncFetchBatchNum();
7172

73+
private final int maxPendingBlockSize = Args.getInstance().getMaxPendingBlockSize();
74+
75+
private volatile long maxRequestedBlockNum = 0;
76+
7277
public void init() {
7378
ExecutorServiceManager.scheduleWithFixedDelay(fetchExecutor, () -> {
7479
try {
@@ -135,7 +140,9 @@ public void syncNext(PeerConnection peer) {
135140

136141
public void processBlock(PeerConnection peer, BlockMessage blockMessage) {
137142
synchronized (blockJustReceived) {
138-
blockJustReceived.put(blockMessage, peer);
143+
UnparsedBlock unparsedBlock = new UnparsedBlock(
144+
blockMessage.getBlockId(), blockMessage.getData());
145+
blockJustReceived.put(unparsedBlock, peer);
139146
}
140147
handleFlag = true;
141148
if (peer.isSyncIdle()) {
@@ -227,8 +234,18 @@ private BlockId getBlockIdByNum(long num) throws P2pException {
227234
}
228235

229236
private void startFetchSyncBlock() {
237+
Collection<PeerConnection> activePeers = tronNetDelegate.getActivePeer();
238+
int reqNum = activePeers.stream()
239+
.mapToInt(p -> p.getSyncBlockRequested().size()).sum();
240+
int remainNum;
241+
synchronized (blockJustReceived) {
242+
remainNum = maxPendingBlockSize - reqNum
243+
- blockJustReceived.size() - blockWaitToProcess.size();
244+
}
245+
230246
HashMap<PeerConnection, List<BlockId>> send = new HashMap<>();
231-
tronNetDelegate.getActivePeer().stream()
247+
int[] fetchingBlockSize = {0};
248+
activePeers.stream()
232249
.filter(peer -> peer.isNeedSyncFromPeer() && peer.isSyncIdle())
233250
.filter(peer -> peer.isFetchAble())
234251
.forEach(peer -> {
@@ -238,9 +255,16 @@ private void startFetchSyncBlock() {
238255
for (BlockId blockId : peer.getSyncBlockToFetch()) {
239256
if (requestBlockIds.getIfPresent(blockId) == null
240257
&& !peer.getSyncBlockInProcess().contains(blockId)) {
258+
if (fetchingBlockSize[0] >= remainNum && blockId.getNum() > maxRequestedBlockNum) {
259+
break;
260+
}
261+
if (blockId.getNum() > maxRequestedBlockNum) {
262+
maxRequestedBlockNum = blockId.getNum();
263+
}
241264
requestBlockIds.put(blockId, peer);
242265
peer.getSyncBlockRequested().put(blockId, System.currentTimeMillis());
243266
send.get(peer).add(blockId);
267+
fetchingBlockSize[0]++;
244268
if (send.get(peer).size() >= MAX_BLOCK_FETCH_PER_PEER) {
245269
break;
246270
}
@@ -269,29 +293,37 @@ private synchronized void handleSyncBlock() {
269293

270294
isProcessed[0] = false;
271295

272-
blockWaitToProcess.forEach((msg, peerConnection) -> {
296+
blockWaitToProcess.forEach((unparsedBlock, peerConnection) -> {
273297
synchronized (tronNetDelegate.getBlockLock()) {
298+
BlockId blockId = unparsedBlock.getBlockId();
274299
if (peerConnection.isDisconnect()) {
275-
blockWaitToProcess.remove(msg);
276-
invalid(msg.getBlockId(), peerConnection);
300+
blockWaitToProcess.remove(unparsedBlock);
301+
invalid(blockId, peerConnection);
277302
return;
278303
}
279-
if (msg.getBlockId().getNum() <= solidNum) {
280-
blockWaitToProcess.remove(msg);
281-
peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
304+
if (blockId.getNum() <= solidNum) {
305+
blockWaitToProcess.remove(unparsedBlock);
306+
peerConnection.getSyncBlockInProcess().remove(blockId);
282307
return;
283308
}
284309
final boolean[] isFound = {false};
285310
tronNetDelegate.getActivePeer().stream()
286-
.filter(peer -> msg.getBlockId().equals(peer.getSyncBlockToFetch().peek()))
311+
.filter(peer -> blockId.equals(peer.getSyncBlockToFetch().peek()))
287312
.forEach(peer -> {
288313
isFound[0] = true;
289314
});
290315
if (isFound[0]) {
291-
blockWaitToProcess.remove(msg);
316+
blockWaitToProcess.remove(unparsedBlock);
292317
isProcessed[0] = true;
293-
processSyncBlock(msg.getBlockCapsule(), peerConnection);
294-
peerConnection.getSyncBlockInProcess().remove(msg.getBlockId());
318+
BlockCapsule block;
319+
try {
320+
block = new BlockCapsule(unparsedBlock.getData());
321+
} catch (Exception e) {
322+
logger.warn("Deserialize block {} failed", blockId.getString(), e);
323+
return;
324+
}
325+
processSyncBlock(block, peerConnection);
326+
peerConnection.getSyncBlockInProcess().remove(blockId);
295327
}
296328
}
297329
});
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.tron.core.net.service.sync;
2+
3+
import org.tron.core.capsule.BlockCapsule;
4+
5+
public class UnparsedBlock {
6+
7+
private final BlockCapsule.BlockId blockId;
8+
private final byte[] data;
9+
10+
public UnparsedBlock(BlockCapsule.BlockId blockId, byte[] data) {
11+
if (blockId == null) {
12+
throw new IllegalArgumentException("blockId must not be null");
13+
}
14+
this.blockId = blockId;
15+
this.data = data;
16+
}
17+
18+
public BlockCapsule.BlockId getBlockId() {
19+
return blockId;
20+
}
21+
22+
public byte[] getData() {
23+
return data;
24+
}
25+
26+
@Override
27+
public boolean equals(Object o) {
28+
if (this == o) {
29+
return true;
30+
}
31+
if (!(o instanceof UnparsedBlock)) {
32+
return false;
33+
}
34+
return blockId.equals(((UnparsedBlock) o).blockId);
35+
}
36+
37+
@Override
38+
public int hashCode() {
39+
return blockId.hashCode();
40+
}
41+
42+
@Override
43+
public String toString() {
44+
return blockId.getString();
45+
}
46+
}

framework/src/main/resources/config.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,11 @@ node {
155155
fetchBlock.timeout = 200
156156
# syncFetchBatchNum = 2000
157157

158+
# Maximum number of blocks allowed in-flight (requested but not yet processed).
159+
# Throttles block download to reduce memory pressure during sync.
160+
# Range: [50, 2000], default: 500
161+
# maxPendingBlockSize = 500
162+
158163
# Number of validate sign thread, default availableProcessors
159164
# validateSignThreadNum = 16
160165

framework/src/test/java/org/tron/core/net/services/SyncServiceTest.java

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.tron.core.net.peer.PeerManager;
2424
import org.tron.core.net.peer.TronState;
2525
import org.tron.core.net.service.sync.SyncService;
26+
import org.tron.core.net.service.sync.UnparsedBlock;
2627
import org.tron.p2p.connection.Channel;
2728
import org.tron.protos.Protocol;
2829

@@ -98,12 +99,22 @@ public void testProcessBlock() {
9899
ReflectUtils.setFieldValue(c1, "inetSocketAddress", inetSocketAddress);
99100
ReflectUtils.setFieldValue(c1, "inetAddress", inetSocketAddress.getAddress());
100101
peer.setChannel(c1);
101-
service.processBlock(peer,
102-
new BlockMessage(new BlockCapsule(Protocol.Block.newBuilder().build())));
102+
103+
BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder().build());
104+
BlockMessage blockMessage = new BlockMessage(blockCapsule);
105+
service.processBlock(peer, blockMessage);
106+
103107
boolean fetchFlag = (boolean) ReflectUtils.getFieldObject(service, "fetchFlag");
104108
boolean handleFlag = (boolean) ReflectUtils.getFieldObject(service, "handleFlag");
105109
Assert.assertTrue(fetchFlag);
106110
Assert.assertTrue(handleFlag);
111+
112+
Map<UnparsedBlock, PeerConnection> blockJustReceived =
113+
(Map<UnparsedBlock, PeerConnection>)
114+
ReflectUtils.getFieldObject(service, "blockJustReceived");
115+
Assert.assertEquals(1, blockJustReceived.size());
116+
UnparsedBlock stored = blockJustReceived.keySet().iterator().next();
117+
Assert.assertEquals(blockMessage.getBlockId(), stored.getBlockId());
107118
}
108119

109120
@Test
@@ -169,6 +180,46 @@ public void testStartFetchSyncBlock() throws Exception {
169180
peer.getSyncBlockRequested().remove(blockId);
170181
method.invoke(service);
171182
Assert.assertTrue(peer.getSyncBlockRequested().get(blockId) == null);
183+
184+
// reset maxRequestedBlockNum to 0
185+
Field maxRequestedBlockNumField = service.getClass().getDeclaredField("maxRequestedBlockNum");
186+
maxRequestedBlockNumField.setAccessible(true);
187+
maxRequestedBlockNumField.set(service, 0L);
188+
189+
Map<UnparsedBlock, PeerConnection> blockWaitToProcess =
190+
(Map<UnparsedBlock, PeerConnection>)
191+
ReflectUtils.getFieldObject(service, "blockWaitToProcess");
192+
193+
// target block has num=1, above maxRequestedBlockNum=0 so it can be throttled
194+
BlockCapsule.BlockId highBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 1);
195+
peer.getSyncBlockToFetch().clear();
196+
peer.getSyncBlockToFetch().add(highBlockId);
197+
peer.getSyncBlockRequested().clear();
198+
requestBlockIds.invalidateAll();
199+
200+
// fill blockWaitToProcess to reach maxPendingBlockSize (default 500)
201+
int maxPendingBlockSize = (int) ReflectUtils.getFieldObject(service, "maxPendingBlockSize");
202+
for (int i = 0; i < maxPendingBlockSize; i++) {
203+
BlockCapsule.BlockId fillId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 10000 + i);
204+
blockWaitToProcess.put(new UnparsedBlock(fillId, new byte[0]), peer);
205+
}
206+
method.invoke(service);
207+
// highBlockId must NOT be requested: remainNum <= 0 and num > maxRequestedBlockNum
208+
Assert.assertNull(peer.getSyncBlockRequested().get(highBlockId));
209+
210+
// Symmetric retry-exemption case: budget still saturated, but the target block's num
211+
// is below maxRequestedBlockNum, so it must still be requested (deadlock-avoidance
212+
// retry path — guards an explicit invariant of the throttling design).
213+
maxRequestedBlockNumField.set(service, 100L);
214+
BlockCapsule.BlockId retryBlockId = new BlockCapsule.BlockId(Sha256Hash.ZERO_HASH, 50);
215+
peer.getSyncBlockToFetch().clear();
216+
peer.getSyncBlockToFetch().add(retryBlockId);
217+
peer.getSyncBlockRequested().clear();
218+
requestBlockIds.invalidateAll();
219+
method.invoke(service);
220+
// retryBlockId MUST be requested: remainNum <= 0 but num=50 <= maxRequestedBlockNum=100
221+
Assert.assertNotNull(peer.getSyncBlockRequested().get(retryBlockId));
222+
blockWaitToProcess.clear();
172223
}
173224

174225
@Test
@@ -181,39 +232,34 @@ public void testHandleSyncBlock() throws Exception {
181232
Method method = service.getClass().getDeclaredMethod("handleSyncBlock");
182233
method.setAccessible(true);
183234

184-
Map<BlockMessage, PeerConnection> blockJustReceived =
185-
(Map<BlockMessage, PeerConnection>)
235+
Map<UnparsedBlock, PeerConnection> blockJustReceived =
236+
(Map<UnparsedBlock, PeerConnection>)
186237
ReflectUtils.getFieldObject(service, "blockJustReceived");
187-
Protocol.BlockHeader.raw.Builder blockHeaderRawBuild = Protocol.BlockHeader.raw.newBuilder();
188-
Protocol.BlockHeader.raw blockHeaderRaw = blockHeaderRawBuild
238+
239+
Protocol.BlockHeader.raw blockHeaderRaw = Protocol.BlockHeader.raw.newBuilder()
189240
.setNumber(100000)
190241
.build();
191-
192-
// block header
193-
Protocol.BlockHeader.Builder blockHeaderBuild = Protocol.BlockHeader.newBuilder();
194-
Protocol.BlockHeader blockHeader = blockHeaderBuild.setRawData(blockHeaderRaw).build();
195-
196-
BlockCapsule blockCapsule = new BlockCapsule(Protocol.Block.newBuilder()
197-
.setBlockHeader(blockHeader).build());
198-
242+
Protocol.BlockHeader blockHeader = Protocol.BlockHeader.newBuilder()
243+
.setRawData(blockHeaderRaw).build();
244+
BlockCapsule blockCapsule = new BlockCapsule(
245+
Protocol.Block.newBuilder().setBlockHeader(blockHeader).build());
199246
BlockCapsule.BlockId blockId = blockCapsule.getBlockId();
200247

201-
202248
InetSocketAddress a1 = new InetSocketAddress("127.0.0.1", 10001);
203249
Channel c1 = mock(Channel.class);
204250
Mockito.when(c1.getInetSocketAddress()).thenReturn(a1);
205251
Mockito.when(c1.getInetAddress()).thenReturn(a1.getAddress());
206252
PeerManager.add(ctx, c1);
207253
peer = PeerManager.getPeers().get(0);
208254

209-
blockJustReceived.put(new BlockMessage(blockCapsule), peer);
255+
UnparsedBlock unparsedBlock = new UnparsedBlock(blockId, blockCapsule.getData());
256+
blockJustReceived.put(unparsedBlock, peer);
210257

211258
peer.getSyncBlockToFetch().add(blockId);
212259

213260
Cache<BlockCapsule.BlockId, PeerConnection> requestBlockIds =
214-
(Cache<BlockCapsule.BlockId, PeerConnection>)
215-
ReflectUtils.getFieldObject(service, "requestBlockIds");
216-
261+
(Cache<BlockCapsule.BlockId, PeerConnection>)
262+
ReflectUtils.getFieldObject(service, "requestBlockIds");
217263
requestBlockIds.put(blockId, peer);
218264

219265
method.invoke(service);

0 commit comments

Comments
 (0)