Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ gomodtidy: gomods

.PHONY: mockery
mockery: $(mockery) ## Install mockery.
go install github.com/vektra/mockery/v2@v2.46.3
go install github.com/vektra/mockery/v2@v2.53.0

.PHONY: generate
generate: mockery
Expand Down
34 changes: 13 additions & 21 deletions chains/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"time"

"github.com/jpillora/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"
"gopkg.in/guregu/null.v4"

Expand Down Expand Up @@ -43,22 +41,6 @@ const (
hederaChainType = "hedera"
)

var (
promTimeUntilBroadcast = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_time_until_tx_broadcast",
Help: "The amount of time elapsed from when a transaction is enqueued to until it is broadcast.",
Buckets: []float64{
float64(500 * time.Millisecond),
float64(time.Second),
float64(5 * time.Second),
float64(15 * time.Second),
float64(30 * time.Second),
float64(time.Minute),
float64(2 * time.Minute),
},
}, []string{"chainID"})
)

var ErrTxRemoved = errors.New("tx removed")

type ProcessUnstartedTxs[ADDR chains.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error)
Expand All @@ -79,6 +61,11 @@ type TransmitChecker[CID chains.ID, ADDR chains.Hashable, THASH, BHASH chains.Ha
Check(ctx context.Context, l logger.SugaredLogger, tx types.Tx[CID, ADDR, THASH, BHASH, SEQ, FEE], a types.TxAttempt[CID, ADDR, THASH, BHASH, SEQ, FEE]) error
}

type broadcasterMetrics interface {
IncrementNumBroadcastedTxs(ctx context.Context)
RecordTimeUntilTxBroadcast(ctx context.Context, duration float64)
}

// Broadcaster monitors txes for transactions that need to
// be broadcast, assigns sequences and ensures that at least one node
// somewhere has received the transaction successfully.
Expand Down Expand Up @@ -106,6 +93,7 @@ type Broadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, T
feeConfig types.BroadcasterFeeConfig
txConfig types.BroadcasterTransactionsConfig
listenerConfig types.BroadcasterListenerConfig
metrics broadcasterMetrics

// autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence
// when Start is called
Expand Down Expand Up @@ -144,6 +132,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
checkerFactory TransmitCheckerFactory[CID, ADDR, THASH, BHASH, SEQ, FEE],
autoSyncSequence bool,
chainType string,
metrics broadcasterMetrics,
) *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]{
Expand All @@ -161,6 +150,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
sequenceTracker: sequenceTracker,
metrics: metrics,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
Expand Down Expand Up @@ -532,11 +522,12 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
// In all scenarios, the correct thing to do is assume success for now
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
observeTimeUntilBroadcast(ctx, eb.metrics, etx.CreatedAt, time.Now())
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, types.TxAttemptBroadcast)
if err != nil {
return err, true
}
eb.metrics.IncrementNumBroadcastedTxs(ctx)
// Increment sequence if successfully broadcasted
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
Expand Down Expand Up @@ -601,6 +592,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
if err != nil {
return err, true
}
eb.metrics.IncrementNumBroadcastedTxs(ctx)
// Increment sequence if successfully broadcasted
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
Expand Down Expand Up @@ -755,7 +747,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) saveFatallyError
return eb.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, etx)
}

func observeTimeUntilBroadcast[CHAIN_ID chains.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
func observeTimeUntilBroadcast(ctx context.Context, metrics broadcasterMetrics, createdAt, broadcastAt time.Time) {
duration := float64(broadcastAt.Sub(createdAt))
promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration)
metrics.RecordTimeUntilTxBroadcast(ctx, duration)
}
113 changes: 38 additions & 75 deletions chains/txmgr/confirmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/multierr"

commonhex "github.com/smartcontractkit/chainlink-common/pkg/utils/hex"
Expand All @@ -33,48 +31,13 @@ const (
processHeadTimeout = 10 * time.Minute
)

var (
promNumGasBumps = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_num_gas_bumps",
Help: "Number of gas bumps",
}, []string{"chainID"})

promGasBumpExceedsLimit = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_gas_bump_exceeds_limit",
Help: "Number of times gas bumping failed from exceeding the configured limit. Any counts of this type indicate a serious problem.",
}, []string{"chainID"})
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "tx_manager_num_confirmed_transactions",
Help: "Total number of confirmed transactions. Note that this can err to be too high since transactions are counted on each confirmation, which can happen multiple times per transaction in the case of re-orgs",
}, []string{"chainID"})
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_time_until_tx_confirmed",
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
Buckets: []float64{
float64(500 * time.Millisecond),
float64(time.Second),
float64(5 * time.Second),
float64(15 * time.Second),
float64(30 * time.Second),
float64(time.Minute),
float64(2 * time.Minute),
float64(5 * time.Minute),
float64(10 * time.Minute),
},
}, []string{"chainID"})
promBlocksUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "tx_manager_blocks_until_tx_confirmed",
Help: "The amount of blocks that have been mined from a transaction being broadcast to being included in a block.",
Buckets: []float64{
float64(1),
float64(5),
float64(10),
float64(20),
float64(50),
float64(100),
},
}, []string{"chainID"})
)
type confimerMetrics interface {
IncrementNumGasBumps(ctx context.Context)
IncrementGasBumpExceedsLimit(ctx context.Context)
IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int)
RecordTimeUntilTxConfirmed(ctx context.Context, duration float64)
RecordBlocksUntilTxConfirmed(ctx context.Context, blocksElapsed float64)
}

