Commits
19 commits
9357d07
chore: rename event type constants to drop Type prefix
archandatta May 6, 2026
50602bb
feat: add S2 durable event storage library
archandatta May 6, 2026
31b2135
feat: wire S2 storage writer into api service
archandatta May 6, 2026
afbfa78
review: rename storage fields and clean up writer tests
archandatta May 6, 2026
eae5f2a
chore: update headless configs
archandatta May 6, 2026
d852919
fix: bound ack goroutine lifetimes and close the wg.Add/Wait race in …
archandatta May 6, 2026
17379dd
fix: drain SessionEnded to storage before evicting s2 producer
archandatta May 6, 2026
0e54ef4
fix: run shutdown phases in parallel and bound storageWriter.Close wi…
archandatta May 6, 2026
a9495e0
fix: suppress spurious storage_error on shutdown and surface ring ove…
archandatta May 6, 2026
83fa35e
fix: remove streams:list probe and drop ctx from NewS2Storage; degrad…
archandatta May 6, 2026
c5443b5
fix: make s2 Append synchronous — wait for ack before returning
archandatta May 6, 2026
dce67aa
review: add missing test coverage for overflow, slow-append shutdown,…
archandatta May 6, 2026
944e0fa
review: use EventsDropped for ring overflow events and add batcher tu…
archandatta May 6, 2026
df53c74
review: document nil storageWriter contract and session cleanup ordering
archandatta May 6, 2026
6998e1a
review: remove unnecessary sessionID local var in StopCaptureSession
archandatta May 6, 2026
207b49a
fix: block EventsStorageError in PublishEvent reserved type check
archandatta May 6, 2026
4519dd4
fix: remove overflow PublishUnfiltered to eliminate cpu-spinning feed…
archandatta May 6, 2026
ad61486
fix: remove unused storageWriter field and parameter from ApiService
archandatta May 6, 2026
bf059ca
fix: stop capture session before cancelling writer so SessionEnded re…
archandatta May 6, 2026
8 changes: 8 additions & 0 deletions images/chromium-headful/run-docker.sh
@@ -71,6 +71,14 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage
if [[ -n "${S2_BASIN:-}" ]]; then
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
fi
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
fi

# WebRTC port mapping
if [[ "${ENABLE_WEBRTC:-}" == "true" ]]; then
echo "Running container with WebRTC"
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" exec /usr/local/bin/kernel-images-api'
Contributor: should there also be an s2 stream config?

Contributor (Author): stream names are derived dynamically from the CaptureSessionID at runtime, so one S2 stream is created per capture session within the configured basin
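The per-session naming scheme described in this reply can be sketched as follows. The `sessions/` prefix and the helper name are assumptions for illustration only; the PR states just that one stream is derived per CaptureSessionID within the configured basin, and the real derivation lives in the S2 storage library it adds.

```go
package main

import "fmt"

// streamNameFor derives a per-session S2 stream name from a capture
// session ID. The "sessions/" prefix is hypothetical; only the
// one-stream-per-CaptureSessionID mapping is stated in the PR.
func streamNameFor(captureSessionID string) string {
	return fmt.Sprintf("sessions/%s", captureSessionID)
}

func main() {
	fmt.Println(streamNameFor("cs_abc123")) // → sessions/cs_abc123
}
```

Note that a reader must already know the CaptureSessionID to reconstruct the stream name, which is the discoverability trade-off raised in this thread.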

Contributor: this will make it hard for, e.g., the API to know what stream to read when it wants to stream telemetry for a browser. It is also at odds with the "one global event stream" that the cdp stuff publishes into. I would make this a constant that is injected on startup

autostart=false
autorestart=true
startsecs=2
@@ -1,5 +1,5 @@
[program:kernel-images-api]
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" exec /usr/local/bin/kernel-images-api'
command=/bin/bash -lc 'mkdir -p "${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" && PORT="${KERNEL_IMAGES_API_PORT:-10001}" FRAME_RATE="${KERNEL_IMAGES_API_FRAME_RATE:-10}" DISPLAY_NUM="${KERNEL_IMAGES_API_DISPLAY_NUM:-${DISPLAY_NUM:-1}}" MAX_SIZE_MB="${KERNEL_IMAGES_API_MAX_SIZE_MB:-500}" OUTPUT_DIR="${KERNEL_IMAGES_API_OUTPUT_DIR:-/recordings}" LOG_CDP_MESSAGES="${LOG_CDP_MESSAGES:-false}" S2_BASIN="${S2_BASIN:-}" S2_ACCESS_TOKEN="${S2_ACCESS_TOKEN:-}" exec /usr/local/bin/kernel-images-api'
autostart=false
autorestart=true
startsecs=2
8 changes: 8 additions & 0 deletions images/chromium-headless/run-docker.sh
@@ -24,6 +24,14 @@ if [[ -n "${PLAYWRIGHT_ENGINE:-}" ]]; then
RUN_ARGS+=( -e PLAYWRIGHT_ENGINE="$PLAYWRIGHT_ENGINE" )
fi

# S2 durable event storage
if [[ -n "${S2_BASIN:-}" ]]; then
RUN_ARGS+=( -e S2_BASIN="$S2_BASIN" )
fi
if [[ -n "${S2_ACCESS_TOKEN:-}" ]]; then
RUN_ARGS+=( -e S2_ACCESS_TOKEN="$S2_ACCESS_TOKEN" )
fi

# If a positional argument is given, use it as the entrypoint
ENTRYPOINT_ARG=()
if [[ $# -ge 1 && -n "$1" ]]; then
5 changes: 3 additions & 2 deletions server/cmd/api/api/api.go
@@ -90,6 +90,7 @@ type ApiService struct {

var _ oapi.StrictServerInterface = (*ApiService)(nil)

// New constructs an ApiService.
Contributor: remove

func New(
recordManager recorder.RecordManager,
factory recorder.FFmpegRecorderFactory,
@@ -116,8 +117,8 @@ func New(
ctx, cancel := context.WithCancel(context.Background())

return &ApiService{
recordManager: recordManager,
factory: factory,
recordManager: recordManager,
factory: factory,
defaultRecorderID: "default",
watches: make(map[string]*fsWatch),
procs: make(map[string]*processHandle),
3 changes: 3 additions & 0 deletions server/cmd/api/api/capture_session.go
@@ -87,6 +87,9 @@ func (s *ApiService) StopCaptureSession(_ context.Context, _ oapi.StopCaptureSes
// tear down asynchronously, leaving IsRunning briefly true.
resp := s.buildSessionResponse()
resp.Status = oapi.CaptureSessionStatusStopped
// Session cleanup (Remove on the S2 producer) happens automatically in
// EventsStorageWriter.Run when it processes the SessionEnded event, ensuring
// all pending writes are flushed before the producer is torn down.
Contributor: this comment feels out of place. remove or move closer to the things it's talking about?

s.captureSession.Stop()

return oapi.StopCaptureSession200JSONResponse(resp), nil
2 changes: 2 additions & 0 deletions server/cmd/api/api/capture_session_test.go
@@ -244,6 +244,8 @@ func (m *mockRecordManager) ListActiveRecorders(_ context.Context) []recorder.Re
func (m *mockRecordManager) StopAll(_ context.Context) error { return nil }

// newTestService builds an ApiService with minimal dependencies for capture session tests.
// The RemoveSession path (triggered by SessionEnded events via the writer's Run loop) is
// not exercised here — it lives in eventsstorage_writer_test.go.
func newTestService(t *testing.T, mgr recorder.RecordManager) *ApiService {
t.Helper()
svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), newCaptureSession(t), 0)
6 changes: 3 additions & 3 deletions server/cmd/api/api/events.go
@@ -26,7 +26,7 @@ func (s *ApiService) PublishEvent(_ context.Context, req oapi.PublishEventReques
if body == nil || body.Type == "" {
return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "type is required"}}, nil
}
if body.Type == events.TypeSessionEnded || body.Type == events.TypeEventsDropped {
if body.Type == events.SessionEnded || body.Type == events.EventsDropped || body.Type == events.EventsStorageError {
return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "type is reserved"}}, nil
}

@@ -120,7 +120,7 @@ func (s *ApiService) StreamEvents(ctx context.Context, req oapi.StreamEventsRequ
Seq: 0,
Event: events.Event{
Ts: time.Now().UnixMicro(),
Type: events.TypeEventsDropped,
Type: events.EventsDropped,
Category: events.CategorySystem,
Source: events.Source{Kind: events.KindKernelAPI},
Data: json.RawMessage(fmt.Sprintf(`{"dropped":%d}`, result.Dropped)),
@@ -137,7 +137,7 @@ func (s *ApiService) StreamEvents(ctx context.Context, req oapi.StreamEventsRequ
if err := writeEnvelopeFrame(pw, &env.Seq, *env); err != nil {
return
}
if env.Event.Type == events.TypeSessionEnded {
if env.Event.Type == events.SessionEnded {
return
}
}
2 changes: 1 addition & 1 deletion server/cmd/api/api/events_test.go
@@ -77,7 +77,7 @@ func TestEventLifecycle(t *testing.T) {
// Verify session_ended arrives on the stream.
select {
case env := <-received:
assert.Equal(t, events.TypeSessionEnded, env.Event.Type)
assert.Equal(t, events.SessionEnded, env.Event.Type)
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for session_ended")
}
70 changes: 66 additions & 4 deletions server/cmd/api/main.go
@@ -101,6 +101,19 @@ func main() {
os.Exit(1)
}

// Optionally connect the S2 durable storage sink (requires S2_BASIN + S2_ACCESS_TOKEN).
var storageWriter *events.EventsStorageWriter
if config.S2Basin != "" && config.S2Token != "" {
s2backend, err := events.NewS2Storage(config.S2Basin, config.S2Token,
config.S2BatcherLingerMs, config.S2BatcherMaxRecs)
if err != nil {
slogger.Warn("s2 storage unavailable, running without durable event storage", "err", err)
} else {
storageWriter = events.NewEventsStorageWriter(captureSession, s2backend)
slogger.Info("s2 durable event storage enabled", "basin", config.S2Basin)
}
}

apiService, err := api.New(
recorder.NewFFmpegManager(),
recorder.NewFFmpegRecorderFactory(config.PathToFFmpeg, defaultParams, stz),
@@ -242,31 +255,80 @@
}
}()

// Give the storage writer its own cancellable context so we can stop it
// independently of the signal context. This lets us call captureSession.Stop()
// (which publishes SessionEnded to the ring) before cancelling the writer,
// ensuring the writer can process and flush SessionEnded to S2.
writerCtx, writerCancel := context.WithCancel(context.Background())
defer writerCancel()

// Start the S2 storage writer goroutine (no-op if S2 not configured).
storageDone := make(chan struct{})
if storageWriter != nil {
go func() {
defer close(storageDone)
storageWriter.Run(writerCtx)
}()
} else {
close(storageDone)
}

// graceful shutdown
<-ctx.Done()
slogger.Info("shutdown signal received")

shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()

// Stop the active capture session now, while the storage writer is still
// alive (writerCtx not yet cancelled), so the writer can process the
// SessionEnded event and flush it to S2 before tearing down.
captureSession.Stop()

// Cancel the writer so it exits after draining remaining events.
writerCancel()

// Drain storage writer, close it (bounded by shutdownCtx), and shut down
// all HTTP servers in parallel so the full 10s budget is available to each.
g, _ := errgroup.WithContext(shutdownCtx)

if storageWriter != nil {
g.Go(func() error {
<-storageDone
closeDone := make(chan error, 1)
go func() { closeDone <- storageWriter.Close() }()
select {
case err := <-closeDone:
if err != nil {
slogger.Error("storage writer close failed", "err", err)
}
case <-shutdownCtx.Done():
slogger.Error("storage writer close timed out, forcing shutdown")
}
return nil
})
}

g.Go(func() error {
return srv.Shutdown(shutdownCtx)
})
g.Go(func() error {
return apiService.Shutdown(shutdownCtx)
})
g.Go(func() error {
upstreamMgr.Stop()
return srvDevtools.Shutdown(shutdownCtx)
})
g.Go(func() error {
return srvChromeDriver.Shutdown(shutdownCtx)
})

if err := g.Wait(); err != nil {
slogger.Error("server failed to shutdown", "err", err)
}

// Close CaptureSession last, after storage is fully drained.
// apiService.Shutdown calls captureSession.Close, which must come after
// the storage writer has finished consuming events.
if err := apiService.Shutdown(shutdownCtx); err != nil {
slogger.Error("api service failed to shutdown", "err", err)
}
}

func mustFFmpeg() {
9 changes: 9 additions & 0 deletions server/cmd/config/config.go
@@ -35,6 +35,15 @@ type Config struct {
// DevTools proxy address passed to ChromeDriver as goog:chromeOptions.debuggerAddress.
// If empty, it is derived from DevToolsProxyPort as 127.0.0.1:<port>.
DevToolsProxyAddr string `envconfig:"DEVTOOLS_PROXY_ADDR" default:""`

// S2 durable event storage. Both fields must be non-empty to enable the sink.
S2Basin string `envconfig:"S2_BASIN" default:""`
S2Token string `envconfig:"S2_ACCESS_TOKEN" default:""`
// S2BatcherLingerMs and S2BatcherMaxRecs control the batcher's flush triggers.
// 100ms linger keeps event latency low for near-real-time replay; 50 records
// per batch keeps individual S2 append payloads small.
S2BatcherLingerMs int `envconfig:"S2_BATCHER_LINGER_MS" default:"100"`
S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"`
}

// Load loads configuration from environment variables
6 changes: 6 additions & 0 deletions server/cmd/config/config_test.go
@@ -29,6 +29,8 @@ func TestLoad(t *testing.T) {
ChromeDriverProxyPort: 9224,
ChromeDriverUpstreamAddr: "127.0.0.1:9225",
DevToolsProxyAddr: "127.0.0.1:9222",
S2BatcherLingerMs: 100,
S2BatcherMaxRecs: 50,
},
},
{
@@ -57,6 +59,8 @@
ChromeDriverProxyPort: 5432,
ChromeDriverUpstreamAddr: "127.0.0.1:9999",
DevToolsProxyAddr: "127.0.0.1:9876",
S2BatcherLingerMs: 100,
S2BatcherMaxRecs: 50,
},
},
{
@@ -77,6 +81,8 @@
ChromeDriverProxyPort: 9224,
ChromeDriverUpstreamAddr: "127.0.0.1:9225",
DevToolsProxyAddr: "10.0.0.1:1234",
S2BatcherLingerMs: 100,
S2BatcherMaxRecs: 50,
},
},
{
2 changes: 2 additions & 0 deletions server/go.mod
@@ -21,6 +21,7 @@ require (
github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866
github.com/nrednav/cuid2 v1.1.0
github.com/oapi-codegen/runtime v1.2.0
github.com/s2-streamstore/s2-sdk-go v0.14.0
github.com/samber/lo v1.52.0
github.com/stretchr/testify v1.11.1
github.com/testcontainers/testcontainers-go v0.40.0
@@ -99,6 +100,7 @@ require (
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/mod v0.28.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/tools v0.37.0 // indirect
google.golang.org/protobuf v1.36.10 // indirect
2 changes: 2 additions & 0 deletions server/go.sum
@@ -198,6 +198,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/s2-streamstore/s2-sdk-go v0.14.0 h1:YqAqUpyeaf6XBA2gPjIyHhVMOnClhFAd8etckUbmSE4=
github.com/s2-streamstore/s2-sdk-go v0.14.0/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw=
github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
2 changes: 1 addition & 1 deletion server/lib/events/capturesession.go
@@ -208,7 +208,7 @@ func (s *CaptureSession) Stop() {
return
}
s.publishLocked(Event{
Type: TypeSessionEnded,
Type: SessionEnded,
Category: CategorySystem,
Source: Source{Kind: KindKernelAPI},
})
5 changes: 3 additions & 2 deletions server/lib/events/event.go
@@ -44,8 +44,9 @@ func ValidCategory(c EventCategory) bool {

// System event types emitted by the pipeline itself.
const (
TypeSessionEnded = "session_ended"
TypeEventsDropped = "events_dropped"
SessionEnded = "session_ended"
EventsDropped = "events_dropped"
EventsStorageError = "storage_error"
Contributor: i don't think we should publish this as an event--creates a feedback loop and exposes details that make more sense as logs

)

type SourceKind string