Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/topology/kademlia/kademlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ type Kad struct {
logger log.Logger // logger
bootnode bool // indicates whether the node is working in bootnode mode
collector *im.Collector
metricsDB *shed.DB // backing store for the metrics collector; closed on shutdown
Comment thread
acud marked this conversation as resolved.
quit chan struct{} // quit channel
halt chan struct{} // halt channel
done chan struct{} // signal that `manage` has quit
Expand Down Expand Up @@ -246,6 +247,7 @@ func New(
logger: logger.WithName(loggerName).Register(),
bootnode: opt.BootnodeMode,
collector: imc,
metricsDB: sdb,
quit: make(chan struct{}),
halt: make(chan struct{}),
done: make(chan struct{}),
Expand Down Expand Up @@ -1595,6 +1597,12 @@ func (k *Kad) Close() error {
}
k.logger.Debug("metrics collector finalized", "elapsed", time.Since(start))

if k.metricsDB != nil {
if dbErr := k.metricsDB.Close(); dbErr != nil {
k.logger.Debug("unable to close metrics store", "error", dbErr)
}
}

return err
}

Expand Down
285 changes: 168 additions & 117 deletions pkg/topology/kademlia/kademlia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync"
"sync/atomic"
"testing"
"testing/synctest"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -891,45 +892,70 @@ func TestAddressBookPrune(t *testing.T) {
func TestAddressBookQuickPrune(t *testing.T) {
t.Parallel()

var (
failedConns int32
base, kad, ab, _, signer = newTestKademlia(t, nil, &failedConns, kademlia.Options{
TimeToRetry: new(time.Millisecond),
})
)
kad.SetStorageRadius(2)
// Run inside a synctest bubble so the manage loop and the peer retry
// windows run on a fake clock. Time only advances when every bubbled
// goroutine is durably blocked, which makes the connect/prune sequence
// deterministic and independent of real wall-clock timing on loaded CI
// runners.
synctest.Test(t, func(t *testing.T) {
var (
failedConns int32
base, kad, ab, _, signer = newTestKademlia(t, nil, &failedConns, kademlia.Options{
TimeToRetry: new(time.Millisecond),
})
)
kad.SetStorageRadius(2)

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
testutil.CleanupCloser(t, kad)
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
// Close inside the bubble: synctest requires all bubbled goroutines to
// exit before the test function returns, so we cannot defer this to
// t.Cleanup (which runs after the bubble has closed).
defer func() {
if err := kad.Close(); err != nil {
t.Error(err)
}
// goleveldb's mpoolDrain waits up to 1s after the metrics store is
// closed before exiting and is not awaited by DB.Close, so advance
// the fake clock past it and let it settle before the bubble ends.
time.Sleep(2 * time.Second)
synctest.Wait()
}()

time.Sleep(100 * time.Millisecond)
// let the startup manage pass settle
synctest.Wait()

nonConnPeer, err := bzz.NewAddress(signer, []ma.Multiaddr{nonConnectableAddress}, swarm.RandAddressAt(t, base, 1), 0, nonce, 1, common.Address{})
if err != nil {
t.Fatal(err)
}
if err := ab.Put(nonConnPeer.Overlay, *nonConnPeer, false); err != nil {
t.Fatal(err)
}

// add non connectable peer; AddPeers triggers the manage loop which
// immediately attempts to connect via connectBalanced (the peer is in
// bin 1 which is below storageRadius 2, so connectNeighbours skips it).
kad.AddPeers(nonConnPeer.Overlay)
nonConnPeer, err := bzz.NewAddress(signer, []ma.Multiaddr{nonConnectableAddress}, swarm.RandAddressAt(t, base, 1), 0, nonce, 1, common.Address{})
if err != nil {
t.Fatal(err)
}
if err := ab.Put(nonConnPeer.Overlay, *nonConnPeer, false); err != nil {
t.Fatal(err)
}

for range kademlia.MaxConnAttempts {
time.Sleep(10 * time.Millisecond)
kad.Trigger()
}
// add non connectable peer; AddPeers triggers the manage loop which
// immediately attempts to connect via connectBalanced (the peer is in
// bin 1 which is below storageRadius 2, so connectNeighbours skips it).
kad.AddPeers(nonConnPeer.Overlay)

// after maxConnAttempts (4) failed dials the peer must be pruned
waitCounterAtLeast(t, &failedConns, int32(kademlia.MaxConnAttempts))
_, _, err = ab.Get(nonConnPeer.Overlay)
if !errors.Is(err, addressbook.ErrNotFound) {
t.Fatal(err)
}
// Each failed dial puts the peer into a TimeToRetry (1ms) window, so a
// single manage pass yields at most one failure. Trigger a pass, wait
// for it to settle, then advance the fake clock past the retry window
// and repeat. After maxConnAttempts failures the peer is pruned
// (removed from the address book).
for i := 0; ; i++ {
if _, _, err := ab.Get(nonConnPeer.Overlay); errors.Is(err, addressbook.ErrNotFound) {
break
}
if i >= 4*kademlia.MaxConnAttempts {
t.Fatalf("peer not pruned after %d manage passes, got %d failed connection attempts", i, atomic.LoadInt32(&failedConns))
}
kad.Trigger()
synctest.Wait()
time.Sleep(2 * time.Millisecond)
}
})
}

func TestClosestPeer(t *testing.T) {
Expand Down Expand Up @@ -1287,104 +1313,129 @@ func TestStart(t *testing.T) {
func TestOutofDepthPrune(t *testing.T) {
t.Parallel()

var (
conns, failedConns int32 // how many connect calls were made to the p2p mock

saturationPeers = 4
overSaturationPeers = 16
pruneWakeup = time.Millisecond * 100
pruneFuncImpl *func(uint8)
pruneMux = sync.Mutex{}
pruneFunc = func(depth uint8) {
pruneMux.Lock()
defer pruneMux.Unlock()
f := *pruneFuncImpl
f(depth)
}

base, kad, ab, _, signer = newTestKademlia(t, &conns, &failedConns, kademlia.Options{
SaturationPeers: new(saturationPeers),
OverSaturationPeers: new(overSaturationPeers),
PruneFunc: pruneFunc,
ExcludeFunc: defaultExcludeFunc,
PruneWakeup: &pruneWakeup,
})
)
// Run inside a synctest bubble so the manage and prune loops run on a fake
// clock. The connect cascade is drained with synctest.Wait(), while the
// time-driven pruning is stepped forward by the fake-clocked spinlock.Wait
// polls; this removes the real-clock barriers that made the test flaky on
// loaded CI runners.
synctest.Test(t, func(t *testing.T) {
var (
conns, failedConns int32 // how many connect calls were made to the p2p mock

saturationPeers = 4
overSaturationPeers = 16
pruneWakeup = time.Millisecond * 100
pruneFuncImpl *func(uint8)
pruneMux = sync.Mutex{}
pruneFunc = func(depth uint8) {
pruneMux.Lock()
defer pruneMux.Unlock()
f := *pruneFuncImpl
f(depth)
}

kad.SetStorageRadius(0)
base, kad, ab, _, signer = newTestKademlia(t, &conns, &failedConns, kademlia.Options{
SaturationPeers: new(saturationPeers),
OverSaturationPeers: new(overSaturationPeers),
PruneFunc: pruneFunc,
ExcludeFunc: defaultExcludeFunc,
PruneWakeup: &pruneWakeup,
})
)

// implement empty prune func
pruneMux.Lock()
pruneImpl := func(uint8) {}
pruneFuncImpl = &(pruneImpl)
pruneMux.Unlock()
kad.SetStorageRadius(0)

if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
testutil.CleanupCloser(t, kad)
// implement empty prune func
pruneMux.Lock()
pruneImpl := func(uint8) {}
pruneFuncImpl = &(pruneImpl)
pruneMux.Unlock()

// bin 0,1 balanced, rest not
for i := range 6 {
var peers []swarm.Address
if i < 2 {
peers = mineBin(t, base, i, 20, true)
} else {
peers = mineBin(t, base, i, 20, false)
if err := kad.Start(context.Background()); err != nil {
t.Fatal(err)
}
for _, peer := range peers {
addOne(t, signer, kad, ab, peer)
// Close inside the bubble: synctest requires all bubbled goroutines to
// exit before the test function returns.
defer func() {
if err := kad.Close(); err != nil {
t.Error(err)
}
// goleveldb's mpoolDrain waits up to 1s after the metrics store is
// closed before exiting and is not awaited by DB.Close, so advance
// the fake clock past it and let it settle before the bubble ends.
time.Sleep(2 * time.Second)
synctest.Wait()
}()

// bin 0,1 balanced, rest not
for i := range 6 {
var peers []swarm.Address
if i < 2 {
peers = mineBin(t, base, i, 20, true)
} else {
peers = mineBin(t, base, i, 20, false)
}
for _, peer := range peers {
addOne(t, signer, kad, ab, peer)
}
// let the connect cascade triggered by the added peers settle
synctest.Wait()
kDepth(t, kad, i)
}
time.Sleep(time.Millisecond * 10)
kDepth(t, kad, i)
}

// check that bin 0, 1 are balanced, but not 2
waitBalanced(t, kad, 0)
waitBalanced(t, kad, 1)
if kad.IsBalanced(2) {
t.Fatal("bin 2 should not be balanced")
}

// wait for kademlia connectors and pruning to finish
time.Sleep(time.Millisecond * 500)

// check that no pruning has happened
bins := binSizes(kad)
for i := range 6 {
if bins[i] <= overSaturationPeers {
t.Fatalf("bin %d, got %d, want more than %d", i, bins[i], overSaturationPeers)
// check that bin 0, 1 are balanced, but not 2
waitBalanced(t, kad, 0)
waitBalanced(t, kad, 1)
if kad.IsBalanced(2) {
t.Fatal("bin 2 should not be balanced")
}
}

kad.SetStorageRadius(6)
// wait for kademlia connectors to connect the added peers; no pruning
// happens yet because the prune func is a no-op at this point.
if err := spinlock.Wait(spinLockWaitTime, func() bool {
bins := binSizes(kad)
for i := range 6 {
if bins[i] <= overSaturationPeers {
return false
}
}
return true
}); err != nil {
t.Fatalf("timed out waiting for bins to fill: got %v, want each more than %d", binSizes(kad), overSaturationPeers)
}

// set prune func to the default
pruneMux.Lock()
pruneImpl = func(depth uint8) {
kademlia.PruneOversaturatedBinsFunc(kad)(depth)
}
pruneFuncImpl = &(pruneImpl)
pruneMux.Unlock()
kad.SetStorageRadius(6)

// add a peer to kick start pruning
addr := swarm.RandAddressAt(t, base, 6)
addOne(t, signer, kad, ab, addr)
// set prune func to the default
pruneMux.Lock()
pruneImpl = func(depth uint8) {
kademlia.PruneOversaturatedBinsFunc(kad)(depth)
}
pruneFuncImpl = &(pruneImpl)
pruneMux.Unlock()

// wait for kademlia connectors and pruning to finish
time.Sleep(time.Millisecond * 500)
// add a peer to kick start pruning
addr := swarm.RandAddressAt(t, base, 6)
addOne(t, signer, kad, ab, addr)

// check bins have been pruned
bins = binSizes(kad)
for i := range uint8(5) {
if bins[i] != overSaturationPeers {
t.Fatalf("bin %d, got %d, want %d", i, bins[i], overSaturationPeers)
// wait for kademlia connectors and pruning to finish: bins 0..4 must be
// pruned down to exactly overSaturationPeers.
if err := spinlock.Wait(spinLockWaitTime, func() bool {
bins := binSizes(kad)
for i := range uint8(5) {
if bins[i] != overSaturationPeers {
return false
}
}
return true
}); err != nil {
t.Fatalf("timed out waiting for bins to be pruned: got %v, want each %d", binSizes(kad), overSaturationPeers)
}
}

// check that bin 0,1 remains balanced after pruning
waitBalanced(t, kad, 0)
waitBalanced(t, kad, 1)
// check that bin 0,1 remains balanced after pruning
waitBalanced(t, kad, 0)
waitBalanced(t, kad, 1)
})
}

// TestPruneExcludeOps tests that the prune bin func counts peers in each bin correctly using the peer's reachability status and does not over prune.
Expand Down
Loading