diff --git a/bridge/tfchain_bridge/go.mod b/bridge/tfchain_bridge/go.mod index f84a4a519..33b03108d 100644 --- a/bridge/tfchain_bridge/go.mod +++ b/bridge/tfchain_bridge/go.mod @@ -15,7 +15,7 @@ require ( github.com/rs/zerolog v1.26.0 github.com/sirupsen/logrus v1.4.2 // indirect github.com/stellar/go v0.0.0-20210922122349-e6f322c047c5 - github.com/stretchr/objx v0.3.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect github.com/vedhavyas/go-subkey v1.0.3 ) @@ -23,6 +23,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.3 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/threefoldtech/tfchain/clients/tfchain-client-go v0.0.0-20230607082553-5605bca61c79 + go.etcd.io/bbolt v1.3.9 ) require ( @@ -50,7 +51,7 @@ require ( github.com/rs/cors v1.8.2 // indirect github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 // indirect github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a // indirect - github.com/stretchr/testify v1.7.2 // indirect + github.com/stretchr/testify v1.8.1 // indirect golang.org/x/crypto v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect diff --git a/bridge/tfchain_bridge/go.sum b/bridge/tfchain_bridge/go.sum index 0e71ad6be..5453cc25c 100644 --- a/bridge/tfchain_bridge/go.sum +++ b/bridge/tfchain_bridge/go.sum @@ -339,15 +339,17 @@ github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a/go.mod h1:yoxyU/M8n github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/go.mod h1:7CJ23pXirXBJq45DqvO6clzTEGM/l1SfKrgrzLry8b4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= -github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod 
h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA= @@ -381,6 +383,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.opencensus.io 
v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -502,6 +506,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/bridge/tfchain_bridge/pkg/bridge/bridge.go b/bridge/tfchain_bridge/pkg/bridge/bridge.go index 496050a55..f02f7a3ce 100644 --- a/bridge/tfchain_bridge/pkg/bridge/bridge.go +++ b/bridge/tfchain_bridge/pkg/bridge/bridge.go @@ -2,12 +2,15 @@ package bridge import ( "context" + "fmt" "strconv" "time" + "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + hProtocol "github.com/stellar/go/protocols/horizon" "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg" "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/stellar" subpkg "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/substrate" @@ -26,6 +29,7 @@ type Bridge struct { blockPersistency *pkg.ChainPersistency config *pkg.BridgeConfig depositFee int64 + idempotency *pkg.IdempotencyStore } func 
NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, error) { @@ -64,12 +68,33 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, erro return nil, "", err } + // Crash-safe idempotency store, kept alongside the block persistency file. + // Records the PROCESSING/COMPLETED state of withdraws and refunds so that a + // crash between submitting a Stellar payment and confirming it on TFChain is + // recovered without double-paying or double-confirming. + idempotency, err := pkg.NewIdempotencyStore(cfg.PersistencyFile + ".idem.db") + if err != nil { + return nil, "", errors.Wrap(err, "failed to open idempotency store") + } + + // The idempotency store is chain-scoped: withdraw keys are TFChain burn tx ids, + // which restart from a low number after a chain reset and would otherwise collide + // with stale COMPLETED entries, causing new withdraws to be wrongly skipped. The + // rescan flag marks a fresh start (it also zeroes the Stellar cursor above), so + // clear the store here too. + if cfg.RescanBridgeAccount { + if err := idempotency.Reset(); err != nil { + return nil, "", errors.Wrap(err, "failed to reset idempotency store") + } + } + bridge := &Bridge{ subClient: subClient, blockPersistency: blockPersistency, wallet: wallet, config: &cfg, depositFee: depositFee, + idempotency: idempotency, } // stat deposit fee? return bridge, wallet.GetKeypair().Address(), nil @@ -93,11 +118,25 @@ func (bridge *Bridge) preCheckBalance(ctx context.Context) error { } func (bridge *Bridge) Start(ctx context.Context) error { + // Close the idempotency store when Start returns. 
+ defer func() { + if err := bridge.idempotency.Close(); err != nil { + log.Warn().Err(err).Msg("failed to close idempotency store") + } + }() + // pre-check wallet balance if err := bridge.preCheckBalance(ctx); err != nil { return err } + // Crash recovery: reconcile any transactions left in PROCESSING state by a + // previous run before we start consuming new events. Lookup failures are non-fatal + // (retried on the next Ready event); only an idempotency-store read error aborts startup. + if err := bridge.reconcilePendingTransactions(ctx); err != nil { + return errors.Wrap(err, "startup reconciliation failed") + } + log.Info(). Str("event_action", "bridge_started"). Str("event_kind", "event"). @@ -148,6 +187,27 @@ func (bridge *Bridge) Start(ctx context.Context) error { if data.Err != nil { return errors.Wrap(data.Err, "failed to get tfchain events") } + // Ready events are processed before Created/Expired events: a Ready + // event submits a payment to Stellar whose signatures are time-sensitive + // (they expire), so it must not wait behind proposal/expiry handling.
+ for _, withdawReadyEvent := range data.Events.WithdrawReadyEvents { + err := bridge.handleWithdrawReady(ctx, withdawReadyEvent) + if err != nil { + if errors.Is(err, pkg.ErrTransactionAlreadyBurned) { + continue + } + return errors.Wrap(err, "an error occurred while handling WithdrawReadyEvents") + } + } + for _, refundReadyEvent := range data.Events.RefundReadyEvents { + err := bridge.handleRefundReady(ctx, refundReadyEvent) + if err != nil { + if errors.Is(err, pkg.ErrTransactionAlreadyRefunded) { + continue + } + return errors.Wrap(err, "an error occurred while handling RefundReadyEvents") + } + } for _, withdrawCreatedEvent := range data.Events.WithdrawCreatedEvents { err := bridge.handleWithdrawCreated(ctx, withdrawCreatedEvent) if err != nil { @@ -164,30 +224,12 @@ func (bridge *Bridge) Start(ctx context.Context) error { return errors.Wrap(err, "an error occurred while handling WithdrawExpiredEvents") } } - for _, withdawReadyEvent := range data.Events.WithdrawReadyEvents { - err := bridge.handleWithdrawReady(ctx, withdawReadyEvent) - if err != nil { - if errors.Is(err, pkg.ErrTransactionAlreadyBurned) { - continue - } - return errors.Wrap(err, "an error occurred while handling WithdrawReadyEvents") - } - } for _, refundExpiredEvent := range data.Events.RefundExpiredEvents { err := bridge.handleRefundExpired(ctx, refundExpiredEvent) if err != nil { return errors.Wrap(err, "an error occurred while handling RefundExpiredEvents") } } - for _, refundReadyEvent := range data.Events.RefundReadyEvents { - err := bridge.handleRefundReady(ctx, refundReadyEvent) - if err != nil { - if errors.Is(err, pkg.ErrTransactionAlreadyRefunded) { - continue - } - return errors.Wrap(err, "an error occurred while handling RefundReadyEvents") - } - } case data := <-stellarSub: if data.Err != nil { return errors.Wrap(data.Err, "failed to get stellar payments") @@ -222,3 +264,98 @@ func (bridge *Bridge) Start(ctx context.Context) error { time.Sleep(1 * time.Second) } } + +// 
reconcilePendingTransactions runs once at startup to recover transactions that a +// previous run left in PROCESSING state — i.e. the Stellar payment may or may not +// have been submitted before the bridge stopped. For each pending withdraw/refund it +// looks for a matching outgoing Stellar transaction (by memo, falling back to the +// sequence number for pre-memo submissions). If found, the funds already left the +// bridge, so it only completes the TFChain confirmation and marks the entry COMPLETED. +// If not found, the entry is left PROCESSING and will be retried when its Ready event +// fires again. Horizon/RPC failures are non-fatal so a transient outage cannot stop the +// bridge from starting; only a failure to read the idempotency store is returned. +func (bridge *Bridge) reconcilePendingTransactions(ctx context.Context) error { + pendingWithdraws, err := bridge.idempotency.GetPendingWithdraws() + if err != nil { + return errors.Wrap(err, "failed to get pending withdraws") + } + pendingRefunds, err := bridge.idempotency.GetPendingRefunds() + if err != nil { + return errors.Wrap(err, "failed to get pending refunds") + } + + if len(pendingWithdraws) == 0 && len(pendingRefunds) == 0 { + return nil + } + + log.Info(). + Int("pending_withdraws", len(pendingWithdraws)). + Int("pending_refunds", len(pendingRefunds)). + Msg("reconciling pending transactions from previous run") + + // Fetch outgoing transactions once and reuse the page for all lookups, avoiding + // one Horizon HTTP call per pending transaction. + outgoingPage, err := bridge.wallet.FetchOutgoingTransactionsPage(ctx) + if err != nil { + // Non-fatal: pending txs are retried when their next Ready event fires.
+ log.Warn().Err(err).Msg("failed to fetch Horizon transactions for reconciliation, pending transactions will retry on next event") + outgoingPage = hProtocol.TransactionsPage{} + } + + for _, txID := range pendingWithdraws { + // Recover by the text memo (burn tx id), falling back to the account sequence + // number for payments submitted by a pre-memo bridge version. + stellarTx := bridge.wallet.FindPaymentByMemoInPage(outgoingPage, fmt.Sprint(txID)) + if stellarTx == nil { + burnTx, err := bridge.subClient.GetBurnTransaction(types.U64(txID)) + if err != nil { + log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to get burn tx for sequence lookup during reconciliation") + } else { + stellarTx = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(burnTx.SequenceNumber)) + } + } + + if stellarTx == nil { + log.Info().Uint64("tx_id", txID).Msg("reconcile: no Stellar tx found by memo or sequence, will retry on next event") + continue + } + + log.Info().Uint64("tx_id", txID).Msg("reconcile: found existing Stellar payment, completing TFChain confirmation") + if err := bridge.subClient.RetrySetWithdrawExecuted(ctx, txID); err != nil { + log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to set withdraw executed during reconciliation") + continue + } + if err := bridge.idempotency.MarkWithdrawCompleted(txID); err != nil { + log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to mark withdraw completed during reconciliation") + } + } + + for _, txHash := range pendingRefunds { + stellarTx := bridge.wallet.FindRefundByReturnHashInPage(outgoingPage, txHash) + if stellarTx == nil { + refundTx, err := bridge.subClient.GetRefundTransaction(txHash) + if err != nil { + log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to get refund tx for sequence lookup during reconciliation") + } else { + stellarTx = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(refundTx.SequenceNumber)) + } + } + + if stellarTx == nil { + log.Info().Str("tx_hash", 
txHash).Msg("reconcile: no Stellar refund found by return hash or sequence, will retry on next event") + continue + } + + log.Info().Str("tx_hash", txHash).Msg("reconcile: found existing Stellar refund, completing TFChain confirmation") + if err := bridge.subClient.RetrySetRefundTransactionExecutedTx(ctx, txHash); err != nil { + log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to set refund executed during reconciliation") + continue + } + if err := bridge.idempotency.MarkRefundCompleted(txHash); err != nil { + log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to mark refund completed during reconciliation") + } + } + + log.Info().Msg("reconciliation complete") + return nil +} diff --git a/bridge/tfchain_bridge/pkg/bridge/refund.go b/bridge/tfchain_bridge/pkg/bridge/refund.go index 1b07eb65a..dc07eee1b 100644 --- a/bridge/tfchain_bridge/pkg/bridge/refund.go +++ b/bridge/tfchain_bridge/pkg/bridge/refund.go @@ -70,12 +70,81 @@ func (bridge *Bridge) handleRefundExpired(ctx context.Context, refundExpiredEven func (bridge *Bridge) handleRefundReady(ctx context.Context, refundReadyEvent subpkg.RefundTransactionReadyEvent) error { logger := log.Logger.With().Str("trace_id", refundReadyEvent.Hash).Logger() - refunded, err := bridge.subClient.IsRefundedAlready(refundReadyEvent.Hash) + txHash := refundReadyEvent.Hash + + // 1. Idempotency: if we've already fully processed this refund, skip it. + state, err := bridge.idempotency.GetRefundState(txHash) + if err != nil { + return err + } + if state == pkg.TxStateCompleted { + logger.Info(). + Str("event_action", "refund_skipped"). + Str("event_kind", "event"). + Str("category", "refund"). + Msg("idempotency: refund already completed, skipping") + return pkg.ErrTransactionAlreadyRefunded + } + + // 2. If the refund is PROCESSING, the Stellar payment may already have been + // submitted before a crash. 
Check Horizon for an existing outgoing refund (by + // MemoReturn hash, falling back to sequence number for pre-memo submissions). If + // found, only the TFChain confirmation is outstanding — complete that instead of + // re-submitting (which would risk a double payment). + if state == pkg.TxStateProcessing { + logger.Warn(). + Str("event_action", "refund_crash_recovery"). + Str("event_kind", "event"). + Str("category", "refund"). + Msg("idempotency: refund in PROCESSING state, checking Stellar for an existing payment") + + // Non-fatal on Horizon error: leave PROCESSING and retry on the next event. + outgoingPage, ferr := bridge.wallet.FetchOutgoingTransactionsPage(ctx) + if ferr != nil { + logger.Warn().Err(ferr).Str("tx_hash", txHash). + Msg("failed to fetch Horizon transactions for PROCESSING check; will retry on next event") + return nil + } + + found := bridge.wallet.FindRefundByReturnHashInPage(outgoingPage, txHash) != nil + if !found { + refundForSeq, serr := bridge.subClient.GetRefundTransaction(txHash) + if serr != nil { + // Inconclusive recovery (chain read failed) must not fall through to a + // re-submit, so leave PROCESSING and retry on the next event. + logger.Warn().Err(serr).Str("tx_hash", txHash). + Msg("failed to get refund tx for sequence lookup during PROCESSING check; will retry on next event") + return nil + } + found = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(refundForSeq.SequenceNumber)) != nil + } + if found { + logger.Info(). + Str("event_action", "refund_recovered"). + Str("event_kind", "event"). + Str("category", "refund"). 
+ Msg("idempotency: found existing Stellar refund, completing TFChain confirmation") + if cerr := bridge.subClient.RetrySetRefundTransactionExecutedTx(ctx, txHash); cerr != nil { + return cerr + } + if merr := bridge.idempotency.MarkRefundCompleted(txHash); merr != nil { + logger.Warn().Err(merr).Str("tx_hash", txHash).Msg("idempotency: failed to mark refund completed") + } + return nil + } + logger.Info().Msg("idempotency: no Stellar refund found by return hash or sequence, safe to retry") + } + + // 3. Already refunded on chain? + refunded, err := bridge.subClient.IsRefundedAlready(txHash) if err != nil { return err } if refunded { + if merr := bridge.idempotency.MarkRefundCompleted(txHash); merr != nil { + logger.Warn().Err(merr).Str("tx_hash", txHash).Msg("idempotency: failed to mark refund completed") + } logger.Info(). Str("event_action", "refund_skipped"). Str("event_kind", "event"). @@ -84,7 +153,7 @@ func (bridge *Bridge) handleRefundReady(ctx context.Context, refundReadyEvent su return pkg.ErrTransactionAlreadyRefunded } - refund, err := bridge.subClient.GetRefundTransaction(refundReadyEvent.Hash) + refund, err := bridge.subClient.GetRefundTransaction(txHash) if err != nil { return err } @@ -98,6 +167,12 @@ func (bridge *Bridge) handleRefundReady(ctx context.Context, refundReadyEvent su return nil } + // 4. Mark PROCESSING before submitting to Stellar, so a crash between the submit + // and the TFChain confirmation is recoverable via the PROCESSING check above. + if err := bridge.idempotency.MarkRefundProcessing(txHash); err != nil { + return err + } + // Todo, retry here? if err = bridge.wallet.CreateRefundPaymentWithSignaturesAndSubmit(ctx, refund.Target, uint64(refund.Amount), refund.TxHash, refund.Signatures, int64(refund.SequenceNumber)); err != nil { // A refund submission must never crash the bridge. 
A single failure would @@ -122,6 +197,11 @@ func (bridge *Bridge) handleRefundReady(ctx context.Context, refundReadyEvent su if execErr := bridge.subClient.RetrySetRefundTransactionExecutedTx(ctx, refund.TxHash); execErr != nil { return execErr } + // The quarantined refund is terminally handled (executed on chain, + // funds forfeited); mark it COMPLETED so it is never re-processed. + if merr := bridge.idempotency.MarkRefundCompleted(txHash); merr != nil { + logger.Warn().Err(merr).Str("tx_hash", txHash).Msg("idempotency: failed to mark refund completed") + } return nil } @@ -143,6 +223,9 @@ func (bridge *Bridge) handleRefundReady(ctx context.Context, refundReadyEvent su if err != nil { return err } + if merr := bridge.idempotency.MarkRefundCompleted(txHash); merr != nil { + logger.Warn().Err(merr).Str("tx_hash", txHash).Msg("idempotency: failed to mark refund completed") + } logger.Info(). Str("event_action", "refund_completed"). Str("event_kind", "event"). diff --git a/bridge/tfchain_bridge/pkg/bridge/withdraw.go b/bridge/tfchain_bridge/pkg/bridge/withdraw.go index 8f25ce531..230ae78ba 100644 --- a/bridge/tfchain_bridge/pkg/bridge/withdraw.go +++ b/bridge/tfchain_bridge/pkg/bridge/withdraw.go @@ -130,13 +130,87 @@ func (bridge *Bridge) handleWithdrawExpired(ctx context.Context, withdrawExpired func (bridge *Bridge) handleWithdrawReady(ctx context.Context, withdrawReady subpkg.WithdrawReadyEvent) error { logger := log.Logger.With().Str("trace_id", fmt.Sprint(withdrawReady.ID)).Logger() - // ctx_with_trace_id := context.WithValue(ctx, "trace_id", fmt.Sprint(withdrawReady.ID)) - burned, err := bridge.subClient.IsBurnedAlready(types.U64(withdrawReady.ID)) + txID := withdrawReady.ID + txKey := fmt.Sprint(txID) + + // 1. Idempotency: if we've already fully processed this withdraw, skip it. + state, err := bridge.idempotency.GetWithdrawState(txID) if err != nil { return err } + if state == pkg.TxStateCompleted { + logger.Info(). 
+ Str("event_action", "withdraw_skipped"). + Str("event_kind", "event"). + Str("category", "withdraw"). + Msg("idempotency: withdraw already completed, skipping") + return pkg.ErrTransactionAlreadyBurned + } + + // 2. If the withdraw is PROCESSING, the Stellar payment may already have been + // submitted before a crash. Look for an existing outgoing payment by its text memo + // (the burn tx id), falling back to the account sequence number for payments + // submitted by a pre-memo bridge version. If found, the funds already left the + // bridge, so only the TFChain confirmation is outstanding — complete that instead + // of re-submitting (which would risk a double payment). + if state == pkg.TxStateProcessing { + logger.Warn(). + Str("event_action", "withdraw_crash_recovery"). + Str("event_kind", "event"). + Str("category", "withdraw"). + Msg("idempotency: withdraw in PROCESSING state, checking Stellar for an existing payment") + + // Non-fatal on Horizon error: leave the tx PROCESSING and retry on the next + // event. Returning an error here would crash the bridge during a Horizon + // outage, which is worse than gracefully deferring. + outgoingPage, ferr := bridge.wallet.FetchOutgoingTransactionsPage(ctx) + if ferr != nil { + logger.Warn().Err(ferr).Uint64("tx_id", txID). + Msg("failed to fetch Horizon transactions for PROCESSING check; will retry on next event") + return nil + } + + found := bridge.wallet.FindPaymentByMemoInPage(outgoingPage, txKey) != nil + if !found { + // Inconclusive recovery (chain read failed) must not fall through to a + // re-submit, so on error leave PROCESSING and retry on the next event. + burnTxForSeq, serr := bridge.subClient.GetBurnTransaction(types.U64(txID)) + if serr != nil { + logger.Warn().Err(serr).Uint64("tx_id", txID). 
+ Msg("failed to get burn tx for sequence lookup during PROCESSING check; will retry on next event") + return nil + } + found = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(burnTxForSeq.SequenceNumber)) != nil + } + if found { + logger.Info(). + Str("event_action", "withdraw_recovered"). + Str("event_kind", "event"). + Str("category", "withdraw"). + Msg("idempotency: found existing Stellar payment, completing TFChain confirmation") + if cerr := bridge.subClient.RetrySetWithdrawExecuted(ctx, txID); cerr != nil { + return cerr + } + if merr := bridge.idempotency.MarkWithdrawCompleted(txID); merr != nil { + logger.Warn().Err(merr).Uint64("tx_id", txID).Msg("idempotency: failed to mark withdraw completed") + } + return nil + } + // Not found. If the payment had been submitted and then scrolled out of the + // 200-record window, a re-submit reuses the same (already-consumed) sequence + // and Stellar rejects it with tx_bad_seq, so no double payment can occur. + logger.Info().Msg("idempotency: no Stellar payment found by memo or sequence, safe to retry") + } + // 3. Already burned on chain? + burned, err := bridge.subClient.IsBurnedAlready(types.U64(txID)) + if err != nil { + return err + } if burned { + if merr := bridge.idempotency.MarkWithdrawCompleted(txID); merr != nil { + logger.Warn().Err(merr).Uint64("tx_id", txID).Msg("idempotency: failed to mark withdraw completed") + } logger.Info(). Str("event_action", "withdraw_skipped"). Str("event_kind", "event"). 
@@ -145,7 +219,7 @@ func (bridge *Bridge) handleWithdrawReady(ctx context.Context, withdrawReady sub return pkg.ErrTransactionAlreadyBurned } - burnTx, err := bridge.subClient.GetBurnTransaction(types.U64(withdrawReady.ID)) + burnTx, err := bridge.subClient.GetBurnTransaction(types.U64(txID)) if err != nil { return err } @@ -159,11 +233,19 @@ func (bridge *Bridge) handleWithdrawReady(ctx context.Context, withdrawReady sub return nil } - // todo add memo hash - err = bridge.wallet.CreatePaymentWithSignaturesAndSubmit(ctx, burnTx.Target, uint64(burnTx.Amount), fmt.Sprint(withdrawReady.ID), burnTx.Signatures, int64(burnTx.SequenceNumber)) + // 4. Mark PROCESSING before submitting to Stellar, so a crash between the submit + // and the TFChain confirmation is recoverable via the PROCESSING check above. + if err := bridge.idempotency.MarkWithdrawProcessing(txID); err != nil { + return err + } + + // txKey (the burn tx id) is set as the payment's text memo for traceability and + // crash recovery; see CreatePaymentWithSignaturesAndSubmit. + err = bridge.wallet.CreatePaymentWithSignaturesAndSubmit(ctx, burnTx.Target, uint64(burnTx.Amount), txKey, burnTx.Signatures, int64(burnTx.SequenceNumber)) if err != nil { // we can log and skip here as we could depend on tfcahin retry mechanism - // to notify us again about related burn tx + // to notify us again about related burn tx. The tx stays PROCESSING and is + // reconciled on the next attempt. logger.Info(). Str("event_action", "withdraw_postponed"). Str("event_kind", "event"). @@ -173,6 +255,15 @@ func (bridge *Bridge) handleWithdrawReady(ctx context.Context, withdrawReady sub Msgf("the withdraw has been postponed due to a problem in sending this transaction to the stellar network. error was %s", err.Error()) return nil } + + // 5. Stellar payment submitted — confirm on TFChain, then mark COMPLETED. 
+ if err := bridge.subClient.RetrySetWithdrawExecuted(ctx, txID); err != nil { + return err + } + if err := bridge.idempotency.MarkWithdrawCompleted(txID); err != nil { + logger.Warn().Err(err).Uint64("tx_id", txID).Msg("idempotency: failed to mark withdraw completed") + } + logger.Info(). Str("event_action", "withdraw_completed"). Str("event_kind", "event"). @@ -186,7 +277,7 @@ func (bridge *Bridge) handleWithdrawReady(ctx context.Context, withdrawReady sub Str("outcome", "bridged")). Msg("the transfer has completed") - return bridge.subClient.RetrySetWithdrawExecuted(ctx, withdrawReady.ID) + return nil } func (bridge *Bridge) handleBadWithdraw(ctx context.Context, withdraw subpkg.WithdrawCreatedEvent) error { diff --git a/bridge/tfchain_bridge/pkg/idempotency.go b/bridge/tfchain_bridge/pkg/idempotency.go new file mode 100644 index 000000000..85d81a019 --- /dev/null +++ b/bridge/tfchain_bridge/pkg/idempotency.go @@ -0,0 +1,197 @@ +package pkg + +import ( + "encoding/json" + "fmt" + "strconv" + + bolt "go.etcd.io/bbolt" +) + +// TxState represents the processing state of a bridge transaction. +type TxState string + +const ( + // TxStateProcessing means the Stellar transaction has been (or is being) submitted, + // but TFChain confirmation has not yet been recorded. + TxStateProcessing TxState = "PROCESSING" + // TxStateCompleted means both Stellar submission and TFChain confirmation are done. + TxStateCompleted TxState = "COMPLETED" +) + +var ( + bucketWithdraw = []byte("withdraw") + bucketRefund = []byte("refund") +) + +// IdempotencyStore provides crash-safe tracking of transaction processing state +// using a bbolt (BoltDB) embedded database. It prevents double Stellar submissions +// when the bridge crashes between Stellar tx submit and TFChain confirmation. +// Entries are never deleted individually; only Reset wipes the whole store. +// Otherwise COMPLETED entries accumulate over time.
At ~50 bytes +// per entry, growth is negligible even at high transaction volumes (e.g. 1000 +// txs/day → ~18 MB/year). Pruning is intentionally omitted for simplicity and +// auditability. +type IdempotencyStore struct { + db *bolt.DB +} + +// NewIdempotencyStore opens or creates the bbolt database at the given path. +func NewIdempotencyStore(path string) (*IdempotencyStore, error) { + db, err := bolt.Open(path, 0600, nil) + if err != nil { + return nil, fmt.Errorf("failed to open idempotency store at %s: %w", path, err) + } + + err = db.Update(func(tx *bolt.Tx) error { + if _, err := tx.CreateBucketIfNotExists(bucketWithdraw); err != nil { + return err + } + _, err := tx.CreateBucketIfNotExists(bucketRefund) + return err + }) + if err != nil { + db.Close() + return nil, err + } + + return &IdempotencyStore{db: db}, nil +} + +// Reset clears all withdraw and refund state. The store keys are chain-scoped +// (withdraw tx ids restart from a low number after a chain reset and would collide +// with stale COMPLETED entries, causing new withdraws to be wrongly skipped), so the +// store must be wiped whenever the underlying chain/bridge account is reset. This is +// invoked when the bridge is started with the rescan/fresh-start flag, alongside the +// Stellar cursor reset. +func (s *IdempotencyStore) Reset() error { + return s.db.Update(func(tx *bolt.Tx) error { + for _, bucket := range [][]byte{bucketWithdraw, bucketRefund} { + if err := tx.DeleteBucket(bucket); err != nil && err != bolt.ErrBucketNotFound { + return err + } + if _, err := tx.CreateBucket(bucket); err != nil { + return err + } + } + return nil + }) +} + +// MarkWithdrawProcessing records that a withdraw is about to be submitted to Stellar. 
+func (s *IdempotencyStore) MarkWithdrawProcessing(txID uint64) error { + return s.setState(bucketWithdraw, strconv.FormatUint(txID, 10), TxStateProcessing) +} + +// MarkWithdrawCompleted records that a withdraw has been fully processed +// (Stellar tx submitted AND TFChain confirmation recorded). +func (s *IdempotencyStore) MarkWithdrawCompleted(txID uint64) error { + return s.setState(bucketWithdraw, strconv.FormatUint(txID, 10), TxStateCompleted) +} + +// GetWithdrawState returns the current state of a withdraw transaction. +// Returns empty string if the transaction has never been tracked. +func (s *IdempotencyStore) GetWithdrawState(txID uint64) (TxState, error) { + return s.getState(bucketWithdraw, strconv.FormatUint(txID, 10)) +} + +// GetPendingWithdraws returns all withdraw transaction IDs that are in PROCESSING state. +// These are candidates for crash recovery reconciliation. +func (s *IdempotencyStore) GetPendingWithdraws() ([]uint64, error) { + var pending []uint64 + err := s.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket(bucketWithdraw) + return b.ForEach(func(k, v []byte) error { + var state TxState + if err := json.Unmarshal(v, &state); err != nil { + return nil // skip corrupted entries + } + if state == TxStateProcessing { + id, err := strconv.ParseUint(string(k), 10, 64) + if err != nil { + return nil // skip non-numeric keys + } + pending = append(pending, id) + } + return nil + }) + }) + return pending, err +} + +// MarkRefundProcessing records that a refund is about to be submitted to Stellar. +func (s *IdempotencyStore) MarkRefundProcessing(txHash string) error { + return s.setState(bucketRefund, txHash, TxStateProcessing) +} + +// MarkRefundCompleted records that a refund has been fully processed. +func (s *IdempotencyStore) MarkRefundCompleted(txHash string) error { + return s.setState(bucketRefund, txHash, TxStateCompleted) +} + +// GetRefundState returns the current state of a refund transaction. 
+func (s *IdempotencyStore) GetRefundState(txHash string) (TxState, error) {
+	state, err := s.getState(bucketRefund, txHash)
+	return state, err
+}
+
+// GetPendingRefunds lists the tx hashes of all refunds that are in PROCESSING state.
+// Entries whose stored value cannot be decoded are skipped silently.
+func (s *IdempotencyStore) GetPendingRefunds() ([]string, error) {
+	var hashes []string
+
+	// collect is invoked once per key/value pair of the refund bucket.
+	collect := func(key, raw []byte) error {
+		var st TxState
+		if json.Unmarshal(raw, &st) == nil && st == TxStateProcessing {
+			hashes = append(hashes, string(key))
+		}
+		return nil
+	}
+
+	viewErr := s.db.View(func(tx *bolt.Tx) error {
+		return tx.Bucket(bucketRefund).ForEach(collect)
+	})
+	return hashes, viewErr
+}
+
+// Close releases the underlying bbolt database handle.
+func (s *IdempotencyStore) Close() error {
+	return s.db.Close()
+}
+
+// setState stores state, JSON-encoded, under key in the given bucket.
+//
+// A COMPLETED entry is never moved back to PROCESSING: such a request is rejected
+// with an error. Callers are expected to check the state first, so this is only a
+// safety net at the store level. An existing value that cannot be decoded does not
+// trigger the guard and is overwritten.
+func (s *IdempotencyStore) setState(bucket []byte, key string, state TxState) error {
+	rawKey := []byte(key)
+	return s.db.Update(func(tx *bolt.Tx) error {
+		b := tx.Bucket(bucket)
+
+		if state == TxStateProcessing {
+			if prev := b.Get(rawKey); prev != nil {
+				var cur TxState
+				if json.Unmarshal(prev, &cur) == nil && cur == TxStateCompleted {
+					return fmt.Errorf("refusing to downgrade completed tx %q to PROCESSING", key)
+				}
+			}
+		}
+
+		encoded, err := json.Marshal(state)
+		if err != nil {
+			return err
+		}
+		return b.Put(rawKey, encoded)
+	})
+}
+
+// getState reads and JSON-decodes the state stored under key in the given bucket.
+// When the key is absent the zero-value TxState (empty string) is returned with a
+// nil error.
+func (s *IdempotencyStore) getState(bucket []byte, key string) (TxState, error) {
+	var state TxState
+	err := s.db.View(func(tx *bolt.Tx) error {
+		if raw := tx.Bucket(bucket).Get([]byte(key)); raw != nil {
+			return json.Unmarshal(raw, &state)
+		}
+		return nil // nothing stored: state keeps its zero value
+	})
+	return state, err
+}
diff --git a/bridge/tfchain_bridge/pkg/stellar/stellar.go b/bridge/tfchain_bridge/pkg/stellar/stellar.go
index 62b2c460f..821a365ed 100644
--- a/bridge/tfchain_bridge/pkg/stellar/stellar.go
+++ b/bridge/tfchain_bridge/pkg/stellar/stellar.go
@@ -4,7 +4,9 @@ import (
 	"context"
 	"encoding/base64"
 	"encoding/hex"
+	"encoding/json"
 	"fmt"
+	"io"
 	"math/big"
 	"net/http"
 	"strconv"
@@ -86,6 +88,14 @@ func (w *StellarWallet) CreatePaymentAndReturnSignature(ctx context.Context, tar
 		return "", 0, err
 	}
 
+	// Tag the withdraw payment with the burn tx id (text memo) for traceability on
+	// Stellar and to make the payment recoverable by memo after a crash. This memo is
+	// part of the signed transaction, so it MUST be identical to the one set in
+	// CreatePaymentWithSignaturesAndSubmit; otherwise the collected signatures will
+	// not authorize the submitted transaction. (Changing this is a breaking change:
+	// all validators must run the same version, or signature sets become inconsistent.)
+	txnBuild.Memo = txnbuild.MemoText(fmt.Sprint(txID))
+
 	txn, err := w.createTransaction(ctx, txnBuild, true)
 	if err != nil {
 		return "", 0, err
@@ -104,6 +114,11 @@ func (w *StellarWallet) CreatePaymentWithSignaturesAndSubmit(ctx context.Context
 		return err
 	}
 
+	// Must match the memo set in CreatePaymentAndReturnSignature (the burn tx id),
+	// otherwise the collected signatures will not authorize this transaction. Here
+	// txHash is the decimal burn tx id string (fmt.Sprint(withdrawReady.ID)).
+	txnBuild.Memo = txnbuild.MemoText(txHash)
+
 	txn, err := w.createTransaction(ctx, txnBuild, false)
 	if err != nil {
 		return err
@@ -722,3 +737,104 @@ func (w *StellarWallet) StatBridgeAccount() (string, error) {
 	}
 	return "", errors.New("source account does not have trustline")
 }
+
+// fetchOutgoingTransactions returns the transactions submitted by the bridge account
+// that appear among the account's `limit` most recent transactions on Horizon,
+// ordered newest-first.
+//
+// The request targets the account-scoped collection (accounts/{id}/transactions).
+// Horizon's global /transactions collection documents no source_account filter, so
+// querying it would page through the newest transactions of the whole network
+// instead of the bridge's own; crash recovery could then miss an already-submitted
+// payment or match an unrelated transaction that happens to carry the same memo or
+// sequence number.
+//
+// The account collection also lists transactions the account merely takes part in
+// (e.g. incoming deposits, whose memo is chosen by the sender), so the records are
+// additionally filtered on their source account. Incoming transactions still count
+// towards `limit`, hence fewer than `limit` records may be returned.
+//
+// A transport error, a non-200 response or an undecodable body is returned as an
+// error together with an empty page.
+func (w *StellarWallet) fetchOutgoingTransactions(ctx context.Context, limit uint) (hProtocol.TransactionsPage, error) {
+	client, err := w.getHorizonClient()
+	if err != nil {
+		return hProtocol.TransactionsPage{}, errors.Wrap(err, "failed to get horizon client")
+	}
+
+	bridgeAccount := w.config.StellarBridgeAccount
+	reqURL := fmt.Sprintf("%saccounts/%s/transactions?order=desc&limit=%d",
+		strings.TrimRight(client.HorizonURL, "/")+"/",
+		bridgeAccount,
+		limit,
+	)
+
+	httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
+	if err != nil {
+		return hProtocol.TransactionsPage{}, errors.Wrap(err, "failed to build horizon request")
+	}
+
+	// Reuse the Horizon client's HTTP transport which has timeouts configured
+	httpResp, err := client.HTTP.Do(httpReq)
+	if err != nil {
+		return hProtocol.TransactionsPage{}, errors.Wrap(err, "failed to execute horizon request")
+	}
+	defer httpResp.Body.Close()
+
+	// Validate status before decoding — a non-200 response would decode as an empty
+	// TransactionsPage and cause crash recovery to falsely conclude "no tx found"
+	if httpResp.StatusCode != http.StatusOK {
+		// Best effort: the body is only used to enrich the error message.
+		body, _ := io.ReadAll(io.LimitReader(httpResp.Body, 1024))
+		return hProtocol.TransactionsPage{}, fmt.Errorf("horizon returned HTTP %d: %s", httpResp.StatusCode, strings.TrimSpace(string(body)))
+	}
+
+	// Limit body size to protect against unexpectedly large responses
+	var page hProtocol.TransactionsPage
+	if err := json.NewDecoder(io.LimitReader(httpResp.Body, 4*1024*1024)).Decode(&page); err != nil {
+		return hProtocol.TransactionsPage{}, errors.Wrap(err, "failed to decode horizon response")
+	}
+
+	// Keep only the transactions the bridge account itself submitted (filtered in
+	// place, reusing the decoded slice).
+	outgoing := page.Embedded.Records[:0]
+	for _, tx := range page.Embedded.Records {
+		if tx.Account == bridgeAccount {
+			outgoing = append(outgoing, tx)
+		}
+	}
+	page.Embedded.Records = outgoing
+
+	return page, nil
+}
+
+// FetchOutgoingTransactionsPage fetches the bridge account's 200 most recent
+// transactions in a single Horizon request and returns the outgoing ones among
+// them. Callers that need multiple lookups (memo + sequence) should fetch once and
+// use the page-based helpers below to avoid redundant HTTP round-trips.
+func (w *StellarWallet) FetchOutgoingTransactionsPage(ctx context.Context) (hProtocol.TransactionsPage, error) {
+	page, err := w.fetchOutgoingTransactions(ctx, 200)
+	if err != nil {
+		return hProtocol.TransactionsPage{}, errors.Wrap(err, "failed to fetch outgoing transactions from Horizon")
+	}
+	return page, nil
+}
+
+// FindPaymentByMemoInPage scans a pre-fetched transactions page for a text memo match.
+// Withdraw payments are tagged with the burn tx id as a text memo, so this finds an
+// already-submitted withdraw during crash recovery. Use this when you already have a
+// page from FetchOutgoingTransactionsPage to avoid redundant Horizon API calls.
+//
+// It returns a pointer to a copy of the first matching record, or nil when no
+// record has memo type "text" and exactly this memo.
+func (w *StellarWallet) FindPaymentByMemoInPage(page hProtocol.TransactionsPage, memo string) *hProtocol.Transaction {
+	for _, tx := range page.Embedded.Records {
+		if tx.MemoType == "text" && tx.Memo == memo {
+			txCopy := tx
+			return &txCopy
+		}
+	}
+	return nil
+}
+
+// FindRefundByReturnHashInPage scans a pre-fetched transactions page for a MemoReturn hash match.
+// Horizon's JSON API reports a MemoReturn as base64 while the txHash coming from
+// TFChain is hex-encoded, so the hash is hex-decoded and re-encoded as base64 before
+// it is compared. The result is a pointer to a copy of the first matching record;
+// nil is returned when txHash is not valid hex (a warning is logged) or when no
+// record matches.
+func (w *StellarWallet) FindRefundByReturnHashInPage(page hProtocol.TransactionsPage, txHash string) *hProtocol.Transaction {
+	raw, err := hex.DecodeString(txHash)
+	if err != nil {
+		log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to hex-decode refund tx hash for memo comparison")
+		return nil
+	}
+	want := base64.StdEncoding.EncodeToString(raw)
+
+	records := page.Embedded.Records
+	for i := range records {
+		if records[i].MemoType != "return" || records[i].Memo != want {
+			continue
+		}
+		// Hand out a copy so the result does not point into the page's slice.
+		match := records[i]
+		return &match
+	}
+	return nil
+}
+
+// FindPaymentBySequenceInPage looks through a pre-fetched transactions page for the
+// record whose source account sequence number equals sequenceNumber and returns a
+// pointer to a copy of it, or nil when there is none. Used as a fallback for
+// pre-upgrade txs submitted without a memo.
+func (w *StellarWallet) FindPaymentBySequenceInPage(page hProtocol.TransactionsPage, sequenceNumber int64) *hProtocol.Transaction {
+	// The comparison is done on the decimal string form of the sequence number.
+	want := strconv.FormatInt(sequenceNumber, 10)
+
+	records := page.Embedded.Records
+	for i := range records {
+		if records[i].AccountSequence != want {
+			continue
+		}
+		match := records[i]
+		return &match
+	}
+	return nil
+}