Skip to content

Commit 43ddda6

Browse files
authored
[kernel-1116] browser events: add external events (#227)
As per plan -> #217 <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Introduces new public event ingestion/streaming endpoints and refactors the core event/capture-session pipeline (seq semantics and envelope shape), which could impact existing consumers and event durability/ordering behavior. > > **Overview** > Adds **external event support** via new API endpoints: `POST /events/publish` to inject validated events into the server’s event bus and `GET /events/stream` to consume envelopes over SSE with `Last-Event-ID` resume and keepalives. > > Refactors the event pipeline by splitting responsibilities into a process-wide `events.EventStream` (ring buffer + process-monotonic sequence) and a new `capturesession.CaptureSession` (session lifecycle + category filtering + stamping `capture_session_id` into `event.source.metadata`), removing the old `events.CaptureSession` and file-writing path. The OpenAPI-generated types/handlers are updated accordingly (new schemas, renamed capture category enums, SSE responses now set `Cache-Control: no-cache` and `X-Accel-Buffering: no`), along with build tooling tweaks to patch the additional SSE method. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit bb38112. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 3b4ec09 commit 43ddda6

19 files changed

Lines changed: 1701 additions & 760 deletions

server/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ oapi-generate:
1919
openapi-down-convert --input openapi.yaml --output openapi-3.0.yaml
2020
go tool oapi-codegen -config ./oapi-codegen.yaml ./openapi-3.0.yaml
2121
@echo "Fixing oapi-codegen issue https://github.com/oapi-codegen/oapi-codegen/issues/1764..."
22-
go run ./scripts/oapi/patch_sse_methods -file ./lib/oapi/oapi.go -expected-replacements 3
22+
go run ./scripts/oapi/patch_sse_methods -file ./lib/oapi/oapi.go -expected-replacements 4
2323
go fmt ./lib/oapi/oapi.go
2424
go mod tidy
2525

server/cmd/api/api/api.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/kernel/kernel-images/server/lib/capturesession"
1314
"github.com/kernel/kernel-images/server/lib/cdpmonitor"
1415
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
1516
"github.com/kernel/kernel-images/server/lib/events"
@@ -81,7 +82,8 @@ type ApiService struct {
8182
xvfbResizeMu sync.Mutex
8283

8384
// CDP event pipeline and cdpMonitor.
84-
captureSession *events.CaptureSession
85+
eventStream *events.EventStream
86+
captureSession *capturesession.CaptureSession
8587
cdpMonitor cdpMonitorController
8688
monitorMu sync.Mutex
8789
lifecycleCtx context.Context
@@ -96,7 +98,8 @@ func New(
9698
upstreamMgr *devtoolsproxy.UpstreamManager,
9799
stz scaletozero.Controller,
98100
nekoAuthClient *nekoclient.AuthClient,
99-
captureSession *events.CaptureSession,
101+
captureSession *capturesession.CaptureSession,
102+
eventStream *events.EventStream,
100103
displayNum int,
101104
) (*ApiService, error) {
102105
switch {
@@ -110,6 +113,8 @@ func New(
110113
return nil, fmt.Errorf("nekoAuthClient cannot be nil")
111114
case captureSession == nil:
112115
return nil, fmt.Errorf("captureSession cannot be nil")
116+
case eventStream == nil:
117+
return nil, fmt.Errorf("eventStream cannot be nil")
113118
}
114119

115120
mon := cdpmonitor.New(upstreamMgr, captureSession.Publish, displayNum, slog.Default())
@@ -125,6 +130,7 @@ func New(
125130
stz: stz,
126131
nekoAuthClient: nekoAuthClient,
127132
policy: &policy.Policy{},
133+
eventStream: eventStream,
128134
captureSession: captureSession,
129135
cdpMonitor: mon,
130136
lifecycleCtx: ctx,
@@ -352,7 +358,6 @@ func (s *ApiService) Shutdown(ctx context.Context) error {
352358
s.lifecycleCancel()
353359
s.cdpMonitor.Stop()
354360
s.captureSession.Stop()
355-
_ = s.captureSession.Close()
356361
s.monitorMu.Unlock()
357362
return s.recordManager.StopAll(ctx)
358363
}

server/cmd/api/api/api_test.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"log/slog"
1313

14+
"github.com/kernel/kernel-images/server/lib/capturesession"
1415
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
1516
"github.com/kernel/kernel-images/server/lib/events"
1617
"github.com/kernel/kernel-images/server/lib/nekoclient"
@@ -26,7 +27,7 @@ func TestApiService_StartRecording(t *testing.T) {
2627

2728
t.Run("success", func(t *testing.T) {
2829
mgr := recorder.NewFFmpegManager()
29-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
30+
svc, err := newSvc(t, mgr)
3031
require.NoError(t, err)
3132

3233
resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{})
@@ -40,7 +41,7 @@ func TestApiService_StartRecording(t *testing.T) {
4041

4142
t.Run("already recording", func(t *testing.T) {
4243
mgr := recorder.NewFFmpegManager()
43-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
44+
svc, err := newSvc(t, mgr)
4445
require.NoError(t, err)
4546

4647
// First start should succeed
@@ -55,7 +56,7 @@ func TestApiService_StartRecording(t *testing.T) {
5556

5657
t.Run("custom ids don't collide", func(t *testing.T) {
5758
mgr := recorder.NewFFmpegManager()
58-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
59+
svc, err := newSvc(t, mgr)
5960
require.NoError(t, err)
6061

6162
for i := 0; i < 5; i++ {
@@ -88,7 +89,7 @@ func TestApiService_StopRecording(t *testing.T) {
8889

8990
t.Run("no active recording", func(t *testing.T) {
9091
mgr := recorder.NewFFmpegManager()
91-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
92+
svc, err := newSvc(t, mgr)
9293
require.NoError(t, err)
9394

9495
resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
@@ -101,7 +102,7 @@ func TestApiService_StopRecording(t *testing.T) {
101102
rec := &mockRecorder{id: "default", isRecordingFlag: true}
102103
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
103104

104-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
105+
svc, err := newSvc(t, mgr)
105106
require.NoError(t, err)
106107
resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{})
107108
require.NoError(t, err)
@@ -116,7 +117,7 @@ func TestApiService_StopRecording(t *testing.T) {
116117

117118
force := true
118119
req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}}
119-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
120+
svc, err := newSvc(t, mgr)
120121
require.NoError(t, err)
121122
resp, err := svc.StopRecording(ctx, req)
122123
require.NoError(t, err)
@@ -130,7 +131,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
130131

131132
t.Run("not found", func(t *testing.T) {
132133
mgr := recorder.NewFFmpegManager()
133-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
134+
svc, err := newSvc(t, mgr)
134135
require.NoError(t, err)
135136
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
136137
require.NoError(t, err)
@@ -150,7 +151,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
150151
rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)}
151152
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
152153

153-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
154+
svc, err := newSvc(t, mgr)
154155
require.NoError(t, err)
155156
// will return a 202 when the recording is too small
156157
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
@@ -180,7 +181,7 @@ func TestApiService_DownloadRecording(t *testing.T) {
180181
rec := &mockRecorder{id: "default", recordingData: data}
181182
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
182183

183-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
184+
svc, err := newSvc(t, mgr)
184185
require.NoError(t, err)
185186
resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{})
186187
require.NoError(t, err)
@@ -200,7 +201,7 @@ func TestApiService_Shutdown(t *testing.T) {
200201
rec := &mockRecorder{id: "default", isRecordingFlag: true}
201202
require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder")
202203

203-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
204+
svc, err := newSvc(t, mgr)
204205
require.NoError(t, err)
205206

206207
require.NoError(t, svc.Shutdown(ctx))
@@ -304,23 +305,26 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
304305
return client
305306
}
306307

307-
func newCaptureSession(t *testing.T) *events.CaptureSession {
308+
func newCaptureSession(t *testing.T) (*capturesession.CaptureSession, *events.EventStream) {
308309
t.Helper()
309-
cs, err := events.NewCaptureSession(events.CaptureSessionConfig{
310-
LogDir: t.TempDir(),
311-
RingCapacity: 64,
312-
})
310+
es, err := events.NewEventStream(events.EventStreamConfig{RingCapacity: 64})
313311
if err != nil {
314312
t.Fatal(err)
315313
}
316-
t.Cleanup(func() { cs.Close() })
317-
return cs
314+
return capturesession.NewCaptureSession(es), es
315+
}
316+
317+
// newSvc constructs an ApiService with a fresh capture session and event stream.
318+
func newSvc(t *testing.T, mgr recorder.RecordManager) (*ApiService, error) {
319+
t.Helper()
320+
cs, es := newCaptureSession(t)
321+
return New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), cs, es, 0)
318322
}
319323

320324
func TestApiService_PatchChromiumFlags(t *testing.T) {
321325
ctx := context.Background()
322326
mgr := recorder.NewFFmpegManager()
323-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
327+
svc, err := newSvc(t, mgr)
324328
require.NoError(t, err)
325329

326330
// Test with valid flags

server/cmd/api/api/capture_session.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/nrednav/cuid2"
99
oapi "github.com/kernel/kernel-images/server/lib/oapi"
1010

11+
"github.com/kernel/kernel-images/server/lib/capturesession"
1112
"github.com/kernel/kernel-images/server/lib/events"
1213
"github.com/kernel/kernel-images/server/lib/logger"
1314
)
@@ -119,26 +120,26 @@ func (s *ApiService) buildSessionResponse() oapi.CaptureSession {
119120
}
120121

121122
// captureConfigFrom converts the optional StartCaptureSessionRequest body
122-
// into an events.CaptureConfig.
123-
func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (events.CaptureConfig, error) {
123+
// into a capturesession.CaptureConfig.
124+
func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (capturesession.CaptureConfig, error) {
124125
if body == nil {
125-
return events.CaptureConfig{}, nil
126+
return capturesession.CaptureConfig{}, nil
126127
}
127128
return captureConfigFromOAPI(body.Config)
128129
}
129130

130-
// captureConfigFromOAPI converts an oapi.CaptureConfig to events.CaptureConfig.
131-
func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (events.CaptureConfig, error) {
131+
// captureConfigFromOAPI converts an oapi.CaptureConfig to capturesession.CaptureConfig.
132+
func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (capturesession.CaptureConfig, error) {
132133
if cfg == nil || cfg.Categories == nil {
133-
return events.CaptureConfig{}, nil
134+
return capturesession.CaptureConfig{}, nil
134135
}
135-
out := events.CaptureConfig{
136+
out := capturesession.CaptureConfig{
136137
Categories: make([]events.EventCategory, 0, len(*cfg.Categories)),
137138
}
138139
for _, c := range *cfg.Categories {
139140
cat := events.EventCategory(c)
140141
if !events.ValidCategory(cat) {
141-
return events.CaptureConfig{}, fmt.Errorf("unknown category: %q", c)
142+
return capturesession.CaptureConfig{}, fmt.Errorf("unknown category: %q", c)
142143
}
143144
out.Categories = append(out.Categories, cat)
144145
}

server/cmd/api/api/capture_session_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestCaptureConfigFrom(t *testing.T) {
1919
})
2020

2121
t.Run("valid categories", func(t *testing.T) {
22-
cats := []oapi.CaptureConfigCategories{oapi.Console, oapi.Network}
22+
cats := []oapi.CaptureConfigCategories{oapi.CaptureConfigCategoriesConsole, oapi.CaptureConfigCategoriesNetwork}
2323
body := &oapi.StartCaptureSessionRequest{
2424
Config: &oapi.CaptureConfig{Categories: &cats},
2525
}
@@ -64,7 +64,7 @@ func TestStartCaptureSession(t *testing.T) {
6464

6565
t.Run("success with config", func(t *testing.T) {
6666
svc := newTestService(t, newMockRecordManager())
67-
cats := []oapi.CaptureConfigCategories{oapi.Console}
67+
cats := []oapi.CaptureConfigCategories{oapi.CaptureConfigCategoriesConsole}
6868
resp, err := svc.StartCaptureSession(ctx, oapi.StartCaptureSessionRequestObject{
6969
Body: &oapi.StartCaptureSessionRequest{
7070
Config: &oapi.CaptureConfig{Categories: &cats},
@@ -141,7 +141,7 @@ func TestUpdateCaptureSession(t *testing.T) {
141141
_, err := svc.StartCaptureSession(ctx, oapi.StartCaptureSessionRequestObject{})
142142
require.NoError(t, err)
143143

144-
cats := []oapi.CaptureConfigCategories{oapi.Console}
144+
cats := []oapi.CaptureConfigCategories{oapi.CaptureConfigCategoriesConsole}
145145
resp, err := svc.UpdateCaptureSession(ctx, oapi.UpdateCaptureSessionRequestObject{
146146
Body: &oapi.UpdateCaptureSessionRequest{
147147
Config: &oapi.CaptureConfig{Categories: &cats},
@@ -152,7 +152,7 @@ func TestUpdateCaptureSession(t *testing.T) {
152152
require.True(t, ok)
153153
require.NotNil(t, r200.Config.Categories)
154154
assert.Len(t, *r200.Config.Categories, 1)
155-
assert.Equal(t, oapi.Console, (*r200.Config.Categories)[0])
155+
assert.Equal(t, oapi.CaptureConfigCategoriesConsole, (*r200.Config.Categories)[0])
156156
})
157157

158158
t.Run("empty body is no-op", func(t *testing.T) {
@@ -246,7 +246,8 @@ func (m *mockRecordManager) StopAll(_ context.Context) error
246246
// newTestService builds an ApiService with minimal dependencies for capture session tests.
247247
func newTestService(t *testing.T, mgr recorder.RecordManager) *ApiService {
248248
t.Helper()
249-
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
249+
cs, es := newCaptureSession(t)
250+
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), cs, es, 0)
250251
require.NoError(t, err)
251252
svc.cdpMonitor = &stubCdpMonitor{}
252253
return svc

server/cmd/api/api/display_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ func testFFmpegFactory(t *testing.T, tempDir string) recorder.FFmpegRecorderFact
3434

3535
func newTestServiceWithFactory(t *testing.T, mgr recorder.RecordManager, factory recorder.FFmpegRecorderFactory) *ApiService {
3636
t.Helper()
37-
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
37+
cs, es := newCaptureSession(t)
38+
svc, err := New(mgr, factory, newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), cs, es, 0)
3839
require.NoError(t, err)
3940
return svc
4041
}

0 commit comments

Comments
 (0)