Skip to content

Commit c646536

Browse files
committed
staticaddr/loopin: handle risk rejection notification
Add client handling for the server's static loop-in risk-rejected notification. If the server aborts confirmation-risk waiting before payment, the client fails the local swap instead of waiting for a payment deadline that will never start. Cache rejected notifications by swap hash using the same replay path as accepted notifications, and clear the opposite cached state when a final risk decision is received. This keeps reconnect and subscription-order races from stranding the client in the risk wait.
1 parent 6d9cf87 commit c646536

5 files changed

Lines changed: 618 additions & 9 deletions

File tree

notifications/manager.go

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const (
3030
// static loop in confirmation risk acceptance.
3131
NotificationTypeStaticLoopInRiskAccepted
3232

33+
// NotificationTypeStaticLoopInRiskRejected is the notification type for
34+
// static loop in confirmation risk rejection.
35+
NotificationTypeStaticLoopInRiskRejected
36+
3337
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3438
// swap notifications.
3539
NotificationTypeUnfinishedSwap
@@ -83,6 +87,9 @@ type Manager struct {
8387

8488
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
8589
ServerStaticLoopInRiskAcceptedNotification
90+
91+
staticLoopInRiskRejected map[lntypes.Hash]*swapserverrpc.
92+
ServerStaticLoopInRiskRejectedNotification
8693
}
8794

8895
// NewManager creates a new notification manager.
@@ -99,12 +106,17 @@ func NewManager(cfg *Config) *Manager {
99106
map[lntypes.Hash]*swapserverrpc.
100107
ServerStaticLoopInRiskAcceptedNotification,
101108
),
109+
staticLoopInRiskRejected: make(
110+
map[lntypes.Hash]*swapserverrpc.
111+
ServerStaticLoopInRiskRejectedNotification,
112+
),
102113
}
103114
}
104115

105116
type subscriber struct {
106117
subCtx context.Context
107118
recvChan any
119+
swapHash *lntypes.Hash
108120
}
109121

110122
// SubscribeReservations subscribes to the reservation notifications.
@@ -167,6 +179,7 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
167179
sub := subscriber{
168180
subCtx: ctx,
169181
recvChan: notifChan,
182+
swapHash: &swapHash,
170183
}
171184

172185
m.Lock()
@@ -190,6 +203,43 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
190203
return notifChan
191204
}
192205

206+
// SubscribeStaticLoopInRiskRejected subscribes to static loop in risk rejected
207+
// notifications.
208+
func (m *Manager) SubscribeStaticLoopInRiskRejected(ctx context.Context,
209+
swapHash lntypes.Hash,
210+
) <-chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification {
211+
212+
notifChan := make(
213+
chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification, 1,
214+
)
215+
216+
sub := subscriber{
217+
subCtx: ctx,
218+
recvChan: notifChan,
219+
swapHash: &swapHash,
220+
}
221+
222+
m.Lock()
223+
m.subscribers[NotificationTypeStaticLoopInRiskRejected] = append(
224+
m.subscribers[NotificationTypeStaticLoopInRiskRejected], sub,
225+
)
226+
if ntfn, ok := m.staticLoopInRiskRejected[swapHash]; ok {
227+
notifChan <- ntfn
228+
delete(m.staticLoopInRiskRejected, swapHash)
229+
}
230+
m.Unlock()
231+
232+
context.AfterFunc(ctx, func() {
233+
m.removeSubscriber(NotificationTypeStaticLoopInRiskRejected, sub)
234+
m.Lock()
235+
delete(m.staticLoopInRiskRejected, swapHash)
236+
m.Unlock()
237+
close(notifChan)
238+
})
239+
240+
return notifChan
241+
}
242+
193243
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
194244
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
195245
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -376,24 +426,37 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
376426
}
377427

378428
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
379-
// We'll forward the static loop in risk accepted notification to all
380-
// subscribers.
429+
// We'll forward the static loop in risk accepted notification to the
430+
// subscriber for the matching swap.
381431
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
382432
m.Lock()
383433
defer m.Unlock()
384434

435+
var (
436+
swapHash lntypes.Hash
437+
hasSwapHash bool
438+
)
385439
if riskAcceptedNtfn != nil {
386-
swapHash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
440+
hash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
387441
if err != nil {
388442
log.Warnf("Received invalid static loop in risk "+
389443
"accepted notification: %v", err)
390444
} else {
391-
m.staticLoopInRiskAccepted[swapHash] =
445+
swapHash = hash
446+
hasSwapHash = true
447+
m.staticLoopInRiskAccepted[hash] =
392448
riskAcceptedNtfn
449+
delete(m.staticLoopInRiskRejected, hash)
393450
}
394451
}
395452

