Skip to content

Commit 6d9cf87

Browse files
committed
staticaddr/loopin: wait for risk acceptance notification
Wait for the server's static loop-in risk-accepted notification before starting the client payment deadline. The server may intentionally hold the swap at the confirmation-risk gate after HTLC signing, and the client deadline should not run while that server-side wait is still in progress. Cache risk-accepted notifications by swap hash inside the local notification manager and replay them to the per-swap subscriber. This covers both reconnects and the internal race where the global notification stream receives the server event before the static loop-in FSM registers its waiter.
1 parent a8a3418 commit 6d9cf87

6 files changed

Lines changed: 774 additions & 31 deletions

File tree

notifications/manager.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ const (
2626
// static loop in sweep requests.
2727
NotificationTypeStaticLoopInSweepRequest
2828

29+
// NotificationTypeStaticLoopInRiskAccepted is the notification type for
30+
// static loop in confirmation risk acceptance.
31+
NotificationTypeStaticLoopInRiskAccepted
32+
2933
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3034
// swap notifications.
3135
NotificationTypeUnfinishedSwap
@@ -76,6 +80,9 @@ type Manager struct {
7680
hasL402 bool
7781

7882
subscribers map[NotificationType][]subscriber
83+
84+
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
85+
ServerStaticLoopInRiskAcceptedNotification
7986
}
8087

8188
// NewManager creates a new notification manager.
@@ -88,6 +95,10 @@ func NewManager(cfg *Config) *Manager {
8895
return &Manager{
8996
cfg: cfg,
9097
subscribers: make(map[NotificationType][]subscriber),
98+
staticLoopInRiskAccepted: make(
99+
map[lntypes.Hash]*swapserverrpc.
100+
ServerStaticLoopInRiskAcceptedNotification,
101+
),
91102
}
92103
}
93104

@@ -143,6 +154,42 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
143154
return notifChan
144155
}
145156

157+
// SubscribeStaticLoopInRiskAccepted subscribes to static loop in risk accepted
158+
// notifications.
159+
func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
160+
swapHash lntypes.Hash,
161+
) <-chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification {
162+
163+
notifChan := make(
164+
chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification, 1,
165+
)
166+
167+
sub := subscriber{
168+
subCtx: ctx,
169+
recvChan: notifChan,
170+
}
171+
172+
m.Lock()
173+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted] = append(
174+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted], sub,
175+
)
176+
if ntfn, ok := m.staticLoopInRiskAccepted[swapHash]; ok {
177+
notifChan <- ntfn
178+
delete(m.staticLoopInRiskAccepted, swapHash)
179+
}
180+
m.Unlock()
181+
182+
context.AfterFunc(ctx, func() {
183+
m.removeSubscriber(NotificationTypeStaticLoopInRiskAccepted, sub)
184+
m.Lock()
185+
delete(m.staticLoopInRiskAccepted, swapHash)
186+
m.Unlock()
187+
close(notifChan)
188+
})
189+
190+
return notifChan
191+
}
192+
146193
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
147194
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
148195
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -328,6 +375,37 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
328375
}
329376
}
330377

378+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
379+
// We'll forward the static loop in risk accepted notification to all
380+
// subscribers.
381+
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
382+
m.Lock()
383+
defer m.Unlock()
384+
385+
if riskAcceptedNtfn != nil {
386+
swapHash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
387+
if err != nil {
388+
log.Warnf("Received invalid static loop in risk "+
389+
"accepted notification: %v", err)
390+
} else {
391+
m.staticLoopInRiskAccepted[swapHash] =
392+
riskAcceptedNtfn
393+
}
394+
}
395+
396+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
397+
recvChan := sub.recvChan.(chan *swapserverrpc.
398+
ServerStaticLoopInRiskAcceptedNotification)
399+
400+
select {
401+
case recvChan <- riskAcceptedNtfn:
402+
case <-sub.subCtx.Done():
403+
default:
404+
log.Debugf("Dropping static loop in risk " +
405+
"accepted notification for slow subscriber")
406+
}
407+
}
408+
331409
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
332410
// We'll forward the unfinished swap notification to all
333411
// subscribers.

notifications/manager_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,76 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
362362
}
363363
}
364364

365+
// TestManager_StaticLoopInRiskAcceptedNotification tests that the Manager
366+
// forwards static loop in risk accepted notifications to subscribers.
367+
func TestManager_StaticLoopInRiskAcceptedNotification(t *testing.T) {
368+
t.Parallel()
369+
370+
mgr := NewManager(&Config{})
371+
372+
subCtx, subCancel := context.WithCancel(t.Context())
373+
defer subCancel()
374+
375+
swapHash := lntypes.Hash{0x04, 0x05}
376+
377+
subChan := mgr.SubscribeStaticLoopInRiskAccepted(subCtx, swapHash)
378+
379+
mgr.handleNotification(
380+
&swapserverrpc.SubscribeNotificationsResponse{
381+
Notification: &swapserverrpc.
382+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
383+
StaticLoopInRiskAccepted: &swapserverrpc.
384+
ServerStaticLoopInRiskAcceptedNotification{
385+
SwapHash: swapHash[:],
386+
},
387+
},
388+
},
389+
)
390+
391+
select {
392+
case received := <-subChan:
393+
require.Equal(t, swapHash[:], received.SwapHash)
394+
395+
case <-time.After(time.Second):
396+
t.Fatal("did not receive risk accepted notification")
397+
}
398+
}
399+
400+
// TestManager_StaticLoopInRiskAcceptedNotificationReplay tests that the Manager
401+
// replays a risk accepted notification that arrives before the swap-specific
402+
// subscriber is registered.
403+
func TestManager_StaticLoopInRiskAcceptedNotificationReplay(t *testing.T) {
404+
t.Parallel()
405+
406+
mgr := NewManager(&Config{})
407+
408+
swapHash := lntypes.Hash{0x06, 0x07}
409+
mgr.handleNotification(
410+
&swapserverrpc.SubscribeNotificationsResponse{
411+
Notification: &swapserverrpc.
412+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
413+
StaticLoopInRiskAccepted: &swapserverrpc.
414+
ServerStaticLoopInRiskAcceptedNotification{
415+
SwapHash: swapHash[:],
416+
},
417+
},
418+
},
419+
)
420+
421+
subCtx, subCancel := context.WithCancel(t.Context())
422+
defer subCancel()
423+
424+
subChan := mgr.SubscribeStaticLoopInRiskAccepted(subCtx, swapHash)
425+
426+
select {
427+
case received := <-subChan:
428+
require.Equal(t, swapHash[:], received.SwapHash)
429+
430+
case <-time.After(time.Second):
431+
t.Fatal("did not replay risk accepted notification")
432+
}
433+
}
434+
365435
// TestManager_Backoff verifies that repeated failures in
366436
// subscribeNotifications cause the Manager to space out subscription attempts
367437
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)