Skip to content

Commit 7604be5

Browse files
committed
notifications: queue blocking fanout
1 parent 83b289e commit 7604be5

2 files changed

Lines changed: 157 additions & 12 deletions

File tree

notifications/manager.go

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,85 @@ type subscriber struct {
124124
subCtx context.Context
125125
recvChan any
126126
swapHash *lntypes.Hash
127+
enqueue func(any)
128+
}
129+
130+
func newNotificationQueue[T any](ctx context.Context,
131+
recvChan chan T) func(any) {
132+
133+
type queue struct {
134+
sync.Mutex
135+
136+
pending []T
137+
notify chan struct{}
138+
}
139+
140+
q := &queue{
141+
notify: make(chan struct{}, 1),
142+
}
143+
144+
go func() {
145+
defer func() {
146+
if recover() != nil {
147+
log.Debugf("subscriber channel closed before " +
148+
"notification delivery")
149+
}
150+
}()
151+
152+
for {
153+
q.Lock()
154+
if len(q.pending) == 0 {
155+
q.Unlock()
156+
157+
select {
158+
case <-q.notify:
159+
continue
160+
161+
case <-ctx.Done():
162+
return
163+
}
164+
}
165+
166+
ntfn := q.pending[0]
167+
q.pending = q.pending[1:]
168+
q.Unlock()
169+
170+
select {
171+
case recvChan <- ntfn:
172+
case <-ctx.Done():
173+
return
174+
}
175+
}
176+
}()
177+
178+
return func(ntfn any) {
179+
typedNtfn, ok := ntfn.(T)
180+
if !ok {
181+
log.Warnf("unexpected notification type %T", ntfn)
182+
return
183+
}
184+
185+
q.Lock()
186+
q.pending = append(q.pending, typedNtfn)
187+
q.Unlock()
188+
189+
select {
190+
case q.notify <- struct{}{}:
191+
default:
192+
}
193+
}
194+
}
195+
196+
func queueNotification[T any](sub subscriber, recvChan chan T, ntfn T) {
197+
if sub.enqueue != nil {
198+
sub.enqueue(ntfn)
199+
return
200+
}
201+
202+
select {
203+
case recvChan <- ntfn:
204+
case <-sub.subCtx.Done():
205+
}
127206
}
128207

