Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 65 additions & 20 deletions jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func writeJSONRPCEvent(event string, params any, session *Session) {
}
}

// onRPCMessage parses a single RPC message off the per-session pump and
// dispatches it. Runs synchronously on the pump goroutine; handlers
// flagged Synchronous keep running there (so dequeue order is preserved
// for ordering-sensitive RPCs like pauseVideo / resumeVideo), everything
// else is dispatched in a fresh goroutine so a slow handler can't
// head-of-line block the queue.
func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
var request JSONRPCRequest
err := json.Unmarshal(message.Data, &request)
Expand All @@ -120,14 +126,6 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
return
}

scopedLogger := jsonRpcLogger.With().
Str("method", request.Method).
Interface("params", request.Params).
Interface("id", request.ID).Logger()

scopedLogger.Trace().Msg("Received RPC request")
t := time.Now()

handler, ok := rpcHandlers[request.Method]
if !ok {
errorResponse := JSONRPCResponse{
Expand All @@ -142,7 +140,26 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
return
}

result, err := callRPCHandler(scopedLogger, handler, request.Params)
if handler.Synchronous {
invokeRPCHandler(request, handler, session)
} else {
go invokeRPCHandler(request, handler, session)
}
}

// invokeRPCHandler runs a single RPC handler and writes its response back
// to the session. Called either inline on the pump (Synchronous handlers)
// or from a per-message goroutine (the default).
func invokeRPCHandler(request JSONRPCRequest, handler RPCHandler, session *Session) {
scopedLogger := jsonRpcLogger.With().
Str("method", request.Method).
Interface("params", request.Params).
Interface("id", request.ID).Logger()

scopedLogger.Trace().Msg("Received RPC request")
t := time.Now()

result, err := callRPCHandler(scopedLogger, handler, session, request.Params)
if err != nil {
scopedLogger.Error().Err(err).Msg("Error calling RPC handler")
errorResponse := JSONRPCResponse{
Expand Down Expand Up @@ -524,10 +541,25 @@ type RPCHandler struct {
Func any
Params []string
OptionalParams []string

// TakesSession: the handler's first parameter is *Session and the
// dispatcher injects the receiving session into it. Used by
// session-bound RPCs (e.g. pauseVideo / resumeVideo) that must act
// on the session whose data channel delivered the message, not on
// the global currentSession.
TakesSession bool

// Synchronous: the handler runs inline on the per-session rpcQueue
// pump goroutine rather than in a fresh per-message goroutine. Use
// for ordering-sensitive handlers — pause/resume toggle a shared
// global refcount and the Go scheduler can otherwise reorder a
// tight pause→resume pair. Slow or blocking handlers MUST stay
// async (the default).
Synchronous bool
}

// call the handler but recover from a panic to ensure our RPC thread doesn't collapse on malformed calls
func callRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string]any) (result any, err error) {
func callRPCHandler(logger zerolog.Logger, handler RPCHandler, session *Session, params map[string]any) (result any, err error) {
// Use defer to recover from a panic
defer func() {
if r := recover(); r != nil {
Expand All @@ -541,11 +573,11 @@ func callRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string
}()

// Call the handler
result, err = riskyCallRPCHandler(logger, handler, params)
result, err = riskyCallRPCHandler(logger, handler, session, params)
return result, err // do not combine these two lines into one, as it breaks the above defer function's setting of err
}

func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[string]any) (any, error) {
func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, session *Session, params map[string]any) (any, error) {
handlerValue := reflect.ValueOf(handler.Func)
handlerType := handlerValue.Type()

Expand All @@ -556,8 +588,12 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
numParams := handlerType.NumIn()
allParamNames := append(handler.Params, handler.OptionalParams...) //nolint:gocritic

if len(allParamNames) != numParams {
err := fmt.Errorf("mismatch between handler parameters (%d) and defined parameter names (%d)", numParams, len(allParamNames))
expectedSlots := len(allParamNames)
if handler.TakesSession {
expectedSlots++
}
if expectedSlots != numParams {
err := fmt.Errorf("mismatch between handler parameters (%d) and defined parameter names (%d)", numParams, expectedSlots)
logger.Error().Strs("paramNames", allParamNames).Err(err).Msg("Cannot call RPC handler")
return nil, err
}
Expand All @@ -568,14 +604,21 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
}

args := make([]reflect.Value, numParams)
// Reflective params start after the injected *Session slot, if any.
argOffset := 0
if handler.TakesSession {
args[0] = reflect.ValueOf(session)
argOffset = 1
}

for i := range numParams {
paramType := handlerType.In(i)
for i := range len(allParamNames) {
paramType := handlerType.In(i + argOffset)
paramName := allParamNames[i]
paramValue, ok := params[paramName]
argIdx := i + argOffset
if !ok {
if optionalSet[paramName] {
args[i] = reflect.Zero(paramType)
args[argIdx] = reflect.Zero(paramType)
continue
}
err := fmt.Errorf("missing parameter: %s", paramName)
Expand Down Expand Up @@ -609,7 +652,7 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
newSlice.Index(j).Set(elemValue.Convert(paramType.Elem()))
}
}
args[i] = newSlice
args[argIdx] = newSlice
} else if paramType.Kind() == reflect.Struct && convertedValue.Kind() == reflect.Map {
jsonData, err := json.Marshal(convertedValue.Interface())
if err != nil {
Expand All @@ -620,12 +663,12 @@ func riskyCallRPCHandler(logger zerolog.Logger, handler RPCHandler, params map[s
if err := json.Unmarshal(jsonData, newStruct); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON into struct: %v for parameter %s", err, paramName)
}
args[i] = reflect.ValueOf(newStruct).Elem()
args[argIdx] = reflect.ValueOf(newStruct).Elem()
} else {
return nil, fmt.Errorf("invalid parameter type for: %s, type: %s", paramName, paramType.Kind())
}
} else {
args[i] = convertedValue.Convert(paramType)
args[argIdx] = convertedValue.Convert(paramType)
}
}

Expand Down Expand Up @@ -1347,6 +1390,8 @@ var rpcHandlers = map[string]RPCHandler{
"wheelReport": {Func: rpcWheelReport, Params: []string{"wheelY", "wheelX"}},
"wakeHost": {Func: rpcWakeHost},
"getVideoState": {Func: rpcGetVideoState},
"pauseVideo": {Func: rpcPauseVideo, TakesSession: true, Synchronous: true},
"resumeVideo": {Func: rpcResumeVideo, TakesSession: true, Synchronous: true},
"getUSBState": {Func: rpcGetUSBState},
"unmountImage": {Func: rpcUnmountImage},
"rpcMountBuiltInImage": {Func: rpcMountBuiltInImage, Params: []string{"filename"}},
Expand Down
24 changes: 24 additions & 0 deletions ui/src/components/WebRTCVideo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,30 @@ export default function WebRTCVideo({
[keyDownHandler, keyUpHandler, resetKeyboardState],
);

// Pause/resume the server-side video feed when the tab is hidden so we
// don't burn WAN bandwidth decoding-then-discarding frames the user
// can't see. The encoder is restarted on resume so the first frame is
// an IDR and decode is artifact-free.
useEffect(
function pauseVideoOnTabHidden() {
const sync = () => {
sendRpc(document.hidden ? "pauseVideo" : "resumeVideo", {});
};

// Sync once on mount in case the tab is already hidden when we
// (re)connect, then track every visibility change.
sync();

const abortController = new AbortController();
document.addEventListener("visibilitychange", sync, {
signal: abortController.signal,
});

return () => abortController.abort();
},
[sendRpc],
);

// Setup Video Event Listeners
useEffect(
function setupVideoEventListeners() {
Expand Down
68 changes: 66 additions & 2 deletions video.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,56 @@ var (
lastVideoState native.VideoState
videoSleepModeCtx context.Context
videoSleepModeCancel context.CancelFunc

videoConsumersMu sync.Mutex
videoConsumers = map[string]struct{}{}
)

// acquireVideoStream registers a named consumer of the capture pipeline.
// The first acquirer starts the native video stream and pauses the HDMI
// sleep ticker; subsequent acquirers from different consumers are recorded
// without touching the underlying stream. Idempotent per consumer key —
// re-acquiring an already-held key is a no-op.
func acquireVideoStream(consumer string) {
videoConsumersMu.Lock()
defer videoConsumersMu.Unlock()

if _, exists := videoConsumers[consumer]; exists {
return
}
videoConsumers[consumer] = struct{}{}
if len(videoConsumers) == 1 {
_ = nativeInstance.VideoStart()
stopVideoSleepModeTicker()
}
}

// releaseVideoStream unregisters a consumer. When the last consumer is
// released, the native video stream is stopped and the HDMI sleep ticker
// is restarted. Idempotent — releasing an unknown key is a no-op.
func releaseVideoStream(consumer string) {
videoConsumersMu.Lock()
defer videoConsumersMu.Unlock()

if _, exists := videoConsumers[consumer]; !exists {
return
}
delete(videoConsumers, consumer)
if len(videoConsumers) == 0 {
_ = nativeInstance.VideoStop()
startVideoSleepModeTicker()
}
}

// videoStreamHasConsumers reports whether any consumer currently holds the
// capture pipeline open. The HDMI sleep ticker uses this to decide whether
// it is safe to put the capture chip to sleep.
func videoStreamHasConsumers() bool {
videoConsumersMu.Lock()
defer videoConsumersMu.Unlock()
return len(videoConsumers) > 0
}

const (
defaultVideoSleepModeDuration = 1 * time.Minute
)
Expand Down Expand Up @@ -109,6 +157,22 @@ func updateHostDisplayAdvertisement(reason string, force bool) error {
return setHostDisplayAdvertisedLocked(shouldAdvertiseHostDisplayLocked(), reason, force)
}

// rpcPauseVideo releases this session's slot in the video stream
// refcount. Registered with TakesSession + Synchronous so it acts on
// the receiving session and can't be reordered relative to a
// resumeVideo from the same source.
func rpcPauseVideo(s *Session) error {
releaseVideoStream(s.videoConsumerKey())
return nil
}

// rpcResumeVideo re-acquires this session's slot. See rpcPauseVideo for
// the dispatcher flags this relies on.
func rpcResumeVideo(s *Session) error {
acquireVideoStream(s.videoConsumerKey())
return nil
}

type rpcVideoSleepModeResponse struct {
Supported bool `json:"supported"`
Enabled bool `json:"enabled"`
Expand Down Expand Up @@ -184,8 +248,8 @@ func doVideoSleepModeTicker(ctx context.Context, duration time.Duration) {
for {
select {
case <-timer.C:
if getActiveSessions() > 0 {
nativeLogger.Warn().Msg("not going to enter HDMI sleep mode because there are active sessions")
if videoStreamHasConsumers() {
nativeLogger.Warn().Msg("not going to enter HDMI sleep mode because the capture pipeline has consumers")
continue
}

Expand Down
Loading
Loading