Skip to content

Commit 38d5e16

Browse files
test: add SSE writer, WS handler message conversion, and route edge-case tests
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3304558 commit 38d5e16

3 files changed

Lines changed: 521 additions & 0 deletions

File tree

packages/remote-control-server/src/__tests__/routes.test.ts

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,26 @@ describe("V1 Session Routes", () => {
178178
const body = await sessRes.json();
179179
expect(body.environment_id).toBe(environment_id);
180180
});
181+
182+
test("POST /v1/sessions with invalid environment_id — session created, work item fails silently", async () => {
183+
const sessRes = await app.request("/v1/sessions", {
184+
method: "POST",
185+
headers: { ...AUTH_HEADERS, "Content-Type": "application/json" },
186+
body: JSON.stringify({ environment_id: "env_nonexistent" }),
187+
});
188+
expect(sessRes.status).toBe(200);
189+
const body = await sessRes.json();
190+
expect(body.id).toMatch(/^session_/);
191+
});
192+
193+
test("POST /v1/sessions with events — publishes initial events", async () => {
194+
const sessRes = await app.request("/v1/sessions", {
195+
method: "POST",
196+
headers: { ...AUTH_HEADERS, "Content-Type": "application/json" },
197+
body: JSON.stringify({ events: [{ type: "init", data: "starting" }] }),
198+
});
199+
expect(sessRes.status).toBe(200);
200+
});
181201
});
182202

183203
describe("V1 Environment Routes", () => {
@@ -542,6 +562,102 @@ describe("Web Session Routes", () => {
542562
const body = await histRes.json();
543563
expect(body.events).toEqual([]);
544564
});
565+
566+
test("GET /web/sessions/:id/history — 403 for non-owner", async () => {
567+
const createRes = await app.request("/web/sessions?uuid=user-1", {
568+
method: "POST",
569+
headers: { "Content-Type": "application/json" },
570+
body: JSON.stringify({}),
571+
});
572+
const { id } = await createRes.json();
573+
574+
const histRes = await app.request(`/web/sessions/${id}/history?uuid=user-2`);
575+
expect(histRes.status).toBe(403);
576+
});
577+
578+
test("GET /web/sessions/:id — 404 after session deleted", async () => {
579+
const createRes = await app.request("/web/sessions?uuid=user-1", {
580+
method: "POST",
581+
headers: { "Content-Type": "application/json" },
582+
body: JSON.stringify({}),
583+
});
584+
const { id } = await createRes.json();
585+
586+
// Archive/delete the session via v1
587+
await app.request(`/v1/sessions/${id}/archive`, {
588+
method: "POST",
589+
headers: AUTH_HEADERS,
590+
});
591+
592+
// Session still exists (archived), so we can still get it
593+
const getRes = await app.request(`/web/sessions/${id}?uuid=user-1`);
594+
// After archive, session status is "archived" but still exists
595+
expect(getRes.status).toBe(200);
596+
});
597+
598+
test("GET /web/sessions/:id/history — 404 for non-existent session", async () => {
599+
// Bind to a non-existent session won't work, but if ownership was set
600+
// and session deleted, we need to test the 404 path
601+
const createRes = await app.request("/web/sessions?uuid=user-1", {
602+
method: "POST",
603+
headers: { "Content-Type": "application/json" },
604+
body: JSON.stringify({}),
605+
});
606+
const { id } = await createRes.json();
607+
608+
// Delete the session from store directly
609+
const { storeDeleteSession } = await import("../store");
610+
storeDeleteSession(id);
611+
612+
const histRes = await app.request(`/web/sessions/${id}/history?uuid=user-1`);
613+
expect(histRes.status).toBe(404);
614+
});
615+
616+
test("POST /web/sessions with invalid environment_id — handles work item error", async () => {
617+
const res = await app.request("/web/sessions?uuid=user-1", {
618+
method: "POST",
619+
headers: { "Content-Type": "application/json" },
620+
body: JSON.stringify({ environment_id: "env_nonexistent" }),
621+
});
622+
// Session is still created even if work item fails
623+
expect(res.status).toBe(200);
624+
const body = await res.json();
625+
expect(body.id).toMatch(/^session_/);
626+
});
627+
628+
test("GET /web/sessions/:id/events — returns SSE stream", async () => {
629+
const createRes = await app.request("/web/sessions?uuid=user-1", {
630+
method: "POST",
631+
headers: { "Content-Type": "application/json" },
632+
body: JSON.stringify({}),
633+
});
634+
const { id } = await createRes.json();
635+
636+
const eventsRes = await app.request(`/web/sessions/${id}/events?uuid=user-1`);
637+
expect(eventsRes.status).toBe(200);
638+
expect(eventsRes.headers.get("Content-Type")).toBe("text/event-stream");
639+
640+
// Read initial keepalive and cancel
641+
const reader = eventsRes.body?.getReader();
642+
if (reader) {
643+
const { value } = await reader.read();
644+
const text = new TextDecoder().decode(value!);
645+
expect(text).toContain(": keepalive");
646+
reader.cancel();
647+
}
648+
});
649+
650+
test("GET /web/sessions/:id/events — 403 for non-owner", async () => {
651+
const createRes = await app.request("/web/sessions?uuid=user-1", {
652+
method: "POST",
653+
headers: { "Content-Type": "application/json" },
654+
body: JSON.stringify({}),
655+
});
656+
const { id } = await createRes.json();
657+
658+
const eventsRes = await app.request(`/web/sessions/${id}/events?uuid=user-2`);
659+
expect(eventsRes.status).toBe(403);
660+
});
545661
});
546662

