Skip to content

Commit a8a3418

Browse files
committed
staticaddr: harden client deposit readiness
ListUnspentDeposits now reports only wallet UTXOs that have an active Deposited record. That matches the static loop-in admission path and avoids exposing wallet-seen outputs that are not ready for loop-in selection. Make local notification fan-out non-blocking for best-effort categories so a slow subscriber cannot stall the notification manager while it holds the subscriber lock. Static loop-in sweep signing requests remain blocking because they are work requests required for sweepbatcher presigning and must not be dropped.
1 parent 0b56d03 commit a8a3418

4 files changed

Lines changed: 208 additions & 45 deletions

File tree

loopd/swapclient_server.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,18 +1667,16 @@ func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
16671667
}
16681668

16691669
// ListUnspentRaw returns the unspent wallet view of the backing lnd
1670-
// wallet. It might be that deposits show up there that are actually
1671-
// not spendable because they already have been used but not yet spent
1672-
// by the server. We filter out such deposits here.
1670+
// wallet. Static loop-in initiation requires an active deposit record,
1671+
// so only deposits that are both wallet-visible and tracked as
1672+
// Deposited are returned here.
16731673
var (
1674-
outpoints []string
1675-
isUnspent = make(map[wire.OutPoint]struct{})
1676-
knownUtxos = make(map[wire.OutPoint]struct{})
1674+
outpoints []string
1675+
isUnspent = make(map[wire.OutPoint]struct{})
16771676
)
16781677

16791678
for _, utxo := range utxos {
16801679
outpoints = append(outpoints, utxo.OutPoint.String())
1681-
knownUtxos[utxo.OutPoint] = struct{}{}
16821680
}
16831681

16841682
// Check the spent status of the deposits by looking at their states.
@@ -1690,26 +1688,16 @@ func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
16901688
return nil, err
16911689
}
16921690

1693-
knownDeposits := make(map[wire.OutPoint]struct{}, len(deposits))
16941691
for _, d := range deposits {
16951692
if d == nil {
16961693
continue
16971694
}
16981695

1699-
knownDeposits[d.OutPoint] = struct{}{}
17001696
if d.IsInState(deposit.Deposited) {
17011697
isUnspent[d.OutPoint] = struct{}{}
17021698
}
17031699
}
17041700

