Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@
|password|Password|`string`|`<nil>`
|username|Username|`string`|`<nil>`

## connector.blockListener

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|mode|The mode in which the block listener should run, either canonical or trusted (default: canonical).|`string`|`canonical`

## connector.events

|Key|Description|Type|Default Value|
Expand Down
3 changes: 3 additions & 0 deletions internal/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package ethereum
import (
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-evmconnect/pkg/ethblocklistener"
)

const (
Expand Down Expand Up @@ -47,6 +48,7 @@ const (
HederaCompatibilityMode = "hederaCompatibilityMode"
TraceTXForRevertReason = "traceTXForRevertReason"
WebSocketsEnabled = "ws.enabled"
BlockListenerMode = "blockListener.mode"
MaxAsyncBlockFetchConcurrency = "maxAsyncBlockFetchConcurrency"
UseGetBlockReceipts = "useGetBlockReceipts"
)
Expand Down Expand Up @@ -88,6 +90,7 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(TxCacheSize, 250)
conf.AddKnownKey(HederaCompatibilityMode, false)
conf.AddKnownKey(TraceTXForRevertReason, false)
conf.AddKnownKey(BlockListenerMode, string(ethblocklistener.BlockListenerModeCanonical))
conf.AddKnownKey(MaxAsyncBlockFetchConcurrency, 25)
conf.AddKnownKey(UseGetBlockReceipts, false /* likely consumers of this package will want to set this default to true */)

Expand Down
1 change: 1 addition & 0 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
})

