Skip to content

Commit 183d542

Browse files
committed
assets+loopdb: handle asset deposit expiry and timeout sweep
With this commit, asset deposit expiry is checked on each new block. When a deposit expires, a timeout sweep is published. The deposit state is updated both when the sweep is first published and again upon its confirmation.
1 parent b954e59 commit 183d542

5 files changed

Lines changed: 296 additions & 0 deletions

File tree

assets/deposit/manager.go

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/lightninglabs/taproot-assets/rpcutils"
2121
"github.com/lightninglabs/taproot-assets/taprpc"
2222
"github.com/lightninglabs/taproot-assets/taprpc/assetwalletrpc"
23+
"google.golang.org/protobuf/proto"
2324
)
2425

2526
var (
@@ -31,6 +32,10 @@ var (
3132
// shutting down and that no further calls should be made to it.
3233
ErrManagerShuttingDown = errors.New("asset deposit manager is " +
3334
"shutting down")
35+
36+
// lockExpiration us the expiration time we use for sweep fee
37+
// paying inputs.
38+
lockExpiration = time.Hour * 24
3439
)
3540

3641
// DepositUpdateCallback is a callback that is called when a deposit state is
@@ -67,6 +72,10 @@ type Manager struct {
6772
// currentHeight is the current block height of the chain.
6873
currentHeight uint32
6974

75+
// pendingSweeps is a map of all pending timeout sweeps. The key is the
76+
// deposit ID.
77+
pendingSweeps map[string]struct{}
78+
7079
// deposits is a map of all active deposits. The key is the deposit ID.
7180
deposits map[string]*Deposit
7281

@@ -118,6 +127,7 @@ func NewManager(depositServiceClient swapserverrpc.AssetDepositServiceClient,
118127
sweeper: sweeper,
119128
addressParams: addressParams,
120129
deposits: make(map[string]*Deposit),
130+
pendingSweeps: make(map[string]struct{}),
121131
subscribers: make(map[string]map[uint64]DepositUpdateCallback), //nolint:lll
122132
callEnter: make(chan struct{}),
123133
callLeave: make(chan struct{}),
@@ -220,6 +230,43 @@ func (m *Manager) criticalError(err error) {
220230

221231
// handleBlockEpoch is called when a new block is added to the chain.
222232
func (m *Manager) handleBlockEpoch(ctx context.Context, height uint32) error {
233+
for _, d := range m.deposits {
234+
if d.State != StateConfirmed {
235+
continue
236+
}
237+
238+
log.Debugf("Checking if deposit %v is expired, expiry=%v", d.ID,
239+
d.ConfirmationHeight+d.CsvExpiry)
240+
241+
if height < d.ConfirmationHeight+d.CsvExpiry {
242+
continue
243+
}
244+
245+
err := m.handleDepositExpired(ctx, d)
246+
if err != nil {
247+
log.Errorf("Unable to update deposit %v state: %v",
248+
d.ID, err)
249+
250+
return err
251+
}
252+
}
253+
254+
// Now publish the timeout sweeps for all expired deposits and also
255+
// move them to the pending sweeps map.
256+
for _, d := range m.deposits {
257+
// TODO(bhandras): republish will insert a new transfer entry in
258+
// tapd, despite the transfer already existing. To avoid that,
259+
// we won't re-publish the timeout sweep for now.
260+
if d.State != StateExpired {
261+
continue
262+
}
263+
264+
err := m.publishTimeoutSweep(ctx, d)
265+
if err != nil {
266+
return err
267+
}
268+
}
269+
223270
return nil
224271
}
225272

@@ -740,3 +787,212 @@ func (m *Manager) ListDeposits(ctx context.Context, minConfs, maxConfs uint32) (
740787

741788
return filteredDeposits, nil
742789
}
790+
791+
// handleDepositStateUpdate updates the deposit state in the store and
792+
// notifies all subscribers of the deposit state change.
793+
func (m *Manager) handleDepositExpired(ctx context.Context, d *Deposit) error {
794+
d.State = StateExpired
795+
err := d.GenerateSweepKeys(ctx, m.tapClient)
796+
if err != nil {
797+
log.Errorf("Unable to generate sweep keys for deposit %v: %v",
798+
d.ID, err)
799+
}
800+
801+
return m.handleDepositStateUpdate(ctx, d)
802+
}
803+
804+
// publishTimeoutSweep publishes a timeout sweep for the deposit. As we use the
805+
// same lock ID for the sponsoring inputs, it's possible to republish the sweep
806+
// however it'll create a new transfer entry in tapd, which we want to avoid
807+
// (for now).
808+
func (m *Manager) publishTimeoutSweep(ctx context.Context, d *Deposit) error {
809+
log.Infof("(Re)publishing timeout sweep for deposit %v", d.ID)
810+
811+
// TODO(bhandras): conf target should be dynamic/configrable.
812+
const confTarget = 2
813+
feeRateSatPerKw, err := m.walletKit.EstimateFeeRate(
814+
ctx, confTarget,
815+
)
816+
if err != nil {
817+
return err
818+
}
819+
820+
lockID, err := d.lockID()
821+
if err != nil {
822+
return err
823+
}
824+
825+
snedResp, err := m.sweeper.PublishDepositTimeoutSweep(
826+
ctx, d.Kit, d.Proof, asset.NewScriptKey(d.SweepScriptKey),
827+
d.SweepInternalKey, d.timeoutSweepLabel(),
828+
feeRateSatPerKw.FeePerVByte(), lockID, lockExpiration,
829+
)
830+
if err != nil {
831+
// TODO(bhandras): handle republish errors.
832+
log.Infof("Unable to publish timeout sweep for deposit %v: %v",
833+
d.ID, err)
834+
} else {
835+
log.Infof("Published timeout sweep for deposit %v: %x", d.ID,
836+
snedResp.Transfer.AnchorTxHash)
837+
838+
// Update deposit state on first successful publish.
839+
if d.State != StateTimeoutSweepPublished {
840+
d.State = StateTimeoutSweepPublished
841+
err = m.handleDepositStateUpdate(ctx, d)
842+
if err != nil {
843+
log.Errorf("Unable to update deposit %v "+
844+
"state: %v", d.ID, err)
845+
846+
return err
847+
}
848+
}
849+
}
850+
851+
// Start monitoring the sweep unless we're already doing so.
852+
if _, ok := m.pendingSweeps[d.ID]; !ok {
853+
err := m.waitForDepositSweep(ctx, d, d.timeoutSweepLabel())
854+
if err != nil {
855+
log.Errorf("Unable to wait for deposit %v spend: %v",
856+
d.ID, err)
857+
858+
return err
859+
}
860+
861+
m.pendingSweeps[d.ID] = struct{}{}
862+
}
863+
864+
return nil
865+
}
866+
867+
// waitForDepositSpend waits for the deposit to be spent. It subscribes to
868+
// receive events for the deposit's sweep address notifying us once the transfer
869+
// has completed.
870+
func (m *Manager) waitForDepositSweep(ctx context.Context, d *Deposit,
871+
label string) error {
872+
873+
log.Infof("Waiting for deposit sweep confirmation %s", d.ID)
874+
875+
eventChan, errChan, err := m.tapClient.WaitForSendComplete(
876+
ctx, d.SweepScriptKey.SerializeCompressed(), label,
877+
)
878+
if err != nil {
879+
log.Errorf("unable to subscribe to send events for deposit "+
880+
"sweep: %v", err,
881+
)
882+
}
883+
884+
go func() {
885+
select {
886+
case event := <-eventChan:
887+
// At this point we can consider the deposit confirmed.
888+
err = m.handleDepositSpend(ctx, d, event.Transfer)
889+
if err != nil {
890+
m.criticalError(err)
891+
}
892+
893+
case err := <-errChan:
894+
m.criticalError(err)
895+
}
896+
}()
897+
898+
return nil
899+
}
900+
901+
func formatProtoJSON(resp proto.Message) (string, error) {
902+
jsonBytes, err := taprpc.ProtoJSONMarshalOpts.Marshal(resp)
903+
if err != nil {
904+
return "", err
905+
}
906+
907+
return string(jsonBytes), nil
908+
}
909+
910+
func toJSON(resp proto.Message) string {
911+
jsonStr, _ := formatProtoJSON(resp)
912+
913+
return jsonStr
914+
}
915+
916+
// handleDepositSpend is called when the deposit is spent. It updates the
917+
// deposit state and releases the inputs used for the deposit sweep.
918+
func (m *Manager) handleDepositSpend(ctx context.Context, d *Deposit,
919+
transfer *taprpc.AssetTransfer) error {
920+
921+
done, err := m.scheduleNextCall()
922+
if err != nil {
923+
log.Errorf("Unable to schedule next call: %v", err)
924+
925+
return err
926+
}
927+
defer done()
928+
929+
switch d.State {
930+
case StateTimeoutSweepPublished:
931+
d.State = StateSwept
932+
933+
err := m.releaseDepositSweepInputs(ctx, d)
934+
if err != nil {
935+
log.Errorf("Unable to release deposit sweep inputs: "+
936+
"%v", err)
937+
938+
return err
939+
}
940+
941+
default:
942+
err := fmt.Errorf("Spent deposit %s in unexpected state %s",
943+
d.ID, d.State)
944+
945+
log.Errorf(err.Error())
946+
947+
return err
948+
}
949+
950+
log.Tracef("Deposit %s spent in transfer: %s\n", d.ID, toJSON(transfer))
951+
952+
// TODO(bhandras): should save the spend details to the store?
953+
err = m.handleDepositStateUpdate(ctx, d)
954+
if err != nil {
955+
return err
956+
}
957+
958+
// Sanity check that the deposit is in the pending sweeps map.
959+
if _, ok := m.pendingSweeps[d.ID]; !ok {
960+
log.Errorf("Deposit %v not found in pending deposits", d.ID)
961+
}
962+
963+
// We can now remove the deposit from the pending sweeps map as we don't
964+
// need to monitor for the spend anymore.
965+
delete(m.pendingSweeps, d.ID)
966+
967+
return nil
968+
}
969+
970+
// releaseDepositSweepInputs releases the inputs that were used for the deposit
971+
// sweep.
972+
func (m *Manager) releaseDepositSweepInputs(ctx context.Context,
973+
d *Deposit) error {
974+
975+
lockID, err := d.lockID()
976+
if err != nil {
977+
return err
978+
}
979+
980+
leases, err := m.walletKit.ListLeases(ctx)
981+
if err != nil {
982+
return err
983+
}
984+
985+
for _, lease := range leases {
986+
if lease.LockID != lockID {
987+
continue
988+
}
989+
990+
// Unlock any UTXOs that were used for the deposit sweep.
991+
err = m.walletKit.ReleaseOutput(ctx, lockID, lease.Outpoint)
992+
if err != nil {
993+
return err
994+
}
995+
}
996+
997+
return nil
998+
}

assets/deposit/sql_store.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ type Querier interface {
2929

3030
GetAssetDeposits(ctx context.Context) ([]sqlc.GetAssetDepositsRow,
3131
error)
32+
33+
SetAssetDepositSweepKeys(ctx context.Context,
34+
arg sqlc.SetAssetDepositSweepKeysParams) error
3235
}
3336

3437
// DepositBaseDB is the interface that contains all the queries generated
@@ -127,6 +130,20 @@ func (s *SQLStore) UpdateDeposit(ctx context.Context, d *Deposit) error {
127130
if err != nil {
128131
return err
129132
}
133+
134+
case StateExpired:
135+
scriptKey := d.SweepScriptKey.SerializeCompressed()
136+
internalKey := d.SweepInternalKey.SerializeCompressed()
137+
err := tx.SetAssetDepositSweepKeys(
138+
ctx, sqlc.SetAssetDepositSweepKeysParams{
139+
DepositID: d.ID,
140+
SweepScriptPubkey: scriptKey,
141+
SweepInternalPubkey: internalKey,
142+
},
143+
)
144+
if err != nil {
145+
return err
146+
}
130147
}
131148

132149
return tx.UpdateDepositState(

loopdb/sqlc/asset_deposits.sql.go

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/querier.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/queries/asset_deposits.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,8 @@ JOIN asset_deposit_updates u ON u.id = (
3939
LIMIT 1
4040
)
4141
ORDER BY d.created_at ASC;
42+
43+
-- name: SetAssetDepositSweepKeys :exec
44+
UPDATE asset_deposits
45+
SET sweep_script_pubkey = $2, sweep_internal_pubkey = $3
46+
WHERE deposit_id = $1;

0 commit comments

Comments
 (0)