Skip to content

Commit 8de8fac

Browse files
committed
feat/pubsub
1 parent f53887f commit 8de8fac

8 files changed

Lines changed: 952 additions & 2 deletions

File tree

pkg/api/api.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"github.com/ethersphere/bee/v2/pkg/postage"
4242
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
4343
"github.com/ethersphere/bee/v2/pkg/pss"
44+
"github.com/ethersphere/bee/v2/pkg/pubsub"
4445
"github.com/ethersphere/bee/v2/pkg/resolver"
4546
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
4647
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
@@ -94,6 +95,9 @@ const (
9495
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
9596
SwarmActPublisherHeader = "Swarm-Act-Publisher"
9697
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"
98+
SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer"
99+
SwarmPubsubGsocPublicKeyHeader = "Swarm-Pubsub-Gsoc-Public-Key"
100+
SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic"
97101

98102
ImmutableHeader = "Immutable"
99103
GasPriceHeader = "Gas-Price"
@@ -187,6 +191,7 @@ type Service struct {
187191

188192
topologyDriver topology.Driver
189193
p2p p2p.DebugService
194+
pubsubSvc *pubsub.Service
190195
accounting accounting.Interface
191196
chequebook chequebook.Service
192197
pseudosettle settlement.Interface
@@ -270,6 +275,7 @@ type ExtraOptions struct {
270275
SyncStatus func() (bool, error)
271276
NodeStatus *status.Service
272277
PinIntegrity PinIntegrity
278+
PubsubService *pubsub.Service
273279
}
274280

275281
func New(
@@ -361,6 +367,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
361367
s.lightNodes = e.LightNodes
362368
s.pseudosettle = e.Pseudosettle
363369
s.blockTime = e.BlockTime
370+
s.pubsubSvc = e.PubsubService
364371

365372
s.statusSem = semaphore.NewWeighted(1)
366373
s.postageSem = semaphore.NewWeighted(1)
@@ -589,6 +596,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
589596
SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader,
590597
SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
591598
SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader,
599+
SwarmPubsubPeerHeader, SwarmPubsubGsocPublicKeyHeader, SwarmPubsubGsocTopicHeader,
592600
}
593601
allowedHeadersStr := strings.Join(allowedHeaders, ", ")
594602

pkg/api/pubsub.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Copyright 2026 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package api
6+
7+
import (
8+
"context"
9+
"encoding/hex"
10+
"net/http"
11+
"time"
12+
13+
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
14+
"github.com/ethersphere/bee/v2/pkg/pubsub"
15+
"github.com/ethersphere/bee/v2/pkg/swarm"
16+
"github.com/gorilla/mux"
17+
"github.com/gorilla/websocket"
18+
ma "github.com/multiformats/go-multiaddr"
19+
)
20+
21+
func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
22+
logger := s.logger.WithName("pubsub").Build()
23+
24+
paths := struct {
25+
Topic string `map:"topic" validate:"required"`
26+
}{}
27+
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
28+
response("invalid path params", logger, w)
29+
return
30+
}
31+
32+
var topicAddr [32]byte
33+
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
34+
copy(topicAddr[:], decoded)
35+
} else {
36+
h := swarm.NewHasher()
37+
_, _ = h.Write([]byte(paths.Topic))
38+
copy(topicAddr[:], h.Sum(nil))
39+
}
40+
41+
// Required header: underlay multiaddr
42+
peerHeader := r.Header.Get(SwarmPubsubPeerHeader)
43+
if peerHeader == "" {
44+
jsonhttp.BadRequest(w, "missing Swarm-Pubsub-Peer header")
45+
return
46+
}
47+
underlay, err := ma.NewMultiaddr(peerHeader)
48+
if err != nil {
49+
logger.Debug("invalid peer multiaddr", "value", peerHeader, "error", err)
50+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Peer header")
51+
return
52+
}
53+
54+
// Optional headers: GSOC fields for Participant upgrade
55+
var connectOpts pubsub.ConnectOptions
56+
57+
gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader)
58+
gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader)
59+
if gsocPubKeyHex != "" && gsocTopicHex != "" {
60+
gsocOwner, err := hex.DecodeString(gsocPubKeyHex)
61+
if err != nil {
62+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Public-Key header")
63+
return
64+
}
65+
gsocID, err := hex.DecodeString(gsocTopicHex)
66+
if err != nil {
67+
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Topic header")
68+
return
69+
}
70+
connectOpts.GsocOwner = gsocOwner
71+
connectOpts.GsocID = gsocID
72+
connectOpts.ReadWrite = true
73+
}
74+
75+
headers := struct {
76+
KeepAlive time.Duration `map:"Swarm-Keep-Alive"`
77+
}{}
78+
if response := s.mapStructure(r.Header, &headers); response != nil {
79+
response("invalid header params", logger, w)
80+
return
81+
}
82+
83+
if s.beeMode == DevMode {
84+
logger.Warning("pubsub endpoint is disabled in dev mode")
85+
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
86+
return
87+
}
88+
89+
// Connect to broker peer
90+
ctx, cancel := context.WithCancel(context.Background())
91+
subscriberConn, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
92+
if err != nil {
93+
cancel()
94+
logger.Debug("pubsub connect failed", "error", err)
95+
jsonhttp.InternalServerError(w, "pubsub connect failed")
96+
return
97+
}
98+
99+
// Upgrade to WebSocket
100+
upgrader := websocket.Upgrader{
101+
ReadBufferSize: swarm.ChunkWithSpanSize,
102+
WriteBufferSize: swarm.ChunkWithSpanSize,
103+
CheckOrigin: s.checkOrigin,
104+
}
105+
106+
conn, err := upgrader.Upgrade(w, r, nil)
107+
if err != nil {
108+
cancel()
109+
_ = subscriberConn.Stream.Close()
110+
logger.Debug("websocket upgrade failed", "error", err)
111+
logger.Error(nil, "websocket upgrade failed")
112+
jsonhttp.InternalServerError(w, "upgrade failed")
113+
return
114+
}
115+
116+
pingPeriod := headers.KeepAlive * time.Second
117+
if pingPeriod == 0 {
118+
pingPeriod = time.Minute
119+
}
120+
121+
isParticipant := connectOpts.ReadWrite
122+
123+
s.wsWg.Add(1)
124+
go func() {
125+
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isParticipant)
126+
_ = conn.Close()
127+
subscriberConn.Cancel()
128+
s.wsWg.Done()
129+
}()
130+
}
131+
132+
func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
133+
if s.pubsubSvc == nil {
134+
jsonhttp.NotFound(w, "pubsub service not available")
135+
return
136+
}
137+
138+
topics := s.pubsubSvc.Topics()
139+
jsonhttp.OK(w, struct {
140+
Topics []pubsub.TopicInfo `json:"topics"`
141+
}{
142+
Topics: topics,
143+
})
144+
}

