Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 24 additions & 34 deletions typescript/acp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
CancelNotification,
SessionNotification,
PROTOCOL_VERSION,
ndJsonStream,
} from "./acp.js";

describe("Connection", () => {
Expand Down Expand Up @@ -82,14 +83,12 @@ describe("Connection", () => {
// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Test error handling in client->agent direction
Expand Down Expand Up @@ -173,16 +172,15 @@ describe("Connection", () => {
}
}

new ClientSideConnection(
// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Send multiple concurrent requests
Expand Down Expand Up @@ -285,14 +283,12 @@ describe("Connection", () => {
// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Send requests in specific order
Expand Down Expand Up @@ -422,14 +418,12 @@ describe("Connection", () => {
// Set up connections
const agentConnection = new ClientSideConnection(
testClient,
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
testAgent,
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Send notifications
Expand Down Expand Up @@ -516,16 +510,15 @@ describe("Connection", () => {
}
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

new AgentSideConnection(
const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Test initialize request
Expand Down Expand Up @@ -627,16 +620,15 @@ describe("Connection", () => {
}
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Test agent calling client extension method
Expand Down Expand Up @@ -729,16 +721,15 @@ describe("Connection", () => {
// Note: No extMethod or extNotification implemented
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClientWithoutExtensions(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgentWithoutExtensions(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Test that calling extension methods on connections without them throws method not found
Expand Down Expand Up @@ -857,16 +848,15 @@ describe("Connection", () => {
}
}

// Set up connections
const agentConnection = new ClientSideConnection(
() => new TestClient(),
clientToAgent.writable,
agentToClient.readable,
ndJsonStream(clientToAgent.writable, agentToClient.readable),
);

const clientConnection = new AgentSideConnection(
() => new TestAgent(),
agentToClient.writable,
clientToAgent.readable,
ndJsonStream(agentToClient.writable, clientToAgent.readable),
);

// Test writeTextFile returns response with _meta
Expand Down
158 changes: 52 additions & 106 deletions typescript/acp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
import { z } from "zod";
import * as schema from "./schema.js";
export * from "./schema.js";
export * from "./stream.js";

import type { Stream } from "./stream.js";
import type {
AnyMessage,
AnyResponse,
Result,
ErrorResponse,
PendingResponse,
RequestHandler,
NotificationHandler,
} from "./jsonrpc.js";

/**
* An agent-side connection to a client.
Expand All @@ -22,16 +34,12 @@ export class AgentSideConnection {
* following the ACP specification.
*
* @param toAgent - A function that creates an Agent handler to process incoming client requests
* @param input - The stream for sending data to the client (typically stdout)
* @param output - The stream for receiving data from the client (typically stdin)
* @param stream - The bidirectional message stream for communication. Typically created using
* {@link ndJsonStream} for stdio-based connections.
*
* See protocol docs: [Communication Model](https://agentclientprotocol.com/protocol/overview#communication-model)
*/
constructor(
toAgent: (conn: AgentSideConnection) => Agent,
input: WritableStream<Uint8Array>,
output: ReadableStream<Uint8Array>,
) {
constructor(toAgent: (conn: AgentSideConnection) => Agent, stream: Stream) {
const agent = toAgent(this);

const requestHandler = async (
Expand Down Expand Up @@ -119,8 +127,7 @@ export class AgentSideConnection {
this.#connection = new Connection(
requestHandler,
notificationHandler,
input,
output,
stream,
);
}

Expand Down Expand Up @@ -376,16 +383,12 @@ export class ClientSideConnection implements Agent {
* following the ACP specification.
*
* @param toClient - A function that creates a Client handler to process incoming agent requests
* @param input - The stream for sending data to the agent (typically stdout)
* @param output - The stream for receiving data from the agent (typically stdin)
* @param stream - The bidirectional message stream for communication. Typically created using
* {@link ndJsonStream} for stdio-based connections.
*
* See protocol docs: [Communication Model](https://agentclientprotocol.com/protocol/overview#communication-model)
*/
constructor(
toClient: (agent: Agent) => Client,
input: WritableStream<Uint8Array>,
output: ReadableStream<Uint8Array>,
) {
constructor(toClient: (agent: Agent) => Client, stream: Stream) {
const client = toClient(this);

const requestHandler = async (
Expand Down Expand Up @@ -498,8 +501,7 @@ export class ClientSideConnection implements Agent {
this.#connection = new Connection(
requestHandler,
notificationHandler,
input,
output,
stream,
);
}

Expand Down Expand Up @@ -683,113 +685,58 @@ export class ClientSideConnection implements Agent {
}
}

type AnyMessage = AnyRequest | AnyResponse | AnyNotification;

type AnyRequest = {
jsonrpc: "2.0";
id: string | number;
method: string;
params?: unknown;
};

type AnyResponse = {
jsonrpc: "2.0";
id: string | number;
} & Result<unknown>;

type AnyNotification = {
jsonrpc: "2.0";
method: string;
params?: unknown;
};

type Result<T> =
| {
result: T;
}
| {
error: ErrorResponse;
};

type ErrorResponse = {
code: number;
message: string;
data?: unknown;
};

type PendingResponse = {
resolve: (response: unknown) => void;
reject: (error: ErrorResponse) => void;
};

type RequestHandler = (method: string, params: unknown) => Promise<unknown>;
type NotificationHandler = (method: string, params: unknown) => Promise<void>;
// Re-export AnyMessage for backwards compatibility
export type { AnyMessage } from "./jsonrpc.js";

class Connection {
#pendingResponses: Map<string | number, PendingResponse> = new Map();
#nextRequestId: number = 0;
#requestHandler: RequestHandler;
#notificationHandler: NotificationHandler;
#peerInput: WritableStream<Uint8Array>;
#stream: Stream;
#writeQueue: Promise<void> = Promise.resolve();
#textEncoder: TextEncoder;

constructor(
requestHandler: RequestHandler,
notificationHandler: NotificationHandler,
peerInput: WritableStream<Uint8Array>,
peerOutput: ReadableStream<Uint8Array>,
stream: Stream,
) {
this.#requestHandler = requestHandler;
this.#notificationHandler = notificationHandler;
this.#peerInput = peerInput;
this.#textEncoder = new TextEncoder();
this.#receive(peerOutput);
this.#stream = stream;
this.#receive();
}

async #receive(output: ReadableStream<Uint8Array>) {
let content = "";
const decoder = new TextDecoder();
const reader = output.getReader();
async #receive() {
const reader = this.#stream.readable.getReader();
try {
while (true) {
const { value, done } = await reader.read();
const { value: message, done } = await reader.read();
if (done) {
break;
}
if (!value) {
if (!message) {
continue;
}
content += decoder.decode(value, { stream: true });
const lines = content.split("\n");
content = lines.pop() || "";

for (const line of lines) {
const trimmedLine = line.trim();

if (trimmedLine) {
let id;
try {
const message = JSON.parse(trimmedLine);
id = message.id;
this.#processMessage(message);
} catch (err) {
console.error(
"Unexpected error during message processing:",
trimmedLine,
err,
);
if (id) {
this.#sendMessage({
jsonrpc: "2.0",
id,
error: {
code: -32700,
message: "Parse error",
},
});
}
}

try {
this.#processMessage(message);
} catch (err) {
console.error(
"Unexpected error during message processing:",
message,
err,
);
// Only send error response if the message had an id (was a request)
if ("id" in message && message.id !== undefined) {
this.#sendMessage({
jsonrpc: "2.0",
id: message.id,
error: {
code: -32700,
message: "Parse error",
},
});
}
}
}
Expand Down Expand Up @@ -936,13 +883,12 @@ class Connection {
await this.#sendMessage({ jsonrpc: "2.0", method, params });
}

async #sendMessage(json: AnyMessage) {
const content = JSON.stringify(json) + "\n";
async #sendMessage(message: AnyMessage) {
this.#writeQueue = this.#writeQueue
.then(async () => {
const writer = this.#peerInput.getWriter();
const writer = this.#stream.writable.getWriter();
try {
await writer.write(this.#textEncoder.encode(content));
await writer.write(message);
} finally {
writer.releaseLock();
}
Expand Down
Loading