Skip to content

Commit ef9bb27

Browse files
committed
Add remote WebSocket agent transport
1 parent f706d17 commit ef9bb27

5 files changed

Lines changed: 993 additions & 112 deletions

File tree

Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Copyright 2025-2026 the original author or authors.
3+
*/
4+
5+
package com.agentclientprotocol.sdk.agent.transport;
6+
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.function.Consumer;
9+
import java.util.function.Function;
10+
11+
import com.agentclientprotocol.sdk.agent.AcpAgentFactory;
12+
import com.agentclientprotocol.sdk.agent.AcpAsyncAgent;
13+
import com.agentclientprotocol.sdk.error.AcpConnectionException;
14+
import com.agentclientprotocol.sdk.json.AcpJsonMapper;
15+
import com.agentclientprotocol.sdk.json.TypeRef;
16+
import com.agentclientprotocol.sdk.spec.AcpAgentTransport;
17+
import com.agentclientprotocol.sdk.spec.AcpSchema;
18+
import com.agentclientprotocol.sdk.spec.AcpSchema.JSONRPCMessage;
19+
import com.agentclientprotocol.sdk.util.Assert;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import reactor.core.publisher.Mono;
23+
import reactor.core.publisher.Sinks;
24+
25+
/**
26+
* Shared per-connection core for listener-backed remote ACP agent transports.
27+
*
28+
* <p>
29+
* Remote transports such as Streamable HTTP and WebSocket have different wire-level
30+
* framing, but they both need the same agent-side shape once a remote ACP connection
31+
* exists: one connection-bound {@link AcpAgentTransport}, one fresh agent runtime from
32+
* {@link AcpAgentFactory}, inbound JSON-RPC delivery to the agent, and outbound JSON-RPC
33+
* delivery back to the wire adapter.
34+
* </p>
35+
*
36+
* <p>
37+
* This class intentionally does not know about HTTP headers, SSE streams, WebSocket
38+
* sessions, or route maps. Those remain transport-adapter concerns.
39+
* </p>
40+
*
41+
* @author Kaiser Dandangi
42+
*/
43+
public final class RemoteAcpConnection {
44+
45+
private static final Logger logger = LoggerFactory.getLogger(RemoteAcpConnection.class);
46+
47+
private final String id;
48+
49+
private final AcpJsonMapper jsonMapper;
50+
51+
private final ConnectionTransport transport;
52+
53+
private final AtomicBoolean started = new AtomicBoolean(false);
54+
55+
private final AtomicBoolean closing = new AtomicBoolean(false);
56+
57+
private volatile AcpAsyncAgent agent;
58+
59+
/**
60+
* Creates a new remote ACP connection core.
61+
* @param id stable transport connection id
62+
* @param jsonMapper JSON mapper used by the connection transport
63+
* @param outboundConsumer callback that receives agent-originated outbound messages
64+
*/
65+
public RemoteAcpConnection(String id, AcpJsonMapper jsonMapper, Consumer<JSONRPCMessage> outboundConsumer) {
66+
Assert.hasText(id, "The id can not be empty");
67+
Assert.notNull(jsonMapper, "The jsonMapper can not be null");
68+
Assert.notNull(outboundConsumer, "The outboundConsumer can not be null");
69+
this.id = id;
70+
this.jsonMapper = jsonMapper;
71+
this.transport = new ConnectionTransport(outboundConsumer);
72+
}
73+
74+
/**
75+
* Returns the transport-level connection id.
76+
* @return connection id
77+
*/
78+
public String id() {
79+
return id;
80+
}
81+
82+
/**
83+
* Starts a fresh agent runtime for this connection.
84+
* @param agentFactory factory used to create the connection-bound agent runtime
85+
* @return mono that completes when the agent runtime is started
86+
*/
87+
public Mono<Void> start(AcpAgentFactory agentFactory) {
88+
Assert.notNull(agentFactory, "The agentFactory can not be null");
89+
if (!started.compareAndSet(false, true)) {
90+
return Mono.error(new IllegalStateException("Already started"));
91+
}
92+
return Mono.defer(() -> {
93+
this.agent = agentFactory.create(transport);
94+
return this.agent.start();
95+
}).doOnError(this::signalException);
96+
}
97+
98+
/**
99+
* Accepts one client-originated JSON-RPC message for delivery to the connection's
100+
* agent runtime.
101+
* @param message inbound message
102+
*/
103+
public void acceptInbound(JSONRPCMessage message) {
104+
transport.acceptInbound(message);
105+
}
106+
107+
/**
108+
* Reports a transport adapter exception to the agent transport exception handler.
109+
* @param error exception to report
110+
*/
111+
public void signalException(Throwable error) {
112+
transport.signalException(error);
113+
}
114+
115+
/**
116+
* Closes the connection and its agent runtime gracefully.
117+
* @return mono that completes when close work has been requested
118+
*/
119+
public Mono<Void> closeGracefully() {
120+
return Mono.defer(() -> {
121+
if (!closing.compareAndSet(false, true)) {
122+
return Mono.empty();
123+
}
124+
AcpAsyncAgent currentAgent = this.agent;
125+
if (currentAgent != null) {
126+
return currentAgent.closeGracefully()
127+
.onErrorResume(error -> {
128+
signalException(error);
129+
return Mono.empty();
130+
})
131+
.then(transport.closeGracefully());
132+
}
133+
return transport.closeGracefully();
134+
});
135+
}
136+
137+
/**
138+
* Closes the connection and its agent runtime immediately.
139+
*/
140+
public void close() {
141+
if (!closing.compareAndSet(false, true)) {
142+
return;
143+
}
144+
AcpAsyncAgent currentAgent = this.agent;
145+
if (currentAgent != null) {
146+
currentAgent.close();
147+
}
148+
transport.close();
149+
}
150+
151+
private final class ConnectionTransport implements AcpAgentTransport {
152+
153+
private final Consumer<JSONRPCMessage> outboundConsumer;
154+
155+
private final Sinks.Many<JSONRPCMessage> inboundSink = Sinks.many().unicast().onBackpressureBuffer();
156+
157+
private final Sinks.One<Void> terminationSink = Sinks.one();
158+
159+
private final AtomicBoolean transportStarted = new AtomicBoolean(false);
160+
161+
private final AtomicBoolean transportClosing = new AtomicBoolean(false);
162+
163+
private volatile Consumer<Throwable> exceptionHandler = t -> logger.error("Remote ACP transport error", t);
164+
165+
ConnectionTransport(Consumer<JSONRPCMessage> outboundConsumer) {
166+
this.outboundConsumer = outboundConsumer;
167+
}
168+
169+
@Override
170+
public Mono<Void> start(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
171+
Assert.notNull(handler, "The handler can not be null");
172+
if (!transportStarted.compareAndSet(false, true)) {
173+
return Mono.error(new IllegalStateException("Already started"));
174+
}
175+
inboundSink.asFlux()
176+
.flatMap(message -> Mono.just(message).transform(handler))
177+
.doOnNext(response -> {
178+
if (response != null) {
179+
outboundConsumer.accept(response);
180+
}
181+
})
182+
.doOnError(this::signalException)
183+
.doFinally(signal -> terminationSink.tryEmitValue(null))
184+
.subscribe();
185+
return Mono.empty();
186+
}
187+
188+
void acceptInbound(JSONRPCMessage message) {
189+
Assert.notNull(message, "The message can not be null");
190+
if (transportClosing.get()) {
191+
throw new AcpConnectionException("Remote ACP connection is closing");
192+
}
193+
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
194+
if (result.isFailure()) {
195+
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
196+
}
197+
}
198+
199+
void signalException(Throwable error) {
200+
exceptionHandler.accept(error);
201+
}
202+
203+
@Override
204+
public Mono<Void> sendMessage(JSONRPCMessage message) {
205+
return Mono.fromRunnable(() -> {
206+
if (transportClosing.get()) {
207+
throw new AcpConnectionException("Remote ACP connection is closing");
208+
}
209+
outboundConsumer.accept(message);
210+
});
211+
}
212+
213+
@Override
214+
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
215+
return jsonMapper.convertValue(data, typeRef);
216+
}
217+
218+
@Override
219+
public Mono<Void> closeGracefully() {
220+
return Mono.fromRunnable(this::close);
221+
}
222+
223+
@Override
224+
public void close() {
225+
if (transportClosing.compareAndSet(false, true)) {
226+
inboundSink.tryEmitComplete();
227+
terminationSink.tryEmitValue(null);
228+
}
229+
}
230+
231+
@Override
232+
public void setExceptionHandler(Consumer<Throwable> handler) {
233+
Assert.notNull(handler, "The handler can not be null");
234+
this.exceptionHandler = handler;
235+
}
236+
237+
@Override
238+
public Mono<Void> awaitTermination() {
239+
return terminationSink.asMono();
240+
}
241+
242+
}
243+
244+
}