if c.blockListener, err = ethblocklistener.NewBlockListenerSupplyBackend(ctx, c.retry.Retry, &ethblocklistener.BlockListenerConfig{
Mode: ethblocklistener.BlockListenerMode(conf.GetString(BlockListenerMode)),
BlockPollingInterval: conf.GetDuration(BlockPollingInterval),
MonitoredHeadLength: int(c.checkpointBlockGap),
HederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode),
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (
_ = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing")
_ = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", i18n.FloatType)
_ = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType)
_ = ffc("config.connector.blockListener.mode", "The mode in which the block listener should run, either canonical or trusted (default: canonical).", i18n.StringType)
_ = ffc("config.connector.blockPollingInterval", "Interval for polling to check for new blocks", i18n.TimeDurationType)
_ = ffc("config.connector.queryLoopRetry.initialDelay", "Initial delay for retrying query requests to the RPC endpoint, applicable to all the query loops", i18n.TimeDurationType)
_ = ffc("config.connector.queryLoopRetry.factor", "Factor to increase the delay by, between each query request retry to the RPC endpoint, applicable to all the query loops", i18n.FloatType)
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,5 @@ var (
MsgUnknownJSONFormatOptions = ffe("FF23066", "JSON formatting option unknown %s=%s")
MsgObservedPanic = ffe("FF23067", "Observed panic: %v")
MsgReturnedBlockHashMismatch = ffe("FF23068", "Returned block %d hash %s does not match requested hash %s")
MsgMethodNotAvailableInTrustedMode = ffe("FF23069", "%s is not available in trusted block listener mode")
)
4 changes: 4 additions & 0 deletions pkg/ethblocklistener/block_receipt_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type blockReceiptRequest struct {
// Blocks if throttled.
// Delivers an error if the block is not found.
func (bl *blockListener) FetchBlockReceiptsAsync(blockNumber uint64, blockHash ethtypes.HexBytes0xPrefix, cb func([]*ethrpc.TxReceiptJSONRPC, error)) {
if bl.Mode == BlockListenerModeTrusted {
cb(nil, i18n.NewError(bl.ctx, msgs.MsgMethodNotAvailableInTrustedMode, "FetchBlockReceiptsAsync"))
return
}
brr := &blockReceiptRequest{
bl: bl,
blockNumber: ethtypes.HexUint64(blockNumber),
Expand Down
36 changes: 28 additions & 8 deletions pkg/ethblocklistener/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,30 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

type BlockListenerMode string

const (
// BlockListenerModeCanonical ensures the listener builds and maintains the canonical chain
// in memory, fetching full block receipts and headers to verify chain integrity and adapt
// to potential re-orgs before notifying event streams and reconciling transaction confirmations.
BlockListenerModeCanonical BlockListenerMode = "canonical"

// BlockListenerModeTrusted runs the listener in a minimal fashion for a permissioned BFT chain where
// finality is immediate, and re-orgs are not possible (or at least not expected). The latest block heigt
// from nodes is trusted, and only block headers are received via filters, allowing for running against limited
// access (prividium) JSONRPC endpoints.
BlockListenerModeTrusted BlockListenerMode = "trusted"
)

type BlockListenerConfig struct {
MonitoredHeadLength int `json:"monitoredHeadLength"`
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
BlockCacheSize int `json:"blockCacheSize"`
IncludeLogsBloom bool `json:"includeLogsBloom"`
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
Mode BlockListenerMode `json:"mode"`
MonitoredHeadLength int `json:"monitoredHeadLength"`
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
BlockCacheSize int `json:"blockCacheSize"`
IncludeLogsBloom bool `json:"includeLogsBloom"`
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
}

type BlockListener interface {
Expand Down Expand Up @@ -562,7 +578,11 @@ func (bl *blockListener) checkAndStartListenerLoop() {
defer bl.consumerMux.Unlock()
if bl.listenLoopDone == nil {
bl.listenLoopDone = make(chan struct{})
go bl.listenLoop()
if bl.Mode == BlockListenerModeTrusted {
go bl.trustedListenLoop()
} else {
go bl.listenLoop()
}
}
}

Expand Down
25 changes: 25 additions & 0 deletions pkg/ethblocklistener/blocklistener_blockquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ import (
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
)

func (bl *blockListener) checkTrustedModeGuard(ctx context.Context, methodName string) error {
if bl.Mode == BlockListenerModeTrusted {
return i18n.NewError(ctx, msgs.MsgMethodNotAvailableInTrustedMode, methodName)
}
return nil
}

func (bl *blockListener) addToBlockCache(blockInfo *ethrpc.BlockInfoJSONRPC) {
bl.blockCache.Add(blockInfo.Hash.String(), blockInfo)
bl.blockCache.Add(blockInfo.Number.String(), blockInfo)
Expand Down Expand Up @@ -69,6 +76,9 @@ func (bl *blockListener) GetTransactionReceipt(ctx context.Context, txHash strin
}

func (bl *blockListener) GetBlockInfoByNumber(ctx context.Context, blockNumber uint64, allowCache bool, expectedParentHashStr string, expectedBlockHashStr string) (*ethrpc.BlockInfoJSONRPC, error) {
if err := bl.checkTrustedModeGuard(ctx, "GetBlockInfoByNumber"); err != nil {
return nil, err
}
hexBlockNumber := ethtypes.HexUint64(blockNumber)
var blockInfo *ethrpc.BlockInfoJSONRPC
if allowCache {
Expand All @@ -94,6 +104,9 @@ func (bl *blockListener) GetBlockInfoByNumber(ctx context.Context, blockNumber u
}

func (bl *blockListener) GetBlockInfoByHash(ctx context.Context, hash0xString string) (*ethrpc.BlockInfoJSONRPC, error) {
if err := bl.checkTrustedModeGuard(ctx, "GetBlockInfoByHash"); err != nil {
return nil, err
}
var blockInfo *ethrpc.BlockInfoJSONRPC // the minimal set we cache
cached, ok := bl.blockCache.Get(hash0xString)
if ok {
Expand All @@ -113,6 +126,9 @@ func (bl *blockListener) GetBlockInfoByHash(ctx context.Context, hash0xString st

// Does not use cache, but will add to cache
func (bl *blockListener) GetEVMBlockWithTxHashesByHash(ctx context.Context, hash0xString string) (b *ethrpc.EVMBlockWithTxHashesJSONRPC, err error) {
if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTxHashesByHash"); err != nil {
return nil, err
}
rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByHash", hash0xString, false /* only the txn hashes */)
if rpcErr != nil {
return nil, rpcErr.Error()
Expand All @@ -125,6 +141,9 @@ func (bl *blockListener) GetEVMBlockWithTxHashesByHash(ctx context.Context, hash

// Does not use cache, but will add to cache
func (bl *blockListener) GetEVMBlockWithTransactionsByHash(ctx context.Context, hash0xString string) (b *ethrpc.EVMBlockWithTransactionsJSONRPC, err error) {
if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTransactionsByHash"); err != nil {
return nil, err
}
rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByHash", hash0xString, true /* full blocks */)
if rpcErr != nil {
return nil, rpcErr.Error()
Expand All @@ -137,6 +156,9 @@ func (bl *blockListener) GetEVMBlockWithTransactionsByHash(ctx context.Context,

// Does not use cache, but will add to cache
func (bl *blockListener) GetEVMBlockWithTxHashesByNumber(ctx context.Context, numberLookup string) (b *ethrpc.EVMBlockWithTxHashesJSONRPC, err error) {
if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTxHashesByNumber"); err != nil {
return nil, err
}
rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByNumber", numberLookup, false /* only the txn hashes */)
if rpcErr != nil {
return nil, rpcErr.Error()
Expand All @@ -149,6 +171,9 @@ func (bl *blockListener) GetEVMBlockWithTxHashesByNumber(ctx context.Context, nu

// Does not use cache, but will add to cache
func (bl *blockListener) GetEVMBlockWithTransactionsByNumber(ctx context.Context, numberLookup string) (b *ethrpc.EVMBlockWithTransactionsJSONRPC, err error) {
if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTransactionsByNumber"); err != nil {
return nil, err
}
rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByNumber", numberLookup, true /* full blocks */)
if rpcErr != nil {
return nil, rpcErr.Error()
Expand Down
119 changes: 119 additions & 0 deletions pkg/ethblocklistener/blocklistener_trusted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright © 2026 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethblocklistener

import (
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-evmconnect/pkg/etherrors"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

// trustedListenLoop is the listen loop for BlockListenerModeTrusted.
//
// It uses the same block filter (eth_newBlockFilter / eth_getFilterChanges) and
// optional WS newHeads subscription as the canonical listener for wake-up
// signals, but does NOT resolve block hashes to headers. Instead it calls only
// eth_blockNumber to track chain height and dispatches the raw block hashes to
// consumers.
//
// RPC footprint: eth_blockNumber, eth_newBlockFilter, eth_getFilterChanges,
// optional eth_subscribe("newHeads"). Zero eth_getBlockByHash / eth_getBlockByNumber.
func (bl *blockListener) trustedListenLoop() {
defer close(bl.listenLoopDone)

err := bl.establishBlockHeightWithRetry()
close(bl.initialBlockHeightObtained)
if err != nil {
log.L(bl.ctx).Warnf("Block listener exiting before establishing initial block height: %s", err)
return
}

var filter string
failCount := 0
gapPotential := true
firstIteration := true
for {
switch {
case failCount > 0:
if bl.retry.DoFailureDelay(bl.ctx, failCount) {
log.L(bl.ctx).Debugf("Trusted block listener loop exiting")
return
}
case !firstIteration:
if !bl.waitNextIteration() {
log.L(bl.ctx).Debugf("Trusted block listener loop stopping")
return
}
default:
firstIteration = false
}

if filter == "" {
err := bl.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter")
if err != nil {
log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message)
failCount++
continue
}
bl.markStarted()
}

var blockHashes []ethtypes.HexBytes0xPrefix
rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter)
if rpcErr != nil {
if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message)
filter = ""
gapPotential = true
}
log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message)
failCount++
continue
}

// Query the chain head — this is the only way we track height in trusted mode.
var hexBlockHeight ethtypes.HexInteger
rpcErr = bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
log.L(bl.ctx).Errorf("Failed to query block height: %s", rpcErr.Message)
failCount++
continue
}
bl.setHighestBlock(hexBlockHeight.BigInt().Uint64())

if len(blockHashes) > 0 {
update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()}
for _, h := range blockHashes {
update.BlockHashes = append(update.BlockHashes, h.String())
}

bl.consumerMux.Lock()
consumers := make([]*BlockUpdateConsumer, 0, len(bl.consumers))
for _, c := range bl.consumers {
consumers = append(consumers, c)
}
bl.consumerMux.Unlock()

bl.dispatchToConsumers(consumers, update)
}

failCount = 0
gapPotential = false
}
}
Loading
Loading