Skip to content
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f23d367
Centralized TXM metrics and integrated beholder
amit-momin Apr 1, 2025
2868c2d
Renamed types and added metric attributes
amit-momin Apr 2, 2025
d93deb5
Merge branch 'main' into feature/integrate-txm-metrics-with-beholder
amit-momin Apr 11, 2025
db61a62
Merge branch 'main' into feature/integrate-txm-metrics-with-beholder
amit-momin Apr 11, 2025
89bf2a1
Tidied go mod
amit-momin Apr 11, 2025
8aa57dd
Simplified time and block till confirmation observations
amit-momin Apr 11, 2025
b8e50a8
Merge branch 'main' of https://github.com/smartcontractkit/chainlink-…
DylanTinianov Apr 25, 2025
3d8034a
Add beholder metrics
DylanTinianov Apr 25, 2025
e2685f8
Add Beholder Metrics
DylanTinianov Apr 25, 2025
05e081b
Add metrics
DylanTinianov Apr 25, 2025
d6803ac
Create generic metrics
DylanTinianov Apr 25, 2025
ec2fb6a
Update send_only_node.go
DylanTinianov Apr 25, 2025
19afa9b
Update send_only_node_lifecycle.go
DylanTinianov Apr 25, 2025
933e317
Export logpoller metrics
DylanTinianov Apr 28, 2025
1c6ca7d
Set dataset size
DylanTinianov Apr 28, 2025
c3486bf
Bump mockery
DylanTinianov Apr 28, 2025
32c9d4d
Add network to node fsm metrics
DylanTinianov Apr 28, 2025
bbff8b5
Mock test metrics
DylanTinianov Apr 28, 2025
bc2836f
Add test coverage
DylanTinianov Apr 28, 2025
cfe0778
lint
DylanTinianov Apr 28, 2025
aeca7c7
lint
DylanTinianov Apr 28, 2025
43987e4
Update chain_families.go
DylanTinianov Apr 28, 2025
094b39d
Create txm_test.go
DylanTinianov Apr 28, 2025
e13c3bb
Prefix beholder metrics
DylanTinianov May 1, 2025
bd055af
Add NodeClientVersion
DylanTinianov May 2, 2025
89b8664
Add NodeClientVersion test
DylanTinianov May 2, 2025
2fa1b1b
Add ClientVersion metric
DylanTinianov May 2, 2025
e20856f
Update metrics
DylanTinianov May 2, 2025
f93abf9
Fix test
DylanTinianov May 2, 2025
289048d
Fix tests
DylanTinianov May 2, 2025
81b76f7
lint
DylanTinianov May 2, 2025
d0e932d
Use test context
DylanTinianov May 2, 2025
0ccce42
Update node_fsm.go
DylanTinianov May 2, 2025
ad71a2f
Rename ClientVersion
DylanTinianov May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ gomodtidy: gomods

.PHONY: mockery
mockery: $(mockery) ## Install mockery.
go install github.com/vektra/mockery/v2@v2.46.3
go install github.com/vektra/mockery/v2@v2.53.0

.PHONY: generate
generate: mockery
Expand Down
34 changes: 13 additions & 21 deletions chains/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"time"

"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
"gopkg.in/guregu/null.v4"

Expand Down Expand Up @@ -43,22 +41,6 @@ const (
hederaChainType = "hedera"
)

var (
promTimeUntilBroadcast = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_time_until_tx_broadcast",
Help: "The amount of time elapsed from when a transaction is enqueued to until it is broadcast.",
Buckets: []float64{
float64(500 * time.Millisecond),
float64(time.Second),
float64(5 * time.Second),
float64(15 * time.Second),
float64(30 * time.Second),
float64(time.Minute),
float64(2 * time.Minute),
},
}, []string{"chainID"})
)

var ErrTxRemoved = errors.New("tx removed")

type ProcessUnstartedTxs[ADDR chains.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error)
Expand All @@ -79,6 +61,11 @@ type TransmitChecker[CID chains.ID, ADDR chains.Hashable, THASH, BHASH chains.Ha
Check(ctx context.Context, l logger.SugaredLogger, tx types.Tx[CID, ADDR, THASH, BHASH, SEQ, FEE], a types.TxAttempt[CID, ADDR, THASH, BHASH, SEQ, FEE]) error
}

type broadcasterMetrics interface {
IncrementNumBroadcastedTxs(ctx context.Context)
RecordTimeUntilTxBroadcast(ctx context.Context, duration float64)
}

// Broadcaster monitors txes for transactions that need to
// be broadcast, assigns sequences and ensures that at least one node
// somewhere has received the transaction successfully.
Expand Down Expand Up @@ -106,6 +93,7 @@ type Broadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, T
feeConfig types.BroadcasterFeeConfig
txConfig types.BroadcasterTransactionsConfig
listenerConfig types.BroadcasterListenerConfig
metrics broadcasterMetrics

// autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence
// when Start is called
Expand Down Expand Up @@ -144,6 +132,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
checkerFactory TransmitCheckerFactory[CID, ADDR, THASH, BHASH, SEQ, FEE],
autoSyncSequence bool,
chainType string,
metrics broadcasterMetrics,
) *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]{
Expand All @@ -161,6 +150,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
sequenceTracker: sequenceTracker,
metrics: metrics,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
Expand Down Expand Up @@ -532,11 +522,12 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
// In all scenarios, the correct thing to do is assume success for now
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
observeTimeUntilBroadcast(ctx, eb.metrics, etx.CreatedAt, time.Now())
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, types.TxAttemptBroadcast)
if err != nil {
return err, true
}
eb.metrics.IncrementNumBroadcastedTxs(ctx)
// Increment sequence if successfully broadcasted
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
Expand Down Expand Up @@ -601,6 +592,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
if err != nil {
return err, true
}
eb.metrics.IncrementNumBroadcastedTxs(ctx)
// Increment sequence if successfully broadcasted
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
Expand Down Expand Up @@ -755,7 +747,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) saveFatallyError
return eb.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, etx)
}

func observeTimeUntilBroadcast[CHAIN_ID chains.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
func observeTimeUntilBroadcast(ctx context.Context, metrics broadcasterMetrics, createdAt, broadcastAt time.Time) {
duration := float64(broadcastAt.Sub(createdAt))
promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration)
metrics.RecordTimeUntilTxBroadcast(ctx, duration)
}
113 changes: 38 additions & 75 deletions chains/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"

commonhex "github.com/smartcontractkit/chainlink-common/pkg/utils/hex"
Expand All @@ -33,48 +31,13 @@ const (
processHeadTimeout = 10 * time.Minute
)

var (
promNumGasBumps = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_num_gas_bumps",
Help: "Number of gas bumps",
}, []string{"chainID"})

promGasBumpExceedsLimit = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_gas_bump_exceeds_limit",
Help: "Number of times gas bumping failed from exceeding the configured limit. Any counts of this type indicate a serious problem.",
}, []string{"chainID"})
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_num_confirmed_transactions",
Help: "Total number of confirmed transactions. Note that this can err to be too high since transactions are counted on each confirmation, which can happen multiple times per transaction in the case of re-orgs",
}, []string{"chainID"})
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_time_until_tx_confirmed",
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
Buckets: []float64{
float64(500 * time.Millisecond),
float64(time.Second),
float64(5 * time.Second),
float64(15 * time.Second),
float64(30 * time.Second),
float64(time.Minute),
float64(2 * time.Minute),
float64(5 * time.Minute),
float64(10 * time.Minute),
},
}, []string{"chainID"})
promBlocksUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_blocks_until_tx_confirmed",
Help: "The amount of blocks that have been mined from a transaction being broadcast to being included in a block.",
Buckets: []float64{
float64(1),
float64(5),
float64(10),
float64(20),
float64(50),
float64(100),
},
}, []string{"chainID"})
)
type confimerMetrics interface {
Comment thread
DylanTinianov marked this conversation as resolved.
IncrementNumGasBumps(ctx context.Context)
IncrementGasBumpExceedsLimit(ctx context.Context)
IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int)
RecordTimeUntilTxConfirmed(ctx context.Context, duration float64)
RecordBlocksUntilTxConfirmed(ctx context.Context, blocksElapsed float64)
}

// Confirmer is a broad service which performs four different tasks in sequence on every new longest chain
// Step 1: Mark that all currently pending transaction attempts were broadcast before this block
Expand All @@ -95,6 +58,7 @@ type Confirmer[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THA
txConfig types.ConfirmerTransactionsConfig
dbConfig types.ConfirmerDatabaseConfig
chainID CID
metrics confimerMetrics

ks types.KeyStore[ADDR]
enabledAddresses []ADDR
Expand Down Expand Up @@ -127,6 +91,7 @@ func NewConfirmer[
lggr logger.Logger,
isReceiptNil func(R) bool,
stuckTxDetector types.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
metrics confimerMetrics,
) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
lggr = logger.Named(lggr, "Confirmer")
return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -143,6 +108,7 @@ func NewConfirmer[
mb: mailbox.NewSingle[HEAD](),
isReceiptNil: isReceiptNil,
stuckTxDetector: stuckTxDetector,
metrics: metrics,
}
}

Expand Down Expand Up @@ -368,7 +334,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
return nil
}
// Add newly confirmed transactions to the prom metric
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(includedTxs)))
ec.metrics.IncrementNumConfirmedTxs(ctx, len(includedTxs))

