Skip to content

Commit a31c82a

Browse files
mcuelenaereclaude
andcommitted
feat(video): refcount the capture pipeline; expose pauseVideo / resumeVideo
Introduces acquireVideoStream / releaseVideoStream / videoStreamHasConsumers helpers in video.go that gate the native encoder's VideoStart / VideoStop and the HDMI sleep ticker on a set of named consumers. The encoder runs iff at least one consumer holds it; the 0→1 transition runs VideoStart so the first delivered frame is an IDR. Each WebRTC Session gets a stable id (uuid in newSession) and its own consumer key "webrtc:<id>". The slot is acquired when ICE reaches Connected and released when it reaches Closed. pauseVideo / resumeVideo JSON-RPC notifications toggle the calling session's own slot, handled inline in onRPCMessage so the dispatcher's session reference is in scope (the generic reflection-based dispatch doesn't pass *Session). This data model fixes by construction the two race classes Cursor Bugbot flagged on earlier drafts of this PR: - Handover-while-paused: the new session's own acquire is what starts the encoder; no special-case "restart if the outgoing session was paused" code at the handover site. - Stale-session pause: a pauseVideo from the soon-to-close session releases only its own slot, not the new session's; no need for a session != currentSession gate. The 1s handover overlap can briefly deliver frames to a paused session's track if the other session is keeping the encoder alive; bounded by the existing peer-connection close delay and acceptable for a feature aimed at sustained idle bandwidth saving. The refcount helpers are byte-identical to those in PR #1447 (commit dec25e1), so that PR will merge cleanly on top. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 79c23ec commit a31c82a

3 files changed

Lines changed: 100 additions & 8 deletions

File tree

jsonrpc.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,22 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
124124
scopedLogger.Trace().Msg("Received RPC request")
125125
t := time.Now()
126126

