|
| 1 | +# External Event Ingestion + SSE Streaming — Plan |
| 2 | + |
| 3 | +**Scope:** Two new HTTP endpoints layered on top of the merged CDP base: `POST /events/capture_session/publish` (external event ingestion) and `GET /events/capture_session/stream` (SSE live stream), both wired into the existing resource-style `CaptureSession`. |
| 4 | + |
| 5 | +--- |
| 6 | + |
| 7 | +## 1. Motivation |
| 8 | + |
| 9 | +The CDP monitor produces a unified `events.Event` stream into a `CaptureSession` (ring buffer + seq assignment + optional file write). The base branch exposes the session as a resource: |
| 10 | + |
| 11 | +``` |
| 12 | +GET /events/capture_session — read current session |
| 13 | +POST /events/capture_session — start a session |
| 14 | +PATCH /events/capture_session — update config |
| 15 | +DELETE /events/capture_session — stop the session |
| 16 | +``` |
| 17 | + |
| 18 | +Gaps this plan closes: |
| 19 | + |
| 20 | +1. External producers (kernel API callers, browser extensions, local processes) have no way to inject events into the same merged stream. |
| 21 | +2. There is no live pull interface — consumers can read the session state but cannot subscribe to events in real time with reconnection support. |
| 22 | + |
| 23 | +Both new endpoints share the existing `CaptureSession` — no new storage, no new transport, no new schema. |
| 24 | + |
| 25 | +--- |
| 26 | + |
| 27 | +## 2. What's Changing |
| 28 | + |
| 29 | +### 2.1 New endpoints |
| 30 | + |
| 31 | +| Method | Path | Handler | operationId | |
| 32 | +| --- | --- | --- | --- | |
| 33 | +| POST | `/events/capture_session/publish` | `ApiService.PublishEvent` | `publishEvent` | |
| 34 | +| GET | `/events/capture_session/stream` | `ApiService.StreamCaptureSession` (SSE) | `streamCaptureSession` | |
| 35 | + |
| 36 | +Both endpoints follow the same singleton pattern as the other `/events/capture_session` routes: handlers reference `s.captureSession` directly and return 404 when no session is active. |
| 37 | + |
| 38 | +Both are registered in `server/cmd/api/main.go` alongside the `/events/capture_session` routes from the base branch. |
| 39 | + |
| 40 | +### 2.2 `POST /events/capture_session/publish` |
| 41 | + |
| 42 | +Accepts a JSON `events.Event` body and publishes it into the currently active `CaptureSession`. |
| 43 | + |
| 44 | +**Defaults applied server-side when caller omits them:** |
| 45 | + |
| 46 | +- `category` — derived from `type` via `cdpmonitor.CategoryFor()`. Splits `type` on the first `_` and maps the prefix to a category. |
| 47 | +- `source.kind` — stamped to `kernel_api` when absent, so downstream consumers can distinguish external traffic from CDP traffic. |
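The two defaults above can be sketched as follows. This is a minimal illustration, not the real implementation: `Event`, `Source`, `categoryFor`, `applyDefaults`, and the contents of `prefixCategories` are hypothetical stand-ins, and the actual prefix-to-category mapping in `cdpmonitor.CategoryFor()` may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// Source and Event are simplified stand-ins for the real events.Event type.
type Source struct {
	Kind string `json:"kind,omitempty"`
}

type Event struct {
	Type     string `json:"type"`
	Category string `json:"category,omitempty"`
	Source   Source `json:"source"`
}

// prefixCategories is a hypothetical prefix-to-category table; the real
// mapping lives in cdpmonitor.CategoryFor and may contain other entries.
var prefixCategories = map[string]string{
	"network": "network",
	"console": "console",
}

// categoryFor splits typ on the first "_" and looks the prefix up.
// Returning "" for an unknown prefix is an assumption of this sketch.
func categoryFor(typ string) string {
	prefix, _, _ := strings.Cut(typ, "_")
	return prefixCategories[prefix]
}

// applyDefaults fills category and source.kind only when the caller omitted them.
func applyDefaults(ev *Event) {
	if ev.Category == "" {
		ev.Category = categoryFor(ev.Type)
	}
	if ev.Source.Kind == "" {
		ev.Source.Kind = "kernel_api"
	}
}

func main() {
	ev := Event{Type: "network_request"}
	applyDefaults(&ev)
	fmt.Println(ev.Category, ev.Source.Kind) // network kernel_api
}
```

Values supplied by the caller are left untouched, so a producer that sets its own `source.kind` keeps it.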
| 48 | + |
| 49 | +**Validation and status codes:** |
| 50 | + |
| 51 | +- `400` on invalid JSON body. |
| 52 | +- `400` when `type` is empty. |
| 53 | +- `400` when `category` is present but not in the known set (`events.ValidCategory`). |
| 54 | +- `404` when no capture session is active (consistent with the resource model — publish has no implicit session). |
| 55 | +- `200` on successful publish. |
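As a rough sketch of how the status codes above could be decided, the function below maps a request body and a session flag to a status. The names `publishRequest`, `validCategories`, and `publishStatus` are hypothetical, and the order of the checks (body validation before the session check) is an assumption; the plan does not specify it.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// publishRequest holds only the fields this sketch validates.
type publishRequest struct {
	Type     string `json:"type"`
	Category string `json:"category"`
}

// validCategories is a hypothetical stand-in for events.ValidCategory.
var validCategories = map[string]bool{"network": true, "console": true}

// publishStatus returns the HTTP status the publish endpoint would send.
func publishStatus(body []byte, sessionActive bool) int {
	var req publishRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return http.StatusBadRequest // invalid JSON body
	}
	if req.Type == "" {
		return http.StatusBadRequest // type is required
	}
	if req.Category != "" && !validCategories[req.Category] {
		return http.StatusBadRequest // category present but unknown
	}
	if !sessionActive {
		return http.StatusNotFound // no implicit session
	}
	return http.StatusOK
}

func main() {
	fmt.Println(publishStatus([]byte(`{"type":"network_request"}`), true))        // 200
	fmt.Println(publishStatus([]byte(`{"type":""}`), true))                       // 400
	fmt.Println(publishStatus([]byte(`{"type":"x","category":"bogus"}`), true))   // 400
	fmt.Println(publishStatus([]byte(`{not json`), true))                         // 400
	fmt.Println(publishStatus([]byte(`{"type":"network_request"}`), false))       // 404
}
```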
| 56 | + |
| 57 | +The handler does not take `monitorMu` — `CaptureSession.Publish` is serialised internally and guarantees monotonic seq delivery. |
| 58 | + |
| 59 | +**Request** — only `type` is required: |
| 60 | + |
| 61 | +```json |
| 62 | +POST /events/capture_session/publish |
| 63 | +Content-Type: application/json |
| 64 | + |
| 65 | +{ |
| 66 | + "type": "network_request", |
| 67 | + "ts": 1713100000000000, |
| 68 | + "category": "network", |
| 69 | + "source": { |
| 70 | + "kind": "kernel_api", |
| 71 | + "event": "fetch", |
| 72 | + "metadata": { "request_id": "abc123" } |
| 73 | + }, |
| 74 | + "detail_level": "standard", |
| 75 | + "url": "https://example.com/api/data", |
| 76 | + "data": { "method": "GET", "status": 200 } |
| 77 | +} |
| 78 | +``` |
| 79 | + |
| 80 | +Minimal valid request: |
| 81 | + |
| 82 | +```json |
| 83 | +{ "type": "network_request" } |
| 84 | +``` |
| 85 | + |
| 86 | +**Response** `200` — pipeline stamps `seq` and `capture_session_id`, returns the full `Envelope`: |
| 87 | + |
| 88 | +```json |
| 89 | +{ |
| 90 | + "capture_session_id": "sess_01j...", |
| 91 | + "seq": 42, |
| 92 | + "event": { |
| 93 | + "ts": 1713100000000000, |
| 94 | + "type": "network_request", |
| 95 | + "category": "network", |
| 96 | + "source": { "kind": "kernel_api" }, |
| 97 | + "detail_level": "standard", |
| 98 | + "url": "https://example.com/api/data", |
| 99 | + "data": { "method": "GET", "status": 200 } |
| 100 | + } |
| 101 | +} |
| 102 | +``` |
| 103 | + |
| 104 | +**Capture-session category filter interaction.** The base branch introduced `CaptureConfig.Categories` to let callers filter what the CDP monitor records. External publishes are **not** filtered by this config — an explicit publish is treated as a caller intent and always reaches the pipeline. (Open question §6.5.) |
| 105 | + |
| 106 | +### 2.3 `GET /events/capture_session/stream` |
| 107 | + |
| 108 | +Server-Sent Events endpoint backed by the singleton `CaptureSession`. Follows the same pattern as `GET /events/capture_session` — no path parameter, operates on the currently active session, returns 404 when none is active. |
| 109 | + |
| 110 | +**Frame format:** |
| 111 | + |
| 112 | +``` |
| 113 | +id: {seq} |
| 114 | +data: {envelope-json} |
| 115 | +``` |
| 116 | + |
| 117 | +**Headers set:** |
| 118 | + |
| 119 | +- `Content-Type: text/event-stream` |
| 120 | +- `Cache-Control: no-cache` |
| 121 | +- `X-Accel-Buffering: no` (disables nginx / reverse-proxy buffering) |
| 122 | + |
| 123 | +**Reconnection.** Honours the `Last-Event-ID` request header. On reconnect the client passes the last `seq` it saw; the handler constructs `captureSession.NewReader(lastSeq)` which resumes at the first envelope with `Seq > lastSeq` — gap-tolerant, does not require `lastSeq+1`. |
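The resume rule can be illustrated with a slice standing in for the ring buffer. `parseLastEventID` and `resumeFrom` are hypothetical helpers, not the real `NewReader` internals; treating a missing or malformed header as 0 (start from the oldest buffered event) is an assumption, and the skip of `Seq == 0` envelopes follows design decision 6 in §3.

```go
package main

import (
	"fmt"
	"strconv"
)

// Envelope carries only the field relevant to seeking.
type Envelope struct{ Seq uint64 }

// parseLastEventID converts the Last-Event-ID header to a cursor.
// Assumption: an absent or malformed header means "start from the beginning".
func parseLastEventID(header string) uint64 {
	n, err := strconv.ParseUint(header, 10, 64)
	if err != nil {
		return 0
	}
	return n
}

// resumeFrom returns the buffered envelopes starting at the first one with
// Seq > lastSeq. Synthetic envelopes with Seq == 0 never satisfy the seek.
func resumeFrom(buf []Envelope, lastSeq uint64) []Envelope {
	for i, env := range buf {
		if env.Seq == 0 {
			continue // synthetic events_dropped marker, not a real cursor position
		}
		if env.Seq > lastSeq {
			return buf[i:]
		}
	}
	return nil
}

func main() {
	// Seqs 5 and 6 were dropped from the ring; a Seq==0 marker sits in the gap.
	buf := []Envelope{{Seq: 3}, {Seq: 4}, {Seq: 0}, {Seq: 7}, {Seq: 8}}
	fmt.Println(resumeFrom(buf, parseLastEventID("5"))) // [{7} {8}]
}
```

A client that last saw seq 5 resumes at 7 even though 6 no longer exists, which is the gap-tolerant behaviour described above.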
| 124 | + |
| 125 | +**No-session behaviour.** Returns `404` when no capture session is active, matching the resource-style semantics of the base branch. |
| 126 | + |
| 127 | +**Flusher guard.** If the `ResponseWriter` does not implement `http.Flusher`, the handler returns `500` **before writing any headers** to avoid partial responses. |
| 128 | + |
| 129 | +**Lifecycle.** The handler loops on `reader.Read(ctx)` bound to the request context; when the client disconnects, the read returns and the goroutine exits cleanly. |
| 130 | + |
| 131 | +--- |
| 132 | + |
| 133 | +## 3. Key Design Decisions |
| 134 | + |
| 135 | +1. **Shared pipeline, not a new queue.** External events flow through the same `CaptureSession` as CDP events, so `seq` is globally monotonic across all sources. Consumers never have to merge streams. |
| 136 | +2. **`source.kind` is the fan-out key.** `kernel_api` for publish, `cdp` for the monitor, `extension` / `local_process` reserved for future producers. Category is a coarse bucket derived from `type`; source is the precise provenance. |
| 137 | +3. **Publish does not honour `CaptureConfig.Categories`.** The config is a filter on what the CDP monitor records; an explicit publish is a deliberate caller action and bypasses it. |
| 138 | +4. **SSE over WebSocket.** SSE is one-way, proxy-friendly, and has built-in reconnection semantics (`Last-Event-ID`) that map cleanly to our `seq` cursor. No extra framing library. |
| 139 | +5. **Direct writes, no goroutine.** `StreamEvents` writes straight to the `ResponseWriter` from the request goroutine. No `io.Pipe`, no background worker — correct for HTTP/1.1 SSE and simpler to reason about on disconnect. |
| 140 | +6. **Seq==0 skip on fast-forward.** Synthetic `events_dropped` envelopes carry `Seq==0`; the `Last-Event-ID` seek skips them so they never advance the cursor past a real event. |
| 141 | +7. **Envelope size cap enforced in the pipeline, not the handler.** `truncateIfNeeded` (1 MB limit) lives on the publish path; `PublishEvent` does not re-check size. |
| 142 | +8. **404 when no session.** Both endpoints 404 if no capture session is active, consistent with the resource model. Publishes are not buffered and streams do not wait. |
| 143 | + |
| 144 | +--- |
| 145 | + |
| 146 | +## 4. Files Changed |
| 147 | + |
| 148 | +### Added |
| 149 | + |
| 150 | +- `server/cmd/api/api/events.go` — `PublishEvent`, `StreamEvents`, `writeSSEEnvelope` helper. (Re-introduced; the base branch folded the prior `events.go` handlers into `capture_session.go`.) |
| 151 | +- `server/cmd/api/api/events_publish_test.go` — subtests: happy path + ring read, invalid JSON, missing type, unknown category, no-session 404, liveview routing, captcha routing, category derivation from type, default `source.kind` = `kernel_api`. |
| 152 | +- `server/cmd/api/api/events_stream_test.go` — subtests: `no_flusher_returns_500`, no-session 404, live streaming, `Last-Event-ID` resume across gaps, context cancellation on client disconnect. |
| 153 | + |
| 154 | +--- |
| 155 | + |
| 156 | +## 5. Testing |
| 157 | + |
| 158 | +- Unit tests in `events_publish_test.go` and `events_stream_test.go` run against a real `ApiService` + `CaptureSession` (no mocks). |
| 159 | +- Race: `go test ./... -race` passes for the whole server module. |
| 160 | +- SSE tests use an `httptest.Server` + a small SSE client that parses `id:` / `data:` frames and asserts ordering and content. |
| 161 | +- `Last-Event-ID` reconnection is exercised by: publish N events → stream receives them → disconnect → publish M more → reconnect with last `seq` → assert stream resumes at seq N+1 (or the first surviving seq if the ring dropped events). |
| 162 | +- No-session 404 is covered for both endpoints. |
| 163 | + |
| 164 | +--- |