Skip to content

Commit f0b2e16

Browse files
authored
ts: Use Stream abstraction instead of raw byte streams (#93)
* ts: Use Stream abstraction instead of raw byte streams * Fix format * Fix typos * Remove unused imports
1 parent 4ac908a commit f0b2e16

7 files changed

Lines changed: 235 additions & 162 deletions

File tree

typescript/acp.test.ts

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
CancelNotification,
2424
SessionNotification,
2525
PROTOCOL_VERSION,
26+
ndJsonStream,
2627
} from "./acp.js";
2728

2829
describe("Connection", () => {
@@ -82,14 +83,12 @@ describe("Connection", () => {
8283
// Set up connections
8384
const agentConnection = new ClientSideConnection(
8485
() => new TestClient(),
85-
clientToAgent.writable,
86-
agentToClient.readable,
86+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
8787
);
8888

8989
const clientConnection = new AgentSideConnection(
9090
() => new TestAgent(),
91-
agentToClient.writable,
92-
clientToAgent.readable,
91+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
9392
);
9493

9594
// Test error handling in client->agent direction
@@ -173,16 +172,15 @@ describe("Connection", () => {
173172
}
174173
}
175174

176-
new ClientSideConnection(
175+
// Set up connections
176+
const agentConnection = new ClientSideConnection(
177177
() => new TestClient(),
178-
clientToAgent.writable,
179-
agentToClient.readable,
178+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
180179
);
181180

182181
const clientConnection = new AgentSideConnection(
183182
() => new TestAgent(),
184-
agentToClient.writable,
185-
clientToAgent.readable,
183+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
186184
);
187185

188186
// Send multiple concurrent requests
@@ -285,14 +283,12 @@ describe("Connection", () => {
285283
// Set up connections
286284
const agentConnection = new ClientSideConnection(
287285
() => new TestClient(),
288-
clientToAgent.writable,
289-
agentToClient.readable,
286+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
290287
);
291288

292289
const clientConnection = new AgentSideConnection(
293290
() => new TestAgent(),
294-
agentToClient.writable,
295-
clientToAgent.readable,
291+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
296292
);
297293

298294
// Send requests in specific order
@@ -422,14 +418,12 @@ describe("Connection", () => {
422418
// Set up connections
423419
const agentConnection = new ClientSideConnection(
424420
testClient,
425-
clientToAgent.writable,
426-
agentToClient.readable,
421+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
427422
);
428423

429424
const clientConnection = new AgentSideConnection(
430425
testAgent,
431-
agentToClient.writable,
432-
clientToAgent.readable,
426+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
433427
);
434428

435429
// Send notifications
@@ -516,16 +510,15 @@ describe("Connection", () => {
516510
}
517511
}
518512

513+
// Set up connections
519514
const agentConnection = new ClientSideConnection(
520515
() => new TestClient(),
521-
clientToAgent.writable,
522-
agentToClient.readable,
516+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
523517
);
524518

525-
new AgentSideConnection(
519+
const clientConnection = new AgentSideConnection(
526520
() => new TestAgent(),
527-
agentToClient.writable,
528-
clientToAgent.readable,
521+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
529522
);
530523

531524
// Test initialize request
@@ -627,16 +620,15 @@ describe("Connection", () => {
627620
}
628621
}
629622

623+
// Set up connections
630624
const agentConnection = new ClientSideConnection(
631625
() => new TestClient(),
632-
clientToAgent.writable,
633-
agentToClient.readable,
626+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
634627
);
635628

636629
const clientConnection = new AgentSideConnection(
637630
() => new TestAgent(),
638-
agentToClient.writable,
639-
clientToAgent.readable,
631+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
640632
);
641633

642634
// Test agent calling client extension method
@@ -729,16 +721,15 @@ describe("Connection", () => {
729721
// Note: No extMethod or extNotification implemented
730722
}
731723

724+
// Set up connections
732725
const agentConnection = new ClientSideConnection(
733726
() => new TestClientWithoutExtensions(),
734-
clientToAgent.writable,
735-
agentToClient.readable,
727+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
736728
);
737729

738730
const clientConnection = new AgentSideConnection(
739731
() => new TestAgentWithoutExtensions(),
740-
agentToClient.writable,
741-
clientToAgent.readable,
732+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
742733
);
743734

744735
// Test that calling extension methods on connections without them throws method not found
@@ -857,16 +848,15 @@ describe("Connection", () => {
857848
}
858849
}
859850

851+
// Set up connections
860852
const agentConnection = new ClientSideConnection(
861853
() => new TestClient(),
862-
clientToAgent.writable,
863-
agentToClient.readable,
854+
ndJsonStream(clientToAgent.writable, agentToClient.readable),
864855
);
865856

866857
const clientConnection = new AgentSideConnection(
867858
() => new TestAgent(),
868-
agentToClient.writable,
869-
clientToAgent.readable,
859+
ndJsonStream(agentToClient.writable, clientToAgent.readable),
870860
);
871861

872862
// Test writeTextFile returns response with _meta

typescript/acp.ts

Lines changed: 52 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
import { z } from "zod";
22
import * as schema from "./schema.js";
33
export * from "./schema.js";
4+
export * from "./stream.js";
5+
6+
import type { Stream } from "./stream.js";
7+
import type {
8+
AnyMessage,
9+
AnyResponse,
10+
Result,
11+
ErrorResponse,
12+
PendingResponse,
13+
RequestHandler,
14+
NotificationHandler,
15+
} from "./jsonrpc.js";
416

517
/**
618
* An agent-side connection to a client.
@@ -22,16 +34,12 @@ export class AgentSideConnection {
2234
* following the ACP specification.
2335
*
2436
* @param toAgent - A function that creates an Agent handler to process incoming client requests
25-
* @param input - The stream for sending data to the client (typically stdout)
26-
* @param output - The stream for receiving data from the client (typically stdin)
37+
* @param stream - The bidirectional message stream for communication. Typically created using
38+
* {@link ndJsonStream} for stdio-based connections.
2739
*
2840
* See protocol docs: [Communication Model](https://agentclientprotocol.com/protocol/overview#communication-model)
2941
*/
30-
constructor(
31-
toAgent: (conn: AgentSideConnection) => Agent,
32-
input: WritableStream<Uint8Array>,
33-
output: ReadableStream<Uint8Array>,
34-
) {
42+
constructor(toAgent: (conn: AgentSideConnection) => Agent, stream: Stream) {
3543
const agent = toAgent(this);
3644

3745
const requestHandler = async (
@@ -119,8 +127,7 @@ export class AgentSideConnection {
119127
this.#connection = new Connection(
120128
requestHandler,
121129
notificationHandler,
122-
input,
123-
output,
130+
stream,
124131
);
125132
}
126133

@@ -376,16 +383,12 @@ export class ClientSideConnection implements Agent {
376383
* following the ACP specification.
377384
*
378385
* @param toClient - A function that creates a Client handler to process incoming agent requests
379-
* @param input - The stream for sending data to the agent (typically stdout)
380-
* @param output - The stream for receiving data from the agent (typically stdin)
386+
* @param stream - The bidirectional message stream for communication. Typically created using
387+
* {@link ndJsonStream} for stdio-based connections.
381388
*
382389
* See protocol docs: [Communication Model](https://agentclientprotocol.com/protocol/overview#communication-model)
383390
*/
384-
constructor(
385-
toClient: (agent: Agent) => Client,
386-
input: WritableStream<Uint8Array>,
387-
output: ReadableStream<Uint8Array>,
388-
) {
391+
constructor(toClient: (agent: Agent) => Client, stream: Stream) {
389392
const client = toClient(this);
390393

391394
const requestHandler = async (
@@ -498,8 +501,7 @@ export class ClientSideConnection implements Agent {
498501
this.#connection = new Connection(
499502
requestHandler,
500503
notificationHandler,
501-
input,
502-
output,
504+
stream,
503505
);
504506
}
505507

@@ -683,113 +685,58 @@ export class ClientSideConnection implements Agent {
683685
}
684686
}
685687

686-
type AnyMessage = AnyRequest | AnyResponse | AnyNotification;
687-
688-
type AnyRequest = {
689-
jsonrpc: "2.0";
690-
id: string | number;
691-
method: string;
692-
params?: unknown;
693-
};
694-
695-
type AnyResponse = {
696-
jsonrpc: "2.0";
697-
id: string | number;
698-
} & Result<unknown>;
699-
700-
type AnyNotification = {
701-
jsonrpc: "2.0";
702-
method: string;
703-
params?: unknown;
704-
};
705-
706-
type Result<T> =
707-
| {
708-
result: T;
709-
}
710-
| {
711-
error: ErrorResponse;
712-
};
713-
714-
type ErrorResponse = {
715-
code: number;
716-
message: string;
717-
data?: unknown;
718-
};
719-
720-
type PendingResponse = {
721-
resolve: (response: unknown) => void;
722-
reject: (error: ErrorResponse) => void;
723-
};
724-
725-
type RequestHandler = (method: string, params: unknown) => Promise<unknown>;
726-
type NotificationHandler = (method: string, params: unknown) => Promise<void>;
688+
// Re-export AnyMessage for backwards compatibility
689+
export type { AnyMessage } from "./jsonrpc.js";
727690

728691
class Connection {
729692
#pendingResponses: Map<string | number, PendingResponse> = new Map();
730693
#nextRequestId: number = 0;
731694
#requestHandler: RequestHandler;
732695
#notificationHandler: NotificationHandler;
733-
#peerInput: WritableStream<Uint8Array>;
696+
#stream: Stream;
734697
#writeQueue: Promise<void> = Promise.resolve();
735-
#textEncoder: TextEncoder;
736698

737699
constructor(
738700
requestHandler: RequestHandler,
739701
notificationHandler: NotificationHandler,
740-
peerInput: WritableStream<Uint8Array>,
741-
peerOutput: ReadableStream<Uint8Array>,
702+
stream: Stream,
742703
) {
743704
this.#requestHandler = requestHandler;
744705
this.#notificationHandler = notificationHandler;
745-
this.#peerInput = peerInput;
746-
this.#textEncoder = new TextEncoder();
747-
this.#receive(peerOutput);
706+
this.#stream = stream;
707+
this.#receive();
748708
}
749709

750-
async #receive(output: ReadableStream<Uint8Array>) {
751-
let content = "";
752-
const decoder = new TextDecoder();
753-
const reader = output.getReader();
710+
async #receive() {
711+
const reader = this.#stream.readable.getReader();
754712
try {
755713
while (true) {
756-
const { value, done } = await reader.read();
714+
const { value: message, done } = await reader.read();
757715
if (done) {
758716
break;
759717
}
760-
if (!value) {
718+
if (!message) {
761719
continue;
762720
}
763-
content += decoder.decode(value, { stream: true });
764-
const lines = content.split("\n");
765-
content = lines.pop() || "";
766-
767-
for (const line of lines) {
768-
const trimmedLine = line.trim();
769-
770-
if (trimmedLine) {
771-
let id;
772-
try {
773-
const message = JSON.parse(trimmedLine);
774-
id = message.id;
775-
this.#processMessage(message);
776-
} catch (err) {
777-
console.error(
778-
"Unexpected error during message processing:",
779-
trimmedLine,
780-
err,
781-
);
782-
if (id) {
783-
this.#sendMessage({
784-
jsonrpc: "2.0",
785-
id,
786-
error: {
787-
code: -32700,
788-
message: "Parse error",
789-
},
790-
});
791-
}
792-
}
721+
722+
try {
723+
this.#processMessage(message);
724+
} catch (err) {
725+
console.error(
726+
"Unexpected error during message processing:",
727+
message,
728+
err,
729+
);
730+
// Only send error response if the message had an id (was a request)
731+
if ("id" in message && message.id !== undefined) {
732+
this.#sendMessage({
733+
jsonrpc: "2.0",
734+
id: message.id,
735+
error: {
736+
code: -32700,
737+
message: "Parse error",
738+
},
739+
});
793740
}
794741
}
795742
}
@@ -936,13 +883,12 @@ class Connection {
936883
await this.#sendMessage({ jsonrpc: "2.0", method, params });
937884
}
938885

939-
async #sendMessage(json: AnyMessage) {
940-
const content = JSON.stringify(json) + "\n";
886+
async #sendMessage(message: AnyMessage) {
941887
this.#writeQueue = this.#writeQueue
942888
.then(async () => {
943-
const writer = this.#peerInput.getWriter();
889+
const writer = this.#stream.writable.getWriter();
944890
try {
945-
await writer.write(this.#textEncoder.encode(content));
891+
await writer.write(message);
946892
} finally {
947893
writer.releaseLock();
948894
}

0 commit comments

Comments
 (0)