pkg/api/router.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,70 @@ func (s *Service) mountAPI() {
364364
),
365365
})
366366

367+
handle("/pubsub/{topic}", web.ChainHandlers(
368+
web.FinalHandlerFunc(s.pubsubWsHandler),
369+
))
370+
371+
handle("/pubsub/", web.ChainHandlers(
372+
web.FinalHandler(jsonhttp.MethodHandler{
373+
"GET": http.HandlerFunc(s.pubsubListHandler),
374+
}),
375+
))
376+
377+
handle("/pss/subscribe/{topic}", web.ChainHandlers(
378+
web.FinalHandlerFunc(s.pssWsHandler),
379+
))
380+
381+
handle("/tags", web.ChainHandlers(
382+
web.FinalHandler(jsonhttp.MethodHandler{
383+
"GET": http.HandlerFunc(s.listTagsHandler),
384+
"POST": web.ChainHandlers(
385+
jsonhttp.NewMaxBodyBytesHandler(1024),
386+
web.FinalHandlerFunc(s.createTagHandler),
387+
),
388+
})),
389+
)
390+
391+
handle("/tags/{id}", web.ChainHandlers(
392+
web.FinalHandler(jsonhttp.MethodHandler{
393+
"GET": http.HandlerFunc(s.getTagHandler),
394+
"DELETE": http.HandlerFunc(s.deleteTagHandler),
395+
"PATCH": web.ChainHandlers(
396+
jsonhttp.NewMaxBodyBytesHandler(1024),
397+
web.FinalHandlerFunc(s.doneSplitHandler),
398+
),
399+
})),
400+
)
401+
402+
handle("/pins", web.ChainHandlers(
403+
web.FinalHandler(jsonhttp.MethodHandler{
404+
"GET": http.HandlerFunc(s.listPinnedRootHashes),
405+
})),
406+
)
407+
408+
handle("/pins/check", web.ChainHandlers(
409+
web.FinalHandler(jsonhttp.MethodHandler{
410+
"GET": http.HandlerFunc(s.pinIntegrityHandler),
411+
}),
412+
))
413+
414+
handle("/pins/{reference}", web.ChainHandlers(
415+
web.FinalHandler(jsonhttp.MethodHandler{
416+
"GET": http.HandlerFunc(s.getPinnedRootHash),
417+
"POST": http.HandlerFunc(s.pinRootHash),
418+
"DELETE": http.HandlerFunc(s.unpinRootHash),
419+
})),
420+
)
421+
422+
handle("/stewardship/{address}", jsonhttp.MethodHandler{
423+
"GET": web.ChainHandlers(
424+
web.FinalHandlerFunc(s.stewardshipGetHandler),
425+
),
426+
"PUT": web.ChainHandlers(
427+
web.FinalHandlerFunc(s.stewardshipPutHandler),
428+
),
429+
})
430+
367431
handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler))
368432

