Skip to content

Commit 5e92a85

Browse files
committed
decouple nodeFeed from Table mutex in waitForNodes
1 parent a786e88 commit 5e92a85

2 files changed

Lines changed: 83 additions & 12 deletions

File tree

p2p/discover/table.go

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -753,35 +753,66 @@ func (tab *Table) deleteNode(n *enode.Node) {
753753

754754
// waitForNodes blocks until the table contains at least n nodes.
755755
func (tab *Table) waitForNodes(ctx context.Context, n int) error {
756+
// Wrap ctx so the forwarder goroutine exits when waitForNodes returns,
757+
// regardless of whether the caller's ctx is canceled.
758+
ctx, cancel := context.WithCancel(ctx)
759+
defer cancel()
760+
761+
// Set up a notification channel that gets unblocked when there was any activity on
762+
// the table. Ultimately this reads from the table's nodeFeed, but can't use the feed
763+
// directly on the same goroutine that takes Table.mutex, it would deadlock.
764+
var notify chan struct{}
765+
var notifyErr error
766+
initsub := func() event.Subscription {
767+
notify = make(chan struct{}, 1)
768+
newnode := make(chan *enode.Node, 1)
769+
sub := tab.nodeFeed.Subscribe(newnode)
770+
go func() {
771+
defer close(notify)
772+
for {
773+
select {
774+
case <-newnode:
775+
select {
776+
case notify <- struct{}{}:
777+
default:
778+
}
779+
case <-ctx.Done():
780+
notifyErr = ctx.Err()
781+
return
782+
case <-tab.closeReq:
783+
notifyErr = errClosed
784+
return
785+
}
786+
}
787+
}()
788+
return sub
789+
}
790+
756791
getlength := func() (count int) {
757792
for _, b := range &tab.buckets {
758793
count += len(b.entries)
759794
}
760795
return count
761796
}
762797

763-
var ch chan *enode.Node
764798
for {
765799
tab.mutex.Lock()
766800
if getlength() >= n {
767801
tab.mutex.Unlock()
768802
return nil
769803
}
770-
if ch == nil {
771-
// Init subscription.
772-
ch = make(chan *enode.Node)
773-
sub := tab.nodeFeed.Subscribe(ch)
804+
if notify == nil {
805+
// Lazily init the subscription. Do this while holding the
806+
// lock so we don't miss any events that change the node count.
807+
sub := initsub()
774808
defer sub.Unsubscribe()
775809
}
776810
tab.mutex.Unlock()
777811

778-
// Wait for a node add event.
779-
select {
780-
case <-ch:
781-
case <-ctx.Done():
782-
return ctx.Err()
783-
case <-tab.closeReq:
784-
return errClosed
812+
// Wait for table event.
813+
if _, ok := <-notify; !ok {
814+
break
785815
}
786816
}
817+
return notifyErr
787818
}

p2p/discover/table_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package discover
1818

1919
import (
20+
"context"
2021
"crypto/ecdsa"
2122
"fmt"
2223
"math/rand"
@@ -550,6 +551,45 @@ func TestSetFallbackNodes_DNSHostname(t *testing.T) {
550551
t.Logf("resolved localhost to %v", resolved.IPAddr())
551552
}
552553

554+
// This test checks that waitForNodes does not block addFoundNode.
555+
// See https://github.com/ethereum/go-ethereum/issues/34881.
556+
func TestTable_waitForNodesLocking(t *testing.T) {
557+
transport := newPingRecorder()
558+
tab, db := newTestTable(transport, Config{})
559+
defer db.Close()
560+
defer tab.close()
561+
<-tab.initDone
562+
563+
// waitForNodes will never reach this count, so it stays subscribed
564+
// to nodeFeed and looping for the duration of the test.
565+
waitCtx, cancelWait := context.WithCancel(context.Background())
566+
defer cancelWait()
567+
waitDone := make(chan struct{})
568+
go func() {
569+
defer close(waitDone)
570+
tab.waitForNodes(waitCtx, 1<<20)
571+
}()
572+
573+
// Call addFoundNode in loop to send to the feed.
574+
addDone := make(chan struct{})
575+
go func() {
576+
defer close(addDone)
577+
for i := range 10000 {
578+
d := 240 + (i % 17)
579+
n := nodeAtDistance(tab.self().ID(), d, intIP(i))
580+
tab.addFoundNode(n, true)
581+
}
582+
}()
583+
584+
select {
585+
case <-addDone:
586+
cancelWait()
587+
<-waitDone
588+
case <-time.After(10 * time.Second):
589+
t.Fatal("deadlock detected: add loop did not finish within 10s")
590+
}
591+
}
592+
553593
func newkey() *ecdsa.PrivateKey {
554594
key, err := crypto.GenerateKey()
555595
if err != nil {

0 commit comments

Comments
 (0)