Skip to content

Commit 644ba07

Browse files
committed
staticaddr/loopin: handle risk rejection notification
Add client handling for the server's static loop-in risk-rejected notification. If the server aborts confirmation-risk waiting before payment, the client fails the local swap instead of waiting for a payment deadline that will never start. Cache rejected notifications by swap hash using the same replay path as accepted notifications, and clear the opposite cached state when a final risk decision is received. This keeps reconnect and subscription-order races from stranding the client in the risk wait.
1 parent 57c32ad commit 644ba07

7 files changed

Lines changed: 755 additions & 65 deletions

File tree

notifications/manager.go

Lines changed: 111 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const (
3030
// static loop in confirmation risk acceptance.
3131
NotificationTypeStaticLoopInRiskAccepted
3232

33+
// NotificationTypeStaticLoopInRiskRejected is the notification type for
34+
// static loop in confirmation risk rejection.
35+
NotificationTypeStaticLoopInRiskRejected
36+
3337
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3438
// swap notifications.
3539
NotificationTypeUnfinishedSwap
@@ -83,6 +87,9 @@ type Manager struct {
8387

8488
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
8589
ServerStaticLoopInRiskAcceptedNotification
90+
91+
staticLoopInRiskRejected map[lntypes.Hash]*swapserverrpc.
92+
ServerStaticLoopInRiskRejectedNotification
8693
}
8794

8895
// NewManager creates a new notification manager.
@@ -99,12 +106,17 @@ func NewManager(cfg *Config) *Manager {
99106
map[lntypes.Hash]*swapserverrpc.
100107
ServerStaticLoopInRiskAcceptedNotification,
101108
),
109+
staticLoopInRiskRejected: make(
110+
map[lntypes.Hash]*swapserverrpc.
111+
ServerStaticLoopInRiskRejectedNotification,
112+
),
102113
}
103114
}
104115

105116
type subscriber struct {
106117
subCtx context.Context
107118
recvChan any
119+
swapHash *lntypes.Hash
108120
}
109121

110122
// SubscribeReservations subscribes to the reservation notifications.
@@ -167,6 +179,7 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
167179
sub := subscriber{
168180
subCtx: ctx,
169181
recvChan: notifChan,
182+
swapHash: &swapHash,
170183
}
171184

172185
m.Lock()
@@ -190,6 +203,43 @@ func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
190203
return notifChan
191204
}
192205

206+
// SubscribeStaticLoopInRiskRejected subscribes to static loop in risk rejected
207+
// notifications.
208+
func (m *Manager) SubscribeStaticLoopInRiskRejected(ctx context.Context,
209+
swapHash lntypes.Hash,
210+
) <-chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification {
211+
212+
notifChan := make(
213+
chan *swapserverrpc.ServerStaticLoopInRiskRejectedNotification, 1,
214+
)
215+
216+
sub := subscriber{
217+
subCtx: ctx,
218+
recvChan: notifChan,
219+
swapHash: &swapHash,
220+
}
221+
222+
m.Lock()
223+
m.subscribers[NotificationTypeStaticLoopInRiskRejected] = append(
224+
m.subscribers[NotificationTypeStaticLoopInRiskRejected], sub,
225+
)
226+
if ntfn, ok := m.staticLoopInRiskRejected[swapHash]; ok {
227+
notifChan <- ntfn
228+
delete(m.staticLoopInRiskRejected, swapHash)
229+
}
230+
m.Unlock()
231+
232+
context.AfterFunc(ctx, func() {
233+
m.removeSubscriber(NotificationTypeStaticLoopInRiskRejected, sub)
234+
m.Lock()
235+
delete(m.staticLoopInRiskRejected, swapHash)
236+
m.Unlock()
237+
close(notifChan)
238+
})
239+
240+
return notifChan
241+
}
242+
193243
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
194244
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
195245
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -376,24 +426,37 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
376426
}
377427

378428
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
379-
// We'll forward the static loop in risk accepted notification to all
380-
// subscribers.
429+
// We'll forward the static loop in risk accepted notification to the
430+
// subscriber for the matching swap.
381431
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
382432
m.Lock()
383433
defer m.Unlock()
384434

435+
var (
436+
swapHash lntypes.Hash
437+
hasSwapHash bool
438+
)
385439
if riskAcceptedNtfn != nil {
386-
swapHash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
440+
hash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
387441
if err != nil {
388442
log.Warnf("Received invalid static loop in risk "+
389443
"accepted notification: %v", err)
390444
} else {
391-
m.staticLoopInRiskAccepted[swapHash] =
445+
swapHash = hash
446+
hasSwapHash = true
447+
m.staticLoopInRiskAccepted[hash] =
392448
riskAcceptedNtfn
449+
delete(m.staticLoopInRiskRejected, hash)
393450
}
394451
}
395452

