Skip to content

Commit 6e26b8e

Browse files
author
Mark Pollack
committed
Add agent await/awaitTermination for daemon thread early-exit fix (Step 3.7.2)
- Add awaitTermination() to AcpAgentTransport interface - Implement termination signaling in StdioAcpAgentTransport and WebSocketAcpAgentTransport - Add awaitTermination() to AcpAsyncAgent interface and DefaultAcpAsyncAgent - Add await() and run() methods to AcpSyncAgent - Rename CleanShutdownTest to CleanShutdownIT and add agent await tests - Fix StdioAcpAgentTransportTest echo handler bug (use non-echoing handler) - Add awaitTermination() to InMemoryTransportPair for test support
1 parent 0091115 commit 6e26b8e

File tree

10 files changed

+320
-109
lines changed

10 files changed

+320
-109
lines changed

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/AcpAsyncAgent.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,21 @@ public interface AcpAsyncAgent {
3434
*/
3535
Mono<Void> start();
3636

37+
/**
38+
* Returns a Mono that completes when the agent terminates.
39+
* This is useful for blocking until the transport closes, particularly
40+
* when using daemon threads.
41+
*
42+
* <p>Example usage:
43+
* <pre>{@code
44+
* agent.start().block();
45+
* agent.awaitTermination().block(); // Block until transport closes
46+
* }</pre>
47+
*
48+
* @return A Mono that completes when the agent terminates
49+
*/
50+
Mono<Void> awaitTermination();
51+
3752
/**
3853
* Returns the capabilities negotiated with the client during initialization.
3954
*

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/AcpSyncAgent.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,46 @@ public AcpSyncAgent(AcpAsyncAgent asyncAgent, Duration blockTimeout) {
4949

5050
/**
5151
* Starts the agent, beginning to accept client connections.
52+
* This method returns immediately after setup is complete.
53+
* Use {@link #await()} or {@link #run()} to block until the transport closes.
5254
*/
5355
public void start() {
5456
asyncAgent.start().block(blockTimeout);
5557
}
5658

59+
/**
60+
* Blocks until the agent terminates (transport closes).
61+
* This is useful for keeping the main thread alive when using daemon threads.
62+
*
63+
* <p>Example usage:
64+
* <pre>{@code
65+
* agent.start();
66+
* agent.await(); // Blocks until stdin closes
67+
* }</pre>
68+
*/
69+
public void await() {
70+
asyncAgent.awaitTermination().block();
71+
}
72+
73+
/**
74+
* Starts the agent and blocks until it terminates.
75+
* This is a convenience method combining {@link #start()} and {@link #await()}.
76+
*
77+
* <p>Example usage for a standalone agent:
78+
* <pre>{@code
79+
* public static void main(String[] args) {
80+
* AcpSyncAgent agent = AcpAgent.sync(transport)
81+
* .promptHandler(...)
82+
* .build();
83+
* agent.run(); // Start and block until done
84+
* }
85+
* }</pre>
86+
*/
87+
public void run() {
88+
start();
89+
await();
90+
}
91+
5792
/**
5893
* Sends a session update notification to the client.
5994
* @param sessionId The session ID

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/DefaultAcpAsyncAgent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ public Mono<Void> start() {
181181
});
182182
}
183183

184+
@Override
185+
public Mono<Void> awaitTermination() {
186+
return transport.awaitTermination();
187+
}
188+
184189
@Override
185190
public NegotiatedCapabilities getClientCapabilities() {
186191
return clientCapabilities.get();

acp-core/src/main/java/com/agentclientprotocol/sdk/agent/transport/StdioAcpAgentTransport.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class StdioAcpAgentTransport implements AcpAgentTransport {
6969

7070
private final Sinks.One<Void> outboundReady = Sinks.one();
7171

72+
private final Sinks.One<Void> terminationSink = Sinks.one();
73+
7274
private Scheduler inboundScheduler;
7375

7476
private Scheduler outboundScheduler;
@@ -153,6 +155,11 @@ public Mono<Void> start(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> han
153155
private void handleIncomingMessages(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
154156
this.inboundSink.asFlux()
155157
.flatMap(message -> Mono.just(message).transform(handler))
158+
.doOnNext(response -> {
159+
if (response != null) {
160+
this.outboundSink.tryEmitNext(response);
161+
}
162+
})
156163
.doOnTerminate(() -> {
157164
this.outboundSink.tryEmitComplete();
158165
this.inboundScheduler.dispose();
@@ -206,6 +213,8 @@ private void startInboundProcessing() {
206213
finally {
207214
isClosing.set(true);
208215
inboundSink.tryEmitComplete();
216+
terminationSink.tryEmitValue(null); // Signal termination for awaitTermination()
217+
logger.debug("Agent transport terminated");
209218
}
210219
});
211220
}
@@ -301,6 +310,11 @@ public void setExceptionHandler(Consumer<Throwable> handler) {
301310
this.exceptionHandler = handler;
302311
}
303312

313+
@Override
314+
public Mono<Void> awaitTermination() {
315+
return terminationSink.asMono();
316+
}
317+
304318
@Override
305319
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
306320
return jsonMapper.convertValue(data, typeRef);

acp-core/src/main/java/com/agentclientprotocol/sdk/spec/AcpAgentTransport.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,19 @@ public interface AcpAgentTransport extends AcpTransport {
4141
default void setExceptionHandler(Consumer<Throwable> handler) {
4242
}
4343

44+
/**
45+
* Returns a Mono that completes when the transport terminates.
46+
* This is useful for agents that need to block until the transport is done,
47+
* particularly when using daemon threads.
48+
*
49+
* <p>Example usage:
50+
* <pre>{@code
51+
* transport.start(handler).block();
52+
* transport.awaitTermination().block(); // Block until stdin closes
53+
* }</pre>
54+
*
55+
* @return a {@link Mono} that completes when the transport terminates
56+
*/
57+
Mono<Void> awaitTermination();
58+
4459
}

