Skip to content

Commit 2fe6a75

Browse files
committed
staticaddr/loopin: handle risk rejection notification
Add client handling for the server's static loop-in risk-rejected notification. If the server aborts confirmation-risk waiting before payment, the client fails the local swap instead of waiting for a payment deadline that will never start. Cache rejected notifications by swap hash using the same replay path as accepted notifications, and clear the opposite cached state when a final risk decision is received. This keeps reconnect and subscription-order races from stranding the client in the risk wait.
1 parent 62a51f6 commit 2fe6a75

5 files changed

Lines changed: 618 additions & 9 deletions

File tree

notifications/manager.go

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const (
3030
// static loop in confirmation risk acceptance.
3131
NotificationTypeStaticLoopInRiskAccepted
3232

33+
// NotificationTypeStaticLoopInRiskRejected is the notification type for
34+
// static loop in confirmation risk rejection.
35+
NotificationTypeStaticLoopInRiskRejected
36+
3337
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3438
// swap notifications.
3539
NotificationTypeUnfinishedSwap
@@ -83,6 +87,9 @@ type Manager struct {
8387

8488
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
8589
ServerStaticLoopInRiskAcceptedNotification
90+
91+
staticLoopInRiskRejected map[lntypes.Hash]*swapserverrpc.
92+
ServerStaticLoopInRiskRejectedNotification
8693
}
8794

8895
// NewManager creates a new notification manager.
@@ -99,12 +106,17 @@ func NewManager(cfg *Config) *Manager {
99106
map[lntypes.Hash]*swapserverrpc.
100107
ServerStaticLoopInRiskAcceptedNotification,
101108
),
109+
staticLoopInRiskRejected: make(
110+
map[lntypes.Hash]*swapserverrpc.
111+
ServerStaticLoopInRiskRejectedNotification,
112+
),
102113
}
103114
}
104115

105116
type subscriber struct {
106117
subCtx context.Context
107118
recvChan any
119+
swapHash *lntypes.Hash
108120
}
109121

110122
// SubscribeReservations subscribes to the reservation notifications.
@@ -167,6 +179,7 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
167179
sub := subscriber{
168180
subCtx: ctx,
169181
recvChan: notifChan,
182+
swapHash: &swapHash,
170183
}
171184

172185
m.Lock()
@@ -190,6 +203,43 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
190203
return notifChan
191204
}
192205

206+
// SubscribeStaticLoopInRiskRejected subscribes to static loop in risk rejected
207+
// notifications.
208+
func (m *Manager) SubscribeStaticLoopInRiskRejected(ctx context.Context,
209+
swapHash lntypes.Hash,
210+
) <-chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification {
211+
212+
notifChan := make(
213+
chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification, 1,
214+
)
215+
216+
sub := subscriber{
217+
subCtx: ctx,
218+
recvChan: notifChan,
219+
swapHash: &swapHash,
220+
}
221+
222+
m.Lock()
223+
m.subscribers[NotificationTypeStaticLoopInRiskRejected] = append(
224+
m.subscribers[NotificationTypeStaticLoopInRiskRejected], sub,
225+
)
226+
if ntfn, ok := m.staticLoopInRiskRejected[swapHash]; ok {
227+
notifChan <- ntfn
228+
delete(m.staticLoopInRiskRejected, swapHash)
229+
}
230+
m.Unlock()
231+
232+
context.AfterFunc(ctx, func() {
233+
m.removeSubscriber(NotificationTypeStaticLoopInRiskRejected, sub)
234+
m.Lock()
235+
delete(m.staticLoopInRiskRejected, swapHash)
236+
m.Unlock()
237+
close(notifChan)
238+
})
239+
240+
return notifChan
241+
}
242+
193243
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
194244
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
195245
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -376,24 +426,37 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
376426
}
377427

378428
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
379-
// We'll forward the static loop in risk accepted notification to all
380-
// subscribers.
429+
// We'll forward the static loop in risk accepted notification to the
430+
// subscriber for the matching swap.
381431
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
382432
m.Lock()
383433
defer m.Unlock()
384434

435+
var (
436+
swapHash lntypes.Hash
437+
hasSwapHash bool
438+
)
385439
if riskAcceptedNtfn != nil {
386-
swapHash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
440+
hash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
387441
if err != nil {
388442
log.Warnf("Received invalid static loop in risk "+
389443
"accepted notification: %v", err)
390444
} else {
391-
m.staticLoopInRiskAccepted[swapHash] =
445+
swapHash = hash
446+
hasSwapHash = true
447+
m.staticLoopInRiskAccepted[hash] =
392448
riskAcceptedNtfn
449+
delete(m.staticLoopInRiskRejected, hash)
393450
}
394451
}
395452