396453
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
454+
if !hasSwapHash || sub.swapHash == nil ||
455+
*sub.swapHash != swapHash {
456+
457+
continue
458+
}
459+
397460
recvChan := sub.recvChan.(chan *swapserverrpc.
398461
ServerStaticLoopInRiskAcceptedNotification)
399462

@@ -406,6 +469,50 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
406469
}
407470
}
408471

472+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskRejected: // nolint: lll
473+
// We'll forward the static loop in risk rejected notification to the
474+
// subscriber for the matching swap.
475+
riskRejectedNtfn := ntfn.GetStaticLoopInRiskRejected()
476+
m.Lock()
477+
defer m.Unlock()
478+
479+
var (
480+
swapHash lntypes.Hash
481+
hasSwapHash bool
482+
)
483+
if riskRejectedNtfn != nil {
484+
hash, err := lntypes.MakeHash(riskRejectedNtfn.SwapHash)
485+
if err != nil {
486+
log.Warnf("Received invalid static loop in risk "+
487+
"rejected notification: %v", err)
488+
} else {
489+
swapHash = hash
490+
hasSwapHash = true
491+
m.staticLoopInRiskRejected[hash] =
492+
riskRejectedNtfn
493+
delete(m.staticLoopInRiskAccepted, hash)
494+
}
495+
}
496+
497+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskRejected] { // nolint: lll
498+
if !hasSwapHash || sub.swapHash == nil ||
499+
*sub.swapHash != swapHash {
500+
501+
continue
502+
}
503+
504+
recvChan := sub.recvChan.(chan *swapserverrpc.
505+
ServerStaticLoopInRiskRejectedNotification)
506+
507+
select {
508+
case recvChan <- riskRejectedNtfn:
509+
case <-sub.subCtx.Done():
510+
default:
511+
log.Debugf("Dropping static loop in risk " +
512+
"rejected notification for slow subscriber")
513+
}
514+
}
515+
409516
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
410517
// We'll forward the unfinished swap notification to all
411518
// subscribers.

notifications/manager_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,86 @@ func staticLoopInSweepNotification(
202202
}
203203
}
204204

205+
func staticLoopInRiskAcceptedNotification(
206+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207+
208+
return &swapserverrpc.SubscribeNotificationsResponse{
209+
Notification: &swapserverrpc.
210+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
211+
StaticLoopInRiskAccepted: &swapserverrpc.
212+
ServerStaticLoopInRiskAcceptedNotification{
213+
SwapHash: swapHash[:],
214+
},
215+
},
216+
}
217+
}
218+
219+
func staticLoopInRiskRejectedNotification(
220+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
221+
222+
return &swapserverrpc.SubscribeNotificationsResponse{
223+
Notification: &swapserverrpc.
224+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
225+
StaticLoopInRiskRejected: &swapserverrpc.
226+
ServerStaticLoopInRiskRejectedNotification{
227+
SwapHash: swapHash[:],
228+
},
229+
},
230+
}
231+
}
232+
233+
type staticLoopInRiskNotification interface {
234+
GetSwapHash() []byte
235+
}
236+
237+
func assertStaticLoopInRiskNotificationSwapScoped[
238+
T staticLoopInRiskNotification](t *testing.T,
239+
subscribe func(*Manager, context.Context, lntypes.Hash) <-chan T,
240+
notification func(lntypes.Hash) *swapserverrpc.
241+
SubscribeNotificationsResponse, label string,
242+
swapHashA, swapHashB lntypes.Hash) {
243+
244+
t.Helper()
245+
246+
mgr := NewManager(&Config{})
247+
248+
subCtx, subCancel := context.WithCancel(t.Context())
249+
defer subCancel()
250+
251+
subChanA := subscribe(mgr, subCtx, swapHashA)
252+
subChanB := subscribe(mgr, subCtx, swapHashB)
253+
254+
mgr.handleNotification(notification(swapHashA))
255+
256+
select {
257+
case received := <-subChanA:
258+
require.Equal(t, swapHashA[:], received.GetSwapHash())
259+
260+
case <-time.After(time.Second):
261+
t.Fatalf("did not receive first swap risk %s notification",
262+
label)
263+
}
264+
265+
select {
266+
case received := <-subChanB:
267+
t.Fatalf("second swap received wrong notification: %x",
268+
received.GetSwapHash())
269+
270+
default:
271+
}
272+
273+
mgr.handleNotification(notification(swapHashB))
274+
275+
select {
276+
case received := <-subChanB:
277+
require.Equal(t, swapHashB[:], received.GetSwapHash())
278+
279+
case <-time.After(time.Second):
280+
t.Fatalf("did not receive second swap risk %s notification",
281+
label)
282+
}
283+
}
284+
205285
// TestManager_SlowSubscriberDoesNotBlock tests that a subscriber with a full
206286
// notification channel does not block delivery to other subscribers.
207287
func TestManager_SlowSubscriberDoesNotBlock(t *testing.T) {
@@ -335,6 +415,22 @@ func TestManager_StaticLoopInRiskAcceptedNotification(t *testing.T) {
335415
}
336416
}
337417

