Skip to content

Commit cd0153a

Browse files
committed
staticaddr: harden client deposit readiness
ListUnspentDeposits now reports only wallet UTXOs that have an active Deposited record. That matches the static loop-in admission path and avoids exposing wallet-seen outputs that are not ready for loop-in selection. Make local notification fan-out non-blocking for best-effort categories so a slow subscriber cannot stall the notification manager while it holds the subscriber lock. Static loop-in sweep signing requests remain blocking because they are work requests required for sweepbatcher presigning and must not be dropped.
1 parent 06b2a11 commit cd0153a

4 files changed

Lines changed: 144 additions & 45 deletions

File tree

loopd/swapclient_server.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,18 +1667,16 @@ func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
16671667
}
16681668

16691669
// ListUnspentRaw returns the unspent wallet view of the backing lnd
1670-
// wallet. It might be that deposits show up there that are actually
1671-
// not spendable because they already have been used but not yet spent
1672-
// by the server. We filter out such deposits here.
1670+
// wallet. Static loop-in initiation requires an active deposit record,
1671+
// so only deposits that are both wallet-visible and tracked as
1672+
// Deposited are returned here.
16731673
var (
1674-
outpoints []string
1675-
isUnspent = make(map[wire.OutPoint]struct{})
1676-
knownUtxos = make(map[wire.OutPoint]struct{})
1674+
outpoints []string
1675+
isUnspent = make(map[wire.OutPoint]struct{})
16771676
)
16781677

16791678
for _, utxo := range utxos {
16801679
outpoints = append(outpoints, utxo.OutPoint.String())
1681-
knownUtxos[utxo.OutPoint] = struct{}{}
16821680
}
16831681

16841682
// Check the spent status of the deposits by looking at their states.
@@ -1690,26 +1688,16 @@ func (s *swapClientServer) ListUnspentDeposits(ctx context.Context,
16901688
return nil, err
16911689
}
16921690

1693-
knownDeposits := make(map[wire.OutPoint]struct{}, len(deposits))
16941691
for _, d := range deposits {
16951692
if d == nil {
16961693
continue
16971694
}
16981695

1699-
knownDeposits[d.OutPoint] = struct{}{}
17001696
if d.IsInState(deposit.Deposited) {
17011697
isUnspent[d.OutPoint] = struct{}{}
17021698
}
17031699
}
17041700

