[kernel-1116] browser events: add external events#227

Open
archandatta wants to merge 69 commits into main from
archand/kernel-1116/external-events

Conversation

@archandatta
Contributor

@archandatta archandatta commented Apr 24, 2026

As per plan -> #217


Note

Medium Risk
Adds new public API endpoints for event injection and long-lived SSE streaming, and changes capture-session sequencing to be process-monotonic; incorrect validation or stream handling could impact event delivery and client reconnection behavior.

Overview
Adds an external events API: POST /events/publish to inject validated caller-supplied events into the active capture session and GET /events/stream to stream session envelopes over SSE with Last-Event-ID resume, keepalives, and explicit events_dropped / session_ended semantics.

Refactors the events pipeline so sequence numbers and the ring buffer live in a new process-lifetime EventStream, making seqs process-monotonic across sessions; CaptureSession now filters/categories on publish, tracks sessionStartSeq, exposes Active(), and emits a synthetic session_ended on stop. File-based JSONL logging (fileWriter + related tests/close calls) is removed.

Regenerates OpenAPI (oapi.go + openapi.yaml) to include new schemas/enums for events and updates SSE responses to set Cache-Control: no-cache and X-Accel-Buffering: no; adjusts tests and Makefile generation patch expectations accordingly.
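For readers less familiar with the SSE wire format, here is a minimal Go sketch of the framing the overview describes: the two response headers named above, an `id:` line carrying the seq so a client can echo it back as `Last-Event-ID`, and a comment-line keepalive. The helper names (`sseHeaders`, `formatSSEEvent`, `formatKeepalive`) and the keepalive text are illustrative assumptions, not the code in this PR.

```go
package main

import (
	"fmt"
	"net/http"
)

// sseHeaders returns headers an SSE endpoint would typically set.
// Cache-Control and X-Accel-Buffering are the two named in the PR overview;
// Content-Type is the standard SSE media type.
func sseHeaders() http.Header {
	h := http.Header{}
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	h.Set("X-Accel-Buffering", "no")
	return h
}

// formatSSEEvent frames one event. The id line is what a reconnecting client
// sends back as Last-Event-ID. jsonData is assumed to be single-line JSON.
func formatSSEEvent(seq uint64, jsonData string) string {
	return fmt.Sprintf("id: %d\ndata: %s\n\n", seq, jsonData)
}

// formatKeepalive returns an SSE comment line; clients ignore it, but it
// keeps idle connections from being closed by intermediaries.
func formatKeepalive() string {
	return ": keepalive\n\n"
}

func main() {
	for name, values := range sseHeaders() {
		fmt.Printf("%s: %s\n", name, values[0])
	}
	fmt.Print(formatSSEEvent(7, `{"seq":7}`))
	fmt.Print(formatKeepalive())
}
```

`X-Accel-Buffering: no` matters when the server sits behind a buffering reverse proxy such as nginx; without it, events can be held back until the proxy's buffer fills.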

Reviewed by Cursor Bugbot for commit 678df70. Bugbot is set up for automated code reviews on this repo. Configure here.

This binary is tracked on main and was incidentally deleted earlier on
this branch. Restoring it keeps the 13.4MB binary out of this PR's diff.
Removing the tracked binary from main should be done in a separate PR.
Comment thread server/cmd/api/api/events.go
Comment thread server/lib/events/capturesession.go
Comment thread server/cmd/api/api/events.go
Comment thread server/lib/events/capturesession.go Outdated
@archandatta archandatta force-pushed the archand/kernel-1116/external-events branch from 7139fb3 to 0a89c58 Compare April 24, 2026 15:42
Comment thread server/cmd/api/api/events.go
@archandatta archandatta requested review from Sayan- and rgarcia April 24, 2026 16:23
Contributor

@Sayan- Sayan- left a comment

matches plan well and deviations from it are reasonable!

Comment thread server/lib/events/capturesession.go Outdated
return s.captureSessionID != ""
}

// Stop ends the current session. It publishes a synthetic session_ended
Contributor

should seq be monotonic across start/stop cycles? If so, I think Start shouldn't reset it here. That would make seq represent the process-wide event stream position rather than the position within a single capture session.

Contributor Author

@archandatta archandatta May 1, 2026

I see your point, here are things to consider with how this works currently

  • every envelope already carries capture_session_id, so i think (session_id, seq) would be sufficient
  • SSE clients don't cross session boundaries, session_ended terminates the stream, so not sure if monotonic seq applies fully

the fix would also require a non-trivial ring buffer refactor to avoid dropped records

// PublishEvent handles POST /events/publish.
// Injects a caller-supplied event into the active capture session. Returns 400
// if no session is active or the event fails validation.
func (s *ApiService) PublishEvent(_ context.Context, req oapi.PublishEventRequestObject) (oapi.PublishEventResponseObject, error) {
Contributor

could we add one lifecycle test for the new event flow? Something like start capture session, publish an event through PublishEvent, read it from StreamEvents, then stop and verify the stream receives session_ended.

Contributor Author

@archandatta archandatta requested a review from Sayan- May 1, 2026 13:02
Comment thread server/lib/events/capturesession.go
Base automatically changed from archand/kernel-1116/cdp-foundation to main May 5, 2026 17:13
Contributor

@Sayan- Sayan- left a comment

the implementation is internally leaning toward “seq is per capture session” but the docs don’t clearly say that. would recommend:

  • make seq process-wide by not resetting it, or
  • keep the current behavior and document that seq / Last-Event-ID are scoped to the active capture session

Comment thread server/cmd/api/api/events.go Outdated
Comment thread server/openapi.yaml Outdated
Comment thread server/openapi.yaml Outdated
content:
application/json:
schema:
$ref: "#/components/schemas/Event"
Contributor

I wouldn't $ref: Event for the request body. The server forces source.kind = kernel_api and silently defaults to system category. I think this source.kind should be reserved for when we produce events on API requests to the image server and that we should let event publishers define whatever source.kind/category they want. I would define PublishEventRequest with only the fields the server actually honors so SDK consumers don't see fields that get overwritten.

Contributor Author

addressed -> 093d8cc

Comment thread server/openapi.yaml
$ref: "#/components/schemas/Event"
responses:
"200":
description: Event accepted and published
Contributor

An empty 200 means callers can't correlate their POST with the stream. Return the assigned envelope instead.

Contributor Author

addressed -> 059f929

Comment thread server/openapi.yaml Outdated
session. Omit or send 0 to start from the oldest buffered event.
Sequence numbers reset to 0 when a new capture session is started;
a value from a previous session is not meaningful here and may
cause the stream to miss events silently.
Contributor

Under the global-stream model, seqs are process-monotonic — they don't reset on session start. This warning ("a value from a previous session is not meaningful") becomes obsolete and the silent-miss footgun goes away.
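To illustrate why process-monotonic seqs remove the silent-miss case, here is a minimal Go sketch of resume logic over a buffer whose seqs never reset. The `resumeFrom` helper, the trimmed-down `Envelope`, and the way the dropped count is computed are assumptions for illustration; the PR's ring buffer may report `events_dropped` differently.

```go
package main

import "fmt"

// Envelope is a stand-in for the PR's envelope type; only Seq matters here.
type Envelope struct {
	Seq uint64
}

// resumeFrom returns the buffered envelopes a reconnecting client still
// needs, plus how many events were evicted before it could read them.
// buffered is assumed to be sorted by Seq with no gaps. A lastEventID of 0
// means "start from the oldest buffered event", following the OpenAPI text
// quoted above.
func resumeFrom(buffered []Envelope, lastEventID uint64) (replay []Envelope, dropped uint64) {
	if len(buffered) == 0 {
		return nil, 0
	}
	if lastEventID == 0 {
		return buffered, 0
	}
	// Because seqs never reset, a gap between the client's position and the
	// oldest buffered seq can only mean events were evicted, so it can be
	// reported explicitly instead of being missed silently.
	if oldest := buffered[0].Seq; oldest > lastEventID+1 {
		dropped = oldest - lastEventID - 1
	}
	for _, env := range buffered {
		if env.Seq > lastEventID {
			replay = append(replay, env)
		}
	}
	return replay, dropped
}

func main() {
	buffered := []Envelope{{Seq: 5}, {Seq: 6}, {Seq: 7}, {Seq: 8}}
	replay, dropped := resumeFrom(buffered, 2)
	fmt.Println("replayed:", len(replay), "dropped:", dropped)
}
```

With per-session seqs, a stale `Last-Event-ID` of 6 from an old session would be indistinguishable from a valid position in the new one; with a single monotonic counter that ambiguity does not arise.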

Contributor Author

addressed -> 7bc3050

Comment thread server/openapi.yaml Outdated
content:
text/event-stream:
schema:
type: string
Contributor

is there no way to type this? what do we do in other sse streams here and in the API?

Contributor Author

good catch, I missed updating this after creating the event structs -> 56782c0

Comment thread server/lib/events/capturesession.go Outdated
// the sequence counter so each session starts at seq 1.
// the sequence counter so each session starts at seq 1. Sequence numbers are
// scoped to the active session; Last-Event-ID values from a previous session
// are not valid for reconnecting to a new one.
Contributor

i would hoist ring, seq, and the system-event publish responsibility out of CaptureSession into a process-lifetime EventStream (or EventBus). CaptureSession then holds the stream as a dependency and is just (a) category filter for file writes + (b) capture-preferences metadata. This is the structural change that makes the global-stream model real.

we may also want to just rm the file writing since in a global event stream model it is less clear what events go to these files

Contributor Author

Addressed -> 5362a27

if body.Ts != nil {
ev.Ts = *body.Ts
}
ev.Ts = time.Now().UnixMicro()
Contributor Author

@raf any thoughts on this? I'm going back and forth on whether we should allow the caller to send the timestamp, because the request time could then be completely misaligned with the event.

@archandatta archandatta requested a review from rgarcia May 8, 2026 17:15

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit 678df70. Configure here.

env, _ = truncateIfNeeded(env)
es.ring.publish(env)
return env
}

EventStream.publish releases lock before ring.publish risking out-of-order events

Medium Severity

EventStream.publish releases es.mu after assigning the seq but before calling truncateIfNeeded and ring.publish. If two goroutines call publish concurrently (possible since EventStream is a shared process-lifetime bus accepting any *EventStream pointer), events can arrive at the ring buffer out of order. The ring buffer unconditionally sets latestSeq = env.Seq in its own publish, so a late-arriving lower seq would regress latestSeq, causing readers to block on already-buffered events or miss them entirely. The lock scope needs to cover through ring.publish to maintain the seq-order invariant the readers depend on.
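A minimal Go sketch of the suggested fix, assuming simplified stand-ins for the PR's types: the mutex is held from seq assignment through the ring write, so the ring always sees seqs in increasing order. The `ring` here is an unbounded slice rather than a real ring buffer, the payload field is hypothetical, and truncation is omitted; only the lock scope is the point.

```go
package main

import (
	"fmt"
	"sync"
)

// Envelope and ring are simplified stand-ins for the PR's types.
type Envelope struct {
	Seq     uint64
	Payload string // hypothetical field for the demo
}

type ring struct {
	latestSeq uint64
	buf       []Envelope
}

// publish mirrors the behavior Bugbot describes: it sets latestSeq
// unconditionally, so callers must deliver envelopes in seq order.
func (r *ring) publish(env Envelope) {
	r.latestSeq = env.Seq
	r.buf = append(r.buf, env)
}

type EventStream struct {
	mu   sync.Mutex
	seq  uint64
	ring ring
}

// Publish holds es.mu across both the seq assignment and the ring write.
// Releasing the lock between the two steps is what would allow a lower seq
// to reach the ring after a higher one.
func (es *EventStream) Publish(payload string) Envelope {
	es.mu.Lock()
	defer es.mu.Unlock()
	es.seq++
	env := Envelope{Seq: es.seq, Payload: payload}
	es.ring.publish(env)
	return env
}

// inOrder reports whether the ring holds seqs 1..n with no reordering.
func (es *EventStream) inOrder() bool {
	es.mu.Lock()
	defer es.mu.Unlock()
	for i, env := range es.ring.buf {
		if env.Seq != uint64(i+1) {
			return false
		}
	}
	return true
}

// publishConcurrently fires n publishes from separate goroutines.
func publishConcurrently(es *EventStream, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			es.Publish(fmt.Sprintf("event-%d", i))
		}(i)
	}
	wg.Wait()
}

func main() {
	es := &EventStream{}
	publishConcurrently(es, 100)
	fmt.Println("in order:", es.inOrder())
}
```

The trade-off is that any work done under the lock (such as truncating a large payload) serializes all producers; if that becomes a bottleneck, the truncation could happen before the lock is taken, since it does not depend on the seq.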

Contributor

@rgarcia rgarcia left a comment

some things still outstanding in terms of decoupling the publish/subscribe API from the capture session:

  • PublishEvent 400s when no capture session active (events.go:21-23)
  • StreamEvents 400s when no capture session active (events.go:82-84)
  • SSE stream terminates on session_ended (events.go:145-147)

Also nit but we should rename the capture session events to capture_session otherwise we risk confusion with the browser session itself beginning/ending

External publish should be treated like a peer producer to the cdp producer, so I think instead of Publish and PublishUnfiltered living on CaptureSession it should be something like

// EventStream gets a public Publish (today's es.publish is private)
func (es *EventStream) Publish(env Envelope) Envelope

// CaptureSession.Publish stays, applies its own config-driven filter,
// internally calls es.Publish
func (s *CaptureSession) Publish(ev Event)

// PublishUnfiltered goes away
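The layering proposed above can be sketched in Go as follows. This is a minimal illustration under stated assumptions: the `Category` field, the `allowed` map standing in for the session's config-driven filter, and the slice used in place of the ring buffer are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is trimmed to the fields the demo needs; Category is a hypothetical
// field used for the session's filter.
type Event struct {
	Category string
	Type     string
}

type Envelope struct {
	Seq   uint64
	Event Event
}

// EventStream is the process-lifetime bus. Any producer may call Publish.
type EventStream struct {
	mu  sync.Mutex
	seq uint64
	buf []Envelope // stand-in for the ring buffer
}

// Publish assigns the next seq and stores the envelope.
func (es *EventStream) Publish(env Envelope) Envelope {
	es.mu.Lock()
	defer es.mu.Unlock()
	es.seq++
	env.Seq = es.seq
	es.buf = append(es.buf, env)
	return env
}

// CaptureSession is one producer among several. It applies its own filter
// and then delegates to the shared stream.
type CaptureSession struct {
	stream  *EventStream
	allowed map[string]bool // hypothetical category filter
}

func (s *CaptureSession) Publish(ev Event) {
	if !s.allowed[ev.Category] {
		return
	}
	s.stream.Publish(Envelope{Event: ev})
}

func main() {
	stream := &EventStream{}
	session := &CaptureSession{stream: stream, allowed: map[string]bool{"network": true}}

	session.Publish(Event{Category: "console", Type: "log"})     // filtered out
	session.Publish(Event{Category: "network", Type: "request"}) // accepted

	// An external publisher is a peer producer and bypasses the session filter.
	env := stream.Publish(Envelope{Event: Event{Category: "custom", Type: "note"}})
	fmt.Println("external event seq:", env.Seq, "buffered:", len(stream.buf))
}
```

Under this shape the HTTP handler for external publish would call the stream directly and would not need a capture session to be active.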

Also it looks like capture session id is still a top level field on the event envelope.

// today
type Envelope struct {
    CaptureSessionID string  // producer detail leaking up
    Seq              uint64  // actual pipeline metadata
    Event            Event
}

In a global-event stream model I think it should be

type Envelope struct {
    Seq   uint64
    Event Event
}

type Source struct {
    Kind             SourceKind
    Event            string
    Metadata         map[string]string // capture session id can go here as a producer detail
}

also the SSE stream data and response to POST /events should be identical to the envelope i.e. {seq: ..., event: ...}
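As a rough illustration of that wire shape, the following Go sketch marshals the proposed envelope to JSON. The struct layout follows the types suggested above; the JSON tag names, the `Type` field on `Event`, the `"external"` source kind, and the `capture_session_id` metadata key are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type SourceKind string

// Source follows the proposed shape; producer details such as the capture
// session id live in Metadata rather than on the envelope.
type Source struct {
	Kind     SourceKind        `json:"kind"`
	Event    string            `json:"event"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

// Event is trimmed for the demo; Type is a hypothetical field.
type Event struct {
	Type   string `json:"type"`
	Source Source `json:"source"`
}

// Envelope carries only pipeline metadata (Seq) plus the event.
type Envelope struct {
	Seq   uint64 `json:"seq"`
	Event Event  `json:"event"`
}

// encodeEnvelope returns the JSON that both the SSE data line and the
// publish response body would carry under this proposal.
func encodeEnvelope(env Envelope) (string, error) {
	b, err := json.Marshal(env)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	env := Envelope{
		Seq: 42,
		Event: Event{
			Type: "custom.note",
			Source: Source{
				Kind:     "external", // hypothetical kind value
				Event:    "note",
				Metadata: map[string]string{"capture_session_id": "cs_123"},
			},
		},
	}
	out, err := encodeEnvelope(env)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Using one encoder for both paths means a caller can match the `seq` returned by the POST against the `id:` it later sees on the stream.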

3 participants