Skip to content

Commit b3fd80e

Browse files
committed
notifications: queue blocking fanout
Some notification types are work requests and must not be dropped when a subscriber channel is temporarily full. Sending them synchronously from the global notification handler can stall the stream while a slow subscriber holds back reads. Add a per-subscriber queue for blocking notification classes and deliver those notifications from a worker tied to the subscriber context. Keep direct sends as the fallback for subscribers that do not need queued delivery. Use the queued path for static loop-in sweep requests and unfinished swap notifications, and fix subscriber removal to compare channel identity now that subscribers contain function fields.
1 parent 3255451 commit b3fd80e

2 files changed

Lines changed: 161 additions & 12 deletions

File tree

notifications/manager.go

Lines changed: 86 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,87 @@ type subscriber struct {
132132
subCtx context.Context
133133
recvChan any
134134
swapHash *lntypes.Hash
135+
enqueue func(any)
136+
}
137+
138+
// newNotificationQueue creates a per-subscriber FIFO delivery function.
139+
func newNotificationQueue[T any](ctx context.Context,
140+
recvChan chan T) func(any) {
141+
142+
type queue struct {
143+
sync.Mutex
144+
145+
pending []T
146+
notify chan struct{}
147+
}
148+
149+
q := &queue{
150+
notify: make(chan struct{}, 1),
151+
}
152+
153+
go func() {
154+
defer func() {
155+
if recover() != nil {
156+
log.Debugf("subscriber channel closed before " +
157+
"notification delivery")
158+
}
159+
}()
160+
161+
for {
162+
q.Lock()
163+
if len(q.pending) == 0 {
164+
q.Unlock()
165+
166+
select {
167+
case <-q.notify:
168+
continue
169+
170+
case <-ctx.Done():
171+
return
172+
}
173+
}
174+
175+
ntfn := q.pending[0]
176+
q.pending = q.pending[1:]
177+
q.Unlock()
178+
179+
select {
180+
case recvChan <- ntfn:
181+
case <-ctx.Done():
182+
return
183+
}
184+
}
185+
}()
186+
187+
return func(ntfn any) {
188+
typedNtfn, ok := ntfn.(T)
189+
if !ok {
190+
log.Warnf("unexpected notification type %T", ntfn)
191+
return
192+
}
193+
194+
q.Lock()
195+
q.pending = append(q.pending, typedNtfn)
196+
q.Unlock()
197+
198+
select {
199+
case q.notify <- struct{}{}:
200+
default:
201+
}
202+
}
203+
}
204+
205+
// queueNotification queues or synchronously sends a must-deliver notification.
206+
func queueNotification[T any](sub subscriber, recvChan chan T, ntfn T) {
207+
if sub.enqueue != nil {
208+
sub.enqueue(ntfn)
209+
return
210+
}
211+
212+
select {
213+
case recvChan <- ntfn:
214+
case <-sub.subCtx.Done():
215+
}
135216
}
136217

137218
// SubscribeReservations subscribes to the reservation notifications.
@@ -166,6 +247,7 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
166247
sub := subscriber{
167248
subCtx: ctx,
168249
recvChan: notifChan,
250+
enqueue: newNotificationQueue(ctx, notifChan),
169251
}
170252

171253
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
@@ -265,6 +347,7 @@ func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
265347
sub := subscriber{
266348
subCtx: ctx,
267349
recvChan: notifChan,
350+
enqueue: newNotificationQueue(ctx, notifChan),
268351
}
269352

270353
m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
@@ -455,10 +538,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
455538
recvChan := sub.recvChan.(chan *swapserverrpc.
456539
ServerStaticLoopInSweepNotification)
457540

458-
select {
459-
case recvChan <- staticLoopInSweepRequestNtfn:
460-
case <-sub.subCtx.Done():
461-
}
541+
queueNotification(sub, recvChan, staticLoopInSweepRequestNtfn)
462542
}
463543

464544
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
@@ -586,10 +666,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
586666
recvChan := sub.recvChan.(chan *swapserverrpc.
587667
ServerUnfinishedSwapNotification)
588668

