Skip to content

Commit 9897ea8

Browse files
committed
use ForkJoinPool as necessary
1 parent 6323ea8 commit 9897ea8

1 file changed

Lines changed: 50 additions & 38 deletions

File tree

framework/src/main/java/org/tron/core/services/jsonrpc/TronJsonRpcImpl.java

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ public enum RequestSource {
171171
private static final String NO_BLOCK_HEADER_BY_HASH = "header for hash not found";
172172

173173
private static final String ERROR_SELECTOR = "08c379a0"; // Function selector for Error(string)
174+
private static final int FILTER_PARALLEL_THRESHOLD = 10000;
175+
private static final ForkJoinPool LOGS_FILTER_POOL = new ForkJoinPool(2);
174176
/**
175177
* thread pool of query section bloom store
176178
*/
@@ -233,49 +235,59 @@ public static void handleLogsFilter(LogsFilterCapsule logsFilterCapsule) {
233235
eventFilterMap = getEventFilter2ResultFull();
234236
}
235237

236-
ForkJoinPool pool = new ForkJoinPool(2); //parallelStream default num(CPU) -1
237-
pool.submit(() -> eventFilterMap.entrySet().parallelStream().forEach(entry -> {
238+
if (eventFilterMap.size() <= FILTER_PARALLEL_THRESHOLD) {
239+
eventFilterMap.entrySet().forEach(
240+
entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule));
241+
} else {
242+
LOGS_FILTER_POOL.submit(() -> eventFilterMap.entrySet().parallelStream()
243+
.forEach(entry -> processLogFilterEntry(entry, eventFilterMap, logsFilterCapsule))
244+
).join();
245+
}
246+
long t2 = System.currentTimeMillis();
247+
logger.info("handleLogsFilter {} cost {}, filter size {}",
248+
logsFilterCapsule.isSolidified() ? "Solidity" : "Full", t2 - t1, eventFilterMap.size());
249+
}
238250

239-
LogFilterAndResult logFilterAndResult = entry.getValue();
240-
if (logFilterAndResult.isExpire()) {
241-
eventFilterMap.remove(entry.getKey());
242-
return;
243-
}
251+
private static void processLogFilterEntry(
252+
Map.Entry<String, LogFilterAndResult> entry,
253+
Map<String, LogFilterAndResult> eventFilterMap,
254+
LogsFilterCapsule logsFilterCapsule) {
255+
LogFilterAndResult logFilterAndResult = entry.getValue();
256+
if (logFilterAndResult.isExpire()) {
257+
eventFilterMap.remove(entry.getKey());
258+
return;
259+
}
244260

245-
long blockNumber = logsFilterCapsule.getBlockNumber();
246-
long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock();
247-
long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock();
248-
if (!(fromBlock <= blockNumber && blockNumber <= toBlock)) {
249-
return;
250-
}
261+
long blockNumber = logsFilterCapsule.getBlockNumber();
262+
long fromBlock = logFilterAndResult.getLogFilterWrapper().getFromBlock();
263+
long toBlock = logFilterAndResult.getLogFilterWrapper().getToBlock();
264+
if (!(fromBlock <= blockNumber && blockNumber <= toBlock)) {
265+
return;
266+
}
251267

252-
if (logsFilterCapsule.getBloom() != null && !logFilterAndResult.getLogFilterWrapper()
253-
.getLogFilter().matchBloom(logsFilterCapsule.getBloom())) {
254-
return;
255-
}
268+
if (logsFilterCapsule.getBloom() != null && !logFilterAndResult.getLogFilterWrapper()
269+
.getLogFilter().matchBloom(logsFilterCapsule.getBloom())) {
270+
return;
271+
}
256272

257-
LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter();
258-
List<LogFilterElement> elements =
259-
LogMatch.matchBlock(logFilter, blockNumber, logsFilterCapsule.getBlockHash(),
260-
logsFilterCapsule.getTxInfoList(), logsFilterCapsule.isRemoved());
273+
LogFilter logFilter = logFilterAndResult.getLogFilterWrapper().getLogFilter();
274+
List<LogFilterElement> elements =
275+
LogMatch.matchBlock(logFilter, blockNumber, logsFilterCapsule.getBlockHash(),
276+
logsFilterCapsule.getTxInfoList(), logsFilterCapsule.isRemoved());
261277

262-
List<LogFilterElement> localResults = new ArrayList<>(elements.size());
263-
for (LogFilterElement element : elements) {
264-
LogFilterElement cachedElement;
265-
try {
266-
// compare with hashcode() first, then with equals(). If not exist, put it.
267-
cachedElement = logElementCache.get(element, () -> element);
268-
} catch (ExecutionException e) {
269-
logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen
270-
cachedElement = element;
271-
}
272-
localResults.add(cachedElement);
278+
List<LogFilterElement> localResults = new ArrayList<>(elements.size());
279+
for (LogFilterElement element : elements) {
280+
LogFilterElement cachedElement;
281+
try {
282+
// compare with hashcode() first, then with equals(). If not exist, put it.
283+
cachedElement = logElementCache.get(element, () -> element);
284+
} catch (ExecutionException e) {
285+
logger.error("Getting/loading LogFilterElement from cache fails", e); // never happen
286+
cachedElement = element;
273287
}
274-
logFilterAndResult.getResult().addAll(localResults);
275-
})).join();
276-
long t2 = System.currentTimeMillis();
277-
logger.info("handleLogsFilter {} cost {}, filter size {}",
278-
logsFilterCapsule.isSolidified() ? "Solidity" : "Full", t2 - t1, eventFilterMap.size());
288+
localResults.add(cachedElement);
289+
}
290+
logFilterAndResult.getResult().addAll(localResults);
279291
}
280292

281293
@Override
@@ -1428,7 +1440,7 @@ public String newFilter(FilterRequest fr) throws JsonRpcInvalidParamsException,
14281440
} else {
14291441
eventFilter2Result = eventFilter2ResultSolidity;
14301442
}
1431-
if (eventFilter2Result.size() > maxLogFilterNum) {
1443+
if (eventFilter2Result.size() >= maxLogFilterNum) {
14321444
throw new JsonRpcExceedLimitException(
14331445
"exceed max log filters: " + maxLogFilterNum + ", try again later");
14341446
}

0 commit comments

Comments
 (0)