Skip to content

Commit 9c89c30

Browse files
committed
refactor: encapsulate websocket send queue
1 parent e6b528f commit 9c89c30

1 file changed

Lines changed: 81 additions & 68 deletions

File tree

acp-streamable-http-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/StreamableHttpAcpAgentTransport.java

Lines changed: 81 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -935,11 +935,7 @@ private final class WebSocketConnectionState {
935935

936936
private final AtomicBoolean closed = new AtomicBoolean(false);
937937

938-
private final Object outboundLock = new Object();
939-
940-
private final ArrayDeque<String> outboundQueue = new ArrayDeque<>();
941-
942-
private boolean outboundSendInProgress = false;
938+
private final SerializedWebSocketSender outboundSender = new SerializedWebSocketSender();
943939

944940
private volatile Session session;
945941

@@ -978,92 +974,109 @@ void sendToClient(JSONRPCMessage message) {
978974
try {
979975
String payload = jsonMapper.writeValueAsString(message);
980976
logger.debug("Sending streamable ACP WebSocket message: {}", payload);
981-
enqueueOutbound(payload);
977+
outboundSender.send(payload);
982978
}
983979
catch (Exception e) {
984980
remoteConnection.signalException(e);
985981
close(StatusCode.SERVER_ERROR, "failed to send ACP message");
986982
}
987983
}
988984

989-
private void enqueueOutbound(String payload) {
990-
boolean shouldDrain;
991-
synchronized (outboundLock) {
992-
if (closed.get()) {
993-
throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
985+
void close() {
986+
close(StatusCode.NORMAL, "server closing");
987+
}
988+
989+
void close(int statusCode, String reason) {
990+
if (!closed.compareAndSet(false, true)) {
991+
return;
992+
}
993+
outboundSender.close();
994+
webSocketConnections.remove(id, this);
995+
Session currentSession = this.session;
996+
if (currentSession != null && currentSession.isOpen()) {
997+
currentSession.close(statusCode, reason, Callback.NOOP);
998+
}
999+
remoteConnection.closeGracefully().subscribe();
1000+
}
1001+
1002+
private final class SerializedWebSocketSender {
1003+
1004+
private final Object lock = new Object();
1005+
1006+
private final ArrayDeque<String> queue = new ArrayDeque<>();
1007+
1008+
private boolean sendInProgress = false;
1009+
1010+
void send(String payload) {
1011+
boolean shouldDrain;
1012+
synchronized (lock) {
1013+
if (closed.get()) {
1014+
throw new AcpConnectionException("Streamable ACP WebSocket connection is closed");
1015+
}
1016+
queue.addLast(payload);
1017+
shouldDrain = !sendInProgress;
1018+
if (shouldDrain) {
1019+
sendInProgress = true;
1020+
}
9941021
}
995-
outboundQueue.addLast(payload);
996-
shouldDrain = !outboundSendInProgress;
9971022
if (shouldDrain) {
998-
outboundSendInProgress = true;
1023+
drain();
9991024
}
10001025
}
1001-
if (shouldDrain) {
1002-
drainOutbound();
1003-
}
1004-
}
10051026

1006-
private void drainOutbound() {
1007-
String payload;
1008-
Session currentSession;
1009-
synchronized (outboundLock) {
1010-
if (closed.get()) {
1011-
outboundQueue.clear();
1012-
outboundSendInProgress = false;
1013-
return;
1027+
/*
1028+
* Jetty WebSocket sessions do not allow overlapping writes. Agent messages can
1029+
* be produced by concurrent prompt handlers, so this per-connection queue sends
1030+
* exactly one frame at a time and advances only after Jetty completes the
1031+
* callback for the previous frame.
1032+
*/
1033+
private void drain() {
1034+
String payload;
1035+
Session currentSession;
1036+
synchronized (lock) {
1037+
if (closed.get()) {
1038+
clear();
1039+
return;
1040+
}
1041+
payload = queue.pollFirst();
1042+
if (payload == null) {
1043+
sendInProgress = false;
1044+
return;
1045+
}
1046+
currentSession = session;
10141047
}
1015-
payload = outboundQueue.pollFirst();
1016-
if (payload == null) {
1017-
outboundSendInProgress = false;
1048+
1049+
if (currentSession == null || !currentSession.isOpen()) {
1050+
fail(new AcpConnectionException("Streamable ACP WebSocket connection is closed"));
10181051
return;
10191052
}
1020-
currentSession = this.session;
1021-
}
10221053

1023-
if (currentSession == null || !currentSession.isOpen()) {
1024-
failOutbound(new AcpConnectionException("Streamable ACP WebSocket connection is closed"));
1025-
return;
1054+
try {
1055+
currentSession.sendText(payload, Callback.from(this::drain, this::fail));
1056+
}
1057+
catch (Exception e) {
1058+
fail(e);
1059+
}
10261060
}
10271061

1028-
try {
1029-
/*
1030-
* Jetty WebSocket sessions do not allow overlapping writes. Agent messages can
1031-
* be produced by concurrent prompt handlers, so this per-connection queue sends
1032-
* exactly one frame at a time and advances only after Jetty completes the
1033-
* callback for the previous frame.
1034-
*/
1035-
currentSession.sendText(payload, Callback.from(this::drainOutbound, this::failOutbound));
1036-
}
1037-
catch (Exception e) {
1038-
failOutbound(e);
1062+
private void fail(Throwable error) {
1063+
if (!closed.get()) {
1064+
remoteConnection.signalException(error);
1065+
WebSocketConnectionState.this.close(StatusCode.SERVER_ERROR, "failed to send ACP message");
1066+
}
10391067
}
1040-
}
10411068

1042-
private void failOutbound(Throwable error) {
1043-
if (!closed.get()) {
1044-
remoteConnection.signalException(error);
1045-
close(StatusCode.SERVER_ERROR, "failed to send ACP message");
1069+
void close() {
1070+
clear();
10461071
}
1047-
}
1048-
1049-
void close() {
1050-
close(StatusCode.NORMAL, "server closing");
1051-
}
10521072

1053-
void close(int statusCode, String reason) {
1054-
if (!closed.compareAndSet(false, true)) {
1055-
return;
1056-
}
1057-
synchronized (outboundLock) {
1058-
outboundQueue.clear();
1059-
outboundSendInProgress = false;
1060-
}
1061-
webSocketConnections.remove(id, this);
1062-
Session currentSession = this.session;
1063-
if (currentSession != null && currentSession.isOpen()) {
1064-
currentSession.close(statusCode, reason, Callback.NOOP);
1073+
private void clear() {
1074+
synchronized (lock) {
1075+
queue.clear();
1076+
sendInProgress = false;
1077+
}
10651078
}
1066-
remoteConnection.closeGracefully().subscribe();
1079+
10671080
}
10681081

10691082
}

0 commit comments

Comments
 (0)