Skip to content

Commit 9a1ac23

Browse files
author
Mark Pollack
committed
Fix WebSocket client transport echoing agent requests back to agent
AcpClientSession wired the transport with doOnNext(this::handle), which performs the side effect but passes the original message through downstream. The WebSocket transport then sent it back over the wire, causing the agent to receive its own requests as inbound client messages. Fix: consume the message after handling with .then(Mono.empty()) so nothing passes through to the transport's outbound path. Test reproduction by Kaiser Dandangi (@kamikaz1k) via PR #6.
1 parent f9ea30f commit 9a1ac23

2 files changed

Lines changed: 154 additions & 1 deletion

File tree

acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpClientSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public AcpClientSession(Duration requestTimeout, AcpClientTransport transport,
148148
return t;
149149
}), "acp-timeout-" + sessionPrefix);
150150

151-
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
151+
this.transport.connect(mono -> mono.doOnNext(this::handle).then(Mono.empty())).transform(connectHook).subscribe();
152152
}
153153

154154
private void dismissPendingResponses() {
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2025-2026 the original author or authors.
3+
*/
4+
5+
package com.agentclientprotocol.sdk.integration;
6+
7+
import java.io.IOException;
8+
import java.net.ServerSocket;
9+
import java.net.URI;
10+
import java.time.Duration;
11+
import java.util.HashMap;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.concurrent.CountDownLatch;
15+
import java.util.concurrent.TimeUnit;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import com.agentclientprotocol.sdk.agent.transport.WebSocketAcpAgentTransport;
19+
import com.agentclientprotocol.sdk.client.AcpAsyncClient;
20+
import com.agentclientprotocol.sdk.client.AcpClient;
21+
import com.agentclientprotocol.sdk.client.transport.WebSocketAcpClientTransport;
22+
import com.agentclientprotocol.sdk.json.AcpJsonMapper;
23+
import com.agentclientprotocol.sdk.json.TypeRef;
24+
import com.agentclientprotocol.sdk.spec.AcpAgentSession;
25+
import com.agentclientprotocol.sdk.spec.AcpSchema;
26+
import org.junit.jupiter.api.Test;
27+
import reactor.core.publisher.Mono;
28+
29+
import static org.assertj.core.api.Assertions.assertThat;
30+
31+
/**
32+
* Reproduces the WebSocket client transport echo bug on main.
33+
*
34+
* <p>
35+
* This test isolates the agent-request path:
36+
* </p>
37+
* <ol>
38+
* <li>The agent receives {@code session/prompt} from the client.</li>
39+
* <li>While handling that prompt, the agent sends {@code fs/read_text_file} to the
40+
* client.</li>
41+
* <li>The client has registered {@code readTextFileHandler}, so it should handle the
42+
* request locally and send only a JSON-RPC response with the same id.</li>
43+
* <li>The original {@code fs/read_text_file} request must not be sent back to the
44+
* agent.</li>
45+
* </ol>
46+
*
47+
* <p>
48+
* On main, {@code AcpClientSession} wires the transport handler with
49+
* {@code mono -> mono.doOnNext(this::handle)}. Reactor {@code doOnNext} preserves the
50+
* original message downstream, and the WebSocket transport forwards handler-emitted
51+
* messages back onto the socket. The result is that an inbound agent request can be
52+
* echoed back to the agent after being handled by the client.
53+
* </p>
54+
*/
55+
class WebSocketClientInboundEchoReproTest {
56+
57+
private static final Duration TIMEOUT = Duration.ofSeconds(5);
58+
59+
@Test
60+
void clientSessionShouldNotEchoAgentRequestsBackToAgent() throws Exception {
61+
AcpJsonMapper jsonMapper = AcpJsonMapper.createDefault();
62+
int port = findFreePort();
63+
64+
WebSocketAcpAgentTransport agentTransport = new WebSocketAcpAgentTransport(port, jsonMapper);
65+
AtomicReference<AcpAgentSession> agentSessionRef = new AtomicReference<>();
66+
CountDownLatch echoedRequestReceived = new CountDownLatch(1);
67+
68+
AcpAgentSession agentSession = null;
69+
AcpAsyncClient client = null;
70+
71+
try {
72+
Map<String, AcpAgentSession.RequestHandler<?>> requestHandlers = new HashMap<>();
73+
requestHandlers.put(AcpSchema.METHOD_INITIALIZE,
74+
params -> Mono.just(new AcpSchema.InitializeResponse(1, new AcpSchema.AgentCapabilities(), List.of())));
75+
requestHandlers.put(AcpSchema.METHOD_SESSION_NEW,
76+
params -> Mono.just(new AcpSchema.NewSessionResponse("echo-session", null, null)));
77+
78+
// The prompt handler deliberately sends an agent->client request. The expected
79+
// protocol flow is:
80+
//
81+
// agent -> client: request id=N, method=fs/read_text_file
82+
// client -> agent: response id=N, result={ content: "client content" }
83+
//
84+
// The original request is not a client->agent message and should never be
85+
// observed by the agent's inbound request router.
86+
requestHandlers.put(AcpSchema.METHOD_SESSION_PROMPT, params -> agentSessionRef.get()
87+
.sendRequest(AcpSchema.METHOD_FS_READ_TEXT_FILE,
88+
new AcpSchema.ReadTextFileRequest("echo-session", "/tmp/input.txt", null, null),
89+
new TypeRef<AcpSchema.ReadTextFileResponse>() {
90+
})
91+
.thenReturn(AcpSchema.PromptResponse.endTurn()));
92+
93+
// Trap the agent->client method on the agent side. This handler should never run:
94+
// fs/read_text_file is a client method, so if the agent receives it here, the
95+
// client has echoed the inbound agent request back over the WebSocket transport.
96+
// Returning "unexpected echo" makes the trap harmless to the rest of the prompt
97+
// flow while the latch records that the invalid path happened.
98+
requestHandlers.put(AcpSchema.METHOD_FS_READ_TEXT_FILE, params -> {
99+
echoedRequestReceived.countDown();
100+
return Mono.just(new AcpSchema.ReadTextFileResponse("unexpected echo"));
101+
});
102+
103+
agentSession = new AcpAgentSession(TIMEOUT, agentTransport, requestHandlers, Map.of());
104+
agentSessionRef.set(agentSession);
105+
Thread.sleep(300);
106+
107+
WebSocketAcpClientTransport clientTransport = new WebSocketAcpClientTransport(
108+
URI.create("ws://localhost:" + port + "/acp"), jsonMapper);
109+
client = AcpClient.async(clientTransport)
110+
.requestTimeout(TIMEOUT)
111+
// Registering this handler means the client can satisfy fs/read_text_file
112+
// locally. It has no reason to route the request back to the agent.
113+
.readTextFileHandler(params -> Mono.just(new AcpSchema.ReadTextFileResponse("client content")))
114+
.build();
115+
116+
// Advertise the matching client capability so the agent is allowed to make the
117+
// fs/read_text_file request during prompt handling.
118+
client.initialize(new AcpSchema.InitializeRequest(1,
119+
new AcpSchema.ClientCapabilities(new AcpSchema.FileSystemCapability(true, false), false)))
120+
.block(TIMEOUT);
121+
client.newSession(new AcpSchema.NewSessionRequest("/workspace", List.of())).block(TIMEOUT);
122+
123+
AcpSchema.PromptResponse response = client
124+
.prompt(new AcpSchema.PromptRequest("echo-session", List.of(new AcpSchema.TextContent("read file"))))
125+
.block(TIMEOUT);
126+
127+
assertThat(response).isNotNull();
128+
assertThat(response.stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
129+
130+
// This should remain false. On main it becomes true, proving that the
131+
// WebSocket client transport echoed the inbound fs/read_text_file request back
132+
// to the agent.
133+
assertThat(echoedRequestReceived.await(1, TimeUnit.SECONDS))
134+
.as("WebSocket client session must not send inbound agent requests back to the agent")
135+
.isFalse();
136+
}
137+
finally {
138+
if (client != null) {
139+
client.closeGracefully().block(TIMEOUT);
140+
}
141+
if (agentSession != null) {
142+
agentSession.closeGracefully().block(TIMEOUT);
143+
}
144+
}
145+
}
146+
147+
private static int findFreePort() throws IOException {
148+
try (ServerSocket socket = new ServerSocket(0)) {
149+
return socket.getLocalPort();
150+
}
151+
}
152+
153+
}

0 commit comments

Comments
 (0)