Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ctxc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Cortex struct {
protocolManager *ProtocolManager

discmix *enode.FairMix
dropper *dropper

// DB interfaces
chainDb ctxcdb.Database // Block chain database
Expand Down Expand Up @@ -275,6 +276,7 @@ func New(stack *node.Node, config *Config) (*Cortex, error) {

ctxc.miner = miner.New(ctxc, &config.Miner, ctxc.chainConfig, ctxc.eventMux, ctxc.engine, ctxc.isLocalBlock)
ctxc.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
ctxc.dropper = newDropper(ctxc.p2pServer.MaxDialedConns(), ctxc.p2pServer.MaxInboundConns())

ctxc.APIBackend = &CortexAPIBackend{stack.Config().AllowUnprotectedTxs, ctxc, nil}
if ctxc.APIBackend.allowUnprotectedTxs {
Expand Down Expand Up @@ -626,6 +628,9 @@ func (s *Cortex) Start(srvr *p2p.Server) error {
// Start the networking layer and the light server if requested
s.protocolManager.Start(maxPeers)

// Start the connection manager
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })

// start log indexer
// s.filterMaps.Start()
// go s.updateFilterMapsHeads()
Expand Down Expand Up @@ -718,6 +723,7 @@ func (s *Cortex) Stop() error {
s.synapse.Close()
}
s.discmix.Close()
s.dropper.Stop()
s.protocolManager.Stop()
// Then stop everything else.

Expand Down
167 changes: 167 additions & 0 deletions ctxc/dropper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package ctxc

import (
mrand "math/rand"
"slices"
"sync"
"time"

"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/CortexTheseus/metrics"
"github.com/CortexFoundation/CortexTheseus/p2p"
)

const (
// Interval between peer drop events (uniform between min and max)
peerDropIntervalMin = 3 * time.Minute
// Interval between peer drop events (uniform between min and max)
peerDropIntervalMax = 7 * time.Minute
// Avoid dropping peers for some time after connection
doNotDropBefore = 10 * time.Minute
// How close to max should we initiate the drop timer. O should be fine,
// dropping when no more peers can be added. Larger numbers result in more
// aggressive drop behavior.
peerDropThreshold = 0
)

var (
// droppedInbound is the number of inbound peers dropped
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil)
// droppedOutbound is the number of outbound peers dropped
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil)
)

// dropper monitors the state of the peer pool and makes changes as follows:
// - during sync the Downloader handles peer connections, so dropper is disabled
// - if not syncing and the peer count is close to the limit, it drops peers
// randomly every peerDropInterval to make space for new peers
// - peers are dropped separately from the inboud pool and from the dialed pool
type dropper struct {
maxDialPeers int // maximum number of dialed peers
maxInboundPeers int // maximum number of inbound peers
peersFunc getPeersFunc
syncingFunc getSyncingFunc

// peerDropTimer introduces churn if we are close to limit capacity.
// We handle Dialed and Inbound connections separately
peerDropTimer *time.Timer

wg sync.WaitGroup // wg for graceful shutdown
shutdownCh chan struct{}
}

// Callback type to get the list of connected peers.
type getPeersFunc func() []*p2p.Peer

// Callback type to get syncing status.
// Returns true while syncing, false when synced.
type getSyncingFunc func() bool

func newDropper(maxDialPeers, maxInboundPeers int) *dropper {
cm := &dropper{
maxDialPeers: maxDialPeers,
maxInboundPeers: maxInboundPeers,
peerDropTimer: time.NewTimer(randomDuration(peerDropIntervalMin, peerDropIntervalMax)),
shutdownCh: make(chan struct{}),
}
if peerDropIntervalMin > peerDropIntervalMax {
panic("peerDropIntervalMin duration must be less than or equal to peerDropIntervalMax duration")
}
return cm
}

// Start the dropper.
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
cm.peersFunc = srv.Peers
cm.syncingFunc = syncingFunc
cm.wg.Add(1)
go cm.loop()
}

// Stop the dropper.
func (cm *dropper) Stop() {
cm.peerDropTimer.Stop()
close(cm.shutdownCh)
cm.wg.Wait()
}

