[kernel-1116] browser events: add external events #227
archandatta wants to merge 69 commits into main
Conversation
…ies and fix delimiter in CategoryFor
… and category validation
…alloc in filewriter, fix session doc
This binary is tracked on main and was inadvertently deleted earlier on this branch. Restoring it keeps the 13.4MB binary out of this PR's diff. Removing the tracked binary from main should be done in a separate PR.
Force-pushed from 7139fb3 to 0a89c58
Sayan-
left a comment
matches plan well and deviations from it are reasonable!
    return s.captureSessionID != ""
    }

    // Stop ends the current session. It publishes a synthetic session_ended
should seq be monotonic across start/stop cycles? If so, I think Start shouldn't reset it here. That would make seq represent the process-wide event stream position rather than the position within a single capture session.
I see your point; here are things to consider with how this works currently:
- every envelope already carries capture_session_id, so I think (session_id, seq) would be sufficient
- SSE clients don't cross session boundaries; session_ended terminates the stream, so I'm not sure monotonic seq fully applies

The fix would also require a non-trivial ring buffer refactor to avoid dropped records.
    // PublishEvent handles POST /events/publish.
    // Injects a caller-supplied event into the active capture session. Returns 400
    // if no session is active or the event fails validation.
    func (s *ApiService) PublishEvent(_ context.Context, req oapi.PublishEventRequestObject) (oapi.PublishEventResponseObject, error) {
could we add one lifecycle test for the new event flow? Something like start capture session, publish an event through PublishEvent, read it from StreamEvents, then stop and verify the stream receives session_ended.
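That lifecycle could be sketched end to end with toy stand-ins. Everything here (startSession, publish, stop, the channel-based stream) is an illustrative name, not the repo's actual API; it only shows the shape of the suggested test:

```go
package main

import "fmt"

// envelope and session are simplified stand-ins for the real types.
type envelope struct {
	Seq  uint64
	Name string
}

type session struct {
	seq    uint64
	stream chan envelope
}

func startSession() *session {
	return &session{stream: make(chan envelope, 16)}
}

// publish assigns the next seq and pushes onto the stream,
// standing in for PublishEvent.
func (s *session) publish(name string) envelope {
	s.seq++
	env := envelope{Seq: s.seq, Name: name}
	s.stream <- env
	return env
}

// stop emits a synthetic session_ended and closes the stream,
// mirroring the Stop behavior described above.
func (s *session) stop() {
	s.publish("session_ended")
	close(s.stream)
}

// runLifecycle walks the suggested flow: start, publish, stop, drain.
func runLifecycle() []string {
	s := startSession()
	s.publish("custom_event")
	s.stop()
	var got []string
	for env := range s.stream { // stand-in for StreamEvents
		got = append(got, env.Name)
	}
	return got
}

func main() {
	fmt.Println(runLifecycle()) // [custom_event session_ended]
}
```

The real test would exercise the HTTP handlers instead, but the assertion shape is the same: the published event arrives in order, and the stream terminates with session_ended.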
Sayan-
left a comment
There was a problem hiding this comment.
the implementation is internally leaning toward “seq is per capture session” but the docs don’t clearly say that. would recommend:
- make seq process-wide by not resetting it, or
- keep the current behavior and document that seq / Last-Event-ID are scoped to the active capture session
    content:
      application/json:
        schema:
          $ref: "#/components/schemas/Event"
I wouldn't $ref: Event for the request body. The server forces source.kind = kernel_api and silently defaults to the system category. I think source.kind should be reserved for events we produce on API requests to the image server, and we should let event publishers define whatever source.kind/category they want. I would define PublishEventRequest with only the fields the server actually honors, so SDK consumers don't see fields that get overwritten.
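One hypothetical shape for such a request schema; the property names and the EventCategory ref are assumptions for illustration, not the repo's actual spec:

```yaml
# Illustrative only; fields are assumptions about what the server honors.
PublishEventRequest:
  type: object
  required: [category, name]
  properties:
    category:
      $ref: "#/components/schemas/EventCategory"
    name:
      type: string
    data:
      type: object
      additionalProperties: true
```

The point is that the schema advertises exactly the surface the server respects, so nothing the SDK sends gets silently overwritten.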
    $ref: "#/components/schemas/Event"
    responses:
      "200":
        description: Event accepted and published
Empty 200 means callers can't correlate their POST to the stream. Return the assigned envelope.
    session. Omit or send 0 to start from the oldest buffered event.
    Sequence numbers reset to 0 when a new capture session is started;
    a value from a previous session is not meaningful here and may
    cause the stream to miss events silently.
Under the global-stream model, seqs are process-monotonic — they don't reset on session start. This warning ("a value from a previous session is not meaningful") becomes obsolete and the silent-miss footgun goes away.
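The global-stream model boils down to the counter outliving any session. A minimal sketch (eventStream and startSession are illustrative names, not the repo's real types):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// eventStream holds a process-lifetime counter.
type eventStream struct {
	seq atomic.Uint64
}

// next returns the next process-wide sequence number.
func (es *eventStream) next() uint64 { return es.seq.Add(1) }

// startSession deliberately does NOT touch es.seq, so numbering
// keeps climbing across capture sessions.
func (es *eventStream) startSession() {}

func main() {
	es := &eventStream{}
	es.startSession()
	fmt.Println(es.next(), es.next()) // 1 2
	es.startSession()                 // second session: seq is not reset
	fmt.Println(es.next())            // 3
}
```

With this shape, any Last-Event-ID from an earlier session is still a valid resume point, which is what makes the silent-miss warning obsolete.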
    content:
      text/event-stream:
        schema:
          type: string
Is there no way to type this? What do we do in other SSE streams here and in the API?
Good catch, I missed updating this after creating the event structs: 56782c0
    - // the sequence counter so each session starts at seq 1.
    + // the sequence counter so each session starts at seq 1. Sequence numbers are
    + // scoped to the active session; Last-Event-ID values from a previous session
    + // are not valid for reconnecting to a new one.
I would hoist ring, seq, and the system-event publish responsibility out of CaptureSession into a process-lifetime EventStream (or EventBus). CaptureSession then holds the stream as a dependency and is just (a) a category filter for file writes plus (b) capture-preferences metadata. This is the structural change that makes the global-stream model real.
We may also want to just remove the file writing, since in a global event stream model it is less clear which events should go to these files.
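A rough sketch of that split, using simplified stand-in types (not the repo's real ones): EventStream owns seq and the buffer for the process lifetime, and CaptureSession only filters and delegates.

```go
package main

import "fmt"

type Event struct {
	Category string
	Name     string
}

type Envelope struct {
	Seq   uint64
	Event Event
}

// EventStream lives for the process lifetime and owns sequencing.
type EventStream struct {
	seq uint64
	buf []Envelope // stand-in for the ring buffer
}

// Publish assigns the next process-wide seq and buffers the envelope.
func (es *EventStream) Publish(ev Event) Envelope {
	es.seq++
	env := Envelope{Seq: es.seq, Event: ev}
	es.buf = append(es.buf, env)
	return env
}

// CaptureSession holds the stream as a dependency; it is just a filter.
type CaptureSession struct {
	stream     *EventStream
	categories map[string]bool // config-driven category filter
}

// Publish applies the session's filter, then delegates to the stream.
func (s *CaptureSession) Publish(ev Event) (Envelope, bool) {
	if !s.categories[ev.Category] {
		return Envelope{}, false
	}
	return s.stream.Publish(ev), true
}

func main() {
	es := &EventStream{}
	s := &CaptureSession{stream: es, categories: map[string]bool{"dom": true}}
	env, ok := s.Publish(Event{Category: "dom", Name: "click"})
	fmt.Println(ok, env.Seq) // true 1
	_, ok = s.Publish(Event{Category: "net", Name: "request"})
	fmt.Println(ok) // false: filtered by the session, never reaches the stream
}
```

External publishers and the cdp producer would then both call EventStream.Publish as peers, with CaptureSession as just one consumer-side policy.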
    if body.Ts != nil {
        ev.Ts = *body.Ts
    }
    ev.Ts = time.Now().UnixMicro()
@raf any thoughts on this? I'm going back and forth on whether we should allow the timestamp to be sent, because then the request could completely misalign with the event.
…p category description
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 678df70.
    env, _ = truncateIfNeeded(env)
    es.ring.publish(env)
    return env
    }
EventStream.publish releases the lock before ring.publish, risking out-of-order events
Medium Severity
EventStream.publish releases es.mu after assigning the seq but before calling truncateIfNeeded and ring.publish. If two goroutines call publish concurrently (possible since EventStream is a shared process-lifetime bus accepting any *EventStream pointer), events can arrive at the ring buffer out of order. The ring buffer unconditionally sets latestSeq = env.Seq in its own publish, so a late-arriving lower seq would regress latestSeq, causing readers to block on already-buffered events or miss them entirely. The lock scope needs to cover through ring.publish to maintain the seq-order invariant the readers depend on.
rgarcia
left a comment
some things still outstanding in terms of decoupling the publish/subscribe API from the capture session:
- PublishEvent 400s when no capture session active (events.go:21-23)
- StreamEvents 400s when no capture session active (events.go:82-84)
- SSE stream terminates on session_ended (events.go:145-147)
Also a nit, but we should rename the capture session events to capture_session; otherwise we risk confusion with the browser session itself beginning/ending.
External publish should be treated like a peer producer to the cdp producer, so I think instead of Publish and PublishUnfiltered living on CaptureSession it should be something like:

    // EventStream gets a public Publish (today's es.publish is private)
    func (es *EventStream) Publish(env Envelope) Envelope

    // CaptureSession.Publish stays, applies its own config-driven filter,
    // internally calls es.Publish
    func (s *CaptureSession) Publish(ev Event)

    // PublishUnfiltered goes away
Also it looks like capture session id is still a top-level field on the event envelope.

    // today
    type Envelope struct {
        CaptureSessionID string // producer detail leaking up
        Seq              uint64 // actual pipeline metadata
        Event            Event
    }

In a global event-stream model I think it should be:

    type Envelope struct {
        Seq   uint64
        Event Event
    }

    type Source struct {
        Kind     SourceKind
        Event    string
        Metadata map[string]string // capture session id can go here as a producer detail
    }

Also, the SSE stream data and the response to POST /events should be identical to the envelope, i.e. {seq: ..., event: ...}.
As per plan -> #217
Note
Medium Risk
Adds new public API endpoints for event injection and long-lived SSE streaming, and changes capture-session sequencing to be process-monotonic; incorrect validation or stream handling could impact event delivery and client reconnection behavior.
Overview
Adds an external events API: POST /events/publish to inject validated caller-supplied events into the active capture session, and GET /events/stream to stream session envelopes over SSE with Last-Event-ID resume, keepalives, and explicit events_dropped/session_ended semantics.
Refactors the events pipeline so sequence numbers and the ring buffer live in a new process-lifetime EventStream, making seqs process-monotonic across sessions; CaptureSession now filters/categorizes on publish, tracks sessionStartSeq, exposes Active(), and emits a synthetic session_ended on stop. File-based JSONL logging (fileWriter plus related tests/close calls) is removed.
Regenerates OpenAPI (oapi.go + openapi.yaml) to include new schemas/enums for events and updates SSE responses to set Cache-Control: no-cache and X-Accel-Buffering: no; adjusts tests and Makefile generation patch expectations accordingly.
Reviewed by Cursor Bugbot for commit 678df70. Bugbot is set up for automated code reviews on this repo.