Skip to content

Commit 03eca29

Browse files
authored
fix(jsonrpc): enforce log filter cap and improve match efficiency (tronprotocol#6732)
1 parent fafbc1d commit 03eca29

27 files changed

Lines changed: 669 additions & 113 deletions

common/src/main/java/org/tron/common/es/ExecutorServiceManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@
44
import java.util.concurrent.BlockingQueue;
55
import java.util.concurrent.ExecutorService;
66
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ForkJoinPool;
8+
import java.util.concurrent.ForkJoinWorkerThread;
79
import java.util.concurrent.Future;
810
import java.util.concurrent.ScheduledExecutorService;
911
import java.util.concurrent.ScheduledFuture;
1012
import java.util.concurrent.ThreadPoolExecutor;
1113
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicInteger;
1215
import lombok.extern.slf4j.Slf4j;
1316
import org.tron.common.exit.ExitManager;
1417

@@ -35,6 +38,22 @@ public static ScheduledExecutorService newSingleThreadScheduledExecutor(String n
3538
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
3639
}
3740

41+
public static ForkJoinPool newForkJoinPool(String name, int parallelism) {
42+
return newForkJoinPool(name, parallelism, false);
43+
}
44+
45+
public static ForkJoinPool newForkJoinPool(String name, int parallelism, boolean isDaemon) {
46+
AtomicInteger counter = new AtomicInteger(0);
47+
ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
48+
ForkJoinWorkerThread thread =
49+
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
50+
thread.setName(name + "-" + counter.getAndIncrement());
51+
thread.setDaemon(isDaemon);
52+
return thread;
53+
};
54+
return new ForkJoinPool(parallelism, factory, null, false);
55+
}
56+
3857
public static ExecutorService newFixedThreadPool(String name, int fixThreads) {
3958
return newFixedThreadPool(name, fixThreads, false);
4059
}

common/src/main/java/org/tron/common/parameter/CommonParameter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,9 @@ public class CommonParameter {
492492
@Getter
493493
@Setter
494494
public int jsonRpcMaxBlockFilterNum = 50000;
495+
@Getter
496+
@Setter
497+
public int jsonRpcMaxLogFilterNum = 20000;
495498

496499
@Getter
497500
@Setter

common/src/main/java/org/tron/core/config/args/NodeConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ public void setHttpPBFTPort(int v) {
311311
private int maxBlockRange = 5000;
312312
private int maxSubTopics = 1000;
313313
private int maxBlockFilterNum = 50000;
314+
private int maxLogFilterNum = 20000;
314315
private long maxMessageSize = 4194304;
315316
}
316317

common/src/main/resources/reference.conf

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,9 +386,12 @@ node {
386386
# Maximum topics within a topic criteria, >0 otherwise no limit
387387
maxSubTopics = 1000
388388

389-
# Maximum number for blockFilter
389+
# Maximum number for blockFilter. >0 otherwise no limit
390390
maxBlockFilterNum = 50000
391391

392+
# Maximum number of concurrent eth_newFilter registrations, >0 otherwise no limit
393+
maxLogFilterNum = 20000
394+
392395
# Maximum JSON-RPC request body size, default 4MB. Independent from rpc.maxMessageSize.
393396
maxMessageSize = 4M
394397
}

framework/src/main/java/org/tron/common/logsfilter/capsule/BlockFilterCapsule.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.tron.common.logsfilter.capsule;
22

3-
import static org.tron.core.services.jsonrpc.TronJsonRpcImpl.handleBLockFilter;
4-
53
import lombok.Getter;
64
import lombok.Setter;
75
import lombok.ToString;
@@ -20,19 +18,12 @@ public class BlockFilterCapsule extends FilterTriggerCapsule {
2018
private boolean solidified;
2119

2220
public BlockFilterCapsule(BlockCapsule block, boolean solidified) {
23-
blockHash = block.getBlockId().toString();
24-
this.solidified = solidified;
21+
this(block.getBlockId().toString(), solidified);
2522
}
2623

2724
public BlockFilterCapsule(String blockHash, boolean solidified) {
2825
this.blockHash = blockHash;
2926
this.solidified = solidified;
3027
}
3128

32-
@Override
33-
public void processFilterTrigger() {
34-
handleBLockFilter(this);
35-
}
36-
3729
}
38-
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package org.tron.common.logsfilter.capsule;
22

3-
public class FilterTriggerCapsule extends TriggerCapsule {
3+
public class FilterTriggerCapsule {
44

5-
public void processFilterTrigger() {
6-
throw new UnsupportedOperationException();
7-
}
85
}

framework/src/main/java/org/tron/common/logsfilter/capsule/LogsFilterCapsule.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package org.tron.common.logsfilter.capsule;
22

3-
import static org.tron.core.services.jsonrpc.TronJsonRpcImpl.handleLogsFilter;
4-
53
import java.util.List;
64
import lombok.Getter;
75
import lombok.Setter;
@@ -43,8 +41,4 @@ public LogsFilterCapsule(long blockNumber, String blockHash, Bloom bloom,
4341
this.removed = removed;
4442
}
4543

46-
@Override
47-
public void processFilterTrigger() {
48-
handleLogsFilter(this);
49-
}
50-
}
44+
}

