Skip to content

Commit 20ae8b9

Browse files
committed
staticaddr/loopin: wait for risk decisions
Subscribe to static loop-in confirmation-risk notifications, start the payment deadline only after server acceptance or legacy confirmation fallback, and cancel the swap invoice when the server rejects the risk wait.
1 parent 722e76f commit 20ae8b9

6 files changed

Lines changed: 1382 additions & 32 deletions

File tree

notifications/manager.go

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,14 @@ const (
2626
// static loop in sweep requests.
2727
NotificationTypeStaticLoopInSweepRequest
2828

29+
// NotificationTypeStaticLoopInRiskAccepted is the notification type for
30+
// static loop in confirmation risk acceptance.
31+
NotificationTypeStaticLoopInRiskAccepted
32+
33+
// NotificationTypeStaticLoopInRiskRejected is the notification type for
34+
// static loop in confirmation risk rejection.
35+
NotificationTypeStaticLoopInRiskRejected
36+
2937
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3038
// swap notifications.
3139
NotificationTypeUnfinishedSwap
@@ -92,6 +100,12 @@ type Manager struct {
92100
hasL402 bool
93101

94102
subscribers map[NotificationType][]subscriber
103+
104+
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
105+
ServerStaticLoopInRiskAcceptedNotification
106+
107+
staticLoopInRiskRejected map[lntypes.Hash]*swapserverrpc.
108+
ServerStaticLoopInRiskRejectedNotification
95109
}
96110

97111
// NewManager creates a new notification manager.
@@ -107,13 +121,22 @@ func NewManager(cfg *Config) *Manager {
107121
return &Manager{
108122
cfg: cfg,
109123
subscribers: make(map[NotificationType][]subscriber),
124+
staticLoopInRiskAccepted: make(
125+
map[lntypes.Hash]*swapserverrpc.
126+
ServerStaticLoopInRiskAcceptedNotification,
127+
),
128+
staticLoopInRiskRejected: make(
129+
map[lntypes.Hash]*swapserverrpc.
130+
ServerStaticLoopInRiskRejectedNotification,
131+
),
110132
}
111133
}
112134

113135
type subscriber struct {
114136
subCtx context.Context
115137
recvChan any
116138
enqueue func(any)
139+
swapHash *lntypes.Hash
117140
}
118141

119142
// newNotificationQueue creates a per-subscriber FIFO delivery function.
@@ -271,6 +294,80 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
271294
return notifChan
272295
}
273296

297+
// SubscribeStaticLoopInRiskAccepted subscribes to static loop in risk accepted
298+
// notifications.
299+
func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
300+
swapHash lntypes.Hash,
301+
) <-chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification {
302+
303+
notifChan := make(
304+
chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification, 1,
305+
)
306+
307+
sub := subscriber{
308+
subCtx: ctx,
309+
recvChan: notifChan,
310+
swapHash: &swapHash,
311+
}
312+
313+
m.Lock()
314+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted] = append(
315+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted], sub,
316+
)
317+
if ntfn, ok := m.staticLoopInRiskAccepted[swapHash]; ok {
318+
notifChan <- ntfn
319+
delete(m.staticLoopInRiskAccepted, swapHash)
320+
}
321+
m.Unlock()
322+
323+
context.AfterFunc(ctx, func() {
324+
m.removeSubscriber(NotificationTypeStaticLoopInRiskAccepted, sub)
325+
m.Lock()
326+
delete(m.staticLoopInRiskAccepted, swapHash)
327+
m.Unlock()
328+
close(notifChan)
329+
})
330+
331+
return notifChan
332+
}
333+
334+
// SubscribeStaticLoopInRiskRejected subscribes to static loop in risk rejected
335+
// notifications.
336+
func (m *Manager) SubscribeStaticLoopInRiskRejected(ctx context.Context,
337+
swapHash lntypes.Hash,
338+
) <-chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification {
339+
340+
notifChan := make(
341+
chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification, 1,
342+
)
343+
344+
sub := subscriber{
345+
subCtx: ctx,
346+
recvChan: notifChan,
347+
swapHash: &swapHash,
348+
}
349+
350+
m.Lock()
351+
m.subscribers[NotificationTypeStaticLoopInRiskRejected] = append(
352+
m.subscribers[NotificationTypeStaticLoopInRiskRejected], sub,
353+
)
354+
if ntfn, ok := m.staticLoopInRiskRejected[swapHash]; ok {
355+
notifChan <- ntfn
356+
delete(m.staticLoopInRiskRejected, swapHash)
357+
}
358+
m.Unlock()
359+
360+
context.AfterFunc(ctx, func() {
361+
m.removeSubscriber(NotificationTypeStaticLoopInRiskRejected, sub)
362+
m.Lock()
363+
delete(m.staticLoopInRiskRejected, swapHash)
364+
m.Unlock()
365+
close(notifChan)
366+
})
367+
368+
return notifChan
369+
}
370+
274371
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
275372
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
276373
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -476,6 +573,94 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
476573
queueNotification(sub, recvChan, staticLoopInSweepRequestNtfn)
477574
}
478575