purgeTxIDs := make([]int64, 0, len(includedTxs))
confirmedTxIDs := make([]int64, 0, len(includedTxs))
Expand All @@ -381,7 +347,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
continue
}
confirmedTxIDs = append(confirmedTxIDs, tx.ID)
observeUntilTxConfirmed(ec.chainID, tx.TxAttempts, head)
observeUntilTxConfirmed(ctx, ec.metrics, tx, head)
}
// Mark the transactions included on-chain with a purge attempt as fatal error with the terminally stuck error message
if err := ec.txStore.UpdateTxFatalError(ctx, purgeTxIDs, ec.stuckTxDetector.StuckTxFatalError()); err != nil {
Expand Down Expand Up @@ -667,13 +633,13 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) bumpGas(ctx con
// if no error, return attempt
// if err, continue below
if err == nil {
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementNumGasBumps(ctx)
ec.lggr.Debugw("Rebroadcast bumping fee for tx", append(logFields, "bumpedFee", bumpedFee.String(), "bumpedFeeLimit", bumpedFeeLimit)...)
return bumpedAttempt, err
}

if errors.Is(err, fees.ErrBumpFeeExceedsLimit) {
promGasBumpExceedsLimit.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementGasBumpExceedsLimit(ctx)
}

return bumpedAttempt, fmt.Errorf("error bumping gas: %w", err)
Expand Down Expand Up @@ -712,7 +678,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) handleInProgres
if err != nil {
return fmt.Errorf("could not bump gas for terminally underpriced transaction: %w", err)
}
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementNumGasBumps(ctx)
lggr.With(
"sendError", sendError,
"maxGasPriceConfig", ec.feeConfig.MaxFeePrice(),
Expand Down Expand Up @@ -853,38 +819,35 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) sendEmptyTransa
return txhash, nil
}

// observeUntilTxConfirmed observes the promBlocksUntilTxConfirmed metric for each confirmed
// transaction.
// observeUntilTxConfirmed observes the timeUntilTxConfirmed and blocksUntilTxConfirmed metrics for each confirmed transaction.
func observeUntilTxConfirmed[
CHAIN_ID chains.ID,
ADDR chains.Hashable,
TX_HASH, BLOCK_HASH chains.Hashable,
SEQ chains.Sequence,
FEE fees.Fee,
](chainID CHAIN_ID, attempts []types.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
for _, attempt := range attempts {
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
// was created. We want to measure the amount of time taken from when a transaction is created
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
// were needed to achieve this.
duration := time.Since(attempt.Tx.CreatedAt)
promTimeUntilTxConfirmed.
WithLabelValues(chainID.String()).
Observe(float64(duration))

// Since a tx can have many attempts, we take the number of blocks to confirm as the block number
// of the receipt minus the block number of the first ever broadcast for this transaction.
var minBroadcastBefore int64
for _, a := range attempt.Tx.TxAttempts {
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
minBroadcastBefore = *b
}
}
if minBroadcastBefore > 0 {
blocksElapsed := head.BlockNumber() - minBroadcastBefore
promBlocksUntilTxConfirmed.
WithLabelValues(chainID.String()).
Observe(float64(blocksElapsed))
](ctx context.Context, metrics confimerMetrics, tx *types.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
if tx == nil {
return
}
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
// was created. We want to measure the amount of time taken from when a transaction is created
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
// were needed to achieve this.
duration := time.Since(tx.CreatedAt)
metrics.RecordTimeUntilTxConfirmed(ctx, float64(duration))
Comment thread
DylanTinianov marked this conversation as resolved.

// Since a tx can have many attempts, we take the number of blocks to confirm as the current block number
// minus the block number of the first ever broadcast for this transaction.
var minBroadcastBefore int64
for _, a := range tx.TxAttempts {
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
minBroadcastBefore = *b
}
}

if minBroadcastBefore > 0 {
blocksElapsed := head.BlockNumber() - minBroadcastBefore
metrics.RecordBlocksUntilTxConfirmed(ctx, float64(blocksElapsed))
}
}
2 changes: 1 addition & 1 deletion metrics/chain_families.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package metrics

const (
EVM = "evm"
EVM = "EVM"
Comment thread
DylanTinianov marked this conversation as resolved.
Solana = "solana"
)
61 changes: 56 additions & 5 deletions metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,66 @@ module github.com/smartcontractkit/chainlink-framework/metrics

go 1.24.1

require github.com/prometheus/client_golang v1.22.0
require (
github.com/prometheus/client_golang v1.22.0
github.com/smartcontractkit/chainlink-common v0.7.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/metric v1.35.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 // indirect
github.com/cloudevents/sdk-go/v2 v2.16.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sys v0.30.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
github.com/prometheus/common v0.63.0 // indirect
github.com/prometheus/procfs v0.16.0 // indirect
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
go.opentelemetry.io/otel/log v0.6.0 // indirect
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading