diff --git a/.changelog/6270.internal.md b/.changelog/6270.internal.md new file mode 100644 index 00000000000..ab56351f25e --- /dev/null +++ b/.changelog/6270.internal.md @@ -0,0 +1,7 @@ +go/p2p: Ensure only server providers advertise themselves + +Previously, the host would advertise itself upon creation +of p2p protocol client, even if the server was not running. + +Advertisement is now independent and is only triggered +when serving the P2P protocol. diff --git a/go/p2p/api/api.go b/go/p2p/api/api.go index 0ce37633493..654aa744b1f 100644 --- a/go/p2p/api/api.go +++ b/go/p2p/api/api.go @@ -68,6 +68,11 @@ type Service interface { Publish(ctx context.Context, topic string, msg any) // RegisterHandler registers a message handler for the specified runtime and topic kind. + // + // In addition, it triggers advertisement of the host's readiness to serve the specified topic, + // allowing remote peers without existing connection to discover it. + // + // Ensure your server is indeed ready, to avoid advertising prematurely. RegisterHandler(topic string, handler Handler) // BlockPeer blocks a specific peer from being used by the local node. @@ -83,6 +88,11 @@ type Service interface { RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int) // RegisterProtocolServer registers a protocol server for the given protocol. + // + // In addition, it triggers advertisement of the host's readiness to serve the specified protocol, + // allowing remote peers without existing connection to discover it. + // + // Ensure your server is indeed ready, to avoid advertising prematurely. RegisterProtocolServer(srv rpc.Server) // GetMinRepublishInterval returns the minimum republish interval that needs to be respected by diff --git a/go/p2p/p2p.go b/go/p2p/p2p.go index 2433000cc0e..c74b243989f 100644 --- a/go/p2p/p2p.go +++ b/go/p2p/p2p.go @@ -305,7 +305,8 @@ func (p *p2p) RegisterHandler(topic string, handler api.Handler) { "topic", topic, ) - p.peerMgr.RegisterTopic(topic, minTopicPeers, totalTopicPeers) + p.peerMgr.TrackTopicPeers(topic, minTopicPeers, totalTopicPeers) + p.peerMgr.AdvertiseTopic(topic) } // Implements api.Service. @@ -321,7 +322,7 @@ func (p *p2p) BlockPeer(peerID core.PeerID) { // Implements api.Service. func (p *p2p) RegisterProtocol(pid core.ProtocolID, minPeers int, totalPeers int) { - p.peerMgr.RegisterProtocol(pid, minPeers, totalPeers) + p.peerMgr.TrackProtocolPeers(pid, minPeers, totalPeers) } // Implements api.Service. @@ -340,6 +341,8 @@ func (p *p2p) RegisterProtocolServer(srv rpc.Server) { p.host.SetStreamHandler(srv.Protocol(), srv.HandleStream) + p.peerMgr.AdvertiseProtocol(srv.Protocol()) + p.logger.Info("registered protocol server", "protocol_id", srv.Protocol(), ) diff --git a/go/p2p/peermgmt/discovery.go b/go/p2p/peermgmt/discovery.go index 257e8dc25d7..881d724044c 100644 --- a/go/p2p/peermgmt/discovery.go +++ b/go/p2p/peermgmt/discovery.go @@ -100,6 +100,10 @@ func (d *peerDiscovery) startAdvertising(ns string) { case d.advCh <- struct{}{}: default: } + + d.logger.Debug("triggered protocol advertisement", + "protocol", ns, + ) } // stopAdvertising stops advertising the given namespace. diff --git a/go/p2p/peermgmt/peermgr.go b/go/p2p/peermgmt/peermgr.go index b17ccf8a767..a9f18f824f0 100644 --- a/go/p2p/peermgmt/peermgr.go +++ b/go/p2p/peermgmt/peermgr.go @@ -168,9 +168,9 @@ func (m *PeerManager) NumTopicPeers(topic string) int { return len(m.pubsub.ListPeers(topic)) } -// RegisterProtocol starts tracking and managing peers that support the given protocol. +// TrackProtocolPeers starts tracking and managing peers that support the given protocol. // If the protocol is already registered, its values are updated. -func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPeers int) { +func (m *PeerManager) TrackProtocolPeers(p core.ProtocolID, minPeers int, totalPeers int) { m.mu.Lock() defer m.mu.Unlock() @@ -188,8 +188,6 @@ func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPee } m.protocols[p] = &watermark{minPeers, totalPeers} - m.discovery.startAdvertising(string(p)) - m.logger.Debug("protocol registered", "protocol", p, "min_peers", minPeers, @@ -197,9 +195,9 @@ func (m *PeerManager) RegisterProtocol(p core.ProtocolID, minPeers int, totalPee ) } -// RegisterTopic starts tracking and managing peers that support the given topic. +// TrackTopicPeers starts tracking and managing peers that support the given topic. // If the topic is already registered, its values are updated. -func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) { +func (m *PeerManager) TrackTopicPeers(topic string, minPeers int, totalPeers int) { m.mu.Lock() defer m.mu.Unlock() @@ -217,7 +215,6 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) } m.topics[topic] = &watermark{minPeers, totalPeers} - m.discovery.startAdvertising(topic) m.logger.Debug("topic registered", "topic", topic, @@ -226,9 +223,23 @@ func (m *PeerManager) RegisterTopic(topic string, minPeers int, totalPeers int) ) } -// UnregisterProtocol stops managing peers that support the given protocol. +// AdvertiseProtocol starts advertising readiness to serve the specified protocol. +// +// This enables remote peers without existing connection to find the host node. +func (m *PeerManager) AdvertiseProtocol(p core.ProtocolID) { + m.discovery.startAdvertising(string(p)) +} + +// AdvertiseTopic starts advertising readiness to serve the specified topic. +// +// This enables remote peers without existing connection to find the host node. +func (m *PeerManager) AdvertiseTopic(topic string) { + m.discovery.startAdvertising(topic) +} + +// StopTrackingProtocolPeers stops managing peers that support the given protocol. // If the protocol is not registered, this is a noop operation. -func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) { +func (m *PeerManager) StopTrackingProtocolPeers(p core.ProtocolID) { m.mu.Lock() defer m.mu.Unlock() @@ -237,16 +248,15 @@ func (m *PeerManager) UnregisterProtocol(p core.ProtocolID) { } delete(m.protocols, p) - m.discovery.stopAdvertising(string(p)) m.logger.Debug("protocol unregistered", "protocol", p, ) } -// UnregisterTopic stops managing peers that support the given topic. +// StopTrackingTopicPeers stops managing peers that support the given topic. // If the topic is not registered, this is a noop operation. -func (m *PeerManager) UnregisterTopic(topic string) { +func (m *PeerManager) StopTrackingTopicPeers(topic string) { m.mu.Lock() defer m.mu.Unlock() @@ -255,13 +265,22 @@ func (m *PeerManager) UnregisterTopic(topic string) { } delete(m.topics, topic) - m.discovery.stopAdvertising(topic) m.logger.Debug("topic unregistered", "topic", topic, ) } +// StopAdvertisingProtocol stops advertising readiness to serve the specified protocol. +func (m *PeerManager) StopAdvertisingProtocol(p core.ProtocolID) { + m.discovery.stopAdvertising(string(p)) +} + +// StopAdvertisingTopic stops advertising readiness to serve the specified protocol. +func (m *PeerManager) StopAdvertisingTopic(topic string) { + m.discovery.stopAdvertising(topic) +} + func (m *PeerManager) run(ctx context.Context) { // Start background services. m.backup.start() diff --git a/go/p2p/peermgmt/peermgr_test.go b/go/p2p/peermgmt/peermgr_test.go index d7aea190a64..e1a3955fb35 100644 --- a/go/p2p/peermgmt/peermgr_test.go +++ b/go/p2p/peermgmt/peermgr_test.go @@ -126,7 +126,7 @@ func (s *PeerManagerTestSuite) TestRegisterProtocol() { for i := 0; i < 3; i++ { p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i)) - s.manager.RegisterProtocol(p, 1, 10) + s.manager.TrackProtocolPeers(p, 1, 10) require.Equal(i+1, len(s.manager.Protocols())) } } @@ -136,7 +136,7 @@ func (s *PeerManagerTestSuite) TestRegisterTopic() { for i := 0; i < 3; i++ { t := fmt.Sprintf("topic %d", i) - s.manager.RegisterTopic(t, 1, 10) + s.manager.TrackTopicPeers(t, 1, 10) require.Equal(i+1, len(s.manager.Topics())) } } @@ -146,20 +146,20 @@ func (s *PeerManagerTestSuite) TestUnregisterProtocol() { for i := 0; i < 3; i++ { p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i)) - s.manager.RegisterProtocol(p, 1, 10) + s.manager.TrackProtocolPeers(p, 1, 10) require.Equal(i+1, len(s.manager.Protocols())) } - s.manager.UnregisterProtocol("404") + s.manager.StopTrackingProtocolPeers("404") require.Equal(3, len(s.manager.Protocols())) for i := 0; i < 3; i++ { p := core.ProtocolID(fmt.Sprintf("/protocol/test/%d.0.0", i)) - s.manager.UnregisterProtocol(p) + s.manager.StopTrackingProtocolPeers(p) require.Equal(2-i, len(s.manager.Protocols())) } - s.manager.UnregisterProtocol("404") + s.manager.StopTrackingProtocolPeers("404") require.Equal(0, len(s.manager.Protocols())) } @@ -168,19 +168,19 @@ func (s *PeerManagerTestSuite) TestUnregisterTopic() { for i := 0; i < 3; i++ { t := fmt.Sprintf("topic %d", i) - s.manager.RegisterTopic(t, 1, 10) + s.manager.TrackTopicPeers(t, 1, 10) require.Equal(i+1, len(s.manager.Topics())) } - s.manager.UnregisterTopic("404") + s.manager.StopTrackingTopicPeers("404") require.Equal(3, len(s.manager.Topics())) for i := 0; i < 3; i++ { - s.manager.UnregisterTopic(fmt.Sprintf("topic %d", i)) + s.manager.StopTrackingTopicPeers(fmt.Sprintf("topic %d", i)) require.Equal(2-i, len(s.manager.Topics())) } - s.manager.UnregisterTopic("404") + s.manager.StopTrackingTopicPeers("404") require.Equal(0, len(s.manager.Topics())) } diff --git a/go/worker/common/p2p/txsync/protocol.go b/go/worker/common/p2p/txsync/protocol.go index fbf9390b09c..e5aef20bdd7 100644 --- a/go/worker/common/p2p/txsync/protocol.go +++ b/go/worker/common/p2p/txsync/protocol.go @@ -3,6 +3,7 @@ package txsync import ( "github.com/libp2p/go-libp2p/core" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/version" @@ -16,6 +17,11 @@ const TxSyncProtocolID = "txsync" // TxSyncProtocolVersion is the supported version of the transaction sync protocol. var TxSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} +// ProtocolID returns the runtime transaction sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion) +} + // Constants related to the GetTxs method. const ( MethodGetTxs = "GetTxs" diff --git a/go/worker/common/p2p/txsync/server.go b/go/worker/common/p2p/txsync/server.go index 681e6a6e45c..4f8f0135a3e 100644 --- a/go/worker/common/p2p/txsync/server.go +++ b/go/worker/common/p2p/txsync/server.go @@ -5,7 +5,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" "github.com/oasisprotocol/oasis-core/go/runtime/txpool" ) @@ -52,5 +51,5 @@ func (s *service) handleGetTxs(request *GetTxsRequest) (*GetTxsResponse, error) // NewServer creates a new transaction sync protocol server. func NewServer(chainContext string, runtimeID common.Namespace, txPool txpool.TransactionPool) rpc.Server { - return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, TxSyncProtocolID, TxSyncProtocolVersion), &service{txPool}) + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{txPool}) } diff --git a/go/worker/keymanager/p2p/protocol.go b/go/worker/keymanager/p2p/protocol.go index 1eeb6a32af1..f7dfda7f407 100644 --- a/go/worker/keymanager/p2p/protocol.go +++ b/go/worker/keymanager/p2p/protocol.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/version" "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" @@ -18,6 +19,11 @@ const KeyManagerProtocolID = "keymanager" // KeyManagerProtocolVersion is the supported version of the keymanager protocol. var KeyManagerProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} +// ProtocolID returns the runtime keymanager protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion) +} + // Constants related to the CallEnclave method. const ( MethodCallEnclave = "CallEnclave" diff --git a/go/worker/keymanager/p2p/server.go b/go/worker/keymanager/p2p/server.go index 33ea1caff65..4b0d2775acf 100644 --- a/go/worker/keymanager/p2p/server.go +++ b/go/worker/keymanager/p2p/server.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" enclaverpc "github.com/oasisprotocol/oasis-core/go/runtime/enclaverpc/api" ) @@ -52,5 +51,5 @@ func (s *service) handleCallEnclave(ctx context.Context, request *CallEnclaveReq func NewServer(chainContext string, runtimeID common.Namespace, km KeyManager) rpc.Server { initMetrics() - return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, KeyManagerProtocolID, KeyManagerProtocolVersion), &service{km}) + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{km}) } diff --git a/go/worker/storage/p2p/pub/protocol.go b/go/worker/storage/p2p/pub/protocol.go index d3dba65e187..e3641b77b78 100644 --- a/go/worker/storage/p2p/pub/protocol.go +++ b/go/worker/storage/p2p/pub/protocol.go @@ -3,6 +3,7 @@ package pub import ( "github.com/libp2p/go-libp2p/core" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/version" "github.com/oasisprotocol/oasis-core/go/p2p/peermgmt" @@ -16,6 +17,11 @@ const StoragePubProtocolID = "storagepub" // StoragePubProtocolVersion is the supported version of the storage pub protocol. var StoragePubProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} +// ProtocolID returns the runtime storage pub protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion) +} + // Constants related to the Get method. const ( MethodGet = "Get" diff --git a/go/worker/storage/p2p/pub/server.go b/go/worker/storage/p2p/pub/server.go index 8a9e6ddc0e4..01ca1f70a54 100644 --- a/go/worker/storage/p2p/pub/server.go +++ b/go/worker/storage/p2p/pub/server.go @@ -5,7 +5,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" storage "github.com/oasisprotocol/oasis-core/go/storage/api" ) @@ -44,5 +43,5 @@ func (s *service) HandleRequest(ctx context.Context, method string, body cbor.Ra // NewServer creates a new storage pub protocol server. func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server { - return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, StoragePubProtocolID, StoragePubProtocolVersion), &service{backend}) + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) } diff --git a/go/worker/storage/p2p/sync/protocol.go b/go/worker/storage/p2p/sync/protocol.go index e59e22c04e3..7dca895e014 100644 --- a/go/worker/storage/p2p/sync/protocol.go +++ b/go/worker/storage/p2p/sync/protocol.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core" + "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/version" @@ -20,6 +21,11 @@ const StorageSyncProtocolID = "storagesync" // StorageSyncProtocolVersion is the supported version of the storage sync protocol. var StorageSyncProtocolVersion = version.Version{Major: 2, Minor: 0, Patch: 0} +// ProtocolID returns the runtime storage sync protocol ID. +func ProtocolID(chainContext string, runtimeID common.Namespace) core.ProtocolID { + return protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion) +} + // Constants related to the GetDiff method. const ( MethodGetDiff = "GetDiff" diff --git a/go/worker/storage/p2p/sync/server.go b/go/worker/storage/p2p/sync/server.go index a5e93c26fd5..22b6731465a 100644 --- a/go/worker/storage/p2p/sync/server.go +++ b/go/worker/storage/p2p/sync/server.go @@ -6,7 +6,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common" "github.com/oasisprotocol/oasis-core/go/common/cbor" - "github.com/oasisprotocol/oasis-core/go/p2p/protocol" "github.com/oasisprotocol/oasis-core/go/p2p/rpc" storage "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" @@ -105,5 +104,5 @@ func (s *service) handleGetCheckpointChunk(ctx context.Context, request *GetChec // NewServer creates a new storage sync protocol server. func NewServer(chainContext string, runtimeID common.Namespace, backend storage.Backend) rpc.Server { - return rpc.NewServer(protocol.NewRuntimeProtocolID(chainContext, runtimeID, StorageSyncProtocolID, StorageSyncProtocolVersion), &service{backend}) + return rpc.NewServer(ProtocolID(chainContext, runtimeID), &service{backend}) }