Skip to content

Commit 62a51f6

Browse files
committed
staticaddr/loopin: wait for risk acceptance notification
Wait for the server's static loop-in risk-accepted notification before starting the client payment deadline. The server may intentionally hold the swap at the confirmation-risk gate after HTLC signing, and the client deadline should not run while that server-side wait is still in progress. Cache risk-accepted notifications by swap hash inside the local notification manager and replay them to the per-swap subscriber. This covers both reconnects and the internal race where the global notification stream receives the server event before the static loop-in FSM registers its waiter.
1 parent cd0153a commit 62a51f6

6 files changed

Lines changed: 774 additions & 31 deletions

File tree

notifications/manager.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ const (
2626
// static loop in sweep requests.
2727
NotificationTypeStaticLoopInSweepRequest
2828

29+
// NotificationTypeStaticLoopInRiskAccepted is the notification type for
30+
// static loop in confirmation risk acceptance.
31+
NotificationTypeStaticLoopInRiskAccepted
32+
2933
// NotificationTypeUnfinishedSwap is the notification type for unfinished
3034
// swap notifications.
3135
NotificationTypeUnfinishedSwap
@@ -76,6 +80,9 @@ type Manager struct {
7680
hasL402 bool
7781

7882
subscribers map[NotificationType][]subscriber
83+
84+
staticLoopInRiskAccepted map[lntypes.Hash]*swapserverrpc.
85+
ServerStaticLoopInRiskAcceptedNotification
7986
}
8087

8188
// NewManager creates a new notification manager.
@@ -88,6 +95,10 @@ func NewManager(cfg *Config) *Manager {
8895
return &Manager{
8996
cfg: cfg,
9097
subscribers: make(map[NotificationType][]subscriber),
98+
staticLoopInRiskAccepted: make(
99+
map[lntypes.Hash]*swapserverrpc.
100+
ServerStaticLoopInRiskAcceptedNotification,
101+
),
91102
}
92103
}
93104

@@ -143,6 +154,42 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
143154
return notifChan
144155
}
145156

157+
// SubscribeStaticLoopInRiskAccepted subscribes to static loop in risk accepted
158+
// notifications.
159+
func (m *Manager) SubscribeStaticLoopInRiskAccepted(ctx context.Context,
160+
swapHash lntypes.Hash,
161+
) <-chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification {
162+
163+
notifChan := make(
164+
chan *swapserverrpc.ServerStaticLoopInRiskAcceptedNotification, 1,
165+
)
166+
167+
sub := subscriber{
168+
subCtx: ctx,
169+
recvChan: notifChan,
170+
}
171+
172+
m.Lock()
173+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted] = append(
174+
m.subscribers[NotificationTypeStaticLoopInRiskAccepted], sub,
175+
)
176+
if ntfn, ok := m.staticLoopInRiskAccepted[swapHash]; ok {
177+
notifChan <- ntfn
178+
delete(m.staticLoopInRiskAccepted, swapHash)
179+
}
180+
m.Unlock()
181+
182+
context.AfterFunc(ctx, func() {
183+
m.removeSubscriber(NotificationTypeStaticLoopInRiskAccepted, sub)
184+
m.Lock()
185+
delete(m.staticLoopInRiskAccepted, swapHash)
186+
m.Unlock()
187+
close(notifChan)
188+
})
189+
190+
return notifChan
191+
}
192+
146193
// SubscribeUnfinishedSwaps subscribes to the unfinished swap notifications.
147194
func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
148195
) <-chan *swapserverrpc.ServerUnfinishedSwapNotification {
@@ -328,6 +375,37 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
328375
}
329376
}
330377