547663
describe("Web Control Routes", () => {
@@ -601,6 +717,32 @@ describe("Web Control Routes", () => {
601717
});
602718
expect(res.status).toBe(200);
603719
});
720+
721+
test("POST /web/sessions/:id/interrupt — 403 for non-owner", async () => {
722+
const res = await app.request(`/web/sessions/${sessionId}/interrupt?uuid=user-2`, {
723+
method: "POST",
724+
headers: { "Content-Type": "application/json" },
725+
});
726+
expect(res.status).toBe(403);
727+
});
728+
729+
test("POST /web/sessions/:id/control — 403 for non-owner", async () => {
730+
const res = await app.request(`/web/sessions/${sessionId}/control?uuid=user-2`, {
731+
method: "POST",
732+
headers: { "Content-Type": "application/json" },
733+
body: JSON.stringify({ type: "permission_response", approved: true }),
734+
});
735+
expect(res.status).toBe(403);
736+
});
737+
738+
test("POST /web/sessions/:id/events — 403 for non-existent session with no ownership", async () => {
739+
const res = await app.request("/web/sessions/nonexistent/events?uuid=user-1", {
740+
method: "POST",
741+
headers: { "Content-Type": "application/json" },
742+
body: JSON.stringify({ type: "user", content: "hello" }),
743+
});
744+
expect(res.status).toBe(403);
745+
});
604746
});
605747