1705-
// Any wallet outpoints that are unknown to the deposit store are new
1706-
// deposits and therefore still available.
1707-
for op := range knownUtxos {
1708-
if _, ok := knownDeposits[op]; !ok {
1709-
isUnspent[op] = struct{}{}
1710-
}
1711-
}
1712-
17131701
// Prepare the list of unspent deposits for the rpc response.
17141702
var respUtxos []*looprpc.Utxo
17151703
for _, u := range utxos {

loopd/swapclient_server_test.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,9 +1073,9 @@ func TestListUnspentDeposits(t *testing.T) {
10731073
return deposit.NewManager(&deposit.ManagerConfig{Store: store})
10741074
}
10751075

1076-
// Unknown deposits are available, Deposited is available and known
1077-
// non-Deposited states are excluded.
1078-
t.Run("unknown and Deposited included, locked states excluded",
1076+
// Only known Deposited records are available. Unknown deposits and
1077+
// known non-Deposited states are excluded.
1078+
t.Run("only known Deposited included",
10791079
func(t *testing.T) {
10801080
mock.SetListUnspent([]*lnwallet.Utxo{
10811081
utxoUnknown, utxoDeposited, utxoWithdrawn,
@@ -1098,19 +1098,17 @@ func TestListUnspentDeposits(t *testing.T) {
10981098
)
10991099
require.NoError(t, err)
11001100

1101-
// Expect the unknown utxo and the Deposited utxo only.
1102-
require.Len(t, resp.Utxos, 2)
1101+
// Expect the Deposited utxo only.
1102+
require.Len(t, resp.Utxos, 1)
11031103
got := map[string]struct{}{}
11041104
for _, u := range resp.Utxos {
11051105
got[u.Outpoint] = struct{}{}
11061106
// Confirm address string is non-empty and the
11071107
// same across utxos.
11081108
require.NotEmpty(t, u.StaticAddress)
11091109
}
1110-
_, ok1 := got[utxoUnknown.OutPoint.String()]
1111-
_, ok2 := got[utxoDeposited.OutPoint.String()]
1112-
require.True(t, ok1)
1113-
require.True(t, ok2)
1110+
_, ok := got[utxoDeposited.OutPoint.String()]
1111+
require.True(t, ok)
11141112
})
11151113

11161114
// Confirmation depth no longer changes availability; state does.
@@ -1138,19 +1136,17 @@ func TestListUnspentDeposits(t *testing.T) {
11381136
)
11391137
require.NoError(t, err)
11401138

1141-
require.Len(t, resp.Utxos, 2)
1139+
require.Len(t, resp.Utxos, 1)
11421140
got := map[string]struct{}{}
11431141
for _, u := range resp.Utxos {
11441142
got[u.Outpoint] = struct{}{}
11451143
}
1146-
_, ok1 := got[utxoUnknown.OutPoint.String()]
1147-
_, ok2 := got[utxoDeposited.OutPoint.String()]
1148-
require.True(t, ok1)
1149-
require.True(t, ok2)
1144+
_, ok := got[utxoDeposited.OutPoint.String()]
1145+
require.True(t, ok)
11501146
})
11511147

1152-
// Confirmed UTXO not present in store should be included.
1153-
t.Run("confirmed utxo not in store is included", func(t *testing.T) {
1148+
// Confirmed UTXO not present in store should be excluded.
1149+
t.Run("confirmed utxo not in store is excluded", func(t *testing.T) {
11541150
// Only return a confirmed UTXO from lnd and make sure the
11551151
// deposit manager/store doesn't know about it.
11561152
mock.SetListUnspent([]*lnwallet.Utxo{utxoConfirmedUnknown})
@@ -1168,13 +1164,6 @@ func TestListUnspentDeposits(t *testing.T) {
11681164
)
11691165
require.NoError(t, err)
11701166

1171-
// We expect the confirmed UTXO to be included even though it
1172-
// doesn't exist in the store yet.
1173-
require.Len(t, resp.Utxos, 1)
1174-
require.Equal(
1175-
t, utxoConfirmedUnknown.OutPoint.String(),
1176-
resp.Utxos[0].Outpoint,
1177-
)
1178-
require.NotEmpty(t, resp.Utxos[0].StaticAddress)
1167+
require.Empty(t, resp.Utxos)
11791168
})
11801169
}

notifications/manager.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,13 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
303303
recvChan := sub.recvChan.(chan *swapserverrpc.
304304
ServerReservationNotification)
305305

306-
recvChan <- reservationNtfn
306+
select {
307+
case recvChan <- reservationNtfn:
308+
case <-sub.subCtx.Done():
309+
default:
310+
log.Debugf("Dropping reservation " +
311+
"notification for slow subscriber")
312+
}
307313
}
308314
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep: // nolint: lll
309315
// We'll forward the static loop in sweep request to all
@@ -316,7 +322,10 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
316322
recvChan := sub.recvChan.(chan *swapserverrpc.
317323
ServerStaticLoopInSweepNotification)
318324

319-
recvChan <- staticLoopInSweepRequestNtfn
325+
select {
326+
case recvChan <- staticLoopInSweepRequestNtfn:
327+
case <-sub.subCtx.Done():
328+
}
320329
}
321330

322331
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
@@ -330,7 +339,10 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
330339
recvChan := sub.recvChan.(chan *swapserverrpc.
331340
ServerUnfinishedSwapNotification)
332341

333-
recvChan <- unfinishedSwapNtfn
342+
select {
343+
case recvChan <- unfinishedSwapNtfn:
344+
case <-sub.subCtx.Done():
345+
}
334346
}
335347

336348
default:

notifications/manager_test.go

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
var (
2020
testReservationId = []byte{0x01, 0x02}
21-
testReservationId2 = []byte{0x01, 0x02}
21+
testReservationId2 = []byte{0x03, 0x04}
2222
)
2323

2424
// mockNotificationsClient implements the NotificationsClient interface for testing.
@@ -188,6 +188,180 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp
188188
}
189189
}
190190