576+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
577+
// We'll forward the static loop in risk accepted notification to the
578+
// subscriber for the matching swap.
579+
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
580+
m.Lock()
581+
defer m.Unlock()
582+
583+
var (
584+
swapHash lntypes.Hash
585+
hasSwapHash bool
586+
)
587+
if riskAcceptedNtfn != nil {
588+
hash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
589+
if err != nil {
590+
log.Warnf("Received invalid static loop in risk "+
591+
"accepted notification: %v", err)
592+
} else {
593+
swapHash = hash
594+
hasSwapHash = true
595+
m.staticLoopInRiskAccepted[hash] =
596+
riskAcceptedNtfn
597+
delete(m.staticLoopInRiskRejected, hash)
598+
}
599+
}
600+
601+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
602+
if !hasSwapHash || sub.swapHash == nil ||
603+
*sub.swapHash != swapHash {
604+
605+
continue
606+
}
607+
608+
recvChan := sub.recvChan.(chan *swapserverrpc.
609+
ServerStaticLoopInRiskAcceptedNotification)
610+
611+
select {
612+
case recvChan <- riskAcceptedNtfn:
613+
case <-sub.subCtx.Done():
614+
default:
615+
log.Debugf("Dropping static loop in risk " +
616+
"accepted notification for slow subscriber")
617+
}
618+
}
619+
620+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskRejected: // nolint: lll
621+
// We'll forward the static loop in risk rejected notification to the
622+
// subscriber for the matching swap.
623+
riskRejectedNtfn := ntfn.GetStaticLoopInRiskRejected()
624+
m.Lock()
625+
defer m.Unlock()
626+
627+
var (
628+
swapHash lntypes.Hash
629+
hasSwapHash bool
630+
)
631+
if riskRejectedNtfn != nil {
632+
hash, err := lntypes.MakeHash(riskRejectedNtfn.SwapHash)
633+
if err != nil {
634+
log.Warnf("Received invalid static loop in risk "+
635+
"rejected notification: %v", err)
636+
} else {
637+
swapHash = hash
638+
hasSwapHash = true
639+
m.staticLoopInRiskRejected[hash] =
640+
riskRejectedNtfn
641+
delete(m.staticLoopInRiskAccepted, hash)
642+
}
643+
}
644+
645+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskRejected] { // nolint: lll
646+
if !hasSwapHash || sub.swapHash == nil ||
647+
*sub.swapHash != swapHash {
648+
649+
continue
650+
}
651+
652+
recvChan := sub.recvChan.(chan *swapserverrpc.
653+
ServerStaticLoopInRiskRejectedNotification)
654+
655+
select {
656+
case recvChan <- riskRejectedNtfn:
657+
case <-sub.subCtx.Done():
658+
default:
659+
log.Debugf("Dropping static loop in risk " +
660+
"rejected notification for slow subscriber")
661+
}
662+
}
663+
479664
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
480665
// We'll forward the unfinished swap notification to all
481666
// subscribers.

0 commit comments

Comments
 (0)