framework/src/main/java/org/tron/core/config/args/Args.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ private static void applyNodeConfig(NodeConfig nc) {
559559
PARAMETER.jsonRpcMaxBlockRange = jsonrpc.getMaxBlockRange();
560560
PARAMETER.jsonRpcMaxSubTopics = jsonrpc.getMaxSubTopics();
561561
PARAMETER.jsonRpcMaxBlockFilterNum = jsonrpc.getMaxBlockFilterNum();
562+
PARAMETER.jsonRpcMaxLogFilterNum = jsonrpc.getMaxLogFilterNum();
562563
PARAMETER.jsonRpcMaxMessageSize = jsonrpc.getMaxMessageSize();
563564

564565
// ---- P2P sub-bean ----

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.commons.collections4.CollectionUtils;
4949
import org.bouncycastle.util.encoders.Hex;
5050
import org.springframework.beans.factory.annotation.Autowired;
51+
import org.springframework.context.annotation.Lazy;
5152
import org.springframework.stereotype.Component;
5253
import org.tron.api.GrpcAPI;
5354
import org.tron.api.GrpcAPI.TransactionInfoList;
@@ -142,6 +143,7 @@
142143
import org.tron.core.service.MortgageService;
143144
import org.tron.core.service.RewardViCalService;
144145
import org.tron.core.services.event.exception.EventException;
146+
import org.tron.core.services.jsonrpc.TronJsonRpcImpl;
145147
import org.tron.core.store.AccountAssetStore;
146148
import org.tron.core.store.AccountIdIndexStore;
147149
import org.tron.core.store.AccountIndexStore;
@@ -277,6 +279,10 @@ public class Manager {
277279
@Autowired
278280
private RewardViCalService rewardViCalService;
279281

282+
@Lazy
283+
@Autowired
284+
private TronJsonRpcImpl tronJsonRpcImpl;
285+
280286
/**
281287
* Cycle thread to rePush Transactions
282288
*/
@@ -333,8 +339,10 @@ public class Manager {
333339
while (isRunFilterProcessThread) {
334340
try {
335341
FilterTriggerCapsule filterCapsule = filterCapsuleQueue.poll(1, TimeUnit.SECONDS);
336-
if (filterCapsule != null) {
337-
filterCapsule.processFilterTrigger();
342+
if (filterCapsule instanceof LogsFilterCapsule) {
343+
tronJsonRpcImpl.handleLogsFilter((LogsFilterCapsule) filterCapsule);
344+
} else if (filterCapsule instanceof BlockFilterCapsule) {
345+
tronJsonRpcImpl.handleBLockFilter((BlockFilterCapsule) filterCapsule);
338346
}
339347
} catch (InterruptedException e) {
340348
logger.error("FilterProcessLoop get InterruptedException, error is {}.",
@@ -2268,7 +2276,8 @@ private void reOrgLogsFilter() {
22682276
}
22692277

22702278
private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) {
2271-
BlockFilterCapsule blockFilterCapsule = new BlockFilterCapsule(blockCapsule, solidified);
2279+
BlockFilterCapsule blockFilterCapsule =
2280+
new BlockFilterCapsule(blockCapsule, solidified);
22722281
if (!filterCapsuleQueue.offer(blockFilterCapsule)) {
22732282
logger.info("Too many filters, block filter lost: {}.", blockCapsule.getBlockId());
22742283
}

framework/src/main/java/org/tron/core/services/jsonrpc/JsonRpcApiUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class JsonRpcApiUtil {
6161
public static final String TAG_SAFE_SUPPORT_ERROR = "TAG safe not supported";
6262
public static final String BLOCK_NUM_ERROR = "invalid block number";
6363

64+
private static final SecureRandom random = new SecureRandom();
65+
6466
public static byte[] convertToTronAddress(byte[] address) {
6567
byte[] newAddress = new byte[21];
6668
byte[] temp = new byte[] {Wallet.getAddressPreFixByte()};
@@ -647,7 +649,6 @@ public static long parseBlockNumber(String blockNumOrTag, Wallet wallet)
647649
}
648650

649651
public static String generateFilterId() {
650-
SecureRandom random = new SecureRandom();
651652
byte[] uid = new byte[16]; // 128 bits are converted to 16 bytes
652653
random.nextBytes(uid);
653654
return ByteArray.toHexString(uid);

0 commit comments

Comments
 (0)