Skip to content

Commit 2925e01

Browse files
authored
Merge branch 'main' into lli/rpc-beholder-metric
2 parents cd5fc94 + 710d5a7 commit 2925e01

5 files changed

Lines changed: 79 additions & 37 deletions

File tree

chains/txmgr/broadcaster.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,9 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
463463
}
464464

465465
lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee))
466-
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
467466
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr)
467+
lgr.Infow("Broadcasted transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "tracingID", etx.GetTracingID(lgr), "meta", etx.Meta, "feeLimit",
468+
attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etxID", etx.ID, "etx", etx, "errType", errType, "err", err)
468469

469470
// The validation below is only applicable to Hedera because it has instant finality and a unique sequence behavior
470471
if eb.chainType == hederaChainType {

chains/txmgr/confirmer.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
347347
continue
348348
}
349349
confirmedTxIDs = append(confirmedTxIDs, tx.ID)
350+
ec.lggr.Infow("Transaction confirmed", "etxID", tx.ID, "tracingID", tx.GetTracingID(ec.lggr))
350351
observeUntilTxConfirmed(ctx, ec.metrics, tx, head)
351352
}
352353
// Mark the transactions included on-chain with a purge attempt as fatal error with the terminally stuck error message
@@ -796,12 +797,14 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ForceRebroadcas
796797
continue
797798
}
798799
attempt.Tx = *etx // for logging
799-
ec.lggr.Debugw("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "err", err, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt)
800-
if errCode, err := ec.client.SendTransactionReturnCode(ctx, *etx, attempt, ec.lggr); errCode != multinode.Successful && err != nil {
801-
ec.lggr.Errorw(fmt.Sprintf("ForceRebroadcast: failed to rebroadcast tx %v with sequence %v, gas limit %v, and caller provided fee Limit %v : %s", etx.ID, *etx.Sequence, attempt.ChainSpecificFeeLimit, etx.FeeLimit, err.Error()), "err", err, "fee", attempt.TxFee)
802-
continue
800+
errType, err := ec.client.SendTransactionReturnCode(ctx, *etx, attempt, ec.lggr)
801+
if errType == multinode.Successful || errType == multinode.TransactionAlreadyKnown {
802+
ec.lggr.Infow("ForceRebroadcast: Broadcasted transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "tracingID", etx.GetTracingID(ec.lggr), "meta", etx.Meta, "feeLimit",
803+
attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etxID", etx.ID, "etx", etx, "errType", errType, "err", err)
804+
} else {
805+
ec.lggr.Errorw("ForceRebroadcast: Broadcasted transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "tracingID", etx.GetTracingID(ec.lggr), "meta", etx.Meta, "feeLimit",
806+
attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etxID", etx.ID, "etx", etx, "errType", errType, "err", err)
803807
}
804-
ec.lggr.Infof("ForceRebroadcast: successfully rebroadcast tx %v with hash: 0x%x", etx.ID, attempt.Hash)
805808
}
806809
}
807810
return nil

chains/txmgr/txmgr.go

Lines changed: 50 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ type TxManager[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THA
6161
GetTransactionFee(ctx context.Context, transactionID string) (fee *evmtypes.TransactionFee, err error)
6262
GetTransactionReceipt(ctx context.Context, transactionID string) (receipt *txmgrtypes.ChainReceipt[THASH, BHASH], err error)
6363
CalculateFee(feeParts FeeParts) *big.Int
64+
// SupportsDualBroadcast reports whether this TXM will route transactions marked with
65+
// DualBroadcast=true to an OFA rather than the public mempool.
66+
// Jobs that configure a secondary EOA check this at startup and refuse to run if it
67+
// returns false, preventing accidental public-mempool exposure of secondary transactions.
68+
SupportsDualBroadcast() bool
6469
}
6570

6671
type TxmV2Wrapper[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH chains.Hashable, BHASH chains.Hashable, SEQ chains.Sequence, FEE fees.Fee] interface {
@@ -98,16 +103,17 @@ type Txm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH cha
98103
chSubbed chan struct{}
99104
wg sync.WaitGroup
100105

101-
reaper *Reaper[CID]
102-
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
103-
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
104-
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]
105-
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
106-
finalizer txmgrtypes.Finalizer[BHASH, HEAD]
107-
fwdMgr txmgrtypes.ForwarderManager[ADDR]
108-
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
109-
newErrorClassifier NewErrorClassifier
110-
txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
106+
reaper *Reaper[CID]
107+
resender *Resender[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
108+
broadcaster *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
109+
confirmer *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]
110+
tracker *Tracker[CID, ADDR, THASH, BHASH, R, SEQ, FEE]
111+
finalizer txmgrtypes.Finalizer[BHASH, HEAD]
112+
fwdMgr txmgrtypes.ForwarderManager[ADDR]
113+
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
114+
newErrorClassifier NewErrorClassifier
115+
txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]
116+
dualBroadcastEnabled bool
111117

112118
enabledAddrs []ADDR // sorted as strings
113119
}
@@ -137,29 +143,31 @@ func NewTxm[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THASH
137143
finalizer txmgrtypes.Finalizer[BHASH, HEAD],
138144
newErrorClassifierFunc NewErrorClassifier,
139145
txmv2wrapper TxmV2Wrapper[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE],
146+
dualBroadcastEnabled bool,
140147
) *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE] {
141148
b := Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]{
142-
logger: logger.Sugared(lggr),
143-
txStore: txStore,
144-
config: cfg,
145-
txConfig: txCfg,
146-
keyStore: keyStore,
147-
chainID: chainID,
148-
checkerFactory: checkerFactory,
149-
chHeads: make(chan HEAD),
150-
trigger: make(chan ADDR),
151-
chStop: make(chan struct{}),
152-
chSubbed: make(chan struct{}),
153-
reset: make(chan reset),
154-
fwdMgr: fwdMgr,
155-
txAttemptBuilder: txAttemptBuilder,
156-
broadcaster: broadcaster,
157-
confirmer: confirmer,
158-
resender: resender,
159-
tracker: tracker,
160-
newErrorClassifier: newErrorClassifierFunc,
161-
finalizer: finalizer,
162-
txmv2wrapper: txmv2wrapper,
149+
logger: logger.Sugared(lggr),
150+
txStore: txStore,
151+
config: cfg,
152+
txConfig: txCfg,
153+
keyStore: keyStore,
154+
chainID: chainID,
155+
checkerFactory: checkerFactory,
156+
chHeads: make(chan HEAD),
157+
trigger: make(chan ADDR),
158+
chStop: make(chan struct{}),
159+
chSubbed: make(chan struct{}),
160+
reset: make(chan reset),
161+
fwdMgr: fwdMgr,
162+
txAttemptBuilder: txAttemptBuilder,
163+
broadcaster: broadcaster,
164+
confirmer: confirmer,
165+
resender: resender,
166+
tracker: tracker,
167+
newErrorClassifier: newErrorClassifierFunc,
168+
finalizer: finalizer,
169+
txmv2wrapper: txmv2wrapper,
170+
dualBroadcastEnabled: dualBroadcastEnabled,
163171
}
164172

