Skip to content

Commit 82b1c0e

Browse files
committed
fix: route session/load resume streams correctly
1 parent af3f7cc commit 82b1c0e

3 files changed

Lines changed: 118 additions & 10 deletions

File tree

src/server-session-sse.test.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@ import {
55
HEADER_SESSION_ID,
66
JSON_MIME_TYPE,
77
} from "./protocol.js";
8+
import { PROTOCOL_VERSION } from "./schema/index.js";
89
import { parseSseStream } from "./sse.js";
910
import { TestAgent } from "./test-support/test-agent.js";
1011
import { startTestServer } from "./test-support/test-http-server.js";
1112

12-
import type { AgentSideConnection } from "./acp.js";
13+
import type {
14+
AgentSideConnection,
15+
InitializeRequest,
16+
InitializeResponse,
17+
LoadSessionRequest,
18+
LoadSessionResponse,
19+
} from "./acp.js";
1320
import type { AnyMessage } from "./jsonrpc.js";
1421

1522
const initializeRequest = {
@@ -57,6 +64,49 @@ function createForkRequest(id: number, sessionId: string) {
5764
};
5865
}
5966

67+
function createLoadSessionRequest(id: number, sessionId: string) {
68+
return {
69+
jsonrpc: "2.0",
70+
id,
71+
method: "session/load",
72+
params: {
73+
cwd: "/tmp",
74+
mcpServers: [],
75+
sessionId,
76+
},
77+
};
78+
}
79+
80+
class LoadSessionAgent extends TestAgent {
81+
constructor(private readonly agentConnection: AgentSideConnection) {
82+
super(agentConnection);
83+
}
84+
85+
initialize(_params: InitializeRequest): Promise<InitializeResponse> {
86+
return Promise.resolve({
87+
protocolVersion: PROTOCOL_VERSION,
88+
agentCapabilities: {
89+
loadSession: true,
90+
},
91+
});
92+
}
93+
94+
async loadSession(params: LoadSessionRequest): Promise<LoadSessionResponse> {
95+
await this.agentConnection.sessionUpdate({
96+
sessionId: params.sessionId,
97+
update: {
98+
sessionUpdate: "agent_message_chunk",
99+
content: {
100+
type: "text",
101+
text: "replayed-session-history",
102+
},
103+
},
104+
});
105+
106+
return {};
107+
}
108+
}
109+
60110
describe("AcpServer session SSE", () => {
61111
it("streams prompt updates and responses on the session SSE stream", async () => {
62112
const server = await startTestServer(
@@ -203,6 +253,58 @@ describe("AcpServer session SSE", () => {
203253
}
204254
});
205255

256+
it("routes session/load replay updates to session SSE and final response to connection SSE", async () => {
257+
const server = await startTestServer(
258+
(conn: AgentSideConnection) => new LoadSessionAgent(conn),
259+
);
260+
261+
try {
262+
const connectionId = await initialize(server.url);
263+
const sessionId = "existing-session";
264+
const connectionSse = await openConnectionSse(server.url, connectionId);
265+
const sessionSse = await openSessionSse(
266+
server.url,
267+
connectionId,
268+
sessionId,
269+
);
270+
const accepted = await postJson(
271+
server.url,
272+
createLoadSessionRequest(3, sessionId),
273+
{
274+
[HEADER_CONNECTION_ID]: connectionId,
275+
[HEADER_SESSION_ID]: sessionId,
276+
},
277+
);
278+
279+
expect(sessionSse.status).toBe(200);
280+
expect(accepted.status).toBe(202);
281+
expect(await readSseMessages(sessionSse, 1)).toMatchObject([
282+
{
283+
jsonrpc: "2.0",
284+
method: "session/update",
285+
params: {
286+
sessionId,
287+
update: {
288+
sessionUpdate: "agent_message_chunk",
289+
content: {
290+
text: "replayed-session-history",
291+
},
292+
},
293+
},
294+
},
295+
]);
296+
expect(await readSseMessages(connectionSse, 1)).toMatchObject([
297+
{
298+
jsonrpc: "2.0",
299+
id: 3,
300+
result: {},
301+
},
302+
]);
303+
} finally {
304+
await server.close();
305+
}
306+
});
307+
206308
it("replays buffered session messages when session SSE attaches after prompt", async () => {
207309
const server = await startTestServer();
208310

src/server.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ describe("AcpServer", () => {
176176
}
177177
});
178178

179-
it("rejects session-scoped GETs for unknown sessions", async () => {
179+
it("opens session-scoped GETs for sessions without local streams", async () => {
180180
const server = await startTestServer();
181181

182182
try {
@@ -187,7 +187,7 @@ describe("AcpServer", () => {
187187
globalThis.crypto.randomUUID(),
188188
);
189189

190-
expect(response.status).toBe(404);
190+
expect(response.status).toBe(200);
191191
} finally {
192192
await server.close();
193193
}

src/server.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
isRequestMessage,
1515
isResponseMessage,
1616
} from "./jsonrpc.js";
17+
import { AGENT_METHODS } from "./schema/index.js";
1718
import { serializeSseEvent, serializeSseKeepAlive } from "./sse.js";
1819
import { handleWebSocketConnection } from "./ws-server.js";
1920
import type { WebSocketServerSocket } from "./ws-server.js";
@@ -179,12 +180,7 @@ export class AcpServer {
179180

180181
const sessionId = req.headers.get(HEADER_SESSION_ID);
181182
if (sessionId) {
182-
const sessionStream = connection.sessionStreams.get(sessionId);
183-
if (!sessionStream) {
184-
return textResponse("Unknown Acp-Session-Id", 404);
185-
}
186-
187-
return sseResponse(sessionStream.subscribe());
183+
return sseResponse(connection.ensureSession(sessionId).subscribe());
188184
}
189185

190186
return sseResponse(connection.connectionStream.subscribe());
@@ -336,7 +332,10 @@ async function forwardClientRequest(
336332
const key = messageIdKey(message.id);
337333

338334
if (key) {
339-
connection.pendingRoutes.set(key, route.value);
335+
connection.pendingRoutes.set(
336+
key,
337+
pendingResponseRoute(message, route.value),
338+
);
340339
}
341340

342341
await writeInbound(connection, message);
@@ -359,6 +358,13 @@ async function forwardClientNotification(
359358
return { ok: true };
360359
}
361360

361+
function pendingResponseRoute(
362+
message: ClientRequestMessage,
363+
route: ResponseRoute,
364+
): ResponseRoute {
365+
return message.method === AGENT_METHODS.session_load ? "connection" : route;
366+
}
367+
362368
function determineRoute(
363369
message: ClientRequestMessage,
364370
headers: Headers,

0 commit comments

Comments
 (0)