191+
func staticLoopInSweepNotification(
192+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
193+
194+
return &swapserverrpc.SubscribeNotificationsResponse{
195+
Notification: &swapserverrpc.
196+
SubscribeNotificationsResponse_StaticLoopInSweep{
197+
StaticLoopInSweep: &swapserverrpc.
198+
ServerStaticLoopInSweepNotification{
199+
SwapHash: swapHash[:],
200+
},
201+
},
202+
}
203+
}
204+
205+
func unfinishedSwapNotification(
206+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
207+
208+
return &swapserverrpc.SubscribeNotificationsResponse{
209+
Notification: &swapserverrpc.
210+
SubscribeNotificationsResponse_UnfinishedSwap{
211+
UnfinishedSwap: &swapserverrpc.
212+
ServerUnfinishedSwapNotification{
213+
SwapHash: swapHash[:],
214+
},
215+
},
216+
}
217+
}
218+
219+
// TestManager_SlowSubscriberDoesNotBlock tests that a subscriber with a full
220+
// notification channel does not block delivery to other subscribers.
221+
func TestManager_SlowSubscriberDoesNotBlock(t *testing.T) {
222+
t.Parallel()
223+
224+
mgr := NewManager(&Config{})
225+
226+
slowCtx, slowCancel := context.WithCancel(t.Context())
227+
defer slowCancel()
228+
slowChan := mgr.SubscribeReservations(slowCtx)
229+
230+
fastCtx, fastCancel := context.WithCancel(t.Context())
231+
defer fastCancel()
232+
fastChan := mgr.SubscribeReservations(fastCtx)
233+
234+
firstNotif := getTestNotification(testReservationId)
235+
mgr.handleNotification(firstNotif)
236+
237+
received := <-fastChan
238+
require.Equal(t, testReservationId, received.ReservationId)
239+
240+
secondNotif := getTestNotification(testReservationId2)
241+
done := make(chan struct{})
242+
go func() {
243+
mgr.handleNotification(secondNotif)
244+
close(done)
245+
}()
246+
247+
require.Eventually(t, func() bool {
248+
select {
249+
case <-done:
250+
return true
251+
default:
252+
return false
253+
}
254+
}, time.Second, 10*time.Millisecond)
255+
256+
select {
257+
case received = <-fastChan:
258+
require.Equal(t, testReservationId2, received.ReservationId)
259+
260+
case <-time.After(time.Second):
261+
t.Fatal("fast subscriber did not receive notification")
262+
}
263+
264+
require.Len(t, slowChan, 1)
265+
}
266+
267+
// TestManager_StaticLoopInSweepNotificationWaitsForSubscriber verifies that
268+
// static loop in sweep signing requests are not dropped when the local
269+
// subscriber is briefly behind.
270+
func TestManager_StaticLoopInSweepNotificationWaitsForSubscriber(
271+
t *testing.T) {
272+
273+
t.Parallel()
274+
275+
mgr := NewManager(&Config{})
276+
277+
subCtx, subCancel := context.WithCancel(t.Context())
278+
defer subCancel()
279+
280+
subChan := mgr.SubscribeStaticLoopInSweepRequests(subCtx)
281+
282+
swapHashA := lntypes.Hash{0x02, 0x03}
283+
swapHashB := lntypes.Hash{0x04, 0x05}
284+
285+
mgr.handleNotification(staticLoopInSweepNotification(swapHashA))
286+
287+
done := make(chan struct{})
288+
go func() {
289+
mgr.handleNotification(staticLoopInSweepNotification(swapHashB))
290+
close(done)
291+
}()
292+
293+
select {
294+
case received := <-subChan:
295+
require.Equal(t, swapHashA[:], received.SwapHash)
296+
297+
case <-time.After(time.Second):
298+
t.Fatal("did not receive first sweep notification")
299+
}
300+
301+
select {
302+
case <-done:
303+
304+
case <-time.After(time.Second):
305+
t.Fatal("second sweep notification did not unblock")
306+
}
307+
308+
select {
309+
case received := <-subChan:
310+
require.Equal(t, swapHashB[:], received.SwapHash)
311+
312+
case <-time.After(time.Second):
313+
t.Fatal("second sweep notification was dropped")
314+
}
315+
}
316+
317+
// TestManager_UnfinishedSwapNotificationWaitsForSubscriber verifies that
318+
// unfinished swap recovery notifications are not dropped when the local
319+
// subscriber is briefly behind.
320+
func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
321+
t.Parallel()
322+
323+
mgr := NewManager(&Config{})
324+
325+
subCtx, subCancel := context.WithCancel(t.Context())
326+
defer subCancel()
327+
328+
subChan := mgr.SubscribeUnfinishedSwaps(subCtx)
329+
330+
swapHashA := lntypes.Hash{0x02, 0x03}
331+
swapHashB := lntypes.Hash{0x04, 0x05}
332+
333+
mgr.handleNotification(unfinishedSwapNotification(swapHashA))
334+
335+
done := make(chan struct{})
336+
go func() {
337+
mgr.handleNotification(unfinishedSwapNotification(swapHashB))
338+
close(done)
339+
}()
340+
341+
select {
342+
case received := <-subChan:
343+
require.Equal(t, swapHashA[:], received.SwapHash)
344+
345+
case <-time.After(time.Second):
346+
t.Fatal("did not receive first unfinished swap notification")
347+
}
348+
349+
select {
350+
case <-done:
351+
352+
case <-time.After(time.Second):
353+
t.Fatal("second unfinished swap notification did not unblock")
354+
}
355+
356+
select {
357+
case received := <-subChan:
358+
require.Equal(t, swapHashB[:], received.SwapHash)
359+
360+
case <-time.After(time.Second):
361+
t.Fatal("second unfinished swap notification was dropped")
362+
}
363+
}
364+
191365
// TestManager_Backoff verifies that repeated failures in
192366
// subscribeNotifications cause the Manager to space out subscription attempts
193367
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)