127+
// pauseVideo / resumeVideo are session-bound notifications: they
128+
// toggle this session's slot in the video stream refcount (see
129+
// video.go). Handled inline because the generic dispatcher doesn't
130+
// pass *Session, and acting on currentSession instead of the
131+
// receiving session would let a stale data channel mis-target the
132+
// active one during the 1s handover overlap. Both helpers are
133+
// idempotent so rapid pause/resume bursts are safe.
134+
switch request.Method {
135+
case "pauseVideo":
136+
releaseVideoStream(session.videoConsumerKey())
137+
return
138+
case "resumeVideo":
139+
acquireVideoStream(session.videoConsumerKey())
140+
return
141+
}
142+
127143
handler, ok := rpcHandlers[request.Method]
128144
if !ok {
129145
errorResponse := JSONRPCResponse{

video.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,63 @@ import (
66
"time"
77

88
"github.com/jetkvm/kvm/internal/native"
9+
"github.com/jetkvm/kvm/internal/sync"
910
)
1011

1112
var (
1213
lastVideoState native.VideoState
1314
videoSleepModeCtx context.Context
1415
videoSleepModeCancel context.CancelFunc
16+
17+
videoConsumersMu sync.Mutex
18+
videoConsumers = map[string]struct{}{}
1519
)
1620

21+
// acquireVideoStream registers a named consumer of the capture pipeline.
22+
// The first acquirer starts the native video stream and pauses the HDMI
23+
// sleep ticker; subsequent acquirers from different consumers are recorded
24+
// without touching the underlying stream. Idempotent per consumer key —
25+
// re-acquiring an already-held key is a no-op.
26+
func acquireVideoStream(consumer string) {
27+
videoConsumersMu.Lock()
28+
defer videoConsumersMu.Unlock()
29+
30+
if _, exists := videoConsumers[consumer]; exists {
31+
return
32+
}
33+
videoConsumers[consumer] = struct{}{}
34+
if len(videoConsumers) == 1 {
35+
_ = nativeInstance.VideoStart()
36+
stopVideoSleepModeTicker()
37+
}
38+
}
39+
40+
// releaseVideoStream unregisters a consumer. When the last consumer is
41+
// released, the native video stream is stopped and the HDMI sleep ticker
42+
// is restarted. Idempotent — releasing an unknown key is a no-op.
43+
func releaseVideoStream(consumer string) {
44+
videoConsumersMu.Lock()
45+
defer videoConsumersMu.Unlock()
46+
47+
if _, exists := videoConsumers[consumer]; !exists {
48+
return
49+
}
50+
delete(videoConsumers, consumer)
51+
if len(videoConsumers) == 0 {
52+
_ = nativeInstance.VideoStop()
53+
startVideoSleepModeTicker()
54+
}
55+
}
56+
57+
// videoStreamHasConsumers reports whether any consumer currently holds the
58+
// capture pipeline open. The HDMI sleep ticker uses this to decide whether
59+
// it is safe to put the capture chip to sleep.
60+
func videoStreamHasConsumers() bool {
61+
videoConsumersMu.Lock()
62+
defer videoConsumersMu.Unlock()
63+
return len(videoConsumers) > 0
64+
}
65+
1766
const (
1867
defaultVideoSleepModeDuration = 1 * time.Minute
1968
)
@@ -111,8 +160,8 @@ func doVideoSleepModeTicker(ctx context.Context, duration time.Duration) {
111160
for {
112161
select {
113162
case <-timer.C:
114-
if getActiveSessions() > 0 {
115-
nativeLogger.Warn().Msg("not going to enter HDMI sleep mode because there are active sessions")
163+
if videoStreamHasConsumers() {
164+
nativeLogger.Warn().Msg("not going to enter HDMI sleep mode because the capture pipeline has consumers")
116165
continue
117166
}
118167

webrtc.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,18 @@ import (
1717
"github.com/coder/websocket"
1818
"github.com/coder/websocket/wsjson"
1919
"github.com/gin-gonic/gin"
20+
"github.com/google/uuid"
2021
"github.com/pion/ice/v4"
2122
"github.com/pion/webrtc/v4"
2223
"github.com/rs/zerolog"
2324
)
2425

2526
type Session struct {
27+
// id is a stable per-session identifier used as the suffix of the
28+
// "webrtc:<id>" consumer key for the video stream refcount in video.go.
29+
// Generated in newSession; never reused.
30+
id string
31+
2632
peerConnection *webrtc.PeerConnection
2733
VideoTrack *webrtc.TrackLocalStaticSample
2834
ControlChannel *webrtc.DataChannel
@@ -44,6 +50,13 @@ type Session struct {
4450
codecMimeType string
4551
}
4652

53+
// videoConsumerKey is the key this session holds in the video stream
54+
// refcount (see video.go). Acquired when ICE reaches Connected, released
55+
// on Closed; pauseVideo / resumeVideo toggle it for this session only.
56+
func (s *Session) videoConsumerKey() string {
57+
return "webrtc:" + s.id
58+
}
59+
4760
var (
4861
actionSessions int = 0
4962
activeSessionsMutex = &sync.Mutex{}
@@ -354,7 +367,10 @@ func newSession(config SessionConfig) (*Session, error) {
354367
return nil, err
355368
}
356369

357-
session := &Session{peerConnection: peerConnection}
370+
session := &Session{
371+
id: uuid.New().String(),
372+
peerConnection: peerConnection,
373+
}
358374
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
359375
session.initQueues()
360376
session.initKeysDownStateQueue()
@@ -435,6 +451,11 @@ func newSession(config SessionConfig) (*Session, error) {
435451
if incrActiveSessions() == 1 {
436452
onFirstSessionConnected()
437453
}
454+
// Per-session slot in the video stream refcount; the
455+
// 0→1 transition (first consumer overall) starts the
456+
// encoder, so the first frame after a fresh acquire
457+
// is an IDR.
458+
acquireVideoStream(session.videoConsumerKey())
438459
if mqttManager != nil {
439460
mqttManager.publishSessionsState()
440461
}
@@ -481,6 +502,10 @@ func newSession(config SessionConfig) (*Session, error) {
481502
}
482503
if isConnected {
483504
isConnected = false
505+
// Drop our slot in the video stream refcount. Idempotent —
506+
// if pauseVideo already released it the call is a no-op.
507+
// On the N→0 transition the encoder is stopped.
508+
releaseVideoStream(session.videoConsumerKey())
484509
onActiveSessionsChanged()
485510
if decrActiveSessions() == 0 {
486511
scopedLogger.Info().Msg("last session disconnected, stopping video stream")
@@ -502,18 +527,20 @@ func onActiveSessionsChanged() {
502527

503528
func onFirstSessionConnected() {
504529
notifyFailsafeMode(currentSession)
530+
// The video stream's start/stop lifecycle is owned by the refcount in
531+
// video.go (acquireVideoStream / releaseVideoStream). What stays here
532+
// is the global codec selection: it has to happen before the encoder
533+
// is started by the first acquire that follows this function.
505534
if currentSession != nil && currentSession.codecMimeType == webrtc.MimeTypeH265 {
506535
_ = nativeInstance.VideoSetCodecType(1)
507536
} else {
508537
_ = nativeInstance.VideoSetCodecType(0)
509538
}
510-
_ = nativeInstance.VideoStart()
511-
stopVideoSleepModeTicker()
512539
}
513540

514541
func onLastSessionDisconnected() {
515-
// Safety net: ensure all keys are released when the last session disconnects
542+
// Safety net: ensure all keys are released when the last session disconnects.
543+
// VideoStop / sleep ticker are owned by releaseVideoStream (called when each
544+
// session's ICE state reaches Closed, just above this).
516545
_ = rpcKeyboardReport(0, keyboardClearStateKeys)
517-
_ = nativeInstance.VideoStop()
518-
startVideoSleepModeTicker()
519546
}

0 commit comments

Comments
 (0)