@@ -2,12 +2,15 @@ package bridge
22
33import (
44 "context"
5+ "fmt"
56 "strconv"
67 "time"
78
9+ "github.com/centrifuge/go-substrate-rpc-client/v4/types"
810 "github.com/pkg/errors"
911 "github.com/rs/zerolog"
1012 "github.com/rs/zerolog/log"
13+ hProtocol "github.com/stellar/go/protocols/horizon"
1114 "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg"
1215 "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/stellar"
1316 subpkg "github.com/threefoldtech/tfchain/bridge/tfchain_bridge/pkg/substrate"
@@ -26,6 +29,7 @@ type Bridge struct {
2629 blockPersistency * pkg.ChainPersistency
2730 config * pkg.BridgeConfig
2831 depositFee int64
32+ idempotency * pkg.IdempotencyStore
2933}
3034
3135func NewBridge (ctx context.Context , cfg pkg.BridgeConfig ) (* Bridge , string , error ) {
@@ -64,12 +68,33 @@ func NewBridge(ctx context.Context, cfg pkg.BridgeConfig) (*Bridge, string, erro
6468 return nil , "" , err
6569 }
6670
71+ // Crash-safe idempotency store, kept alongside the block persistency file.
72+ // Records the PROCESSING/COMPLETED state of withdraws and refunds so that a
73+ // crash between submitting a Stellar payment and confirming it on TFChain is
74+ // recovered without double-paying or double-confirming.
75+ idempotency , err := pkg .NewIdempotencyStore (cfg .PersistencyFile + ".idem.db" )
76+ if err != nil {
77+ return nil , "" , errors .Wrap (err , "failed to open idempotency store" )
78+ }
79+
80+ // The idempotency store is chain-scoped: withdraw keys are TFChain burn tx ids,
81+ // which restart from a low number after a chain reset and would otherwise collide
82+ // with stale COMPLETED entries, causing new withdraws to be wrongly skipped. The
83+ // rescan flag marks a fresh start (it also zeroes the Stellar cursor above), so
84+ // clear the store here too.
85+ if cfg .RescanBridgeAccount {
86+ if err := idempotency .Reset (); err != nil {
87+ return nil , "" , errors .Wrap (err , "failed to reset idempotency store" )
88+ }
89+ }
90+
6791 bridge := & Bridge {
6892 subClient : subClient ,
6993 blockPersistency : blockPersistency ,
7094 wallet : wallet ,
7195 config : & cfg ,
7296 depositFee : depositFee ,
97+ idempotency : idempotency ,
7398 }
7499 // stat deposit fee?
75100 return bridge , wallet .GetKeypair ().Address (), nil
@@ -93,11 +118,25 @@ func (bridge *Bridge) preCheckBalance(ctx context.Context) error {
93118}
94119
95120func (bridge * Bridge ) Start (ctx context.Context ) error {
121+ // Close the idempotency store when Start returns.
122+ defer func () {
123+ if err := bridge .idempotency .Close (); err != nil {
124+ log .Warn ().Err (err ).Msg ("failed to close idempotency store" )
125+ }
126+ }()
127+
96128 // pre-check wallet balance
97129 if err := bridge .preCheckBalance (ctx ); err != nil {
98130 return err
99131 }
100132
133+ // Crash recovery: reconcile any transactions left in PROCESSING state by a
134+ // previous run before consuming new events. Only a failure to read the idempotency store
135+ // aborts startup; unreconciled transactions are retried when their Ready event fires again.
136+ if err := bridge .reconcilePendingTransactions (ctx ); err != nil {
137+ return errors .Wrap (err , "startup reconciliation failed" )
138+ }
139+
101140 log .Info ().
102141 Str ("event_action" , "bridge_started" ).
103142 Str ("event_kind" , "event" ).
@@ -148,6 +187,27 @@ func (bridge *Bridge) Start(ctx context.Context) error {
148187 if data .Err != nil {
149188 return errors .Wrap (data .Err , "failed to get tfchain events" )
150189 }
190+ // Ready events are processed before Created/Expired events: a Ready
191+ // event submits a payment to Stellar whose signatures are time-sensitive
192+ // (they expire), so it must not wait behind proposal/expiry handling.
193+ for _ , withdawReadyEvent := range data .Events .WithdrawReadyEvents {
194+ err := bridge .handleWithdrawReady (ctx , withdawReadyEvent )
195+ if err != nil {
196+ if errors .Is (err , pkg .ErrTransactionAlreadyBurned ) {
197+ continue
198+ }
199+ return errors .Wrap (err , "an error occurred while handling WithdrawReadyEvents" )
200+ }
201+ }
202+ for _ , refundReadyEvent := range data .Events .RefundReadyEvents {
203+ err := bridge .handleRefundReady (ctx , refundReadyEvent )
204+ if err != nil {
205+ if errors .Is (err , pkg .ErrTransactionAlreadyRefunded ) {
206+ continue
207+ }
208+ return errors .Wrap (err , "an error occurred while handling RefundReadyEvents" )
209+ }
210+ }
151211 for _ , withdrawCreatedEvent := range data .Events .WithdrawCreatedEvents {
152212 err := bridge .handleWithdrawCreated (ctx , withdrawCreatedEvent )
153213 if err != nil {
@@ -164,30 +224,12 @@ func (bridge *Bridge) Start(ctx context.Context) error {
164224 return errors .Wrap (err , "an error occurred while handling WithdrawExpiredEvents" )
165225 }
166226 }
167- for _ , withdawReadyEvent := range data .Events .WithdrawReadyEvents {
168- err := bridge .handleWithdrawReady (ctx , withdawReadyEvent )
169- if err != nil {
170- if errors .Is (err , pkg .ErrTransactionAlreadyBurned ) {
171- continue
172- }
173- return errors .Wrap (err , "an error occurred while handling WithdrawReadyEvents" )
174- }
175- }
176227 for _ , refundExpiredEvent := range data .Events .RefundExpiredEvents {
177228 err := bridge .handleRefundExpired (ctx , refundExpiredEvent )
178229 if err != nil {
179230 return errors .Wrap (err , "an error occurred while handling RefundExpiredEvents" )
180231 }
181232 }
182- for _ , refundReadyEvent := range data .Events .RefundReadyEvents {
183- err := bridge .handleRefundReady (ctx , refundReadyEvent )
184- if err != nil {
185- if errors .Is (err , pkg .ErrTransactionAlreadyRefunded ) {
186- continue
187- }
188- return errors .Wrap (err , "an error occurred while handling RefundReadyEvents" )
189- }
190- }
191233 case data := <- stellarSub :
192234 if data .Err != nil {
193235 return errors .Wrap (data .Err , "failed to get stellar payments" )
@@ -222,3 +264,98 @@ func (bridge *Bridge) Start(ctx context.Context) error {
222264 time .Sleep (1 * time .Second )
223265 }
224266}
267+
268+ // reconcilePendingTransactions runs once at startup to recover transactions that a
269+ // previous run left in PROCESSING state — i.e. the Stellar payment may or may not
270+ // have been submitted before the bridge stopped. For each pending withdraw/refund it
271+ // looks for a matching outgoing Stellar transaction (by memo, falling back to the
272+ // sequence number for pre-memo submissions). If found, the funds already left the
273+ // bridge, so it only completes the TFChain confirmation and marks the entry COMPLETED.
274+ // If not found, the entry is left PROCESSING and will be retried when its Ready event
275+ // fires again. All failures here are non-fatal: a transient Horizon/RPC problem must
276+ // not stop the bridge from starting.
277+ func (bridge * Bridge ) reconcilePendingTransactions (ctx context.Context ) error {
278+ pendingWithdraws , err := bridge .idempotency .GetPendingWithdraws ()
279+ if err != nil {
280+ return errors .Wrap (err , "failed to get pending withdraws" )
281+ }
282+ pendingRefunds , err := bridge .idempotency .GetPendingRefunds ()
283+ if err != nil {
284+ return errors .Wrap (err , "failed to get pending refunds" )
285+ }
286+
287+ if len (pendingWithdraws ) == 0 && len (pendingRefunds ) == 0 {
288+ return nil
289+ }
290+
291+ log .Info ().
292+ Int ("pending_withdraws" , len (pendingWithdraws )).
293+ Int ("pending_refunds" , len (pendingRefunds )).
294+ Msg ("reconciling pending transactions from previous run" )
295+
296+ // Fetch outgoing transactions once and reuse the page for all lookups, avoiding
297+ // one Horizon HTTP call per pending transaction.
298+ outgoingPage , err := bridge .wallet .FetchOutgoingTransactionsPage (ctx )
299+ if err != nil {
300+ // Non-fatal: pending txs are retried when their next Ready event fires.
301+ log .Warn ().Err (err ).Msg ("failed to fetch Horizon transactions for reconciliation, pending transactions will retry on next event" )
302+ outgoingPage = hProtocol.TransactionsPage {}
303+ }
304+
305+ for _ , txID := range pendingWithdraws {
306+ // Recover by the text memo (burn tx id), falling back to the account sequence
307+ // number for payments submitted by a pre-memo bridge version.
308+ stellarTx := bridge .wallet .FindPaymentByMemoInPage (outgoingPage , fmt .Sprint (txID ))
309+ if stellarTx == nil {
310+ burnTx , err := bridge .subClient .GetBurnTransaction (types .U64 (txID ))
311+ if err != nil {
312+ log .Warn ().Err (err ).Uint64 ("tx_id" , txID ).Msg ("failed to get burn tx for sequence lookup during reconciliation" )
313+ } else {
314+ stellarTx = bridge .wallet .FindPaymentBySequenceInPage (outgoingPage , int64 (burnTx .SequenceNumber ))
315+ }
316+ }
317+
318+ if stellarTx == nil {
319+ log .Info ().Uint64 ("tx_id" , txID ).Msg ("reconcile: no Stellar tx found by memo or sequence, will retry on next event" )
320+ continue
321+ }
322+
323+ log .Info ().Uint64 ("tx_id" , txID ).Msg ("reconcile: found existing Stellar payment, completing TFChain confirmation" )
324+ if err := bridge .subClient .RetrySetWithdrawExecuted (ctx , txID ); err != nil {
325+ log .Warn ().Err (err ).Uint64 ("tx_id" , txID ).Msg ("failed to set withdraw executed during reconciliation" )
326+ continue
327+ }
328+ if err := bridge .idempotency .MarkWithdrawCompleted (txID ); err != nil {
329+ log .Warn ().Err (err ).Uint64 ("tx_id" , txID ).Msg ("failed to mark withdraw completed during reconciliation" )
330+ }
331+ }
332+
333+ for _ , txHash := range pendingRefunds {
334+ stellarTx := bridge .wallet .FindRefundByReturnHashInPage (outgoingPage , txHash )
335+ if stellarTx == nil {
336+ refundTx , err := bridge .subClient .GetRefundTransaction (txHash )
337+ if err != nil {
338+ log .Warn ().Err (err ).Str ("tx_hash" , txHash ).Msg ("failed to get refund tx for sequence lookup during reconciliation" )
339+ } else {
340+ stellarTx = bridge .wallet .FindPaymentBySequenceInPage (outgoingPage , int64 (refundTx .SequenceNumber ))
341+ }
342+ }
343+
344+ if stellarTx == nil {
345+ log .Info ().Str ("tx_hash" , txHash ).Msg ("reconcile: no Stellar refund found by return hash or sequence, will retry on next event" )
346+ continue
347+ }
348+
349+ log .Info ().Str ("tx_hash" , txHash ).Msg ("reconcile: found existing Stellar refund, completing TFChain confirmation" )
350+ if err := bridge .subClient .RetrySetRefundTransactionExecutedTx (ctx , txHash ); err != nil {
351+ log .Warn ().Err (err ).Str ("tx_hash" , txHash ).Msg ("failed to set refund executed during reconciliation" )
352+ continue
353+ }
354+ if err := bridge .idempotency .MarkRefundCompleted (txHash ); err != nil {
355+ log .Warn ().Err (err ).Str ("tx_hash" , txHash ).Msg ("failed to mark refund completed during reconciliation" )
356+ }
357+ }
358+
359+ log .Info ().Msg ("reconciliation complete" )
360+ return nil
361+ }
0 commit comments