Skip to content

Commit 1e79c0c

Browse files
floatdropclaude
andcommitted
refactor(relay): route requests through RequestMux (#32)
Replace the bespoke AcceptRequest loop + type switch in the session handler with the session.RequestMux it now has available. Per-type handlers register via the generic HandleType (typed message handed in, no manual assertion); the two cross-cutting policies become small shared helpers — verifyRequest (§10.2.2 token verification + dispatch log) and namespaceRequest (§13.7.1 cap), the latter collapsing the three repeated namespace-state cases. OnUnknown keeps the §3.3.2 reset-and-isolate behavior; the token-cache session-fatal close moves just after Run returns. Dogfoods RequestMux on the relay's policy-heavy dispatch path. No behavior change; all relay tests pass unchanged under -race. Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent f701aef commit 1e79c0c

1 file changed

Lines changed: 84 additions & 56 deletions

File tree

pkg/relay/session_handler.go

Lines changed: 84 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -224,23 +224,18 @@ func (h *sessionHandler) run(ctx context.Context) error {
224224
// request and continues serving the session. This matches §9.5's rule that
225225
// a single bad request must not break unrelated subscriptions.
226226
func (h *sessionHandler) runRequestLoop(ctx context.Context) error {
227-
for {
228-
req, err := h.sess.AcceptRequest(ctx)
229-
if err != nil {
230-
// A malformed / duplicate / overflowing / unknown
231-
// AUTHORIZATION_TOKEN alias is a session-level fault per
232-
// §10.2.2: close the session with the mapped SESSION_ERROR
233-
// code rather than just tearing down the request loop.
234-
if tce, ok := errors.AsType[*session.TokenCacheError](err); ok {
235-
h.log.LogAttrs(ctx, slog.LevelDebug, "relay closing session on token cache error",
236-
slog.String("err", err.Error()),
237-
slog.Uint64("code", uint64(tce.Code)))
238-
_ = h.sess.Close(tce.Code, tce.Error())
239-
}
240-
return err
241-
}
242-
h.dispatch(ctx, req)
227+
err := h.requestMux(ctx).Run(ctx, h.sess)
228+
// A malformed / duplicate / overflowing / unknown AUTHORIZATION_TOKEN alias
229+
// surfaces from AcceptRequest as a session-level fault per §10.2.2: close the
230+
// session with the mapped SESSION_ERROR code rather than just tearing down
231+
// the request loop.
232+
if tce, ok := errors.AsType[*session.TokenCacheError](err); ok {
233+
h.log.LogAttrs(ctx, slog.LevelDebug, "relay closing session on token cache error",
234+
slog.String("err", err.Error()),
235+
slog.Uint64("code", uint64(tce.Code)))
236+
_ = h.sess.Close(tce.Code, tce.Error())
243237
}
238+
return err
244239
}
245240

246241
// runDataLoop accepts inbound data streams and routes each by type: subgroup
@@ -281,71 +276,104 @@ func (h *sessionHandler) runDataLoop(ctx context.Context) error {
281276
}
282277
}
283278

284-
// dispatch routes one inbound [*session.Request] to the handler responsible
285-
// for its First-message type. Each handler is expected to:
279+
// requestMux builds the per-session [session.RequestMux] that routes each inbound
280+
// request to the handler responsible for its First-message type. Each handler is
281+
// expected to:
286282
//
287283
// 1. Authorize the request.
288284
// 2. Reply with either *_OK or REQUEST_ERROR.
289285
// 3. Keep the bidi stream open for as long as the subscription's lifetime
290286
// warrants (or close it cleanly on rejection).
291287
// 4. Update [registry.TrackRegistry] / [registry.NamespaceRegistry] as appropriate.
292288
//
293-
// Unknown message types are treated as protocol violations per §3.3.2: we
294-
// reset the bidi stream and log. We do NOT close the session — §9.5 ("If a
295-
// Session is closed due to an unknown or invalid control message or Object,
296-
// the Relay MUST NOT propagate that message or Object to another Session")
297-
// implies the relay must isolate the failure to the one request.
298-
func (h *sessionHandler) dispatch(ctx context.Context, req *session.Request) {
299-
h.log.LogAttrs(ctx, slog.LevelDebug, "relay dispatching request",
300-
slog.String("type", fmt.Sprintf("%T", req.First)))
301-
302-
// §10.2.2: apply the application's TokenVerifier to the request's resolved
303-
// AUTHORIZATION_TOKEN(s) before any handler runs. A denial is per-request:
304-
// reply REQUEST_ERROR with the mapped code and keep the session running.
305-
if err := h.sess.VerifyRequestTokens(ctx, req); err != nil {
306-
h.rejectTokenDenied(ctx, req, err)
307-
return
308-
}
289+
// Two cross-cutting policies are shared across the per-type handlers:
290+
// verifyRequest applies the §10.2.2 token-verification pre-step, and
291+
// namespaceRequest folds in the §13.7.1 per-session cap for the three
292+
// namespace-state requests (the §13.1 subscription cap is inline on SUBSCRIBE).
293+
//
294+
// An unknown / unexpected first-message type is a protocol violation per §3.3.2:
295+
// OnUnknown resets the bidi stream and logs. The session is NOT closed — §9.5
296+
// ("if a Session is closed due to an unknown or invalid control message [...] the
297+
// Relay MUST NOT propagate that message [...] to another Session") means the
298+
// relay isolates the failure to the one request.
299+
func (h *sessionHandler) requestMux(ctx context.Context) *session.RequestMux {
300+
mux := session.NewRequestMux()
309301

310-
switch msg := req.First.(type) {
311-
case *message.Subscribe:
302+
session.HandleType(mux, func(req *session.Request, msg *message.Subscribe) {
303+
if !h.verifyRequest(ctx, req) {
304+
return
305+
}
312306
// §13.1: bound concurrent subscriptions per session.
313307
if !h.limiter.acquireSub() {
314308
h.rejectExcessiveLoad(ctx, req, "subscription")
315309
return
316310
}
317311
h.spawn(func() { defer h.limiter.releaseSub(); h.handleSubscribe(ctx, req, msg) })
318-
case *message.Publish:
319-
h.spawn(func() { h.handlePublish(ctx, req, msg) })
320-
case *message.Fetch:
321-
h.spawn(func() { h.handleFetch(ctx, req, msg) })
322-
case *message.TrackStatus:
323-
h.spawn(func() { h.handleTrackStatus(ctx, req, msg) })
324-
case *message.PublishNamespace:
325-
// §13.7.1: bound concurrent namespace-state requests per session.
326-
if !h.limiter.acquireNamespace() {
327-
h.rejectExcessiveLoad(ctx, req, "namespace request")
312+
})
313+
session.HandleType(mux, func(req *session.Request, msg *message.Publish) {
314+
if !h.verifyRequest(ctx, req) {
328315
return
329316
}
330-
h.spawn(func() { defer h.limiter.releaseNamespace(); h.handlePublishNamespace(ctx, req, msg) })
331-
case *message.SubscribeNamespace:
332-
if !h.limiter.acquireNamespace() {
333-
h.rejectExcessiveLoad(ctx, req, "namespace request")
317+
h.spawn(func() { h.handlePublish(ctx, req, msg) })
318+
})
319+
session.HandleType(mux, func(req *session.Request, msg *message.Fetch) {
320+
if !h.verifyRequest(ctx, req) {
334321
return
335322
}
336-
h.spawn(func() { defer h.limiter.releaseNamespace(); h.handleSubscribeNamespace(ctx, req, msg) })
337-
case *message.SubscribeTracks:
338-
if !h.limiter.acquireNamespace() {
339-
h.rejectExcessiveLoad(ctx, req, "namespace request")
323+
h.spawn(func() { h.handleFetch(ctx, req, msg) })
324+
})
325+
session.HandleType(mux, func(req *session.Request, msg *message.TrackStatus) {
326+
if !h.verifyRequest(ctx, req) {
340327
return
341328
}
342-
h.spawn(func() { defer h.limiter.releaseNamespace(); h.handleSubscribeTracks(ctx, req, msg) })
343-
default:
329+
h.spawn(func() { h.handleTrackStatus(ctx, req, msg) })
330+
})
331+
session.HandleType(mux, func(req *session.Request, msg *message.PublishNamespace) {
332+
h.namespaceRequest(ctx, req, func() { h.handlePublishNamespace(ctx, req, msg) })
333+
})
334+
session.HandleType(mux, func(req *session.Request, msg *message.SubscribeNamespace) {
335+
h.namespaceRequest(ctx, req, func() { h.handleSubscribeNamespace(ctx, req, msg) })
336+
})
337+
session.HandleType(mux, func(req *session.Request, msg *message.SubscribeTracks) {
338+
h.namespaceRequest(ctx, req, func() { h.handleSubscribeTracks(ctx, req, msg) })
339+
})
340+
341+
mux.OnUnknown(func(req *session.Request) {
344342
h.log.LogAttrs(ctx, slog.LevelWarn, "relay rejected unknown request type",
345343
slog.String("type", fmt.Sprintf("%T", req.First)))
346344
req.Stream.CancelRead(uint64(moqt.StreamResetInternalError))
347345
req.Stream.CancelWrite(uint64(moqt.StreamResetInternalError))
346+
})
347+
348+
return mux
349+
}
350+
351+
// verifyRequest runs the per-request dispatch log and the §10.2.2 token
352+
// verification shared by every known request type. It returns false — after
353+
// replying REQUEST_ERROR with the mapped code — when the request's resolved
354+
// AUTHORIZATION_TOKEN is denied; the session stays up (a denial is per-request).
355+
func (h *sessionHandler) verifyRequest(ctx context.Context, req *session.Request) bool {
356+
h.log.LogAttrs(ctx, slog.LevelDebug, "relay dispatching request",
357+
slog.String("type", fmt.Sprintf("%T", req.First)))
358+
if err := h.sess.VerifyRequestTokens(ctx, req); err != nil {
359+
h.rejectTokenDenied(ctx, req, err)
360+
return false
361+
}
362+
return true
363+
}
364+
365+
// namespaceRequest wraps a namespace-state handler (PUBLISH_NAMESPACE,
366+
// SUBSCRIBE_NAMESPACE, SUBSCRIBE_TRACKS) with the shared token verification and
367+
// the §13.7.1 per-session cap, spawning fn under the limiter when admitted.
368+
func (h *sessionHandler) namespaceRequest(ctx context.Context, req *session.Request, fn func()) {
369+
if !h.verifyRequest(ctx, req) {
370+
return
371+
}
372+
if !h.limiter.acquireNamespace() {
373+
h.rejectExcessiveLoad(ctx, req, "namespace request")
374+
return
348375
}
376+
h.spawn(func() { defer h.limiter.releaseNamespace(); fn() })
349377
}
350378

351379
// spawn registers a goroutine with the handler's wg so run() can join it

0 commit comments

Comments
 (0)