418+
// TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped verifies that a
419+
// notification for one swap does not occupy another swap's subscriber channel.
420+
func TestManager_StaticLoopInRiskAcceptedNotificationSwapScoped(t *testing.T) {
421+
t.Parallel()
422+
423+
assertStaticLoopInRiskNotificationSwapScoped(
424+
t, func(m *Manager, ctx context.Context,
425+
swapHash lntypes.Hash) <-chan *swapserverrpc.
426+
ServerStaticLoopInRiskAcceptedNotification {
427+
428+
return m.SubscribeStaticLoopInRiskAccepted(ctx, swapHash)
429+
}, staticLoopInRiskAcceptedNotification, "accepted",
430+
lntypes.Hash{0x04, 0x05}, lntypes.Hash{0x06, 0x07},
431+
)
432+
}
433+
338434
// TestManager_StaticLoopInRiskAcceptedNotificationReplay tests that the Manager
339435
// replays a risk accepted notification that arrives before the swap-specific
340436
// subscriber is registered.
@@ -370,6 +466,92 @@ func TestManager_StaticLoopInRiskAcceptedNotificationReplay(t *testing.T) {
370466
}
371467
}
372468

469+
// TestManager_StaticLoopInRiskRejectedNotification tests that the Manager
470+
// forwards static loop in risk rejected notifications to subscribers.
471+
func TestManager_StaticLoopInRiskRejectedNotification(t *testing.T) {
472+
t.Parallel()
473+
474+
mgr := NewManager(&Config{})
475+
476+
subCtx, subCancel := context.WithCancel(t.Context())
477+
defer subCancel()
478+
479+
swapHash := lntypes.Hash{0x08, 0x09}
480+
481+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
482+
483+
mgr.handleNotification(
484+
&swapserverrpc.SubscribeNotificationsResponse{
485+
Notification: &swapserverrpc.
486+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
487+
StaticLoopInRiskRejected: &swapserverrpc.
488+
ServerStaticLoopInRiskRejectedNotification{
489+
SwapHash: swapHash[:],
490+
},
491+
},
492+
},
493+
)
494+
495+
select {
496+
case received := <-subChan:
497+
require.Equal(t, swapHash[:], received.SwapHash)
498+
499+
case <-time.After(time.Second):
500+
t.Fatal("did not receive risk rejected notification")
501+
}
502+
}
503+
504+
// TestManager_StaticLoopInRiskRejectedNotificationSwapScoped verifies that a
505+
// notification for one swap does not occupy another swap's subscriber channel.
506+
func TestManager_StaticLoopInRiskRejectedNotificationSwapScoped(t *testing.T) {
507+
t.Parallel()
508+
509+
assertStaticLoopInRiskNotificationSwapScoped(
510+
t, func(m *Manager, ctx context.Context,
511+
swapHash lntypes.Hash) <-chan *swapserverrpc.
512+
ServerStaticLoopInRiskRejectedNotification {
513+
514+
return m.SubscribeStaticLoopInRiskRejected(ctx, swapHash)
515+
}, staticLoopInRiskRejectedNotification, "rejected",
516+
lntypes.Hash{0x08, 0x09}, lntypes.Hash{0x0a, 0x0b},
517+
)
518+
}
519+
520+
// TestManager_StaticLoopInRiskRejectedNotificationReplay tests that the Manager
521+
// replays a risk rejected notification that arrives before the swap-specific
522+
// subscriber is registered.
523+
func TestManager_StaticLoopInRiskRejectedNotificationReplay(t *testing.T) {
524+
t.Parallel()
525+
526+
mgr := NewManager(&Config{})
527+
528+
swapHash := lntypes.Hash{0x0a, 0x0b}
529+
mgr.handleNotification(
530+
&swapserverrpc.SubscribeNotificationsResponse{
531+
Notification: &swapserverrpc.
532+
SubscribeNotificationsResponse_StaticLoopInRiskRejected{
533+
StaticLoopInRiskRejected: &swapserverrpc.
534+
ServerStaticLoopInRiskRejectedNotification{
535+
SwapHash: swapHash[:],
536+
},
537+
},
538+
},
539+
)
540+
541+
subCtx, subCancel := context.WithCancel(t.Context())
542+
defer subCancel()
543+
544+
subChan := mgr.SubscribeStaticLoopInRiskRejected(subCtx, swapHash)
545+
546+
select {
547+
case received := <-subChan:
548+
require.Equal(t, swapHash[:], received.SwapHash)
549+
550+
case <-time.After(time.Second):
551+
t.Fatal("did not replay risk rejected notification")
552+
}
553+
}
554+
373555
// TestManager_Backoff verifies that repeated failures in
374556
// subscribeNotifications cause the Manager to space out subscription attempts
375557
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)