Skip to content

Commit 31af29f

Browse files
authored
feat(event): support rollback (removed) for block and transaction log triggers (tronprotocol#6833)
On a chain reorg, BlockLogTrigger/TransactionLogTrigger subscribers previously never learned that an orphaned block's events were rolled back. Add the same "removed" semantics already used by ContractTrigger. - add a `removed` field to BlockLogTrigger/TransactionLogTrigger and the matching setRemoved() on their capsules - Manager (event version 0): refactor postBlockTrigger to emit a single real-time block (with removed), move the solidified-mode batch into postSolidityTrigger, add reOrgBlockTrigger to re-emit erased blocks with removed=true, rename reApplyLogsFilter to reApplyBlockEvents and re-emit the re-applied fork branch's block/tx triggers (forward), thread removed through processTransactionTrigger/postTransactionTrigger - RealtimeEventService (event version 1): post block/tx triggers synchronously to the plugin with the removed flag instead of the async triggerCapsuleQueue, avoiding overwrite of the shared cached capsule; drop the unused manager field - tests: capsule removed passthrough, version-0 emit (testReOrgBlockTriggerRemoved), rewritten RealtimeEventServiceTest, updated ManagerTest stub
1 parent 6a67f29 commit 31af29f

10 files changed

Lines changed: 305 additions & 74 deletions

File tree

common/src/main/java/org/tron/common/logsfilter/trigger/BlockLogTrigger.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ public class BlockLogTrigger extends Trigger {
2727
@Setter
2828
private List<String> transactionList = new ArrayList<>();
2929

30+
// true when this block is being rolled back due to a chain reorg (fork switch);
31+
// mirrors the Ethereum log "removed" semantics already used by ContractTrigger.
32+
@Getter
33+
@Setter
34+
private boolean removed;
35+
3036
public BlockLogTrigger() {
3137
setTriggerName(Trigger.BLOCK_TRIGGER_NAME);
3238
}
@@ -44,6 +50,8 @@ public String toString() {
4450
.append(transactionSize)
4551
.append(", latestSolidifiedBlockNumber: ")
4652
.append(latestSolidifiedBlockNumber)
53+
.append(", removed: ")
54+
.append(removed)
4755
.append(", transactionList: ")
4856
.append(transactionList).toString();
4957
}

common/src/main/java/org/tron/common/logsfilter/trigger/TransactionLogTrigger.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ public class TransactionLogTrigger extends Trigger {
9797
@Setter
9898
private Map<String, Long> extMap;
9999

100+
// true when this transaction is being rolled back due to a chain reorg (fork switch);
101+
// mirrors the Ethereum log "removed" semantics already used by ContractTrigger.
102+
@Getter
103+
@Setter
104+
private boolean removed;
105+
100106
public TransactionLogTrigger() {
101107
setTriggerName(Trigger.TRANSACTION_TRIGGER_NAME);
102108
}

framework/src/main/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) {
2727
blockLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
2828
}
2929

30+
public void setRemoved(boolean removed) {
31+
blockLogTrigger.setRemoved(removed);
32+
}
33+
3034
@Override
3135
public void processTrigger() {
3236
EventPluginLoader.getInstance().postBlockTrigger(blockLogTrigger);

framework/src/main/java/org/tron/common/logsfilter/capsule/TransactionLogTriggerCapsule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,10 @@ public void setLatestSolidifiedBlockNumber(long latestSolidifiedBlockNumber) {
377377
transactionLogTrigger.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
378378
}
379379

380+
public void setRemoved(boolean removed) {
381+
transactionLogTrigger.setRemoved(removed);
382+
}
383+
380384
private List<InternalTransactionPojo> getInternalTransactionList(
381385
List<InternalTransaction> internalTransactionList) {
382386
List<InternalTransactionPojo> pojoList = new ArrayList<>();

framework/src/main/java/org/tron/core/db/Manager.java

Lines changed: 82 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ private void switchFork(BlockCapsule newHead)
11271127
.equals(binaryTree.getValue().peekLast().getParentHash())) {
11281128
if (EventPluginLoader.getInstance().getVersion() == 0) {
11291129
reOrgContractTrigger();
1130+
reOrgBlockTrigger();
11301131
}
11311132
reOrgLogsFilter();
11321133
eraseBlock();
@@ -1202,7 +1203,7 @@ private void switchFork(BlockCapsule newHead)
12021203
}
12031204
}
12041205
// only reached when the whole new branch applied cleanly; a failed switch rethrows above
1205-
reApplyLogsFilter(first);
1206+
reApplyBlockEvents(first);
12061207
}
12071208

12081209
}
@@ -1435,9 +1436,10 @@ void blockTrigger(final BlockCapsule block, long oldSolid, long newSolid) {
14351436
return;
14361437
}
14371438