606748
describe("Web Environment Routes", () => {
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import { describe, test, expect, beforeEach, mock } from "bun:test";
2+
3+
// Mock config
4+
const mockConfig = {
5+
port: 3000,
6+
host: "0.0.0.0",
7+
apiKeys: ["test-api-key"],
8+
baseUrl: "http://localhost:3000",
9+
pollTimeout: 8,
10+
heartbeatInterval: 20,
11+
jwtExpiresIn: 3600,
12+
disconnectTimeout: 300,
13+
};
14+
15+
mock.module("../config", () => ({
16+
config: mockConfig,
17+
getBaseUrl: () => "http://localhost:3000",
18+
}));
19+
20+
import { Hono } from "hono";
21+
import { storeReset } from "../store";
22+
import { removeEventBus, getAllEventBuses, getEventBus } from "../transport/event-bus";
23+
import { createSSEWriter, createSSEStream } from "../transport/sse-writer";
24+
25+
/** Read up to N bytes from a Response stream, then cancel */
26+
async function readPartialStream(res: Response, maxBytes = 4096): Promise<string> {
27+
const reader = res.body?.getReader();
28+
if (!reader) return "";
29+
const chunks: Uint8Array[] = [];
30+
let totalBytes = 0;
31+
try {
32+
while (totalBytes < maxBytes) {
33+
const { done, value } = await reader.read();
34+
if (done) break;
35+
chunks.push(value);
36+
totalBytes += value.length;
37+
// Cancel after we have some data (first keepalive + any initial events)
38+
if (totalBytes > 0) break;
39+
}
40+
} finally {
41+
reader.cancel();
42+
}
43+
const combined = new Uint8Array(totalBytes);
44+
let offset = 0;
45+
for (const chunk of chunks) {
46+
combined.set(chunk, offset);
47+
offset += chunk.length;
48+
}
49+
return new TextDecoder().decode(combined);
50+
}
51+
52+
describe("SSE Writer", () => {
53+
describe("createSSEWriter", () => {
54+
test("creates SSEWriter with send and close methods", () => {
55+
const app = new Hono();
56+
let capturedWriter: ReturnType<typeof createSSEWriter> | null = null;
57+
58+
app.get("/test", (c) => {
59+
capturedWriter = createSSEWriter(c);
60+
return c.text("ok");
61+
});
62+
63+
app.request("/test");
64+
expect(capturedWriter).not.toBeNull();
65+
expect(typeof capturedWriter!.send).toBe("function");
66+
expect(typeof capturedWriter!.close).toBe("function");
67+
});
68+
});
69+
70+
describe("createSSEStream", () => {
71+
beforeEach(() => {
72+
storeReset();
73+
for (const [key] of getAllEventBuses()) {
74+
removeEventBus(key);
75+
}
76+
});
77+
78+
test("returns Response with correct SSE headers", async () => {
79+
const app = new Hono();
80+
81+
app.get("/stream/:sessionId", (c) => {
82+
const sessionId = c.req.param("sessionId");
83+
return createSSEStream(c, sessionId, 0);
84+
});
85+
86+
const res = await app.request("/stream/s1");
87+
expect(res.status).toBe(200);
88+
expect(res.headers.get("Content-Type")).toBe("text/event-stream");
89+
expect(res.headers.get("Cache-Control")).toBe("no-cache");
90+
expect(res.headers.get("Connection")).toBe("keep-alive");
91+
expect(res.headers.get("X-Accel-Buffering")).toBe("no");
92+
93+
// Cancel the stream
94+
res.body?.cancel();
95+
});
96+
97+
test("sends initial keepalive", async () => {
98+
const app = new Hono();
99+
100+
app.get("/stream/:sessionId", (c) => {
101+
const sessionId = c.req.param("sessionId");
102+
return createSSEStream(c, sessionId, 0);
103+
});
104+
105+
const res = await app.request("/stream/s2");
106+
const text = await readPartialStream(res);
107+
expect(text).toContain(": keepalive");
108+
});
109+
110+
test("sends historical events when fromSeqNum > 0", async () => {
111+
// Pre-populate event bus with events
112+
const bus = getEventBus("s3");
113+
bus.publish({ id: "e1", sessionId: "s3", type: "user", payload: { content: "hello" }, direction: "outbound" });
114+
bus.publish({ id: "e2", sessionId: "s3", type: "assistant", payload: { content: "hi" }, direction: "inbound" });
115+
116+
const app = new Hono();
117+
118+
app.get("/stream/:sessionId", (c) => {
119+
const sessionId = c.req.param("sessionId");
120+
const fromSeq = parseInt(c.req.query("fromSeq") || "0");
121+
return createSSEStream(c, sessionId, fromSeq);
122+
});
123+
124+
const res = await app.request("/stream/s3?fromSeq=1");
125+
const text = await readPartialStream(res);
126+
// Should replay events since seq 1 (i.e., event 2)
127+
expect(text).toContain('"seqNum":2');
128+
expect(text).toContain("assistant");
129+
});
130+
131+
test("no historical events when fromSeqNum is 0", async () => {
132+
const bus = getEventBus("s5");
133+
bus.publish({ id: "e1", sessionId: "s5", type: "user", payload: {}, direction: "outbound" });
134+
135+
const app = new Hono();
136+
137+
app.get("/stream/:sessionId", (c) => {
138+
const sessionId = c.req.param("sessionId");
139+
return createSSEStream(c, sessionId, 0);
140+
});
141+
142+
const res = await app.request("/stream/s5");
143+
const text = await readPartialStream(res);
144+
// With fromSeqNum=0, no historical replay, just keepalive
145+
expect(text).toContain(": keepalive");
146+
// Should NOT contain event data (only keepalive)
147+
expect(text).not.toContain("event: message");
148+
});
149+
150+
test("subscribes to new events and delivers them", async () => {
151+
const app = new Hono();
152+
153+
app.get("/stream/:sessionId", (c) => {
154+
const sessionId = c.req.param("sessionId");
155+
return createSSEStream(c, sessionId, 0);
156+
});
157+
158+
const res = await app.request("/stream/s6");
159+
160+
// Read initial keepalive first
161+
const reader = res.body!.getReader();
162+
const { value: firstChunk } = await reader.read();
163+
const initialText = new TextDecoder().decode(firstChunk!);
164+
expect(initialText).toContain(": keepalive");
165+
166+
// Now publish an event
167+
const bus = getEventBus("s6");
168+
bus.publish({ id: "e1", sessionId: "s6", type: "user", payload: { content: "real-time" }, direction: "outbound" });
169+
170+
// Read the event
171+
const { value: secondChunk } = await reader.read();
172+
const eventText = new TextDecoder().decode(secondChunk!);
173+
expect(eventText).toContain("event: message");
174+
expect(eventText).toContain("real-time");
175+
176+
reader.cancel();
177+
});
178+
});
179+
});

0 commit comments

Comments
 (0)