Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/hive/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ import (
"github.com/ethersphere/bee/v2/pkg/hive/pb"
)

var MaxBatchSize = maxBatchSize
var LimitBurst = limitBurst
var (
MaxBatchSize = maxBatchSize
LimitBurst = limitBurst
CoalesceThreshold = coalesceThreshold
)

func (s *Service) SetTimeFunc(f func() time.Time) {
s.now = f
}

// FlushGossipBufferForTest drains the outbound gossip coalesce buffer synchronously.
func (s *Service) FlushGossipBufferForTest() {
s.flushGossipEntries(s.gossipBuf.takeAll())
}

// CheckAndAddPeers exposes the internal ingestion path for tests,
// bypassing the stream and rate limiter.
func (s *Service) CheckAndAddPeers(peers pb.Peers) {
Expand Down
121 changes: 121 additions & 0 deletions pkg/hive/gossip_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hive

import (
"math/rand/v2"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

const (
defaultGossipCoalesceInterval = time.Second
// coalesceThreshold: gossips with fewer peers are buffered; larger
// (already-batched) messages are dispatched immediately.
coalesceThreshold = 2
)

// gossipBuffer accumulates single-peer outbound gossip per addressee so it can be
// flushed as one batched message.
type gossipBuffer struct {
mu sync.Mutex
pending map[string]*pendingGossip // addressee bytestring -> buffered peers
interval time.Duration
jitter time.Duration
maxBatch int
}

type pendingGossip struct {
addressee swarm.Address
peers map[string]swarm.Address // peer bytestring -> address (set semantics)
deadline time.Time
}

func newGossipBuffer(interval time.Duration, maxBatch int) *gossipBuffer {
if interval == 0 {
interval = defaultGossipCoalesceInterval
}
return &gossipBuffer{
pending: make(map[string]*pendingGossip),
interval: interval,
jitter: interval / 4,
maxBatch: maxBatch,
}
}

// add buffers peers for the addressee. If the buffer reaches maxBatch it is
// removed and returned so the caller can flush it immediately; otherwise nil.
func (b *gossipBuffer) add(now time.Time, addressee swarm.Address, peers ...swarm.Address) *pendingGossip {
b.mu.Lock()
defer b.mu.Unlock()

key := addressee.ByteString()
e, ok := b.pending[key]
if !ok {
e = &pendingGossip{
addressee: addressee,
peers: make(map[string]swarm.Address),
deadline: now.Add(b.interval + time.Duration(rand.Int64N(int64(b.jitter)))),
}
b.pending[key] = e
}
for _, p := range peers {
e.peers[p.ByteString()] = p
}

if len(e.peers) >= b.maxBatch {
delete(b.pending, key)
return e
}
return nil
}

// takeDue removes and returns all entries whose deadline has passed.
func (b *gossipBuffer) takeDue(now time.Time) []*pendingGossip {
return b.take(func(e *pendingGossip) bool { return !e.deadline.After(now) })
}

// takeAll removes and returns everything (used for the shutdown drain).
func (b *gossipBuffer) takeAll() []*pendingGossip {
return b.take(func(*pendingGossip) bool { return true })
}

func (b *gossipBuffer) clearAddressee(addressee swarm.Address) {
b.mu.Lock()
defer b.mu.Unlock()

delete(b.pending, addressee.ByteString())
}

func (b *gossipBuffer) pendingAddressees() int {
b.mu.Lock()
defer b.mu.Unlock()

return len(b.pending)
}

func (b *gossipBuffer) take(match func(*pendingGossip) bool) []*pendingGossip {
b.mu.Lock()
defer b.mu.Unlock()

var out []*pendingGossip
for key, e := range b.pending {
if match(e) {
out = append(out, e)
delete(b.pending, key)
}
}
return out
}

func (e *pendingGossip) addresses() []swarm.Address {
out := make([]swarm.Address, 0, len(e.peers))
for _, p := range e.peers {
out = append(out, p)
}
return out
}
80 changes: 80 additions & 0 deletions pkg/hive/gossip_buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hive

import (
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

func TestGossipBufferAddAndDue(t *testing.T) {
t.Parallel()

const interval = 100 * time.Millisecond

b := newGossipBuffer(interval, maxBatchSize)
addressee := swarm.RandAddress(t)
peer1 := swarm.RandAddress(t)
peer2 := swarm.RandAddress(t)

now := time.Now()
if full := b.add(now, addressee, peer1); full != nil {
t.Fatal("unexpected immediate flush")
}

if due := b.takeDue(now); len(due) != 0 {
t.Fatalf("want no due entries, got %d", len(due))
}

if full := b.add(now, addressee, peer2); full != nil {
t.Fatal("unexpected immediate flush")
}

afterDeadline := now.Add(interval + interval/4 + time.Millisecond)
due := b.takeDue(afterDeadline)
if len(due) != 1 {
t.Fatalf("want 1 due entry, got %d", len(due))
}
if got := len(due[0].addresses()); got != 2 {
t.Fatalf("want 2 coalesced peers, got %d", got)
}
}

func TestGossipBufferMaxBatchFlush(t *testing.T) {
t.Parallel()

b := newGossipBuffer(time.Second, 2)
addressee := swarm.RandAddress(t)
now := time.Now()

b.add(now, addressee, swarm.RandAddress(t))
full := b.add(now, addressee, swarm.RandAddress(t))
if full == nil {
t.Fatal("want immediate flush at maxBatch")
}
if got := len(full.addresses()); got != 2 {
t.Fatalf("want 2 peers in full batch, got %d", got)
}
if due := b.takeDue(now.Add(time.Second)); len(due) != 0 {
t.Fatalf("want empty buffer after maxBatch flush, got %d due", len(due))
}
}

func TestGossipBufferClearAddressee(t *testing.T) {
t.Parallel()

b := newGossipBuffer(time.Second, maxBatchSize)
addressee := swarm.RandAddress(t)
now := time.Now()

b.add(now, addressee, swarm.RandAddress(t))
b.clearAddressee(addressee)

if all := b.takeAll(); len(all) != 0 {
t.Fatalf("want buffer cleared on disconnect, got %d entries", len(all))
}
}
130 changes: 130 additions & 0 deletions pkg/hive/gossip_dedup_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package hive

import (
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/swarm"
)

const (
defaultGossipDedupTTL = time.Minute
defaultGossipDedupPruneInterval = time.Minute
)

// gossipDedupCache suppresses repeated outbound gossip of the same peer to the same
// addressee within a TTL window.
type gossipDedupCache struct {
mu sync.Mutex
seen map[string]map[string]int64 // addressee -> gossiped peer -> expiry unix nano
ttl time.Duration
quit chan struct{}
wg sync.WaitGroup
}

func newGossipDedupCache(ttl, pruneInterval time.Duration) *gossipDedupCache {
if ttl == 0 {
ttl = defaultGossipDedupTTL
}

if pruneInterval == 0 {
pruneInterval = defaultGossipDedupPruneInterval
}

d := &gossipDedupCache{
seen: make(map[string]map[string]int64),
ttl: ttl,
quit: make(chan struct{}),
}

d.wg.Go(func() {
ticker := time.NewTicker(pruneInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
d.prune()
case <-d.quit:
return
}
}
})

return d
}

func (d *gossipDedupCache) contains(addressee, peer swarm.Address) bool {
d.mu.Lock()
defer d.mu.Unlock()

peers, ok := d.seen[addressee.ByteString()]
if !ok {
return false
}

exp, ok := peers[peer.ByteString()]
if !ok {
return false
}

return exp > time.Now().UnixNano()
}

func (d *gossipDedupCache) add(addressee swarm.Address, peers ...swarm.Address) {
if len(peers) == 0 {
return
}

d.mu.Lock()
defer d.mu.Unlock()

exp := time.Now().Add(d.ttl).UnixNano()
key := addressee.ByteString()

m, ok := d.seen[key]
if !ok {
m = make(map[string]int64)
d.seen[key] = m
}

for _, p := range peers {
m[p.ByteString()] = exp
}
}

func (d *gossipDedupCache) clearAddressee(addressee swarm.Address) {
d.mu.Lock()
defer d.mu.Unlock()

delete(d.seen, addressee.ByteString())
}

func (d *gossipDedupCache) prune() {
d.mu.Lock()
defer d.mu.Unlock()

now := time.Now().UnixNano()
for addressee, peers := range d.seen {
for peer, exp := range peers {
if exp <= now {
delete(peers, peer)
}
}
if len(peers) == 0 {
delete(d.seen, addressee)
}
}
}

func (d *gossipDedupCache) close() {
close(d.quit)
d.wg.Wait()

d.mu.Lock()
clear(d.seen)
d.mu.Unlock()
}
Loading
Loading