396453
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
454+
if !hasSwapHash || sub.swapHash == nil ||
455+
*sub.swapHash != swapHash {
456+
457+
continue
458+
}
459+
397460
recvChan := sub.recvChan.(chan *swapserverrpc.
398461
ServerStaticLoopInRiskAcceptedNotification)
399462

@@ -406,6 +469,50 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
406469
}
407470
}
408471

472+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskRejected: // nolint: lll
473+
// We'll forward the static loop in risk rejected notification to the
474+
// subscriber for the matching swap.
475+
riskRejectedNtfn := ntfn.GetStaticLoopInRiskRejected()
476+
m.Lock()
477+
defer m.Unlock()
478+
479+
var (
480+
swapHash lntypes.Hash
481+
hasSwapHash bool
482+
)
483+
if riskRejectedNtfn != nil {
484+
hash, err := lntypes.MakeHash(riskRejectedNtfn.SwapHash)
485+
if err != nil {
486+
log.Warnf("Received invalid static loop in risk "+
487+
"rejected notification: %v", err)
488+
} else {
489+
swapHash = hash
490+
hasSwapHash = true
491+
m.staticLoopInRiskRejected[hash] =
492+
riskRejectedNtfn
493+
delete(m.staticLoopInRiskAccepted, hash)
494+
}
495+
}
496+
497+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskRejected] { // nolint: lll
498+
if !hasSwapHash || sub.swapHash == nil ||
499+
*sub.swapHash != swapHash {
500+
501+
continue
502+
}
503+
504+
recvChan := sub.recvChan.(chan *swapserverrpc.
505+
ServerStaticLoopInRiskRejectedNotification)
506+
507+
select {
508+
case recvChan <- riskRejectedNtfn:
509+
case <-sub.subCtx.Done():
510+
default:
511+
log.Debugf("Dropping static loop in risk " +
512+
"rejected notification for slow subscriber")
513+
}
514+
}
515+
409516
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
410517
// We'll forward the unfinished swap notification to all
411518
// subscribers.

notifications/manager_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,86 @@ func unfinishedSwapNotification(
202202
}
203203
}
204204

205+
func staticLoopInRiskAcceptedNotification(
206+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207+
208+
return &swapserverrpc.SubscribeNotificationsResponse{
209+
Notification: &swapserverrpc.
210+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
211+
StaticLoopInRiskAccepted: &swapserverrpc.
212+
ServerStaticLoopInRiskAcceptedNotification{
213+
SwapHash: swapHash[:],
214+
},
215+
},
216+
}
217+
}
218+
219+
func staticLoopInRiskRejectedNotification(
220+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
221+
222+
return &swapserverrpc.SubscribeNotificationsResponse{
223+
Notification: &swapserverrpc.
224+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
225+
StaticLoopInRiskRejected: &swapserverrpc.
226+
ServerStaticLoopInRiskRejectedNotification{
227+
SwapHash: swapHash[:],
228+
},
229+
},
230+
}
231+
}
232+
233+
type staticLoopInRiskNotification interface {
234+
GetSwapHash() []byte
235+
}
236+
237+
func assertStaticLoopInRiskNotificationSwapScoped[
238+
T staticLoopInRiskNotification](t *testing.T,
239+
subscribe func(*Manager, context.Context, lntypes.Hash) <-chan T,
240+
notification func(lntypes.Hash) *swapserverrpc.
241+
SubscribeNotificationsResponse, label string,
242+
swapHashA, swapHashB lntypes.Hash) {
243+
244+
t.Helper()
245+
246+
mgr := NewManager(&Config{})
247+
248+
subCtx, subCancel := context.WithCancel(t.Context())
249+
defer subCancel()
250+
251+
subChanA := subscribe(mgr, subCtx, swapHashA)
252+
subChanB := subscribe(mgr, subCtx, swapHashB)
253+
254+
mgr.handleNotification(notification(swapHashA))
255+
256+
select {
257+
case received := <-subChanA:
258+
require.Equal(t, swapHashA[:], received.GetSwapHash())
259+
260+
case <-time.After(time.Second):
261+
t.Fatalf("did not receive first swap risk %s notification",
262+
label)
263+
}
264+
265+
select {
266+
case received := <-subChanB:
267+
t.Fatalf("second swap received wrong notification: %x",
268+
received.GetSwapHash())
269+
270+
default:
271+
}
272+
273+
mgr.handleNotification(notification(swapHashB))
274+
275+
select {
276+
case received := <-subChanB:
277+
require.Equal(t, swapHashB[:], received.GetSwapHash())
278+
279+
case <-time.After(time.Second):
280+
t.Fatalf("did not receive second swap risk %s notification",
281+
label)
282+
}
283+
}
284+
205285
// TestManager_SlowSubscriberDoesNotBlock tests that a subscriber with a full
206286
// notification channel does not block delivery to other subscribers.
207287
func TestManager_SlowSubscriberDoesNotBlock(t *testing.T) {
@@ -333,6 +413,22 @@ func TestManager_StaticLoopInRiskAcceptedNotification(t *testing.T) {
333413
}
334414
}
335415