1705-
// Any wallet outpoints that are unknown to the deposit store are new
1706-
// deposits and therefore still available.
1707-
for op := range knownUtxos {
1708-
if _, ok := knownDeposits[op]; !ok {
1709-
isUnspent[op] = struct{}{}
1710-
}
1711-
}
1712-
17131701
// Prepare the list of unspent deposits for the rpc response.
17141702
var respUtxos []*looprpc.Utxo
17151703
for _, u := range utxos {

loopd/swapclient_server_test.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,9 +1073,9 @@ func TestListUnspentDeposits(t *testing.T) {
10731073
return deposit.NewManager(&deposit.ManagerConfig{Store: store})
10741074
}
10751075

1076-
// Unknown deposits are available, Deposited is available and known
1077-
// non-Deposited states are excluded.
1078-
t.Run("unknown and Deposited included, locked states excluded",
1076+
// Only known Deposited records are available. Unknown deposits and
1077+
// known non-Deposited states are excluded.
1078+
t.Run("only known Deposited included",
10791079
func(t *testing.T) {
10801080
mock.SetListUnspent([]*lnwallet.Utxo{
10811081
utxoUnknown, utxoDeposited, utxoWithdrawn,
@@ -1098,19 +1098,17 @@ func TestListUnspentDeposits(t *testing.T) {
10981098
)
10991099
require.NoError(t, err)
11001100

1101-
// Expect the unknown utxo and the Deposited utxo only.
1102-
require.Len(t, resp.Utxos, 2)
1101+
// Expect the Deposited utxo only.
1102+
require.Len(t, resp.Utxos, 1)
11031103
got := map[string]struct{}{}
11041104
for _, u := range resp.Utxos {
11051105
got[u.Outpoint] = struct{}{}
11061106
// Confirm address string is non-empty and the
11071107
// same across utxos.
11081108
require.NotEmpty(t, u.StaticAddress)
11091109
}
1110-
_, ok1 := got[utxoUnknown.OutPoint.String()]
1111-
_, ok2 := got[utxoDeposited.OutPoint.String()]
1112-
require.True(t, ok1)
1113-
require.True(t, ok2)
1110+
_, ok := got[utxoDeposited.OutPoint.String()]
1111+
require.True(t, ok)
11141112
})
11151113

11161114
// Confirmation depth no longer changes availability; state does.
@@ -1138,19 +1136,17 @@ func TestListUnspentDeposits(t *testing.T) {
11381136
)
11391137
require.NoError(t, err)
11401138

1141-
require.Len(t, resp.Utxos, 2)
1139+
require.Len(t, resp.Utxos, 1)
11421140
got := map[string]struct{}{}
11431141
for _, u := range resp.Utxos {
11441142
got[u.Outpoint] = struct{}{}
11451143
}
1146-
_, ok1 := got[utxoUnknown.OutPoint.String()]
1147-
_, ok2 := got[utxoDeposited.OutPoint.String()]
1148-
require.True(t, ok1)
1149-
require.True(t, ok2)
1144+
_, ok := got[utxoDeposited.OutPoint.String()]
1145+
require.True(t, ok)
11501146
})
11511147

1152-
// Confirmed UTXO not present in store should be included.
1153-
t.Run("confirmed utxo not in store is included", func(t *testing.T) {
1148+
// Confirmed UTXO not present in store should be excluded.
1149+
t.Run("confirmed utxo not in store is excluded", func(t *testing.T) {
11541150
// Only return a confirmed UTXO from lnd and make sure the
11551151
// deposit manager/store doesn't know about it.
11561152
mock.SetListUnspent([]*lnwallet.Utxo{utxoConfirmedUnknown})
@@ -1168,13 +1164,6 @@ func TestListUnspentDeposits(t *testing.T) {
11681164
)
11691165
require.NoError(t, err)
11701166

1171-
// We expect the confirmed UTXO to be included even though it
1172-
// doesn't exist in the store yet.
1173-
require.Len(t, resp.Utxos, 1)
1174-
require.Equal(
1175-
t, utxoConfirmedUnknown.OutPoint.String(),
1176-
resp.Utxos[0].Outpoint,
1177-
)
1178-
require.NotEmpty(t, resp.Utxos[0].StaticAddress)
1167+
require.Empty(t, resp.Utxos)
11791168
})
11801169
}

notifications/manager.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,13 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
303303
recvChan := sub.recvChan.(chan *swapserverrpc.
304304
ServerReservationNotification)
305305

306-
recvChan <- reservationNtfn
306+
select {
307+
case recvChan <- reservationNtfn:
308+
case <-sub.subCtx.Done():
309+
default:
310+
log.Debugf("Dropping reservation " +
311+
"notification for slow subscriber")
312+
}
307313
}
308314
case *swapserverrpc.SubscribeNotificationsResponse_StaticLoopInSweep: // nolint: lll
309315
// We'll forward the static loop in sweep request to all
@@ -316,7 +322,10 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
316322
recvChan := sub.recvChan.(chan *swapserverrpc.
317323
ServerStaticLoopInSweepNotification)
318324

319-
recvChan <- staticLoopInSweepRequestNtfn
325+
select {
326+
case recvChan <- staticLoopInSweepRequestNtfn:
327+
case <-sub.subCtx.Done():
328+
}
320329
}
321330

322331
case *swapserverrpc.SubscribeNotificationsResponse_UnfinishedSwap: // nolint: lll
@@ -330,7 +339,10 @@ func (m *Manager) handleNotification(ntfn *swapserverrpc.
330339
recvChan := sub.recvChan.(chan *swapserverrpc.
331340
ServerUnfinishedSwapNotification)
332341

333-
recvChan <- unfinishedSwapNtfn
342+
select {
343+
case recvChan <- unfinishedSwapNtfn:
344+
case <-sub.subCtx.Done():
345+
}
334346
}
335347

336348
default:

notifications/manager_test.go

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818

1919
var (
2020
testReservationId = []byte{0x01, 0x02}
21-
testReservationId2 = []byte{0x01, 0x02}
21+
testReservationId2 = []byte{0x03, 0x04}
2222
)
2323

2424
// mockNotificationsClient implements the NotificationsClient interface for testing.
@@ -188,6 +188,116 @@ func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResp
188188
}
189189
}
190190