396453
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
454+
if !hasSwapHash || sub.swapHash == nil ||
455+
*sub.swapHash != swapHash {
456+
457+
continue
458+
}
459+
397460
recvChan := sub.recvChan.(chan *swapserverrpc.
398461
ServerStaticLoopInRiskAcceptedNotification)
399462

@@ -406,6 +469,50 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
406469
}
407470
}
408471

472+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskRejected: // nolint: lll
473+
// We'll forward the static loop in risk rejected notification to the
474+
// subscriber for the matching swap.
475+
riskRejectedNtfn := ntfn.GetStaticLoopInRiskRejected()
476+
m.Lock()
477+
defer m.Unlock()
478+
479+
var (
480+
swapHash lntypes.Hash
481+
hasSwapHash bool
482+
)
483+
if riskRejectedNtfn != nil {
484+
hash, err := lntypes.MakeHash(riskRejectedNtfn.SwapHash)
485+
if err != nil {
486+
log.Warnf("Received invalid static loop in risk "+
487+
"rejected notification: %v", err)
488+
} else {
489+
swapHash = hash
490+
hasSwapHash = true
491+
m.staticLoopInRiskRejected[hash] =
492+
riskRejectedNtfn
493+
delete(m.staticLoopInRiskAccepted, hash)
494+
}
495+
}
496+
497+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskRejected] { // nolint: lll
498+
if !hasSwapHash || sub.swapHash == nil ||
499+
*sub.swapHash != swapHash {
500+
501+
continue
502+
}
503+
504+
recvChan := sub.recvChan.(chan *swapserverrpc.
505+
ServerStaticLoopInRiskRejectedNotification)
506+
507+
select {
508+
case recvChan <- riskRejectedNtfn:
509+
case <-sub.subCtx.Done():
510+
default:
511+
log.Debugf("Dropping static loop in risk " +
512+
"rejected notification for slow subscriber")
513+
}
514+
}
515+
409516
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
410517
// We'll forward the unfinished swap notification to all
411518
// subscribers.

notifications/manager_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,86 @@ func unfinishedSwapNotification(
216216
}
217217
}
218218

219+
func staticLoopInRiskAcceptedNotification(
220+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
221+
222+
return &swapserverrpc.SubscribeNotificationsResponse{
223+
Notification: &swapserverrpc.
224+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
225+
StaticLoopInRiskAccepted: &swapserverrpc.
226+
ServerStaticLoopInRiskAcceptedNotification{
227+
SwapHash: swapHash[:],
228+
},
229+
},
230+
}
231+
}
232+
233+
func staticLoopInRiskRejectedNotification(
234+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
235+
236+
return &swapserverrpc.SubscribeNotificationsResponse{
237+
Notification: &swapserverrpc.
238+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
239+
StaticLoopInRiskRejected: &swapserverrpc.
240+
ServerStaticLoopInRiskRejectedNotification{
241+
SwapHash: swapHash[:],
242+
},
243+
},
244+
}
245+
}
246+
247+
type staticLoopInRiskNotification interface {
248+
GetSwapHash() []byte
249+
}
250+
251+
func assertStaticLoopInRiskNotificationSwapScoped[
252+
T staticLoopInRiskNotification](t *testing.T,
253+
subscribe func(*Manager, context.Context, lntypes.Hash) <-chan T,
254+
notification func(lntypes.Hash) *swapserverrpc.
255+
SubscribeNotificationsResponse, label string,
256+
swapHashA, swapHashB lntypes.Hash) {
257+
258+
t.Helper()
259+
260+
mgr := NewManager(&Config{})
261+
262+
subCtx, subCancel := context.WithCancel(t.Context())
263+
defer subCancel()
264+
265+
subChanA := subscribe(mgr, subCtx, swapHashA)
266+
subChanB := subscribe(mgr, subCtx, swapHashB)
267+
268+
mgr.handleNotification(notification(swapHashA))
269+
270+
select {
271+
case received := <-subChanA:
272+
require.Equal(t, swapHashA[:], received.GetSwapHash())
273+
274+
case <-time.After(time.Second):
275+
t.Fatalf("did not receive first swap risk %s notification",
276+
label)
277+
}
278+
279+
select {
280+
case received := <-subChanB:
281+
t.Fatalf("second swap received wrong notification: %x",
282+
received.GetSwapHash())
283+
284+
default:
285+
}
286+
287+
mgr.handleNotification(notification(swapHashB))
288+
289+
select {
290+
case received := <-subChanB:
291+
require.Equal(t, swapHashB[:], received.GetSwapHash())
292+
293+
case <-time.After(time.Second):
294+
t.Fatalf("did not receive second swap risk %s notification",
295+
label)
296+
}
297+
}
298+
219299
// TestManager_SlowSubscriberDoesNotBlock tests that a subscriber with a full
220300
// notification channel does not block delivery to other subscribers.
221301
func TestManager_SlowSubscriberDoesNotBlock(t *testing.T) {
@@ -397,6 +477,22 @@ func TestManager_StaticLoopInRiskAcceptedNotification(t *testing.T) {
397477
}
398478
}
399479

