Skip to content

Commit 61ab04c

Browse files
committed
Optimize telemetry event publishing
1 parent c03e3d4 commit 61ab04c

8 files changed

Lines changed: 244 additions & 41 deletions

File tree

server/lib/events/event.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ import (
1212
// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB).
1313
const maxS2RecordBytes = 1_000_000
1414

15+
// Below this estimated size, normal JSON overhead cannot push a record near the
16+
// S2 limit, so Publish can avoid a full JSON marshal on the hot path.
17+
const truncateMarshalThreshold = maxS2RecordBytes - 64*1024
18+
1519
const (
1620
Console = oapi.TelemetryEventCategory("console")
1721
Network = oapi.TelemetryEventCategory("network")
@@ -97,8 +101,11 @@ type Envelope struct {
97101

98102
// truncateIfNeeded marshals env and returns the (possibly truncated) envelope.
99103
// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge
100-
// source.metadata), it is returned as-is, callers must handle nil data.
104+
// source.metadata), it is returned as-is with data set to null.
101105
func truncateIfNeeded(env Envelope) (Envelope, []byte) {
106+
if estimatedEnvelopeBytes(env) < truncateMarshalThreshold {
107+
return env, nil
108+
}
102109
data, err := json.Marshal(env)
103110
if err != nil {
104111
return env, nil
@@ -113,7 +120,20 @@ func truncateIfNeeded(env Envelope) (Envelope, []byte) {
113120
return env, nil
114121
}
115122
if len(data) > maxS2RecordBytes {
116-
slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data))
123+
slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "size", len(data))
117124
}
118125
return env, data
119126
}
127+
128+
func estimatedEnvelopeBytes(env Envelope) int {
129+
n := len(env.Event.Data) + len(env.Event.Type) + len(env.Event.Category) + len(env.Event.Source.Kind)
130+
if env.Event.Source.Event != nil {
131+
n += len(*env.Event.Source.Event)
132+
}
133+
if env.Event.Source.Metadata != nil {
134+
for k, v := range *env.Event.Source.Metadata {
135+
n += len(k) + len(v)
136+
}
137+
}
138+
return n
139+
}

server/lib/events/events_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package events
33
import (
44
"context"
55
"encoding/json"
6+
"strings"
67
"sync"
78
"testing"
89
"time"
@@ -118,6 +119,25 @@ func TestEventOmitEmpty(t *testing.T) {
118119
assert.NotContains(t, s, `"event"`)
119120
}
120121

122+
func TestTruncateIfNeededChecksLargeMetadata(t *testing.T) {
123+
metadata := map[string]string{"large": strings.Repeat("x", maxS2RecordBytes)}
124+
env := Envelope{
125+
Event: Event{
126+
Type: "console.log",
127+
Category: Console,
128+
Source: oapi.BrowserEventSource{Kind: oapi.Cdp, Metadata: &metadata},
129+
Data: json.RawMessage(`{"message":"hello"}`),
130+
},
131+
}
132+
133+
truncated, data := truncateIfNeeded(env)
134+
135+
require.NotNil(t, data)
136+
assert.True(t, truncated.Event.Truncated)
137+
assert.Equal(t, json.RawMessage("null"), truncated.Event.Data)
138+
assert.Greater(t, len(data), maxS2RecordBytes)
139+
}
140+
121141
func mkEnv(seq uint64, ev Event) Envelope {
122142
return Envelope{Seq: seq, Event: ev}
123143
}
@@ -133,6 +153,24 @@ func newTestRingBuffer(t *testing.T, capacity int) *ringBuffer {
133153
return rb
134154
}
135155

156+
func TestEventStreamPublishAssignsSeq(t *testing.T) {
157+
es, err := NewEventStream(EventStreamConfig{RingCapacity: 10})
158+
require.NoError(t, err)
159+
reader := es.NewReader(0)
160+
161+
first := es.Publish(Envelope{Event: cdpEvent("console.log", Console)})
162+
second := es.Publish(Envelope{Event: cdpEvent("network.request", Network)})
163+
164+
assert.Equal(t, uint64(1), first.Seq)
165+
assert.Equal(t, uint64(2), second.Seq)
166+
assert.Equal(t, uint64(2), es.Seq())
167+
168+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
169+
defer cancel()
170+
assert.Equal(t, uint64(1), readEnvelope(t, reader, ctx).Seq)
171+
assert.Equal(t, uint64(2), readEnvelope(t, reader, ctx).Seq)
172+
}
173+
136174
// TestRingBuffer: publish 3 envelopes; reader reads all 3 in order
137175
func TestRingBuffer(t *testing.T) {
138176
rb := newTestRingBuffer(t, 10)

server/lib/events/eventstream.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,11 @@ package events
22

33
import (
44
"fmt"
5-
"sync"
65
)
76

8-
// EventStream is the process-lifetime event bus. It owns the ring buffer and
9-
// sequence counter, which outlive individual capture sessions.
7+
// EventStream is the process-lifetime event bus. Its ring buffer and sequence
8+
// counter outlive individual capture sessions.
109
type EventStream struct {
11-
mu sync.Mutex
12-
seq uint64
1310
ring *ringBuffer
1411
}
1512

@@ -29,14 +26,8 @@ func NewEventStream(cfg EventStreamConfig) (*EventStream, error) {
2926
// Publish assigns a monotonically increasing seq to env, truncates oversized
3027
// payloads, and pushes it to the ring buffer.
3128
func (es *EventStream) Publish(env Envelope) Envelope {
32-
es.mu.Lock()
33-
es.seq++
34-
env.Seq = es.seq
35-
es.mu.Unlock()
36-
3729
env, _ = truncateIfNeeded(env)
38-
es.ring.publish(env)
39-
return env
30+
return es.ring.publishNext(env)
4031
}
4132

4233
// NewReader returns a Reader positioned after afterSeq. Pass 0 to start from
@@ -47,7 +38,5 @@ func (es *EventStream) NewReader(afterSeq uint64) *Reader {
4738

4839
// Seq returns the sequence number of the last published event.
4940
func (es *EventStream) Seq() uint64 {
50-
es.mu.Lock()
51-
defer es.mu.Unlock()
52-
return es.seq
41+
return es.ring.seq()
5342
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package events
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
oapi "github.com/kernel/kernel-images/server/lib/oapi"
8+
)
9+
10+
func benchmarkEvent() Event {
11+
return Event{
12+
Ts: 123456789,
13+
Type: "console.log",
14+
Category: Console,
15+
Source: oapi.BrowserEventSource{Kind: oapi.Cdp},
16+
Data: json.RawMessage(`{"message":"hello","level":"log"}`),
17+
}
18+
}
19+
20+
func BenchmarkEventStreamPublish(b *testing.B) {
21+
es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024})
22+
if err != nil {
23+
b.Fatal(err)
24+
}
25+
ev := benchmarkEvent()
26+
b.ReportAllocs()
27+
b.ResetTimer()
28+
for i := 0; i < b.N; i++ {
29+
es.Publish(Envelope{Event: ev})
30+
}
31+
}
32+
33+
func BenchmarkEventStreamPublishRead(b *testing.B) {
34+
es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024})
35+
if err != nil {
36+
b.Fatal(err)
37+
}
38+
reader := es.NewReader(0)
39+
ev := benchmarkEvent()
40+
b.ReportAllocs()
41+
b.ResetTimer()
42+
for i := 0; i < b.N; i++ {
43+
es.Publish(Envelope{Event: ev})
44+
res, ok := reader.TryRead()
45+
if !ok || res.Envelope == nil {
46+
b.Fatal("expected envelope")
47+
}
48+
}
49+
}

server/lib/events/ringbuffer.go

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ type ringBuffer struct {
1313
buf []Envelope
1414
cap uint64
1515
latestSeq uint64 // highest envelope.Seq published
16-
readerWake chan struct{} // closed-and-replaced on each Publish to wake blocked readers
16+
readerWake chan struct{} // closed-and-replaced when blocked readers need a wakeup
17+
waiters int
1718
}
1819

1920
func newRingBuffer(capacity int) (*ringBuffer, error) {
@@ -36,21 +37,55 @@ func (rb *ringBuffer) reset() {
3637
rb.buf[i] = Envelope{}
3738
}
3839
rb.latestSeq = 0
39-
old := rb.readerWake
40-
rb.readerWake = make(chan struct{})
40+
var old chan struct{}
41+
if rb.waiters > 0 {
42+
old = rb.readerWake
43+
rb.readerWake = make(chan struct{})
44+
}
4145
rb.mu.Unlock()
42-
close(old)
46+
if old != nil {
47+
close(old)
48+
}
4349
}
4450

45-
// publish adds an envelope to the ring, evicting the oldest on overflow.
46-
func (rb *ringBuffer) publish(env Envelope) {
47-
rb.mu.Lock()
51+
func (rb *ringBuffer) publishLocked(env Envelope) chan struct{} {
4852
rb.buf[env.Seq%rb.cap] = env
4953
rb.latestSeq = env.Seq
54+
if rb.waiters == 0 {
55+
return nil
56+
}
5057
old := rb.readerWake
5158
rb.readerWake = make(chan struct{})
59+
return old
60+
}
61+
62+
func (rb *ringBuffer) closeWake(old chan struct{}) {
63+
if old != nil {
64+
close(old)
65+
}
66+
}
67+
68+
// publish adds an envelope to the ring, evicting the oldest on overflow.
69+
func (rb *ringBuffer) publish(env Envelope) {
70+
rb.mu.Lock()
71+
old := rb.publishLocked(env)
72+
rb.mu.Unlock()
73+
rb.closeWake(old)
74+
}
75+
76+
func (rb *ringBuffer) publishNext(env Envelope) Envelope {
77+
rb.mu.Lock()
78+
env.Seq = rb.latestSeq + 1
79+
old := rb.publishLocked(env)
5280
rb.mu.Unlock()
53-
close(old)
81+
rb.closeWake(old)
82+
return env
83+
}
84+
85+
func (rb *ringBuffer) seq() uint64 {
86+
rb.mu.RLock()
87+
defer rb.mu.RUnlock()
88+
return rb.latestSeq
5489
}
5590

5691
func (rb *ringBuffer) oldestSeq() uint64 {
@@ -108,7 +143,7 @@ func (r *Reader) TryRead() (ReadResult, bool) {
108143
// Read blocks until the next envelope is available or ctx is cancelled.
109144
func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
110145
for {
111-
r.rb.mu.RLock()
146+
r.rb.mu.Lock()
112147
wake := r.rb.readerWake
113148
latest := r.rb.latestSeq
114149
oldest := r.rb.oldestSeq()
@@ -117,35 +152,49 @@ func (r *Reader) Read(ctx context.Context) (ReadResult, error) {
117152
// Buffer is empty (or was just reset). Reset reader position
118153
// so it starts from the beginning when new data arrives.
119154
r.nextSeq = 1
120-
r.rb.mu.RUnlock()
121-
select {
122-
case <-ctx.Done():
155+
r.rb.waiters++
156+
r.rb.mu.Unlock()
157+
err := waitForWake(ctx, wake)
158+
r.rb.mu.Lock()
159+
r.rb.waiters--
160+
r.rb.mu.Unlock()
161+
if err != nil {
123162
return ReadResult{}, ctx.Err()
124-
case <-wake:
125-
continue
126163
}
164+
continue
127165
}
128166

129167
if r.nextSeq < oldest {
130168
dropped := oldest - r.nextSeq
131169
r.nextSeq = oldest
132-
r.rb.mu.RUnlock()
170+
r.rb.mu.Unlock()
133171
return ReadResult{Dropped: dropped}, nil
134172
}
135173

136174
if r.nextSeq <= latest {
137175
env := r.rb.buf[r.nextSeq%r.rb.cap]
138176
r.nextSeq++
139-
r.rb.mu.RUnlock()
177+
r.rb.mu.Unlock()
140178
return ReadResult{Envelope: &env}, nil
141179
}
142180

143-
r.rb.mu.RUnlock()
144-
145-
select {
146-
case <-ctx.Done():
181+
r.rb.waiters++
182+
r.rb.mu.Unlock()
183+
err := waitForWake(ctx, wake)
184+
r.rb.mu.Lock()
185+
r.rb.waiters--
186+
r.rb.mu.Unlock()
187+
if err != nil {
147188
return ReadResult{}, ctx.Err()
148-
case <-wake:
149189
}
150190
}
151191
}
192+
193+
func waitForWake(ctx context.Context, wake <-chan struct{}) error {
194+
select {
195+
case <-ctx.Done():
196+
return ctx.Err()
197+
case <-wake:
198+
return nil
199+
}
200+
}

server/lib/telemetry/telemetry.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ type TelemetrySession struct {
3131
id string
3232
sessionStartSeq uint64
3333
categories map[oapi.TelemetryEventCategory]struct{}
34+
sessionMetadata *map[string]string
3435
appliedAt time.Time
3536
}
3637

@@ -68,6 +69,8 @@ func (s *TelemetrySession) Start(telemetrySessionID string, cfg TelemetryConfig)
6869
s.sessionStartSeq = s.es.Seq()
6970
s.appliedAt = time.Now()
7071
s.categories = categorySet(cfg.Categories)
72+
metadata := map[string]string{"telemetry_session_id": telemetrySessionID}
73+
s.sessionMetadata = &metadata
7174
}
7275

7376
// publishLocked stamps telemetry_session_id into ev.Source.Metadata and forwards to the bus.
@@ -77,10 +80,10 @@ func (s *TelemetrySession) publishLocked(ev events.Event) events.Envelope {
7780
ev.Ts = time.Now().UnixMicro()
7881
}
7982
if ev.Source.Metadata == nil {
80-
m := make(map[string]string)
81-
ev.Source.Metadata = &m
83+
ev.Source.Metadata = s.sessionMetadata
84+
} else {
85+
(*ev.Source.Metadata)["telemetry_session_id"] = s.id
8286
}
83-
(*ev.Source.Metadata)["telemetry_session_id"] = s.id
8487
return s.es.Publish(events.Envelope{Event: ev})
8588
}
8689

@@ -176,5 +179,6 @@ func (s *TelemetrySession) Stop() {
176179
s.mu.Lock()
177180
defer s.mu.Unlock()
178181
s.id = ""
182+
s.sessionMetadata = nil
179183
s.appliedAt = time.Time{}
180184
}

0 commit comments

Comments
 (0)