Skip to content

Commit 19eaf1c

Browse files
author
Mark Pollack
committed
Fix flaky WebSocket tests: use emitNext with busyLooping for concurrent emission safety
Replace tryEmitNext with emitNext/busyLooping in all transport sendMessage methods to handle FAIL_NON_SERIALIZED race conditions on Reactor Sinks.many().unicast() sinks.
1 parent ba2a191 commit 19eaf1c

File tree

4 files changed

+13
-24
lines changed

4 files changed

+13
-24
lines changed

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/transport/StdioAcpAgentTransport.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package com.agentclientprotocol.sdk.agent.transport;
66

7+
import java.time.Duration;
78
import java.io.BufferedReader;
89
import java.io.IOException;
910
import java.io.InputStream;
@@ -277,12 +278,9 @@ else if (isClosing.get()) {
277278
@Override
278279
public Mono<Void> sendMessage(JSONRPCMessage message) {
279280
return Mono.zip(inboundReady.asMono(), outboundReady.asMono()).then(Mono.defer(() -> {
280-
if (outboundSink.tryEmitNext(message).isSuccess()) {
281-
return Mono.empty();
282-
}
283-
else {
284-
return Mono.error(new RuntimeException("Failed to enqueue message"));
285-
}
281+
outboundSink.emitNext(message,
282+
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
283+
return Mono.empty();
286284
}));
287285
}
288286

acp-core/src/main/java/com/agentclientprotocol/sdk/client/transport/StdioAcpClientTransport.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,9 @@ private void handleIncomingErrors() {
260260

261261
@Override
262262
public Mono<Void> sendMessage(JSONRPCMessage message) {
263-
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
264-
return Mono.empty();
265-
}
266-
else {
267-
return Mono.error(new RuntimeException("Failed to enqueue message"));
268-
}
263+
this.outboundSink.emitNext(message,
264+
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
265+
return Mono.empty();
269266
}
270267

271268
/**

acp-core/src/main/java/com/agentclientprotocol/sdk/client/transport/WebSocketAcpClientTransport.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,9 @@ private void startOutboundProcessing() {
199199
@Override
200200
public Mono<Void> sendMessage(JSONRPCMessage message) {
201201
return connectionReady.asMono().then(Mono.defer(() -> {
202-
if (outboundSink.tryEmitNext(message).isSuccess()) {
203-
return Mono.empty();
204-
}
205-
else {
206-
return Mono.error(new RuntimeException("Failed to enqueue message"));
207-
}
202+
outboundSink.emitNext(message,
203+
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
204+
return Mono.empty();
208205
}));
209206
}
210207

acp-websocket-jetty/src/main/java/com/agentclientprotocol/sdk/agent/transport/WebSocketAcpAgentTransport.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,12 +224,9 @@ private void startOutboundProcessing() {
224224
@Override
225225
public Mono<Void> sendMessage(JSONRPCMessage message) {
226226
return connectionReady.asMono().then(Mono.defer(() -> {
227-
if (outboundSink.tryEmitNext(message).isSuccess()) {
228-
return Mono.empty();
229-
}
230-
else {
231-
return Mono.error(new RuntimeException("Failed to enqueue message"));
232-
}
227+
outboundSink.emitNext(message,
228+
Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
229+
return Mono.empty();
233230
}));
234231
}
235232

0 commit comments

Comments
 (0)