-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathapi.go
More file actions
363 lines (312 loc) · 13.6 KB
/
api.go
File metadata and controls
363 lines (312 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
package api
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"sync"
"time"
"github.com/kernel/kernel-images/server/lib/capturesession"
"github.com/kernel/kernel-images/server/lib/cdpmonitor"
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
"github.com/kernel/kernel-images/server/lib/events"
"github.com/kernel/kernel-images/server/lib/logger"
"github.com/kernel/kernel-images/server/lib/nekoclient"
oapi "github.com/kernel/kernel-images/server/lib/oapi"
"github.com/kernel/kernel-images/server/lib/policy"
"github.com/kernel/kernel-images/server/lib/recorder"
"github.com/kernel/kernel-images/server/lib/scaletozero"
)
type cdpMonitorController interface {
Start(ctx context.Context) error
Stop()
IsRunning() bool
}
var _ cdpMonitorController = (*cdpmonitor.Monitor)(nil)
type ApiService struct {
// defaultRecorderID is used whenever the caller doesn't specify an explicit ID.
defaultRecorderID string
recordManager recorder.RecordManager
factory recorder.FFmpegRecorderFactory
// Filesystem watch management
watchMu sync.RWMutex
watches map[string]*fsWatch
// Process management
procMu sync.RWMutex
procs map[string]*processHandle
// Neko authenticated client
nekoAuthClient *nekoclient.AuthClient
// DevTools upstream manager (Chromium supervisord log tailer)
upstreamMgr *devtoolsproxy.UpstreamManager
stz scaletozero.PinnedController
// inputMu serializes input-related operations (mouse, keyboard, screenshot)
inputMu sync.Mutex
// playwrightMu serializes Playwright code execution (only one execution at a time)
playwrightMu sync.Mutex
// playwrightDaemonStarting is an atomic flag to prevent concurrent daemon starts
playwrightDaemonStarting int32
// playwrightDaemonCmd holds the daemon process for cleanup
playwrightDaemonCmd *exec.Cmd
// policy management
policy *policy.Policy
// viewportOverride stores the last viewport dimensions set via CDP so
// that getCurrentResolution can return consistent values even while
// Xvfb is restarting in the background.
viewportMu sync.RWMutex
viewportOverride *[3]int // [width, height, refreshRate] or nil
// cachedDisplayMode caches the result of detectDisplayMode since the
// display server type (xorg vs xvfb) does not change at runtime.
displayModeOnce sync.Once
displayModeVal string
// xvfbResizeMu serializes background Xvfb restarts to prevent races
// when multiple CDP fast-path resizes fire in quick succession.
xvfbResizeMu sync.Mutex
// CDP event pipeline and cdpMonitor.
eventStream *events.EventStream
captureSession *capturesession.CaptureSession
cdpMonitor cdpMonitorController
monitorMu sync.Mutex
lifecycleCtx context.Context
lifecycleCancel context.CancelFunc
}
var _ oapi.StrictServerInterface = (*ApiService)(nil)
func New(
recordManager recorder.RecordManager,
factory recorder.FFmpegRecorderFactory,
upstreamMgr *devtoolsproxy.UpstreamManager,
stz scaletozero.PinnedController,
nekoAuthClient *nekoclient.AuthClient,
captureSession *capturesession.CaptureSession,
eventStream *events.EventStream,
displayNum int,
) (*ApiService, error) {
switch {
case recordManager == nil:
return nil, fmt.Errorf("recordManager cannot be nil")
case factory == nil:
return nil, fmt.Errorf("factory cannot be nil")
case upstreamMgr == nil:
return nil, fmt.Errorf("upstreamMgr cannot be nil")
case nekoAuthClient == nil:
return nil, fmt.Errorf("nekoAuthClient cannot be nil")
case captureSession == nil:
return nil, fmt.Errorf("captureSession cannot be nil")
case eventStream == nil:
return nil, fmt.Errorf("eventStream cannot be nil")
}
mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default())
ctx, cancel := context.WithCancel(context.Background())
return &ApiService{
recordManager: recordManager,
factory: factory,
defaultRecorderID: "default",
watches: make(map[string]*fsWatch),
procs: make(map[string]*processHandle),
upstreamMgr: upstreamMgr,
stz: stz,
nekoAuthClient: nekoAuthClient,
policy: &policy.Policy{},
eventStream: eventStream,
captureSession: captureSession,
cdpMonitor: mon,
lifecycleCtx: ctx,
lifecycleCancel: cancel,
}, nil
}
func (s *ApiService) StartRecording(ctx context.Context, req oapi.StartRecordingRequestObject) (oapi.StartRecordingResponseObject, error) {
log := logger.FromContext(ctx)
var params recorder.FFmpegRecordingParams
if req.Body != nil {
params.FrameRate = req.Body.Framerate
params.MaxSizeInMB = req.Body.MaxFileSizeInMB
params.MaxDurationInSeconds = req.Body.MaxDurationInSeconds
}
// Determine recorder ID (use default if none provided)
recorderID := s.defaultRecorderID
if req.Body != nil && req.Body.Id != nil && *req.Body.Id != "" {
recorderID = *req.Body.Id
}
// Create, register, and start a new recorder
rec, err := s.factory(recorderID, params)
if err != nil {
log.Error("failed to create recorder", "err", err, "recorder_id", recorderID)
return oapi.StartRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to create recording"}}, nil
}
if err := s.recordManager.RegisterRecorder(ctx, rec); err != nil {
if rec, exists := s.recordManager.GetRecorder(recorderID); exists {
if rec.IsRecording(ctx) {
log.Error("attempted to start recording while one is already active", "recorder_id", recorderID)
return oapi.StartRecording409JSONResponse{ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{Message: "recording already in progress"}}, nil
} else {
log.Error("attempted to restart recording", "recorder_id", recorderID)
return oapi.StartRecording400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "recording already completed"}}, nil
}
}
log.Error("failed to register recorder", "err", err, "recorder_id", recorderID)
return oapi.StartRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to register recording"}}, nil
}
if err := rec.Start(ctx); err != nil {
log.Error("failed to start recording", "err", err, "recorder_id", recorderID)
// ensure the recorder is deregistered
defer s.recordManager.DeregisterRecorder(ctx, rec)
return oapi.StartRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start recording"}}, nil
}
return oapi.StartRecording201Response{}, nil
}
func (s *ApiService) StopRecording(ctx context.Context, req oapi.StopRecordingRequestObject) (oapi.StopRecordingResponseObject, error) {
log := logger.FromContext(ctx)
// Determine recorder ID
recorderID := s.defaultRecorderID
if req.Body != nil && req.Body.Id != nil && *req.Body.Id != "" {
recorderID = *req.Body.Id
}
rec, exists := s.recordManager.GetRecorder(recorderID)
if !exists {
log.Error("attempted to stop recording when none is active", "recorder_id", recorderID)
return oapi.StopRecording400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "no active recording to stop"}}, nil
}
// Always call Stop() even if IsRecording() is false. Recordings that exit naturally
// (max duration, max file size, etc.) finalize automatically, but Stop() is still
// needed to update scale-to-zero state and ensure clean shutdown.
// Check if force stop is requested
forceStop := false
if req.Body != nil && req.Body.ForceStop != nil {
forceStop = *req.Body.ForceStop
}
var err error
if forceStop {
log.Info("force stopping recording", "recorder_id", recorderID)
err = rec.ForceStop(ctx)
} else {
log.Info("gracefully stopping recording", "recorder_id", recorderID)
err = rec.Stop(ctx)
}
if err != nil {
log.Error("error occurred while stopping recording", "err", err, "force", forceStop, "recorder_id", recorderID)
}
return oapi.StopRecording200Response{}, nil
}
const (
minRecordingSizeInBytes = 100
)
func (s *ApiService) DownloadRecording(ctx context.Context, req oapi.DownloadRecordingRequestObject) (oapi.DownloadRecordingResponseObject, error) {
log := logger.FromContext(ctx)
// Determine recorder ID
recorderID := s.defaultRecorderID
if req.Params.Id != nil && *req.Params.Id != "" {
recorderID = *req.Params.Id
}
// Get the recorder to access its output path
rec, exists := s.recordManager.GetRecorder(recorderID)
if !exists {
log.Error("attempted to download non-existent recording", "recorder_id", recorderID)
return oapi.DownloadRecording404JSONResponse{NotFoundErrorJSONResponse: oapi.NotFoundErrorJSONResponse{Message: "no recording found"}}, nil
}
if rec.IsDeleted(ctx) {
log.Error("attempted to download deleted recording", "recorder_id", recorderID)
return oapi.DownloadRecording400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "requested recording has been deleted"}}, nil
}
out, meta, err := rec.Recording(ctx)
if err != nil {
if errors.Is(err, recorder.ErrRecordingFinalizing) {
// Wait for finalization to complete instead of asking client to retry
log.Info("waiting for recording finalization", "recorder_id", recorderID)
ffmpegRec, ok := rec.(*recorder.FFmpegRecorder)
if !ok {
log.Error("failed to cast recorder to FFmpegRecorder", "recorder_id", recorderID)
return oapi.DownloadRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "internal error"}}, nil
}
// WaitForFinalization blocks until finalization completes and returns the result
if finalizeErr := ffmpegRec.WaitForFinalization(ctx); finalizeErr != nil {
log.Error("finalization failed", "err", finalizeErr, "recorder_id", recorderID)
return oapi.DownloadRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to finalize recording"}}, nil
}
// Finalization complete, retry getting the recording
out, meta, err = rec.Recording(ctx)
if err != nil {
log.Error("failed to get recording after finalization", "err", err, "recorder_id", recorderID)
return oapi.DownloadRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to get recording"}}, nil
}
} else {
log.Error("failed to get recording", "err", err, "recorder_id", recorderID)
return oapi.DownloadRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to get recording"}}, nil
}
}
// short-circuit if the recording is still in progress and the file is arbitrary small
if rec.IsRecording(ctx) && meta.Size <= minRecordingSizeInBytes {
out.Close() // Close the file handle to prevent descriptor leak
return oapi.DownloadRecording202Response{
Headers: oapi.DownloadRecording202ResponseHeaders{
RetryAfter: 300,
},
}, nil
}
log.Info("serving recording file for download", "size", meta.Size, "recorder_id", recorderID)
return oapi.DownloadRecording200Videomp4Response{
Body: out,
Headers: oapi.DownloadRecording200ResponseHeaders{
XRecordingStartedAt: meta.StartTime.Format(time.RFC3339),
XRecordingFinishedAt: meta.EndTime.Format(time.RFC3339),
},
ContentLength: meta.Size,
}, nil
}
func (s *ApiService) DeleteRecording(ctx context.Context, req oapi.DeleteRecordingRequestObject) (oapi.DeleteRecordingResponseObject, error) {
log := logger.FromContext(ctx)
recorderID := s.defaultRecorderID
if req.Body != nil && req.Body.Id != nil && *req.Body.Id != "" {
recorderID = *req.Body.Id
}
rec, exists := s.recordManager.GetRecorder(recorderID)
if !exists {
log.Error("attempted to delete non-existent recording", "recorder_id", recorderID)
return oapi.DeleteRecording404JSONResponse{NotFoundErrorJSONResponse: oapi.NotFoundErrorJSONResponse{Message: "no recording found"}}, nil
}
if rec.IsRecording(ctx) {
log.Error("attempted to delete recording while still in progress", "recorder_id", recorderID)
return oapi.DeleteRecording400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "recording must be stopped first"}}, nil
}
if err := rec.Delete(ctx); err != nil {
if errors.Is(err, recorder.ErrRecordingFinalizing) {
log.Info("recording is being finalized, client should retry", "recorder_id", recorderID)
return oapi.DeleteRecording409JSONResponse{ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{Message: "recording is being finalized, please retry in a few seconds"}}, nil
}
if !errors.Is(err, os.ErrNotExist) {
log.Error("failed to delete recording", "err", err, "recorder_id", recorderID)
return oapi.DeleteRecording500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to delete recording"}}, nil
}
}
log.Info("recording deleted", "recorder_id", recorderID)
return oapi.DeleteRecording200Response{}, nil
}
// ListRecorders returns a list of all registered recorders and whether each one is currently recording.
func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequestObject) (oapi.ListRecordersResponseObject, error) {
infos := []oapi.RecorderInfo{}
timeOrNil := func(t time.Time) *time.Time {
if t.IsZero() {
return nil
}
return &t
}
recs := s.recordManager.ListActiveRecorders(ctx)
for _, r := range recs {
m := r.Metadata()
infos = append(infos, oapi.RecorderInfo{
Id: r.ID(),
IsRecording: r.IsRecording(ctx),
StartedAt: timeOrNil(m.StartTime),
FinishedAt: timeOrNil(m.EndTime),
})
}
return oapi.ListRecorders200JSONResponse(infos), nil
}
func (s *ApiService) Shutdown(ctx context.Context) error {
s.monitorMu.Lock()
s.lifecycleCancel()
s.cdpMonitor.Stop()
s.captureSession.Stop()
s.monitorMu.Unlock()
return s.recordManager.StopAll(ctx)
}