Skip to content

Commit 5362a27

Browse files
committed
review: extract EventStream from CaptureSession for process-lifetime ownership
1 parent 56782c0 commit 5362a27

5 files changed

Lines changed: 119 additions & 93 deletions

File tree

server/cmd/api/api/api_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,13 +306,14 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
306306

307307
func newCaptureSession(t *testing.T) *events.CaptureSession {
308308
t.Helper()
309-
cs, err := events.NewCaptureSession(events.CaptureSessionConfig{
309+
es, err := events.NewEventStream(events.EventStreamConfig{
310310
LogDir: t.TempDir(),
311311
RingCapacity: 64,
312312
})
313313
if err != nil {
314314
t.Fatal(err)
315315
}
316+
cs := events.NewCaptureSession(es)
316317
t.Cleanup(func() { cs.Close() })
317318
return cs
318319
}

server/cmd/api/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,15 @@ func main() {
9292
}
9393

9494
// Construct events pipeline
95-
captureSession, err := events.NewCaptureSession(events.CaptureSessionConfig{
95+
eventStream, err := events.NewEventStream(events.EventStreamConfig{
9696
LogDir: "/var/log/kernel",
9797
RingCapacity: 1024,
9898
})
9999
if err != nil {
100-
slogger.Error("failed to create capture session", "err", err)
100+
slogger.Error("failed to create event stream", "err", err)
101101
os.Exit(1)
102102
}
103+
captureSession := events.NewCaptureSession(eventStream)
103104

104105
apiService, err := api.New(
105106
recorder.NewFFmpegManager(),
Lines changed: 33 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,48 @@
11
package events
22

33
import (
4-
"fmt"
5-
"log/slog"
64
"sync"
75
"time"
86
)
97

108
// CaptureConfig holds caller-supplied capture preferences. All fields are
119
// optional; zero values mean "use server defaults" (all categories).
1210
type CaptureConfig struct {
13-
// Categories limits which event categories are captured
14-
// nil represents all categories.
11+
// Categories limits which event categories are captured.
12+
// nil or empty includes all categories.
1513
Categories []EventCategory
1614
}
1715

18-
// CaptureSession wraps events in envelopes and fans them out to a fileWriter
19-
// Reusable: call Start with a new ID to begin a new session; call Stop to end
20-
// the current session without closing the underlying writers. Close tears down
21-
// file descriptors and should only be called during server shutdown.
16+
// CaptureSession manages a capture session against a shared EventStream.
17+
// It is responsible for (a) category-filtering Publish calls and (b) tracking
18+
// session-scoped metadata (ID, config, timestamps).
2219
type CaptureSession struct {
20+
es *EventStream
2321
mu sync.Mutex
24-
ring *ringBuffer
25-
files *fileWriter
26-
seq uint64
27-
sessionStartSeq uint64 // seq at the time the current session started
2822
captureSessionID string
23+
sessionStartSeq uint64
2924
categories map[EventCategory]struct{}
3025
createdAt time.Time
3126
}
3227

33-
// CaptureSessionConfig holds the parameters for creating a CaptureSession.
34-
type CaptureSessionConfig struct {
35-
LogDir string
36-
// RingCapacity is the number of envelopes the in-memory ring buffer holds.
37-
RingCapacity int
38-
}
39-
40-
func NewCaptureSession(cfg CaptureSessionConfig) (*CaptureSession, error) {
41-
rb, err := newRingBuffer(cfg.RingCapacity)
42-
if err != nil {
43-
return nil, fmt.Errorf("capture session: %w", err)
44-
}
45-
fw, err := newFileWriter(cfg.LogDir)
46-
if err != nil {
47-
return nil, fmt.Errorf("capture session: %w", err)
48-
}
28+
func NewCaptureSession(es *EventStream) *CaptureSession {
4929
cats := make(map[EventCategory]struct{}, len(allCategories))
5030
for _, c := range allCategories {
5131
cats[c] = struct{}{}
5232
}
53-
return &CaptureSession{
54-
ring: rb,
55-
files: fw,
56-
categories: cats,
57-
}, nil
33+
return &CaptureSession{es: es, categories: cats}
5834
}
5935

60-
// Start sets the capture session ID and applies the given config. Sequence
36+
// Start begins a new capture session with the given ID and config. Sequence
6137
// numbers are process-monotonic and do not reset between sessions; a
62-
// Last-Event-ID from any previous session is valid for resuming the stream.
63-
// The fileWriter is intentionally not rotated: events from different sessions
64-
// are interleaved in the same per-category JSONL files and distinguished by
65-
// their envelope's capture_session_id.
38+
// Last-Event-ID from any previous session is valid for resuming the SSE stream.
39+
// Events from different sessions are interleaved in the same per-category JSONL
40+
// files and distinguished by their envelope's captureSessionID.
6641
func (s *CaptureSession) Start(captureSessionID string, cfg CaptureConfig) {
6742
s.mu.Lock()
6843
defer s.mu.Unlock()
6944
s.captureSessionID = captureSessionID
70-
s.sessionStartSeq = s.seq
45+
s.sessionStartSeq = s.es.Seq()
7146
s.createdAt = time.Now()
7247
cats := cfg.Categories
7348
if len(cats) == 0 {
@@ -79,54 +54,33 @@ func (s *CaptureSession) Start(captureSessionID string, cfg CaptureConfig) {
7954
}
8055
}
8156

82-
// publishLocked is the core publish path. Requires s.mu held and a captureSessionID.
57+
// publishLocked builds an envelope and forwards it to the EventStream.
58+
// Requires s.mu to be held.
8359
func (s *CaptureSession) publishLocked(ev Event) Envelope {
8460
if ev.Ts == 0 {
8561
ev.Ts = time.Now().UnixMicro()
8662
}
87-
s.seq++
88-
env := Envelope{
63+
return s.es.publish(Envelope{
8964
CaptureSessionID: s.captureSessionID,
90-
Seq: s.seq,
9165
Event: ev,
92-
}
93-
env, data := truncateIfNeeded(env)
94-
if data == nil {
95-
slog.Error("capture_session: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category)
96-
} else {
97-
filename := string(env.Event.Category) + ".log"
98-
if err := s.files.Write(filename, data); err != nil {
99-
slog.Error("capture_session: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err)
100-
}
101-
}
102-
s.ring.publish(env)
103-
return env
66+
})
10467
}
10568

106-
// Publish wraps ev in an Envelope, truncates if needed, then writes to
107-
// fileWriter (durable) before RingBuffer (in-memory fan-out).
69+
// Publish applies the category filter then forwards ev to the EventStream.
10870
func (s *CaptureSession) Publish(ev Event) {
10971
s.mu.Lock()
11072
defer s.mu.Unlock()
111-
112-
// No active session, drop silently. This can happen when events
113-
// arrive between Stop() and producers noticing, or before Start().
11473
if s.captureSessionID == "" {
11574
return
11675
}
117-
118-
// Drop events whose category is outside the configured set.
11976
if _, ok := s.categories[ev.Category]; !ok {
12077
return
12178
}
122-
12379
s.publishLocked(ev)
12480
}
12581

126-
// PublishUnfiltered publishes ev without applying the category filter. Use for
127-
// externally-initiated events (e.g. API callers) that must not be silently
128-
// dropped by capture preferences set by the session owner.
129-
// Returns the assigned Envelope, or a zero Envelope if no session is active.
82+
// PublishUnfiltered forwards ev to the EventStream without applying the category
83+
// filter. Returns the assigned Envelope, or a zero Envelope if no session is active.
13084
func (s *CaptureSession) PublishUnfiltered(ev Event) Envelope {
13185
s.mu.Lock()
13286
defer s.mu.Unlock()
@@ -136,9 +90,9 @@ func (s *CaptureSession) PublishUnfiltered(ev Event) Envelope {
13690
return s.publishLocked(ev)
13791
}
13892

139-
// NewReader returns a Reader positioned at the start of the ring buffer.
93+
// NewReader returns a Reader from the EventStream positioned after afterSeq.
14094
func (s *CaptureSession) NewReader(afterSeq uint64) *Reader {
141-
return s.ring.newReader(afterSeq)
95+
return s.es.NewReader(afterSeq)
14296
}
14397

14498
// ID returns the current capture session ID, or "" if no session is active.
@@ -148,16 +102,13 @@ func (s *CaptureSession) ID() string {
148102
return s.captureSessionID
149103
}
150104

151-
// Seq returns the current sequence number (last published).
105+
// Seq returns the sequence number of the last published event.
152106
func (s *CaptureSession) Seq() uint64 {
153-
s.mu.Lock()
154-
defer s.mu.Unlock()
155-
return s.seq
107+
return s.es.Seq()
156108
}
157109

158110
// SessionStartSeq returns the sequence number at which the current session
159-
// started. Fresh SSE connections with no Last-Event-ID should begin here so
160-
// they see only the current session's events.
111+
// started. Fresh SSE connections with no Last-Event-ID should begin here.
161112
func (s *CaptureSession) SessionStartSeq() uint64 {
162113
s.mu.Lock()
163114
defer s.mu.Unlock()
@@ -172,9 +123,7 @@ func (s *CaptureSession) Config() CaptureConfig {
172123
for c := range s.categories {
173124
cats = append(cats, c)
174125
}
175-
return CaptureConfig{
176-
Categories: cats,
177-
}
126+
return CaptureConfig{Categories: cats}
178127
}
179128

180129
// CreatedAt returns when the current session was started.
@@ -184,8 +133,7 @@ func (s *CaptureSession) CreatedAt() time.Time {
184133
return s.createdAt
185134
}
186135

187-
// UpdateConfig applies a new CaptureConfig to the running session without
188-
// resetting the sequence counter or ring buffer.
136+
// UpdateConfig applies a new CaptureConfig to the running session.
189137
func (s *CaptureSession) UpdateConfig(cfg CaptureConfig) {
190138
s.mu.Lock()
191139
defer s.mu.Unlock()
@@ -206,11 +154,9 @@ func (s *CaptureSession) Active() bool {
206154
return s.captureSessionID != ""
207155
}
208156

209-
// Stop ends the current session. It publishes a synthetic session_ended
210-
// envelope so open SSE stream connections receive a terminal frame and can
211-
// close cleanly, then clears the session ID. The ring buffer is intentionally
212-
// left intact so existing readers can finish draining. A new session can be
213-
// started by calling Start again.
157+
// Stop ends the current session by publishing a synthetic session_ended event,
158+
// then clears the session ID. The ring buffer is left intact so existing readers
159+
// can finish draining.
214160
func (s *CaptureSession) Stop() {
215161
s.mu.Lock()
216162
defer s.mu.Unlock()
@@ -225,7 +171,7 @@ func (s *CaptureSession) Stop() {
225171
s.captureSessionID = ""
226172
}
227173

228-
// Close flushes and releases all open file descriptors.
174+
// Close releases resources held by the EventStream.
229175
func (s *CaptureSession) Close() error {
230-
return s.files.Close()
176+
return s.es.Close()
231177
}

server/lib/events/events_test.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,9 @@ func TestCaptureSession(t *testing.T) {
450450
newCaptureSession := func(t *testing.T) (*CaptureSession, string) {
451451
t.Helper()
452452
dir := t.TempDir()
453-
p, err := NewCaptureSession(CaptureSessionConfig{LogDir: dir, RingCapacity: 100})
453+
es, err := NewEventStream(EventStreamConfig{LogDir: dir, RingCapacity: 100})
454454
require.NoError(t, err)
455+
p := NewCaptureSession(es)
455456
p.Start("test-session", CaptureConfig{})
456457
t.Cleanup(func() { p.Close() })
457458
return p, dir
@@ -462,8 +463,9 @@ func TestCaptureSession(t *testing.T) {
462463
const eventsEach = 50
463464
const total = goroutines * eventsEach
464465

465-
p, err := NewCaptureSession(CaptureSessionConfig{LogDir: t.TempDir(), RingCapacity: total})
466+
es, err := NewEventStream(EventStreamConfig{LogDir: t.TempDir(), RingCapacity: total})
466467
require.NoError(t, err)
468+
p := NewCaptureSession(es)
467469
p.Start("test-concurrent", CaptureConfig{})
468470
t.Cleanup(func() { p.Close() })
469471
reader := p.NewReader(0)
@@ -490,8 +492,9 @@ func TestCaptureSession(t *testing.T) {
490492
})
491493

492494
t.Run("seq_continues_across_sessions", func(t *testing.T) {
493-
p, err := NewCaptureSession(CaptureSessionConfig{LogDir: t.TempDir(), RingCapacity: 100})
495+
es, err := NewEventStream(EventStreamConfig{LogDir: t.TempDir(), RingCapacity: 100})
494496
require.NoError(t, err)
497+
p := NewCaptureSession(es)
495498
t.Cleanup(func() { p.Close() })
496499

497500
p.Start("session-1", CaptureConfig{})

server/lib/events/eventstream.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package events
2+
3+
import (
4+
"fmt"
5+
"log/slog"
6+
"sync"
7+
)
8+
9+
// EventStream is the process-lifetime event bus. It owns the ring buffer, file
10+
// writer, and sequence counter.
11+
type EventStream struct {
12+
mu sync.Mutex
13+
seq uint64
14+
ring *ringBuffer
15+
files *fileWriter
16+
}
17+
18+
// EventStreamConfig holds the parameters for creating an EventStream.
19+
type EventStreamConfig struct {
20+
LogDir string
21+
// RingCapacity is the number of envelopes the ring buffer holds.
22+
RingCapacity int
23+
}
24+
25+
func NewEventStream(cfg EventStreamConfig) (*EventStream, error) {
26+
rb, err := newRingBuffer(cfg.RingCapacity)
27+
if err != nil {
28+
return nil, fmt.Errorf("event stream: %w", err)
29+
}
30+
fw, err := newFileWriter(cfg.LogDir)
31+
if err != nil {
32+
return nil, fmt.Errorf("event stream: %w", err)
33+
}
34+
return &EventStream{ring: rb, files: fw}, nil
35+
}
36+
37+
// publish assigns a monotonically increasing seq to env, writes it to the
38+
// per-category JSONL file, and pushes it to the ring buffer. Called by
39+
// CaptureSession under its own lock; env must already have CaptureSessionID set.
40+
func (es *EventStream) publish(env Envelope) Envelope {
41+
es.mu.Lock()
42+
es.seq++
43+
env.Seq = es.seq
44+
es.mu.Unlock()
45+
46+
env, data := truncateIfNeeded(env)
47+
if data == nil {
48+
slog.Error("event_stream: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category)
49+
} else {
50+
filename := string(env.Event.Category) + ".log"
51+
if err := es.files.Write(filename, data); err != nil {
52+
slog.Error("event_stream: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err)
53+
}
54+
}
55+
es.ring.publish(env)
56+
return env
57+
}
58+
59+
// NewReader returns a Reader positioned after afterSeq. Pass 0 to start from
60+
// the oldest buffered event.
61+
func (es *EventStream) NewReader(afterSeq uint64) *Reader {
62+
return es.ring.newReader(afterSeq)
63+
}
64+
65+
// Seq returns the sequence number of the last published event.
66+
func (es *EventStream) Seq() uint64 {
67+
es.mu.Lock()
68+
defer es.mu.Unlock()
69+
return es.seq
70+
}
71+
72+
// Close flushes and releases all open file descriptors.
73+
func (es *EventStream) Close() error {
74+
return es.files.Close()
75+
}

0 commit comments

Comments
 (0)