369433
handle("/gsoc/subscribe/{address}", web.ChainHandlers(

pkg/node/node.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import (
4848
"github.com/ethersphere/bee/v2/pkg/pricer"
4949
"github.com/ethersphere/bee/v2/pkg/pricing"
5050
"github.com/ethersphere/bee/v2/pkg/pss"
51+
"github.com/ethersphere/bee/v2/pkg/pubsub"
5152
"github.com/ethersphere/bee/v2/pkg/puller"
5253
"github.com/ethersphere/bee/v2/pkg/pullsync"
5354
"github.com/ethersphere/bee/v2/pkg/pusher"
@@ -192,6 +193,8 @@ type Options struct {
192193
WarmupTime time.Duration
193194
WelcomeMessage string
194195
WhitelistedWithdrawalAddress []string
196+
PubsubBrokerMode bool
197+
PubsubMaxConnections int
195198
}
196199

197200
const (
@@ -665,6 +668,7 @@ func NewBee(
665668
Nonce: nonce,
666669
ValidateOverlay: chainEnabled,
667670
Registry: registry,
671+
PubsubReservedStreamSlots: o.PubsubMaxConnections,
668672
})
669673
if err != nil {
670674
return nil, fmt.Errorf("p2p service: %w", err)
@@ -737,6 +741,11 @@ func NewBee(
737741
return nil, fmt.Errorf("init batch service: %w", err)
738742
}
739743

744+
pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode, o.PubsubMaxConnections)
745+
if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil {
746+
return nil, fmt.Errorf("pubsub protocol: %w", err)
747+
}
748+
740749
// Construct protocols.
741750
pingPong := pingpong.New(p2ps, logger, tracer)
742751

@@ -1266,6 +1275,7 @@ func NewBee(
12661275
SyncStatus: syncStatusFn,
12671276
NodeStatus: nodeStatus,
12681277
PinIntegrity: localStore.PinIntegrity(),
1278+
PubsubService: pubsubSvc,
12691279
}
12701280

12711281
if o.APIAddr != "" {

pkg/p2p/libp2p/libp2p.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type Options struct {
149149
HeadersRWTimeout time.Duration
150150
Registry *prometheus.Registry
151151
autoTLSCertManager autoTLSCertManager
152+
PubsubReservedStreamSlots int
152153
}
153154

154155
func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, logger log.Logger, tracer *tracing.Tracer, o Options) (s *Service, returnErr error) {
@@ -209,11 +210,16 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
209210
}
210211

211212
// Tweak certain settings
213+
inboundLimit := rcmgr.LimitVal(IncomingStreamCountLimit - o.PubsubReservedStreamSlots)
214+
if inboundLimit < 0 {
215+
inboundLimit = 0
216+
}
217+
212218
cfg := rcmgr.PartialLimitConfig{
213219
System: rcmgr.ResourceLimits{
214-
Streams: IncomingStreamCountLimit + OutgoingStreamCountLimit,
220+
Streams: inboundLimit + OutgoingStreamCountLimit,
215221
StreamsOutbound: OutgoingStreamCountLimit,
216-
StreamsInbound: IncomingStreamCountLimit,
222+
StreamsInbound: inboundLimit,
217223
},
218224
}
219225

0 commit comments

Comments
 (0)