Skip to content

Commit d67d18e

Browse files
authored
fix(ci): race conditions in tests (#119)
* fix: race condition in TestFindProviders the test was incorrectly using require.Eventually on already-read results. also, parallelRouter doesn't deduplicate, so peers[0] can appear twice (once with addresses from mr1, once without from mr2). fixed by searching for the record with addresses before verifying filtration. * fix: deadlock in manyIter.Close() manyIter.Close() would deadlock when goroutines were blocked on unbuffered channel sends. fixed with synchronous drain that matches http streaming semantics from routing v1 spec: - idempotent close (safe to call multiple times) - drain channel to unblock senders (expected for early termination) - proper cleanup of child iterators also added missing defer it.Close() in tests. * fix: deadlock in cacheFallbackIter dispatchFindPeer goroutines blocked forever trying to send to unbuffered channel when iterator closed early. fixed with buffered channel + non-blocking sends that drop results if nobody is reading (acceptable for background lookups). also added context cancellation in Close() to stop goroutines. * fix: add missing iterator Close() calls in tests added defer it.Close() to all test iterators to prevent resource leaks that could cause intermittent hangs. * fix: infinite recursion in cacheFallbackIter.Next() replaced recursion with loop to avoid stack overflow. maintains original logic: after dispatching lookup for peer without addresses, continues trying source iterator or waiting for lookup results. also fixed deadlock when ongoingLookups reaches 0 between check and channel read by using periodic timeout. * refactor: simplify cacheFallbackIter.Next() - removed unnecessary handleRecord closure - cache ongoingLookups value to avoid race in logging - use defer timer.Stop() for proper cleanup * fix: resource leak and double-close issues - fix iterator leak in dispatchFindPeer by adding defer Close() - remove redundant goroutine in manyIter.Close() that could cause double-close panic. 
the channel is already closed by the goroutine in newManyIter.
1 parent 95b801a commit d67d18e

3 files changed

Lines changed: 111 additions & 45 deletions

File tree

server_cached_router.go

Lines changed: 73 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -117,56 +117,70 @@ type cacheFallbackIter struct {
117117
findPeersResult chan types.PeerRecord
118118
router cachedRouter
119119
ctx context.Context
120+
cancel context.CancelFunc
120121
ongoingLookups atomic.Int32
121122
}
122123

123124
// NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers.
124125
// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call.
125126
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter {
127+
// Create a cancellable context for this iterator
128+
iterCtx, cancel := context.WithCancel(ctx)
129+
126130
iter := &cacheFallbackIter{
127131
sourceIter: sourceIter,
128132
router: router,
129-
ctx: ctx,
130-
findPeersResult: make(chan types.PeerRecord),
133+
ctx: iterCtx,
134+
cancel: cancel,
135+
findPeersResult: make(chan types.PeerRecord, 100), // Buffer to avoid drops in typical cases
131136
ongoingLookups: atomic.Int32{},
132137
}
133138

134139
return iter
135140
}
136141

137142
func (it *cacheFallbackIter) Next() bool {
138-
// Try to get the next value from the source iterator first
139-
if it.sourceIter.Next() {
140-
val := it.sourceIter.Val()
141-
handleRecord := func(id *peer.ID, record *types.PeerRecord) bool {
142-
record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *id, record.Addrs)
143-
if len(record.Addrs) > 0 {
144-
it.current = iter.Result[types.Record]{Val: record}
145-
return true
143+
for {
144+
// Try to get the next value from the source iterator first
145+
if it.sourceIter.Next() {
146+
val := it.sourceIter.Val()
147+
148+
switch val.Val.GetSchema() {
149+
case types.SchemaPeer:
150+
if record, ok := val.Val.(*types.PeerRecord); ok {
151+
record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *record.ID, record.Addrs)
152+
if len(record.Addrs) > 0 {
153+
it.current = iter.Result[types.Record]{Val: record}
154+
return true
155+
}
156+
157+
logger.Infow("no cached addresses found in cacheFallbackIter, dispatching find peers", "peer", record.ID)
158+
if it.router.cachedAddrBook.ShouldProbePeer(*record.ID) {
159+
it.ongoingLookups.Add(1) // important to increment before dispatchFindPeer
160+
// If a record has no addrs, we dispatch a lookup to find addresses
161+
go it.dispatchFindPeer(*record)
162+
}
163+
// Continue to try next source item
164+
continue
165+
}
146166
}
147-
logger.Infow("no cached addresses found in cacheFallbackIter, dispatching find peers", "peer", id)
148-
149-
if it.router.cachedAddrBook.ShouldProbePeer(*id) {
150-
it.ongoingLookups.Add(1) // important to increment before dispatchFindPeer
151-
// If a record has no addrs, we dispatch a lookup to find addresses
152-
go it.dispatchFindPeer(*record)
153-
}
154-
return it.Next() // Recursively call Next() to either read from sourceIter or wait for lookup result
167+
it.current = val // pass through unknown schemas
168+
return true
155169
}
156170

157-
switch val.Val.GetSchema() {
158-
case types.SchemaPeer:
159-
if record, ok := val.Val.(*types.PeerRecord); ok {
160-
return handleRecord(record.ID, record)
161-
}
171+
// No more source items. If there are still ongoing lookups, wait for them
172+
ongoing := it.ongoingLookups.Load()
173+
if ongoing == 0 {
174+
// No more lookups, we're done
175+
return false
162176
}
163-
it.current = val // pass through unknown schemas
164-
return true
165-
}
166177

167-
// If there are still ongoing lookups, wait for them
168-
if it.ongoingLookups.Load() > 0 {
169-
logger.Infow("waiting for ongoing find peers result")
178+
logger.Infow("waiting for ongoing find peers result", "ongoing", ongoing)
179+
180+
// Use a timeout to recheck ongoingLookups periodically
181+
// This prevents deadlock if ongoingLookups becomes 0 after we check
182+
timer := time.NewTimer(100 * time.Millisecond)
183+
defer timer.Stop() // Ensure cleanup even if we return early
170184
select {
171185
case result, ok := <-it.findPeersResult:
172186
if !ok {
@@ -175,15 +189,14 @@ func (it *cacheFallbackIter) Next() bool {
175189
if len(result.Addrs) > 0 { // Only if the lookup returned a result and it has addrs
176190
it.current = iter.Result[types.Record]{Val: &result}
177191
return true
178-
} else {
179-
return it.Next() // recursively call Next() in case there are more ongoing lookups
180192
}
193+
// If no addresses, continue the loop to check for more source items or results
181194
case <-it.ctx.Done():
182195
return false
196+
case <-timer.C:
197+
// Timeout expired, loop back to try source iterator again and recheck ongoingLookups
183198
}
184199
}
185-
186-
return false
187200
}
188201

189202
func (it *cacheFallbackIter) Val() iter.Result[types.Record] {
@@ -194,6 +207,12 @@ func (it *cacheFallbackIter) Val() iter.Result[types.Record] {
194207
}
195208

196209
func (it *cacheFallbackIter) Close() error {
210+
// Cancel the context to stop any ongoing lookups
211+
if it.cancel != nil {
212+
it.cancel()
213+
}
214+
215+
// Close the source iterator
197216
return it.sourceIter.Close()
198217
}
199218

@@ -207,25 +226,42 @@ func (it *cacheFallbackIter) dispatchFindPeer(record types.PeerRecord) {
207226
defer cancel()
208227

209228
peersIt, err := it.router.FindPeers(ctx, *record.ID, 1)
229+
if err == nil {
230+
defer peersIt.Close() // Ensure cleanup of the iterator
231+
}
210232

211233
// Check if the parent context is done before sending
212234
if it.ctx.Err() != nil {
213235
return // Exit early if the parent context is done
214236
}
215237

238+
// Helper to send result without blocking
239+
sendResult := func(r types.PeerRecord) {
240+
select {
241+
case it.findPeersResult <- r:
242+
// Sent successfully
243+
case <-it.ctx.Done():
244+
// Context cancelled, exit
245+
default:
246+
// Channel full or nobody listening anymore, drop the result
247+
// This is OK - these are best-effort background lookups
248+
logger.Debugw("dropping find peers result, nobody listening", "peer", r.ID)
249+
}
250+
}
251+
216252
if err != nil {
217-
it.findPeersResult <- record // pass back the record with no addrs
253+
sendResult(record) // pass back the record with no addrs
218254
return
219255
}
220256
peers, err := iter.ReadAllResults(peersIt)
221257
if err != nil {
222-
it.findPeersResult <- record // pass back the record with no addrs
258+
sendResult(record) // pass back the record with no addrs
223259
return
224260
}
225261
if len(peers) > 0 {
226262
// If we found the peer, pass back the result
227-
it.findPeersResult <- *peers[0]
263+
sendResult(*peers[0])
228264
} else {
229-
it.findPeersResult <- record // pass back the record with no addrs
265+
sendResult(record) // pass back the record with no addrs
230266
}
231267
}

server_routers.go

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -195,9 +195,20 @@ func (mi *manyIter[T]) Val() iter.Result[T] {
195195
}
196196

197197
func (mi *manyIter[T]) Close() error {
198+
if mi.done {
199+
return nil // Already closed, idempotent
200+
}
198201
mi.done = true
199-
mi.cancel()
200-
mi.wg.Wait()
202+
mi.cancel() // Signal goroutines to stop
203+
204+
// The channel will be closed by the goroutine in newManyIter once all workers finish
205+
// We just need to drain it to unblock any pending sends
206+
// This is expected behavior when client terminates early (per HTTP routing spec)
207+
for range mi.ch {
208+
// Discard remaining values
209+
}
210+
211+
// Now close child iterators
201212
var err error
202213
for _, it := range mi.its {
203214
err = errors.Join(err, it.Close())

server_routers_test.go

Lines changed: 25 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -296,8 +296,8 @@ func TestFindProviders(t *testing.T) {
296296
var d router
297297
d = parallelRouter{}
298298
it, err := d.FindProviders(ctx, c, 10)
299-
300299
require.NoError(t, err)
300+
defer it.Close()
301301
require.False(t, it.Next())
302302

303303
mr1 := &mockRouter{}
@@ -338,15 +338,31 @@ func TestFindProviders(t *testing.T) {
338338

339339
it, err = d.FindProviders(ctx, c, 10)
340340
require.NoError(t, err)
341+
defer it.Close()
341342

342343
results, err := iter.ReadAllResults(it)
343344
require.NoError(t, err)
344345
require.Len(t, results, 5)
345346

346-
require.Eventually(t, func() bool {
347-
return len(results[0].(*types.PeerRecord).Addrs) == 1
348-
}, time.Second*3, time.Millisecond*100)
349-
require.Equal(t, publicAddr.String(), results[0].(*types.PeerRecord).Addrs[0].String())
347+
// The parallelRouter uses manyIter which merges results from multiple routers concurrently.
348+
// Both mr1 and mr2 send a record for peers[0]:
349+
// - mr1 sends peers[0] WITH addresses (private, loopback, public)
350+
// - mr2 sends peers[0] WITHOUT addresses
351+
// Due to concurrent execution, either record could arrive first in the results.
352+
// The parallelRouter doesn't deduplicate, so both records are included.
353+
// We need to find the record that has addresses to verify the sanitizeRouter
354+
// correctly filtered out private/loopback addresses, keeping only public ones.
355+
var peerWithAddrs *types.PeerRecord
356+
for _, r := range results {
357+
pr := r.(*types.PeerRecord)
358+
if *pr.ID == peers[0] && len(pr.Addrs) > 0 {
359+
peerWithAddrs = pr
360+
break
361+
}
362+
}
363+
require.NotNil(t, peerWithAddrs, "should have found peer[0] with addresses")
364+
require.Len(t, peerWithAddrs.Addrs, 1)
365+
require.Equal(t, publicAddr.String(), peerWithAddrs.Addrs[0].String())
350366
})
351367

352368
t.Run("Failed to Create All Iterators", func(t *testing.T) {
@@ -389,6 +405,7 @@ func TestFindProviders(t *testing.T) {
389405

390406
it, err := d.FindProviders(ctx, c, 10)
391407
require.NoError(t, err)
408+
defer it.Close()
392409

393410
results, err := iter.ReadAllResults(it)
394411
require.NoError(t, err)
@@ -405,8 +422,8 @@ func TestFindPeers(t *testing.T) {
405422

406423
d := parallelRouter{}
407424
it, err := d.FindPeers(ctx, pid, 10)
408-
409425
require.NoError(t, err)
426+
defer it.Close()
410427
require.False(t, it.Next())
411428

412429
mr1 := &mockRouter{}
@@ -439,6 +456,7 @@ func TestFindPeers(t *testing.T) {
439456

440457
it, err = d.FindPeers(ctx, pid, 10)
441458
require.NoError(t, err)
459+
defer it.Close()
442460

443461
results, err := iter.ReadAllResults(it)
444462
require.NoError(t, err)
@@ -484,6 +502,7 @@ func TestFindPeers(t *testing.T) {
484502

485503
it, err := d.FindPeers(ctx, pid, 10)
486504
require.NoError(t, err)
505+
defer it.Close()
487506

488507
results, err := iter.ReadAllResults(it)
489508
require.NoError(t, err)

0 commit comments

Comments (0)