Skip to content

Commit e6b528f

Browse files
committed
fix: serialize streamable websocket writes
1 parent da691f3 commit e6b528f

2 files changed

Lines changed: 126 additions & 12 deletions

File tree

acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -935,6 +935,12 @@ private final class WebSocketConnectionState {
935935

936936
private final AtomicBoolean closed = new AtomicBoolean(false);
937937

938+
private final Object outboundLock = new Object();
939+
940+
private final ArrayDeque<String> outboundQueue = new ArrayDeque<>();
941+
942+
private boolean outboundSendInProgress = false;
943+
938944
private volatile Session session;
939945

940946
WebSocketConnectionState(String id) {
@@ -970,27 +976,76 @@ void acceptFromClient(JSONRPCMessage message) {
970976

971977
void sendToClient(JSONRPCMessage message) {
972978
try {
973-
Session currentSession = this.session;
974-
if (closed.get() || currentSession == null || !currentSession.isOpen()) {
975-
throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
976-
}
977979
String payload = jsonMapper.writeValueAsString(message);
978980
logger.debug("Sending streamable ACP WebSocket message: {}", payload);
979-
currentSession.sendText(payload, Callback.from(() -> {
980-
// Jetty requires an explicit success callback; there is no
981-
// follow-up work after the frame has been accepted for writing.
982-
}, error -> {
983-
if (!closed.get()) {
984-
remoteConnection.signalException(error);
985-
}
986-
}));
981+
enqueueOutbound(payload);
987982
}
988983
catch (Exception e) {
989984
remoteConnection.signalException(e);
990985
close(StatusCode.SERVER_ERROR, "failed to send ACP message");
991986
}
992987
}
993988

989+
private void enqueueOutbound(String payload) {
990+
boolean shouldDrain;
991+
synchronized (outboundLock) {
992+
if (closed.get()) {
993+
throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
994+
}
995+
outboundQueue.addLast(payload);
996+
shouldDrain = !outboundSendInProgress;
997+
if (shouldDrain) {
998+
outboundSendInProgress = true;
999+
}
1000+
}
1001+
if (shouldDrain) {
1002+
drainOutbound();
1003+
}
1004+
}
1005+
1006+
private void drainOutbound() {
1007+
String payload;
1008+
Session currentSession;
1009+
synchronized (outboundLock) {
1010+
if (closed.get()) {
1011+
outboundQueue.clear();
1012+
outboundSendInProgress = false;
1013+
return;
1014+
}
1015+
payload = outboundQueue.pollFirst();
1016+
if (payload == null) {
1017+
outboundSendInProgress = false;
1018+
return;
1019+
}
1020+
currentSession = this.session;
1021+
}
1022+
1023+
if (currentSession == null || !currentSession.isOpen()) {
1024+
failOutbound(new AcpConnectionException("Streamable ACP WebSocket connection is closed"));
1025+
return;
1026+
}
1027+
1028+
try {
1029+
/*
1030+
* Jetty WebSocket sessions do not allow overlapping writes. Agent messages can
1031+
* be produced by concurrent prompt handlers, so this per-connection queue sends
1032+
* exactly one frame at a time and advances only after Jetty completes the
1033+
* callback for the previous frame.
1034+
*/
1035+
currentSession.sendText(payload, Callback.from(this::drainOutbound, this::failOutbound));
1036+
}
1037+
catch (Exception e) {
1038+
failOutbound(e);
1039+
}
1040+
}
1041+
1042+
private void failOutbound(Throwable error) {
1043+
if (!closed.get()) {
1044+
remoteConnection.signalException(error);
1045+
close(StatusCode.SERVER_ERROR, "failed to send ACP message");
1046+
}
1047+
}
1048+
9941049
void close() {
9951050
close(StatusCode.NORMAL, "server closing");
9961051
}
@@ -999,6 +1054,10 @@ void close(int statusCode, String reason) {
9991054
if (!closed.compareAndSet(false, true)) {
10001055
return;
10011056
}
1057+
synchronized (outboundLock) {
1058+
outboundQueue.clear();
1059+
outboundSendInProgress = false;
1060+
}
10021061
webSocketConnections.remove(id, this);
10031062
Session currentSession = this.session;
10041063
if (currentSession != null && currentSession.isOpen()) {

acp-streamable-http-jetty/src/test/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransportWebSocketIntegrationTest.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import com.agentclientprotocol.sdk.spec.AcpSchema;
3838
import org.eclipse.jetty.websocket.api.StatusCode;
3939
import org.junit.jupiter.api.Test;
40+
import reactor.core.publisher.Flux;
4041
import reactor.core.publisher.Mono;
42+
import reactor.core.scheduler.Schedulers;
4143

4244
import static org.assertj.core.api.Assertions.assertThat;
4345
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -209,6 +211,59 @@ void supportsMultipleConcurrentWebSocketClients() throws Exception {
209211
}
210212
}
211213

214+
@Test
215+
void serializesConcurrentAgentMessagesOnOneWebSocketConnection() throws Exception {
216+
AtomicInteger sessionCounter = new AtomicInteger();
217+
AtomicInteger receivedUpdates = new AtomicInteger();
218+
AcpAgentFactory agentFactory = AcpAgentFactory.async(transport -> AcpAgent.async(transport)
219+
.initializeHandler(request -> Mono.just(new AcpSchema.InitializeResponse(
220+
AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.AgentCapabilities(true, null, null), List.of())))
221+
.newSessionHandler(request -> Mono.just(new AcpSchema.NewSessionResponse(
222+
"sess-" + sessionCounter.incrementAndGet(), null, null)))
223+
.promptHandler((request, context) -> Mono.delay(Duration.ofMillis(25))
224+
.thenMany(Flux.range(0, 20)
225+
.flatMap(i -> context.sendMessage(request.sessionId() + ": update-" + i)
226+
.subscribeOn(Schedulers.parallel()), 8))
227+
.then(Mono.just(AcpSchema.PromptResponse.endTurn())))
228+
.build());
229+
230+
try (FixtureServer server = FixtureServer.start(agentFactory)) {
231+
AcpAsyncClient client = AcpClient
232+
.async(new WebSocketAcpClientTransport(server.endpoint(), AcpJsonMapper.createDefault()))
233+
.sessionUpdateConsumer(update -> {
234+
receivedUpdates.incrementAndGet();
235+
return Mono.empty();
236+
})
237+
.requestTimeout(TIMEOUT)
238+
.build();
239+
try {
240+
client.initialize(new AcpSchema.InitializeRequest(
241+
AcpSchema.LATEST_PROTOCOL_VERSION, new AcpSchema.ClientCapabilities()))
242+
.block(TIMEOUT);
243+
AcpSchema.NewSessionResponse firstSession = client
244+
.newSession(new AcpSchema.NewSessionRequest("/workspace/one", List.of()))
245+
.block(TIMEOUT);
246+
AcpSchema.NewSessionResponse secondSession = client
247+
.newSession(new AcpSchema.NewSessionRequest("/workspace/two", List.of()))
248+
.block(TIMEOUT);
249+
250+
var prompts = Mono.zip(
251+
client.prompt(new AcpSchema.PromptRequest(firstSession.sessionId(),
252+
List.of(new AcpSchema.TextContent("first")))),
253+
client.prompt(new AcpSchema.PromptRequest(secondSession.sessionId(),
254+
List.of(new AcpSchema.TextContent("second")))))
255+
.block(TIMEOUT);
256+
257+
assertThat(prompts.getT1().stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
258+
assertThat(prompts.getT2().stopReason()).isEqualTo(AcpSchema.StopReason.END_TURN);
259+
assertThat(receivedUpdates).hasValue(40);
260+
}
261+
finally {
262+
client.closeGracefully().block(TIMEOUT);
263+
}
264+
}
265+
}
266+
212267
@Test
213268
void rejectsNonInitializeFirstMessage() throws Exception {
214269
try (FixtureServer server = FixtureServer.start(simpleAgentFactory())) {

0 commit comments

Comments
 (0)