Skip to content

Commit c70c3cd

Browse files
committed
introduce a light weight mode for tracking block progression
Signed-off-by: Chengxuan Xing <chengxuan.xing@kaleido.io>
1 parent df126b4 commit c70c3cd

13 files changed

Lines changed: 312 additions & 21 deletions

README.md

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,40 +34,63 @@ For a full list of configuration options see [config.md](./config.md)
3434

3535
```yaml
3636
connectors:
37-
- type: ethereum
38-
server:
39-
port: 5102
40-
ethereum:
41-
url: http://localhost:8545
37+
- type: ethereum
38+
server:
39+
port: 5102
40+
ethereum:
41+
url: http://localhost:8545
4242
```
4343
4444
## Blockchain node compatibility
4545
4646
For EVM connector to function properly, you should check the blockchain node supports the following JSON-RPC Methods over HTTP:
47+
4748
### Event tracking
49+
4850
- `eth_blockNumber`
4951
- `eth_newBlockFilter`
5052
- `eth_getFilterLogs`
5153
- `eth_getFilterChanges`
52-
- `eth_getBlockByHash`
5354
- `eth_getLogs`
5455
- `eth_newFilter`
5556
- `eth_uninstallFilter`
5657
- `eth_getTransactionByHash`
5758
- `eth_getTransactionReceipt`
5859

5960
### Query
61+
6062
- `eth_call`
6163
- `eth_getBalance`
6264
- `eth_gasPrice`[^1]
63-
65+
6466
### Transaction submission
67+
6568
- `eth_estimateGas`
6669
- `eth_sendTransaction`
6770
- `eth_getTransactionCount`
6871
- `eth_sendRawTransaction`[^2]
6972

70-
7173
[^1]: also used by Transaction submission if the handler is configured to get gas price using "connector".
7274

