Skip to content

Commit 4be4ea3

Browse files
committed
review: move CaptureSession to standalone capture package
1 parent 678df70 commit 4be4ea3

9 files changed

Lines changed: 233 additions & 209 deletions

File tree

server/cmd/api/api/api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"sync"
1111
"time"
1212

13+
"github.com/kernel/kernel-images/server/lib/capture"
1314
"github.com/kernel/kernel-images/server/lib/cdpmonitor"
1415
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
15-
"github.com/kernel/kernel-images/server/lib/events"
1616
"github.com/kernel/kernel-images/server/lib/logger"
1717
"github.com/kernel/kernel-images/server/lib/nekoclient"
1818
oapi "github.com/kernel/kernel-images/server/lib/oapi"
@@ -81,7 +81,7 @@ type ApiService struct {
8181
xvfbResizeMu sync.Mutex
8282

8383
// CDP event pipeline and cdpMonitor.
84-
captureSession *events.CaptureSession
84+
captureSession *capture.CaptureSession
8585
cdpMonitor cdpMonitorController
8686
monitorMu sync.Mutex
8787
lifecycleCtx context.Context
@@ -96,7 +96,7 @@ func New(
9696
upstreamMgr *devtoolsproxy.UpstreamManager,
9797
stz scaletozero.Controller,
9898
nekoAuthClient *nekoclient.AuthClient,
99-
captureSession *events.CaptureSession,
99+
captureSession *capture.CaptureSession,
100100
displayNum int,
101101
) (*ApiService, error) {
102102
switch {

server/cmd/api/api/api_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"log/slog"
1313

14+
"github.com/kernel/kernel-images/server/lib/capture"
1415
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
1516
"github.com/kernel/kernel-images/server/lib/events"
1617
"github.com/kernel/kernel-images/server/lib/nekoclient"
@@ -304,13 +305,13 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
304305
return client
305306
}
306307

307-
func newCaptureSession(t *testing.T) *events.CaptureSession {
308+
func newCaptureSession(t *testing.T) *capture.CaptureSession {
308309
t.Helper()
309310
es, err := events.NewEventStream(events.EventStreamConfig{RingCapacity: 64})
310311
if err != nil {
311312
t.Fatal(err)
312313
}
313-
return events.NewCaptureSession(es)
314+
return capture.NewCaptureSession(es)
314315
}
315316

316317
func TestApiService_PatchChromiumFlags(t *testing.T) {

server/cmd/api/api/capture_session.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sort"
77

88
"github.com/nrednav/cuid2"
9+
"github.com/kernel/kernel-images/server/lib/capture"
910
oapi "github.com/kernel/kernel-images/server/lib/oapi"
1011

1112
"github.com/kernel/kernel-images/server/lib/events"
@@ -119,26 +120,26 @@ func (s *ApiService) buildSessionResponse() oapi.CaptureSession {
119120
}
120121

121122
// captureConfigFrom converts the optional StartCaptureSessionRequest body
122-
// into an events.CaptureConfig.
123-
func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (events.CaptureConfig, error) {
123+
// into a capture.CaptureConfig.
124+
func captureConfigFrom(body *oapi.StartCaptureSessionRequest) (capture.CaptureConfig, error) {
124125
if body == nil {
125-
return events.CaptureConfig{}, nil
126+
return capture.CaptureConfig{}, nil
126127
}
127128
return captureConfigFromOAPI(body.Config)
128129
}
129130

130-
// captureConfigFromOAPI converts an oapi.CaptureConfig to events.CaptureConfig.
131-
func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (events.CaptureConfig, error) {
131+
// captureConfigFromOAPI converts an oapi.CaptureConfig to capture.CaptureConfig.
132+
func captureConfigFromOAPI(cfg *oapi.CaptureConfig) (capture.CaptureConfig, error) {
132133
if cfg == nil || cfg.Categories == nil {
133-
return events.CaptureConfig{}, nil
134+
return capture.CaptureConfig{}, nil
134135
}
135-
out := events.CaptureConfig{
136+
out := capture.CaptureConfig{
136137
Categories: make([]events.EventCategory, 0, len(*cfg.Categories)),
137138
}
138139
for _, c := range *cfg.Categories {
139140
cat := events.EventCategory(c)
140141
if !events.ValidCategory(cat) {
141-
return events.CaptureConfig{}, fmt.Errorf("unknown category: %q", c)
142+
return capture.CaptureConfig{}, fmt.Errorf("unknown category: %q", c)
142143
}
143144
out.Categories = append(out.Categories, cat)
144145
}

server/cmd/api/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/kernel/kernel-images/server/cmd/api/api"
2424
"github.com/kernel/kernel-images/server/cmd/config"
2525
"github.com/kernel/kernel-images/server/lib/chromedriverproxy"
26+
"github.com/kernel/kernel-images/server/lib/capture"
2627
"github.com/kernel/kernel-images/server/lib/devtoolsproxy"
2728
"github.com/kernel/kernel-images/server/lib/events"
2829
"github.com/kernel/kernel-images/server/lib/logger"
@@ -99,7 +100,7 @@ func main() {
99100
slogger.Error("failed to create event stream", "err", err)
100101
os.Exit(1)
101102
}
102-
captureSession := events.NewCaptureSession(eventStream)
103+
captureSession := capture.NewCaptureSession(eventStream)
103104

104105
apiService, err := api.New(
105106
recorder.NewFFmpegManager(),
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,35 @@
1-
package events
1+
package capture
22

33
import (
44
"sync"
55
"time"
6+
7+
"github.com/kernel/kernel-images/server/lib/events"
68
)
79

810
// CaptureConfig holds caller-supplied capture preferences. All fields are
911
// optional; zero values mean "use server defaults" (all categories).
1012
type CaptureConfig struct {
1113
// Categories limits which event categories are captured.
1214
// nil or empty includes all categories.
13-
Categories []EventCategory
15+
Categories []events.EventCategory
1416
}
1517

1618
// CaptureSession manages a capture session against a shared EventStream.
1719
// It is responsible for (a) category-filtering Publish calls and (b) tracking
1820
// session-scoped metadata (ID, config, timestamps).
1921
type CaptureSession struct {
20-
es *EventStream
22+
es *events.EventStream
2123
mu sync.Mutex
2224
captureSessionID string
2325
sessionStartSeq uint64
24-
categories map[EventCategory]struct{}
26+
categories map[events.EventCategory]struct{}
2527
createdAt time.Time
2628
}
2729

28-
func NewCaptureSession(es *EventStream) *CaptureSession {
29-
cats := make(map[EventCategory]struct{}, len(allCategories))
30-
for _, c := range allCategories {
30+
func NewCaptureSession(es *events.EventStream) *CaptureSession {
31+
cats := make(map[events.EventCategory]struct{}, len(events.AllCategories()))
32+
for _, c := range events.AllCategories() {
3133
cats[c] = struct{}{}
3234
}
3335
return &CaptureSession{es: es, categories: cats}
@@ -36,8 +38,8 @@ func NewCaptureSession(es *EventStream) *CaptureSession {
3638
// Start begins a new capture session with the given ID and config. Sequence
3739
// numbers are process-monotonic and do not reset between sessions; a
3840
// Last-Event-ID from any previous session is valid for resuming the SSE stream.
39-
// Events from different sessions are interleaved in the same per-category JSONL
40-
// files and distinguished by their envelope's captureSessionID.
41+
// Events from different sessions are distinguished by their envelope's
42+
// CaptureSessionID.
4143
func (s *CaptureSession) Start(captureSessionID string, cfg CaptureConfig) {
4244
s.mu.Lock()
4345
defer s.mu.Unlock()
@@ -46,28 +48,28 @@ func (s *CaptureSession) Start(captureSessionID string, cfg CaptureConfig) {
4648
s.createdAt = time.Now()
4749
cats := cfg.Categories
4850
if len(cats) == 0 {
49-
cats = allCategories
51+
cats = events.AllCategories()
5052
}
51-
s.categories = make(map[EventCategory]struct{}, len(cats))
53+
s.categories = make(map[events.EventCategory]struct{}, len(cats))
5254
for _, c := range cats {
5355
s.categories[c] = struct{}{}
5456
}
5557
}
5658

5759
// publishLocked builds an envelope and forwards it to the EventStream.
5860
// Requires s.mu to be held.
59-
func (s *CaptureSession) publishLocked(ev Event) Envelope {
61+
func (s *CaptureSession) publishLocked(ev events.Event) events.Envelope {
6062
if ev.Ts == 0 {
6163
ev.Ts = time.Now().UnixMicro()
6264
}
63-
return s.es.publish(Envelope{
65+
return s.es.Publish(events.Envelope{
6466
CaptureSessionID: s.captureSessionID,
6567
Event: ev,
6668
})
6769
}
6870

6971
// Publish applies the category filter then forwards ev to the EventStream.
70-
func (s *CaptureSession) Publish(ev Event) {
72+
func (s *CaptureSession) Publish(ev events.Event) {
7173
s.mu.Lock()
7274
defer s.mu.Unlock()
7375
if s.captureSessionID == "" {
@@ -80,18 +82,18 @@ func (s *CaptureSession) Publish(ev Event) {
8082
}
8183

8284
// PublishUnfiltered forwards ev to the EventStream without applying the category
83-
// filter. Returns the assigned Envelope, or a zero Envelope if no session is active.
84-
func (s *CaptureSession) PublishUnfiltered(ev Event) Envelope {
85+
// filter. Returns the assigned Envelope.
86+
func (s *CaptureSession) PublishUnfiltered(ev events.Event) events.Envelope {
8587
s.mu.Lock()
8688
defer s.mu.Unlock()
8789
if s.captureSessionID == "" {
88-
return Envelope{}
90+
return events.Envelope{}
8991
}
9092
return s.publishLocked(ev)
9193
}
9294

9395
// NewReader returns a Reader from the EventStream positioned after afterSeq.
94-
func (s *CaptureSession) NewReader(afterSeq uint64) *Reader {
96+
func (s *CaptureSession) NewReader(afterSeq uint64) *events.Reader {
9597
return s.es.NewReader(afterSeq)
9698
}
9799

@@ -102,7 +104,6 @@ func (s *CaptureSession) ID() string {
102104
return s.captureSessionID
103105
}
104106

105-
// Seq returns the sequence number of the last published event.
106107
func (s *CaptureSession) Seq() uint64 {
107108
return s.es.Seq()
108109
}
@@ -115,39 +116,35 @@ func (s *CaptureSession) SessionStartSeq() uint64 {
115116
return s.sessionStartSeq
116117
}
117118

118-
// Config returns the current capture configuration.
119119
func (s *CaptureSession) Config() CaptureConfig {
120120
s.mu.Lock()
121121
defer s.mu.Unlock()
122-
cats := make([]EventCategory, 0, len(s.categories))
122+
cats := make([]events.EventCategory, 0, len(s.categories))
123123
for c := range s.categories {
124124
cats = append(cats, c)
125125
}
126126
return CaptureConfig{Categories: cats}
127127
}
128128

129-
// CreatedAt returns when the current session was started.
130129
func (s *CaptureSession) CreatedAt() time.Time {
131130
s.mu.Lock()
132131
defer s.mu.Unlock()
133132
return s.createdAt
134133
}
135134

136-
// UpdateConfig applies a new CaptureConfig to the running session.
137135
func (s *CaptureSession) UpdateConfig(cfg CaptureConfig) {
138136
s.mu.Lock()
139137
defer s.mu.Unlock()
140138
cats := cfg.Categories
141139
if len(cats) == 0 {
142-
cats = allCategories
140+
cats = events.AllCategories()
143141
}
144-
s.categories = make(map[EventCategory]struct{}, len(cats))
142+
s.categories = make(map[events.EventCategory]struct{}, len(cats))
145143
for _, c := range cats {
146144
s.categories[c] = struct{}{}
147145
}
148146
}
149147

150-
// Active reports whether a capture session is currently running.
151148
func (s *CaptureSession) Active() bool {
152149
s.mu.Lock()
153150
defer s.mu.Unlock()
@@ -163,11 +160,10 @@ func (s *CaptureSession) Stop() {
163160
if s.captureSessionID == "" {
164161
return
165162
}
166-
s.publishLocked(Event{
167-
Type: TypeSessionEnded,
168-
Category: CategorySystem,
169-
Source: Source{Kind: KindKernelAPI},
163+
s.publishLocked(events.Event{
164+
Type: events.TypeSessionEnded,
165+
Category: events.CategorySystem,
166+
Source: events.Source{Kind: events.KindKernelAPI},
170167
})
171168
s.captureSessionID = ""
172169
}
173-

0 commit comments

Comments
 (0)