1438-
// if event subscribe is enabled, post block trigger to queue
1439-
postBlockTrigger(block);
1439+
// if event subscribe is enabled, post block trigger to queue (real-time, not removed)
1440+
postBlockTrigger(block, false);
14401441
// if event subscribe is enabled, post solidity trigger to queue
1442+
// (also emits solidified-mode block/transaction triggers)
14411443
postSolidityTrigger(newSolid);
14421444
} catch (Exception e) {
14431445
logger.error("Block trigger failed. head: {}, oldSolid: {}, newSolid: {}",
@@ -2205,6 +2207,27 @@ private void postSolidityFilter(final long oldSolidNum, final long latestSolidif
22052207
}
22062208

22072209
private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
2210+
// solidified-mode block trigger: emit the newly-solidified blocks (never removed,
2211+
// since solidified blocks cannot be reorged).
2212+
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()
2213+
&& EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
2214+
for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) {
2215+
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule);
2216+
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(latestSolidifiedBlockNumber);
2217+
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
2218+
logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId());
2219+
}
2220+
}
2221+
}
2222+
2223+
// solidified-mode transaction trigger: emit transactions of the newly-solidified blocks.
2224+
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()
2225+
&& EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
2226+
for (BlockCapsule capsule : getContinuousBlockCapsule(latestSolidifiedBlockNumber)) {
2227+
processTransactionTrigger(capsule, false);
2228+
}
2229+
}
2230+
22082231
if (eventPluginLoaded && EventPluginLoader.getInstance().isSolidityLogTriggerEnable()) {
22092232
for (Long i : Args.getSolidityContractLogTriggerMap().keySet()) {
22102233
postSolidityLogContractTrigger(i, latestSolidifiedBlockNumber);
@@ -2233,7 +2256,7 @@ private void postSolidityTrigger(final long latestSolidifiedBlockNumber) {
22332256
lastUsedSolidityNum = latestSolidifiedBlockNumber;
22342257
}
22352258

2236-
private void processTransactionTrigger(BlockCapsule newBlock) {
2259+
private void processTransactionTrigger(BlockCapsule newBlock, boolean removed) {
22372260
List<TransactionCapsule> transactionCapsuleList = newBlock.getTransactions();
22382261

22392262
// need to set eth compatible data from transactionInfoList
@@ -2252,7 +2275,7 @@ private void processTransactionTrigger(BlockCapsule newBlock) {
22522275
transactionCapsule.setBlockNum(newBlock.getNum());
22532276

22542277
cumulativeEnergyUsed += postTransactionTrigger(transactionCapsule, newBlock, i,
2255-
cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice);
2278+
cumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice, removed);
22562279

22572280
cumulativeLogCount += transactionInfo.getLogCount();
22582281
}
@@ -2261,12 +2284,12 @@ private void processTransactionTrigger(BlockCapsule newBlock) {
22612284
newBlock.getNum(),
22622285
"the sizes of transactionInfoList and transactionCapsuleList are not equal");
22632286
for (TransactionCapsule e : newBlock.getTransactions()) {
2264-
postTransactionTrigger(e, newBlock);
2287+
postTransactionTrigger(e, newBlock, removed);
22652288
}
22662289
}
22672290
} else {
22682291
for (TransactionCapsule e : newBlock.getTransactions()) {
2269-
postTransactionTrigger(e, newBlock);
2292+
postTransactionTrigger(e, newBlock, removed);
22702293
}
22712294
}
22722295
}
@@ -2290,14 +2313,25 @@ private void reOrgLogsFilter() {
22902313
// (oldest-first). Must be kept in sync with the FULL-filter section of blockTrigger.
22912314
// Solidity filters are intentionally not posted here: solidification events for these
22922315
// blocks arrive later, when postSolidityFilter runs against the then-canonical chain.
2293-
private void reApplyLogsFilter(List<KhaosBlock> newBranch) {
2316+
// Re-emit the per-block subscription events for a newly-applied fork branch after a chain
2317+
// reorg: JSON-RPC block/logs filters and event-subscribe block/transaction triggers. The
2318+
// fork-switch path returns before blockTrigger() runs, so without this these forward events
2319+
// would be lost for the re-applied blocks (contract triggers are already re-emitted during
2320+
// applyBlock). All emitted as forward (removed=false): these blocks are now canonical.
2321+
private void reApplyBlockEvents(List<KhaosBlock> newBranch) {
22942322
if (CommonParameter.getInstance().isJsonRpcHttpFullNodeEnable()) {
22952323
for (KhaosBlock khaosBlock : newBranch) {
22962324
BlockCapsule blockCapsule = khaosBlock.getBlk();
22972325
postBlockFilter(blockCapsule, false);
22982326
postLogsFilter(blockCapsule, false, false);
22992327
}
23002328
}
2329+
2330+
if (EventPluginLoader.getInstance().getVersion() == 0) {
2331+
for (KhaosBlock khaosBlock : newBranch) {
2332+
postBlockTrigger(khaosBlock.getBlk(), false);
2333+
}
2334+
}
23012335
}
23022336

23032337
private void postBlockFilter(final BlockCapsule blockCapsule, boolean solidified) {
@@ -2324,39 +2358,26 @@ private void postLogsFilter(final BlockCapsule blockCapsule, boolean solidified,
23242358
}
23252359
}
23262360

2327-
void postBlockTrigger(final BlockCapsule blockCapsule) {
2328-
// process block trigger
2361+
// Real-time block/transaction triggers for a single block. The solidified-mode batch is
2362+
// handled in postSolidityTrigger (driven by solidification advancement), so here we only
2363+
// emit for triggers configured as non-solidified. {@code removed=true} re-emits the same
2364+
// trigger when the block is rolled back by a chain reorg (see reOrgBlockTrigger).
2365+
void postBlockTrigger(final BlockCapsule blockCapsule, boolean removed) {
23292366
long solidityBlkNum = getDynamicPropertiesStore().getLatestSolidifiedBlockNum();
2330-
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()) {
2331-
List<BlockCapsule> capsuleList = new ArrayList<>();
2332-
if (EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
2333-
capsuleList = getContinuousBlockCapsule(solidityBlkNum);
2334-
} else {
2335-
capsuleList.add(blockCapsule);
2336-
}
23372367

2338-
for (BlockCapsule capsule : capsuleList) {
2339-
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(capsule);
2340-
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum);
2341-
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
2342-
logger.info("Too many triggers, block trigger lost: {}.", capsule.getBlockId());
2343-
}
2368+
if (eventPluginLoaded && EventPluginLoader.getInstance().isBlockLogTriggerEnable()
2369+
&& !EventPluginLoader.getInstance().isBlockLogTriggerSolidified()) {
2370+
BlockLogTriggerCapsule blockLogTriggerCapsule = new BlockLogTriggerCapsule(blockCapsule);
2371+
blockLogTriggerCapsule.setLatestSolidifiedBlockNumber(solidityBlkNum);
2372+
blockLogTriggerCapsule.setRemoved(removed);
2373+
if (!triggerCapsuleQueue.offer(blockLogTriggerCapsule)) {
2374+
logger.info("Too many triggers, block trigger lost: {}.", blockCapsule.getBlockId());
23442375
}
23452376
}
23462377

2347-
// process transaction trigger
2348-
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()) {
2349-
List<BlockCapsule> capsuleList = new ArrayList<>();
2350-
if (EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
2351-
capsuleList = getContinuousBlockCapsule(solidityBlkNum);
2352-
} else {
2353-
// need to reset block
2354-
capsuleList.add(blockCapsule);
2355-
}
2356-
2357-
for (BlockCapsule capsule : capsuleList) {
2358-
processTransactionTrigger(capsule);
2359-
}
2378+
if (eventPluginLoaded && EventPluginLoader.getInstance().isTransactionLogTriggerEnable()
2379+
&& !EventPluginLoader.getInstance().isTransactionLogTriggerSolidified()) {
2380+
processTransactionTrigger(blockCapsule, removed);
23602381
}
23612382
}
23622383

@@ -2382,11 +2403,13 @@ private List<BlockCapsule> getContinuousBlockCapsule(long solidityBlkNum) {
23822403
// cumulativeEnergyUsed is the total of energy used before the current transaction
23832404
private long postTransactionTrigger(final TransactionCapsule trxCap,
23842405
final BlockCapsule blockCap, int index, long preCumulativeEnergyUsed,
2385-
long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice) {
2406+
long cumulativeLogCount, final TransactionInfo transactionInfo, long energyUnitPrice,
2407+
boolean removed) {
23862408
TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap,
23872409
index, preCumulativeEnergyUsed, cumulativeLogCount, transactionInfo, energyUnitPrice);
23882410
trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore()
23892411
.getLatestSolidifiedBlockNum());
2412+
trx.setRemoved(removed);
23902413
if (!triggerCapsuleQueue.offer(trx)) {
23912414
logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId());
23922415
}
@@ -2396,10 +2419,11 @@ private long postTransactionTrigger(final TransactionCapsule trxCap,
23962419

23972420

23982421
private void postTransactionTrigger(final TransactionCapsule trxCap,
2399-
final BlockCapsule blockCap) {
2422+
final BlockCapsule blockCap, boolean removed) {
24002423
TransactionLogTriggerCapsule trx = new TransactionLogTriggerCapsule(trxCap, blockCap);
24012424
trx.setLatestSolidifiedBlockNumber(getDynamicPropertiesStore()
24022425
.getLatestSolidifiedBlockNum());
2426+
trx.setRemoved(removed);
24032427
if (!triggerCapsuleQueue.offer(trx)) {
24042428
logger.info("Too many triggers, transaction trigger lost: {}.", trxCap.getTransactionId());
24052429
}
@@ -2424,6 +2448,26 @@ private void reOrgContractTrigger() {
24242448
clearSolidityContractTriggerCache(getHeadBlockNum());
24252449
}
24262450

2451+
// On a chain reorg, re-emit the block/transaction triggers of the block being erased with
2452+
// removed=true, so subscribers can roll back. Only real-time (non-solidified) triggers were
2453+
// ever emitted for this block, so postBlockTrigger(.., true) naturally no-ops in solidified
2454+
// mode. Called in the erase loop before eraseBlock(), so the old head is still current head.
2455+
private void reOrgBlockTrigger() {
2456+
if (eventPluginLoaded
2457+
&& (EventPluginLoader.getInstance().isBlockLogTriggerEnable()
2458+
|| EventPluginLoader.getInstance().isTransactionLogTriggerEnable())) {
2459+
logger.info("Switch fork occurred, post reOrgBlockTrigger.");
2460+
try {
2461+
BlockCapsule oldHeadBlock = chainBaseManager.getBlockById(
2462+
getDynamicPropertiesStore().getLatestBlockHeaderHash());
2463+
postBlockTrigger(oldHeadBlock, true);
2464+
} catch (BadItemException | ItemNotFoundException e) {
2465+
logger.error("Block header hash does not exist or is bad: {}.",
2466+
getDynamicPropertiesStore().getLatestBlockHeaderHash());
2467+
}
2468+
}
2469+
}
2470+
24272471
private void clearSolidityContractTriggerCache(long blockNum) {
24282472
if (eventPluginLoaded
24292473
&& (EventPluginLoader.getInstance().isSolidityEventTriggerEnable()

framework/src/main/java/org/tron/core/services/event/RealtimeEventService.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.tron.common.es.ExecutorServiceManager;
1313
import org.tron.common.logsfilter.EventPluginLoader;
1414
import org.tron.common.logsfilter.trigger.Trigger;
15-
import org.tron.core.db.Manager;
1615
import org.tron.core.services.event.bo.BlockEvent;
1716
import org.tron.core.services.event.bo.Event;
1817

@@ -25,9 +24,6 @@ public class RealtimeEventService {
2524
@Getter
2625
private static Object contractLock = new Object();
2726

28-
@Autowired
29-
private Manager manager;
30-
3127
@Autowired
3228
private SolidEventService solidEventService;
3329

@@ -77,25 +73,31 @@ public synchronized void work() {
7773
public void flush(BlockEvent blockEvent, boolean isRemove) {
7874
logger.info("Flush realtime event {}", blockEvent.getBlockId().getString());
7975

76+
// Post block/transaction triggers synchronously to the plugin (processTrigger ->
77+
// EventPluginLoader serializes immediately) instead of the async triggerCapsuleQueue: the
78+
// capsule is a shared cached object whose removed flag is set per-flush, so an async consumer
79+
// could read it after a later flush overwrote it. This mirrors how contract triggers below
80+
// are posted directly. isRemove=true re-emits the block/transaction as rolled back on a reorg.
8081
if (instance.isBlockLogTriggerEnable()
81-
&& !instance.isBlockLogTriggerSolidified()
82-
&& !isRemove) {
82+
&& !instance.isBlockLogTriggerSolidified()) {
8383
if (blockEvent.getBlockLogTriggerCapsule() == null) {
8484
logger.warn("BlockLogTriggerCapsule is null. {}", blockEvent.getBlockId().getString());
8585
} else {
86-
manager.getTriggerCapsuleQueue().offer(blockEvent.getBlockLogTriggerCapsule());
86+
blockEvent.getBlockLogTriggerCapsule().setRemoved(isRemove);
87+
blockEvent.getBlockLogTriggerCapsule().processTrigger();
8788
}
8889
}
8990

9091
if (instance.isTransactionLogTriggerEnable()
91-
&& !instance.isTransactionLogTriggerSolidified()
92-
&& !isRemove) {
92+
&& !instance.isTransactionLogTriggerSolidified()) {
9393
if (blockEvent.getTransactionLogTriggerCapsules() == null) {
9494
logger.warn("TransactionLogTriggerCapsules is null. {}",
9595
blockEvent.getBlockId().getString());
9696
} else {
97-
blockEvent.getTransactionLogTriggerCapsules().forEach(v ->
98-
manager.getTriggerCapsuleQueue().offer(v));
97+
blockEvent.getTransactionLogTriggerCapsules().forEach(v -> {
98+
v.setRemoved(isRemove);
99+
v.processTrigger();
100+
});
99101
}
100102
}
101103

framework/src/test/java/org/tron/common/logsfilter/TransactionLogTriggerCapsuleTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,24 @@ public void setup() {
3535
System.currentTimeMillis(), Sha256Hash.ZERO_HASH.getByteString());
3636
}
3737

38+
@Test
39+
public void testSetRemoved() {
40+
BalanceContract.TransferContract.Builder builder =
41+
BalanceContract.TransferContract.newBuilder()
42+
.setOwnerAddress(ByteString.copyFrom(ByteArray.fromHexString(OWNER_ADDRESS)))
43+
.setToAddress(ByteString.copyFrom(ByteArray.fromHexString(RECEIVER_ADDRESS)))
44+
.setAmount(1000L);
45+
transactionCapsule = new TransactionCapsule(builder.build(),
46+
Protocol.Transaction.Contract.ContractType.TransferContract);
47+
TransactionLogTriggerCapsule triggerCapsule =
48+
new TransactionLogTriggerCapsule(transactionCapsule, blockCapsule);
49+
50+
// default is false (forward emit); reorg rollback sets it to true
51+
Assert.assertFalse(triggerCapsule.getTransactionLogTrigger().isRemoved());
52+
triggerCapsule.setRemoved(true);
53+
Assert.assertTrue(triggerCapsule.getTransactionLogTrigger().isRemoved());
54+
}
55+
3856
@Test
3957
public void testConstructorWithUnfreezeBalanceTrxCapsule() {
4058
BalanceContract.UnfreezeBalanceContract.Builder builder2 =

framework/src/test/java/org/tron/common/logsfilter/capsule/BlockLogTriggerCapsuleTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,12 @@ public void testSetLatestSolidifiedBlockNumber() {
3232
Assert.assertEquals(100,
3333
blockLogTriggerCapsule.getBlockLogTrigger().getLatestSolidifiedBlockNumber());
3434
}
35+
36+
@Test
37+
public void testSetRemoved() {
38+
// default is false (forward emit); reorg rollback sets it to true
39+
Assert.assertFalse(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved());
40+
blockLogTriggerCapsule.setRemoved(true);
41+
Assert.assertTrue(blockLogTriggerCapsule.getBlockLogTrigger().isRemoved());
42+
}
3543
}

0 commit comments

Comments
 (0)