Skip to content

Commit f701aef

Browse files
floatdropclaude
andauthored
feat(session): add RequestMux for routing inbound requests by type (#31)
* feat(session): add RequestMux for routing inbound requests by type RequestMux is the request-stream counterpart of Demux: it replaces the hand-rolled "AcceptRequest loop + type switch + dispatch" a server writes with per-message-type handler registration. Handle(message.Type, h), OnUnknown, and Run(ctx, sess) mirror Demux's API and concurrency contract (synchronous dispatch; handlers spawn a goroutine for long-lived streams). Run surfaces AcceptRequest errors unchanged so the caller can escalate the session-fatal ones (§10.1 Request-ID violations, token-cache faults) by closing the session — the unmatched-type default rejects with REQUEST_ERROR NOT_SUPPORTED. Includes table tests (routing by type, late registration, OnUnknown, and the default reject), an ExampleRequestMux, and a README row. Relay adoption is deferred: its per-type limiter + token-verify pre-dispatch hooks are a separate concern. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> * feat(session): add generic HandleType for typed RequestMux handlers HandleType[T](mux, func(*Request, T)) registers a handler keyed by T's message.Type and hands the handler the already-asserted typed message, so callers don't repeat req.First.(*message.X). It's a free function because Go methods can't take type parameters; the type key is derived from a zero T (the message Type() methods are constant returns, safe on a nil pointer). Adds TestRequestMuxHandleType and switches the SUBSCRIBE branch of ExampleRequestMux to HandleType. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 5bc5a20 commit f701aef

4 files changed

Lines changed: 320 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ grouped here by the file they live in:
131131
| Publish a track | `ExampleSession_Publish` |
132132
| Subscribe to a track | `ExampleSession_Subscribe` |
133133
| Route many tracks' data streams | `ExampleDemux` |
134+
| Route inbound requests (server side) | `ExampleRequestMux` |
134135
| Joining / standalone FETCH | `ExampleSession_Fetch`, `ExampleSession_Fetch_standalone`, `ExampleIncomingFetchStream` |
135136
| Update a live request | `ExampleSession_UpdateRequest` |
136137
| End a publication | `Example_endingAPublication` |

pkg/moqt/session/example_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,46 @@ func drain(s *session.IncomingSubgroupStream, label string) {
192192
}()
193193
}
194194

195+
// Routing inbound requests on the server side. RequestMux is the request-stream
196+
// counterpart of Demux: register a handler per message.Type instead of hand-
197+
// rolling an AcceptRequest loop + type switch.
198+
func ExampleRequestMux() {
199+
var server *session.Session // e.g. from session.Server
200+
ctx := context.Background()
201+
202+
mux := session.NewRequestMux()
203+
204+
// HandleType hands the handler the already-asserted typed message. A
205+
// SUBSCRIBE handler keeps the request stream open for the subscription's
206+
// lifetime, so it spawns a goroutine — Run dispatches synchronously, exactly
207+
// like Demux.
208+
session.HandleType(mux, func(r *session.Request, _ *message.Subscribe) {
209+
go func() {
210+
pub, err := r.AcceptSubscribe(nil) // writes SUBSCRIBE_OK, returns a Publication
211+
if err != nil {
212+
return
213+
}
214+
defer pub.Close()
215+
// … push objects via pub.OpenSubgroup … then pub.Done(...).
216+
_ = pub
217+
}()
218+
})
219+
mux.Handle(message.TypePublishNamespace, func(r *session.Request) {
220+
_ = r.Reply(&message.RequestOK{})
221+
_ = r.Stream.Close()
222+
})
223+
224+
// Optional: without OnUnknown, an unhandled type is rejected NOT_SUPPORTED.
225+
mux.OnUnknown(func(r *session.Request) {
226+
_ = r.RejectError(moqt.RequestNotSupported, "unsupported request type")
227+
})
228+
229+
// Run returns when ctx is cancelled or AcceptRequest fails. A session-fatal
230+
// error (e.g. *session.ErrDuplicateRequestID) should be escalated by closing
231+
// the session with the mapped code.
232+
_ = mux.Run(ctx, server)
233+
}
234+
195235
// Backfilling with a Relative Joining FETCH (§10.12.2): a FilterLargestObject
196236
// subscription only delivers objects strictly after the live edge, so the
197237
// current group is invisible until the next one lands. A joining FETCH keyed