378+
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
379+
// We'll forward the static loop in risk accepted notification to all
380+
// subscribers.
381+
riskAcceptedNtfn := ntfn.GetStaticLoopInRiskAccepted()
382+
m.Lock()
383+
defer m.Unlock()
384+
385+
if riskAcceptedNtfn != nil {
386+
swapHash, err := lntypes.MakeHash(riskAcceptedNtfn.SwapHash)
387+
if err != nil {
388+
log.Warnf("Received invalid static loop in risk "+
389+
"accepted notification: %v", err)
390+
} else {
391+
m.staticLoopInRiskAccepted[swapHash] =
392+
riskAcceptedNtfn
393+
}
394+
}
395+
396+
for _, sub := range m.subscribers[NotificationTypeStaticLoopInRiskAccepted] { // nolint: lll
397+
recvChan := sub.recvChan.(chan *swapserverrpc.
398+
ServerStaticLoopInRiskAcceptedNotification)
399+
400+
select {
401+
case recvChan <- riskAcceptedNtfn:
402+
case <-sub.subCtx.Done():
403+
default:
404+
log.Debugf("Dropping static loop in risk " +
405+
"accepted notification for slow subscriber")
406+
}
407+
}
408+
331409
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
332410
// We'll forward the unfinished swap notification to all
333411
// subscribers.

notifications/manager_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,76 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
298298
}
299299
}
300300

301+
// TestManager_StaticLoopInRiskAcceptedNotification tests that the Manager
302+
// forwards static loop in risk accepted notifications to subscribers.
303+
func TestManager_StaticLoopInRiskAcceptedNotification(t *testing.T) {
304+
t.Parallel()
305+
306+
mgr := NewManager(&Config{})
307+
308+
subCtx, subCancel := context.WithCancel(t.Context())
309+
defer subCancel()
310+
311+
swapHash := lntypes.Hash{0x04, 0x05}
312+
313+
subChan := mgr.SubscribeStaticLoopInRiskAccepted(subCtx, swapHash)
314+
315+
mgr.handleNotification(
316+
&swapserverrpc.SubscribeNotificationsResponse{
317+
Notification: &swapserverrpc.
318+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
319+
StaticLoopInRiskAccepted: &swapserverrpc.
320+
ServerStaticLoopInRiskAcceptedNotification{
321+
SwapHash: swapHash[:],
322+
},
323+
},
324+
},
325+
)
326+
327+
select {
328+
case received := <-subChan:
329+
require.Equal(t, swapHash[:], received.SwapHash)
330+
331+
case <-time.After(time.Second):
332+
t.Fatal("did not receive risk accepted notification")
333+
}
334+
}
335+
336+
// TestManager_StaticLoopInRiskAcceptedNotificationReplay tests that the Manager
337+
// replays a risk accepted notification that arrives before the swap-specific
338+
// subscriber is registered.
339+
func TestManager_StaticLoopInRiskAcceptedNotificationReplay(t *testing.T) {
340+
t.Parallel()
341+
342+
mgr := NewManager(&Config{})
343+
344+
swapHash := lntypes.Hash{0x06, 0x07}
345+
mgr.handleNotification(
346+
&swapserverrpc.SubscribeNotificationsResponse{
347+
Notification: &swapserverrpc.
348+
SubscribeNotificationsResponse_StaticLoopInRiskAccepted{
349+
StaticLoopInRiskAccepted: &swapserverrpc.
350+
ServerStaticLoopInRiskAcceptedNotification{
351+
SwapHash: swapHash[:],
352+
},
353+
},
354+
},
355+
)
356+
357+
subCtx, subCancel := context.WithCancel(t.Context())
358+
defer subCancel()
359+
360+
subChan := mgr.SubscribeStaticLoopInRiskAccepted(subCtx, swapHash)
361+
362+
select {
363+
case received := <-subChan:
364+
require.Equal(t, swapHash[:], received.SwapHash)
365+
366+
case <-time.After(time.Second):
367+
t.Fatal("did not replay risk accepted notification")
368+
}
369+
}
370+
301371
// TestManager_Backoff verifies that repeated failures in
302372
// subscribeNotifications cause the Manager to space out subscription attempts
303373
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)