Skip to content

Commit f57a8d1

Browse files
michelThomistral-vibebenbrandt
authored
fix: propagate input stream errors through ndJsonStream (#111)
* fix: propagate input stream errors through ndJsonStream The readable stream returned by ndJsonStream used try/finally, which swallowed errors from the underlying input stream. When the input errored (e.g. a child process crash), the readable stream would close cleanly instead of erroring, causing the Connection to treat it as a normal shutdown rather than propagating the error to pending requests. Changed to try/catch/finally so that input stream errors are forwarded via controller.error(), allowing Connection.#receive() to capture the error and reject all pending requests with the original error message. Added two tests: - Verifies ndJsonStream propagates immediate input stream errors - Verifies pending requests are rejected when input stream errors Generated by Mistral Vibe. Co-Authored-By: Mistral Vibe <vibe@mistral.ai> * Fix lockfile --------- Co-authored-by: Mistral Vibe <vibe@mistral.ai> Co-authored-by: Ben Brandt <benjamin.j.brandt@gmail.com>
1 parent e620a8d commit f57a8d1

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

src/acp.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,49 @@ describe("Connection", () => {
10131013
}
10141014
}
10151015

1016+
it("propagates input stream errors through ndJsonStream", async () => {
1017+
const inputStream = new ReadableStream<Uint8Array>({
1018+
start(controller) {
1019+
// Simulate a process crash after partial data
1020+
controller.error(new Error("process exited with code 1"));
1021+
},
1022+
});
1023+
const outputStream = new WritableStream<Uint8Array>();
1024+
1025+
const connection = new ClientSideConnection(
1026+
() => new MinimalTestClient(),
1027+
ndJsonStream(outputStream, inputStream),
1028+
);
1029+
1030+
await expect(connection.closed).resolves.toBeUndefined();
1031+
expect(connection.signal.aborted).toBe(true);
1032+
});
1033+
1034+
it("rejects pending requests when input stream errors via ndJsonStream", async () => {
1035+
let errorController!: ReadableStreamDefaultController<Uint8Array>;
1036+
1037+
const inputStream = new ReadableStream<Uint8Array>({
1038+
start(controller) {
1039+
errorController = controller;
1040+
},
1041+
});
1042+
const outputStream = new WritableStream<Uint8Array>();
1043+
1044+
const connection = new ClientSideConnection(
1045+
() => new MinimalTestClient(),
1046+
ndJsonStream(outputStream, inputStream),
1047+
);
1048+
1049+
const requestPromise = connection.newSession({
1050+
cwd: "/test",
1051+
mcpServers: [],
1052+
});
1053+
1054+
errorController.error(new Error("process exited with code 1"));
1055+
1056+
await expect(requestPromise).rejects.toThrow("process exited with code 1");
1057+
});
1058+
10161059
it("rejects pending requests when the stream errors", async () => {
10171060
let readableController!: ReadableStreamDefaultController<AnyMessage>;
10181061

src/stream.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,13 @@ export function ndJsonStream(
6363
}
6464
}
6565
}
66+
} catch (err) {
67+
controller.error(err);
68+
return;
6669
} finally {
6770
reader.releaseLock();
68-
controller.close();
6971
}
72+
controller.close();
7073
},
7174
});
7275

0 commit comments

Comments
 (0)