Skip to content

Commit 0326d1e

Browse files
mcuelenaereclaude
andcommitted
refactor(jsonrpc): support session injection and synchronous handlers in dispatcher
Extends RPCHandler with two new orthogonal flags: - TakesSession: dispatcher injects the receiving *Session as the handler's first reflected argument. Lets session-bound RPCs act on the session that delivered the message rather than the global currentSession. - Synchronous: handler runs inline on the per-session rpcQueue pump goroutine instead of in a fresh per-message goroutine. Preserves dequeue order for ordering-sensitive RPCs. Plumbs *Session through callRPCHandler / riskyCallRPCHandler and shifts the reflected param index by one when TakesSession is set. Splits the existing onRPCMessage into: - onRPCMessage: parse + lookup + sync/async decision; runs on the pump. - invokeRPCHandler: handler invocation + response write; runs either inline (Synchronous) or in a goroutine (default). The goroutine spawn previously living in the WebRTC pump (webrtc.go:381) moves into onRPCMessage, where the JSON-RPC layer can decide per handler. Existing handlers all keep zero-value defaults (TakesSession: false, Synchronous: false), so their dispatch behaviour is identical: today's "go onRPCMessage(...)" is now "go invokeRPCHandler(...)" — one extra stack frame, otherwise the same. No handler uses the new flags in this commit; that follows. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent e4cb21e commit 0326d1e

2 files changed

Lines changed: 68 additions & 22 deletions

File tree

jsonrpc.go

Lines changed: 63 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ func writeJSONRPCEvent(event string, params any, session *Session) {
9999
}
100100
}
101101

102+
// onRPCMessage parses a single RPC message off the per-session pump and
103+
// dispatches it. Runs synchronously on the pump goroutine; handlers
104+
// flagged Synchronous keep running there (so dequeue order is preserved
105+
// for ordering-sensitive RPCs like pauseVideo / resumeVideo), everything
106+
// else is dispatched in a fresh goroutine so a slow handler can't
107+
// head-of-line block the queue.
102108
func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
103109
var request JSONRPCRequest
104110
err := json.Unmarshal(message.Data, &request)
@@ -120,14 +126,6 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
120126
return
121127
}
122128

123-
scopedLogger := jsonRpcLogger.With().
124-
Str("method", request.Method).
125-
Interface("params", request.Params).
126-
Interface("id", request.ID).Logger()
127-
128-
scopedLogger.Trace().Msg("Received RPC request")
129-
t := time.Now()
130-
131129
// pauseVideo / resumeVideo are session-bound notifications: they
132130
// toggle this session's slot in the video stream refcount (see
133131
// video.go). Handled inline because the generic dispatcher doesn't
@@ -158,7 +156,26 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
158156
return
159157
}
160158