589-
select {
590-
case recvChan <- unfinishedSwapNtfn:
591-
case <-sub.subCtx.Done():
592-
}
669+
queueNotification(sub, recvChan, unfinishedSwapNtfn)
593670
}
594671

595672
case *swapserverrpc.SubscribeNotificationsResponse_HtlcConfirmed:
@@ -633,7 +710,7 @@ func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) {
633710
subs := m.subscribers[notifType]
634711
newSubs := make([]subscriber, 0, len(subs))
635712
for _, s := range subs {
636-
if s != sub {
713+
if s.recvChan != sub.recvChan {
637714
newSubs = append(newSubs, s)
638715
}
639716
}

notifications/manager_test.go

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,22 @@ func unfinishedSwapNotification(
205205
}
206206
}
207207

208+
// staticLoopInSweepNotification builds a static loop-in sweep notification.
209+
func staticLoopInSweepNotification(
210+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
211+
212+
return &swapserverrpc.SubscribeNotificationsResponse{
213+
Notification: &swapserverrpc.
214+
SubscribeNotificationsResponse_StaticLoopInSweep{
215+
StaticLoopInSweep: &swapserverrpc.
216+
ServerStaticLoopInSweepNotification{
217+
SwapHash: swapHash[:],
218+
},
219+
},
220+
}
221+
}
222+
223+
// staticLoopInRiskAcceptedNotification builds a risk accepted notification.
208224
func staticLoopInRiskAcceptedNotification(
209225
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
210226

@@ -359,6 +375,15 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
359375
close(done)
360376
}()
361377

378+
require.Eventually(t, func() bool {
379+
select {
380+
case <-done:
381+
return true
382+
default:
383+
return false
384+
}
385+
}, time.Second, 10*time.Millisecond)
386+
362387
select {
363388
case received := <-subChan:
364389
require.Equal(t, swapHashA[:], received.SwapHash)
@@ -368,18 +393,65 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
368393
}
369394

370395
select {
371-
case <-done:
396+
case received := <-subChan:
397+
require.Equal(t, swapHashB[:], received.SwapHash)
372398

373399
case <-time.After(time.Second):
374-
t.Fatal("second unfinished swap notification did not unblock")
400+
t.Fatal("second unfinished swap notification was dropped")
401+
}
402+
}
403+
404+
// TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber verifies
405+
// that a full static-loop-in sweep subscriber channel does not block the global
406+
// notification receive loop.
407+
func TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber(
408+
t *testing.T) {
409+
410+
t.Parallel()
411+
412+
mgr := NewManager(&Config{})
413+
414+
subCtx, subCancel := context.WithCancel(t.Context())
415+
defer subCancel()
416+
417+
subChan := mgr.SubscribeStaticLoopInSweepRequests(subCtx)
418+
419+
swapHashA := lntypes.Hash{0x12, 0x13}
420+
swapHashB := lntypes.Hash{0x14, 0x15}
421+
422+
mgr.handleNotification(t.Context(), staticLoopInSweepNotification(swapHashA))
423+
424+
done := make(chan struct{})
425+
go func() {
426+
mgr.handleNotification(
427+
t.Context(), staticLoopInSweepNotification(swapHashB),
428+
)
429+
close(done)
430+
}()
431+
432+
require.Eventually(t, func() bool {
433+
select {
434+
case <-done:
435+
return true
436+
default:
437+
return false
438+
}
439+
}, time.Second, 10*time.Millisecond)
440+
441+
select {
442+
case received := <-subChan:
443+
require.Equal(t, swapHashA[:], received.SwapHash)
444+
445+
case <-time.After(time.Second):
446+
t.Fatal("did not receive first sweep notification")
375447
}
376448

377449
select {
378450
case received := <-subChan:
379451
require.Equal(t, swapHashB[:], received.SwapHash)
380452

381453
case <-time.After(time.Second):
382-
t.Fatal("second unfinished swap notification was dropped")
454+
t.Fatal("second sweep notification was not queued")
383455
}
384456
}
385457

0 commit comments

Comments
 (0)