480+
// TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped verifies that a
481+
// notification for one swap does not occupy another swap's subscriber channel.
482+
func TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped(t *testing.T) {
483+
t.Parallel()
484+
485+
assertStaticLoopInRiskNotificationSwapScoped(
486+
t, func(m *Manager, ctx context.Context,
487+
swapHash lntypes.Hash) <-chan *swapserverrpc.
488+
ServerStaticLoopInRiskAcceptedNotification {
489+
490+
return m.SubscribeStaticLoopInRiskAccepted(ctx, swapHash)
491+
}, staticLoopInRiskAcceptedNotification, "accepted",
492+
lntypes.Hash{0x04, 0x05}, lntypes.Hash{0x06, 0x07},
493+
)
494+
}
495+
400496
// TestManager_StaticLoopInRiskAcceptedNotificationReplay tests that the Manager
401497
// replays a risk accepted notification that arrives before the swap-specific
402498
// subscriber is registered.
@@ -432,6 +528,92 @@ func TestManager_StaticLoopInRiskAcceptedNotificationReplay(t *testing.T) {
432528
}
433529
}
434530

531+
// TestManager_StaticLoopInRiskRejectedNotification tests that the Manager
532+
// forwards static loop in risk rejected notifications to subscribers.
533+
func TestManager_StaticLoopInRiskRejectedNotification(t *testing.T) {
534+
t.Parallel()
535+
536+
mgr := NewManager(&Config{})
537+
538+
subCtx, subCancel := context.WithCancel(t.Context())
539+
defer subCancel()
540+
541+
swapHash := lntypes.Hash{0x08, 0x09}
542+
543+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
544+
545+
mgr.handleNotification(
546+
&swapserverrpc.SubscribeNotificationsResponse{
547+
Notification: &swapserverrpc.
548+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
549+
StaticLoopInRiskRejected: &swapserverrpc.
550+
ServerStaticLoopInRiskRejectedNotification{
551+
SwapHash: swapHash[:],
552+
},
553+
},
554+
},
555+
)
556+
557+
select {
558+
case received := <-subChan:
559+
require.Equal(t, swapHash[:], received.SwapHash)
560+
561+
case <-time.After(time.Second):
562+
t.Fatal("did not receive risk rejected notification")
563+
}
564+
}
565+
566+
// TestManager_StaticLoopInRiskRejectedNotificationSwapScoped verifies that a
567+
// notification for one swap does not occupy another swap's subscriber channel.
568+
func TestManager_StaticLoopInRiskRejectedNotificationSwapScoped(t *testing.T) {
569+
t.Parallel()
570+
571+
assertStaticLoopInRiskNotificationSwapScoped(
572+
t, func(m *Manager, ctx context.Context,
573+
swapHash lntypes.Hash) <-chan *swapserverrpc.
574+
ServerStaticLoopInRiskRejectedNotification {
575+
576+
return m.SubscribeStaticLoopInRiskRejected(ctx, swapHash)
577+
}, staticLoopInRiskRejectedNotification, "rejected",
578+
lntypes.Hash{0x08, 0x09}, lntypes.Hash{0x0a, 0x0b},
579+
)
580+
}
581+
582+
// TestManager_StaticLoopInRiskRejectedNotificationReplay tests that the Manager
583+
// replays a risk rejected notification that arrives before the swap-specific
584+
// subscriber is registered.
585+
func TestManager_StaticLoopInRiskRejectedNotificationReplay(t *testing.T) {
586+
t.Parallel()
587+
588+
mgr := NewManager(&Config{})
589+
590+
swapHash := lntypes.Hash{0x0a, 0x0b}
591+
mgr.handleNotification(
592+
&swapserverrpc.SubscribeNotificationsResponse{
593+
Notification: &swapserverrpc.
594+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
595+
StaticLoopInRiskRejected: &swapserverrpc.
596+
ServerStaticLoopInRiskRejectedNotification{
597+
SwapHash: swapHash[:],
598+
},
599+
},
600+
},
601+
)
602+
603+
subCtx, subCancel := context.WithCancel(t.Context())
604+
defer subCancel()
605+
606+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
607+
608+
select {
609+
case received := <-subChan:
610+
require.Equal(t, swapHash[:], received.SwapHash)
611+
612+
case <-time.After(time.Second):
613+
t.Fatal("did not replay risk rejected notification")
614+
}
615+
}
616+
435617
// TestManager_Backoff verifies that repeated failures in
436618
// subscribeNotifications cause the Manager to space out subscription attempts
437619
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)