Commit a07a23d

Author: Marko Petzold
router: pre-encode broker fan-out events once per (variant, serializer)
The broker's per-event fan-out previously had every per-session send
goroutine re-encode the same EVENT message independently. With 200
subscribers on JSON that's 200x redundant codec.Encode + msgToList +
allocation work per published event.

Move encoding from N per-session goroutines into the single broker
actor goroutine. The broker pre-encodes the EVENT once for every
serializer in active use across the subscription group, stores the
bytes in a *wamp.SharedMessage wrapper, and dispatches that wrapper to
each subscriber's send channel. The peer's encodeOutbound looks up its
serializer's bytes in the cache and writes them directly to the wire,
so the hot per-subscriber path does zero encoding work.

Tracking: three pieces of refcount state on each subscription, all
updated under the broker actor goroutine on subscribe / unsubscribe
(lock-free):

- serializers: set of wire-serializer formats in use across non-local
  subscribers, used to know which serializer-specific caches to
  populate.
- subsWithPubIdent: count of non-local subscribers that declared the
  publisher_identification feature.
- subsWithoutPubIdent: count of non-local subscribers that did NOT
  declare it.

At fan-out time the broker uses the pub-ident counters to decide which
event variant(s) to build:

    needPlain     = !disclose || subsWithoutPubIdent > 0
    needDisclosed = disclose && subsWithPubIdent > 0

The disclosed variant has publisher identity stamped into Details once
(it's the same identity for every disclosed subscriber in this fan-out:
the publisher's authid/authrole). When all subscribers have
FeaturePubIdent and the publisher requested disclosure (the
overwhelmingly common case in production), only the disclosed variant
is built.

Local subscribers (in-process Go clients, meta sessions, tests) keep
the per-clone path for mutation isolation; TestEventContentSafety pins
this contract. They're excluded from the refcount.
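The variant selection described above can be checked in isolation. The following is a minimal sketch, not the broker's code: `neededVariants` is a hypothetical helper that only wraps the two expressions from the commit message, and the subscriber counts are made-up inputs.

```go
package main

import "fmt"

// neededVariants mirrors the two expressions from the commit message.
// The function name and signature are illustrative assumptions; the
// broker computes these inline.
func neededVariants(disclose bool, subsWithPubIdent, subsWithoutPubIdent int) (needPlain, needDisclosed bool) {
	needPlain = !disclose || subsWithoutPubIdent > 0
	needDisclosed = disclose && subsWithPubIdent > 0
	return needPlain, needDisclosed
}

func main() {
	cases := []struct {
		disclose      bool
		with, without int
	}{
		{true, 200, 0},  // every subscriber declared the feature, disclosure on
		{true, 150, 50}, // mixed group, disclosure on
		{false, 200, 0}, // disclosure off
	}
	for _, c := range cases {
		plain, disclosed := neededVariants(c.disclose, c.with, c.without)
		fmt.Printf("disclose=%v with=%d without=%d -> plain=%v disclosed=%v\n",
			c.disclose, c.with, c.without, plain, disclosed)
	}
}
```

Only the mixed group needs both variants; the other two cases build exactly one, which is why the common production case encodes a single event per serializer.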
websocketPeer.encodeOutbound and the rawsocket equivalent recognize
*wamp.SharedMessage via type assertion. Cache miss is a programmer
error (broker contract violation) and panics rather than silently
falling back to encoding. Earlier work spent days chasing a silent
cache-miss regression that this contract makes loud.

Bench delta on the heavy preset (4cpu router, 4cpu harness, 4 pubs ×
200 subs × 5000/s, JSON, disclosure forced on by ironflock-router's
authorizer):

                      baseline   opt-1 alloc-hoist   this commit
    events sent rate  1370/s     1457/s              2247/s (+64%)
    events delivered  264k/s     277k/s              449k/s (+70%)
    p50 latency       2.96s      2.81s               19.7ms (150x)
    p99 latency       4.55s      4.56s               150ms  (30x)
    router CPU peak   369%       407%                297%   (-27%)

The router is delivering 70% more events on 27% less CPU. Latency
collapsed because the broker pipeline is no longer back-pressured by
per-subscriber alloc cost.

Router pprof after the change shows JSONSerializer.Serialize gone from
the top frames (was 28-32%); 70% of remaining router CPU is
syscall.Syscall6 doing write(2) to subscriber TCP sockets, which is the
kernel-write boundary and not addressable in user space.

Co-resident pieces:

- wamp.SharedMessage type (new file).
- serialize.Serializer.ID() method on the interface; impls return their
  constant. Plus serialize.Provider, an optional interface that
  wamp.Peer impls satisfy to expose their serializer ID to the broker
  without per-message type assertion.

Coverage: 63.4% -> 62.4% (more statements, same test set).
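To illustrate the "encode once, look up per subscriber, panic on miss" contract, here is a small self-contained sketch. It does not reproduce wamp.SharedMessage: the `sharedMessage` type, its `store` and `mustBytes` methods, and the `serJSON` constant are all hypothetical names invented for this example, and encoding/json stands in for the router's serializers.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sharedMessage is a hypothetical stand-in for a pre-encoded message
// wrapper: one goroutine fills it before dispatch, readers only look up.
type sharedMessage struct {
	bytesBySerializer map[int][]byte
}

func newSharedMessage() *sharedMessage {
	return &sharedMessage{bytesBySerializer: make(map[int][]byte)}
}

// store records the pre-encoded bytes for one serializer ID.
func (m *sharedMessage) store(serID int, b []byte) {
	m.bytesBySerializer[serID] = b
}

// mustBytes returns the cached bytes and panics on a miss, so a broken
// producer contract fails loudly instead of silently re-encoding.
func (m *sharedMessage) mustBytes(serID int) []byte {
	b, ok := m.bytesBySerializer[serID]
	if !ok {
		panic(fmt.Sprintf("shared message: no pre-encoded bytes for serializer %d", serID))
	}
	return b
}

// serJSON is an illustrative ID, not the library's constant.
const serJSON = 1

func main() {
	// Encode once on the producer side. 36 is the WAMP EVENT message code;
	// the IDs and payload are made up.
	event := []any{36, 1001, 2002, map[string]any{}, []any{"hello"}}
	encoded, err := json.Marshal(event)
	if err != nil {
		panic(err)
	}
	shared := newSharedMessage()
	shared.store(serJSON, encoded)

	// Every subscriber writes the same bytes without encoding again.
	for i := 0; i < 3; i++ {
		fmt.Printf("subscriber %d writes %d bytes\n", i, len(shared.mustBytes(serJSON)))
	}
}
```

Because the map is fully populated before the wrapper is handed to any reader and never written afterwards, concurrent lookups need no lock in this sketch.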
1 parent b4a45ab commit a07a23d

9 files changed

Lines changed: 439 additions & 43 deletions

README.md

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 # WAMP v2 router library, client library and router service

 [![Main CI](https://github.com/gammazero/nexus/actions/workflows/main-golint.yml/badge.svg)](https://github.com/gammazero/nexus/actions/workflows/main-golint.yml)
-[![Coverage](https://img.shields.io/badge/coverage-63.4%25-orange)](https://github.com/gammazero/nexus/actions/workflows/main-golint.yml)
+[![Coverage](https://img.shields.io/badge/coverage-62.4%25-orange)](https://github.com/gammazero/nexus/actions/workflows/main-golint.yml)
 [![License](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)
 [![GoDoc](https://godoc.org/github.com/gammazero/nexus?status.svg)](https://godoc.org/github.com/gammazero/nexus)

router/broker.go

Lines changed: 237 additions & 37 deletions
@@ -7,6 +7,7 @@ import (

 	"github.com/gammazero/deque"
 	"github.com/gammazero/nexus/v3/stdlog"
+	"github.com/gammazero/nexus/v3/transport/serialize"
 	"github.com/gammazero/nexus/v3/wamp"
 )

@@ -35,6 +36,32 @@ type subscription struct {
 	match       string // match policy
 	created     string // when subscription was created
 	subscribers map[*wamp.Session]struct{}
+
+	// serializers is a refcounted set of wire-serializer formats in
+	// active use across this subscription's subscribers (network peers
+	// only; local peers don't serialize and aren't tracked). Used by
+	// the fan-out cache to know which formats to pre-encode without
+	// scanning every subscriber per event. Updated on subscribe /
+	// unsubscribe under the broker actor goroutine, so unsynchronized
+	// access is safe.
+	serializers map[serialize.Serialization]int
+
+	// Refcounts of non-local subscribers split by their declared
+	// publisher_identification feature flag (WAMP §14.4.4). The broker
+	// uses these at fan-out time to decide which event variant(s) to
+	// pre-encode:
+	//
+	//   - subsWithPubIdent > 0    → may need disclosed variant
+	//                               (only when disclose is also true)
+	//   - subsWithoutPubIdent > 0 → needs plain variant
+	//
+	// In the common case where every subscriber advertises the feature
+	// AND the publisher requested disclosure, only the disclosed
+	// variant is built. Local subscribers are excluded from both
+	// counters because they always go through the per-subscriber clone
+	// path (mutation isolation).
+	subsWithPubIdent    int
+	subsWithoutPubIdent int
 }

 // storedEvent is a wrapper around wamp event message with timestamp to be used
@@ -372,17 +399,115 @@ func (b *broker) syncPublish(pub *wamp.Session, msg *wamp.Publish, pubID wamp.ID

 func newSubscription(id wamp.ID, subscriber *wamp.Session, topic wamp.URI, match string) *subscription {
 	subscribers := map[*wamp.Session]struct{}{}
+	serializers := map[serialize.Serialization]int{}
+	var subsWithPubIdent, subsWithoutPubIdent int
 	if subscriber != nil {
 		subscribers[subscriber] = struct{}{}
+		if serID, ok := sessionSerializer(subscriber); ok {
+			serializers[serID]++
+			if subscriber.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent) {
+				subsWithPubIdent++
+			} else {
+				subsWithoutPubIdent++
+			}
+		}
 	}

 	return &subscription{
-		id:          id,
-		topic:       topic,
-		match:       match,
-		created:     wamp.NowISO8601(),
-		subscribers: subscribers,
+		id:                  id,
+		topic:               topic,
+		match:               match,
+		created:             wamp.NowISO8601(),
+		subscribers:         subscribers,
+		serializers:         serializers,
+		subsWithPubIdent:    subsWithPubIdent,
+		subsWithoutPubIdent: subsWithoutPubIdent,
+	}
+}
+
+// sessionSerializer returns the wire serializer ID of sess's peer if
+// the peer satisfies serialize.Provider (i.e. is a serializing
+// network peer). Returns false for local in-process peers, which
+// don't go through a serializer on the wire and therefore don't
+// participate in the fan-out byte cache.
+func sessionSerializer(sess *wamp.Session) (serialize.Serialization, bool) {
+	if sess == nil || sess.Peer == nil {
+		return 0, false
+	}
+	sp, ok := sess.Peer.(serialize.Provider)
+	if !ok {
+		return 0, false
 	}
+	return sp.Serializer(), true
+}
+
+// Stateless singleton serializers used by the fan-out byte cache.
+// Each Serializer is a struct with no fields, so a single instance
+// per type can be reused across all encode calls.
+//
+//nolint:gochecknoglobals
+var (
+	jsonEncoder    = &serialize.JSONSerializer{}
+	msgpackEncoder = &serialize.MessagePackSerializer{}
+	cborEncoder    = &serialize.CBORSerializer{}
+)
+
+// serializerForID maps a Serialization ID to its concrete encoder.
+// Returns (nil, false) for AUTO or unknown values; the caller skips
+// pre-encoding for that format (cache miss → fallback to per-session
+// encode of the inner message).
+func serializerForID(id serialize.Serialization) (serialize.Serializer, bool) {
+	switch id {
+	case serialize.JSON:
+		return jsonEncoder, true
+	case serialize.MSGPACK:
+		return msgpackEncoder, true
+	case serialize.CBOR:
+		return cborEncoder, true
+	}
+	return nil, false
+}
+
+// addSubscriber records a new session in sub.subscribers and bumps the
+// serializer + publisher-identification refcounts. Idempotent: if the
+// session is already in the set, no change.
+func (sub *subscription) addSubscriber(sess *wamp.Session) {
+	if _, already := sub.subscribers[sess]; already {
+		return
+	}
+	sub.subscribers[sess] = struct{}{}
+	serID, isNet := sessionSerializer(sess)
+	if isNet {
+		sub.serializers[serID]++
+		if sess.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent) {
+			sub.subsWithPubIdent++
+		} else {
+			sub.subsWithoutPubIdent++
+		}
+	}
+}
+
+// removeSubscriber drops sess from sub.subscribers and decrements the
+// serializer + publisher-identification refcounts. Returns true if the
+// session was in the set.
+func (sub *subscription) removeSubscriber(sess *wamp.Session) bool {
+	if _, ok := sub.subscribers[sess]; !ok {
+		return false
+	}
+	delete(sub.subscribers, sess)
+	serID, isNet := sessionSerializer(sess)
+	if isNet {
+		sub.serializers[serID]--
+		if sub.serializers[serID] <= 0 {
+			delete(sub.serializers, serID)
+		}
+		if sess.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent) {
+			sub.subsWithPubIdent--
+		} else {
+			sub.subsWithoutPubIdent--
+		}
+	}
+	return true
 }

 func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, event *wamp.Event) {
@@ -452,8 +577,9 @@ func (b *broker) syncSubscribe(subscriber *wamp.Session, msg *wamp.Subscribe, ma
 			})
 			return
 		}
-		// Add subscriber to existing subscription.
-		sub.subscribers[subscriber] = struct{}{}
+		// Add subscriber to existing subscription (refcounts the
+		// serializer for the fan-out byte cache).
+		sub.addSubscriber(subscriber)
 	}

 	// Add the subscription ID to the set of subscriptions for the subscriber.
@@ -508,8 +634,9 @@ func (b *broker) syncUnsubscribe(subscriber *wamp.Session, msg *wamp.Unsubscribe
 		return
 	}

-	// Remove subscribed session from subscription.
-	delete(sub.subscribers, subscriber)
+	// Remove subscribed session from subscription (refcounts down the
+	// serializer for the fan-out byte cache).
+	sub.removeSubscriber(subscriber)

 	// If no more subscribers on this subscription, delete subscription and
 	// send on_delete meta event.
@@ -563,8 +690,9 @@ func (b *broker) syncRemoveSession(subscriber *wamp.Session) {
 		if !ok {
 			continue
 		}
-		// Remove subscribed session from subscription.
-		delete(sub.subscribers, subscriber)
+		// Remove subscribed session from subscription (refcounts down
+		// the serializer for the fan-out byte cache).
+		sub.removeSubscriber(subscriber)

 		// If no more subscribers on this subscription.
 		if len(sub.subscribers) == 0 {
@@ -611,27 +739,81 @@ func (b *broker) syncPubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.I
 	args := msg.Arguments
 	argsKw := msg.ArgumentsKw

-	// Shared event used for every NON-LOCAL subscriber that does not
-	// trigger per-subscriber disclosure. Pointer is reused across
-	// recipients whose send path serializes to bytes; those goroutines
-	// only READ the struct (encode + write), never mutate, so concurrent
-	// reads of the embedded args / argsKw / details are safe.
+	// Build only the event variants the subscriber group actually
+	// needs (WAMP §14.4.4). Subscribers refcount their interest in
+	// publisher identity at subscribe time:
+	//
+	//   - plain variant:     needed when at least one non-local
+	//                        subscriber lacks FeaturePubIdent, OR
+	//                        when disclose is false (every subscriber
+	//                        gets the plain event).
+	//   - disclosed variant: needed when disclose is true AND at
+	//                        least one non-local subscriber declared
+	//                        FeaturePubIdent.
 	//
-	// Local subscribers (in-process Go clients) read the struct directly
-	// in their event handlers and may mutate it (TestEventContentSafety
-	// pins this contract). Sharing across local subscribers would let
-	// one subscriber's mutation become visible to another. So local
-	// subscribers always get a per-subscriber Event with deep-copied
-	// args / argsKw / details; same isolation prepareEvent gave before
-	// this optimization.
-	sharedEvent := &wamp.Event{
+	// Local subscribers always go through the per-clone path below
+	// for mutation isolation; they don't influence variant selection.
+	needPlain := !disclose || sub.subsWithoutPubIdent > 0
+	needDisclosed := disclose && sub.subsWithPubIdent > 0
+
+	// plainEvent is always constructed: it's cheap (struct + shared
+	// args/argsKw/details references) and serves as the default
+	// fallback for the rare per-subscriber paths that don't pick a
+	// pre-encoded variant (e.g. local clones, or a non-Provider peer
+	// that isn't tracked in sub.serializers). Pre-encoding into the
+	// shared cache only happens when needPlain is true.
+	plainEvent := &wamp.Event{
 		Publication:  pubID,
 		Subscription: sub.id,
 		Arguments:    args,
 		ArgumentsKw:  argsKw,
 		Details:      baseDetails,
 	}
+	var disclosedEvent *wamp.Event
+	if needDisclosed {
+		discDetails := make(wamp.Dict, len(baseDetails)+3)
+		maps.Copy(discDetails, baseDetails)
+		disclosePublisher(pub, discDetails)
+		disclosedEvent = &wamp.Event{
+			Publication:  pubID,
+			Subscription: sub.id,
+			Arguments:    args,
+			ArgumentsKw:  argsKw,
+			Details:      discDetails,
+		}
+	}

+	// Pre-encode each needed variant ONCE per active wire serializer.
+	// Single-goroutine work in broker.run, so no contention.
+	// Per-session send goroutines downstream find a populated cache
+	// and skip encoding entirely.
+	var sharedPlain, sharedDisclosed *wamp.SharedMessage
+	if len(sub.serializers) > 0 {
+		if needPlain {
+			sharedPlain = wamp.NewSharedMessage(plainEvent)
+			for serID := range sub.serializers {
+				ser, ok := serializerForID(serID)
+				if !ok {
+					continue
+				}
+				if b, err := ser.Serialize(plainEvent); err == nil {
+					sharedPlain.Store(int(serID), b)
+				}
+			}
+		}
+		if disclosedEvent != nil {
+			sharedDisclosed = wamp.NewSharedMessage(disclosedEvent)
+			for serID := range sub.serializers {
+				ser, ok := serializerForID(serID)
+				if !ok {
+					continue
+				}
+				if b, err := ser.Serialize(disclosedEvent); err == nil {
+					sharedDisclosed.Store(int(serID), b)
+				}
+			}
+		}
+	}
 	for subscriber := range sub.subscribers {
 		// Do not send event to publisher.
 		if subscriber == pub && excludePublisher {
@@ -653,19 +835,20 @@ func (b *broker) syncPubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.I
 			}
 		}

-		needPerSub := subscriber.IsLocal() ||
-			(disclose && subscriber.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent))
-
-		var event *wamp.Event
-		if !needPerSub {
-			event = sharedEvent
-		} else {
-			// Per-subscriber copy: isolates mutation from the local-handler
-			// path, and carries the publisher-identity disclosure when
-			// that's enabled for this subscriber.
+		wantsDisclosure := disclose &&
+			subscriber.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent)
+
+		var msgOut wamp.Message
+		switch {
+		case subscriber.IsLocal():
+			// Local subscribers (in-process Go clients, meta sessions,
+			// tests) read the *wamp.Event struct directly in their
+			// handlers and may mutate it (TestEventContentSafety pins
+			// this contract). They get a per-subscriber clone with
+			// deep-copied maps/slices for mutation isolation.
 			perSubDetails := make(wamp.Dict, len(baseDetails)+3)
 			maps.Copy(perSubDetails, baseDetails)
-			if disclose && subscriber.HasFeature(wamp.RoleSubscriber, wamp.FeaturePubIdent) {
+			if wantsDisclosure {
 				disclosePublisher(pub, perSubDetails)
 			}
 			perSubArgs := args
@@ -678,16 +861,33 @@ func (b *broker) syncPubEvent(pub *wamp.Session, msg *wamp.Publish, pubID wamp.I
 				perSubArgsKw = make(wamp.Dict, len(argsKw))
 				maps.Copy(perSubArgsKw, argsKw)
 			}
-			event = &wamp.Event{
+			msgOut = &wamp.Event{
 				Publication:  pubID,
 				Subscription: sub.id,
 				Arguments:    perSubArgs,
 				ArgumentsKw:  perSubArgsKw,
 				Details:      perSubDetails,
 			}
+
+		case wantsDisclosure && sharedDisclosed != nil:
+			msgOut = sharedDisclosed
+
+		case sharedPlain != nil:
+			msgOut = sharedPlain
+
+		default:
+			// No populated cache (rare race between subscribe refcount
+			// update and a fan-out, or unsupported serializer ID).
+			// Fall back to the bare event so the per-session goroutine
+			// encodes it on the spot.
+			if wantsDisclosure && disclosedEvent != nil {
+				msgOut = disclosedEvent
+			} else {
+				msgOut = plainEvent
+			}
 		}

-		b.trySend(subscriber, event)
+		b.trySend(subscriber, msgOut)
 	}

 	// If event history store is enabled for subscription let's save event
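
The refcount bookkeeping that addSubscriber and removeSubscriber apply to the serializers map can be tried out on its own. The sketch below is an illustration under assumptions, not code from this commit: `refcountSet` is a hypothetical type keyed by plain strings instead of serialize.Serialization, and it shows only the "count up, count down, forget at zero" behaviour.

```go
package main

import "fmt"

// refcountSet counts how many members use each key and forgets keys
// whose count drops to zero. Hypothetical helper, not part of nexus.
type refcountSet map[string]int

// add registers one more user of key.
func (s refcountSet) add(key string) { s[key]++ }

// remove unregisters one user of key and deletes the entry when the
// last user is gone, so ranging over the set never visits dead keys.
func (s refcountSet) remove(key string) {
	if s[key] <= 1 {
		delete(s, key)
		return
	}
	s[key]--
}

func main() {
	formats := refcountSet{}
	formats.add("json")
	formats.add("json")
	formats.add("cbor")

	formats.remove("cbor") // last CBOR subscriber leaves
	formats.remove("json") // one JSON subscriber remains

	// Prints "1 1": one format still in use, with one subscriber.
	fmt.Println(len(formats), formats["json"])
}
```

Deleting entries at zero matters for the fan-out loop: it ranges over the map to decide which formats to pre-encode, so a stale zero-count key would cost an unnecessary encode per event.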