165173
if txCfg.ResendAfterThreshold() <= 0 {
@@ -603,6 +611,13 @@ func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) GetForwarderForEOA(ctx
603611
return
604612
}
605613

614+
// SupportsDualBroadcast reports whether this TXM will route DualBroadcast transactions to an
615+
// OFA rather than the public mempool. The value is set at construction time by the
616+
// txmgr builder based on node config.
617+
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) SupportsDualBroadcast() bool {
618+
return b.dualBroadcastEnabled
619+
}
620+
606621
// GetForwarderForEOAOCR2Feeds calls forwarderMgr to get a proper forwarder for a given EOA and checks if its set as a transmitter on the OCR2Aggregator contract.
607622
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) GetForwarderForEOAOCR2Feeds(ctx context.Context, eoa, ocr2Aggregator ADDR) (forwarder ADDR, err error) {
608623
if !b.txConfig.ForwardersEnabled() {
@@ -881,6 +896,10 @@ func (n *NullTxManager[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) GetTransactionR
881896
return
882897
}
883898

899+
func (n *NullTxManager[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) SupportsDualBroadcast() bool {
900+
return false
901+
}
902+
884903
func (b *Txm[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) pruneQueueAndCreateTxn(
885904
ctx context.Context,
886905
txRequest txmgrtypes.TxRequest[ADDR, THASH],

chains/txmgr/txmgrtest/err.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,7 @@ func (n *ErrTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Coun
9999
func (n *ErrTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTransactionStatus(ctx context.Context, transactionID string) (status commontypes.TransactionStatus, err error) {
100100
return
101101
}
102+
103+
func (n *ErrTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SupportsDualBroadcast() bool {
104+
return false
105+
}

chains/txmgr/types/tx.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ type TxMeta[ADDR chains.Hashable, TX_HASH chains.Hashable] struct {
163163
// Dual Broadcast
164164
DualBroadcast *bool `json:"DualBroadcast,omitempty"`
165165
DualBroadcastParams *string `json:"DualBroadcastParams,omitempty"`
166+
167+
// TracingID is used for tracing the entire lifecycle of a transaction from OCR Transmit to confirmation on-chain.
168+
TracingID *string `json:"TracingID,omitempty"`
166169
}
167170

168171
type TxAttempt[
@@ -263,6 +266,18 @@ func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetMeta() (*TxMeta[A
263266
return &m, nil
264267
}
265268

269+
func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetTracingID(lgr logger.Logger) string {
270+
meta, err := e.GetMeta()
271+
if err != nil {
272+
lgr.Errorw("failed to get meta of the transaction", "err", err)
273+
return ""
274+
}
275+
if meta == nil || meta.TracingID == nil {
276+
return ""
277+
}
278+
return *meta.TracingID
279+
}
280+
266281
// GetLogger returns a new logger with metadata fields.
267282
func (e *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetLogger(lgr logger.Logger) logger.SugaredLogger {
268283
lgr = logger.With(lgr,

0 commit comments

Comments
 (0)