diff --git a/config.md b/config.md index 9abc6df..aec46d5 100644 --- a/config.md +++ b/config.md @@ -94,6 +94,12 @@ |password|Password|`string`|`` |username|Username|`string`|`` +## connector.blockListener + +|Key|Description|Type|Default Value| +|---|-----------|----|-------------| +|mode|The mode in which the block listener should run, either canonical or trusted (default: canonical).|`string`|`canonical` + ## connector.events |Key|Description|Type|Default Value| diff --git a/internal/ethereum/config.go b/internal/ethereum/config.go index 1fb14c4..c0d17a9 100644 --- a/internal/ethereum/config.go +++ b/internal/ethereum/config.go @@ -19,6 +19,7 @@ package ethereum import ( "github.com/hyperledger/firefly-common/pkg/config" "github.com/hyperledger/firefly-common/pkg/wsclient" + "github.com/hyperledger/firefly-evmconnect/pkg/ethblocklistener" ) const ( @@ -47,6 +48,7 @@ const ( HederaCompatibilityMode = "hederaCompatibilityMode" TraceTXForRevertReason = "traceTXForRevertReason" WebSocketsEnabled = "ws.enabled" + BlockListenerMode = "blockListener.mode" MaxAsyncBlockFetchConcurrency = "maxAsyncBlockFetchConcurrency" UseGetBlockReceipts = "useGetBlockReceipts" ) @@ -88,6 +90,7 @@ func InitConfig(conf config.Section) { conf.AddKnownKey(TxCacheSize, 250) conf.AddKnownKey(HederaCompatibilityMode, false) conf.AddKnownKey(TraceTXForRevertReason, false) + conf.AddKnownKey(BlockListenerMode, string(ethblocklistener.BlockListenerModeCanonical)) conf.AddKnownKey(MaxAsyncBlockFetchConcurrency, 25) conf.AddKnownKey(UseGetBlockReceipts, false /* likely consumers of this package will want to set this default to true */) diff --git a/internal/ethereum/ethereum.go b/internal/ethereum/ethereum.go index 356fdc8..1462419 100644 --- a/internal/ethereum/ethereum.go +++ b/internal/ethereum/ethereum.go @@ -156,6 +156,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto }) if c.blockListener, err = ethblocklistener.NewBlockListenerSupplyBackend(ctx, c.retry.Retry, ðblocklistener.BlockListenerConfig{ + Mode: ethblocklistener.BlockListenerMode(conf.GetString(BlockListenerMode)), BlockPollingInterval: conf.GetDuration(BlockPollingInterval), MonitoredHeadLength: int(c.checkpointBlockGap), HederaCompatibilityMode: conf.GetBool(HederaCompatibilityMode), diff --git a/internal/msgs/en_config_descriptions.go b/internal/msgs/en_config_descriptions.go index cd8f45b..4bda103 100644 --- a/internal/msgs/en_config_descriptions.go +++ b/internal/msgs/en_config_descriptions.go @@ -32,6 +32,7 @@ var ( _ = ffc("config.connector.dataFormat", "Configure the JSON data format for query output and events", "map,flat_array,self_describing") _ = ffc("config.connector.gasEstimationFactor", "The factor to apply to the gas estimation to determine the gas limit", i18n.FloatType) _ = ffc("config.connector.blockCacheSize", "Maximum of blocks to hold in the block info cache", i18n.IntType) + _ = ffc("config.connector.blockListener.mode", "The mode in which the block listener should run, either canonical or trusted (default: canonical).", i18n.StringType) _ = ffc("config.connector.blockPollingInterval", "Interval for polling to check for new blocks", i18n.TimeDurationType) _ = ffc("config.connector.queryLoopRetry.initialDelay", "Initial delay for retrying query requests to the RPC endpoint, applicable to all the query loops", i18n.TimeDurationType) _ = ffc("config.connector.queryLoopRetry.factor", "Factor to increase the delay by, between each query request retry to the RPC endpoint, applicable to all the query loops", i18n.FloatType) diff --git a/internal/msgs/en_error_messages.go b/internal/msgs/en_error_messages.go index 08b32bf..819c01d 100644 --- a/internal/msgs/en_error_messages.go +++ b/internal/msgs/en_error_messages.go @@ -86,4 +86,5 @@ var ( MsgUnknownJSONFormatOptions = ffe("FF23066", "JSON formatting option unknown %s=%s") MsgObservedPanic = ffe("FF23067", "Observed panic: %v") MsgReturnedBlockHashMismatch = ffe("FF23068", "Returned block %d hash %s does not match requested hash %s") + MsgMethodNotAvailableInTrustedMode = ffe("FF23069", "%s is not available in trusted block listener mode") ) diff --git a/pkg/ethblocklistener/block_receipt_fetcher.go b/pkg/ethblocklistener/block_receipt_fetcher.go index b27e103..dffaa2e 100644 --- a/pkg/ethblocklistener/block_receipt_fetcher.go +++ b/pkg/ethblocklistener/block_receipt_fetcher.go @@ -37,6 +37,10 @@ type blockReceiptRequest struct { // Blocks if throttled. // Delivers an error if the block is not found. func (bl *blockListener) FetchBlockReceiptsAsync(blockNumber uint64, blockHash ethtypes.HexBytes0xPrefix, cb func([]*ethrpc.TxReceiptJSONRPC, error)) { + if bl.Mode == BlockListenerModeTrusted { + cb(nil, i18n.NewError(bl.ctx, msgs.MsgMethodNotAvailableInTrustedMode, "FetchBlockReceiptsAsync")) + return + } brr := &blockReceiptRequest{ bl: bl, blockNumber: ethtypes.HexUint64(blockNumber), diff --git a/pkg/ethblocklistener/blocklistener.go b/pkg/ethblocklistener/blocklistener.go index 0259615..e64c512 100644 --- a/pkg/ethblocklistener/blocklistener.go +++ b/pkg/ethblocklistener/blocklistener.go @@ -38,14 +38,30 @@ import ( "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" ) +type BlockListenerMode string + +const ( + // BlockListenerModeCanonical ensures the listener builds and maintains the canonical chain + // in memory, fetching full block receipts and headers to verify chain integrity and adapt + // to potential re-orgs before notifying event streams and reconciling transaction confirmations. + BlockListenerModeCanonical BlockListenerMode = "canonical" + + // BlockListenerModeTrusted runs the listener in a minimal fashion for a permissioned BFT chain where + // finality is immediate, and re-orgs are not possible (or at least not expected). The latest block heigt + // from nodes is trusted, and only block headers are received via filters, allowing for running against limited + // access (prividium) JSONRPC endpoints. + BlockListenerModeTrusted BlockListenerMode = "trusted" +) + type BlockListenerConfig struct { - MonitoredHeadLength int `json:"monitoredHeadLength"` - BlockPollingInterval time.Duration `json:"blockPollingInterval"` - HederaCompatibilityMode bool `json:"hederaCompatibilityMode"` - BlockCacheSize int `json:"blockCacheSize"` - IncludeLogsBloom bool `json:"includeLogsBloom"` - UseGetBlockReceipts bool `json:"useGetBlockReceipts"` - MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"` + Mode BlockListenerMode `json:"mode"` + MonitoredHeadLength int `json:"monitoredHeadLength"` + BlockPollingInterval time.Duration `json:"blockPollingInterval"` + HederaCompatibilityMode bool `json:"hederaCompatibilityMode"` + BlockCacheSize int `json:"blockCacheSize"` + IncludeLogsBloom bool `json:"includeLogsBloom"` + UseGetBlockReceipts bool `json:"useGetBlockReceipts"` + MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"` } type BlockListener interface { @@ -562,7 +578,11 @@ func (bl *blockListener) checkAndStartListenerLoop() { defer bl.consumerMux.Unlock() if bl.listenLoopDone == nil { bl.listenLoopDone = make(chan struct{}) - go bl.listenLoop() + if bl.Mode == BlockListenerModeTrusted { + go bl.trustedListenLoop() + } else { + go bl.listenLoop() + } } } diff --git a/pkg/ethblocklistener/blocklistener_blockquery.go b/pkg/ethblocklistener/blocklistener_blockquery.go index 37f23c4..3fce26d 100644 --- a/pkg/ethblocklistener/blocklistener_blockquery.go +++ b/pkg/ethblocklistener/blocklistener_blockquery.go @@ -26,6 +26,13 @@ import ( "github.com/hyperledger/firefly-signer/pkg/ethtypes" ) +func (bl *blockListener) checkTrustedModeGuard(ctx context.Context, methodName string) error { + if bl.Mode == BlockListenerModeTrusted { + return i18n.NewError(ctx, msgs.MsgMethodNotAvailableInTrustedMode, methodName) + } + return nil +} + func (bl *blockListener) addToBlockCache(blockInfo *ethrpc.BlockInfoJSONRPC) { bl.blockCache.Add(blockInfo.Hash.String(), blockInfo) bl.blockCache.Add(blockInfo.Number.String(), blockInfo) @@ -69,6 +76,9 @@ func (bl *blockListener) GetTransactionReceipt(ctx context.Context, txHash strin } func (bl *blockListener) GetBlockInfoByNumber(ctx context.Context, blockNumber uint64, allowCache bool, expectedParentHashStr string, expectedBlockHashStr string) (*ethrpc.BlockInfoJSONRPC, error) { + if err := bl.checkTrustedModeGuard(ctx, "GetBlockInfoByNumber"); err != nil { + return nil, err + } hexBlockNumber := ethtypes.HexUint64(blockNumber) var blockInfo *ethrpc.BlockInfoJSONRPC if allowCache { @@ -94,6 +104,9 @@ func (bl *blockListener) GetBlockInfoByNumber(ctx context.Context, blockNumber u } func (bl *blockListener) GetBlockInfoByHash(ctx context.Context, hash0xString string) (*ethrpc.BlockInfoJSONRPC, error) { + if err := bl.checkTrustedModeGuard(ctx, "GetBlockInfoByHash"); err != nil { + return nil, err + } var blockInfo *ethrpc.BlockInfoJSONRPC // the minimal set we cache cached, ok := bl.blockCache.Get(hash0xString) if ok { @@ -113,6 +126,9 @@ func (bl *blockListener) GetBlockInfoByHash(ctx context.Context, hash0xString st // Does not use cache, but will add to cache func (bl *blockListener) GetEVMBlockWithTxHashesByHash(ctx context.Context, hash0xString string) (b *ethrpc.EVMBlockWithTxHashesJSONRPC, err error) { + if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTxHashesByHash"); err != nil { + return nil, err + } rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByHash", hash0xString, false /* only the txn hashes */) if rpcErr != nil { return nil, rpcErr.Error() @@ -125,6 +141,9 @@ func (bl *blockListener) GetEVMBlockWithTxHashesByHash(ctx context.Context, hash // Does not use cache, but will add to cache func (bl *blockListener) GetEVMBlockWithTransactionsByHash(ctx context.Context, hash0xString string) (b *ethrpc.EVMBlockWithTransactionsJSONRPC, err error) { + if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTransactionsByHash"); err != nil { + return nil, err + } rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByHash", hash0xString, true /* full blocks */) if rpcErr != nil { return nil, rpcErr.Error() @@ -137,6 +156,9 @@ func (bl *blockListener) GetEVMBlockWithTransactionsByHash(ctx context.Context, // Does not use cache, but will add to cache func (bl *blockListener) GetEVMBlockWithTxHashesByNumber(ctx context.Context, numberLookup string) (b *ethrpc.EVMBlockWithTxHashesJSONRPC, err error) { + if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTxHashesByNumber"); err != nil { + return nil, err + } rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByNumber", numberLookup, false /* only the txn hashes */) if rpcErr != nil { return nil, rpcErr.Error() @@ -149,6 +171,9 @@ func (bl *blockListener) GetEVMBlockWithTxHashesByNumber(ctx context.Context, nu // Does not use cache, but will add to cache func (bl *blockListener) GetEVMBlockWithTransactionsByNumber(ctx context.Context, numberLookup string) (b *ethrpc.EVMBlockWithTransactionsJSONRPC, err error) { + if err := bl.checkTrustedModeGuard(ctx, "GetEVMBlockWithTransactionsByNumber"); err != nil { + return nil, err + } rpcErr := bl.backend.CallRPC(ctx, &b, "eth_getBlockByNumber", numberLookup, true /* full blocks */) if rpcErr != nil { return nil, rpcErr.Error() diff --git a/pkg/ethblocklistener/blocklistener_trusted.go b/pkg/ethblocklistener/blocklistener_trusted.go new file mode 100644 index 0000000..7b3f5d1 --- /dev/null +++ b/pkg/ethblocklistener/blocklistener_trusted.go @@ -0,0 +1,119 @@ +// Copyright © 2026 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethblocklistener + +import ( + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-common/pkg/log" + "github.com/hyperledger/firefly-evmconnect/pkg/etherrors" + "github.com/hyperledger/firefly-signer/pkg/ethtypes" + "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" +) + +// trustedListenLoop is the listen loop for BlockListenerModeTrusted. +// +// It uses the same block filter (eth_newBlockFilter / eth_getFilterChanges) and +// optional WS newHeads subscription as the canonical listener for wake-up +// signals, but does NOT resolve block hashes to headers. Instead it calls only +// eth_blockNumber to track chain height and dispatches the raw block hashes to +// consumers. +// +// RPC footprint: eth_blockNumber, eth_newBlockFilter, eth_getFilterChanges, +// optional eth_subscribe("newHeads"). Zero eth_getBlockByHash / eth_getBlockByNumber. +func (bl *blockListener) trustedListenLoop() { + defer close(bl.listenLoopDone) + + err := bl.establishBlockHeightWithRetry() + close(bl.initialBlockHeightObtained) + if err != nil { + log.L(bl.ctx).Warnf("Block listener exiting before establishing initial block height: %s", err) + return + } + + var filter string + failCount := 0 + gapPotential := true + firstIteration := true + for { + switch { + case failCount > 0: + if bl.retry.DoFailureDelay(bl.ctx, failCount) { + log.L(bl.ctx).Debugf("Trusted block listener loop exiting") + return + } + case !firstIteration: + if !bl.waitNextIteration() { + log.L(bl.ctx).Debugf("Trusted block listener loop stopping") + return + } + default: + firstIteration = false + } + + if filter == "" { + err := bl.backend.CallRPC(bl.ctx, &filter, "eth_newBlockFilter") + if err != nil { + log.L(bl.ctx).Errorf("Failed to establish new block filter: %s", err.Message) + failCount++ + continue + } + bl.markStarted() + } + + var blockHashes []ethtypes.HexBytes0xPrefix + rpcErr := bl.backend.CallRPC(bl.ctx, &blockHashes, "eth_getFilterChanges", filter) + if rpcErr != nil { + if etherrors.MapError(etherrors.FilterRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound { + log.L(bl.ctx).Warnf("Block filter '%v' no longer valid. Recreating filter: %s", filter, rpcErr.Message) + filter = "" + gapPotential = true + } + log.L(bl.ctx).Errorf("Failed to query block filter changes: %s", rpcErr.Message) + failCount++ + continue + } + + // Query the chain head — this is the only way we track height in trusted mode. + var hexBlockHeight ethtypes.HexInteger + rpcErr = bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber") + if rpcErr != nil { + log.L(bl.ctx).Errorf("Failed to query block height: %s", rpcErr.Message) + failCount++ + continue + } + bl.setHighestBlock(hexBlockHeight.BigInt().Uint64()) + + if len(blockHashes) > 0 { + update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()} + for _, h := range blockHashes { + update.BlockHashes = append(update.BlockHashes, h.String()) + } + + bl.consumerMux.Lock() + consumers := make([]*BlockUpdateConsumer, 0, len(bl.consumers)) + for _, c := range bl.consumers { + consumers = append(consumers, c) + } + bl.consumerMux.Unlock() + + bl.dispatchToConsumers(consumers, update) + } + + failCount = 0 + gapPotential = false + } +} diff --git a/pkg/ethblocklistener/blocklistener_trusted_test.go b/pkg/ethblocklistener/blocklistener_trusted_test.go new file mode 100644 index 0000000..f00197b --- /dev/null +++ b/pkg/ethblocklistener/blocklistener_trusted_test.go @@ -0,0 +1,413 @@ +// Copyright © 2026 Kaleido, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ethblocklistener + +import ( + "context" + "testing" + "time" + + "github.com/hyperledger/firefly-common/pkg/fftypes" + "github.com/hyperledger/firefly-evmconnect/mocks/rpcbackendmocks" + "github.com/hyperledger/firefly-evmconnect/pkg/ethrpc" + "github.com/hyperledger/firefly-signer/pkg/ethtypes" + "github.com/hyperledger/firefly-signer/pkg/rpcbackend" + "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func newTestTrustedBlockListener(t *testing.T, confSetup ...func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc)) (context.Context, *blockListener, *rpcbackendmocks.Backend, func()) { + return newTestBlockListener(t, append([]func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc){ + func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.Mode = BlockListenerModeTrusted + }, + }, confSetup...)...) +} + +func TestTrustedListenLoopOKSequential(t *testing.T) { + block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1002Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + block1003Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + + startLatch := newTestLatch() + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + startLatch.waitComplete() + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{block1001Hash, block1002Hash} + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1002) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{block1003Hash} + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1003) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1003) + }) + }) + + updates := make(chan *ffcapi.BlockHashEvent) + bl.AddConsumer(context.Background(), &BlockUpdateConsumer{ + ID: fftypes.NewUUID(), + Ctx: context.Background(), + Updates: updates, + }) + startLatch.complete() + + bu := <-updates + assert.True(t, bu.GapPotential) + assert.Equal(t, []string{block1001Hash.String(), block1002Hash.String()}, bu.BlockHashes) + + bu = <-updates + assert.False(t, bu.GapPotential) + assert.Equal(t, []string{block1003Hash.String()}, bu.BlockHashes) + + done() + <-bl.listenLoopDone + assert.Equal(t, uint64(1003), bl.highestBlock) + assert.Empty(t, bl.SnapshotMonitoredHeadChain()) + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopNewBlockFilterFail(t *testing.T) { + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter"). + Return(&rpcbackend.RPCError{Message: "pop"}). + Run(func(args mock.Arguments) { + go cancelCtx() + }) + }) + + bl.checkAndStartListenerLoop() + bl.WaitClosed() + done() + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopFilterChangesFailAndRecover(t *testing.T) { + block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + + waitForBlock := newTestLatch() + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1). + Return(&rpcbackend.RPCError{Message: "server error"}).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{block1001Hash} + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1001) + waitForBlock.complete() + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1001) + }) + }) + + bl.checkAndStartListenerLoop() + waitForBlock.waitComplete() + assert.Equal(t, uint64(1001), bl.highestBlock) + + done() + <-bl.listenLoopDone + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopFilterNotFoundRecreate(t *testing.T) { + block1001Hash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + + waitForBlock := newTestLatch() + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1). + Return(&rpcbackend.RPCError{Message: "filter not found"}).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID2 + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID2).Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*[]ethtypes.HexBytes0xPrefix) + *hbh = []ethtypes.HexBytes0xPrefix{block1001Hash} + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1001) + waitForBlock.complete() + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1001) + }) + }) + + bl.checkAndStartListenerLoop() + waitForBlock.waitComplete() + assert.Equal(t, uint64(1001), bl.highestBlock) + + done() + <-bl.listenLoopDone + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopBlockNumberFail(t *testing.T) { + waitRetried := newTestLatch() + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil) + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). + Return(&rpcbackend.RPCError{Message: "node down"}).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1001) + waitRetried.complete() + }) + }) + + bl.checkAndStartListenerLoop() + waitRetried.waitComplete() + assert.Equal(t, uint64(1001), bl.highestBlock) + + done() + <-bl.listenLoopDone + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopExitOnContextCancelDuringEstablish(t *testing.T) { + _, bl, mRPC, done := newTestTrustedBlockListener(t) + done() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber"). + Return(&rpcbackend.RPCError{Message: "pop"}).Once() + + h, ok := bl.GetHighestBlock(bl.ctx) + assert.False(t, ok) + assert.Equal(t, uint64(0), h) + + <-bl.listenLoopDone + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopEmptyFilterChanges(t *testing.T) { + waitEmpty := newTestLatch() + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = shortDelay + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + // no hashes returned + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + waitEmpty.complete() + }).Once() + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }) + }) + + bl.checkAndStartListenerLoop() + waitEmpty.waitComplete() + + done() + <-bl.listenLoopDone + assert.Equal(t, uint64(1000), bl.highestBlock) + mRPC.AssertExpectations(t) +} + +func TestTrustedListenLoopExitOnContextCancelDuringWait(t *testing.T) { + _, bl, mRPC, done := newTestTrustedBlockListener(t, func(conf *BlockListenerConfig, mRPC *rpcbackendmocks.Backend, cancelCtx context.CancelFunc) { + conf.BlockPollingInterval = 1 * time.Hour + + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*string) + *hbh = testBlockFilterID1 + }) + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", testBlockFilterID1).Return(nil).Run(func(args mock.Arguments) { + go func() { + time.Sleep(50 * time.Millisecond) + cancelCtx() + }() + }).Once() + mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) { + hbh := args[1].(*ethtypes.HexInteger) + *hbh = *ethtypes.NewHexIntegerU64(1000) + }) + }) + + bl.checkAndStartListenerLoop() + <-bl.listenLoopDone + done() + mRPC.AssertExpectations(t) +} + +// --- Guard tests for trusted mode --- + +func TestTrustedModeGuardGetBlockInfoByNumber(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetBlockInfoByNumber(ctx, 1000, false, "", "") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardGetBlockInfoByHash(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetBlockInfoByHash(ctx, "0xabc") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardGetEVMBlockWithTxHashesByHash(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetEVMBlockWithTxHashesByHash(ctx, "0xabc") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardGetEVMBlockWithTransactionsByHash(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetEVMBlockWithTransactionsByHash(ctx, "0xabc") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardGetEVMBlockWithTxHashesByNumber(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetEVMBlockWithTxHashesByNumber(ctx, "0x1") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardGetEVMBlockWithTransactionsByNumber(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, err := bl.GetEVMBlockWithTransactionsByNumber(ctx, "0x1") + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardReconcileConfirmationsForTransaction(t *testing.T) { + ctx, bl, _, done := newTestTrustedBlockListener(t) + defer done() + _, _, err := bl.ReconcileConfirmationsForTransaction(ctx, "0xabc", nil, 10) + require.Regexp(t, "FF23069", err) +} + +func TestTrustedModeGuardFetchBlockReceiptsAsync(t *testing.T) { + _, bl, _, done := newTestTrustedBlockListener(t) + defer done() + + blockHash := ethtypes.MustNewHexBytes0xPrefix(fftypes.NewRandB32().String()) + fetched := make(chan struct{}) + bl.FetchBlockReceiptsAsync(1000, blockHash, func(receipts []*ethrpc.TxReceiptJSONRPC, err error) { + defer close(fetched) + assert.Regexp(t, "FF23069", err) + assert.Nil(t, receipts) + }) + <-fetched +} + +func TestTrustedModeSnapshotMonitoredHeadChainReturnsEmpty(t *testing.T) { + _, bl, _, done := newTestTrustedBlockListener(t) + defer done() + assert.Empty(t, bl.SnapshotMonitoredHeadChain()) +} diff --git a/pkg/ethblocklistener/confirmation_reconciler.go b/pkg/ethblocklistener/confirmation_reconciler.go index 7528363..6b83397 100644 --- a/pkg/ethblocklistener/confirmation_reconciler.go +++ b/pkg/ethblocklistener/confirmation_reconciler.go @@ -45,6 +45,9 @@ func ffcapiToBlockInfoList(ffcapiBlocks []*ffcapi.MinimalBlockInfo) (blocks []*e // // For historical reasons this interface is FFCAPI derived MinimalBlockInfo in/out, rather than direct. func (bl *blockListener) ReconcileConfirmationsForTransaction(ctx context.Context, txHash string, ffcapiExistingConfirmations []*ffcapi.MinimalBlockInfo, targetConfirmationCount uint64) (*ffcapi.ConfirmationUpdateResult, *ethrpc.TxReceiptJSONRPC, error) { + if err := bl.checkTrustedModeGuard(ctx, "ReconcileConfirmationsForTransaction"); err != nil { + return nil, nil, err + } existingConfirmations, err := ffcapiToBlockInfoList(ffcapiExistingConfirmations) if err != nil {