Skip to content

Commit a6ba7ee

Browse files
timcooijmansfjl
authored andcommitted
fix(p2p/discover): avoid dropping unverified nodes when table is almost empty ethereum#21396 ethereum#21554
This change improves discovery behavior in small networks. Very small networks would often fail to bootstrap because all member nodes were dropping table content due to findnode failure. The check is now changed to avoid dropping nodes on findnode failure when their bucket is almost empty. It also relaxes the liveness check requirement for FINDNODE/v4 response nodes, returning unverified nodes as results when there aren't any verified nodes yet. The "findnode failed" log now reports whether the node was dropped instead of the number of results. The value of the "results" was always zero by definition. Co-authored-by: Felix Lange <fjl@twurst.com>
1 parent 0364de0 commit a6ba7ee

5 files changed

Lines changed: 136 additions & 26 deletions

File tree

p2p/discover/lookup.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,7 @@ func (it *lookup) startQueries() bool {
104104

105105
// The first query returns nodes from the local table.
106106
if it.queries == -1 {
107-
it.tab.mutex.Lock()
108-
closest := it.tab.closest(it.result.target, bucketSize, false)
109-
it.tab.mutex.Unlock()
107+
closest := it.tab.findnodeByID(it.result.target, bucketSize, false)
110108
// Avoid finishing the lookup too quickly if table is empty. It'd be better to wait
111109
// for the table to fill in this case, but there is no good mechanism for that
112110
// yet.
@@ -150,11 +148,14 @@ func (it *lookup) query(n *node, reply chan<- []*node) {
150148
} else if len(r) == 0 {
151149
fails++
152150
it.tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
153-
it.tab.log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "results", len(r), "err", err)
154-
if fails >= maxFindnodeFailures {
155-
it.tab.log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
151+
// Remove the node from the local table if it fails to return anything useful too
152+
// many times, but only if there are enough other nodes in the bucket.
153+
dropped := false
154+
if fails >= maxFindnodeFailures && it.tab.bucketLen(n.ID()) >= bucketSize/2 {
155+
dropped = true
156156
it.tab.delete(n)
157157
}
158+
it.tab.log.Trace("FINDNODE failed", "id", n.ID(), "failcount", fails, "dropped", dropped, "err", err)
158159
} else if fails > 0 {
159160
// Reset failure counter because it counts _consecutive_ failures.
160161
it.tab.db.UpdateFindFails(n.ID(), n.IP(), 0)

p2p/discover/table.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ func (tab *Table) nextRevalidateTime() time.Duration {
379379
}
380380

381381
// copyLiveNodes adds nodes from the table to the database if they have been in the table
382-
// longer then minTableTime.
382+
// longer than seedMinTableTime.
383383
func (tab *Table) copyLiveNodes() {
384384
tab.mutex.Lock()
385385
defer tab.mutex.Unlock()
@@ -394,22 +394,35 @@ func (tab *Table) copyLiveNodes() {
394394
}
395395
}
396396

397-
// closest returns the n nodes in the table that are closest to the
398-
// given id. The caller must hold tab.mutex.
399-
func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance {
400-
// This is a very wasteful way to find the closest nodes but
401-
// obviously correct. I believe that tree-based buckets would make
402-
// this easier to implement efficiently.
403-
close := &nodesByDistance{target: target}
397+
// findnodeByID returns the n nodes in the table that are closest to the given id.
398+
// This is used by the FINDNODE/v4 handler.
399+
//
400+
// The preferLive parameter says whether the caller wants liveness-checked results. If
401+
// preferLive is true and the table contains any verified nodes, the result will not
402+
// contain unverified nodes. However, if there are no verified nodes at all, the result
403+
// will contain unverified nodes.
404+
func (tab *Table) findnodeByID(target enode.ID, nresults int, preferLive bool) *nodesByDistance {
405+
tab.mutex.Lock()
406+
defer tab.mutex.Unlock()
407+
408+
// Scan all buckets. There might be a better way to do this, but there aren't that many
409+
// buckets, so this solution should be fine. The worst-case complexity of this loop
410+
// is O(tab.len() * nresults).
411+
nodes := &nodesByDistance{target: target}
412+
liveNodes := &nodesByDistance{target: target}
404413
for _, b := range &tab.buckets {
405414
for _, n := range b.entries {
406-
if checklive && n.livenessChecks == 0 {
407-
continue
415+
nodes.push(n, nresults)
416+
if preferLive && n.livenessChecks > 0 {
417+
liveNodes.push(n, nresults)
408418
}
409-
close.push(n, nresults)
410419
}
411420
}
412-
return close
421+
422+
if preferLive && len(liveNodes.entries) > 0 {
423+
return liveNodes
424+
}
425+
return nodes
413426
}
414427

415428
// len returns the number of nodes in the table.
@@ -423,6 +436,14 @@ func (tab *Table) len() (n int) {
423436
return n
424437
}
425438

439+
// bucketLen returns the number of nodes in the bucket for the given ID.
440+
func (tab *Table) bucketLen(id enode.ID) int {
441+
tab.mutex.Lock()
442+
defer tab.mutex.Unlock()
443+
444+
return len(tab.bucket(id).entries)
445+
}
446+
426447
// bucket returns the bucket for the given node ID hash.
427448
func (tab *Table) bucket(id enode.ID) *bucket {
428449
d := enode.LogDist(tab.self().ID(), id)

p2p/discover/table_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func checkIPLimitInvariant(t *testing.T, tab *Table) {
201201
}
202202
}
203203

204-
func TestTable_closest(t *testing.T) {
204+
func TestTable_findnodeByID(t *testing.T) {
205205
t.Parallel()
206206

207207
test := func(test *closeTest) bool {
@@ -213,7 +213,7 @@ func TestTable_closest(t *testing.T) {
213213
fillTable(tab, test.All)
214214

215215
// check that closest(Target, N) returns nodes
216-
result := tab.closest(test.Target, test.N, false).entries
216+
result := tab.findnodeByID(test.Target, test.N, false).entries
217217
if hasDuplicates(result) {
218218
t.Errorf("result contains duplicates")
219219
return false

p2p/discover/v4_udp.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,16 @@ func (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target v4wire.Pubke
324324
Target: target,
325325
Expiration: uint64(time.Now().Add(expiration).Unix()),
326326
})
327-
return nodes, <-rm.errc
327+
// Ensure that callers don't see a timeout if the node actually responded. Since
328+
// findnode can receive more than one neighbors response, the reply matcher will be
329+
// active until the remote node sends enough nodes. If the remote end doesn't have
330+
// enough nodes the reply matcher will time out waiting for the second reply, but
331+
// there's no need for an error in that case.
332+
err := <-rm.errc
333+
if err == errTimeout && rm.reply != nil {
334+
err = nil
335+
}
336+
return nodes, err
328337
}
329338

330339
// RequestENR sends enrRequest to the given node and waits for a response.
@@ -453,9 +462,9 @@ func (t *UDPv4) loop() {
453462
if p.from == r.from && p.ptype == r.data.Kind() && p.ip.Equal(r.ip) {
454463
ok, requestDone := p.callback(r.data)
455464
matched = matched || ok
465+
p.reply = r.data
456466
// Remove the matcher if callback indicates that all replies have been received.
457467
if requestDone {
458-
p.reply = r.data
459468
p.errc <- nil
460469
plist.Remove(el)
461470
}
@@ -715,9 +724,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
715724

716725
// Determine closest nodes.
717726
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
718-
t.tab.mutex.Lock()
719-
closest := t.tab.closest(target, bucketSize, true).entries
720-
t.tab.mutex.Unlock()
727+
closest := t.tab.findnodeByID(target, bucketSize, true).entries
721728

722729
// Send neighbors in chunks with at most maxNeighbors per packet
723730
// to stay below the packet size limit.

p2p/discover/v4_udp_test.go

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
crand "crypto/rand"
2323
"encoding/binary"
2424
"errors"
25+
"fmt"
2526
"io"
2627
"math/rand"
2728
"net"
@@ -306,7 +307,7 @@ func TestUDPv4_findnode(t *testing.T) {
306307
test.table.db.UpdateLastPongReceived(remoteID, test.remoteaddr.IP, time.Now())
307308

308309
// check that closest neighbors are returned.
309-
expected := test.table.closest(testTarget.ID(), bucketSize, true)
310+
expected := test.table.findnodeByID(testTarget.ID(), bucketSize, true)
310311
test.packetIn(nil, &v4wire.Findnode{Target: testTarget, Expiration: futureExp})
311312
waitNeighbors := func(want []*node) {
312313
test.waitPacketOut(func(p *v4wire.Neighbors, to *net.UDPAddr, hash []byte) {
@@ -522,6 +523,86 @@ func TestUDPv4_EIP868(t *testing.T) {
522523
})
523524
}
524525

526+
// This test verifies that a small network of nodes can boot up into a healthy state.
527+
func TestUDPv4_smallNetConvergence(t *testing.T) {
528+
t.Parallel()
529+
530+
// Start the network.
531+
nodes := make([]*UDPv4, 4)
532+
for i := range nodes {
533+
var cfg Config
534+
if i > 0 {
535+
bn := nodes[0].Self()
536+
cfg.Bootnodes = []*enode.Node{bn}
537+
}
538+
nodes[i] = startLocalhostV4(t, cfg)
539+
defer nodes[i].Close()
540+
}
541+
542+
// Run through the iterator on all nodes until
543+
// they have all found each other.
544+
status := make(chan error, len(nodes))
545+
for i := range nodes {
546+
node := nodes[i]
547+
go func() {
548+
found := make(map[enode.ID]bool, len(nodes))
549+
it := node.RandomNodes()
550+
for it.Next() {
551+
found[it.Node().ID()] = true
552+
if len(found) == len(nodes) {
553+
status <- nil
554+
return
555+
}
556+
}
557+
status <- fmt.Errorf("node %s didn't find all nodes", node.Self().ID().TerminalString())
558+
}()
559+
}
560+
561+
// Wait for all status reports.
562+
timeout := time.NewTimer(30 * time.Second)
563+
defer timeout.Stop()
564+
for received := 0; received < len(nodes); {
565+
select {
566+
case <-timeout.C:
567+
for _, node := range nodes {
568+
node.Close()
569+
}
570+
case err := <-status:
571+
received++
572+
if err != nil {
573+
t.Error("ERROR:", err)
574+
return
575+
}
576+
}
577+
}
578+
}
579+
580+
func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
581+
t.Helper()
582+
583+
cfg.PrivateKey = newkey()
584+
db, _ := enode.OpenDB("")
585+
ln := enode.NewLocalNode(db, cfg.PrivateKey)
586+
587+
// Prefix logs with node ID.
588+
lprefix := fmt.Sprintf("(%s)", ln.ID().TerminalString())
589+
cfg.Log = testlog.Logger(t, log.LevelTrace).With("node-id", lprefix)
590+
591+
// Listen.
592+
socket, err := net.ListenUDP("udp4", &net.UDPAddr{IP: net.IP{127, 0, 0, 1}})
593+
if err != nil {
594+
t.Fatal(err)
595+
}
596+
realaddr := socket.LocalAddr().(*net.UDPAddr)
597+
ln.SetStaticIP(realaddr.IP)
598+
ln.SetFallbackUDP(realaddr.Port)
599+
udp, err := ListenV4(socket, ln, cfg)
600+
if err != nil {
601+
t.Fatal(err)
602+
}
603+
return udp
604+
}
605+
525606
// dgramPipe is a fake UDP socket. It queues all sent datagrams.
526607
type dgramPipe struct {
527608
mu *sync.Mutex

0 commit comments

Comments
 (0)