Skip to content

Commit 151a0d1

Browse files
authored
[kernel-1116] browser telemetry add s2 storage (#239)
## Architecture ``` Producers Shared Transport Consumers ───────────────────────────────────────────────────────────────────── CaptureSession ──Publish()──▶ EventStream ◀── API readers (CDP events, (ring buffer, (HTTP SSE, system events) seq counter) WebSocket) │ │ NewReader(0) [NEW] ▼ StorageWriter.Run() (blocking loop) │ Append(ctx, env) ▼ S2Storage (JSON marshal → S2 batcher → S2 stream) Shutdown sequence (new): 1. HTTP servers stop → no new publishes 2. ctx cancelled → Run() returns 3. Drain() → flush ring tail 4. Close(ctx) → flush S2 batcher to network ``` <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Adds a new background persistence path that runs during capture and changes shutdown ordering; failures could impact telemetry durability or shutdown behavior. Token handling is introduced but is redacted in config logging. > > **Overview** > Adds an **optional S2-backed durable event sink** that forwards the server `EventStream` to an S2 stream when `S2_BASIN`, `S2_ACCESS_TOKEN`, and `S2_STREAM` are set. > > Implements a generic `StorageWriter` with non-blocking drain support (`Reader.TryRead`) and an S2 implementation (`S2StorageWriter`) that batches/submits JSON-encoded envelopes and flushes/acks on shutdown. > > Plumbs S2 env/config through docker run scripts and supervisord configs, updates server startup/shutdown to start the writer conditionally and stop it *after* HTTP services, and adds unit tests plus an e2e test that verifies events land in S2 (skipped when creds are unset). > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 0f67535. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 452efe9 commit 151a0d1

13 files changed

Lines changed: 734 additions & 2 deletions

File tree

images/chromium-headful/run-docker.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
7171
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
7272
fi
7373

74+
# S2 durable event storage (all three must be set to enable the sink)
75+
if [[ -n "${S2_BASIN:-}" ]]; then
76+
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
77+
fi
78+
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
79+
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
80+
fi
81+
if [[ -n "${S2_STREAM:-}" ]]; then
82+
RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
83+
fi
84+
7485
# WebRTC port mapping
7586
if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then
7687
echo "Running container with WebRTC"

images/chromium-headful/supervisor/services/kernel-images-api.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[program:kernel-images-api]
2-
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
2+
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
33
autostart=false
44
autorestart=true
55
startsecs=0

images/chromium-headless/image/supervisor/services/kernel-images-api.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[program:kernel-images-api]
2-
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
2+
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" S2_STREAM="${S2_STREAM:-}" exec /usr/local/bin/kernel-images-api'
33
autostart=false
44
autorestart=true
55
startsecs=0

images/chromium-headless/run-docker.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,17 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
2424
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
2525
fi
2626

27+
# S2 durable event storage (all three must be set to enable the sink)
28+
if [[ -n "${S2_BASIN:-}" ]]; then
29+
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
30+
fi
31+
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
32+
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
33+
fi
34+
if [[ -n "${S2_STREAM:-}" ]]; then
35+
RUN_ARGS+=( -e S2_STREAM="$S2_STREAM" )
36+
fi
37+
2738
# If a positional argument is given, use it as the entrypoint
2839
ENTRYPOINT_ARG=()
2940
if [[ $# -ge 1 && -n "$1" ]]; then

server/cmd/api/main.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,17 @@ func main() {
102102
}
103103
captureSession := capturesession.NewCaptureSession(eventStream)
104104

105+
// Optional S2 storage sink.
106+
var s2Writer *events.S2StorageWriter
107+
if config.S2Basin != "" && config.S2AccessToken != "" && config.S2Stream != "" {
108+
slogger.Info("S2 storage enabled", "basin", config.S2Basin, "stream", config.S2Stream)
109+
s2Writer = events.NewS2StorageWriter(eventStream, config.S2Basin, config.S2AccessToken, config.S2Stream, events.S2Config{}, slogger)
110+
if err := s2Writer.Start(ctx); err != nil {
111+
slogger.Error("failed to start S2 storage writer", "err", err)
112+
os.Exit(1)
113+
}
114+
}
115+
105116
apiService, err := api.New(
106117
recorder.NewFFmpegManager(),
107118
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
@@ -269,6 +280,16 @@ func main() {
269280
if err := g.Wait(); err != nil {
270281
slogger.Error("server failed to shutdown", "err", err)
271282
}
283+
284+
// s2Writer shuts down after the servers above, since they might produce events we
285+
// want to capture into the stream; we must let them finish before closing the writer.
286+
if s2Writer != nil {
287+
stopCtx, stopCancel := context.WithTimeout(context.Background(), 10*time.Second)
288+
defer stopCancel()
289+
if err := s2Writer.Stop(stopCtx); err != nil {
290+
slogger.Error("s2 storage writer stop failed", "err", err)
291+
}
292+
}
272293
}
273294

274295
func mustFFmpeg() {

server/cmd/config/config.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"fmt"
5+
"log/slog"
56
"time"
67

78
"github.com/kelseyhightower/envconfig"
@@ -35,6 +36,36 @@ type Config struct {
3536
// DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress.
3637
// If empty, it is derived from DevToolsProxyPort as 127.0.0.1:<port>.
3738
DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""`
39+
40+
// S2 durable event storage. All three fields must be set to enable the S2 sink.
41+
S2Basin string `envconfig:"S2_BASIN" default:""`
42+
S2AccessToken string `envconfig:"S2_ACCESS_TOKEN" default:""`
43+
S2Stream string `envconfig:"S2_STREAM" default:""`
44+
}
45+
46+
// LogValue implements slog.LogValuer, redacting secret fields.
47+
func (c *Config) LogValue() slog.Value {
48+
s2AccessToken := ""
49+
if c.S2AccessToken != "" {
50+
s2AccessToken = "[redacted]"
51+
}
52+
return slog.GroupValue(
53+
slog.Int("port", c.Port),
54+
slog.Int("frame_rate", c.FrameRate),
55+
slog.Int("display_num", c.DisplayNum),
56+
slog.Int("max_size_mb", c.MaxSizeInMB),
57+
slog.String("output_dir", c.OutputDir),
58+
slog.String("ffmpeg_path", c.PathToFFmpeg),
59+
slog.Int("devtools_proxy_port", c.DevToolsProxyPort),
60+
slog.Bool("log_cdp_messages", c.LogCDPMessages),
61+
slog.Duration("scale_to_zero_cooldown", c.ScaleToZeroCooldown),
62+
slog.Int("chromedriver_proxy_port", c.ChromeDriverProxyPort),
63+
slog.String("chromedriver_upstream_addr", c.ChromeDriverUpstreamAddr),
64+
slog.String("devtools_proxy_addr", c.DevToolsProxyAddr),
65+
slog.String("s2_basin", c.S2Basin),
66+
slog.String("s2_access_token", s2AccessToken),
67+
slog.String("s2_stream", c.S2Stream),
68+
)
3869
}
3970

4071
// Load loads configuration from environment variables

server/e2e/e2e_s2_storage_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package e2e
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"os"
7+
"os/exec"
8+
"testing"
9+
"time"
10+
11+
"github.com/s2-streamstore/s2-sdk-go/s2"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
15+
instanceoapi "github.com/kernel/kernel-images/server/lib/oapi"
16+
)
17+
18+
// TestS2StorageWriter starts a headless container with S2 credentials, runs a
19+
// capture session, and verifies that events land in the configured S2 stream.
20+
//
21+
// Skips automatically when S2_BASIN, S2_ACCESS_TOKEN, or S2_STREAM are unset.
22+
func TestS2StorageWriter(t *testing.T) {
23+
basin := os.Getenv("S2_BASIN")
24+
accessToken := os.Getenv("S2_ACCESS_TOKEN")
25+
stream := os.Getenv("S2_STREAM")
26+
if basin == "" || accessToken == "" || stream == "" {
27+
t.Skip("S2_BASIN, S2_ACCESS_TOKEN, and S2_STREAM must be set to run this test")
28+
}
29+
30+
if _, err := exec.LookPath("docker"); err != nil {
31+
t.Skipf("docker not available: %v", err)
32+
}
33+
34+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
35+
defer cancel()
36+
37+
c := NewTestContainer(t, headlessImage)
38+
require.NoError(t, c.Start(ctx, ContainerConfig{
39+
Env: map[string]string{
40+
"S2_BASIN": basin,
41+
"S2_ACCESS_TOKEN": accessToken,
42+
"S2_STREAM": stream,
43+
},
44+
}), "failed to start container")
45+
defer c.Stop(ctx)
46+
47+
require.NoError(t, c.WaitReady(ctx), "api not ready")
48+
49+
client, err := c.APIClient()
50+
require.NoError(t, err)
51+
52+
// Note the current S2 stream tail seq before we write anything so we only
53+
// read records produced by this test run.
54+
s2Client := s2.New(accessToken, nil)
55+
streamClient := s2Client.Basin(basin).Stream(s2.StreamName(stream))
56+
57+
checkResp, err := streamClient.CheckTail(ctx)
58+
require.NoError(t, err, "check tail before test")
59+
startSeq := checkResp.Tail.SeqNum
60+
61+
// Start a capture session.
62+
startResp, err := client.StartCaptureSessionWithResponse(ctx, instanceoapi.StartCaptureSessionJSONRequestBody{})
63+
require.NoError(t, err)
64+
require.Equal(t, http.StatusCreated, startResp.StatusCode(), "start capture session: %s", string(startResp.Body))
65+
require.NotNil(t, startResp.JSON201)
66+
sessionID := startResp.JSON201.Id
67+
t.Logf("capture session started: %s", sessionID)
68+
69+
// Let the session run briefly so at least one event is published (the
70+
// session_started system event is emitted on session start).
71+
time.Sleep(500 * time.Millisecond)
72+
73+
// Stop the capture session.
74+
stopResp, err := client.StopCaptureSessionWithResponse(ctx)
75+
require.NoError(t, err)
76+
require.Equal(t, http.StatusOK, stopResp.StatusCode(), "stop capture session: %s", string(stopResp.Body))
77+
t.Log("capture session stopped")
78+
79+
// Give the storage writer time to flush to S2 (batcher linger + network).
80+
time.Sleep(2 * time.Second)
81+
82+
// Read records written after the pre-test tail and verify at least one
83+
// envelope is present.
84+
readCtx, readCancel := context.WithTimeout(ctx, 10*time.Second)
85+
defer readCancel()
86+
87+
readSession, err := streamClient.ReadSession(readCtx, &s2.ReadOptions{
88+
SeqNum: s2.Uint64(startSeq),
89+
})
90+
require.NoError(t, err, "open S2 read session")
91+
defer readSession.Close()
92+
93+
var count int
94+
for readSession.Next() {
95+
count++
96+
}
97+
// EOF is expected once we reach the tail — not an error.
98+
if err := readSession.Err(); err != nil && readCtx.Err() == nil {
99+
t.Fatalf("S2 read session error: %v", err)
100+
}
101+
102+
assert.Greater(t, count, 0, "expected at least one event record in S2 stream %q", stream)
103+
t.Logf("found %d record(s) in S2 stream after seq %d", count, startSeq)
104+
}

server/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ require (
2121
github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866
2222
github.com/nrednav/cuid2 v1.1.0
2323
github.com/oapi-codegen/runtime v1.2.0
24+
github.com/s2-streamstore/s2-sdk-go v0.16.1
2425
github.com/samber/lo v1.52.0
2526
github.com/stretchr/testify v1.11.1
2627
github.com/testcontainers/testcontainers-go v0.40.0
@@ -99,6 +100,7 @@ require (
99100
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
100101
golang.org/x/crypto v0.43.0 // indirect
101102
golang.org/x/mod v0.28.0 // indirect
103+
golang.org/x/net v0.45.0 // indirect
102104
golang.org/x/text v0.30.0 // indirect
103105
golang.org/x/tools v0.37.0 // indirect
104106
google.golang.org/protobuf v1.36.10 // indirect

server/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
198198
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
199199
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
200200
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
201+
github.com/s2-streamstore/s2-sdk-go v0.16.1 h1:18Qht850wUhIb9JZkMwF5EJWfnmZnjdtW3z8xOuL7Ys=
202+
github.com/s2-streamstore/s2-sdk-go v0.16.1/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
201203
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
202204
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
203205
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=

server/lib/events/eventsstorage.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"sync"
8+
"sync/atomic"
9+
)
10+
11+
type Storage interface {
12+
Append(ctx context.Context, env Envelope) error
13+
Close(ctx context.Context) error
14+
}
15+
16+
// StorageWriter reads from the ring buffer and forwards each envelope to
17+
// Storage. Single-use and not thread-safe: call Run once, then after
18+
// it returns call Drain followed by Close. Reads start from the oldest
19+
// available event in the ring, not the current tail. Delivery is
20+
// at-least-once; consumers should dedupe by env.Seq.
21+
type StorageWriter struct {
22+
reader *Reader
23+
storage Storage
24+
log *slog.Logger
25+
once sync.Once
26+
appendErrors atomic.Uint64 // total append failures; best-effort, not retried
27+
}
28+
29+
// NewStorageWriter creates a writer that reads from es starting at seq 0.
30+
func NewStorageWriter(es *EventStream, storage Storage, log *slog.Logger) *StorageWriter {
31+
return &StorageWriter{
32+
reader: es.NewReader(0),
33+
storage: storage,
34+
log: log,
35+
}
36+
}
37+
38+
// Run reads from the ring buffer and appends each envelope to storage until
39+
// ctx is cancelled. Returns the context error on clean shutdown. Must be
40+
// called at most once; returns an error on a second call.
41+
func (w *StorageWriter) Run(ctx context.Context) error {
42+
firstCall := false
43+
w.once.Do(func() { firstCall = true })
44+
if !firstCall {
45+
return fmt.Errorf("events: StorageWriter.Run called more than once")
46+
}
47+
48+
for {
49+
res, err := w.reader.Read(ctx)
50+
if err != nil {
51+
return err
52+
}
53+
if err := w.processResult(ctx, res); err != nil {
54+
return err
55+
}
56+
}
57+
}
58+
59+
// Drain reads any events still in the ring non-blockingly until caught up or
60+
// ctx expires. Call after all publishers have stopped and Run has returned to
61+
// ensure no events are silently skipped on shutdown.
62+
func (w *StorageWriter) Drain(ctx context.Context) error {
63+
for {
64+
select {
65+
case <-ctx.Done():
66+
w.log.Warn("storage writer: drain deadline exceeded, ring may have unread events")
67+
return ctx.Err()
68+
default:
69+
}
70+
71+
res, ok := w.reader.TryRead()
72+
if !ok {
73+
return nil
74+
}
75+
if err := w.processResult(ctx, res); err != nil {
76+
return err
77+
}
78+
}
79+
}
80+
81+
func (w *StorageWriter) processResult(ctx context.Context, res ReadResult) error {
82+
if res.Dropped > 0 {
83+
w.log.Warn("storage writer: dropped events", "count", res.Dropped)
84+
return nil
85+
}
86+
if err := w.storage.Append(ctx, *res.Envelope); err != nil {
87+
total := w.appendErrors.Add(1)
88+
w.log.Error("storage writer: append failed", "seq", res.Envelope.Seq, "err", err, "total_append_errors", total)
89+
}
90+
return nil
91+
}
92+
93+
// Close drains in-flight writes and releases backend resources.
94+
func (w *StorageWriter) Close(ctx context.Context) error {
95+
return w.storage.Close(ctx)
96+
}

0 commit comments

Comments
 (0)