acp-core/src/test/java/com/agentclientprotocol/sdk/agent/transport/StdioAcpAgentTransportTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ void sendsMessageToOutputStream() throws Exception {
105105

106106
StdioAcpAgentTransport transport = new StdioAcpAgentTransport(jsonMapper, agentIn, agentOut);
107107

108-
transport.start(msg -> msg).subscribe();
108+
// Use a non-echoing handler - the default echo handler (msg -> msg) would
109+
// echo the warmup message back to output, causing the test to read the
110+
// wrong message. We only want to test explicit sendMessage() output.
111+
transport.start(msg -> Mono.empty()).subscribe();
109112

110113
// Wait for transport to be ready by sending a dummy message from "client" first
111114
AcpSchema.JSONRPCRequest dummyRequest = AcpTestFixtures.createJsonRpcRequest(AcpSchema.METHOD_INITIALIZE, "0",
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*/
4+
5+
package com.agentclientprotocol.sdk.integration;
6+
7+
import java.time.Duration;
8+
import java.util.List;
9+
import java.util.Set;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
import java.util.stream.Collectors;
14+
15+
import com.agentclientprotocol.sdk.agent.AcpAgent;
16+
import com.agentclientprotocol.sdk.agent.AcpSyncAgent;
17+
import com.agentclientprotocol.sdk.client.AcpClient;
18+
import com.agentclientprotocol.sdk.client.AcpSyncClient;
19+
import com.agentclientprotocol.sdk.spec.AcpSchema;
20+
import com.agentclientprotocol.sdk.test.InMemoryTransportPair;
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
/**
26+
* Integration tests to verify that ACP transports clean up properly on shutdown
27+
* and don't leave lingering threads that prevent JVM exit.
28+
*
29+
* <p>
30+
* These tests address GitHub issue: Transport threads prevent JVM exit when
31+
* closeGracefully() isn't called properly. The fix uses daemon threads for
32+
* all transport schedulers.
33+
* </p>
34+
*
35+
* @author Mark Pollack
36+
*/
37+
class CleanShutdownIT {
38+
39+
/**
40+
* Verifies that no ACP-prefixed threads remain after client close.
41+
*
42+
* <p>
43+
* This test ensures that the daemon thread fix is working correctly.
44+
* Before the fix, non-daemon threads would prevent the JVM from exiting.
45+
* </p>
46+
*/
47+
@Test
48+
void noLingeringAcpThreadsAfterClientClose() throws InterruptedException {
49+
// Record threads before test
50+
Set<String> acpThreadsBefore = getAcpThreadNames();
51+
52+
// Create and use client with in-memory transport
53+
InMemoryTransportPair transportPair = InMemoryTransportPair.create();
54+
55+
try (AcpSyncClient client = AcpClient.sync(transportPair.clientTransport()).build()) {
56+
// The transport threads are created when build() connects
57+
// Give threads a moment to start
58+
Thread.sleep(100);
59+
60+
// Verify threads were created (sanity check)
61+
Set<String> threadsWhileRunning = getAcpThreadNames();
62+
// Note: InMemory transport may not create threads in the same way as stdio,
63+
// but we should still verify no leak occurs
64+
}
65+
66+
// Allow time for thread cleanup
67+
Thread.sleep(500);
68+
69+
// Verify no ACP threads remain
70+
Set<String> acpThreadsAfter = getAcpThreadNames();
71+
assertThat(acpThreadsAfter)
72+
.describedAs("ACP threads should be cleaned up after client close")
73+
.isEqualTo(acpThreadsBefore);
74+
}
75+
76+
/**
77+
* Verifies that daemon threads are used for transport schedulers.
78+
*
79+
* <p>
80+
* This test inspects active threads to ensure any ACP-prefixed threads
81+
* are daemon threads, which allows the JVM to exit gracefully.
82+
* </p>
83+
*/
84+
@Test
85+
void acpThreadsAreDaemonThreads() throws InterruptedException {
86+
InMemoryTransportPair transportPair = InMemoryTransportPair.create();
87+
88+
try (AcpSyncClient client = AcpClient.sync(transportPair.clientTransport()).build()) {
89+
// Give threads a moment to start
90+
Thread.sleep(100);
91+
92+
// Check that any ACP threads are daemon threads
93+
Set<Thread> acpThreads = Thread.getAllStackTraces().keySet().stream()
94+
.filter(t -> t.getName().startsWith("acp-"))
95+
.collect(Collectors.toSet());
96+
97+
for (Thread thread : acpThreads) {
98+
assertThat(thread.isDaemon())
99+
.describedAs("Thread '%s' should be a daemon thread", thread.getName())
100+
.isTrue();
101+
}
102+
}
103+
}
104+
105+
/**
106+
* Verifies that agent awaitTermination() blocks until transport closes.
107+
*
108+
* <p>
109+
* This test ensures the await() fix for daemon thread early exit is working.
110+
* Before the fix, agents using daemon threads would exit immediately after start().
111+
* With await(), agents properly block until the transport terminates.
112+
* </p>
113+
*/
114+
@Test
115+
void agentAwaitBlocksUntilTransportCloses() throws InterruptedException {
116+
InMemoryTransportPair transportPair = InMemoryTransportPair.create();
117+
118+
// Build agent - SyncAgentBuilder expects SyncInitializeHandler (returns response directly)
119+
AcpSyncAgent agent = AcpAgent.sync(transportPair.agentTransport())
120+
.requestTimeout(Duration.ofSeconds(5))
121+
.initializeHandler(request ->
122+
new AcpSchema.InitializeResponse(1, new AcpSchema.AgentCapabilities(), List.of()))
123+
.build();
124+
125+
// Track whether await() returned
126+
AtomicBoolean awaitReturned = new AtomicBoolean(false);
127+
CountDownLatch agentStarted = new CountDownLatch(1);
128+
129+
// Start agent in background thread that calls await()
130+
Thread agentThread = new Thread(() -> {
131+
agent.start();
132+
agentStarted.countDown();
133+
agent.await(); // Should block until transport closes
134+
awaitReturned.set(true);
135+
});
136+
agentThread.setDaemon(true);
137+
agentThread.start();
138+
139+
// Wait for agent to start
140+
assertThat(agentStarted.await(5, TimeUnit.SECONDS)).isTrue();
141+
142+
// Give a moment and verify await() has NOT returned yet
143+
Thread.sleep(200);
144+
assertThat(awaitReturned.get())
145+
.describedAs("await() should NOT return while transport is still open")
146+
.isFalse();
147+
148+
// Close transport - this should unblock await()
149+
transportPair.closeGracefully().block(Duration.ofSeconds(5));
150+
151+
// Wait for agent thread to complete
152+
agentThread.join(5000);
153+
assertThat(awaitReturned.get())
154+
.describedAs("await() should return after transport closes")
155+
.isTrue();
156+
}
157+
158+
/**
159+
* Verifies that agent run() combines start() and await().
160+
*
161+
* <p>
162+
* The run() method is a convenience for standalone agents that want to
163+
* start and block in one call, similar to HTTP server patterns.
164+
* </p>
165+
*/
166+
@Test
167+
void agentRunBlocksUntilTransportCloses() throws InterruptedException {
168+
InMemoryTransportPair transportPair = InMemoryTransportPair.create();
169+
170+
// Build agent - SyncAgentBuilder expects SyncInitializeHandler (returns response directly)
171+
AcpSyncAgent agent = AcpAgent.sync(transportPair.agentTransport())
172+
.requestTimeout(Duration.ofSeconds(5))
173+
.initializeHandler(request ->
174+
new AcpSchema.InitializeResponse(1, new AcpSchema.AgentCapabilities(), List.of()))
175+
.build();
176+
177+
// Track whether run() returned
178+
AtomicBoolean runReturned = new AtomicBoolean(false);
179+
180+
// Start agent using run() in background thread
181+
Thread agentThread = new Thread(() -> {
182+
agent.run(); // Should block until transport closes
183+
runReturned.set(true);
184+
});
185+
agentThread.setDaemon(true);
186+
agentThread.start();
187+
188+
// Give a moment for agent to start
189+
Thread.sleep(200);
190+
assertThat(runReturned.get())
191+
.describedAs("run() should NOT return while transport is still open")
192+
.isFalse();
193+
194+
// Close transport - this should unblock run()
195+
transportPair.closeGracefully().block(Duration.ofSeconds(5));
196+
197+
// Wait for agent thread to complete
198+
agentThread.join(5000);
199+
assertThat(runReturned.get())
200+
.describedAs("run() should return after transport closes")
201+
.isTrue();
202+
}
203+
204+
/**
205+
* Gets the names of all threads that start with "acp-".
206+
* @return set of ACP thread names
207+
*/
208+
private Set<String> getAcpThreadNames() {
209+
return Thread.getAllStackTraces().keySet().stream()
210+
.map(Thread::getName)
211+
.filter(name -> name.startsWith("acp-"))
212+
.collect(Collectors.toSet());
213+
}
214+
215+
}

0 commit comments

Comments
 (0)