Skip to content

Commit 8cfd40e

Browse files
committed
feat/pubsub
1 parent 7c5d616 commit 8cfd40e

8 files changed

Lines changed: 898 additions & 2 deletions

File tree

pkg/api/api.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/ethersphere/bee/v2/pkg/postage"
4343
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
4444
"github.com/ethersphere/bee/v2/pkg/pss"
45+
"github.com/ethersphere/bee/v2/pkg/pubsub"
4546
"github.com/ethersphere/bee/v2/pkg/resolver"
4647
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
4748
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
@@ -95,6 +96,9 @@ const (
9596
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
9697
SwarmActPublisherHeader = "Swarm-Act-Publisher"
9798
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"
99+
SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer"
100+
SwarmPubsubGsocPublicKeyHeader = "Swarm-Pubsub-Gsoc-Public-Key"
101+
SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic"
98102

99103
ImmutableHeader = "Immutable"
100104
GasPriceHeader = "Gas-Price"
@@ -189,6 +193,7 @@ type Service struct {
189193
topologyDriver topology.Driver
190194
p2p p2p.DebugService
191195
l2p2p layer2.IP2pService
196+
pubsubSvc *pubsub.Service
192197
accounting accounting.Interface
193198
chequebook chequebook.Service
194199
pseudosettle settlement.Interface
@@ -273,6 +278,7 @@ type ExtraOptions struct {
273278
NodeStatus *status.Service
274279
PinIntegrity PinIntegrity
275280
Layer2P2p layer2.IP2pService
281+
PubsubService *pubsub.Service
276282
}
277283

278284
func New(
@@ -365,6 +371,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
365371
s.pseudosettle = e.Pseudosettle
366372
s.blockTime = e.BlockTime
367373
s.l2p2p = e.Layer2P2p
374+
s.pubsubSvc = e.PubsubService
368375

369376
s.statusSem = semaphore.NewWeighted(1)
370377
s.postageSem = semaphore.NewWeighted(1)
@@ -593,6 +600,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
593600
SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader,
594601
SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
595602
SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader,
603+
SwarmPubsubPeerHeader, SwarmPubsubGsocPublicKeyHeader, SwarmPubsubGsocTopicHeader,
596604
}
597605
allowedHeadersStr := strings.Join(allowedHeaders, ", ")
598606

pkg/api/pubsub.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright 2026 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package api
6+
7+
import (
8+
"context"
9+
"encoding/hex"
10+
"net/http"
11+
"time"
12+
13+
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
14+
"github.com/ethersphere/bee/v2/pkg/pubsub"
15+
"github.com/ethersphere/bee/v2/pkg/swarm"
16+
"github.com/gorilla/mux"
17+
"github.com/gorilla/websocket"
18+
ma "github.com/multiformats/go-multiaddr"
19+
)
20+
21+
func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
22+
logger := s.logger.WithName("pubsub").Build()
23+
24+
paths := struct {
25+
Topic string `map:"topic" validate:"required"`
26+
}{}
27+
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
28+
response("invalid path params", logger, w)
29+
return
30+
}
31+
32+
var topicAddr [32]byte
33+
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
34+
copy(topicAddr[:], decoded)
35+
} else {
36+
h := swarm.NewHasher()
37+
_, _ = h.Write([]byte(paths.Topic))
38+
copy(topicAddr[:], h.Sum(nil))
39+
}
40+
41+
// Required header: underlay multiaddr
42+
peerHeader := r.Header.Get(SwarmPubsubPeerHeader)
43+
if peerHeader == "" {
44+
jsonhttp.BadRequest(w, "missing Swarm-Pubsub-Peer header")
45+
return
46+
}
47+
underlay, err := ma.NewMultiaddr(peerHeader)
48+
if err != nil {
49+
logger.Debug("invalid peer multiaddr", "value", peerHeader, "error", err)
50+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Peer header")
51+
return
52+
}
53+
54+
// Optional headers: GSOC fields for Participant upgrade
55+
var connectOpts pubsub.ConnectOptions
56+
57+
gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader)
58+
gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader)
59+
if gsocPubKeyHex != "" && gsocTopicHex != "" {
60+
gsocOwner, err := hex.DecodeString(gsocPubKeyHex)
61+
if err != nil {
62+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Public-Key header")
63+
return
64+
}
65+
gsocID, err := hex.DecodeString(gsocTopicHex)
66+
if err != nil {
67+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Topic header")
68+
return
69+
}
70+
connectOpts.GsocOwner = gsocOwner
71+
connectOpts.GsocID = gsocID
72+
connectOpts.ReadWrite = true
73+
}
74+
75+
headers := struct {
76+
KeepAlive time.Duration `map:"Swarm-Keep-Alive"`
77+
}{}
78+
if response := s.mapStructure(r.Header, &headers); response != nil {
79+
response("invalid header params", logger, w)
80+
return
81+
}
82+
83+
if s.beeMode == DevMode {
84+
logger.Warning("pubsub endpoint is disabled in dev mode")
85+
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
86+
return
87+
}
88+
89+
// Connect to broker peer
90+
ctx, cancel := context.WithCancel(context.Background())
91+
subscriberConn, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
92+
if err != nil {
93+
cancel()
94+
logger.Debug("pubsub connect failed", "error", err)
95+
jsonhttp.InternalServerError(w, "pubsub connect failed")
96+
return
97+
}
98+
99+
// Upgrade to WebSocket
100+
upgrader := websocket.Upgrader{
101+
ReadBufferSize: swarm.ChunkWithSpanSize,
102+
WriteBufferSize: swarm.ChunkWithSpanSize,
103+
CheckOrigin: s.checkOrigin,
104+
}
105+
106+
conn, err := upgrader.Upgrade(w, r, nil)
107+
if err != nil {
108+
cancel()
109+
_ = subscriberConn.Stream.Close()
110+
logger.Debug("websocket upgrade failed", "error", err)
111+
logger.Error(nil, "websocket upgrade failed")
112+
jsonhttp.InternalServerError(w, "upgrade failed")
113+
return
114+
}
115+
116+
pingPeriod := headers.KeepAlive * time.Second
117+
if pingPeriod == 0 {
118+
pingPeriod = time.Minute
119+
}
120+
121+
isParticipant := connectOpts.ReadWrite
122+
123+
s.wsWg.Add(1)
124+
go func() {
125+
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isParticipant)
126+
_ = conn.Close()
127+
subscriberConn.Cancel()
128+
s.wsWg.Done()
129+
}()
130+
}
131+
132+
func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
133+
if s.pubsubSvc == nil {
134+
jsonhttp.NotFound(w, "pubsub service not available")
135+
return
136+
}
137+
138+
topics := s.pubsubSvc.Topics()
139+
jsonhttp.OK(w, struct {
140+
Topics []pubsub.TopicInfo `json:"topics"`
141+
}{
142+
Topics: topics,
143+
})
144+
}