pkg/moqt/session/requestmux.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package session
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/floatdrop/moq-go/pkg/moqt"
8+
"github.com/floatdrop/moq-go/pkg/moqt/message"
9+
)
10+
11+
// RequestHandler handles one inbound request that a [RequestMux] routed to it by
12+
// the [message.Type] of its first message. It is invoked synchronously by
13+
// [RequestMux.Run]; spawn a goroutine inside it when a request must be serviced
14+
// concurrently with accepting the next one (see [RequestMux.Run]).
15+
type RequestHandler func(*Request)
16+
17+
// RequestMux routes the requests accepted from a [Session] to per-type handlers,
18+
// replacing the hand-rolled "AcceptRequest loop + type-switch + dispatch" a
19+
// server otherwise writes. It is the request-stream counterpart of [Demux],
20+
// which does the same for inbound data streams.
21+
//
22+
// Requests are dispatched by the [message.Type] of their first message — e.g.
23+
// [message.TypeSubscribe] for an inbound SUBSCRIBE. A request whose type has no
24+
// registered handler is passed to the OnUnknown callback.
25+
//
26+
// Handlers may be registered or replaced at any time, including while
27+
// [RequestMux.Run] is executing. Registration is safe for concurrent use.
28+
//
29+
// The zero value is not ready for use — construct with [NewRequestMux].
30+
type RequestMux struct {
31+
mu sync.RWMutex
32+
handlers map[message.Type]RequestHandler
33+
onUnknown func(*Request)
34+
}
35+
36+
// NewRequestMux returns an empty RequestMux ready for handler registration.
37+
func NewRequestMux() *RequestMux {
38+
return &RequestMux{handlers: make(map[message.Type]RequestHandler)}
39+
}
40+
41+
// Handle registers h for inbound requests whose first message is of type t
42+
// (e.g. [message.TypeSubscribe]). A nil h unregisters t; registering a type
43+
// that already has a handler replaces it.
44+
func (m *RequestMux) Handle(t message.Type, h RequestHandler) {
45+
m.mu.Lock()
46+
defer m.mu.Unlock()
47+
if h == nil {
48+
delete(m.handlers, t)
49+
return
50+
}
51+
m.handlers[t] = h
52+
}
53+
54+
// HandleType registers h for inbound requests whose first message is the
55+
// concrete type T (e.g. *message.Subscribe), handing h the already-asserted
56+
// typed message alongside the [*Request]. It is the generic form of
57+
// [RequestMux.Handle]: the [message.Type] key is derived from T, and the type
58+
// assertion a Handle callback would otherwise repeat (req.First.(*message.X)) is
59+
// done once, here.
60+
//
61+
// Go methods cannot take type parameters, so this is a free function taking the
62+
// mux as its first argument. A nil h unregisters T's type; registering a type
63+
// that already has a handler replaces it.
64+
func HandleType[T message.WithRequestID](m *RequestMux, h func(*Request, T)) {
65+
var zero T // nil pointer; message Type() methods are constant returns
66+
if h == nil {
67+
m.Handle(zero.Type(), nil)
68+
return
69+
}
70+
m.Handle(zero.Type(), func(req *Request) {
71+
msg, _ := req.First.(T)
72+
h(req, msg)
73+
})
74+
}
75+
76+
// OnUnknown sets the callback invoked for an accepted request whose type has no
77+
// registered handler. With no callback set (the default, or a nil f), an
78+
// unmatched request is rejected with REQUEST_ERROR NOT_SUPPORTED and its stream
79+
// FIN'd so it does not leak.
80+
func (m *RequestMux) OnUnknown(f func(*Request)) {
81+
m.mu.Lock()
82+
defer m.mu.Unlock()
83+
m.onUnknown = f
84+
}
85+
86+
// Run accepts requests from sess and dispatches each to its registered handler
87+
// until ctx is cancelled or [Session.AcceptRequest] returns an error, which Run
88+
// returns.
89+
//
90+
// Some AcceptRequest errors are session-fatal protocol violations — a §10.1
91+
// Request-ID parity/monotonicity violation (*ErrRequestIDParityViolation /
92+
// *ErrDuplicateRequestID) or a token-cache fault (*TokenCacheError) — that the
93+
// caller MUST escalate by closing the session with the mapped code (see
94+
// [Session.AcceptRequest]). Run surfaces the error unchanged so the caller can
95+
// inspect it with errors.As and Close accordingly.
96+
//
97+
// Dispatch is synchronous: a handler runs to completion before Run accepts the
98+
// next request, mirroring a hand-written accept loop and [Demux.Run]. A handler
99+
// that keeps a request stream open for the lifetime of a subscription therefore
100+
// blocks the loop, so spawn a goroutine inside the handler when requests must be
101+
// serviced concurrently.
102+
func (m *RequestMux) Run(ctx context.Context, sess *Session) error {
103+
for {
104+
req, err := sess.AcceptRequest(ctx)
105+
if err != nil {
106+
return err
107+
}
108+
m.dispatch(req)
109+
}
110+
}
111+
112+
// dispatch routes one accepted request to its registered handler, or to the
113+
// unknown path when none matches.
114+
func (m *RequestMux) dispatch(req *Request) {
115+
m.mu.RLock()
116+
h := m.handlers[req.First.Type()]
117+
f := m.onUnknown
118+
m.mu.RUnlock()
119+
if h != nil {
120+
h(req)
121+
return
122+
}
123+
if f != nil {
124+
f(req)
125+
return
126+
}
127+
_ = req.RejectError(moqt.RequestNotSupported, "moqt/session: no handler for request type")
128+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package session_test
2+
3+
import (
4+
"slices"
5+
"testing"
6+
"time"
7+
8+
"github.com/floatdrop/moq-go/pkg/moqt"
9+
"github.com/floatdrop/moq-go/pkg/moqt/message"
10+
"github.com/floatdrop/moq-go/pkg/moqt/session"
11+
"github.com/floatdrop/moq-go/pkg/moqt/wire"
12+
)
13+
14+
// TestRequestMuxRoutesByType checks that RequestMux dispatches requests by their
15+
// first message's Type, routes an unregistered type to OnUnknown, and honours a
16+
// handler registered after Run has already started.
17+
func TestRequestMuxRoutesByType(t *testing.T) {
18+
t.Parallel()
19+
client, server := openPair(t)
20+
21+
events := make(chan message.Type, 8)
22+
firstSeen := make(chan struct{}, 1)
23+
record := func(r *session.Request) {
24+
events <- r.First.Type()
25+
_ = r.Stream.Close()
26+
}
27+
28+
mux := session.NewRequestMux()
29+
mux.Handle(message.TypeSubscribe, func(r *session.Request) {
30+
record(r)
31+
firstSeen <- struct{}{}
32+
})
33+
mux.Handle(message.TypePublishNamespace, record)
34+
mux.OnUnknown(record)
35+
36+
go func() { _ = mux.Run(t.Context(), server) }()
37+
38+
// Client sends even Request IDs (§10.1), strictly increasing.
39+
40+
// 1. SUBSCRIBE on a registered type.
41+
openRequest(t, client, &message.Subscribe{
42+
RequestID: 0,
43+
Namespace: wire.Namespace("ns"),
44+
Name: []byte("a"),
45+
Parameters: message.Parameters{message.LargestObjectFilter()},
46+
})
47+
<-firstSeen // ensure Run is live before late registration
48+
49+
// 2. Register TRACK_STATUS *after* Run started, then send one.
50+
mux.Handle(message.TypeTrackStatus, record)
51+
openRequest(t, client, &message.TrackStatus{
52+
RequestID: 2,
53+
Namespace: wire.Namespace("ns"),
54+
Name: []byte("a"),
55+
})
56+
57+
// 3. PUBLISH_NAMESPACE on a registered type.
58+
openRequest(t, client, &message.PublishNamespace{
59+
RequestID: 4,
60+
Namespace: wire.Namespace("ns"),
61+
})
62+
63+
// 4. SUBSCRIBE_NAMESPACE — unregistered → OnUnknown.
64+
openRequest(t, client, &message.SubscribeNamespace{
65+
RequestID: 6,
66+
TrackNamespacePrefix: wire.Namespace("ns"),
67+
})
68+
69+
got := collectEvents(t, events, 4)
70+
for _, want := range []message.Type{
71+
message.TypeSubscribe,
72+
message.TypeTrackStatus,
73+
message.TypePublishNamespace,
74+
message.TypeSubscribeNamespace,
75+
} {
76+
if !slices.Contains(got, want) {
77+
t.Errorf("missing dispatched type %s; got %v", want, got)
78+
}
79+
}
80+
}
81+
82+
// TestRequestMuxHandleType checks that HandleType derives the message.Type key
83+
// from T and hands the handler the already-asserted typed message.
84+
func TestRequestMuxHandleType(t *testing.T) {
85+
t.Parallel()
86+
client, server := openPair(t)
87+
88+
got := make(chan string, 1)
89+
mux := session.NewRequestMux()
90+
session.HandleType(mux, func(_ *session.Request, msg *message.Subscribe) {
91+
got <- string(msg.Name) // typed access without a manual assertion
92+
})
93+
go func() { _ = mux.Run(t.Context(), server) }()
94+
95+
openRequest(t, client, &message.Subscribe{
96+
RequestID: 0,
97+
Namespace: wire.Namespace("ns"),
98+
Name: []byte("typed-track"),
99+
Parameters: message.Parameters{message.LargestObjectFilter()},
100+
})
101+
102+
select {
103+
case name := <-got:
104+
if name != "typed-track" {
105+
t.Errorf("handler saw Name = %q, want %q", name, "typed-track")
106+
}
107+
case <-time.After(2 * time.Second):
108+
t.Fatal("typed handler was not invoked")
109+
}
110+
}
111+
112+
// TestRequestMuxDefaultRejectsUnhandled verifies that with no OnUnknown set, an
113+
// unhandled request type is rejected with REQUEST_ERROR NOT_SUPPORTED and its
114+
// stream FIN'd, so the requester learns the server cannot serve it.
115+
func TestRequestMuxDefaultRejectsUnhandled(t *testing.T) {
116+
t.Parallel()
117+
client, server := openPair(t)
118+
119+
mux := session.NewRequestMux() // no handlers, no OnUnknown
120+
go func() { _ = mux.Run(t.Context(), server) }()
121+
122+
stream, err := client.OpenRequest(&message.TrackStatus{
123+
RequestID: 0,
124+
Namespace: wire.Namespace("ns"),
125+
Name: []byte("x"),
126+
})
127+
if err != nil {
128+
t.Fatalf("OpenRequest: %v", err)
129+
}
130+
131+
resp, err := message.Parse(stream)
132+
if err != nil {
133+
t.Fatalf("read response: %v", err)
134+
}
135+
rerr, ok := resp.(*message.RequestError)
136+
if !ok {
137+
t.Fatalf("got %s, want REQUEST_ERROR", resp.Type())
138+
}
139+
if rerr.ErrorCode != moqt.RequestNotSupported {
140+
t.Errorf("ErrorCode = %#x, want NOT_SUPPORTED (%#x)", rerr.ErrorCode, moqt.RequestNotSupported)
141+
}
142+
}
143+
144+
// openRequest opens a bidi request stream carrying first as its initial message
145+
// and leaves it open (the mux handler or session cleanup tears it down).
146+
func openRequest(t *testing.T, s *session.Session, first message.Message) {
147+
t.Helper()
148+
if _, err := s.OpenRequest(first); err != nil {
149+
t.Fatalf("OpenRequest(%s): %v", first.Type(), err)
150+
}
151+
}

0 commit comments

Comments
 (0)