Skip to content

Commit 8fd491d

Browse files
committed
notifications: queue blocking fanout
1 parent 61d505f commit 8fd491d

2 files changed

Lines changed: 157 additions & 12 deletions

File tree

notifications/manager.go

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,85 @@ type subscriber struct {
122122
subCtx context.Context
123123
recvChan any
124124
swapHash *lntypes.Hash
125+
enqueue func(any)
126+
}
127+
128+
func newNotificationQueue[T any](ctx context.Context,
129+
recvChan chan T) func(any) {
130+
131+
type queue struct {
132+
sync.Mutex
133+
134+
pending []T
135+
notify chan struct{}
136+
}
137+
138+
q := &queue{
139+
notify: make(chan struct{}, 1),
140+
}
141+
142+
go func() {
143+
defer func() {
144+
if recover() != nil {
145+
log.Debugf("subscriber channel closed before " +
146+
"notification delivery")
147+
}
148+
}()
149+
150+
for {
151+
q.Lock()
152+
if len(q.pending) == 0 {
153+
q.Unlock()
154+
155+
select {
156+
case <-q.notify:
157+
continue
158+
159+
case <-ctx.Done():
160+
return
161+
}
162+
}
163+
164+
ntfn := q.pending[0]
165+
q.pending = q.pending[1:]
166+
q.Unlock()
167+
168+
select {
169+
case recvChan <- ntfn:
170+
case <-ctx.Done():
171+
return
172+
}
173+
}
174+
}()
175+
176+
return func(ntfn any) {
177+
typedNtfn, ok := ntfn.(T)
178+
if !ok {
179+
log.Warnf("unexpected notification type %T", ntfn)
180+
return
181+
}
182+
183+
q.Lock()
184+
q.pending = append(q.pending, typedNtfn)
185+
q.Unlock()
186+
187+
select {
188+
case q.notify <- struct{}{}:
189+
default:
190+
}
191+
}
192+
}
193+
194+
func queueNotification[T any](sub subscriber, recvChan chan T, ntfn T) {
195+
if sub.enqueue != nil {
196+
sub.enqueue(ntfn)
197+
return
198+
}
199+
200+
select {
201+
case recvChan <- ntfn:
202+
case <-sub.subCtx.Done():
203+
}
125204
}
126205

127206
// SubscribeReservations subscribes to the reservation notifications.
@@ -156,6 +235,7 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
156235
sub := subscriber{
157236
subCtx: ctx,
158237
recvChan: notifChan,
238+
enqueue: newNotificationQueue(ctx, notifChan),
159239
}
160240

161241
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
@@ -255,6 +335,7 @@ func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
255335
sub := subscriber{
256336
subCtx: ctx,
257337
recvChan: notifChan,
338+
enqueue: newNotificationQueue(ctx, notifChan),
258339
}
259340

260341
m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
@@ -424,10 +505,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
424505
recvChan := sub.recvChan.(chan *swapserverrpc.
425506
ServerStaticLoopInSweepNotification)
426507

427-
select {
428-
case recvChan <- staticLoopInSweepRequestNtfn:
429-
case <-sub.subCtx.Done():
430-
}
508+
queueNotification(sub, recvChan, staticLoopInSweepRequestNtfn)
431509
}
432510

433511
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
@@ -557,10 +635,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
557635
recvChan := sub.recvChan.(chan *swapserverrpc.
558636
ServerUnfinishedSwapNotification)
559637

560-
select {
561-
case recvChan <- unfinishedSwapNtfn:
562-
case <-sub.subCtx.Done():
563-
}
638+
queueNotification(sub, recvChan, unfinishedSwapNtfn)
564639
}
565640

566641
default:
@@ -583,7 +658,7 @@ func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) {
583658
subs := m.subscribers[notifType]
584659
newSubs := make([]subscriber, 0, len(subs))
585660
for _, s := range subs {
586-
if s != sub {
661+
if s.recvChan != sub.recvChan {
587662
newSubs = append(newSubs, s)
588663
}
589664
}

notifications/manager_test.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ func unfinishedSwapNotification(
202202
}
203203
}
204204

205+
func staticLoopInSweepNotification(
206+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207+
208+
return &swapserverrpc.SubscribeNotificationsResponse{
209+
Notification: &swapserverrpc.
210+
SubscribeNotificationsResponse_StaticLoopInSweep{
211+
StaticLoopInSweep: &swapserverrpc.
212+
ServerStaticLoopInSweepNotification{
213+
SwapHash: swapHash[:],
214+
},
215+
},
216+
}
217+
}
218+
205219
func staticLoopInRiskAcceptedNotification(
206220
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207221

@@ -354,6 +368,15 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
354368
close(done)
355369
}()
356370

371+
require.Eventually(t, func() bool {
372+
select {
373+
case <-done:
374+
return true
375+
default:
376+
return false
377+
}
378+
}, time.Second, 10*time.Millisecond)
379+
357380
select {
358381
case received := <-subChan:
359382
require.Equal(t, swapHashA[:], received.SwapHash)
@@ -363,18 +386,65 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
363386
}
364387

365388
select {
366-
case <-done:
389+
case received := <-subChan:
390+
require.Equal(t, swapHashB[:], received.SwapHash)
367391

368392
case <-time.After(time.Second):
369-
t.Fatal("second unfinished swap notification did not unblock")
393+
t.Fatal("second unfinished swap notification was dropped")
394+
}
395+
}
396+
397+
// TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber verifies
398+
// that a full static-loop-in sweep subscriber channel does not block the global
399+
// notification receive loop.
400+
func TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber(
401+
t *testing.T) {
402+
403+
t.Parallel()
404+
405+
mgr := NewManager(&Config{})
406+
407+
subCtx, subCancel := context.WithCancel(t.Context())
408+
defer subCancel()
409+
410+
subChan := mgr.SubscribeStaticLoopInSweepRequests(subCtx)
411+
412+
swapHashA := lntypes.Hash{0x12, 0x13}
413+
swapHashB := lntypes.Hash{0x14, 0x15}
414+
415+
mgr.handleNotification(t.Context(), staticLoopInSweepNotification(swapHashA))
416+
417+
done := make(chan struct{})
418+
go func() {
419+
mgr.handleNotification(
420+
t.Context(), staticLoopInSweepNotification(swapHashB),
421+
)
422+
close(done)
423+
}()
424+
425+
require.Eventually(t, func() bool {
426+
select {
427+
case <-done:
428+
return true
429+
default:
430+
return false
431+
}
432+
}, time.Second, 10*time.Millisecond)
433+
434+
select {
435+
case received := <-subChan:
436+
require.Equal(t, swapHashA[:], received.SwapHash)
437+
438+
case <-time.After(time.Second):
439+
t.Fatal("did not receive first sweep notification")
370440
}
371441

372442
select {
373443
case received := <-subChan:
374444
require.Equal(t, swapHashB[:], received.SwapHash)
375445

376446
case <-time.After(time.Second):
377-
t.Fatal("second unfinished swap notification was dropped")
447+
t.Fatal("second sweep notification was not queued")
378448
}
379449
}
380450

0 commit comments

Comments
 (0)