pkg/api/router.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,16 @@ func (s *Service) mountAPI() {
368368
web.FinalHandlerFunc(s.layer2WsHandler),
369369
))
370370

371+
handle("/pubsub/{topic}", web.ChainHandlers(
372+
web.FinalHandlerFunc(s.pubsubWsHandler),
373+
))
374+
375+
handle("/pubsub/", web.ChainHandlers(
376+
web.FinalHandler(jsonhttp.MethodHandler{
377+
"GET": http.HandlerFunc(s.pubsubListHandler),
378+
}),
379+
))
380+
371381
handle("/pss/subscribe/{topic}", web.ChainHandlers(
372382
web.FinalHandlerFunc(s.pssWsHandler),
373383
))

pkg/node/node.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"github.com/ethersphere/bee/v2/pkg/pricer"
5050
"github.com/ethersphere/bee/v2/pkg/pricing"
5151
"github.com/ethersphere/bee/v2/pkg/pss"
52+
"github.com/ethersphere/bee/v2/pkg/pubsub"
5253
"github.com/ethersphere/bee/v2/pkg/puller"
5354
"github.com/ethersphere/bee/v2/pkg/pullsync"
5455
"github.com/ethersphere/bee/v2/pkg/pusher"
@@ -186,6 +187,8 @@ type Options struct {
186187
WarmupTime time.Duration
187188
WelcomeMessage string
188189
WhitelistedWithdrawalAddress []string
190+
PubsubBrokerMode bool
191+
PubsubMaxConnections int
189192
}
190193

191194
const (
@@ -651,6 +654,7 @@ func NewBee(
651654
Nonce: nonce,
652655
ValidateOverlay: chainEnabled,
653656
Registry: registry,
657+
PubsubReservedStreamSlots: o.PubsubMaxConnections,
654658
})
655659
if err != nil {
656660
return nil, fmt.Errorf("p2p service: %w", err)
@@ -718,6 +722,11 @@ func NewBee(
718722

719723
l2P2p := layer2.NewP2pService(p2ps, logger)
720724

725+
pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode, o.PubsubMaxConnections)
726+
if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil {
727+
return nil, fmt.Errorf("pubsub protocol: %w", err)
728+
}
729+
721730
// Construct protocols.
722731
pingPong := pingpong.New(p2ps, logger, tracer)
723732

@@ -1245,6 +1254,7 @@ func NewBee(
12451254
NodeStatus: nodeStatus,
12461255
PinIntegrity: localStore.PinIntegrity(),
12471256
Layer2P2p: &l2P2p,
1257+
PubsubService: pubsubSvc,
12481258
}
12491259

12501260
if o.APIAddr != "" {

pkg/p2p/libp2p/libp2p.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type Options struct {
149149
HeadersRWTimeout time.Duration
150150
Registry *prometheus.Registry
151151
autoTLSCertManager autoTLSCertManager
152+
PubsubReservedStreamSlots int
152153
}
153154

154155
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, logger log.Logger, tracer *tracing.Tracer, o Options) (s *Service, returnErr error) {
@@ -209,11 +210,16 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
209210
}
210211

211212
// Tweak certain settings
213+
inboundLimit := rcmgr.LimitVal(IncomingStreamCountLimit - o.PubsubReservedStreamSlots)
214+
if inboundLimit < 0 {
215+
inboundLimit = 0
216+
}
217+
212218
cfg := rcmgr.PartialLimitConfig{
213219
System: rcmgr.ResourceLimits{
214-
Streams: IncomingStreamCountLimit + OutgoingStreamCountLimit,
220+
Streams: inboundLimit + OutgoingStreamCountLimit,
215221
StreamsOutbound: OutgoingStreamCountLimit,
216-
StreamsInbound: IncomingStreamCountLimit,
222+
StreamsInbound: inboundLimit,
217223
},
218224
}
219225

0 commit comments

Comments
 (0)