416+
// TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped verifies that a
417+
// notification for one swap does not occupy another swap's subscriber channel.
418+
func TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped(t *testing.T) {
419+
t.Parallel()
420+
421+
assertStaticLoopInRiskNotificationSwapScoped(
422+
t, func(m *Manager, ctx context.Context,
423+
swapHash lntypes.Hash) <-chan *swapserverrpc.
424+
ServerStaticLoopInRiskAcceptedNotification {
425+
426+
return m.SubscribeStaticLoopInRiskAccepted(ctx, swapHash)
427+
}, staticLoopInRiskAcceptedNotification, "accepted",
428+
lntypes.Hash{0x04, 0x05}, lntypes.Hash{0x06, 0x07},
429+
)
430+
}
431+
336432
// TestManager_StaticLoopInRiskAcceptedNotificationReplay tests that the Manager
337433
// replays a risk accepted notification that arrives before the swap-specific
338434
// subscriber is registered.
@@ -368,6 +464,92 @@ func TestManager_StaticLoopInRiskAcceptedNotificationReplay(t *testing.T) {
368464
}
369465
}
370466

467+
// TestManager_StaticLoopInRiskRejectedNotification tests that the Manager
468+
// forwards static loop in risk rejected notifications to subscribers.
469+
func TestManager_StaticLoopInRiskRejectedNotification(t *testing.T) {
470+
t.Parallel()
471+
472+
mgr := NewManager(&Config{})
473+
474+
subCtx, subCancel := context.WithCancel(t.Context())
475+
defer subCancel()
476+
477+
swapHash := lntypes.Hash{0x08, 0x09}
478+
479+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
480+
481+
mgr.handleNotification(
482+
&swapserverrpc.SubscribeNotificationsResponse{
483+
Notification: &swapserverrpc.
484+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
485+
StaticLoopInRiskRejected: &swapserverrpc.
486+
ServerStaticLoopInRiskRejectedNotification{
487+
SwapHash: swapHash[:],
488+
},
489+
},
490+
},
491+
)
492+
493+
select {
494+
case received := <-subChan:
495+
require.Equal(t, swapHash[:], received.SwapHash)
496+
497+
case <-time.After(time.Second):
498+
t.Fatal("did not receive risk rejected notification")
499+
}
500+
}
501+
502+
// TestManager_StaticLoopInRiskRejectedNotificationSwapScoped verifies that a
503+
// notification for one swap does not occupy another swap's subscriber channel.
504+
func TestManager_StaticLoopInRiskRejectedNotificationSwapScoped(t *testing.T) {
505+
t.Parallel()
506+
507+
assertStaticLoopInRiskNotificationSwapScoped(
508+
t, func(m *Manager, ctx context.Context,
509+
swapHash lntypes.Hash) <-chan *swapserverrpc.
510+
ServerStaticLoopInRiskRejectedNotification {
511+
512+
return m.SubscribeStaticLoopInRiskRejected(ctx, swapHash)
513+
}, staticLoopInRiskRejectedNotification, "rejected",
514+
lntypes.Hash{0x08, 0x09}, lntypes.Hash{0x0a, 0x0b},
515+
)
516+
}
517+
518+
// TestManager_StaticLoopInRiskRejectedNotificationReplay tests that the Manager
519+
// replays a risk rejected notification that arrives before the swap-specific
520+
// subscriber is registered.
521+
func TestManager_StaticLoopInRiskRejectedNotificationReplay(t *testing.T) {
522+
t.Parallel()
523+
524+
mgr := NewManager(&Config{})
525+
526+
swapHash := lntypes.Hash{0x0a, 0x0b}
527+
mgr.handleNotification(
528+
&swapserverrpc.SubscribeNotificationsResponse{
529+
Notification: &swapserverrpc.
530+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
531+
StaticLoopInRiskRejected: &swapserverrpc.
532+
ServerStaticLoopInRiskRejectedNotification{
533+
SwapHash: swapHash[:],
534+
},
535+
},
536+
},
537+
)
538+
539+
subCtx, subCancel := context.WithCancel(t.Context())
540+
defer subCancel()
541+
542+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
543+
544+
select {
545+
case received := <-subChan:
546+
require.Equal(t, swapHash[:], received.SwapHash)
547+
548+
case <-time.After(time.Second):
549+
t.Fatal("did not replay risk rejected notification")
550+
}
551+
}
552+
371553
// TestManager_Backoff verifies that repeated failures in
372554
// subscribeNotifications cause the Manager to space out subscription attempts
373555
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)