161-
result, err := callRPCHandler(scopedLogger, handler, request.Params)
159+
if handler.Synchronous {
160+
invokeRPCHandler(request, handler, session)
161+
} else {
162+
go invokeRPCHandler(request, handler, session)
163+
}
164+
}
165+
166+
// invokeRPCHandler runs a single RPC handler and writes its response back
167+
// to the session. Called either inline on the pump (Synchronous handlers)
168+
// or from a per-message goroutine (the default).
169+
func invokeRPCHandler(request JSONRPCRequest, handler RPCHandler, session *Session) {
170+
scopedLogger := jsonRpcLogger.With().
171+
Str("method", request.Method).
172+
Interface("params", request.Params).
173+
Interface("id", request.ID).Logger()
174+
175+
scopedLogger.Trace().Msg("Received RPC request")
176+
t := time.Now()
177+
178+
result, err := callRPCHandler(scopedLogger, handler, session, request.Params)
162179
if err != nil {
163180
scopedLogger.Error().Err(err).Msg("Error calling RPC handler")
164181
errorResponse := JSONRPCResponse{
@@ -529,10 +546,25 @@ type RPCHandler struct {
529546
Func any
530547
Params []string
531548
OptionalParams []string
549+
550+
// TakesSession: the handler's first parameter is *Session and the
551+
// dispatcher injects the receiving session into it. Used by
552+
// session-bound RPCs (e.g. pauseVideo / resumeVideo) that must act
553+
// on the session whose data channel delivered the message, not on
554+
// the global currentSession.
555+
TakesSession bool
556+
557+
// Synchronous: the handler runs inline on the per-session rpcQueue
558+
// pump goroutine rather than in a fresh per-message goroutine. Use
559+
// for ordering-sensitive handlers — pause/resume toggle a shared
560+
// global refcount and the Go scheduler can otherwise reorder a
561+
// tight pause→resume pair. Slow or blocking handlers MUST stay
562+
// async (the default).
563+
Synchronous bool
532564
}
533565

534566
// call the handler but recover from a panic to ensure our RPC thread doesn't collapse on malformed calls
535-
func callRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string]any) (result any, err error) {
567+
func callRPCHandler(logger zerolog.Logger, handler RPCHandler, session *Session, params map[string]any) (result any, err error) {
536568
// Use defer to recover from a panic
537569
defer func() {
538570
if r := recover(); r != nil {
@@ -546,11 +578,11 @@ func callRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string
546578
}()
547579

548580
// Call the handler
549-
result, err = riskyCallRPCHandler(logger, handler, params)
581+
result, err = riskyCallRPCHandler(logger, handler, session, params)
550582
return result, err // do not combine these two lines into one, as it breaks the above defer function's setting of err
551583
}
552584

553-
func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string]any) (any, error) {
585+
func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, session *Session, params map[string]any) (any, error) {
554586
handlerValue := reflect.ValueOf(handler.Func)
555587
handlerType := handlerValue.Type()
556588

@@ -561,8 +593,12 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
561593
numParams := handlerType.NumIn()
562594
allParamNames := append(handler.Params, handler.OptionalParams...) //nolint:gocritic
563595

564-
if len(allParamNames) != numParams {
565-
err := fmt.Errorf("mismatch between handler parameters (%d) and defined parameter names (%d)", numParams, len(allParamNames))
596+
expectedSlots := len(allParamNames)
597+
if handler.TakesSession {
598+
expectedSlots++
599+
}
600+
if expectedSlots != numParams {
601+
err := fmt.Errorf("mismatch between handler parameters (%d) and defined parameter names (%d)", numParams, expectedSlots)
566602
logger.Error().Strs("paramNames", allParamNames).Err(err).Msg("Cannot call RPC handler")
567603
return nil, err
568604
}
@@ -573,14 +609,21 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
573609
}
574610

575611
args := make([]reflect.Value, numParams)
612+
// Reflective params start after the injected *Session slot, if any.
613+
argOffset := 0
614+
if handler.TakesSession {
615+
args[0] = reflect.ValueOf(session)
616+
argOffset = 1
617+
}
576618

577-
for i := range numParams {
578-
paramType := handlerType.In(i)
619+
for i := range len(allParamNames) {
620+
paramType := handlerType.In(i + argOffset)
579621
paramName := allParamNames[i]
580622
paramValue, ok := params[paramName]
623+
argIdx := i + argOffset
581624
if !ok {
582625
if optionalSet[paramName] {
583-
args[i] = reflect.Zero(paramType)
626+
args[argIdx] = reflect.Zero(paramType)
584627
continue
585628
}
586629
err := fmt.Errorf("missing parameter: %s", paramName)
@@ -614,7 +657,7 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
614657
newSlice.Index(j).Set(elemValue.Convert(paramType.Elem()))
615658
}
616659
}
617-
args[i] = newSlice
660+
args[argIdx] = newSlice
618661
} else if paramType.Kind() == reflect.Struct && convertedValue.Kind() == reflect.Map {
619662
jsonData, err := json.Marshal(convertedValue.Interface())
620663
if err != nil {
@@ -625,12 +668,12 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
625668
if err := json.Unmarshal(jsonData, newStruct); err != nil {
626669
return nil, fmt.Errorf("failed to unmarshal JSON into struct: %v for parameter %s", err, paramName)
627670
}
628-
args[i] = reflect.ValueOf(newStruct).Elem()
671+
args[argIdx] = reflect.ValueOf(newStruct).Elem()
629672
} else {
630673
return nil, fmt.Errorf("invalid parameter type for: %s, type: %s", paramName, paramType.Kind())
631674
}
632675
} else {
633-
args[i] = convertedValue.Convert(paramType)
676+
args[argIdx] = convertedValue.Convert(paramType)
634677
}
635678
}
636679

webrtc.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,10 @@ func newSession(config SessionConfig) (*Session, error) {
532532

533533
rpcQueue := session.rpcQueue
534534
go func() {
535+
// onRPCMessage runs synchronously on this pump goroutine so
536+
// handlers flagged Synchronous (pause/resume) keep their
537+
// dequeue order. The dispatcher spawns its own goroutine for
538+
// every async handler internally.
535539
for {
536540
select {
537541
case <-session.done:
@@ -543,8 +547,7 @@ func newSession(config SessionConfig) (*Session, error) {
543547
case <-session.done:
544548
return
545549
case msg := <-rpcQueue:
546-
// TODO: only use goroutine if the task is asynchronous
547-
go onRPCMessage(msg, session)
550+
onRPCMessage(msg, session)
548551
}
549552
}
550553
}()

0 commit comments

Comments
 (0)