From 3c44fa7c1a452b6b0d62a62c2dfac57751a09595 Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 25 Jun 2026 09:59:48 +0200 Subject: [PATCH 1/3] feat: ttl cache for gossip deduplication --- pkg/hive/gossip_dedup_cache.go | 130 ++++++++++++++++++++++++++++ pkg/hive/gossip_dedup_cache_test.go | 64 ++++++++++++++ pkg/hive/hive.go | 37 +++++++- pkg/hive/hive_test.go | 87 +++++++++++++++++++ pkg/hive/metrics.go | 7 ++ 5 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 pkg/hive/gossip_dedup_cache.go create mode 100644 pkg/hive/gossip_dedup_cache_test.go diff --git a/pkg/hive/gossip_dedup_cache.go b/pkg/hive/gossip_dedup_cache.go new file mode 100644 index 00000000000..194a913ec12 --- /dev/null +++ b/pkg/hive/gossip_dedup_cache.go @@ -0,0 +1,130 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package hive + +import ( + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +const ( + defaultGossipDedupTTL = time.Minute + defaultGossipDedupPruneInterval = time.Minute +) + +// gossipDedupCache suppresses repeated outbound gossip of the same peer to the same +// addressee within a TTL window. +type gossipDedupCache struct { + mu sync.Mutex + seen map[string]map[string]int64 // addressee -> gossiped peer -> expiry unix nano + ttl time.Duration + quit chan struct{} + wg sync.WaitGroup +} + +func newGossipDedupCache(ttl, pruneInterval time.Duration) *gossipDedupCache { + if ttl == 0 { + ttl = defaultGossipDedupTTL + } + + if pruneInterval == 0 { + pruneInterval = defaultGossipDedupPruneInterval + } + + d := &gossipDedupCache{ + seen: make(map[string]map[string]int64), + ttl: ttl, + quit: make(chan struct{}), + } + + d.wg.Go(func() { + ticker := time.NewTicker(pruneInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + d.prune() + case <-d.quit: + return + } + } + }) + + return d +} + +func (d *gossipDedupCache) contains(addressee, peer swarm.Address) bool { + d.mu.Lock() + defer d.mu.Unlock() + + peers, ok := d.seen[addressee.ByteString()] + if !ok { + return false + } + + exp, ok := peers[peer.ByteString()] + if !ok { + return false + } + + return exp > time.Now().UnixNano() +} + +func (d *gossipDedupCache) add(addressee swarm.Address, peers ...swarm.Address) { + if len(peers) == 0 { + return + } + + d.mu.Lock() + defer d.mu.Unlock() + + exp := time.Now().Add(d.ttl).UnixNano() + key := addressee.ByteString() + + m, ok := d.seen[key] + if !ok { + m = make(map[string]int64) + d.seen[key] = m + } + + for _, p := range peers { + m[p.ByteString()] = exp + } +} + +func (d *gossipDedupCache) clearAddressee(addressee swarm.Address) { + d.mu.Lock() + defer d.mu.Unlock() + + delete(d.seen, addressee.ByteString()) +} + +func (d *gossipDedupCache) prune() { + d.mu.Lock() + defer d.mu.Unlock() + + now := time.Now().UnixNano() + for addressee, peers := range d.seen { + for peer, exp := range peers { + if exp <= now { + delete(peers, peer) + } + } + if len(peers) == 0 { + delete(d.seen, addressee) + } + } +} + +func (d *gossipDedupCache) close() { + close(d.quit) + d.wg.Wait() + + d.mu.Lock() + clear(d.seen) + d.mu.Unlock() +} diff --git a/pkg/hive/gossip_dedup_cache_test.go b/pkg/hive/gossip_dedup_cache_test.go new file mode 100644 index 00000000000..8f4b1fbfa88 --- /dev/null +++ b/pkg/hive/gossip_dedup_cache_test.go @@ -0,0 +1,64 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package hive + +import ( + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestGossipDedup(t *testing.T) { + t.Parallel() + + const ttl = 20 * time.Millisecond + + d := newGossipDedupCache(ttl, 0) + t.Cleanup(func() { d.close() }) + + addressee := swarm.RandAddress(t) + peer := swarm.RandAddress(t) + + if d.contains(addressee, peer) { + t.Fatal("unexpected hit on empty cache") + } + + d.add(addressee, peer) + if !d.contains(addressee, peer) { + t.Fatal("want cache hit after add") + } + + time.Sleep(ttl + 5*time.Millisecond) + if d.contains(addressee, peer) { + t.Fatal("want cache miss after ttl") + } + + d.add(addressee, peer) + d.clearAddressee(addressee) + if d.contains(addressee, peer) { + t.Fatal("want cache miss after clear") + } +} + +func TestGossipDedupPrune(t *testing.T) { + t.Parallel() + + const ttl = 20 * time.Millisecond + + d := newGossipDedupCache(ttl, 0) + t.Cleanup(func() { d.close() }) + + addressee := swarm.RandAddress(t) + peer := swarm.RandAddress(t) + + d.add(addressee, peer) + time.Sleep(2 * ttl) + d.prune() + + if d.contains(addressee, peer) { + t.Fatal("want expired entry pruned") + } +} diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index a991dbdd58b..1bfa99e0ebf 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -61,11 +61,16 @@ var ( // optional: a nil ChequebookVerifier disables the verification gate (and // records without a chequebook are accepted); a nil ChequebookStorer means // no overlay→chequebook persistence — addressbook writes happen directly. +// +// Gossip dedup fields are optional. type Options struct { BootnodeMode bool AllowPrivateCIDRs bool ChequebookVerifier chequebook.Verifier ChequebookStorer ChequebookStorer + + GossipDedupCacheTTL time.Duration + GossipDedupPruneInterval time.Duration } type Service struct { @@ -90,6 +95,7 @@ type Service struct { // chequebook are dropped. chequebookVerifier chequebook.Verifier chequebookStorer ChequebookStorer + gossipDedup *gossipDedupCache } func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, overlay swarm.Address, logger log.Logger, o Options) *Service { @@ -112,6 +118,8 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin chequebookStorer: o.ChequebookStorer, } + svc.gossipDedup = newGossipDedupCache(o.GossipDedupCacheTTL, o.GossipDedupPruneInterval) + if !o.BootnodeMode { svc.startCheckPeersHandler() } @@ -137,6 +145,11 @@ func (s *Service) Protocol() p2p.ProtocolSpec { var ErrShutdownInProgress = errors.New("shutdown in progress") func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error { + peers = s.filterGossipDedup(addressee, peers) + if len(peers) == 0 { + return nil + } + maxSize := maxBatchSize s.metrics.BroadcastPeers.Inc() s.metrics.BroadcastPeersPeers.Add(float64(len(peers))) @@ -173,6 +186,7 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) { func (s *Service) Close() error { close(s.quit) + s.gossipDedup.close() stopped := make(chan struct{}) go func() { @@ -202,7 +216,10 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa } }() w, _ := protobuf.NewWriterAndReader(stream) - var peersRequest pb.Peers + var ( + peersRequest pb.Peers + gossiped []swarm.Address + ) for _, p := range peers { if p.Equal(s.overlay) { s.logger.Debug("skipping self-address in broadcast", "overlay", p.String()) @@ -248,12 +265,17 @@ func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swa Timestamp: addr.Timestamp, ChequebookAddress: addr.ChequebookAddress.Bytes(), }) + gossiped = append(gossiped, p) } if err := w.WriteMsgWithContext(ctx, &peersRequest); err != nil { return fmt.Errorf("write Peers message: %w", err) } + if len(gossiped) > 0 { + s.gossipDedup.add(peer, gossiped...) + } + return nil } @@ -296,9 +318,22 @@ func (s *Service) peersHandler(ctx context.Context, peer p2p.Peer, stream p2p.St func (s *Service) disconnect(peer p2p.Peer) error { s.inLimiter.Clear(peer.Address.ByteString()) s.outLimiter.Clear(peer.Address.ByteString()) + s.gossipDedup.clearAddressee(peer.Address) return nil } +func (s *Service) filterGossipDedup(addressee swarm.Address, peers []swarm.Address) []swarm.Address { + filtered := make([]swarm.Address, 0, len(peers)) + for _, p := range peers { + if s.gossipDedup.contains(addressee, p) { + s.metrics.GossipDedupSkipped.Inc() + continue + } + filtered = append(filtered, p) + } + return filtered +} + func (s *Service) startCheckPeersHandler() { ctx, cancel := context.WithCancel(context.Background()) s.wg.Go(func() { diff --git a/pkg/hive/hive_test.go b/pkg/hive/hive_test.go index 9f6f599da43..d1f28ed32da 100644 --- a/pkg/hive/hive_test.go +++ b/pkg/hive/hive_test.go @@ -23,6 +23,7 @@ import ( "github.com/ethersphere/bee/v2/pkg/hive" "github.com/ethersphere/bee/v2/pkg/hive/pb" "github.com/ethersphere/bee/v2/pkg/log" + "github.com/ethersphere/bee/v2/pkg/p2p" "github.com/ethersphere/bee/v2/pkg/p2p/protobuf" "github.com/ethersphere/bee/v2/pkg/p2p/streamtest" "github.com/ethersphere/bee/v2/pkg/settlement/swap/chequebook" @@ -1344,3 +1345,89 @@ func TestHiveGossipUnderlayCaps(t *testing.T) { } }) } + +func TestBroadcastPeersGossipDedup(t *testing.T) { + t.Parallel() + + logger := log.Noop + statestore := mock.NewStateStore() + addressbook := ab.New(statestore) + networkID := uint64(1) + + underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") + if err != nil { + t.Fatal(err) + } + pk, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(pk) + gossipedOverlay, err := crypto.NewOverlayAddress(pk.PublicKey, networkID, nonce) + if err != nil { + t.Fatal(err) + } + gossipedAddr, err := bzz.NewAddress(signer, []ma.Multiaddr{underlay}, gossipedOverlay, networkID, nonce, 1, common.Address{}) + if err != nil { + t.Fatal(err) + } + if err := addressbook.Put(gossipedAddr.Overlay, *gossipedAddr, true); err != nil { + t.Fatal(err) + } + + streamer := streamtest.New() + serverAddress := swarm.RandAddress(t) + server := hive.New(streamer, ab.New(mock.NewStateStore()), networkID, serverAddress, logger, hive.Options{AllowPrivateCIDRs: true}) + testutil.CleanupCloser(t, server) + + recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) + clientAddress := swarm.RandAddress(t) + dedupTTL := 50 * time.Millisecond + client := hive.New(recorder, addressbook, networkID, clientAddress, logger, hive.Options{ + AllowPrivateCIDRs: true, + GossipDedupCacheTTL: dedupTTL, + }) + testutil.CleanupCloser(t, client) + + ctx := context.Background() + if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { + t.Fatal(err) + } + if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { + t.Fatal(err) + } + + records, err := recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil { + t.Fatal(err) + } + if got, want := len(records), 1; got != want { + t.Fatalf("got %d gossip messages, want %d", got, want) + } + + time.Sleep(dedupTTL + 10*time.Millisecond) + if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { + t.Fatal(err) + } + records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil { + t.Fatal(err) + } + if got, want := len(records), 2; got != want { + t.Fatalf("after ttl got %d gossip messages, want %d", got, want) + } + + if err := client.Protocol().DisconnectOut(p2p.Peer{Address: serverAddress, FullNode: true}); err != nil { + t.Fatal(err) + } + if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { + t.Fatal(err) + } + records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil { + t.Fatal(err) + } + if got, want := len(records), 3; got != want { + t.Fatalf("after disconnect got %d gossip messages, want %d", got, want) + } +} diff --git a/pkg/hive/metrics.go b/pkg/hive/metrics.go index 849c45abec9..fa555fc67a2 100644 --- a/pkg/hive/metrics.go +++ b/pkg/hive/metrics.go @@ -32,6 +32,7 @@ type metrics struct { TimestampRejected *prometheus.CounterVec LegacyRecordSkipped prometheus.Counter + GossipDedupSkipped prometheus.Counter } func newMetrics() metrics { @@ -137,6 +138,12 @@ func newMetrics() metrics { }, []string{"reason"}, ), + GossipDedupSkipped: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "gossip_dedup_skipped_total", + Help: "Number of peer gossip entries suppressed by the outbound dedup cache.", + }), ChequebookVerification: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, From 0825424a2364fc64275b49197da45bd7eb9bfe93 Mon Sep 17 00:00:00 2001 From: sbackend Date: Fri, 26 Jun 2026 12:49:10 +0200 Subject: [PATCH 2/3] feat: send peers in batches --- pkg/hive/export_test.go | 12 +++- pkg/hive/gossip_buffer.go | 121 +++++++++++++++++++++++++++++++++ pkg/hive/gossip_buffer_test.go | 80 ++++++++++++++++++++++ pkg/hive/hive.go | 85 +++++++++++++++++++++-- pkg/hive/hive_test.go | 120 ++++++++++++++++++++++++++++++++ pkg/hive/metrics.go | 15 ++++ 6 files changed, 425 insertions(+), 8 deletions(-) create mode 100644 pkg/hive/gossip_buffer.go create mode 100644 pkg/hive/gossip_buffer_test.go diff --git a/pkg/hive/export_test.go b/pkg/hive/export_test.go index 3d190772f18..7b034117fcd 100644 --- a/pkg/hive/export_test.go +++ b/pkg/hive/export_test.go @@ -11,13 +11,21 @@ import ( "github.com/ethersphere/bee/v2/pkg/hive/pb" ) -var MaxBatchSize = maxBatchSize -var LimitBurst = limitBurst +var ( + MaxBatchSize = maxBatchSize + LimitBurst = limitBurst + CoalesceThreshold = coalesceThreshold +) func (s *Service) SetTimeFunc(f func() time.Time) { s.now = f } +// FlushGossipBufferForTest drains the outbound gossip coalesce buffer synchronously. +func (s *Service) FlushGossipBufferForTest() { + s.flushGossipEntries(s.gossipBuf.takeAll()) +} + // CheckAndAddPeers exposes the internal ingestion path for tests, // bypassing the stream and rate limiter. func (s *Service) CheckAndAddPeers(peers pb.Peers) { diff --git a/pkg/hive/gossip_buffer.go b/pkg/hive/gossip_buffer.go new file mode 100644 index 00000000000..65e361c3da8 --- /dev/null +++ b/pkg/hive/gossip_buffer.go @@ -0,0 +1,121 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package hive + +import ( + "math/rand/v2" + "sync" + "time" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +const ( + defaultGossipCoalesceInterval = time.Second + // coalesceThreshold: gossips with fewer peers are buffered; larger + // (already-batched) messages are dispatched immediately. + coalesceThreshold = 2 +) + +// gossipBuffer accumulates single-peer outbound gossip per addressee so it can be +// flushed as one batched message. +type gossipBuffer struct { + mu sync.Mutex + pending map[string]*pendingGossip // addressee bytestring -> buffered peers + interval time.Duration + jitter time.Duration + maxBatch int +} + +type pendingGossip struct { + addressee swarm.Address + peers map[string]swarm.Address // peer bytestring -> address (set semantics) + deadline time.Time +} + +func newGossipBuffer(interval time.Duration, maxBatch int) *gossipBuffer { + if interval == 0 { + interval = defaultGossipCoalesceInterval + } + return &gossipBuffer{ + pending: make(map[string]*pendingGossip), + interval: interval, + jitter: interval / 4, + maxBatch: maxBatch, + } +} + +// add buffers peers for the addressee. If the buffer reaches maxBatch it is +// removed and returned so the caller can flush it immediately; otherwise nil. +func (b *gossipBuffer) add(now time.Time, addressee swarm.Address, peers ...swarm.Address) *pendingGossip { + b.mu.Lock() + defer b.mu.Unlock() + + key := addressee.ByteString() + e, ok := b.pending[key] + if !ok { + e = &pendingGossip{ + addressee: addressee, + peers: make(map[string]swarm.Address), + deadline: now.Add(b.interval + time.Duration(rand.Int64N(int64(b.jitter)))), + } + b.pending[key] = e + } + for _, p := range peers { + e.peers[p.ByteString()] = p + } + + if len(e.peers) >= b.maxBatch { + delete(b.pending, key) + return e + } + return nil +} + +// takeDue removes and returns all entries whose deadline has passed. +func (b *gossipBuffer) takeDue(now time.Time) []*pendingGossip { + return b.take(func(e *pendingGossip) bool { return !e.deadline.After(now) }) +} + +// takeAll removes and returns everything (used for the shutdown drain). +func (b *gossipBuffer) takeAll() []*pendingGossip { + return b.take(func(*pendingGossip) bool { return true }) +} + +func (b *gossipBuffer) clearAddressee(addressee swarm.Address) { + b.mu.Lock() + defer b.mu.Unlock() + + delete(b.pending, addressee.ByteString()) +} + +func (b *gossipBuffer) pendingAddressees() int { + b.mu.Lock() + defer b.mu.Unlock() + + return len(b.pending) +} + +func (b *gossipBuffer) take(match func(*pendingGossip) bool) []*pendingGossip { + b.mu.Lock() + defer b.mu.Unlock() + + var out []*pendingGossip + for key, e := range b.pending { + if match(e) { + out = append(out, e) + delete(b.pending, key) + } + } + return out +} + +func (e *pendingGossip) addresses() []swarm.Address { + out := make([]swarm.Address, 0, len(e.peers)) + for _, p := range e.peers { + out = append(out, p) + } + return out +} diff --git a/pkg/hive/gossip_buffer_test.go b/pkg/hive/gossip_buffer_test.go new file mode 100644 index 00000000000..c831b5149ad --- /dev/null +++ b/pkg/hive/gossip_buffer_test.go @@ -0,0 +1,80 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package hive + +import ( + "testing" + "time" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func TestGossipBufferAddAndDue(t *testing.T) { + t.Parallel() + + const interval = 100 * time.Millisecond + + b := newGossipBuffer(interval, maxBatchSize) + addressee := swarm.RandAddress(t) + peer1 := swarm.RandAddress(t) + peer2 := swarm.RandAddress(t) + + now := time.Now() + if full := b.add(now, addressee, peer1); full != nil { + t.Fatal("unexpected immediate flush") + } + + if due := b.takeDue(now); len(due) != 0 { + t.Fatalf("want no due entries, got %d", len(due)) + } + + if full := b.add(now, addressee, peer2); full != nil { + t.Fatal("unexpected immediate flush") + } + + afterDeadline := now.Add(interval + interval/4 + time.Millisecond) + due := b.takeDue(afterDeadline) + if len(due) != 1 { + t.Fatalf("want 1 due entry, got %d", len(due)) + } + if got := len(due[0].addresses()); got != 2 { + t.Fatalf("want 2 coalesced peers, got %d", got) + } +} + +func TestGossipBufferMaxBatchFlush(t *testing.T) { + t.Parallel() + + b := newGossipBuffer(time.Second, 2) + addressee := swarm.RandAddress(t) + now := time.Now() + + b.add(now, addressee, swarm.RandAddress(t)) + full := b.add(now, addressee, swarm.RandAddress(t)) + if full == nil { + t.Fatal("want immediate flush at maxBatch") + } + if got := len(full.addresses()); got != 2 { + t.Fatalf("want 2 peers in full batch, got %d", got) + } + if due := b.takeDue(now.Add(time.Second)); len(due) != 0 { + t.Fatalf("want empty buffer after maxBatch flush, got %d due", len(due)) + } +} + +func TestGossipBufferClearAddressee(t *testing.T) { + t.Parallel() + + b := newGossipBuffer(time.Second, maxBatchSize) + addressee := swarm.RandAddress(t) + now := time.Now() + + b.add(now, addressee, swarm.RandAddress(t)) + b.clearAddressee(addressee) + + if all := b.takeAll(); len(all) != 0 { + t.Fatalf("want buffer cleared on disconnect, got %d entries", len(all)) + } +} diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 1bfa99e0ebf..368eeef17b2 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -71,6 +71,7 @@ type Options struct { GossipDedupCacheTTL time.Duration GossipDedupPruneInterval time.Duration + GossipCoalesceInterval time.Duration } type Service struct { @@ -96,6 +97,7 @@ type Service struct { chequebookVerifier chequebook.Verifier chequebookStorer ChequebookStorer gossipDedup *gossipDedupCache + gossipBuf *gossipBuffer } func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uint64, overlay swarm.Address, logger log.Logger, o Options) *Service { @@ -119,10 +121,12 @@ func New(streamer p2p.Streamer, addressbook addressbook.GetPutter, networkID uin } svc.gossipDedup = newGossipDedupCache(o.GossipDedupCacheTTL, o.GossipDedupPruneInterval) + svc.gossipBuf = newGossipBuffer(o.GossipCoalesceInterval, maxBatchSize) if !o.BootnodeMode { svc.startCheckPeersHandler() } + svc.startGossipCoalescer() return svc } @@ -150,10 +154,35 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p return nil } - maxSize := maxBatchSize s.metrics.BroadcastPeers.Inc() s.metrics.BroadcastPeersPeers.Add(float64(len(peers))) + // Already-batched messages go out immediately; single-peer gossips are coalesced. + if len(peers) >= coalesceThreshold { + return s.broadcastNow(ctx, addressee, peers...) + } + + select { + case <-s.quit: + return ErrShutdownInProgress + default: + } + + // Buffer; if it just filled up, flush it synchronously while still in the call + // (caller's ctx is alive here, so it is safe to use). + if full := s.gossipBuf.add(s.now(), addressee, peers...); full != nil { + s.metrics.GossipCoalescedFlushes.Inc() + s.setCoalesceBufferGauge() + return s.broadcastNow(ctx, full.addressee, full.addresses()...) + } + s.setCoalesceBufferGauge() + return nil +} + +// broadcastNow performs the synchronous, rate-limited, batched send. +func (s *Service) broadcastNow(ctx context.Context, addressee swarm.Address, peers ...swarm.Address) error { + maxSize := maxBatchSize + for len(peers) > 0 { if maxSize > len(peers) { maxSize = len(peers) @@ -165,8 +194,8 @@ func (s *Service) BroadcastPeers(ctx context.Context, addressee swarm.Address, p } select { - case <-s.quit: - return ErrShutdownInProgress + case <-ctx.Done(): + return ctx.Err() default: } @@ -186,7 +215,6 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) { func (s *Service) Close() error { close(s.quit) - s.gossipDedup.close() stopped := make(chan struct{}) go func() { @@ -194,12 +222,15 @@ func (s *Service) Close() error { s.wg.Wait() }() + var err error select { case <-stopped: - return nil case <-time.After(time.Second * 5): - return errors.New("hive: waited 5 seconds to close active goroutines") + err = errors.New("hive: waited 5 seconds to close active goroutines") } + + s.gossipDedup.close() + return err } func (s *Service) sendPeers(ctx context.Context, peer swarm.Address, peers []swarm.Address) (err error) { @@ -319,6 +350,8 @@ func (s *Service) disconnect(peer p2p.Peer) error { s.inLimiter.Clear(peer.Address.ByteString()) s.outLimiter.Clear(peer.Address.ByteString()) s.gossipDedup.clearAddressee(peer.Address) + s.gossipBuf.clearAddressee(peer.Address) + s.setCoalesceBufferGauge() return nil } @@ -334,6 +367,46 @@ func (s *Service) filterGossipDedup(addressee swarm.Address, peers []swarm.Addre return filtered } +func (s *Service) startGossipCoalescer() { + tick := s.gossipBuf.interval / 4 + if tick <= 0 { + tick = s.gossipBuf.interval + } + + s.wg.Go(func() { + ticker := time.NewTicker(tick) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.flushGossipEntries(s.gossipBuf.takeDue(s.now())) + case <-s.quit: + s.flushGossipEntries(s.gossipBuf.takeAll()) + return + } + } + }) +} + +func (s *Service) flushGossipEntries(entries []*pendingGossip) { + if len(entries) > 0 { + s.metrics.GossipCoalescedFlushes.Add(float64(len(entries))) + } + s.setCoalesceBufferGauge() + + for _, e := range entries { + ctx, cancel := context.WithTimeout(context.Background(), messageTimeout) + if err := s.broadcastNow(ctx, e.addressee, e.addresses()...); err != nil { + s.logger.Debug("hive: coalesced gossip flush failed", "addressee", e.addressee, "error", err) + } + cancel() + } +} + +func (s *Service) setCoalesceBufferGauge() { + s.metrics.GossipCoalesceBufferSize.Set(float64(s.gossipBuf.pendingAddressees())) +} + func (s *Service) startCheckPeersHandler() { ctx, cancel := context.WithCancel(context.Background()) s.wg.Go(func() { diff --git a/pkg/hive/hive_test.go b/pkg/hive/hive_test.go index d1f28ed32da..2c811c01725 100644 --- a/pkg/hive/hive_test.go +++ b/pkg/hive/hive_test.go @@ -304,6 +304,9 @@ func TestBroadcastPeers(t *testing.T) { if err := client.BroadcastPeers(context.Background(), tc.addresee, tc.peers...); err != nil { t.Fatal(err) } + if len(tc.peers) < hive.CoalesceThreshold { + client.FlushGossipBufferForTest() + } testutil.CleanupCloser(t, client) // get a record for this stream @@ -1396,6 +1399,7 @@ func TestBroadcastPeersGossipDedup(t *testing.T) { if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { t.Fatal(err) } + client.FlushGossipBufferForTest() records, err := recorder.Records(serverAddress, "hive", "2.0.0", "peers") if err != nil { @@ -1409,6 +1413,7 @@ func TestBroadcastPeersGossipDedup(t *testing.T) { if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { t.Fatal(err) } + client.FlushGossipBufferForTest() records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") if err != nil { t.Fatal(err) @@ -1423,6 +1428,7 @@ func TestBroadcastPeersGossipDedup(t *testing.T) { if err := client.BroadcastPeers(ctx, serverAddress, gossipedOverlay); err != nil { t.Fatal(err) } + client.FlushGossipBufferForTest() records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") if err != nil { t.Fatal(err) @@ -1431,3 +1437,117 @@ func TestBroadcastPeersGossipDedup(t *testing.T) { t.Fatalf("after disconnect got %d gossip messages, want %d", got, want) } } + +func TestBroadcastPeersCoalesce(t *testing.T) { + t.Parallel() + + logger := log.Noop + statestore := mock.NewStateStore() + addressbook := ab.New(statestore) + networkID := uint64(1) + + overlays := make([]swarm.Address, 3) + for i := range overlays { + underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/" + strconv.Itoa(2000+i)) + if err != nil { + t.Fatal(err) + } + pk, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(pk) + overlay, err := crypto.NewOverlayAddress(pk.PublicKey, networkID, nonce) + if err != nil { + t.Fatal(err) + } + bzzAddr, err := bzz.NewAddress(signer, []ma.Multiaddr{underlay}, overlay, networkID, nonce, 1, common.Address{}) + if err != nil { + t.Fatal(err) + } + if err := addressbook.Put(bzzAddr.Overlay, *bzzAddr, true); err != nil { + t.Fatal(err) + } + overlays[i] = bzzAddr.Overlay + } + + streamer := streamtest.New() + serverAddress := swarm.RandAddress(t) + server := hive.New(streamer, ab.New(mock.NewStateStore()), networkID, serverAddress, logger, hive.Options{AllowPrivateCIDRs: true}) + testutil.CleanupCloser(t, server) + + recorder := streamtest.New(streamtest.WithProtocols(server.Protocol())) + clientAddress := swarm.RandAddress(t) + client := hive.New(recorder, addressbook, networkID, clientAddress, logger, hive.Options{AllowPrivateCIDRs: true}) + testutil.CleanupCloser(t, client) + + ctx := context.Background() + for _, overlay := range overlays { + if err := client.BroadcastPeers(ctx, serverAddress, overlay); err != nil { + t.Fatal(err) + } + } + + records, err := recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil && !errors.Is(err, streamtest.ErrRecordsNotFound) { + t.Fatal(err) + } + if got := len(records); got != 0 { + t.Fatalf("before flush got %d gossip messages, want 0", got) + } + + client.FlushGossipBufferForTest() + + records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil { + t.Fatal(err) + } + if got, want := len(records), 1; got != want { + t.Fatalf("after flush got %d gossip messages, want %d", got, want) + } + + messages, err := readAndAssertPeersMsgs(records[0].In(), 1) + if err != nil { + t.Fatal(err) + } + if got, want := len(messages[0].Peers), len(overlays); got != want { + t.Fatalf("coalesced peer count: got %d, want %d", got, want) + } + + // Batched gossip is sent immediately without coalescing (use fresh peers). + batchedOverlays := make([]swarm.Address, 2) + for i := range batchedOverlays { + underlay, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/" + strconv.Itoa(3000+i)) + if err != nil { + t.Fatal(err) + } + pk, err := crypto.GenerateSecp256k1Key() + if err != nil { + t.Fatal(err) + } + signer := crypto.NewDefaultSigner(pk) + overlay, err := crypto.NewOverlayAddress(pk.PublicKey, networkID, nonce) + if err != nil { + t.Fatal(err) + } + bzzAddr, err := bzz.NewAddress(signer, []ma.Multiaddr{underlay}, overlay, networkID, nonce, 1, common.Address{}) + if err != nil { + t.Fatal(err) + } + if err := addressbook.Put(bzzAddr.Overlay, *bzzAddr, true); err != nil { + t.Fatal(err) + } + batchedOverlays[i] = bzzAddr.Overlay + } + + if err := client.BroadcastPeers(ctx, serverAddress, batchedOverlays...); err != nil { + t.Fatal(err) + } + records, err = recorder.Records(serverAddress, "hive", "2.0.0", "peers") + if err != nil { + t.Fatal(err) + } + if got, want := len(records), 2; got != want { + t.Fatalf("after batched broadcast got %d gossip messages, want %d", got, want) + } +} diff --git a/pkg/hive/metrics.go b/pkg/hive/metrics.go index fa555fc67a2..263de93224a 100644 --- a/pkg/hive/metrics.go +++ b/pkg/hive/metrics.go @@ -33,6 +33,9 @@ type metrics struct { LegacyRecordSkipped prometheus.Counter GossipDedupSkipped prometheus.Counter + + GossipCoalescedFlushes prometheus.Counter + GossipCoalesceBufferSize prometheus.Gauge } func newMetrics() metrics { @@ -144,6 +147,18 @@ func newMetrics() metrics { Name: "gossip_dedup_skipped_total", Help: "Number of peer gossip entries suppressed by the outbound dedup cache.", }), + GossipCoalescedFlushes: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "gossip_coalesced_flushes_total", + Help: "Number of coalesced outbound gossip flushes dispatched.", + }), + GossipCoalesceBufferSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "gossip_coalesce_buffer_size", + Help: "Number of addressees with outbound gossip buffered awaiting coalesced flush.", + }), ChequebookVerification: prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: m.Namespace, From 646bdb598707dde7c35944024ce2cc6dfd5d1b73 Mon Sep 17 00:00:00 2001 From: sbackend Date: Fri, 26 Jun 2026 13:15:42 +0200 Subject: [PATCH 3/3] feat: peer distribution with gradient --- pkg/topology/kademlia/export_test.go | 1 + pkg/topology/kademlia/fanout_quotas_test.go | 127 ++++++++++++++++++++ pkg/topology/kademlia/kademlia.go | 110 ++++++++++++++++- 3 files changed, 237 insertions(+), 1 deletion(-) create mode 100644 pkg/topology/kademlia/fanout_quotas_test.go diff --git a/pkg/topology/kademlia/export_test.go b/pkg/topology/kademlia/export_test.go index 4d4587b7b0a..3fadc877a1e 100644 --- a/pkg/topology/kademlia/export_test.go +++ b/pkg/topology/kademlia/export_test.go @@ -16,6 +16,7 @@ var ( return k.pruneOversaturatedBins } GenerateCommonBinPrefixes = generateCommonBinPrefixes + FanoutQuotas = fanoutQuotas ) const ( diff --git a/pkg/topology/kademlia/fanout_quotas_test.go b/pkg/topology/kademlia/fanout_quotas_test.go new file mode 100644 index 00000000000..d76eabd5f77 --- /dev/null +++ b/pkg/topology/kademlia/fanout_quotas_test.go @@ -0,0 +1,127 @@ +// Copyright 2026 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package kademlia + +import ( + "testing" + + "github.com/ethersphere/bee/v2/pkg/swarm" +) + +func sumQuotas(quotas [swarm.MaxBins]int, hi uint8) int { + total := 0 + for bin := uint8(0); bin < hi; bin++ { + total += quotas[bin] + } + return total +} + +func TestFanoutQuotasEmpty(t *testing.T) { + t.Parallel() + + quotas := fanoutQuotas([swarm.MaxBins]int{}, 12, 2) + if sum := sumQuotas(quotas, 12); sum != 0 { + t.Fatalf("want zero budget on empty bins, got %d", sum) + } +} + +func TestFanoutQuotasSkipsEmptyBins(t *testing.T) { + t.Parallel() + + var counts [swarm.MaxBins]int + counts[0] = 5 + counts[4] = 5 + counts[8] = 5 + + const perBin = 2 + quotas := fanoutQuotas(counts, 12, perBin) + + active := 3 + wantBudget := perBin * active + if got := sumQuotas(quotas, 12); got != wantBudget { + t.Fatalf("budget: got %d, want %d", got, wantBudget) + } + for bin, c := range counts { + if c == 0 && quotas[bin] != 0 { + t.Fatalf("empty bin %d got quota %d", bin, quotas[bin]) + } + if c > 0 && quotas[bin] == 0 { + t.Fatalf("non-empty bin %d got zero quota", bin) + } + } +} + +func TestFanoutQuotasBandwidthNeutral(t *testing.T) { + t.Parallel() + + const ( + hi = uint8(12) + perBin = 2 + peers = 10 + ) + + var counts [swarm.MaxBins]int + for bin := uint8(0); bin < hi; bin++ { + counts[bin] = peers + } + + quotas := fanoutQuotas(counts, hi, perBin) + + active := int(hi) + wantBudget := perBin * active + if got := sumQuotas(quotas, hi); got != wantBudget { + t.Fatalf("budget: got %d, want %d (bandwidth-neutral)", got, wantBudget) + } +} + +func TestFanoutQuotasGradientShape(t *testing.T) { + t.Parallel() + + const ( + hi = uint8(12) + perBin = 2 + peers = 10 + ) + + var counts [swarm.MaxBins]int + for bin := uint8(0); bin < hi; bin++ { + counts[bin] = peers + } + + quotas := fanoutQuotas(counts, hi, perBin) + + // Closer shallow bins (higher index) should receive at least as many recipients + // as more distant ones when peer counts are equal. + for bin := uint8(1); bin < hi; bin++ { + if quotas[bin] < quotas[bin-1] { + t.Fatalf("quota not non-decreasing: bin %d=%d, bin %d=%d", bin-1, quotas[bin-1], bin, quotas[bin]) + } + } + + // Documented example: bins 0-5 get 1, bins 6-11 get 3. + for bin := uint8(0); bin < 6; bin++ { + if quotas[bin] != 1 { + t.Fatalf("bin %d: got quota %d, want 1", bin, quotas[bin]) + } + } + for bin := uint8(6); bin < hi; bin++ { + if quotas[bin] != 3 { + t.Fatalf("bin %d: got quota %d, want 3", bin, quotas[bin]) + } + } +} + +func TestFanoutQuotasCappedByPeerCount(t *testing.T) { + t.Parallel() + + var counts [swarm.MaxBins]int + counts[11] = 1 // only one peer in the closest shallow bin + + quotas := fanoutQuotas(counts, 12, 10) + + if quotas[11] != 1 { + t.Fatalf("bin 11: got quota %d, want 1 (capped by peer count)", quotas[11]) + } +} diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 57c4c41a950..3c806b9332d 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -58,6 +58,10 @@ const ( defaultTimeToRetry = 2 * defaultShortRetry defaultPruneWakeup = 5 * time.Minute defaultBroadcastBinSize = 2 + // defaultGradientBroadcast enables gradient-weighted fan-out across the + // shallow (distant) bins: the same per-Announce gossip budget is shifted + // toward bins closer to the neighborhood instead of being spread flatly. + defaultGradientBroadcast = true ) var ( @@ -98,6 +102,7 @@ type Options struct { OverSaturationPeers *int BootnodeOverSaturationPeers *int BroadcastBinSize *int + GradientBroadcast *bool LowWaterMark *int } @@ -119,6 +124,7 @@ type kadOptions struct { OverSaturationPeers int BootnodeOverSaturationPeers int BroadcastBinSize int + GradientBroadcast bool LowWaterMark int } @@ -140,6 +146,7 @@ func newKadOptions(o Options) kadOptions { OverSaturationPeers: defaultValInt(o.OverSaturationPeers, defaultOverSaturationPeers), BootnodeOverSaturationPeers: defaultValInt(o.BootnodeOverSaturationPeers, defaultBootNodeOverSaturationPeers), BroadcastBinSize: defaultValInt(o.BroadcastBinSize, defaultBroadcastBinSize), + GradientBroadcast: defaultValBool(o.GradientBroadcast, defaultGradientBroadcast), LowWaterMark: defaultValInt(o.LowWaterMark, defaultLowWaterMark), } @@ -164,6 +171,13 @@ func defaultValDuration(v *time.Duration, d time.Duration) time.Duration { return *v } +func defaultValBool(v *bool, d bool) bool { + if v == nil { + return d + } + return *v +} + func makeSaturationFunc(o kadOptions) binSaturationFunc { os := o.OverSaturationPeers if o.BootnodeMode { @@ -1042,6 +1056,29 @@ func (k *Kad) Announce(ctx context.Context, peer swarm.Address, fullnode bool) e depth := k.neighborhoodDepth() isNeighbor := swarm.Proximity(peer.Bytes(), k.base.Bytes()) >= depth + // Shallow (distant) bins are gossiped to a capped, gradient-weighted subset. + // When the peer is a neighbor, bins [depth, MaxBins) keep full gossip; + // otherwise every bin is shallow. The number of shallow bins is dynamic (it + // tracks storageRadius), so the fan-out budget is derived in the moment from + // BroadcastBinSize and the count of non-empty shallow bins. + shallowHi := swarm.MaxBins + if isNeighbor { + shallowHi = depth + } + + var ( + shallowPeers [swarm.MaxBins][]swarm.Address + counts [swarm.MaxBins]int + quotas [swarm.MaxBins]int + ) + for bin := uint8(0); bin < shallowHi; bin++ { + shallowPeers[bin] = k.binPeers(bin, true) + counts[bin] = len(shallowPeers[bin]) + } + if k.opt.GradientBroadcast { + quotas = fanoutQuotas(counts, shallowHi, k.opt.BroadcastBinSize) + } + outer: for bin := range swarm.MaxBins { @@ -1053,7 +1090,11 @@ outer: if bin >= depth && isNeighbor { connectedPeers = k.binPeers(bin, false) // broadcast all neighborhood peers } else { - connectedPeers, err = randomSubset(k.binPeers(bin, true), k.opt.BroadcastBinSize) + count := k.opt.BroadcastBinSize + if k.opt.GradientBroadcast { + count = quotas[bin] + } + connectedPeers, err = randomSubset(shallowPeers[bin], count) if err != nil { return err } @@ -1598,6 +1639,73 @@ func (k *Kad) Close() error { return err } +// fanoutQuotas distributes a gossip fan-out budget across the shallow (distant) +// bins [0, hi), giving bins closer to the neighborhood (higher bin index, i.e. +// nearer storageRadius) a larger share. +// +// The budget is derived in the moment as perBin * , so it tracks the dynamic number of distant bins (which follows +// storageRadius) and stays bandwidth-neutral versus the flat per-bin selection: +// the same total recipients, redistributed toward closer bins. Every non-empty +// bin keeps a coverage floor of one recipient, each quota is capped by the +// bin's connected-peer count, and the total stays within budget (it may exceed +// budget only via the floor, which cannot happen while perBin >= 1). The +// returned array is indexed by bin. +func fanoutQuotas(counts [swarm.MaxBins]int, hi uint8, perBin int) [swarm.MaxBins]int { + var quotas [swarm.MaxBins]int + + totalWeight := 0 + active := 0 + for bin := uint8(0); bin < hi; bin++ { + if counts[bin] == 0 { + continue + } + totalWeight += int(bin) + 1 + quotas[bin] = 1 // coverage floor for every non-empty shallow bin + active++ + } + if active == 0 { + return quotas + } + + budget := perBin * active + used := active // floors already handed out + + // Share the budget left after the floors, weighted toward closer bins. + if remaining := budget - used; remaining > 0 { + for bin := uint8(0); bin < hi; bin++ { + if counts[bin] == 0 { + continue + } + extra := remaining * (int(bin) + 1) / totalWeight // proportional floor + if room := counts[bin] - quotas[bin]; extra > room { + extra = room + } + quotas[bin] += extra + used += extra + } + // Hand out the rounding leftover, closest bin first. + for leftover := budget - used; leftover > 0; { + progressed := false + for bin := int(hi) - 1; bin >= 0; bin-- { + if leftover == 0 { + break + } + if counts[bin] > 0 && quotas[bin] < counts[bin] { + quotas[bin]++ + leftover-- + progressed = true + } + } + if !progressed { + break // every non-empty bin is capped at its peer count + } + } + } + + return quotas +} + func randomSubset(addrs []swarm.Address, count int) ([]swarm.Address, error) { if count >= len(addrs) { return addrs, nil