From a95b5d80cb7ca82dc7d9b3a824d2178ee2bc3b35 Mon Sep 17 00:00:00 2001 From: Sun Hyuk Ahn Date: Wed, 13 Aug 2025 13:35:43 -0700 Subject: [PATCH 1/2] feat(discovery): add custom DHT namespace and optional rendezvous bootstrap From 7cb87ce20586787ab7c5538a70fb6e867cfa0ead Mon Sep 17 00:00:00 2001 From: Sun Hyuk Ahn Date: Wed, 13 Aug 2025 15:57:27 -0700 Subject: [PATCH 2/2] feat: harmony and legacy kad protocol support --- cmd/bootnode/main.go | 1 + cmd/harmony/main.go | 1 + p2p/discovery/discovery.go | 101 ++++++++++++++++++--- p2p/host.go | 32 ++++++- p2p/stream/protocols/sync/protocol_test.go | 4 +- 5 files changed, 121 insertions(+), 18 deletions(-) diff --git a/cmd/bootnode/main.go b/cmd/bootnode/main.go index 89779a236c..67f25e0aa9 100644 --- a/cmd/bootnode/main.go +++ b/cmd/bootnode/main.go @@ -176,6 +176,7 @@ func main() { DialTimeout: time.Minute, Muxer: *muxer, NoRelay: *noRelay, + IsBootNode: true, }) if err != nil { utils.FatalErrMsg(err, "cannot initialize network") diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 7b4c84b5a9..e4ef477802 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -613,6 +613,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, DialTimeout: hc.P2P.DialTimeout, Muxer: hc.P2P.Muxer, NoRelay: hc.P2P.NoRelay, + IsBootNode: false, }) if err != nil { return nil, errors.Wrap(err, "cannot create P2P network host") diff --git a/p2p/discovery/discovery.go b/p2p/discovery/discovery.go index 5271ec9d91..f4f86d03d5 100644 --- a/p2p/discovery/discovery.go +++ b/p2p/discovery/discovery.go @@ -2,10 +2,10 @@ package discovery import ( "context" + "sync" "time" "github.com/harmony-one/harmony/internal/utils" - dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/discovery" libp2p_host "github.com/libp2p/go-libp2p/core/host" @@ -21,14 +21,16 @@ type Discovery interface { Close() error Advertise(ctx context.Context, ns string) (time.Duration, error) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) - GetRawDiscovery() discovery.Discovery + GetRawDiscovery() []discovery.Discovery + // todo(sun): revert in phase 2 + // GetRawDiscovery() discovery.Discovery } // dhtDiscovery is a wrapper of libp2p dht discovery service. It implements Discovery // interface. type dhtDiscovery struct { - dht *libp2p_dht.IpfsDHT - disc discovery.Discovery + dht []*libp2p_dht.IpfsDHT + disc []discovery.Discovery host libp2p_host.Host opt DHTConfig @@ -38,11 +40,15 @@ type dhtDiscovery struct { } // NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface. -func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, dht *dht.IpfsDHT, opt DHTConfig) (Discovery, error) { - d := libp2p_dis.NewRoutingDiscovery(dht) +func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, opt DHTConfig, dhts ...*libp2p_dht.IpfsDHT) (Discovery, error) { + var d []discovery.Discovery + for _, dht := range dhts { + d = append(d, libp2p_dis.NewRoutingDiscovery(dht)) + } + logger := utils.Logger().With().Str("module", "discovery").Logger() return &dhtDiscovery{ - dht: dht, + dht: dhts, disc: d, host: host, opt: opt, @@ -54,28 +60,97 @@ func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p // Start bootstrap the dht discovery service. func (d *dhtDiscovery) Start() error { - return d.dht.Bootstrap(d.ctx) + for _, dht := range d.dht { + if err := dht.Bootstrap(d.ctx); err != nil { + return err + } + } + return nil + + // todo(sun): revert in phase 2 + // return d.dht.Bootstrap(d.ctx) } // Stop stop the dhtDiscovery service func (d *dhtDiscovery) Close() error { - d.dht.Close() + for _, dht := range d.dht { + if err := dht.Close(); err != nil { + return err + } + } d.cancel() return nil + + // todo(sun): revert in phase 2 + // d.dht.Close() + // d.cancel() + // return nil } // Advertise advertises a service func (d *dhtDiscovery) Advertise(ctx context.Context, ns string) (time.Duration, error) { - return d.disc.Advertise(ctx, ns) + var lastDur time.Duration + var lastErr error + for _, disc := range d.disc { + lastDur, lastErr = disc.Advertise(ctx, ns) + if lastErr != nil { + break + } + } + return lastDur, lastErr + + // todo(sun): revert in phase 2 + // return d.disc.Advertise(ctx, ns) } // FindPeers discovers peers providing a service func (d *dhtDiscovery) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) { - opt := discovery.Limit(peerLimit) - return d.disc.FindPeers(ctx, ns, opt) + mergedChan := make(chan libp2p_peer.AddrInfo) + var wg sync.WaitGroup + limitOpt := discovery.Limit(peerLimit) + + // loop through each discovery instance (harmony and legacy, in bootnode's case) + for _, disc := range d.disc { + wg.Add(1) + + // launch a goroutine for each DHT query + go func(disc discovery.Discovery) { + defer wg.Done() + peerChan, err := disc.FindPeers(ctx, ns, limitOpt) + if err != nil { + d.logger.Error().Err(err).Msg("Discovery failed in one of the DHTs") + return + } + + // read peers from the current DHT chan and forward to the merged chan + for peer := range peerChan { + select { + case mergedChan <- peer: + case <-ctx.Done(): + return + } + } + }(disc) + } + + // close the merged chan onceboth DHT queries are completed + go func() { + wg.Wait() + close(mergedChan) + }() + + // immediately return merged chan + return mergedChan, nil + + // todo(sun): revert in phase 2 + // opt := discovery.Limit(peerLimit) + // return d.disc.FindPeers(ctx, ns, opt) } // GetRawDiscovery get the raw discovery to be used for libp2p pubsub options -func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery { +// todo(sun): libp2p pubsub option only accepts a single discover.Discovery as option +// todo(sun): revert in phase 2 +// func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery { +func (d *dhtDiscovery) GetRawDiscovery() []discovery.Discovery { return d.disc } diff --git a/p2p/host.go b/p2p/host.go index 728afb9264..43ed2a3aa8 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -101,6 +101,8 @@ const ( MaxMessageHandlers = SetAsideForConsensus + SetAsideOtherwise // MaxMessageSize is 2Mb MaxMessageSize = 1 << 21 + // Harmony specific kad-dht protocol + HarmonyKadPrefix = "/harmony/kad/1.0.0" ) // HostConfig is the config structure to create a new host @@ -127,6 +129,7 @@ type HostConfig struct { DialTimeout time.Duration Muxer string NoRelay bool + IsBootNode bool } func init() { @@ -150,6 +153,7 @@ func NewHost(cfg HostConfig) (Host, error) { key = cfg.BLSKey pub = cfg.BLSKey.GetPublic() dataStorePath = cfg.DataStoreFile + isBootNode = cfg.IsBootNode ) pubKey := key.GetPublic() @@ -344,12 +348,32 @@ func NewHost(cfg HostConfig) (Host, error) { cancel() return nil, errors.Wrapf(err, "initialize libp2p raw options failed") } - idht, errDHT := dht.New(ctx, p2pHost, opts...) - if errDHT != nil { + + // todo(sun): for now, bootnodes should support both ipfs and harmony kad protocol + // todo(sun): all other nodes should only support harmony kad protocol + // bootnode supports both ipfs and harmony kad protocol during transition period + dhts := []*dht.IpfsDHT{} + if isBootNode { + utils.Logger().Info().Msg("Bootnode is configured to support the legacy IPFS KAD protocol for transition.") + legacyOpts := append(opts, dht.ProtocolPrefix(protocol.ID("/ipfs/kad/1.0.0"))) + legacyDht, err := dht.New(ctx, p2pHost, legacyOpts...) + if err != nil { + cancel() + return nil, errors.Wrapf(err, "cannot initialize legacy libp2p DHT") + } + dhts = append(dhts, legacyDht) + } + + // all other nodes should only support harmony kad protocol + harmonyOpts := append(opts, dht.ProtocolPrefix(protocol.ID(HarmonyKadPrefix))) + harmonyDht, err := dht.New(ctx, p2pHost, harmonyOpts...) + if err != nil { cancel() - return nil, errors.Wrapf(errDHT, "cannot initialize libp2p DHT") + return nil, errors.Wrapf(err, "cannot initialize harmony libp2p DHT") } - disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, idht, opt) + dhts = append(dhts, harmonyDht) + + disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, opt, dhts...) if err != nil { cancel() p2pHost.Close() diff --git a/p2p/stream/protocols/sync/protocol_test.go b/p2p/stream/protocols/sync/protocol_test.go index d6b440d099..016508e53b 100644 --- a/p2p/stream/protocols/sync/protocol_test.go +++ b/p2p/stream/protocols/sync/protocol_test.go @@ -107,6 +107,8 @@ func (disc *testDiscovery) FindPeers(ctx context.Context, ns string, peerLimit i return nil, nil } -func (disc *testDiscovery) GetRawDiscovery() discovery.Discovery { +// todo(sun): revert in phase 2 +// func (disc *testDiscovery) GetRawDiscovery() discovery.Discovery { +func (disc *testDiscovery) GetRawDiscovery() []discovery.Discovery { return nil }