Skip to content

Commit 562527f

Browse files
authored
Merge pull request #38 from github/edburns/dd-2758695-virtual-threads
Add **Shared `ScheduledExecutorService`** for timeouts
2 parents c67dbb3 + 4c6c2ec commit 562527f

File tree

5 files changed

+307
-15
lines changed

5 files changed

+307
-15
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@
8686
<version>5.14.1</version>
8787
<scope>test</scope>
8888
</dependency>
89+
<dependency>
90+
<groupId>org.mockito</groupId>
91+
<artifactId>mockito-core</artifactId>
92+
<version>5.17.0</version>
93+
<scope>test</scope>
94+
</dependency>
8995
</dependencies>
9096

9197
<build>

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import java.util.Set;
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
16-
import java.util.concurrent.Executors;
16+
import java.util.concurrent.RejectedExecutionException;
17+
import java.util.concurrent.ScheduledExecutorService;
18+
import java.util.concurrent.ScheduledFuture;
19+
import java.util.concurrent.ScheduledThreadPoolExecutor;
1720
import java.util.concurrent.TimeUnit;
1821
import java.util.concurrent.TimeoutException;
1922
import java.util.concurrent.atomic.AtomicReference;
@@ -121,6 +124,7 @@ public final class CopilotSession implements AutoCloseable {
121124
private volatile EventErrorHandler eventErrorHandler;
122125
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
123126
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
127+
private final ScheduledExecutorService timeoutScheduler;
124128

125129
/** Tracks whether this session instance has been terminated via close(). */
126130
private volatile boolean isTerminated = false;
@@ -157,6 +161,13 @@ public final class CopilotSession implements AutoCloseable {
157161
this.sessionId = sessionId;
158162
this.rpc = rpc;
159163
this.workspacePath = workspacePath;
164+
var executor = new ScheduledThreadPoolExecutor(1, r -> {
165+
var t = new Thread(r, "sendAndWait-timeout");
166+
t.setDaemon(true);
167+
return t;
168+
});
169+
executor.setRemoveOnCancelPolicy(true);
170+
this.timeoutScheduler = executor;
160171
}
161172

162173
/**
@@ -407,29 +418,41 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
407418
return null;
408419
});
409420

410-
// Set up timeout with daemon thread so it doesn't prevent JVM exit
411-
var scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
412-
var t = new Thread(r, "sendAndWait-timeout");
413-
t.setDaemon(true);
414-
return t;
415-
});
416-
scheduler.schedule(() -> {
417-
if (!future.isDone()) {
418-
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
419-
}
420-
scheduler.shutdown();
421-
}, timeoutMs, TimeUnit.MILLISECONDS);
422-
423421
var result = new CompletableFuture<AssistantMessageEvent>();
424422

423+
// Schedule timeout on the shared session-level scheduler.
424+
// Per Javadoc, timeoutMs <= 0 means "no timeout".
425+
ScheduledFuture<?> timeoutTask = null;
426+
if (timeoutMs > 0) {
427+
try {
428+
timeoutTask = timeoutScheduler.schedule(() -> {
429+
if (!future.isDone()) {
430+
future.completeExceptionally(
431+
new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
432+
}
433+
}, timeoutMs, TimeUnit.MILLISECONDS);
434+
} catch (RejectedExecutionException e) {
435+
try {
436+
subscription.close();
437+
} catch (IOException closeEx) {
438+
e.addSuppressed(closeEx);
439+
}
440+
result.completeExceptionally(e);
441+
return result;
442+
}
443+
}
444+
425445
// When inner future completes, run cleanup and propagate to result
446+
final ScheduledFuture<?> taskToCancel = timeoutTask;
426447
future.whenComplete((r, ex) -> {
427448
try {
428449
subscription.close();
429450
} catch (IOException e) {
430451
LOG.log(Level.SEVERE, "Error closing subscription", e);
431452
}
432-
scheduler.shutdown();
453+
if (taskToCancel != null) {
454+
taskToCancel.cancel(false);
455+
}
433456
if (!result.isDone()) {
434457
if (ex != null) {
435458
result.completeExceptionally(ex);
@@ -1303,6 +1326,8 @@ public void close() {
13031326
isTerminated = true;
13041327
}
13051328

1329+
timeoutScheduler.shutdownNow();
1330+
13061331
try {
13071332
rpc.invoke("session.destroy", Map.of("sessionId", sessionId), Void.class).get(5, TimeUnit.SECONDS);
13081333
} catch (Exception e) {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.ArgumentMatchers.*;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.RejectedExecutionException;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.TimeUnit;
16+
17+
import org.junit.jupiter.api.Test;
18+
19+
import com.github.copilot.sdk.json.MessageOptions;
20+
21+
/**
22+
* Regression coverage for the race between {@code sendAndWait()} and
23+
* {@code close()}.
24+
* <p>
25+
* If {@code close()} shuts down the timeout scheduler after
26+
* {@code ensureNotTerminated()} passes but before
27+
* {@code timeoutScheduler.schedule()} executes, the schedule call throws
28+
* {@link RejectedExecutionException}. This test asserts that
29+
* {@code sendAndWait()} handles this race by returning a future that completes
30+
* exceptionally (rather than propagating the exception to the caller or leaving
31+
* the returned future incomplete).
32+
*/
33+
public class SchedulerShutdownRaceTest {
34+
35+
@SuppressWarnings("unchecked")
36+
@Test
37+
void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception {
38+
// Build a session via reflection (package-private constructor)
39+
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
40+
ctor.setAccessible(true);
41+
42+
// Mock JsonRpcClient so send() returns a pending future instead of NPE
43+
var mockRpc = mock(JsonRpcClient.class);
44+
when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());
45+
46+
var session = ctor.newInstance("race-test", mockRpc, null);
47+
48+
// Shut down the scheduler without setting isTerminated,
49+
// simulating the race window between ensureNotTerminated() and schedule()
50+
var schedulerField = CopilotSession.class.getDeclaredField("timeoutScheduler");
51+
schedulerField.setAccessible(true);
52+
var scheduler = (ScheduledExecutorService) schedulerField.get(session);
53+
scheduler.shutdownNow();
54+
55+
// sendAndWait must return a failed future rather than throwing directly.
56+
CompletableFuture<?> result = session.sendAndWait(new MessageOptions().setPrompt("test"), 5000);
57+
58+
assertNotNull(result, "sendAndWait should return a future, not throw");
59+
var ex = assertThrows(ExecutionException.class, () -> result.get(1, TimeUnit.SECONDS));
60+
assertInstanceOf(RejectedExecutionException.class, ex.getCause());
61+
}
62+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.assertFalse;
8+
import static org.junit.jupiter.api.Assertions.assertTrue;
9+
10+
import java.io.ByteArrayOutputStream;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.net.Socket;
14+
import java.util.concurrent.CompletableFuture;
15+
16+
import org.junit.jupiter.api.Test;
17+
18+
import com.github.copilot.sdk.events.AssistantMessageEvent;
19+
import com.github.copilot.sdk.json.MessageOptions;
20+
21+
/**
22+
* Regression tests for timeout edge cases in
23+
* {@link CopilotSession#sendAndWait}.
24+
* <p>
25+
* These tests assert two behavioral contracts of the shared
26+
* {@code ScheduledExecutorService} approach:
27+
* <ol>
28+
* <li>A pending timeout must NOT fire after {@code close()} and must NOT
29+
* complete the returned future with a {@code TimeoutException}.</li>
30+
* <li>Multiple {@code sendAndWait} calls must reuse a single shared scheduler
31+
* thread rather than spawning a new OS thread per call.</li>
32+
* </ol>
33+
*/
34+
public class TimeoutEdgeCaseTest {
35+
36+
/**
37+
* Creates a {@link JsonRpcClient} whose {@code invoke()} returns futures that
38+
* never complete. The reader thread blocks forever on the input stream, and
39+
* writes go to a no-op output stream.
40+
*/
41+
private JsonRpcClient createHangingRpcClient() throws Exception {
42+
InputStream blockingInput = new InputStream() {
43+
@Override
44+
public int read() throws IOException {
45+
try {
46+
Thread.sleep(Long.MAX_VALUE);
47+
} catch (InterruptedException e) {
48+
Thread.currentThread().interrupt();
49+
return -1;
50+
}
51+
return -1;
52+
}
53+
};
54+
ByteArrayOutputStream sinkOutput = new ByteArrayOutputStream();
55+
56+
var ctor = JsonRpcClient.class.getDeclaredConstructor(InputStream.class, java.io.OutputStream.class,
57+
Socket.class, Process.class);
58+
ctor.setAccessible(true);
59+
return (JsonRpcClient) ctor.newInstance(blockingInput, sinkOutput, null, null);
60+
}
61+
62+
/**
63+
* After {@code close()}, the future returned by {@code sendAndWait} must NOT be
64+
* completed by a stale timeout.
65+
* <p>
66+
* Contract: {@code close()} shuts down the timeout scheduler before the
67+
* blocking {@code session.destroy} RPC call, so any pending timeout task is
68+
* cancelled and the future remains incomplete (not exceptionally completed with
69+
* {@code TimeoutException}).
70+
*/
71+
@Test
72+
void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
73+
JsonRpcClient rpc = createHangingRpcClient();
74+
try {
75+
try (CopilotSession session = new CopilotSession("test-timeout-id", rpc)) {
76+
77+
CompletableFuture<AssistantMessageEvent> result = session
78+
.sendAndWait(new MessageOptions().setPrompt("hello"), 2000);
79+
80+
assertFalse(result.isDone(), "Future should be pending before timeout fires");
81+
82+
// close() blocks up to 5s on session.destroy RPC. The 2s timeout
83+
// fires during that window with the current per-call scheduler.
84+
session.close();
85+
86+
assertFalse(result.isDone(), "Future should not be completed by a timeout after session is closed. "
87+
+ "The per-call ScheduledExecutorService leaked a TimeoutException.");
88+
}
89+
} finally {
90+
rpc.close();
91+
}
92+
}
93+
94+
/**
95+
* A shared scheduler must reuse a single thread across multiple
96+
* {@code sendAndWait} calls, rather than spawning a new OS thread per call.
97+
* <p>
98+
* Contract: after two consecutive {@code sendAndWait} calls the number of live
99+
* {@code sendAndWait-timeout} threads must not increase after the second call.
100+
*/
101+
@Test
102+
void testSendAndWaitReusesTimeoutThread() throws Exception {
103+
JsonRpcClient rpc = createHangingRpcClient();
104+
try {
105+
try (CopilotSession session = new CopilotSession("test-thread-count-id", rpc)) {
106+
107+
long baselineCount = countTimeoutThreads();
108+
109+
CompletableFuture<AssistantMessageEvent> result1 = session
110+
.sendAndWait(new MessageOptions().setPrompt("hello1"), 30000);
111+
112+
Thread.sleep(100);
113+
long afterFirst = countTimeoutThreads();
114+
assertTrue(afterFirst >= baselineCount + 1,
115+
"Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: "
116+
+ baselineCount + ", after: " + afterFirst);
117+
118+
CompletableFuture<AssistantMessageEvent> result2 = session
119+
.sendAndWait(new MessageOptions().setPrompt("hello2"), 30000);
120+
121+
Thread.sleep(100);
122+
long afterSecond = countTimeoutThreads();
123+
assertTrue(afterSecond == afterFirst,
124+
"Shared scheduler should reuse the same thread — no new threads after second call. "
125+
+ "After first: " + afterFirst + ", after second: " + afterSecond);
126+
127+
result1.cancel(true);
128+
result2.cancel(true);
129+
}
130+
} finally {
131+
rpc.close();
132+
}
133+
}
134+
135+
/**
136+
* Counts the number of live threads whose name contains "sendAndWait-timeout".
137+
*/
138+
private long countTimeoutThreads() {
139+
return Thread.getAllStackTraces().keySet().stream().filter(t -> t.getName().contains("sendAndWait-timeout"))
140+
.filter(Thread::isAlive).count();
141+
}
142+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.ArgumentMatchers.*;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
13+
import org.junit.jupiter.api.Test;
14+
15+
import com.github.copilot.sdk.events.AssistantMessageEvent;
16+
import com.github.copilot.sdk.json.MessageOptions;
17+
18+
/**
19+
* Verifies the documented contract that {@code timeoutMs <= 0} means "no
20+
* timeout" in {@link CopilotSession#sendAndWait(MessageOptions, long)}.
21+
*/
22+
public class ZeroTimeoutContractTest {
23+
24+
@SuppressWarnings("unchecked")
25+
@Test
26+
void sendAndWaitWithZeroTimeoutShouldNotTimeOut() throws Exception {
27+
// Build a session via reflection (package-private constructor)
28+
var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
29+
ctor.setAccessible(true);
30+
31+
var mockRpc = mock(JsonRpcClient.class);
32+
when(mockRpc.invoke(any(), any(), any())).thenAnswer(invocation -> {
33+
Object method = invocation.getArgument(0);
34+
if ("session.destroy".equals(method)) {
35+
// Make session.close() non-blocking by completing destroy immediately
36+
return CompletableFuture.completedFuture(null);
37+
}
38+
// For other calls (e.g., message send), return an incomplete future so the
39+
// sendAndWait result does not complete due to a mock response.
40+
return new CompletableFuture<>();
41+
});
42+
43+
try (var session = ctor.newInstance("zero-timeout-test", mockRpc, null)) {
44+
45+
// Per the Javadoc: timeoutMs of 0 means "no timeout".
46+
// The future should NOT complete with TimeoutException.
47+
CompletableFuture<AssistantMessageEvent> result = session
48+
.sendAndWait(new MessageOptions().setPrompt("test"), 0);
49+
50+
// Give the scheduler a chance to fire if it was (incorrectly) scheduled
51+
Thread.sleep(200);
52+
53+
// The future should still be pending — not timed out
54+
assertFalse(result.isDone(), "Future should not be done; timeoutMs=0 means no timeout per Javadoc");
55+
}
56+
}
57+
}

0 commit comments

Comments
 (0)