Skip to content

Commit 44c7825

Browse files
committed
withdraw: persist tx before state transitions
Persist the finalized withdrawal tx pointer to all selected deposits before triggering state transitions. This avoids a restart race in which deposits remain in WITHDRAWING without a recoverable finalized tx reference. Recovery now groups withdrawing deposits by tx hash and, when exactly one in-flight withdrawal cluster exists, restores the missing tx pointers from that cluster. Recovered pointers are written back via UpdateDeposit so that subsequent restarts remain stable. Add a regression test that exercises recoverWithdrawals end-to-end and verifies restart recovery for WITHDRAWING deposits missing FinalizedWithdrawalTx.
1 parent d141818 commit 44c7825

2 files changed

Lines changed: 297 additions & 31 deletions

File tree

staticaddr/withdraw/manager.go

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -238,20 +238,11 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error {
238238
return err
239239
}
240240

241-
// Group the deposits by their finalized withdrawal transaction.
242-
depositsByWithdrawalTx := make(map[chainhash.Hash][]*deposit.Deposit)
243-
hash2tx := make(map[chainhash.Hash]*wire.MsgTx)
244-
for _, d := range withdrawingDeposits {
245-
withdrawalTx := d.FinalizedWithdrawalTx
246-
if withdrawalTx == nil {
247-
continue
248-
}
249-
txid := withdrawalTx.TxHash()
250-
hash2tx[txid] = withdrawalTx
251-
252-
depositsByWithdrawalTx[txid] = append(
253-
depositsByWithdrawalTx[txid], d,
254-
)
241+
depositsByWithdrawalTx, hash2tx, err := m.groupWithdrawingDepositsByTx(
242+
ctx, withdrawingDeposits,
243+
)
244+
if err != nil {
245+
return err
255246
}
256247

257248
// Publishing a transaction can take a while in neutrino mode, so
@@ -303,6 +294,98 @@ func (m *Manager) recoverWithdrawals(ctx context.Context) error {
303294
return nil
304295
}
305296

297+
// groupWithdrawingDepositsByTx clusters withdrawing deposits by their finalized
298+
// withdrawal transaction hash.
299+
func (m *Manager) groupWithdrawingDepositsByTx(ctx context.Context,
300+
withdrawingDeposits []*deposit.Deposit) (
301+
map[chainhash.Hash][]*deposit.Deposit, map[chainhash.Hash]*wire.MsgTx,
302+
error) {
303+
304+
depositsByWithdrawalTx := make(map[chainhash.Hash][]*deposit.Deposit)
305+
hash2tx := make(map[chainhash.Hash]*wire.MsgTx)
306+
307+
// Build an index of all known finalized withdrawal transactions.
308+
for _, d := range withdrawingDeposits {
309+
if d.FinalizedWithdrawalTx == nil {
310+
continue
311+
}
312+
313+
txid := d.FinalizedWithdrawalTx.TxHash()
314+
hash2tx[txid] = d.FinalizedWithdrawalTx
315+
}
316+
317+
// If exactly one tx hash is present, we can recover missing tx pointers
318+
// from that single cluster.
319+
var fallbackTx *wire.MsgTx
320+
if len(hash2tx) == 1 {
321+
for _, tx := range hash2tx {
322+
fallbackTx = tx
323+
}
324+
}
325+
326+
for _, d := range withdrawingDeposits {
327+
withdrawalTx := d.FinalizedWithdrawalTx
328+
if withdrawalTx == nil {
329+
if fallbackTx == nil {
330+
log.Warnf("Skipping withdrawing deposit %v "+
331+
"during recovery: missing finalized "+
332+
"withdrawal tx", d.OutPoint)
333+
334+
continue
335+
}
336+
337+
// Persist the recovered tx pointer so future restarts
338+
// don't depend on in-memory fallback recovery.
339+
d.Lock()
340+
d.FinalizedWithdrawalTx = fallbackTx
341+
d.Unlock()
342+
343+
err := m.cfg.DepositManager.UpdateDeposit(ctx, d)
344+
if err != nil {
345+
return nil, nil, fmt.Errorf("unable to "+
346+
"persist recovered finalized "+
347+
"withdrawal tx for deposit %v: %w",
348+
d.OutPoint, err)
349+
}
350+
351+
log.Warnf("Recovered missing finalized withdrawal tx "+
352+
"for deposit %v", d.OutPoint)
353+
354+
withdrawalTx = fallbackTx
355+
}
356+
357+
txid := withdrawalTx.TxHash()
358+
hash2tx[txid] = withdrawalTx
359+
depositsByWithdrawalTx[txid] = append(
360+
depositsByWithdrawalTx[txid], d,
361+
)
362+
}
363+
364+
return depositsByWithdrawalTx, hash2tx, nil
365+
}
366+
367+
// persistFinalizedWithdrawalTx updates the selected deposits with the finalized
368+
// withdrawal tx and persists the change before state transitions.
369+
func (m *Manager) persistFinalizedWithdrawalTx(ctx context.Context,
370+
deposits []*deposit.Deposit, finalizedTx *wire.MsgTx) error {
371+
372+
for _, d := range deposits {
373+
d.Lock()
374+
d.FinalizedWithdrawalTx = finalizedTx
375+
d.Unlock()
376+
}
377+
378+
for _, d := range deposits {
379+
err := m.cfg.DepositManager.UpdateDeposit(ctx, d)
380+
if err != nil {
381+
return fmt.Errorf("failed to update deposit %v: %w",
382+
d.OutPoint, err)
383+
}
384+
}
385+
386+
return nil
387+
}
388+
306389
// WithdrawDeposits starts a deposits withdrawal flow. If the amount is set to 0
307390
// the full amount of the selected deposits will be withdrawn.
308391
func (m *Manager) WithdrawDeposits(ctx context.Context,
@@ -478,14 +561,11 @@ func (m *Manager) WithdrawDeposits(ctx context.Context,
478561
m.mu.Unlock()
479562
}
480563

481-
// Attach the finalized withdrawal tx to the deposits. After a client
482-
// restart we can use this address as an indicator to republish the
483-
// withdrawal tx and continue the withdrawal.
484-
// Deposits with the same withdrawal tx are part of the same withdrawal.
485-
for _, d := range deposits {
486-
d.Lock()
487-
d.FinalizedWithdrawalTx = finalizedTx
488-
d.Unlock()
564+
// Persist the finalized withdrawal tx before state transitions so that
565+
// a restart can recover the full withdrawal cluster.
566+
err = m.persistFinalizedWithdrawalTx(ctx, deposits, finalizedTx)
567+
if err != nil {
568+
return "", "", err
489569
}
490570

491571
// Add the new withdrawal tx to the finalized withdrawals to republish
@@ -504,15 +584,6 @@ func (m *Manager) WithdrawDeposits(ctx context.Context,
504584
err)
505585
}
506586

507-
// Update the deposits in the database.
508-
for _, d := range deposits {
509-
err = m.cfg.DepositManager.UpdateDeposit(ctx, d)
510-
if err != nil {
511-
return "", "", fmt.Errorf("failed to update "+
512-
"deposit %w", err)
513-
}
514-
}
515-
516587
return finalizedTx.TxID(), withdrawalAddress.String(), nil
517588
}
518589

staticaddr/withdraw/manager_test.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@ import (
99
"github.com/btcsuite/btcd/chaincfg/chainhash"
1010
"github.com/btcsuite/btcd/txscript"
1111
"github.com/btcsuite/btcd/wire"
12+
"github.com/lightninglabs/loop/fsm"
13+
"github.com/lightninglabs/loop/staticaddr/address"
1214
"github.com/lightninglabs/loop/staticaddr/deposit"
15+
"github.com/lightninglabs/loop/staticaddr/script"
1316
"github.com/lightninglabs/loop/swapserverrpc"
1417
"github.com/lightninglabs/loop/test"
18+
"github.com/lightningnetwork/lnd/build"
1519
"github.com/lightningnetwork/lnd/funding"
1620
"github.com/lightningnetwork/lnd/input"
1721
"github.com/lightningnetwork/lnd/lnrpc"
@@ -20,6 +24,10 @@ import (
2024
"github.com/stretchr/testify/require"
2125
)
2226

27+
func init() {
28+
UseLogger(build.NewSubLogger("WDRW", nil))
29+
}
30+
2331
// TestNewManagerHeightValidation ensures the constructor rejects zero heights.
2432
func TestNewManagerHeightValidation(t *testing.T) {
2533
t.Parallel()
@@ -606,3 +614,190 @@ func TestCalculateWithdrawalTxValues(t *testing.T) {
606614
})
607615
}
608616
}
617+
618+
// recoveryDepositManager is a test stub that tracks recovery interactions for
619+
// deposits in the WITHDRAWING state.
620+
type recoveryDepositManager struct {
621+
withdrawingDeposits []*deposit.Deposit
622+
transitioned [][]wire.OutPoint
623+
updated []wire.OutPoint
624+
}
625+
626+
// GetActiveDepositsInState returns the preset withdrawing deposits for the
627+
// recovery test.
628+
func (m *recoveryDepositManager) GetActiveDepositsInState(
629+
_ fsm.StateType) (
630+
[]*deposit.Deposit, error) {
631+
632+
return m.withdrawingDeposits, nil
633+
}
634+
635+
// AllOutpointsActiveDeposits reports no active deposit set lookup in this
636+
// test stub.
637+
func (m *recoveryDepositManager) AllOutpointsActiveDeposits(
638+
_ []wire.OutPoint, _ fsm.StateType) ([]*deposit.Deposit, bool) {
639+
640+
return nil, false
641+
}
642+
643+
// TransitionDeposits records the outpoints transitioned by recovery.
644+
func (m *recoveryDepositManager) TransitionDeposits(_ context.Context,
645+
deposits []*deposit.Deposit, _ fsm.EventType, _ fsm.StateType) error {
646+
647+
outpoints := make([]wire.OutPoint, len(deposits))
648+
for i, d := range deposits {
649+
outpoints[i] = d.OutPoint
650+
}
651+
652+
m.transitioned = append(m.transitioned, outpoints)
653+
654+
return nil
655+
}
656+
657+
// UpdateDeposit records which deposits were updated during recovery.
658+
func (m *recoveryDepositManager) UpdateDeposit(_ context.Context,
659+
d *deposit.Deposit) error {
660+
661+
m.updated = append(m.updated, d.OutPoint)
662+
663+
return nil
664+
}
665+
666+
// recoveryAddressManager is a test stub that serves static address parameters
667+
// needed by withdrawal recovery.
668+
type recoveryAddressManager struct {
669+
params *address.Parameters
670+
}
671+
672+
// GetStaticAddressParameters returns the preset static address parameters for
673+
// the recovery test.
674+
func (m *recoveryAddressManager) GetStaticAddressParameters(
675+
_ context.Context) (*address.Parameters, error) {
676+
677+
return m.params, nil
678+
}
679+
680+
// GetStaticAddress returns no static address in this test stub.
681+
func (m *recoveryAddressManager) GetStaticAddress(
682+
_ context.Context) (*script.StaticAddress, error) {
683+
684+
return nil, nil
685+
}
686+
687+
// TestRecoverWithdrawalsIncludesMissingFinalizedTxDeposits verifies regression
688+
// coverage for restart recovery where some deposits are in WITHDRAWING but
689+
// missing FinalizedWithdrawalTx pointers.
690+
//
691+
// Without the fix this test still builds, but fails at runtime because the
692+
// legacy recovery code silently skips those deposits and only reinstates the
693+
// subset with non-nil FinalizedWithdrawalTx.
694+
func TestRecoverWithdrawalsIncludesMissingFinalizedTxDeposits(t *testing.T) {
695+
t.Parallel()
696+
697+
tx := wire.NewMsgTx(2)
698+
tx.AddTxIn(&wire.TxIn{
699+
PreviousOutPoint: wire.OutPoint{
700+
Hash: chainhash.Hash{9},
701+
Index: 0,
702+
},
703+
})
704+
tx.AddTxOut(&wire.TxOut{
705+
Value: 1000,
706+
PkScript: []byte{txscript.OP_1},
707+
})
708+
709+
known1 := &deposit.Deposit{
710+
OutPoint: wire.OutPoint{
711+
Hash: chainhash.Hash{1},
712+
Index: 0,
713+
},
714+
ConfirmationHeight: 100,
715+
FinalizedWithdrawalTx: tx,
716+
}
717+
known2 := &deposit.Deposit{
718+
OutPoint: wire.OutPoint{
719+
Hash: chainhash.Hash{2},
720+
Index: 0,
721+
},
722+
ConfirmationHeight: 100,
723+
FinalizedWithdrawalTx: tx,
724+
}
725+
missing1 := &deposit.Deposit{
726+
OutPoint: wire.OutPoint{
727+
Hash: chainhash.Hash{3},
728+
Index: 0,
729+
},
730+
ConfirmationHeight: 100,
731+
}
732+
missing2 := &deposit.Deposit{
733+
OutPoint: wire.OutPoint{
734+
Hash: chainhash.Hash{4},
735+
Index: 0,
736+
},
737+
ConfirmationHeight: 100,
738+
}
739+
740+
depositMgr := &recoveryDepositManager{
741+
withdrawingDeposits: []*deposit.Deposit{
742+
known1, known2, missing1, missing2,
743+
},
744+
}
745+
addrMgr := &recoveryAddressManager{
746+
params: &address.Parameters{
747+
PkScript: []byte{txscript.OP_1},
748+
},
749+
}
750+
751+
lnd := test.NewMockLnd()
752+
go func() {
753+
<-lnd.TxPublishChannel
754+
}()
755+
go func() {
756+
<-lnd.RegisterSpendChannel
757+
}()
758+
759+
mgr, err := NewManager(&ManagerConfig{
760+
DepositManager: depositMgr,
761+
WalletKit: lnd.WalletKit,
762+
ChainNotifier: lnd.ChainNotifier,
763+
AddressManager: addrMgr,
764+
}, 101)
765+
require.NoError(t, err)
766+
767+
ctx, cancel := context.WithCancel(context.Background())
768+
defer cancel()
769+
770+
err = mgr.recoverWithdrawals(ctx)
771+
require.NoError(t, err)
772+
773+
// Assert we re-instated one withdrawal cluster containing all four
774+
// deposits. The old buggy behavior re-instated only the two deposits
775+
// that already had finalized tx pointers.
776+
require.Len(t, depositMgr.transitioned, 1)
777+
require.Len(t, depositMgr.transitioned[0], 4)
778+
779+
transitioned := make(map[wire.OutPoint]struct{})
780+
for _, op := range depositMgr.transitioned[0] {
781+
transitioned[op] = struct{}{}
782+
}
783+
_, ok := transitioned[missing1.OutPoint]
784+
require.True(t, ok)
785+
_, ok = transitioned[missing2.OutPoint]
786+
require.True(t, ok)
787+
788+
// Missing pointers should be recovered and persisted.
789+
updated := make(map[wire.OutPoint]struct{})
790+
for _, op := range depositMgr.updated {
791+
updated[op] = struct{}{}
792+
}
793+
_, ok = updated[missing1.OutPoint]
794+
require.True(t, ok)
795+
_, ok = updated[missing2.OutPoint]
796+
require.True(t, ok)
797+
require.NotNil(t, missing1.FinalizedWithdrawalTx)
798+
require.NotNil(t, missing2.FinalizedWithdrawalTx)
799+
800+
// Shut down notifier goroutines started by recovery.
801+
cancel()
802+
lnd.WaitForFinished()
803+
}

0 commit comments

Comments
 (0)