Skip to content

Commit 2a4219a

Browse files
committed
review: remove file writer from EventStream
1 parent 5362a27 commit 2a4219a

7 files changed

Lines changed: 23 additions & 267 deletions

File tree

server/cmd/api/api/api.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,6 @@ func (s *ApiService) Shutdown(ctx context.Context) error {
352352
s.lifecycleCancel()
353353
s.cdpMonitor.Stop()
354354
s.captureSession.Stop()
355-
_ = s.captureSession.Close()
356355
s.monitorMu.Unlock()
357356
return s.recordManager.StopAll(ctx)
358357
}

server/cmd/api/api/api_test.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,16 +306,11 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient {
306306

307307
func newCaptureSession(t *testing.T) *events.CaptureSession {
308308
t.Helper()
309-
es, err := events.NewEventStream(events.EventStreamConfig{
310-
LogDir: t.TempDir(),
311-
RingCapacity: 64,
312-
})
309+
es, err := events.NewEventStream(events.EventStreamConfig{RingCapacity: 64})
313310
if err != nil {
314311
t.Fatal(err)
315312
}
316-
cs := events.NewCaptureSession(es)
317-
t.Cleanup(func() { cs.Close() })
318-
return cs
313+
return events.NewCaptureSession(es)
319314
}
320315

321316
func TestApiService_PatchChromiumFlags(t *testing.T) {

server/cmd/api/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,6 @@ func main() {
9393

9494
// Construct events pipeline
9595
eventStream, err := events.NewEventStream(events.EventStreamConfig{
96-
LogDir: "/var/log/kernel",
9796
RingCapacity: 1024,
9897
})
9998
if err != nil {

server/lib/events/capturesession.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,3 @@ func (s *CaptureSession) Stop() {
171171
s.captureSessionID = ""
172172
}
173173

174-
// Close releases resources held by the EventStream.
175-
func (s *CaptureSession) Close() error {
176-
return s.es.Close()
177-
}

server/lib/events/events_test.go

Lines changed: 10 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
package events
22

33
import (
4-
"bytes"
54
"context"
65
"encoding/json"
7-
"os"
8-
"path/filepath"
96
"strings"
107
"sync"
118
"testing"
@@ -330,144 +327,25 @@ func TestConcurrentReaders(t *testing.T) {
330327
}
331328
}
332329

333-
// TestFileWriter: per-category JSONL appender tests.
334-
func TestFileWriter(t *testing.T) {
335-
t.Run("category_routing", func(t *testing.T) {
336-
dir := t.TempDir()
337-
fw, err := newFileWriter(dir)
338-
require.NoError(t, err)
339-
defer fw.Close()
340-
341-
envsToFile := []struct {
342-
env Envelope
343-
file string
344-
category string
345-
}{
346-
{Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}, "console.log", "console"},
347-
{Envelope{Seq: 2, Event: Event{Type: "network.request", Category: CategoryNetwork, Source: Source{Kind: KindCDP}, Ts: 1}}, "network.log", "network"},
348-
{Envelope{Seq: 3, Event: Event{Type: "liveview.click", Category: CategoryLiveview, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "liveview.log", "liveview"},
349-
{Envelope{Seq: 4, Event: Event{Type: "captcha.solve", Category: CategoryCaptcha, Source: Source{Kind: KindExtension}, Ts: 1}}, "captcha.log", "captcha"},
350-
{Envelope{Seq: 5, Event: Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1}}, "page.log", "page"},
351-
{Envelope{Seq: 6, Event: Event{Type: "input.click", Category: CategoryInteraction, Source: Source{Kind: KindCDP}, Ts: 1}}, "interaction.log", "interaction"},
352-
{Envelope{Seq: 7, Event: Event{Type: "monitor.connected", Category: CategorySystem, Source: Source{Kind: KindKernelAPI}, Ts: 1}}, "system.log", "system"},
353-
}
354-
355-
for _, e := range envsToFile {
356-
data, err := json.Marshal(e.env)
357-
require.NoError(t, err)
358-
require.NoError(t, fw.Write(e.file, data))
359-
}
360-
361-
for _, e := range envsToFile {
362-
data, err := os.ReadFile(filepath.Join(dir, e.file))
363-
require.NoError(t, err, "missing file %s for type %s", e.file, e.env.Event.Type)
364-
365-
line := bytes.TrimRight(data, "\n")
366-
require.True(t, json.Valid(line), "invalid JSON in %s", e.file)
367-
368-
var decoded map[string]any
369-
require.NoError(t, json.Unmarshal(line, &decoded))
370-
inner, ok := decoded["event"].(map[string]any)
371-
require.True(t, ok)
372-
assert.Equal(t, e.category, inner["category"], "wrong category in %s", e.file)
373-
srcMap, ok := inner["source"].(map[string]any)
374-
require.True(t, ok, "source should be an object in %s", e.file)
375-
assert.Equal(t, string(e.env.Event.Source.Kind), srcMap["kind"], "wrong source kind in %s", e.file)
376-
}
377-
})
378-
379-
t.Run("empty_filename_rejected", func(t *testing.T) {
380-
dir := t.TempDir()
381-
fw, err := newFileWriter(dir)
382-
require.NoError(t, err)
383-
defer fw.Close()
384-
385-
err = fw.Write("", []byte(`{"seq":1}`))
386-
require.Error(t, err)
387-
assert.Contains(t, err.Error(), "empty filename")
388-
})
389-
390-
t.Run("concurrent_writes", func(t *testing.T) {
391-
dir := t.TempDir()
392-
fw, err := newFileWriter(dir)
393-
require.NoError(t, err)
394-
defer fw.Close()
395-
396-
const goroutines = 10
397-
const eventsPerGoroutine = 100
398-
399-
var wg sync.WaitGroup
400-
for i := 0; i < goroutines; i++ {
401-
wg.Add(1)
402-
go func(i int) {
403-
defer wg.Done()
404-
for j := 0; j < eventsPerGoroutine; j++ {
405-
env := Envelope{
406-
Seq: uint64(i*eventsPerGoroutine + j),
407-
Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1},
408-
}
409-
envData, err := json.Marshal(env)
410-
require.NoError(t, err)
411-
require.NoError(t, fw.Write("console.log", envData))
412-
}
413-
}(i)
414-
}
415-
wg.Wait()
416-
417-
data, err := os.ReadFile(filepath.Join(dir, "console.log"))
418-
require.NoError(t, err)
419-
420-
lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
421-
assert.Len(t, lines, goroutines*eventsPerGoroutine)
422-
for _, line := range lines {
423-
assert.True(t, json.Valid([]byte(line)), "invalid JSON line: %s", line)
424-
}
425-
})
426-
427-
t.Run("lazy_open", func(t *testing.T) {
428-
dir := t.TempDir()
429-
fw, err := newFileWriter(dir)
430-
require.NoError(t, err)
431-
defer fw.Close()
432-
433-
entries, err := os.ReadDir(dir)
434-
require.NoError(t, err)
435-
assert.Empty(t, entries, "files opened before first Write")
436-
437-
env := Envelope{Seq: 1, Event: Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1}}
438-
envData, err := json.Marshal(env)
439-
require.NoError(t, err)
440-
require.NoError(t, fw.Write("console.log", envData))
441-
442-
entries, err = os.ReadDir(dir)
443-
require.NoError(t, err)
444-
assert.Len(t, entries, 1, "expected exactly one file after first Write")
445-
assert.Equal(t, "console.log", entries[0].Name())
446-
})
447-
}
448-
449330
func TestCaptureSession(t *testing.T) {
450-
newCaptureSession := func(t *testing.T) (*CaptureSession, string) {
331+
newCaptureSession := func(t *testing.T) *CaptureSession {
451332
t.Helper()
452-
dir := t.TempDir()
453-
es, err := NewEventStream(EventStreamConfig{LogDir: dir, RingCapacity: 100})
333+
es, err := NewEventStream(EventStreamConfig{RingCapacity: 100})
454334
require.NoError(t, err)
455335
p := NewCaptureSession(es)
456336
p.Start("test-session", CaptureConfig{})
457-
t.Cleanup(func() { p.Close() })
458-
return p, dir
337+
return p
459338
}
460339

461340
t.Run("concurrent_publish_seq_order", func(t *testing.T) {
462341
const goroutines = 8
463342
const eventsEach = 50
464343
const total = goroutines * eventsEach
465344

466-
es, err := NewEventStream(EventStreamConfig{LogDir: t.TempDir(), RingCapacity: total})
345+
es, err := NewEventStream(EventStreamConfig{RingCapacity: total})
467346
require.NoError(t, err)
468347
p := NewCaptureSession(es)
469348
p.Start("test-concurrent", CaptureConfig{})
470-
t.Cleanup(func() { p.Close() })
471349
reader := p.NewReader(0)
472350

473351
var wg sync.WaitGroup
@@ -492,11 +370,9 @@ func TestCaptureSession(t *testing.T) {
492370
})
493371

494372
t.Run("seq_continues_across_sessions", func(t *testing.T) {
495-
es, err := NewEventStream(EventStreamConfig{LogDir: t.TempDir(), RingCapacity: 100})
373+
es, err := NewEventStream(EventStreamConfig{RingCapacity: 100})
496374
require.NoError(t, err)
497375
p := NewCaptureSession(es)
498-
t.Cleanup(func() { p.Close() })
499-
500376
p.Start("session-1", CaptureConfig{})
501377
p.Publish(cdpEvent("ev.one", CategorySystem))
502378
p.Publish(cdpEvent("ev.two", CategorySystem))
@@ -520,7 +396,7 @@ func TestCaptureSession(t *testing.T) {
520396
})
521397

522398
t.Run("publish_increments_seq", func(t *testing.T) {
523-
p, _ := newCaptureSession(t)
399+
p := newCaptureSession(t)
524400
reader := p.NewReader(0)
525401

526402
for i := 0; i < 3; i++ {
@@ -537,7 +413,7 @@ func TestCaptureSession(t *testing.T) {
537413
})
538414

539415
t.Run("publish_sets_ts", func(t *testing.T) {
540-
p, _ := newCaptureSession(t)
416+
p := newCaptureSession(t)
541417
reader := p.NewReader(0)
542418

543419
before := time.Now().UnixMicro()
@@ -552,22 +428,8 @@ func TestCaptureSession(t *testing.T) {
552428
assert.LessOrEqual(t, env.Event.Ts, after)
553429
})
554430

555-
t.Run("publish_writes_file", func(t *testing.T) {
556-
p, dir := newCaptureSession(t)
557-
558-
p.Publish(Event{Type: "console.log", Category: CategoryConsole, Source: Source{Kind: KindCDP}, Ts: 1})
559-
560-
data, err := os.ReadFile(filepath.Join(dir, "console.log"))
561-
require.NoError(t, err)
562-
563-
lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
564-
require.Len(t, lines, 1)
565-
assert.True(t, json.Valid([]byte(lines[0])))
566-
assert.Contains(t, lines[0], `"console.log"`)
567-
})
568-
569431
t.Run("publish_writes_ring", func(t *testing.T) {
570-
p, _ := newCaptureSession(t)
432+
p := newCaptureSession(t)
571433

572434
reader := p.NewReader(0)
573435
p.Publish(Event{Type: "page.navigation", Category: CategoryPage, Source: Source{Kind: KindCDP}, Ts: 1})
@@ -581,7 +443,7 @@ func TestCaptureSession(t *testing.T) {
581443
})
582444

583445
t.Run("start_sets_capture_session_id", func(t *testing.T) {
584-
p, _ := newCaptureSession(t)
446+
p := newCaptureSession(t)
585447
p.Start("test-uuid", CaptureConfig{})
586448

587449
reader := p.NewReader(0)
@@ -595,7 +457,7 @@ func TestCaptureSession(t *testing.T) {
595457
})
596458

597459
t.Run("truncation_applied", func(t *testing.T) {
598-
p, dir := newCaptureSession(t)
460+
p := newCaptureSession(t)
599461
reader := p.NewReader(0)
600462

601463
largeData := strings.Repeat("x", 1_100_000)
@@ -620,12 +482,6 @@ func TestCaptureSession(t *testing.T) {
620482
marshaled, err := json.Marshal(env)
621483
require.NoError(t, err)
622484
assert.LessOrEqual(t, len(marshaled), maxS2RecordBytes)
623-
624-
data, err := os.ReadFile(filepath.Join(dir, "page.log"))
625-
require.NoError(t, err)
626-
lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
627-
require.Len(t, lines, 1)
628-
assert.Contains(t, lines[0], `"truncated":true`)
629485
})
630486

631487
}

server/lib/events/eventstream.go

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,42 @@ package events
22

33
import (
44
"fmt"
5-
"log/slog"
65
"sync"
76
)
87

9-
// EventStream is the process-lifetime event bus. It owns the ring buffer, file
10-
// writer, and sequence counter.
8+
// EventStream is the process-lifetime event bus. It owns the ring buffer and
9+
// sequence counter, which outlive individual capture sessions.
1110
type EventStream struct {
12-
mu sync.Mutex
13-
seq uint64
14-
ring *ringBuffer
15-
files *fileWriter
11+
mu sync.Mutex
12+
seq uint64
13+
ring *ringBuffer
1614
}
1715

1816
// EventStreamConfig holds the parameters for creating an EventStream.
1917
type EventStreamConfig struct {
20-
LogDir string
2118
// RingCapacity is the number of envelopes the ring buffer holds.
2219
RingCapacity int
2320
}
2421

22+
// NewEventStream creates an EventStream.
2523
func NewEventStream(cfg EventStreamConfig) (*EventStream, error) {
2624
rb, err := newRingBuffer(cfg.RingCapacity)
2725
if err != nil {
2826
return nil, fmt.Errorf("event stream: %w", err)
2927
}
30-
fw, err := newFileWriter(cfg.LogDir)
31-
if err != nil {
32-
return nil, fmt.Errorf("event stream: %w", err)
33-
}
34-
return &EventStream{ring: rb, files: fw}, nil
28+
return &EventStream{ring: rb}, nil
3529
}
3630

37-
// publish assigns a monotonically increasing seq to env, writes it to the
38-
// per-category JSONL file, and pushes it to the ring buffer. Called by
39-
// CaptureSession under its own lock; env must already have CaptureSessionID set.
31+
// publish assigns a monotonically increasing seq to env, truncates oversized
32+
// payloads, and pushes it to the ring buffer. Called by CaptureSession under
33+
// its own lock; env must already have CaptureSessionID set.
4034
func (es *EventStream) publish(env Envelope) Envelope {
4135
es.mu.Lock()
4236
es.seq++
4337
env.Seq = es.seq
4438
es.mu.Unlock()
4539

46-
env, data := truncateIfNeeded(env)
47-
if data == nil {
48-
slog.Error("event_stream: marshal failed, skipping file write", "seq", env.Seq, "category", env.Event.Category)
49-
} else {
50-
filename := string(env.Event.Category) + ".log"
51-
if err := es.files.Write(filename, data); err != nil {
52-
slog.Error("event_stream: file write failed", "seq", env.Seq, "category", env.Event.Category, "err", err)
53-
}
54-
}
40+
env, _ = truncateIfNeeded(env)
5541
es.ring.publish(env)
5642
return env
5743
}
@@ -68,8 +54,3 @@ func (es *EventStream) Seq() uint64 {
6854
defer es.mu.Unlock()
6955
return es.seq
7056
}
71-
72-
// Close flushes and releases all open file descriptors.
73-
func (es *EventStream) Close() error {
74-
return es.files.Close()
75-
}

0 commit comments

Comments
 (0)