Skip to content

Commit 9ece667

Browse files
author
Mark Pollack
committed
Add WebSocket edge case tests, increase max message size to 4MB
6 new integration tests covering WebSocket transport edge cases: - Concurrent prompt rejection (single-turn enforcement) - Cancel mid-prompt (notification delivery during processing) - Permission request over WebSocket (agent-to-client round-trip) - Write file request over WebSocket (agent-to-client with capabilities) - Large messages (500KB prompt, verifies delivery intact) - Multiple sequential prompts (5 prompts over same connection) Increases Jetty WebSocket maxTextMessageSize from 64KB default to 4MB. Agent messages frequently contain file contents, code blocks, and tool outputs that exceed the default.
1 parent 9a1ac23 commit 9ece667

2 files changed

Lines changed: 380 additions & 0 deletions

File tree

acp-websocket-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/WebSocketAcpAgentTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public Mono<Void> start(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> han
174174
// Set up WebSocket handler
175175
WebSocketUpgradeHandler wsHandler = WebSocketUpgradeHandler.from(server, container -> {
176176
container.setIdleTimeout(idleTimeout);
177+
container.setMaxTextMessageSize(4 * 1024 * 1024); // 4MB — agent messages can include file contents
177178
container.addMapping(path, (request, response, callback) -> new AcpWebSocketEndpoint());
178179
});
179180
server.setHandler(wsHandler);
Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
/*
2+
* Copyright 2025-2026 the original author or authors.
3+
*/
4+
5+
package com.agentclientprotocol.sdk.integration;
6+
7+
import java.io.IOException;
8+
import java.net.ServerSocket;
9+
import java.net.URI;
10+
import java.time.Duration;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import com.agentclientprotocol.sdk.agent.AcpAgent;
19+
import com.agentclientprotocol.sdk.agent.AcpAsyncAgent;
20+
import com.agentclientprotocol.sdk.agent.transport.WebSocketAcpAgentTransport;
21+
import com.agentclientprotocol.sdk.client.AcpAsyncClient;
22+
import com.agentclientprotocol.sdk.client.AcpClient;
23+
import com.agentclientprotocol.sdk.client.transport.WebSocketAcpClientTransport;
24+
import com.agentclientprotocol.sdk.json.AcpJsonMapper;
25+
import com.agentclientprotocol.sdk.spec.AcpSchema;
26+
import org.junit.jupiter.api.Test;
27+
import reactor.core.publisher.Mono;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
31+
32+
/**
33+
* WebSocket transport edge case tests.
34+
*
35+
* Covers: concurrent prompts, cancel mid-prompt, permission requests,
36+
* write file requests, large messages, and multiple sequential prompts.
37+
*/
38+
class WebSocketEdgeCaseTest {
39+
40+
private static final Duration TIMEOUT = Duration.ofSeconds(10);
41+
42+
@Test
43+
void concurrentPromptsRejected() throws Exception {
44+
int port = findFreePort();
45+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
46+
CountDownLatch promptStarted = new CountDownLatch(1);
47+
CountDownLatch promptRelease = new CountDownLatch(1);
48+
49+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
50+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
51+
.requestTimeout(TIMEOUT)
52+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
53+
1, new AcpSchema.AgentCapabilities(), List.of())))
54+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
55+
.promptHandler((req, ctx) -> {
56+
promptStarted.countDown();
57+
// Block until released
58+
try { promptRelease.await(10, TimeUnit.SECONDS); } catch (InterruptedException ignored) {}
59+
return Mono.just(AcpSchema.PromptResponse.endTurn());
60+
})
61+
.build();
62+
63+
agent.start().block(TIMEOUT);
64+
Thread.sleep(300);
65+
66+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
67+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
68+
AcpAsyncClient client = AcpClient.async(clientTransport).requestTimeout(TIMEOUT).build();
69+
70+
try {
71+
client.initialize(new AcpSchema.InitializeRequest(1, new AcpSchema.ClientCapabilities()))
72+
.block(TIMEOUT);
73+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
74+
75+
// Fire first prompt (will block in handler)
76+
var firstPrompt = client.prompt(new AcpSchema.PromptRequest("s1",
77+
List.of(new AcpSchema.TextContent("first")))).toFuture();
78+
79+
// Wait for it to start processing
80+
assertThat(promptStarted.await(5, TimeUnit.SECONDS)).isTrue();
81+
82+
// Second prompt should fail (either rejected with "active prompt" or timeout)
83+
assertThatThrownBy(() ->
84+
client.prompt(new AcpSchema.PromptRequest("s1",
85+
List.of(new AcpSchema.TextContent("second")))).block(Duration.ofSeconds(3))
86+
).isNotNull();
87+
88+
// Release the first prompt
89+
promptRelease.countDown();
90+
var result = firstPrompt.get(5, TimeUnit.SECONDS);
91+
assertThat(result.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
92+
}
93+
finally {
94+
client.closeGracefully().block(TIMEOUT);
95+
agent.closeGracefully().block(TIMEOUT);
96+
}
97+
}
98+
99+
@Test
100+
void cancelDuringPromptOverWebSocket() throws Exception {
101+
int port = findFreePort();
102+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
103+
CountDownLatch cancelReceived = new CountDownLatch(1);
104+
CountDownLatch promptStarted = new CountDownLatch(1);
105+
106+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
107+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
108+
.requestTimeout(TIMEOUT)
109+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
110+
1, new AcpSchema.AgentCapabilities(), List.of())))
111+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
112+
.promptHandler((req, ctx) -> {
113+
promptStarted.countDown();
114+
// Simulate slow work
115+
return Mono.delay(Duration.ofSeconds(5))
116+
.then(Mono.just(new AcpSchema.PromptResponse(AcpSchema.StopReason.CANCELLED)));
117+
})
118+
.cancelHandler(notification -> {
119+
assertThat(notification.sessionId()).isEqualTo("s1");
120+
cancelReceived.countDown();
121+
return Mono.empty();
122+
})
123+
.build();
124+
125+
agent.start().block(TIMEOUT);
126+
Thread.sleep(300);
127+
128+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
129+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
130+
AcpAsyncClient client = AcpClient.async(clientTransport).requestTimeout(TIMEOUT).build();
131+
132+
try {
133+
client.initialize(new AcpSchema.InitializeRequest(1, new AcpSchema.ClientCapabilities()))
134+
.block(TIMEOUT);
135+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
136+
137+
// Start prompt in background
138+
client.prompt(new AcpSchema.PromptRequest("s1",
139+
List.of(new AcpSchema.TextContent("slow work")))).subscribe();
140+
141+
// Wait for prompt to start, then cancel
142+
assertThat(promptStarted.await(5, TimeUnit.SECONDS)).isTrue();
143+
client.cancel(new AcpSchema.CancelNotification("s1")).block(TIMEOUT);
144+
145+
// Verify cancel was received by the agent
146+
assertThat(cancelReceived.await(5, TimeUnit.SECONDS)).isTrue();
147+
}
148+
finally {
149+
client.closeGracefully().block(TIMEOUT);
150+
agent.closeGracefully().block(TIMEOUT);
151+
}
152+
}
153+
154+
@Test
155+
void permissionRequestOverWebSocket() throws Exception {
156+
int port = findFreePort();
157+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
158+
AtomicReference<AcpAsyncAgent> agentRef = new AtomicReference<>();
159+
AtomicReference<AcpSchema.RequestPermissionResponse> permResponse = new AtomicReference<>();
160+
CountDownLatch latch = new CountDownLatch(1);
161+
162+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
163+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
164+
.requestTimeout(TIMEOUT)
165+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
166+
1, new AcpSchema.AgentCapabilities(), List.of())))
167+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
168+
.promptHandler((req, ctx) -> {
169+
var toolCall = new AcpSchema.ToolCallUpdate("tool-1", "Delete File",
170+
AcpSchema.ToolKind.DELETE, AcpSchema.ToolCallStatus.PENDING, null, null, null, null);
171+
var options = List.of(
172+
new AcpSchema.PermissionOption("allow", "Allow", AcpSchema.PermissionOptionKind.ALLOW_ONCE),
173+
new AcpSchema.PermissionOption("deny", "Deny", AcpSchema.PermissionOptionKind.REJECT_ONCE));
174+
return agentRef.get()
175+
.requestPermission(new AcpSchema.RequestPermissionRequest("s1", toolCall, options))
176+
.doOnNext(resp -> { permResponse.set(resp); latch.countDown(); })
177+
.then(Mono.just(AcpSchema.PromptResponse.endTurn()));
178+
})
179+
.build();
180+
agentRef.set(agent);
181+
182+
agent.start().block(TIMEOUT);
183+
Thread.sleep(300);
184+
185+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
186+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
187+
AcpAsyncClient client = AcpClient.async(clientTransport)
188+
.requestTimeout(TIMEOUT)
189+
.requestPermissionHandler(req -> {
190+
assertThat(req.toolCall().title()).isEqualTo("Delete File");
191+
return Mono.just(new AcpSchema.RequestPermissionResponse(
192+
new AcpSchema.PermissionSelected("allow")));
193+
})
194+
.build();
195+
196+
try {
197+
client.initialize(new AcpSchema.InitializeRequest(1, new AcpSchema.ClientCapabilities()))
198+
.block(TIMEOUT);
199+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
200+
client.prompt(new AcpSchema.PromptRequest("s1",
201+
List.of(new AcpSchema.TextContent("delete something")))).block(TIMEOUT);
202+
203+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
204+
assertThat(permResponse.get().outcome()).isInstanceOf(AcpSchema.PermissionSelected.class);
205+
assertThat(((AcpSchema.PermissionSelected) permResponse.get().outcome()).optionId())
206+
.isEqualTo("allow");
207+
}
208+
finally {
209+
client.closeGracefully().block(TIMEOUT);
210+
agent.closeGracefully().block(TIMEOUT);
211+
}
212+
}
213+
214+
@Test
215+
void writeFileRequestOverWebSocket() throws Exception {
216+
int port = findFreePort();
217+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
218+
AtomicReference<AcpAsyncAgent> agentRef = new AtomicReference<>();
219+
AtomicReference<String> writtenPath = new AtomicReference<>();
220+
AtomicReference<String> writtenContent = new AtomicReference<>();
221+
CountDownLatch latch = new CountDownLatch(1);
222+
223+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
224+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
225+
.requestTimeout(TIMEOUT)
226+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
227+
1, new AcpSchema.AgentCapabilities(), List.of())))
228+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
229+
.promptHandler((req, ctx) ->
230+
agentRef.get()
231+
.writeTextFile(new AcpSchema.WriteTextFileRequest("s1", "/out.txt", "hello world"))
232+
.doOnSuccess(v -> latch.countDown())
233+
.then(Mono.just(AcpSchema.PromptResponse.endTurn())))
234+
.build();
235+
agentRef.set(agent);
236+
237+
agent.start().block(TIMEOUT);
238+
Thread.sleep(300);
239+
240+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
241+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
242+
AcpAsyncClient client = AcpClient.async(clientTransport)
243+
.requestTimeout(TIMEOUT)
244+
.writeTextFileHandler(req -> {
245+
writtenPath.set(req.path());
246+
writtenContent.set(req.content());
247+
return Mono.just(new AcpSchema.WriteTextFileResponse());
248+
})
249+
.build();
250+
251+
try {
252+
var caps = new AcpSchema.ClientCapabilities(
253+
new AcpSchema.FileSystemCapability(false, true), false);
254+
client.initialize(new AcpSchema.InitializeRequest(1, caps)).block(TIMEOUT);
255+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
256+
client.prompt(new AcpSchema.PromptRequest("s1",
257+
List.of(new AcpSchema.TextContent("write a file")))).block(TIMEOUT);
258+
259+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
260+
assertThat(writtenPath.get()).isEqualTo("/out.txt");
261+
assertThat(writtenContent.get()).isEqualTo("hello world");
262+
}
263+
finally {
264+
client.closeGracefully().block(TIMEOUT);
265+
agent.closeGracefully().block(TIMEOUT);
266+
}
267+
}
268+
269+
@Test
270+
void largeMessageOverWebSocket() throws Exception {
271+
int port = findFreePort();
272+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
273+
274+
// Build a large prompt (~500KB) to test beyond the old 64KB default
275+
String largeText = "x".repeat(500_000);
276+
277+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
278+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
279+
.requestTimeout(TIMEOUT)
280+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
281+
1, new AcpSchema.AgentCapabilities(), List.of())))
282+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
283+
.promptHandler((req, ctx) -> {
284+
// Verify the large text arrived intact
285+
String text = req.text();
286+
assertThat(text).hasSize(500_000);
287+
assertThat(text).startsWith("xxx");
288+
return ctx.sendMessage("received " + text.length())
289+
.then(Mono.just(AcpSchema.PromptResponse.endTurn()));
290+
})
291+
.build();
292+
293+
agent.start().block(TIMEOUT);
294+
Thread.sleep(300);
295+
296+
AtomicReference<String> agentResponse = new AtomicReference<>();
297+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
298+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
299+
AcpAsyncClient client = AcpClient.async(clientTransport)
300+
.requestTimeout(TIMEOUT)
301+
.sessionUpdateConsumer(notification -> {
302+
if (notification.update() instanceof AcpSchema.AgentMessageChunk msg) {
303+
agentResponse.set(((AcpSchema.TextContent) msg.content()).text());
304+
}
305+
return Mono.empty();
306+
})
307+
.build();
308+
309+
try {
310+
client.initialize(new AcpSchema.InitializeRequest(1, new AcpSchema.ClientCapabilities()))
311+
.block(TIMEOUT);
312+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
313+
314+
AcpSchema.PromptResponse response = client.prompt(new AcpSchema.PromptRequest("s1",
315+
List.of(new AcpSchema.TextContent(largeText)))).block(TIMEOUT);
316+
317+
assertThat(response.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
318+
assertThat(agentResponse.get()).isEqualTo("received 500000");
319+
}
320+
finally {
321+
client.closeGracefully().block(TIMEOUT);
322+
agent.closeGracefully().block(TIMEOUT);
323+
}
324+
}
325+
326+
@Test
327+
void multipleSequentialPromptsOverWebSocket() throws Exception {
328+
int port = findFreePort();
329+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
330+
CopyOnWriteArrayList<String> receivedPrompts = new CopyOnWriteArrayList<>();
331+
332+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
333+
AcpAsyncAgent agent = AcpAgent.async(agentTransport)
334+
.requestTimeout(TIMEOUT)
335+
.initializeHandler(req -> Mono.just(new AcpSchema.InitializeResponse(
336+
1, new AcpSchema.AgentCapabilities(), List.of())))
337+
.newSessionHandler(req -> Mono.just(new AcpSchema.NewSessionResponse("s1", null, null)))
338+
.promptHandler((req, ctx) -> {
339+
receivedPrompts.add(req.text());
340+
return ctx.sendMessage("echo: " + req.text())
341+
.then(Mono.just(AcpSchema.PromptResponse.endTurn()));
342+
})
343+
.build();
344+
345+
agent.start().block(TIMEOUT);
346+
Thread.sleep(300);
347+
348+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
349+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
350+
AcpAsyncClient client = AcpClient.async(clientTransport).requestTimeout(TIMEOUT).build();
351+
352+
try {
353+
client.initialize(new AcpSchema.InitializeRequest(1, new AcpSchema.ClientCapabilities()))
354+
.block(TIMEOUT);
355+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
356+
357+
// Send 5 sequential prompts over the same WebSocket connection
358+
for (int i = 1; i <= 5; i++) {
359+
AcpSchema.PromptResponse resp = client.prompt(new AcpSchema.PromptRequest("s1",
360+
List.of(new AcpSchema.TextContent("prompt-" + i)))).block(TIMEOUT);
361+
assertThat(resp.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
362+
}
363+
364+
assertThat(receivedPrompts).containsExactly(
365+
"prompt-1", "prompt-2", "prompt-3", "prompt-4", "prompt-5");
366+
}
367+
finally {
368+
client.closeGracefully().block(TIMEOUT);
369+
agent.closeGracefully().block(TIMEOUT);
370+
}
371+
}
372+
373+
private static int findFreePort() throws IOException {
374+
try (ServerSocket socket = new ServerSocket(0)) {
375+
return socket.getLocalPort();
376+
}
377+
}
378+
379+
}

0 commit comments

Comments
 (0)