129208
// SubscribeReservations subscribes to the reservation notifications.
@@ -158,6 +237,7 @@ func (m *Manager) SubscribeStaticLoopInSweepRequests(ctx context.Context,
158237
sub := subscriber{
159238
subCtx: ctx,
160239
recvChan: notifChan,
240+
enqueue: newNotificationQueue(ctx, notifChan),
161241
}
162242

163243
m.addSubscriber(NotificationTypeStaticLoopInSweepRequest, sub)
@@ -257,6 +337,7 @@ func (m *Manager) SubscribeUnfinishedSwaps(ctx context.Context,
257337
sub := subscriber{
258338
subCtx: ctx,
259339
recvChan: notifChan,
340+
enqueue: newNotificationQueue(ctx, notifChan),
260341
}
261342

262343
m.addSubscriber(NotificationTypeUnfinishedSwap, sub)
@@ -426,10 +507,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
426507
recvChan := sub.recvChan.(chan *swapserverrpc.
427508
ServerStaticLoopInSweepNotification)
428509

429-
select {
430-
case recvChan <- staticLoopInSweepRequestNtfn:
431-
case <-sub.subCtx.Done():
432-
}
510+
queueNotification(sub, recvChan, staticLoopInSweepRequestNtfn)
433511
}
434512

435513
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInRiskAccepted: // nolint: lll
@@ -557,10 +635,7 @@ func (m *Manager) handleNotification(ctx context.Context, ntfn *swapserverrpc.
557635
recvChan := sub.recvChan.(chan *swapserverrpc.
558636
ServerUnfinishedSwapNotification)
559637

560-
select {
561-
case recvChan <- unfinishedSwapNtfn:
562-
case <-sub.subCtx.Done():
563-
}
638+
queueNotification(sub, recvChan, unfinishedSwapNtfn)
564639
}
565640

566641
default:
@@ -583,7 +658,7 @@ func (m *Manager) removeSubscriber(notifType NotificationType, sub subscriber) {
583658
subs := m.subscribers[notifType]
584659
newSubs := make([]subscriber, 0, len(subs))
585660
for _, s := range subs {
586-
if s != sub {
661+
if s.recvChan != sub.recvChan {
587662
newSubs = append(newSubs, s)
588663
}
589664
}

notifications/manager_test.go

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,20 @@ func unfinishedSwapNotification(
202202
}
203203
}
204204

205+
func staticLoopInSweepNotification(
206+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207+
208+
return &swapserverrpc.SubscribeNotificationsResponse{
209+
Notification: &swapserverrpc.
210+
SubscribeNotificationsResponse_StaticLoopInSweep{
211+
StaticLoopInSweep: &swapserverrpc.
212+
ServerStaticLoopInSweepNotification{
213+
SwapHash: swapHash[:],
214+
},
215+
},
216+
}
217+
}
218+
205219
func staticLoopInRiskAcceptedNotification(
206220
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207221

@@ -354,6 +368,15 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
354368
close(done)
355369
}()
356370

371+
require.Eventually(t, func() bool {
372+
select {
373+
case <-done:
374+
return true
375+
default:
376+
return false
377+
}
378+
}, time.Second, 10*time.Millisecond)
379+
357380
select {
358381
case received := <-subChan:
359382
require.Equal(t, swapHashA[:], received.SwapHash)
@@ -363,18 +386,65 @@ func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
363386
}
364387

365388
select {
366-
case <-done:
389+
case received := <-subChan:
390+
require.Equal(t, swapHashB[:], received.SwapHash)
367391

368392
case <-time.After(time.Second):
369-
t.Fatal("second unfinished swap notification did not unblock")
393+
t.Fatal("second unfinished swap notification was dropped")
394+
}
395+
}
396+
397+
// TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber verifies
398+
// that a full static-loop-in sweep subscriber channel does not block the global
399+
// notification receive loop.
400+
func TestManager_StaticLoopInSweepNotificationQueuesForSlowSubscriber(
401+
t *testing.T) {
402+
403+
t.Parallel()
404+
405+
mgr := NewManager(&Config{})
406+
407+
subCtx, subCancel := context.WithCancel(t.Context())
408+
defer subCancel()
409+
410+
subChan := mgr.SubscribeStaticLoopInSweepRequests(subCtx)
411+
412+
swapHashA := lntypes.Hash{0x12, 0x13}
413+
swapHashB := lntypes.Hash{0x14, 0x15}
414+
415+
mgr.handleNotification(t.Context(), staticLoopInSweepNotification(swapHashA))
416+
417+
done := make(chan struct{})
418+
go func() {
419+
mgr.handleNotification(
420+
t.Context(), staticLoopInSweepNotification(swapHashB),
421+
)
422+
close(done)
423+
}()
424+
425+
require.Eventually(t, func() bool {
426+
select {
427+
case <-done:
428+
return true
429+
default:
430+
return false
431+
}
432+
}, time.Second, 10*time.Millisecond)
433+
434+
select {
435+
case received := <-subChan:
436+
require.Equal(t, swapHashA[:], received.SwapHash)
437+
438+
case <-time.After(time.Second):
439+
t.Fatal("did not receive first sweep notification")
370440
}
371441

372442
select {
373443
case received := <-subChan:
374444
require.Equal(t, swapHashB[:], received.SwapHash)
375445

376446
case <-time.After(time.Second):
377-
t.Fatal("second unfinished swap notification was dropped")
447+
t.Fatal("second sweep notification was not queued")
378448
}
379449
}
380450

0 commit comments

Comments
 (0)