7375
[^2]: only required by custom transaction handlers that supports pre-signing.
76+
77+
### Optional methods
78+
79+
#### Transaction tracing
80+
81+
> Required when [connector.traceTXForRevertReason](./config.md#connector) is set to `true` (default: `false`)
82+
83+
- `debug_traceTransaction`
84+
85+
#### Block listener
86+
87+
> Required when [connector.blockTrackingMode](./config.md#connector) is set to `inMemoryPartialChain`
88+
89+
- `eth_getBlockByHash`
90+
- `eth_getBlockByNumber`
91+
92+
#### Receipt fetching
93+
94+
> Required when [connector.useGetBlockReceipts](./config.md#connector) is set to `true`
95+
96+
- `eth_getBlockReceipts`

config.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
|---|-----------|----|-------------|
6868
|blockCacheSize|Maximum of blocks to hold in the block info cache|`int`|`250`
6969
|blockPollingInterval|Interval for polling to check for new blocks|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
70+
|blockTrackingMode|The block tracking mode for the block listener|`headBlockNumber` or `inMemoryPartialChain`|`inMemoryPartialChain`
7071
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
7172
|dataFormat|Configure the JSON data format for query output and events|map,flat_array,self_describing|`map`
7273
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/hashicorp/golang-lru v1.0.2
88
github.com/hyperledger/firefly-common v1.5.9
99
github.com/hyperledger/firefly-signer v1.1.21
10-
github.com/hyperledger/firefly-transaction-manager v1.4.4
10+
github.com/hyperledger/firefly-transaction-manager v0.0.0-20260421111941-f20dee0fc832
1111
github.com/sirupsen/logrus v1.9.3
1212
github.com/spf13/cobra v1.8.0
1313
github.com/stretchr/testify v1.9.0
@@ -99,3 +99,6 @@ require (
9999
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
100100
gopkg.in/yaml.v3 v3.0.1 // indirect
101101
)
102+
103+
// Pin kaleido-io/firefly-transaction-manager branch lightweight-confirmation-tracking until merged upstream.
104+
replace github.com/hyperledger/firefly-transaction-manager => github.com/kaleido-io/firefly-transaction-manager v0.0.0-20260421111941-f20dee0fc832

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ github.com/hyperledger/firefly-common v1.5.9 h1:Z1+SuKNYJ8hPKQ5CvcsMg6r/E4RyW6wb
104104
github.com/hyperledger/firefly-common v1.5.9/go.mod h1:1Xawm5PUhxT7k+CL/Kr3i1LE3cTTzoQwZMLimvlW8rs=
105105
github.com/hyperledger/firefly-signer v1.1.21 h1:r7cTOw6e/6AtiXLf84wZy6Z7zppzlc191HokW2hv4N4=
106106
github.com/hyperledger/firefly-signer v1.1.21/go.mod h1:axrlSQeKrd124UdHF5L3MkTjb5DeTcbJxJNCZ3JmcWM=
107-
github.com/hyperledger/firefly-transaction-manager v1.4.4 h1:cbG9FkQWriOcc1MMGaMqU7OpOwLloSV+PImOoaN0ckU=
108-
github.com/hyperledger/firefly-transaction-manager v1.4.4/go.mod h1:1kbYt8ofDXqfwC02vwV/HoOjmiv0IuT9UkJ//bbrliE=
109107
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
110108
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
111109
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
@@ -115,6 +113,8 @@ github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bww
115113
github.com/jarcoal/httpmock v1.2.0/go.mod h1:oCoTsnAz4+UoOUIf5lJOWV2QQIW5UoeUI6aM2YnWAZk=
116114
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
117115
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
116+
github.com/kaleido-io/firefly-transaction-manager v0.0.0-20260421111941-f20dee0fc832 h1:zkq96Tp9IfMa7isUBfXP0OS3gvddx8+vDlnqGDTX+tk=
117+
github.com/kaleido-io/firefly-transaction-manager v0.0.0-20260421111941-f20dee0fc832/go.mod h1:1kbYt8ofDXqfwC02vwV/HoOjmiv0IuT9UkJ//bbrliE=
118118
github.com/karlseguin/ccache v2.0.3+incompatible h1:j68C9tWOROiOLWTS/kCGg9IcJG+ACqn5+0+t8Oh83UU=
119119
github.com/karlseguin/ccache v2.0.3+incompatible/go.mod h1:CM9tNPzT6EdRh14+jiW8mEF9mkNZuuE51qmgGYUB93w=
120120
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=

internal/ethereum/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package ethereum
1919
import (
2020
"github.com/hyperledger/firefly-common/pkg/config"
2121
"github.com/hyperledger/firefly-common/pkg/wsclient"
22+
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
2223
)
2324

2425
const (
2526
ConfigGasEstimationFactor = "gasEstimationFactor"
2627
ConfigDataFormat = "dataFormat"
2728
BlockPollingInterval = "blockPollingInterval"
2829
BlockCacheSize = "blockCacheSize"
30+
BlockTrackingMode = "blockTrackingMode"
2931
EventsCatchupPageSize = "events.catchupPageSize"
3032
EventsCatchupThreshold = "events.catchupThreshold"
3133
EventsCatchupDownscaleRegex = "events.catchupDownscaleRegex"
@@ -70,6 +72,7 @@ func InitConfig(conf config.Section) {
7072
conf.AddKnownKey(WebSocketsEnabled, false)
7173
conf.AddKnownKey(BlockCacheSize, 250)
7274
conf.AddKnownKey(BlockPollingInterval, "1s")
75+
conf.AddKnownKey(BlockTrackingMode, ffcapi.BlockListenerTrackingModeInMemoryPartialChain)
7376
conf.AddKnownKey(ConfigDataFormat, "map")
7477
conf.AddKnownKey(ConfigGasEstimationFactor, DefaultGasEstimationFactor)
7578
conf.AddKnownKey(EventsBlockTimestamps, true)

internal/ethereum/ethereum.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ import (
4343
)
4444

4545
type ethConnector struct {
46-
backend rpcbackend.Backend
47-
wsBackend rpcbackend.WebSocketRPCClient
46+
backend rpcbackend.Backend
47+
wsBackend rpcbackend.WebSocketRPCClient
48+
blockTrackingMode ffcapi.BlockListenerTrackingMode
49+
4850
serializer *abi.Serializer
4951
gasEstimationFactor *big.Float
5052
catchupPageSize int64
@@ -78,6 +80,15 @@ type Connector interface {
7880
}
7981

8082
func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connector, err error) {
83+
84+
blockListenerTrackingMode := ffcapi.BlockListenerTrackingMode(conf.GetString(BlockTrackingMode))
85+
if blockListenerTrackingMode == "" {
86+
blockListenerTrackingMode = ffcapi.BlockListenerTrackingModeInMemoryPartialChain
87+
}
88+
if blockListenerTrackingMode != ffcapi.BlockListenerTrackingModeHeadBlockNumber && blockListenerTrackingMode != ffcapi.BlockListenerTrackingModeInMemoryPartialChain {
89+
return nil, i18n.NewError(ctx, msgs.MsgInvalidBlockListenerTrackingMode, blockListenerTrackingMode)
90+
}
91+
8192
c := &ethConnector{
8293
eventStreams: make(map[fftypes.UUID]*eventStream),
8394
catchupPageSize: conf.GetInt64(EventsCatchupPageSize),
@@ -86,6 +97,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
8697
eventBlockTimestamps: conf.GetBool(EventsBlockTimestamps),
8798
eventFilterPollingInterval: conf.GetDuration(EventsFilterPollingInterval),
8899
traceTXForRevertReason: conf.GetBool(TraceTXForRevertReason),
100+
blockTrackingMode: blockListenerTrackingMode,
89101
retry: retryutil.RetryWrapper{Retry: &retry.Retry{}},
90102
}
91103

@@ -164,6 +176,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
164176
BlockCacheSize: conf.GetInt(BlockCacheSize),
165177
MaxAsyncBlockFetchConcurrency: conf.GetInt(MaxAsyncBlockFetchConcurrency),
166178
UseGetBlockReceipts: conf.GetBool(UseGetBlockReceipts),
179+
TrackingMode: blockListenerTrackingMode,
167180
}, c.backend, c.wsBackend); err != nil {
168181
return nil, err
169182
}
@@ -216,6 +229,7 @@ func (c *ethConnector) ReconcileConfirmationsForTransaction(ctx context.Context,
216229
Rebuilt: ethrpcRes.Rebuilt,
217230
NewFork: ethrpcRes.NewFork,
218231
Confirmed: ethrpcRes.Confirmed,
232+
ActualConfirmationCount: ethrpcRes.ActualConfirmationCount,
219233
TargetConfirmationCount: ethrpcRes.TargetConfirmationCount,
220234
}
221235
if ethrpcReceipt != nil {

internal/ethereum/new_block_listener.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import (
2323
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
2424
)
2525

26+
func (c *ethConnector) GetBlockListenerTrackingMode(_ context.Context) ffcapi.BlockListenerTrackingMode {
27+
return c.blockTrackingMode
28+
}
29+
2630
func (c *ethConnector) NewBlockListener(_ context.Context, req *ffcapi.NewBlockListenerRequest) (*ffcapi.NewBlockListenerResponse, ffcapi.ErrorReason, error) {
2731
// Add the block consumer
2832
c.blockListener.AddConsumer(req.ListenerContext, &ethblocklistener.BlockUpdateConsumer{

internal/msgs/en_config_descriptions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,5 @@ var (
5151
_ = ffc("config.connector.traceTXForRevertReason", "Enable the use of transaction trace functions (e.g. debug_traceTransaction) to obtain transaction revert reasons. This can place a high load on the EVM client.", i18n.BooleanType)
5252
_ = ffc("config.connector.maxAsyncBlockFetchConcurrency", "Maximum concurrency when using asynchronous block downloading (minium 1)", i18n.IntType)
5353
_ = ffc("config.connector.useGetBlockReceipts", "When true, the eth_getBlockReceipts call is available for this connector to use", i18n.BooleanType)
54+
_ = ffc("config.connector.blockTrackingMode", "The block tracking mode for the block listener", "`headBlockNumber` or `inMemoryPartialChain`")
5455
)

internal/msgs/en_error_messages.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,6 @@ var (
8686
MsgUnknownJSONFormatOptions = ffe("FF23066", "JSON formatting option unknown %s=%s")
8787
MsgObservedPanic = ffe("FF23067", "Observed panic: %v")
8888
MsgReturnedBlockHashMismatch = ffe("FF23068", "Returned block %d hash %s does not match requested hash %s")
89+
MsgInvalidBlockListenerTrackingMode = ffe("FF23069", "Invalid block listener tracking mode '%s': must be 'headBlockNumber' or 'inMemoryPartialChain'")
90+
MsgTransactionNotIncludedInChainHead = ffe("FF23070", "Transaction '%s' cannot be reconciled in head-only mode: chain head %d is before receipt block %s")
8991
)

pkg/ethblocklistener/blocklistener.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,23 @@ import (
4949
//
5050
// `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process
5151
type ConfirmationUpdateResult struct {
52-
Confirmations []*ethrpc.MinimalBlockInfo `json:"confirmations,omitempty"`
52+
Confirmations []*ethrpc.MinimalBlockInfo `json:"confirmations,omitempty"` // the confirmation list
5353
Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch
5454
NewFork bool `json:"newFork,omitempty"` // when true, it means a new fork was detected based on the existing confirmations
5555
Confirmed bool `json:"confirmed,omitempty"` // when true, it means the confirmation list is complete and the transaction is confirmed
5656
TargetConfirmationCount uint64 `json:"targetConfirmationCount"` // the target number of confirmations for this reconcile request
57+
ActualConfirmationCount uint64 `json:"actualConfirmationCount"` // the actual number of confirmations for this reconcile request
5758
}
5859

5960
type BlockListenerConfig struct {
60-
MonitoredHeadLength int `json:"monitoredHeadLength"`
61-
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
62-
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
63-
BlockCacheSize int `json:"blockCacheSize"`
64-
IncludeLogsBloom bool `json:"includeLogsBloom"`
65-
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
66-
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
61+
MonitoredHeadLength int `json:"monitoredHeadLength"`
62+
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
63+
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
64+
BlockCacheSize int `json:"blockCacheSize"`
65+
IncludeLogsBloom bool `json:"includeLogsBloom"`
66+
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
67+
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
68+
TrackingMode ffcapi.BlockListenerTrackingMode `json:"trackingMode,omitempty"`
6769
}
6870

6971
type BlockListener interface {
@@ -126,6 +128,9 @@ type blockListener struct {
126128
canonicalChain *list.List
127129
highestBlockSet bool
128130
highestBlock uint64
131+
132+
// headBlockNumber mode: last head value sent on the block listener channel (only written from listenLoop)
133+
currentChainHead uint64
129134
}
130135

131136
func NewBlockListener(ctx context.Context, retry *retry.Retry, conf *BlockListenerConfig, httpConf *ffresty.Config, wsConf *wsclient.WSConfig) (bl BlockListener, err error) {
@@ -141,6 +146,9 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf
141146
if conf.MaxAsyncBlockFetchConcurrency <= 0 {
142147
conf.MaxAsyncBlockFetchConcurrency = 1
143148
}
149+
if conf.TrackingMode == "" {
150+
conf.TrackingMode = ffcapi.BlockListenerTrackingModeInMemoryPartialChain
151+
}
144152
bl := &blockListener{
145153
ctx: log.WithLogField(ctx, "role", "blocklistener"),
146154
retry: &retryutil.RetryWrapper{Retry: retry},
@@ -152,6 +160,7 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf
152160
newHeadsTap: make(chan struct{}),
153161
highestBlockSet: false,
154162
highestBlock: 0,
163+
currentChainHead: 0,
155164
consumers: make(map[fftypes.UUID]*BlockUpdateConsumer),
156165
canonicalChain: list.New(),
157166
blockFetchConcurrencyThrottle: make(chan *blockReceiptRequest, conf.MaxAsyncBlockFetchConcurrency),
@@ -248,6 +257,17 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
248257
})
249258
}
250259

260+
// refreshHighestBlockFromRPC updates highestBlock from eth_blockNumber. Caller must not hold canonicalChainLock.
261+
func (bl *blockListener) refreshHighestBlockFromRPC() (uint64, error) {
262+
var hexBlockHeight ethtypes.HexInteger
263+
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
264+
if rpcErr != nil {
265+
return 0, rpcErr.Error()
266+
}
267+
head := hexBlockHeight.BigInt().Uint64()
268+
return head, nil
269+
}
270+
251271
func (bl *blockListener) waitNextIteration() bool {
252272
select {
253273
case <-bl.ctx.Done():
@@ -313,6 +333,30 @@ func (bl *blockListener) listenLoop() {
313333
}
314334
log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes)
315335

336+
if bl.TrackingMode == ffcapi.BlockListenerTrackingModeHeadBlockNumber {
337+
head, err := bl.refreshHighestBlockFromRPC()
338+
if err != nil {
339+
log.L(bl.ctx).Errorf("Failed to refresh chain head: %s", err)
340+
failCount++
341+
continue
342+
}
343+
if head == bl.currentChainHead {
344+
failCount = 0
345+
continue
346+
}
347+
bl.currentChainHead = head
348+
update := &ffcapi.BlockHashEvent{GapPotential: false, Created: fftypes.Now(), HeadBlockNumber: bl.currentChainHead}
349+
bl.consumerMux.Lock()
350+
consumers := make([]*BlockUpdateConsumer, 0, len(bl.consumers))
351+
for _, c := range bl.consumers {
352+
consumers = append(consumers, c)
353+
}
354+
bl.consumerMux.Unlock()
355+
bl.dispatchToConsumers(consumers, update)
356+
failCount = 0
357+
continue
358+
}
359+
316360
update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()}
317361
var notifyPos *list.Element
318362
for _, h := range blockHashes {
@@ -621,6 +665,10 @@ func (bl *blockListener) GetHighestBlock(ctx context.Context) (uint64, bool) {
621665
return highestBlock, true
622666
}
623667

668+
func (bl *blockListener) GetHeadBlockNumber(_ context.Context) uint64 {
669+
return bl.currentChainHead
670+
}
671+
624672
func (bl *blockListener) setHighestBlock(block uint64) {
625673
bl.canonicalChainLock.Lock()
626674
defer bl.canonicalChainLock.Unlock()

0 commit comments

Comments
 (0)