// dropRandomPeer selects one of the peers randomly and drops it from the peer pool.
func (cm *dropper) dropRandomPeer() bool {
peers := cm.peersFunc()
var numInbound int
for _, p := range peers {
if p.Inbound() {
numInbound++
}
}
numDialed := len(peers) - numInbound

selectDoNotDrop := func(p *p2p.Peer) bool {
// Avoid dropping trusted and static peers, or recent peers.
// Only drop peers if their respective category (dialed/inbound)
// is close to limit capacity.
return p.Trusted() || p.StaticDialed() ||
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold)
}

droppable := slices.DeleteFunc(peers, selectDoNotDrop)
if len(droppable) > 0 {
p := droppable[mrand.Intn(len(droppable))]
log.Debug("Dropping random peer", "inbound", p.Inbound(),
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
p.Disconnect(p2p.DiscUselessPeer)
if p.Inbound() {
droppedInbound.Mark(1)
} else {
droppedOutbound.Mark(1)
}
return true
}
return false
}

// randomDuration generates a random duration between min and max.
func randomDuration(min, max time.Duration) time.Duration {
if min > max {
panic("min duration must be less than or equal to max duration")
}
return time.Duration(mrand.Int63n(int64(max-min)) + int64(min))
}

// loop is the main loop of the connection dropper.
func (cm *dropper) loop() {
defer cm.wg.Done()

for {
select {
case <-cm.peerDropTimer.C:
// Drop a random peer if we are not syncing and the peer count is close to the limit.
if !cm.syncingFunc() {
cm.dropRandomPeer()
}
cm.peerDropTimer.Reset(randomDuration(peerDropIntervalMin, peerDropIntervalMax))
case <-cm.shutdownCh:
return
}
}
}
26 changes: 25 additions & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,35 @@ func (p *Peer) String() string {
return fmt.Sprintf("Peer %x %v", id[:8], p.RemoteAddr())
}

// Inbound returns true if the peer is an inbound connection
// Inbound returns true if the peer is an inbound (not dialed) connection.
func (p *Peer) Inbound() bool {
return p.rw.is(inboundConn)
}

// Trusted returns true if the peer is configured as trusted.
// Trusted peers are accepted in above the MaxInboundConns limit.
// The peer can be either inbound or dialed.
func (p *Peer) Trusted() bool {
return p.rw.is(trustedConn)
}

// DynDialed returns true if the peer was dialed successfully (passed handshake) and
// it is not configured as static.
func (p *Peer) DynDialed() bool {
return p.rw.is(dynDialedConn)
}

// StaticDialed returns true if the peer was dialed successfully (passed handshake) and
// it is configured as static.
func (p *Peer) StaticDialed() bool {
return p.rw.is(staticDialedConn)
}

// Lifetime returns the time since peer creation.
func (p *Peer) Lifetime() mclock.AbsTime {
return mclock.Now() - p.created
}

func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer {
protomap := matchProtocols(protocols, conn.caps, conn)
p := &Peer{
Expand Down
10 changes: 5 additions & 5 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (srv *Server) setupDiscovery() error {
func (srv *Server) setupDialScheduler() {
config := dialConfig{
self: srv.localnode.ID(),
maxDialPeers: srv.maxDialedConns(),
maxDialPeers: srv.MaxDialedConns(),
maxActiveDials: srv.MaxPendingPeers,
log: srv.Logger,
netRestrict: srv.NetRestrict,
Expand All @@ -527,11 +527,11 @@ func (srv *Server) setupDialScheduler() {
}
}

func (srv *Server) maxInboundConns() int {
return srv.MaxPeers - srv.maxDialedConns()
func (srv *Server) MaxInboundConns() int {
return srv.MaxPeers - srv.MaxDialedConns()
}

func (srv *Server) maxDialedConns() (limit int) {
func (srv *Server) MaxDialedConns() (limit int) {
if srv.NoDial || srv.MaxPeers == 0 {
return 0
}
Expand Down Expand Up @@ -736,7 +736,7 @@ func (srv *Server) postHandshakeChecks(peers map[enode.ID]*Peer, inboundCount in
switch {
case !c.is(trustedConn) && len(peers) >= srv.MaxPeers:
return DiscTooManyPeers
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.maxInboundConns():
case !c.is(trustedConn) && c.is(inboundConn) && inboundCount >= srv.MaxInboundConns():
return DiscTooManyPeers
case peers[c.node.ID()] != nil:
return DiscAlreadyConnected
Expand Down