acp-core/src/test/java/com/agentclientprotocol/sdk/spec/AcpAgentSessionTest.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.agentclientprotocol.sdk.test.InMemoryTransportPair;
1717
import org.junit.jupiter.api.Test;
1818
import reactor.core.publisher.Mono;
19+
import reactor.core.publisher.Sinks;
1920

2021
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -228,13 +229,17 @@ void singleTurnEnforcementAllowsConcurrentPromptsForDifferentSessions() throws E
228229
try {
229230
CountDownLatch handlersStarted = new CountDownLatch(2);
230231
AtomicInteger handlerInvocations = new AtomicInteger();
232+
Sinks.One<Void> session1Release = Sinks.one();
233+
Sinks.One<Void> session2Release = Sinks.one();
231234

232235
Map<String, AcpAgentSession.RequestHandler<?>> requestHandlers = Map.of(AcpSchema.METHOD_SESSION_PROMPT,
233236
params -> Mono.defer(() -> {
234237
handlerInvocations.incrementAndGet();
235238
handlersStarted.countDown();
236-
return Mono.delay(PROMPT_RESPONSE_DELAY)
237-
.map(ignored -> new AcpSchema.PromptResponse(AcpSchema.StopReason.END_TURN));
239+
String sessionId = sessionId(params);
240+
Sinks.One<Void> release = SESSION_1.equals(sessionId) ? session1Release : session2Release;
241+
return release.asMono()
242+
.thenReturn(new AcpSchema.PromptResponse(AcpSchema.StopReason.END_TURN));
238243
}));
239244

240245
AcpAgentSession session = new AcpAgentSession(TIMEOUT, transportPair.agentTransport(), requestHandlers,
@@ -262,6 +267,13 @@ void singleTurnEnforcementAllowsConcurrentPromptsForDifferentSessions() throws E
262267
assertThat(session.hasActivePrompt(SESSION_2)).isTrue();
263268
assertThat(session.getActivePromptSessionIds()).containsExactlyInAnyOrder(SESSION_1, SESSION_2);
264269

270+
// Release the responses one at a time. The in-memory test transport uses a
271+
// unicast sink, so simultaneous emissions from concurrent prompt handlers can
272+
// fail with FAIL_NON_SERIALIZED and obscure the behavior under test.
273+
session1Release.tryEmitValue(null);
274+
awaitResponse(responses, "1");
275+
session2Release.tryEmitValue(null);
276+
265277
assertThat(responseLatch.await(5, TimeUnit.SECONDS)).isTrue();
266278

267279
assertThat(responseById(responses, "1").error()).isNull();
@@ -383,6 +395,23 @@ private static AcpSchema.JSONRPCResponse responseById(List<AcpSchema.JSONRPCResp
383395
return responses.stream().filter(response -> id.equals(response.id())).findFirst().orElseThrow();
384396
}
385397

398+
private static void awaitResponse(List<AcpSchema.JSONRPCResponse> responses, Object id) throws InterruptedException {
399+
long deadline = System.nanoTime() + TIMEOUT.toNanos();
400+
while (System.nanoTime() < deadline) {
401+
if (responses.stream().anyMatch(response -> id.equals(response.id()))) {
402+
return;
403+
}
404+
Thread.sleep(10);
405+
}
406+
}
407+
408+
private static String sessionId(Object params) {
409+
if (params instanceof AcpSchema.PromptRequest promptRequest) {
410+
return promptRequest.sessionId();
411+
}
412+
throw new IllegalArgumentException("Expected PromptRequest params but received " + params);
413+
}
414+
386415
private static void allowAgentTransportSubscription() throws InterruptedException {
387416
// AcpAgentSession subscribes to the in-memory transport in its constructor.
388417
// subscribe() is asynchronous, so give the unicast sink subscriber a short

0 commit comments

Comments
 (0)