Skip to content

Commit f0d8578

Browse files
committed
feat: unify streamable agent transport
1 parent ef9bb27 commit f0d8578

46 files changed

Lines changed: 822 additions & 5250 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

acp-core/src/test/java/com/agentclientprotocol/sdk/client/transport/StreamableHttpAcpClientTransportIntegrationTest.java

Lines changed: 345 additions & 105 deletions
Large diffs are not rendered by default.

acp-streamable-http-jetty/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@
3434
<groupId>org.eclipse.jetty.http2</groupId>
3535
<artifactId>jetty-http2-server</artifactId>
3636
</dependency>
37+
<dependency>
38+
<groupId>org.eclipse.jetty.websocket</groupId>
39+
<artifactId>jetty-websocket-jetty-server</artifactId>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.eclipse.jetty.websocket</groupId>
43+
<artifactId>jetty-websocket-jetty-api</artifactId>
44+
</dependency>
3745

3846
<dependency>
3947
<groupId>org.junit.jupiter</groupId>

acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java

Lines changed: 191 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.io.IOException;
88
import java.io.PrintWriter;
9+
import java.nio.channels.ClosedChannelException;
910
import java.nio.charset.StandardCharsets;
1011
import java.time.Duration;
1112
import java.util.ArrayDeque;
@@ -39,6 +40,15 @@
3940
import org.eclipse.jetty.server.HttpConnectionFactory;
4041
import org.eclipse.jetty.server.Server;
4142
import org.eclipse.jetty.server.ServerConnector;
43+
import org.eclipse.jetty.websocket.api.Callback;
44+
import org.eclipse.jetty.websocket.api.Session;
45+
import org.eclipse.jetty.websocket.api.StatusCode;
46+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
47+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
48+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
49+
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
50+
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
51+
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
4252
import org.slf4j.Logger;
4353
import org.slf4j.LoggerFactory;
4454
import reactor.core.publisher.Mono;
@@ -48,16 +58,19 @@
4858
* Listener-backed ACP Streamable HTTP transport for agents.
4959
*
5060
* <p>
51-
* This transport hosts a Jetty HTTP endpoint and creates one fresh agent runtime per
52-
* remote ACP connection through {@link AcpAgentFactory}. The accepted connection then
53-
* owns its own per-connection {@link RemoteAcpConnection}, while the listener remains
54-
* responsible only for HTTP concerns such as headers, SSE streams, and request routing.
61+
* This transport hosts the ACP Streamable HTTP endpoint on Jetty, including POST/SSE
62+
* request handling and WebSocket upgrades on the same path. It creates one fresh agent
63+
* runtime per remote ACP connection through {@link AcpAgentFactory}. The accepted
64+
* connection then owns its own per-connection {@link RemoteAcpConnection}, while the
65+
* listener remains responsible only for wire-level concerns such as headers, SSE
66+
* streams, WebSocket frames, and request routing.
5567
* </p>
5668
*
5769
* <p>
58-
* Streamable HTTP and the RFD-compliant remote WebSocket listener share
59-
* {@link RemoteAcpConnection}; this class keeps HTTP-specific routing, headers, SSE
60-
* stream ownership, and replay behavior local to the HTTP adapter.
70+
* WebSocket support is intentionally hosted here instead of as a separate public
71+
* listener so one {@code /acp} endpoint can behave like the RFD and the Rust
72+
* {@code AcpHttpServer}: HTTP requests fall through to the servlet, while valid
73+
* WebSocket upgrade requests are accepted by Jetty's {@link WebSocketUpgradeHandler}.
6174
* </p>
6275
*
6376
* @author Kaiser Dandangi
@@ -160,6 +173,8 @@ private record ResolvedInboundRoute(JSONRPCMessage message, RouteScope requestSc
160173

161174
private final ConcurrentMap<String, ConnectionState> connections = new ConcurrentHashMap<>();
162175

176+
private final ConcurrentMap<String, WebSocketConnectionState> webSocketConnections = new ConcurrentHashMap<>();
177+
163178
private final AtomicBoolean started = new AtomicBoolean(false);
164179

165180
private final AtomicBoolean closing = new AtomicBoolean(false);
@@ -232,6 +247,25 @@ public Mono<Void> start() {
232247
ServletContextHandler context = new ServletContextHandler();
233248
context.setContextPath("/");
234249
context.addServlet(new ServletHolder(new AcpServlet()), path);
250+
251+
WebSocketUpgradeHandler webSocketHandler = WebSocketUpgradeHandler.from(jettyServer, context, container -> {
252+
container.setIdleTimeout(Duration.ofMinutes(30));
253+
container.addMapping(path, (request, response, callback) -> {
254+
WebSocketConnectionState connection = createWebSocketConnection();
255+
try {
256+
connection.start();
257+
webSocketConnections.put(connection.id(), connection);
258+
response.getHeaders().put(HEADER_CONNECTION_ID, connection.id());
259+
return new AcpWebSocketEndpoint(connection);
260+
}
261+
catch (Exception e) {
262+
connection.close();
263+
callback.failed(e);
264+
return null;
265+
}
266+
});
267+
});
268+
context.insertHandler(webSocketHandler);
235269
jettyServer.setHandler(context);
236270

237271
jettyServer.start();
@@ -262,6 +296,8 @@ public Mono<Void> closeGracefully() {
262296
}
263297
connections.values().forEach(ConnectionState::close);
264298
connections.clear();
299+
webSocketConnections.values().forEach(WebSocketConnectionState::close);
300+
webSocketConnections.clear();
265301
Server currentServer = this.server;
266302
if (currentServer != null) {
267303
try {
@@ -283,15 +319,25 @@ public Mono<Void> awaitTermination() {
283319
return terminationSink.asMono();
284320
}
285321

322+
int activeConnectionCount() {
323+
return connections.size() + webSocketConnections.size();
324+
}
325+
286326
private ConnectionState createConnection() {
287327
String connectionId = UUID.randomUUID().toString();
288328
ConnectionState connection = new ConnectionState(connectionId);
289329
connection.start();
290330
return connection;
291331
}
292332

293-
private Optional<ConnectionState> connection(String connectionId) {
294-
return Optional.ofNullable(connections.get(connectionId));
333+
private WebSocketConnectionState createWebSocketConnection() {
334+
String connectionId = UUID.randomUUID().toString();
335+
return new WebSocketConnectionState(connectionId);
336+
}
337+
338+
private boolean isInitializeRequest(JSONRPCMessage message) {
339+
return message instanceof AcpSchema.JSONRPCRequest request
340+
&& AcpSchema.METHOD_INITIALIZE.equals(request.method()) && request.id() != null;
295341
}
296342

297343
private final class AcpServlet extends HttpServlet {
@@ -879,6 +925,142 @@ void close() {
879925

880926
}
881927

928+
private final class WebSocketConnectionState {
929+
930+
private final String id;
931+
932+
private final RemoteAcpConnection remoteConnection;
933+
934+
private final AtomicBoolean initialized = new AtomicBoolean(false);
935+
936+
private final AtomicBoolean closed = new AtomicBoolean(false);
937+
938+
private volatile Session session;
939+
940+
WebSocketConnectionState(String id) {
941+
this.id = id;
942+
this.remoteConnection = new RemoteAcpConnection(id, jsonMapper, this::sendToClient);
943+
}
944+
945+
String id() {
946+
return id;
947+
}
948+
949+
void start() {
950+
this.remoteConnection.start(agentFactory).block(INITIALIZE_TIMEOUT);
951+
}
952+
953+
void open(Session session) {
954+
this.session = session;
955+
}
956+
957+
void acceptFromClient(JSONRPCMessage message) {
958+
if (!initialized.get()) {
959+
// The WebSocket branch of the streamable endpoint has no POST
960+
// initialize response that can create the connection first, so the first
961+
// client-originated JSON-RPC message on the socket must be initialize.
962+
if (!isInitializeRequest(message)) {
963+
close(StatusCode.PROTOCOL, "first ACP WebSocket message must be initialize");
964+
return;
965+
}
966+
initialized.set(true);
967+
}
968+
remoteConnection.acceptInbound(message);
969+
}
970+
971+
void sendToClient(JSONRPCMessage message) {
972+
try {
973+
Session currentSession = this.session;
974+
if (closed.get() || currentSession == null || !currentSession.isOpen()) {
975+
throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
976+
}
977+
String payload = jsonMapper.writeValueAsString(message);
978+
logger.debug("Sending streamable ACP WebSocket message: {}", payload);
979+
currentSession.sendText(payload, Callback.from(() -> {
980+
// Jetty requires an explicit success callback; there is no
981+
// follow-up work after the frame has been accepted for writing.
982+
}, error -> {
983+
if (!closed.get()) {
984+
remoteConnection.signalException(error);
985+
}
986+
}));
987+
}
988+
catch (Exception e) {
989+
remoteConnection.signalException(e);
990+
close(StatusCode.SERVER_ERROR, "failed to send ACP message");
991+
}
992+
}
993+
994+
void close() {
995+
close(StatusCode.NORMAL, "server closing");
996+
}
997+
998+
void close(int statusCode, String reason) {
999+
if (!closed.compareAndSet(false, true)) {
1000+
return;
1001+
}
1002+
webSocketConnections.remove(id, this);
1003+
Session currentSession = this.session;
1004+
if (currentSession != null && currentSession.isOpen()) {
1005+
currentSession.close(statusCode, reason, Callback.NOOP);
1006+
}
1007+
remoteConnection.closeGracefully().subscribe();
1008+
}
1009+
1010+
}
1011+
1012+
/**
1013+
* Jetty WebSocket endpoint for one WebSocket-upgraded ACP connection.
1014+
*/
1015+
@WebSocket
1016+
public class AcpWebSocketEndpoint {
1017+
1018+
private final WebSocketConnectionState connection;
1019+
1020+
AcpWebSocketEndpoint(WebSocketConnectionState connection) {
1021+
this.connection = connection;
1022+
}
1023+
1024+
@OnWebSocketOpen
1025+
public void onOpen(Session session) {
1026+
logger.info("Streamable ACP WebSocket client connected from {}", session.getRemoteSocketAddress());
1027+
connection.open(session);
1028+
}
1029+
1030+
@OnWebSocketMessage
1031+
public void onMessage(Session session, String message) {
1032+
logger.debug("Received streamable ACP WebSocket message: {}", message);
1033+
1034+
try {
1035+
JSONRPCMessage jsonRpcMessage = AcpSchema.deserializeJsonRpcMessage(jsonMapper, message);
1036+
connection.acceptFromClient(jsonRpcMessage);
1037+
}
1038+
catch (Exception e) {
1039+
logger.warn("Closing streamable ACP WebSocket connection after invalid JSON-RPC frame", e);
1040+
connection.close(StatusCode.PROTOCOL, "invalid JSON-RPC frame");
1041+
}
1042+
}
1043+
1044+
@OnWebSocketClose
1045+
public void onClose(Session session, int statusCode, String reason) {
1046+
logger.info("Streamable ACP WebSocket client disconnected: {} - {}", statusCode, reason);
1047+
connection.close(statusCode, reason);
1048+
}
1049+
1050+
@OnWebSocketError
1051+
public void onError(Session session, Throwable error) {
1052+
if (error instanceof ClosedChannelException) {
1053+
logger.debug("Streamable ACP WebSocket channel closed");
1054+
connection.close(StatusCode.NORMAL, "WebSocket channel closed");
1055+
return;
1056+
}
1057+
logger.error("Streamable ACP WebSocket error", error);
1058+
connection.remoteConnection.signalException(error);
1059+
connection.close(StatusCode.SERVER_ERROR, "WebSocket error");
1060+
}
1061+
1062+
}
1063+
8821064
private static final class UnknownSessionException extends RuntimeException {
8831065

8841066
UnknownSessionException(String message) {

0 commit comments

Comments
 (0)