Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions bridge/tfchain_bridge/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ require (
github.com/rs/zerolog v1.26.0
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/stellar/go v0.0.0-20210922122349-e6f322c047c5
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/vedhavyas/go-subkey v1.0.3
)

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/threefoldtech/tfchain/clients/tfchain-client-go v0.0.0-20230607082553-5605bca61c79
go.etcd.io/bbolt v1.3.9
)

require (
Expand Down Expand Up @@ -50,7 +51,7 @@ require (
github.com/rs/cors v1.8.2 // indirect
github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 // indirect
github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a // indirect
github.com/stretchr/testify v1.7.2 // indirect
github.com/stretchr/testify v1.8.1 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
Expand Down
16 changes: 11 additions & 5 deletions bridge/tfchain_bridge/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,17 @@ github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a/go.mod h1:yoxyU/M8n
github.com/stellar/throttled v2.2.3-0.20190823235211-89d75816f59d+incompatible/go.mod h1:7CJ23pXirXBJq45DqvO6clzTEGM/l1SfKrgrzLry8b4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As=
github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
Expand Down Expand Up @@ -381,6 +383,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down Expand Up @@ -502,6 +506,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
173 changes: 155 additions & 18 deletions bridge/tfchain_bridge/pkg/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package bridge

import (
"context"
"fmt"
"strconv"
"time"

"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg"
"github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/stellar"
subpkg "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/substrate"
Expand All @@ -26,6 +29,7 @@ type Bridge struct {
blockPersistency *pkg.ChainPersistency
config *pkg.BridgeConfig
depositFee int64
idempotency *pkg.IdempotencyStore
}

func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, error) {
Expand Down Expand Up @@ -64,12 +68,33 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, erro
return nil, "", err
}

// Crash-safe idempotency store, kept alongside the block persistency file.
// Records the PROCESSING/COMPLETED state of withdraws and refunds so that a
// crash between submitting a Stellar payment and confirming it on TFChain is
// recovered without double-paying or double-confirming.
idempotency, err := pkg.NewIdempotencyStore(cfg.PersistencyFile + ".idem.db")
if err != nil {
return nil, "", errors.Wrap(err, "failed to open idempotency store")
}

// The idempotency store is chain-scoped: withdraw keys are TFChain burn tx ids,
// which restart from a low number after a chain reset and would otherwise collide
// with stale COMPLETED entries, causing new withdraws to be wrongly skipped. The
// rescan flag marks a fresh start (it also zeroes the Stellar cursor above), so
// clear the store here too.
if cfg.RescanBridgeAccount {
if err := idempotency.Reset(); err != nil {
return nil, "", errors.Wrap(err, "failed to reset idempotency store")
}
}

bridge := &Bridge{
subClient: subClient,
blockPersistency: blockPersistency,
wallet: wallet,
config: &cfg,
depositFee: depositFee,
idempotency: idempotency,
}
// stat deposit fee?
return bridge, wallet.GetKeypair().Address(), nil
Expand All @@ -93,11 +118,25 @@ func (bridge *Bridge) preCheckBalance(ctx context.Context) error {
}

func (bridge *Bridge) Start(ctx context.Context) error {
// Close the idempotency store when Start returns.
defer func() {
if err := bridge.idempotency.Close(); err != nil {
log.Warn().Err(err).Msg("failed to close idempotency store")
}
}()

// pre-check wallet balance
if err := bridge.preCheckBalance(ctx); err != nil {
return err
}

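// Crash recovery: reconcile any transactions left in PROCESSING state by a
// previous run before we start consuming new events. Horizon/RPC failures
// during reconciliation are only logged (those transactions are retried when
// their Ready event fires again), but a failure to read the idempotency store
// is returned and aborts startup.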
if err := bridge.reconcilePendingTransactions(ctx); err != nil {
return errors.Wrap(err, "startup reconciliation failed")
}

log.Info().
Str("event_action", "bridge_started").
Str("event_kind", "event").
Expand Down Expand Up @@ -148,6 +187,27 @@ func (bridge *Bridge) Start(ctx context.Context) error {
if data.Err != nil {
return errors.Wrap(data.Err, "failed to get tfchain events")
}
// Ready events are processed before Created/Expired events: a Ready
// event submits a payment to Stellar whose signatures are time-sensitive
// (they expire), so it must not wait behind proposal/expiry handling.
for _, withdawReadyEvent := range data.Events.WithdrawReadyEvents {
err := bridge.handleWithdrawReady(ctx, withdawReadyEvent)
if err != nil {
if errors.Is(err, pkg.ErrTransactionAlreadyBurned) {
continue
}
return errors.Wrap(err, "an error occurred while handling WithdrawReadyEvents")
}
}
for _, refundReadyEvent := range data.Events.RefundReadyEvents {
err := bridge.handleRefundReady(ctx, refundReadyEvent)
if err != nil {
if errors.Is(err, pkg.ErrTransactionAlreadyRefunded) {
continue
}
return errors.Wrap(err, "an error occurred while handling RefundReadyEvents")
}
}
for _, withdrawCreatedEvent := range data.Events.WithdrawCreatedEvents {
err := bridge.handleWithdrawCreated(ctx, withdrawCreatedEvent)
if err != nil {
Expand All @@ -164,30 +224,12 @@ func (bridge *Bridge) Start(ctx context.Context) error {
return errors.Wrap(err, "an error occurred while handling WithdrawExpiredEvents")
}
}
for _, withdawReadyEvent := range data.Events.WithdrawReadyEvents {
err := bridge.handleWithdrawReady(ctx, withdawReadyEvent)
if err != nil {
if errors.Is(err, pkg.ErrTransactionAlreadyBurned) {
continue
}
return errors.Wrap(err, "an error occurred while handling WithdrawReadyEvents")
}
}
for _, refundExpiredEvent := range data.Events.RefundExpiredEvents {
err := bridge.handleRefundExpired(ctx, refundExpiredEvent)
if err != nil {
return errors.Wrap(err, "an error occurred while handling RefundExpiredEvents")
}
}
for _, refundReadyEvent := range data.Events.RefundReadyEvents {
err := bridge.handleRefundReady(ctx, refundReadyEvent)
if err != nil {
if errors.Is(err, pkg.ErrTransactionAlreadyRefunded) {
continue
}
return errors.Wrap(err, "an error occurred while handling RefundReadyEvents")
}
}
case data := <-stellarSub:
if data.Err != nil {
return errors.Wrap(data.Err, "failed to get stellar payments")
Expand Down Expand Up @@ -222,3 +264,98 @@ func (bridge *Bridge) Start(ctx context.Context) error {
time.Sleep(1 * time.Second)
}
}

// reconcilePendingTransactions runs once at startup to recover transactions that a
// previous run left in PROCESSING state — i.e. the Stellar payment may or may not
// have been submitted before the bridge stopped.
//
// It loads the pending withdraws and refunds from the idempotency store, fetches
// one page of outgoing Stellar transactions, and reconciles every pending entry
// against that single page (no Horizon call per entry).
//
// Error contract:
//   - Failing to read the idempotency store is returned to the caller.
//   - Failing to fetch the Horizon page is logged and reconciliation is skipped
//     entirely: without a page no payment can be matched, so there is no point in
//     issuing per-entry TFChain lookups. Entries stay PROCESSING.
//   - Per-entry failures (TFChain lookup, confirmation, store update) are logged
//     and the entry is left for a later attempt; they never abort the loop.
func (bridge *Bridge) reconcilePendingTransactions(ctx context.Context) error {
	pendingWithdraws, err := bridge.idempotency.GetPendingWithdraws()
	if err != nil {
		return errors.Wrap(err, "failed to get pending withdraws")
	}
	pendingRefunds, err := bridge.idempotency.GetPendingRefunds()
	if err != nil {
		return errors.Wrap(err, "failed to get pending refunds")
	}

	if len(pendingWithdraws) == 0 && len(pendingRefunds) == 0 {
		return nil
	}

	log.Info().
		Int("pending_withdraws", len(pendingWithdraws)).
		Int("pending_refunds", len(pendingRefunds)).
		Msg("reconciling pending transactions from previous run")

	// Fetch outgoing transactions once and reuse the page for all lookups, avoiding
	// one Horizon HTTP call per pending transaction.
	outgoingPage, err := bridge.wallet.FetchOutgoingTransactionsPage(ctx)
	if err != nil {
		// Non-fatal, but nothing can be matched without the page: stop here instead
		// of walking every pending entry against an empty result set.
		log.Warn().Err(err).Msg("failed to fetch Horizon transactions for reconciliation, pending transactions will retry on next event")
		return nil
	}

	for _, txID := range pendingWithdraws {
		bridge.reconcilePendingWithdraw(ctx, outgoingPage, txID)
	}
	for _, txHash := range pendingRefunds {
		bridge.reconcilePendingRefund(ctx, outgoingPage, txHash)
	}

	log.Info().Msg("reconciliation complete")
	return nil
}

// reconcilePendingWithdraw reconciles a single pending withdraw (identified by its
// burn tx id) against the already-fetched page of outgoing Stellar transactions.
// Every failure is logged and leaves the entry in PROCESSING state.
func (bridge *Bridge) reconcilePendingWithdraw(ctx context.Context, outgoingPage hProtocol.TransactionsPage, txID uint64) {
	// Recover by the text memo (burn tx id), falling back to the account sequence
	// number for payments submitted by a pre-memo bridge version.
	stellarTx := bridge.wallet.FindPaymentByMemoInPage(outgoingPage, fmt.Sprint(txID))
	if stellarTx == nil {
		burnTx, err := bridge.subClient.GetBurnTransaction(types.U64(txID))
		if err != nil {
			log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to get burn tx for sequence lookup during reconciliation")
		} else {
			stellarTx = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(burnTx.SequenceNumber))
		}
	}

	if stellarTx == nil {
		log.Info().Uint64("tx_id", txID).Msg("reconcile: no Stellar tx found by memo or sequence, will retry on next event")
		return
	}

	// A matching payment exists, so only the TFChain confirmation is still missing.
	log.Info().Uint64("tx_id", txID).Msg("reconcile: found existing Stellar payment, completing TFChain confirmation")
	if err := bridge.subClient.RetrySetWithdrawExecuted(ctx, txID); err != nil {
		log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to set withdraw executed during reconciliation")
		return
	}
	if err := bridge.idempotency.MarkWithdrawCompleted(txID); err != nil {
		log.Warn().Err(err).Uint64("tx_id", txID).Msg("failed to mark withdraw completed during reconciliation")
	}
}

// reconcilePendingRefund reconciles a single pending refund (identified by the
// hash of the transaction being refunded) against the already-fetched page of
// outgoing Stellar transactions. Every failure is logged and leaves the entry in
// PROCESSING state.
func (bridge *Bridge) reconcilePendingRefund(ctx context.Context, outgoingPage hProtocol.TransactionsPage, txHash string) {
	// Recover by the return hash, falling back to the account sequence number
	// recorded on the TFChain refund transaction.
	stellarTx := bridge.wallet.FindRefundByReturnHashInPage(outgoingPage, txHash)
	if stellarTx == nil {
		refundTx, err := bridge.subClient.GetRefundTransaction(txHash)
		if err != nil {
			log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to get refund tx for sequence lookup during reconciliation")
		} else {
			stellarTx = bridge.wallet.FindPaymentBySequenceInPage(outgoingPage, int64(refundTx.SequenceNumber))
		}
	}

	if stellarTx == nil {
		log.Info().Str("tx_hash", txHash).Msg("reconcile: no Stellar refund found by return hash or sequence, will retry on next event")
		return
	}

	// A matching refund exists, so only the TFChain confirmation is still missing.
	log.Info().Str("tx_hash", txHash).Msg("reconcile: found existing Stellar refund, completing TFChain confirmation")
	if err := bridge.subClient.RetrySetRefundTransactionExecutedTx(ctx, txHash); err != nil {
		log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to set refund executed during reconciliation")
		return
	}
	if err := bridge.idempotency.MarkRefundCompleted(txHash); err != nil {
		log.Warn().Err(err).Str("tx_hash", txHash).Msg("failed to mark refund completed during reconciliation")
	}
}
Loading
Loading