Skip to content

Commit ad15407

Browse files
committed
fix: serialize streamable HTTP sink emissions
1 parent f0d8578 commit ad15407

2 files changed

Lines changed: 25 additions & 6 deletions

File tree

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/transport/RemoteAcpConnection.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ private final class ConnectionTransport implements AcpAgentTransport {
154154

155155
private final Sinks.Many<JSONRPCMessage> inboundSink = Sinks.many().unicast().onBackpressureBuffer();
156156

157+
/*
158+
* Streamable HTTP can deliver multiple POST requests for one ACP connection on
159+
* different server threads. Reactor unicast sinks require serialized producers,
160+
* so all transport-adapter ingress is funneled through this monitor before
161+
* emission.
162+
*/
163+
private final Object inboundEmitMonitor = new Object();
164+
157165
private final Sinks.One<Void> terminationSink = Sinks.one();
158166

159167
private final AtomicBoolean transportStarted = new AtomicBoolean(false);
@@ -190,9 +198,11 @@ void acceptInbound(JSONRPCMessage message) {
190198
if (transportClosing.get()) {
191199
throw new AcpConnectionException("Remote ACP connection is closing");
192200
}
193-
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
194-
if (result.isFailure()) {
195-
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
201+
synchronized (inboundEmitMonitor) {
202+
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
203+
if (result.isFailure()) {
204+
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
205+
}
196206
}
197207
}
198208

acp-core/src/main/java/com/agentclientprotocol/sdk/client/transport/StreamableHttpAcpClientTransport.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,13 @@ private record HttpClientBundle(HttpClient httpClient, ExecutorService ownedExec
146146

147147
private final Sinks.Many<JSONRPCMessage> inboundSink;
148148

149+
/*
150+
* A streamable HTTP client may have one connection SSE reader and multiple session
151+
* SSE readers active at the same time. Reactor unicast sinks require serialized
152+
* producers, so every SSE reader emits through this monitor.
153+
*/
154+
private final Object inboundEmitMonitor = new Object();
155+
149156
private final AtomicBoolean connected = new AtomicBoolean(false);
150157

151158
private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -581,9 +588,11 @@ private Mono<Void> processInbound(RouteScope actualScope, JSONRPCMessage message
581588

582589
private Mono<Void> emitInbound(JSONRPCMessage message) {
583590
return Mono.fromRunnable(() -> {
584-
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
585-
if (result.isFailure()) {
586-
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
591+
synchronized (inboundEmitMonitor) {
592+
Sinks.EmitResult result = inboundSink.tryEmitNext(message);
593+
if (result.isFailure()) {
594+
throw new AcpConnectionException("Failed to enqueue inbound message: " + result);
595+
}
587596
}
588597
});
589598
}

0 commit comments

Comments
 (0)