Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,40 +34,58 @@ For a full list of configuration options see [config.md](./config.md)

```yaml
connectors:
- type: ethereum
server:
port: 5102
ethereum:
url: http://localhost:8545
- type: ethereum
server:
port: 5102
ethereum:
url: http://localhost:8545
```

## Blockchain node compatibility

For EVM connector to function properly, you should check the blockchain node supports the following JSON-RPC Methods over HTTP:

### Event tracking

- `eth_blockNumber`
- `eth_newBlockFilter`
- `eth_getFilterLogs`
- `eth_getFilterChanges`
- `eth_getBlockByHash`
- `eth_getLogs`
- `eth_newFilter`
- `eth_uninstallFilter`
- `eth_getTransactionByHash`
- `eth_getTransactionReceipt`

### Query

- `eth_call`
- `eth_getBalance`
- `eth_gasPrice`[^1]
- `eth_gasPrice`

### Transaction submission

- `eth_estimateGas`
- `eth_sendTransaction`
- `eth_sendTransaction` / `eth_sendRawTransaction`
- `eth_getTransactionCount`
- `eth_sendRawTransaction`[^2]

### Optional methods
Comment thread
peterbroadhurst marked this conversation as resolved.

#### Transaction tracing

> Required when [connector.traceTXForRevertReason](./config.md#connector) is set to `true` (default: `false`)

- `debug_traceTransaction`

#### Block listener

> Required when [connector.blockTrackingMode](./config.md#connector) is set to `inMemoryPartialChain`

- `eth_getBlockByHash`
- `eth_getBlockByNumber`

#### Receipt fetching

[^1]: also used by Transaction submission if the handler is configured to get gas price using "connector".
> Required when [connector.useGetBlockReceipts](./config.md#connector) is set to `true`

[^2]: only required by custom transaction handlers that supports pre-signing.
- `eth_getBlockReceipts`
1 change: 1 addition & 0 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
|---|-----------|----|-------------|
|blockCacheSize|Maximum of blocks to hold in the block info cache|`int`|`250`
|blockPollingInterval|Interval for polling to check for new blocks|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
|chainTrackingMode|Tracking mode for connector block progression. light: fetches head block numbers only, does not download block details, disables block listener support, and confirmation results include only the confirmation count. full: fetches head block numbers and block details, maintains an in-memory partial chain, enables block listener support, and confirmation results include both confirmation count and block details.|`light` or `full`|`full`
|connectionTimeout|The maximum amount of time that a connection is allowed to remain with no data transmitted|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
|dataFormat|Configure the JSON data format for query output and events|map,flat_array,self_describing|`map`
|expectContinueTimeout|See [ExpectContinueTimeout in the Go docs](https://pkg.go.dev/net/http#Transport)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`1s`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/hashicorp/golang-lru v1.0.2
github.com/hyperledger/firefly-common v1.5.9
github.com/hyperledger/firefly-signer v1.1.23-0.20260422080826-42345c6c6b85
github.com/hyperledger/firefly-transaction-manager v1.4.4
github.com/hyperledger/firefly-transaction-manager v1.4.5
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ github.com/hyperledger/firefly-common v1.5.9 h1:Z1+SuKNYJ8hPKQ5CvcsMg6r/E4RyW6wb
github.com/hyperledger/firefly-common v1.5.9/go.mod h1:1Xawm5PUhxT7k+CL/Kr3i1LE3cTTzoQwZMLimvlW8rs=
github.com/hyperledger/firefly-signer v1.1.23-0.20260422080826-42345c6c6b85 h1:gh3YhxUYYwOfBCsEJXFmWO7SFzFrNuNulXftOam2JRI=
github.com/hyperledger/firefly-signer v1.1.23-0.20260422080826-42345c6c6b85/go.mod h1:cb40Xkm/t2+KH+V1q3/zxZPohBNEA0iOA7mcr9wyfzI=
github.com/hyperledger/firefly-transaction-manager v1.4.4 h1:cbG9FkQWriOcc1MMGaMqU7OpOwLloSV+PImOoaN0ckU=
github.com/hyperledger/firefly-transaction-manager v1.4.4/go.mod h1:1kbYt8ofDXqfwC02vwV/HoOjmiv0IuT9UkJ//bbrliE=
github.com/hyperledger/firefly-transaction-manager v1.4.5 h1:zBe8hbzv6lJEWD5Ypk6efO5WXFs4+pqIFLUu7zbdmsg=
github.com/hyperledger/firefly-transaction-manager v1.4.5/go.mod h1:1kbYt8ofDXqfwC02vwV/HoOjmiv0IuT9UkJ//bbrliE=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
3 changes: 3 additions & 0 deletions internal/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package ethereum
import (
"github.com/hyperledger/firefly-common/pkg/config"
"github.com/hyperledger/firefly-common/pkg/wsclient"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

const (
ConfigGasEstimationFactor = "gasEstimationFactor"
ConfigDataFormat = "dataFormat"
BlockPollingInterval = "blockPollingInterval"
BlockCacheSize = "blockCacheSize"
ChainTrackingMode = "chainTrackingMode"
EventsCatchupPageSize = "events.catchupPageSize"
EventsCatchupThreshold = "events.catchupThreshold"
EventsCatchupDownscaleRegex = "events.catchupDownscaleRegex"
Expand Down Expand Up @@ -70,6 +72,7 @@ func InitConfig(conf config.Section) {
conf.AddKnownKey(WebSocketsEnabled, false)
conf.AddKnownKey(BlockCacheSize, 250)
conf.AddKnownKey(BlockPollingInterval, "1s")
conf.AddKnownKey(ChainTrackingMode, ffcapi.ChainTrackingModeFull)
conf.AddKnownKey(ConfigDataFormat, "map")
conf.AddKnownKey(ConfigGasEstimationFactor, DefaultGasEstimationFactor)
conf.AddKnownKey(EventsBlockTimestamps, true)
Expand Down
28 changes: 21 additions & 7 deletions internal/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ import (
)

type ethConnector struct {
backend rpcbackend.Backend
wsBackend rpcbackend.WebSocketRPCClient
backend rpcbackend.Backend
wsBackend rpcbackend.WebSocketRPCClient
chainTrackingMode ffcapi.ChainTrackingMode

serializer *abi.Serializer
gasEstimationFactor *big.Float
catchupPageSize int64
Expand Down Expand Up @@ -78,6 +80,15 @@ type Connector interface {
}

func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connector, err error) {

chainTrackingMode := ffcapi.ChainTrackingMode(conf.GetString(ChainTrackingMode))
if chainTrackingMode == "" {
chainTrackingMode = ffcapi.ChainTrackingModeFull
}
if chainTrackingMode != ffcapi.ChainTrackingModeLight && chainTrackingMode != ffcapi.ChainTrackingModeFull {
return nil, i18n.NewError(ctx, msgs.MsgInvalidChainTrackingMode, chainTrackingMode)
}

c := &ethConnector{
eventStreams: make(map[fftypes.UUID]*eventStream),
catchupPageSize: conf.GetInt64(EventsCatchupPageSize),
Expand All @@ -86,6 +97,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
eventBlockTimestamps: conf.GetBool(EventsBlockTimestamps),
eventFilterPollingInterval: conf.GetDuration(EventsFilterPollingInterval),
traceTXForRevertReason: conf.GetBool(TraceTXForRevertReason),
chainTrackingMode: chainTrackingMode,
retry: retryutil.RetryWrapper{Retry: &retry.Retry{}},
}

Expand Down Expand Up @@ -164,6 +176,7 @@ func NewEthereumConnector(ctx context.Context, conf config.Section) (cc Connecto
BlockCacheSize: conf.GetInt(BlockCacheSize),
MaxAsyncBlockFetchConcurrency: conf.GetInt(MaxAsyncBlockFetchConcurrency),
UseGetBlockReceipts: conf.GetBool(UseGetBlockReceipts),
ChainTrackingMode: c.chainTrackingMode,
}, c.backend, c.wsBackend); err != nil {
return nil, err
}
Expand Down Expand Up @@ -212,11 +225,12 @@ func (c *ethConnector) ReconcileConfirmationsForTransaction(ctx context.Context,
}
if err == nil && ethrpcRes != nil {
res = &ffcapi.ConfirmationUpdateResult{
Confirmations: ethRPCtoFFCAPIConfirmations(ethrpcRes.Confirmations),
Rebuilt: ethrpcRes.Rebuilt,
NewFork: ethrpcRes.NewFork,
Confirmed: ethrpcRes.Confirmed,
TargetConfirmationCount: ethrpcRes.TargetConfirmationCount,
Confirmations: ethRPCtoFFCAPIConfirmations(ethrpcRes.Confirmations),
Rebuilt: ethrpcRes.Rebuilt,
NewFork: ethrpcRes.NewFork,
Confirmed: ethrpcRes.Confirmed,
CurrentConfirmationCount: ethrpcRes.CurrentConfirmationCount,
TargetConfirmationCount: ethrpcRes.TargetConfirmationCount,
}
if ethrpcReceipt != nil {
res.Receipt = c.enrichTransactionReceipt(ctx, ethrpcReceipt)
Expand Down
4 changes: 4 additions & 0 deletions internal/ethereum/new_block_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

func (c *ethConnector) GetChainTrackingMode(_ context.Context) ffcapi.ChainTrackingMode {
return c.chainTrackingMode
}

func (c *ethConnector) NewBlockListener(_ context.Context, req *ffcapi.NewBlockListenerRequest) (*ffcapi.NewBlockListenerResponse, ffcapi.ErrorReason, error) {
// Add the block consumer
c.blockListener.AddConsumer(req.ListenerContext, &ethblocklistener.BlockUpdateConsumer{
Expand Down
1 change: 1 addition & 0 deletions internal/msgs/en_config_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,5 @@ var (
_ = ffc("config.connector.traceTXForRevertReason", "Enable the use of transaction trace functions (e.g. debug_traceTransaction) to obtain transaction revert reasons. This can place a high load on the EVM client.", i18n.BooleanType)
_ = ffc("config.connector.maxAsyncBlockFetchConcurrency", "Maximum concurrency when using asynchronous block downloading (minium 1)", i18n.IntType)
_ = ffc("config.connector.useGetBlockReceipts", "When true, the eth_getBlockReceipts call is available for this connector to use", i18n.BooleanType)
_ = ffc("config.connector.chainTrackingMode", "Tracking mode for connector block progression. light: fetches head block numbers only, does not download block details, disables block listener support, and confirmation results include only the confirmation count. full: fetches head block numbers and block details, maintains an in-memory partial chain, enables block listener support, and confirmation results include both confirmation count and block details.", "`light` or `full`")
)
2 changes: 2 additions & 0 deletions internal/msgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ var (
MsgUnknownJSONFormatOptions = ffe("FF23066", "JSON formatting option unknown %s=%s")
MsgObservedPanic = ffe("FF23067", "Observed panic: %v")
MsgReturnedBlockHashMismatch = ffe("FF23068", "Returned block %d hash %s does not match requested hash %s")
MsgInvalidChainTrackingMode = ffe("FF23069", "Invalid chain tracking mode '%s': must be 'light' or 'full'")
MsgTransactionNotIncludedInChainHead = ffe("FF23070", "Transaction '%s' cannot be reconciled because chain head %d is before receipt block %s")
)
22 changes: 22 additions & 0 deletions mocks/fftmmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 60 additions & 12 deletions pkg/ethblocklistener/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,23 @@ import (
//
// `rebuilt` will be true if an invalid confirmation list is detected by the reconciliation process
type ConfirmationUpdateResult struct {
Confirmations []*ethrpc.MinimalBlockInfo `json:"confirmations,omitempty"`
Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch
NewFork bool `json:"newFork,omitempty"` // when true, it means a new fork was detected based on the existing confirmations
Confirmed bool `json:"confirmed,omitempty"` // when true, it means the confirmation list is complete and the transaction is confirmed
TargetConfirmationCount uint64 `json:"targetConfirmationCount"` // the target number of confirmations for this reconcile request
Confirmations []*ethrpc.MinimalBlockInfo `json:"confirmations,omitempty"` // the confirmation list
Rebuilt bool `json:"rebuilt,omitempty"` // when true, it means the existing confirmations contained invalid blocks, the new confirmations are rebuilt from scratch
NewFork bool `json:"newFork,omitempty"` // when true, it means a new fork was detected based on the existing confirmations
Confirmed bool `json:"confirmed,omitempty"` // when true, it means the confirmation list is complete and the transaction is confirmed
TargetConfirmationCount uint64 `json:"targetConfirmationCount"` // the target number of confirmations for this reconcile request
CurrentConfirmationCount uint64 `json:"currentConfirmationCount"` // the current number of confirmations for this reconcile request
}

type BlockListenerConfig struct {
MonitoredHeadLength int `json:"monitoredHeadLength"`
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
BlockCacheSize int `json:"blockCacheSize"`
IncludeLogsBloom bool `json:"includeLogsBloom"`
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
MonitoredHeadLength int `json:"monitoredHeadLength"`
BlockPollingInterval time.Duration `json:"blockPollingInterval"`
HederaCompatibilityMode bool `json:"hederaCompatibilityMode"`
BlockCacheSize int `json:"blockCacheSize"`
IncludeLogsBloom bool `json:"includeLogsBloom"`
UseGetBlockReceipts bool `json:"useGetBlockReceipts"`
MaxAsyncBlockFetchConcurrency int `json:"maxAsyncBlockFetchConcurrency"`
ChainTrackingMode ffcapi.ChainTrackingMode `json:"chainTrackingMode,omitempty"`
}

type BlockListener interface {
Expand Down Expand Up @@ -126,6 +128,9 @@ type blockListener struct {
canonicalChain *list.List
highestBlockSet bool
highestBlock uint64

// headBlockNumber mode: last head value sent on the block listener channel (only written from listenLoop)
currentChainHead uint64
}

func NewBlockListener(ctx context.Context, retry *retry.Retry, conf *BlockListenerConfig, httpConf *ffresty.Config, wsConf *wsclient.WSConfig) (bl BlockListener, err error) {
Expand All @@ -141,6 +146,9 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf
if conf.MaxAsyncBlockFetchConcurrency <= 0 {
conf.MaxAsyncBlockFetchConcurrency = 1
}
if conf.ChainTrackingMode == "" {
conf.ChainTrackingMode = ffcapi.ChainTrackingModeFull
}
bl := &blockListener{
ctx: log.WithLogField(ctx, "role", "blocklistener"),
retry: &retryutil.RetryWrapper{Retry: retry},
Expand All @@ -152,6 +160,7 @@ func NewBlockListenerSupplyBackend(ctx context.Context, retry *retry.Retry, conf
newHeadsTap: make(chan struct{}),
highestBlockSet: false,
highestBlock: 0,
currentChainHead: 0,
consumers: make(map[fftypes.UUID]*BlockUpdateConsumer),
canonicalChain: list.New(),
blockFetchConcurrencyThrottle: make(chan *blockReceiptRequest, conf.MaxAsyncBlockFetchConcurrency),
Expand Down Expand Up @@ -248,6 +257,17 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
})
}

// refreshHighestBlockFromRPC updates highestBlock from eth_blockNumber. Caller must not hold canonicalChainLock.
func (bl *blockListener) refreshHighestBlockFromRPC() (uint64, error) {
var hexBlockHeight ethtypes.HexInteger
rpcErr := bl.backend.CallRPC(bl.ctx, &hexBlockHeight, "eth_blockNumber")
if rpcErr != nil {
return 0, rpcErr.Error()
}
head := hexBlockHeight.BigInt().Uint64()
return head, nil
}

func (bl *blockListener) waitNextIteration() bool {
select {
case <-bl.ctx.Done():
Expand Down Expand Up @@ -313,6 +333,30 @@ func (bl *blockListener) listenLoop() {
}
log.L(bl.ctx).Debugf("Block filter received new block hashes: %+v", blockHashes)

if bl.ChainTrackingMode == ffcapi.ChainTrackingModeLight {
head, err := bl.refreshHighestBlockFromRPC()
if err != nil {
log.L(bl.ctx).Errorf("Failed to refresh chain head: %s", err)
failCount++
continue
}
if head == bl.currentChainHead {
failCount = 0
continue
}
bl.currentChainHead = head
update := &ffcapi.BlockHashEvent{GapPotential: false, Created: fftypes.Now(), HeadBlockNumber: bl.currentChainHead}
bl.consumerMux.Lock()
consumers := make([]*BlockUpdateConsumer, 0, len(bl.consumers))
for _, c := range bl.consumers {
consumers = append(consumers, c)
}
bl.consumerMux.Unlock()
bl.dispatchToConsumers(consumers, update)
failCount = 0
continue
}

update := &ffcapi.BlockHashEvent{GapPotential: gapPotential, Created: fftypes.Now()}
var notifyPos *list.Element
for _, h := range blockHashes {
Expand Down Expand Up @@ -621,6 +665,10 @@ func (bl *blockListener) GetHighestBlock(ctx context.Context) (uint64, bool) {
return highestBlock, true
}

func (bl *blockListener) GetHeadBlockNumber(_ context.Context) uint64 {
return bl.currentChainHead
}

func (bl *blockListener) setHighestBlock(block uint64) {
bl.canonicalChainLock.Lock()
defer bl.canonicalChainLock.Unlock()
Expand Down
Loading
Loading