191+
func unfinishedSwapNotification(
192+
swapHash lntypes.Hash) *swapserverrpc.SubscribeNotificationsResponse {
193+
194+
return &swapserverrpc.SubscribeNotificationsResponse{
195+
Notification: &swapserverrpc.
196+
SubscribeNotificationsResponse_UnfinishedSwap{
197+
UnfinishedSwap: &swapserverrpc.
198+
ServerUnfinishedSwapNotification{
199+
SwapHash: swapHash[:],
200+
},
201+
},
202+
}
203+
}
204+
205+
// TestManager_SlowSubscriberDoesNotBlock tests that a subscriber with a full
206+
// notification channel does not block delivery to other subscribers.
207+
func TestManager_SlowSubscriberDoesNotBlock(t *testing.T) {
208+
t.Parallel()
209+
210+
mgr := NewManager(&Config{})
211+
212+
slowCtx, slowCancel := context.WithCancel(t.Context())
213+
defer slowCancel()
214+
slowChan := mgr.SubscribeReservations(slowCtx)
215+
216+
fastCtx, fastCancel := context.WithCancel(t.Context())
217+
defer fastCancel()
218+
fastChan := mgr.SubscribeReservations(fastCtx)
219+
220+
firstNotif := getTestNotification(testReservationId)
221+
mgr.handleNotification(firstNotif)
222+
223+
received := <-fastChan
224+
require.Equal(t, testReservationId, received.ReservationId)
225+
226+
secondNotif := getTestNotification(testReservationId2)
227+
done := make(chan struct{})
228+
go func() {
229+
mgr.handleNotification(secondNotif)
230+
close(done)
231+
}()
232+
233+
require.Eventually(t, func() bool {
234+
select {
235+
case <-done:
236+
return true
237+
default:
238+
return false
239+
}
240+
}, time.Second, 10*time.Millisecond)
241+
242+
select {
243+
case received = <-fastChan:
244+
require.Equal(t, testReservationId2, received.ReservationId)
245+
246+
case <-time.After(time.Second):
247+
t.Fatal("fast subscriber did not receive notification")
248+
}
249+
250+
require.Len(t, slowChan, 1)
251+
}
252+
253+
// TestManager_UnfinishedSwapNotificationWaitsForSubscriber verifies that
254+
// unfinished swap recovery notifications are not dropped when the local
255+
// subscriber is briefly behind.
256+
func TestManager_UnfinishedSwapNotificationWaitsForSubscriber(t *testing.T) {
257+
t.Parallel()
258+
259+
mgr := NewManager(&Config{})
260+
261+
subCtx, subCancel := context.WithCancel(t.Context())
262+
defer subCancel()
263+
264+
subChan := mgr.SubscribeUnfinishedSwaps(subCtx)
265+
266+
swapHashA := lntypes.Hash{0x02, 0x03}
267+
swapHashB := lntypes.Hash{0x04, 0x05}
268+
269+
mgr.handleNotification(unfinishedSwapNotification(swapHashA))
270+
271+
done := make(chan struct{})
272+
go func() {
273+
mgr.handleNotification(unfinishedSwapNotification(swapHashB))
274+
close(done)
275+
}()
276+
277+
select {
278+
case received := <-subChan:
279+
require.Equal(t, swapHashA[:], received.SwapHash)
280+
281+
case <-time.After(time.Second):
282+
t.Fatal("did not receive first unfinished swap notification")
283+
}
284+
285+
select {
286+
case <-done:
287+
288+
case <-time.After(time.Second):
289+
t.Fatal("second unfinished swap notification did not unblock")
290+
}
291+
292+
select {
293+
case received := <-subChan:
294+
require.Equal(t, swapHashB[:], received.SwapHash)
295+
296+
case <-time.After(time.Second):
297+
t.Fatal("second unfinished swap notification was dropped")
298+
}
299+
}
300+
191301
// TestManager_Backoff verifies that repeated failures in
192302
// subscribeNotifications cause the Manager to space out subscription attempts
193303
// via a predictable incremental backoff.

0 commit comments

Comments
 (0)