// Confirmer is a broad service which performs four different tasks in sequence on every new longest chain
// Step 1: Mark that all currently pending transaction attempts were broadcast before this block
Expand All @@ -95,6 +58,7 @@ type Confirmer[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THA
txConfig types.ConfirmerTransactionsConfig
dbConfig types.ConfirmerDatabaseConfig
chainID CID
metrics confimerMetrics

ks types.KeyStore[ADDR]
enabledAddresses []ADDR
Expand Down Expand Up @@ -127,6 +91,7 @@ func NewConfirmer[
lggr logger.Logger,
isReceiptNil func(R) bool,
stuckTxDetector types.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
metrics confimerMetrics,
) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
lggr = logger.Named(lggr, "Confirmer")
return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
Expand All @@ -143,6 +108,7 @@ func NewConfirmer[
mb: mailbox.NewSingle[HEAD](),
isReceiptNil: isReceiptNil,
stuckTxDetector: stuckTxDetector,
metrics: metrics,
}
}

Expand Down Expand Up @@ -368,7 +334,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
return nil
}
// Add newly confirmed transactions to the prom metric
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(includedTxs)))
ec.metrics.IncrementNumConfirmedTxs(ctx, len(includedTxs))

purgeTxIDs := make([]int64, 0, len(includedTxs))
confirmedTxIDs := make([]int64, 0, len(includedTxs))
Expand All @@ -381,7 +347,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
continue
}
confirmedTxIDs = append(confirmedTxIDs, tx.ID)
observeUntilTxConfirmed(ec.chainID, tx.TxAttempts, head)
observeUntilTxConfirmed(ctx, ec.metrics, tx, head)
}
// Mark the transactions included on-chain with a purge attempt as fatal error with the terminally stuck error message
if err := ec.txStore.UpdateTxFatalError(ctx, purgeTxIDs, ec.stuckTxDetector.StuckTxFatalError()); err != nil {
Expand Down Expand Up @@ -667,13 +633,13 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) bumpGas(ctx con
// if no error, return attempt
// if err, continue below
if err == nil {
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementNumGasBumps(ctx)
ec.lggr.Debugw("Rebroadcast bumping fee for tx", append(logFields, "bumpedFee", bumpedFee.String(), "bumpedFeeLimit", bumpedFeeLimit)...)
return bumpedAttempt, err
}

if errors.Is(err, fees.ErrBumpFeeExceedsLimit) {
promGasBumpExceedsLimit.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementGasBumpExceedsLimit(ctx)
}

return bumpedAttempt, fmt.Errorf("error bumping gas: %w", err)
Expand Down Expand Up @@ -712,7 +678,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) handleInProgres
if err != nil {
return fmt.Errorf("could not bump gas for terminally underpriced transaction: %w", err)
}
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
ec.metrics.IncrementNumGasBumps(ctx)
lggr.With(
"sendError", sendError,
"maxGasPriceConfig", ec.feeConfig.MaxFeePrice(),
Expand Down Expand Up @@ -853,38 +819,35 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) sendEmptyTransa
return txhash, nil
}

// observeUntilTxConfirmed observes the promBlocksUntilTxConfirmed metric for each confirmed
// transaction.
// observeUntilTxConfirmed observes the timeUntilTxConfirmed and blocksUntilTxConfirmed metrics for each confirmed transaction.
func observeUntilTxConfirmed[
CHAIN_ID chains.ID,
ADDR chains.Hashable,
TX_HASH, BLOCK_HASH chains.Hashable,
SEQ chains.Sequence,
FEE fees.Fee,
](chainID CHAIN_ID, attempts []types.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
for _, attempt := range attempts {
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
// was created. We want to measure the amount of time taken from when a transaction is created
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
// were needed to achieve this.
duration := time.Since(attempt.Tx.CreatedAt)
promTimeUntilTxConfirmed.
WithLabelValues(chainID.String()).
Observe(float64(duration))

// Since a tx can have many attempts, we take the number of blocks to confirm as the block number
// of the receipt minus the block number of the first ever broadcast for this transaction.
var minBroadcastBefore int64
for _, a := range attempt.Tx.TxAttempts {
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
minBroadcastBefore = *b
}
}
if minBroadcastBefore > 0 {
blocksElapsed := head.BlockNumber() - minBroadcastBefore
promBlocksUntilTxConfirmed.
WithLabelValues(chainID.String()).
Observe(float64(blocksElapsed))
](ctx context.Context, metrics confimerMetrics, tx *types.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
if tx == nil {
return
}
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
// was created. We want to measure the amount of time taken from when a transaction is created
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
// were needed to achieve this.
duration := time.Since(tx.CreatedAt)
metrics.RecordTimeUntilTxConfirmed(ctx, float64(duration))

// Since a tx can have many attempts, we take the number of blocks to confirm as the current block number
// minus the block number of the first ever broadcast for this transaction.
var minBroadcastBefore int64
for _, a := range tx.TxAttempts {
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
minBroadcastBefore = *b
}
}

if minBroadcastBefore > 0 {
blocksElapsed := head.BlockNumber() - minBroadcastBefore
metrics.RecordBlocksUntilTxConfirmed(ctx, float64(blocksElapsed))
}
}
61 changes: 56 additions & 5 deletions metrics/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,66 @@ module github.com/smartcontractkit/chainlink-framework/metrics

go 1.24.1

require github.com/prometheus/client_golang v1.22.0
require (
github.com/prometheus/client_golang v1.22.0
github.com/smartcontractkit/chainlink-common v0.7.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/metric v1.35.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 // indirect
github.com/cloudevents/sdk-go/v2 v2.16.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sys v0.30.0 // indirect
google.golang.org/protobuf v1.36.5 // indirect
github.com/prometheus/common v0.63.0 // indirect
github.com/prometheus/procfs v0.16.0 // indirect
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.30.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
go.opentelemetry.io/otel/log v0.6.0 // indirect
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
google.golang.org/grpc v1.71.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading
Loading