Skip to content

Commit a1668c7

Browse files
edburnsCopilot
andcommitted
Fix scheduler memory leak and close() race condition in CopilotSession
pom.xml Add mockito-core 5.17.0 as a test dependency. src/main/java/com/github/copilot/sdk/CopilotSession.java Replace Executors.newSingleThreadScheduledExecutor with explicit ScheduledThreadPoolExecutor and enable removeOnCancelPolicy(true) so cancelled timeout tasks are purged from the work queue immediately. Wrap timeoutScheduler.schedule() in a try-catch for RejectedExecutionException. On rejection (close() race), the event subscription is cleaned up and the returned future completes exceptionally instead of throwing uncaught. src/test/java/com/github/copilot/sdk/SchedulerShutdownRaceTest.java (new) TDD test that reproduces the scheduler shutdown race. Uses Mockito to stub JsonRpcClient.invoke(), then shuts down the scheduler without setting isTerminated, and asserts sendAndWait() returns a failed future rather than throwing RejectedExecutionException. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Ed Burns <edburns@microsoft.com>
1 parent 9e102bc commit a1668c7

File tree

3 files changed

+91
-6
lines changed

3 files changed

+91
-6
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@
8686
<version>5.14.1</version>
8787
<scope>test</scope>
8888
</dependency>
89+
<dependency>
90+
<groupId>org.mockito</groupId>
91+
<artifactId>mockito-core</artifactId>
92+
<version>5.17.0</version>
93+
<scope>test</scope>
94+
</dependency>
8995
</dependencies>
9096

9197
<build>

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
1616
import java.util.concurrent.Executors;
17+
import java.util.concurrent.RejectedExecutionException;
1718
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.concurrent.ScheduledFuture;
1820
import java.util.concurrent.ScheduledThreadPoolExecutor;
1921
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.TimeoutException;
@@ -417,14 +419,26 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
417419
return null;
418420
});
419421

422+
var result = new CompletableFuture<AssistantMessageEvent>();
423+
420424
// Schedule timeout on the shared session-level scheduler
421-
var timeoutTask = timeoutScheduler.schedule(() -> {
422-
if (!future.isDone()) {
423-
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
425+
ScheduledFuture<?> timeoutTask;
426+
try {
427+
timeoutTask = timeoutScheduler.schedule(() -> {
428+
if (!future.isDone()) {
429+
future.completeExceptionally(
430+
new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
431+
}
432+
}, timeoutMs, TimeUnit.MILLISECONDS);
433+
} catch (RejectedExecutionException e) {
434+
try {
435+
subscription.close();
436+
} catch (IOException closeEx) {
437+
e.addSuppressed(closeEx);
424438
}
425-
}, timeoutMs, TimeUnit.MILLISECONDS);
426-
427-
var result = new CompletableFuture<AssistantMessageEvent>();
439+
result.completeExceptionally(e);
440+
return result;
441+
}
428442

429443
// When inner future completes, run cleanup and propagate to result
430444
future.whenComplete((r, ex) -> {
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
*--------------------------------------------------------------------------------------------*/
4+
5+
package com.github.copilot.sdk;
6+
7+
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.mockito.ArgumentMatchers.*;
9+
import static org.mockito.Mockito.*;
10+
11+
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.RejectedExecutionException;
14+
import java.util.concurrent.ScheduledExecutorService;
15+
import java.util.concurrent.TimeUnit;
16+
17+
import org.junit.jupiter.api.Test;
18+
19+
import com.github.copilot.sdk.json.MessageOptions;
20+
21+
/**
22+
* Reproduces the race between {@code sendAndWait()} and {@code close()}.
23+
* <p>
24+
* If {@code close()} shuts down the timeout scheduler after
25+
* {@code ensureNotTerminated()} passes but before
26+
* {@code timeoutScheduler.schedule()} executes, the schedule call throws
27+
* {@link RejectedExecutionException}. Without a fix the exception propagates
28+
* uncaught, leaking the event subscription and leaving the returned future
29+
* incomplete.
30+
*/
31+
public class SchedulerShutdownRaceTest {
32+
33+
@SuppressWarnings("unchecked")
34+
@Test
35+
void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception {
36+
// Build a session via reflection (package-private constructor)
37+
var ctor = CopilotSession.class.getDeclaredConstructor(
38+
String.class, JsonRpcClient.class, String.class);
39+
ctor.setAccessible(true);
40+
41+
// Mock JsonRpcClient so send() returns a pending future instead of NPE
42+
var mockRpc = mock(JsonRpcClient.class);
43+
when(mockRpc.invoke(any(), any(), any()))
44+
.thenReturn(new CompletableFuture<>());
45+
46+
var session = ctor.newInstance("race-test", mockRpc, null);
47+
48+
// Shut down the scheduler without setting isTerminated,
49+
// simulating the race window between ensureNotTerminated() and schedule()
50+
var schedulerField = CopilotSession.class.getDeclaredField("timeoutScheduler");
51+
schedulerField.setAccessible(true);
52+
var scheduler = (ScheduledExecutorService) schedulerField.get(session);
53+
scheduler.shutdownNow();
54+
55+
// With the fix: sendAndWait returns a future that completes exceptionally.
56+
// Without the fix: sendAndWait throws RejectedExecutionException directly.
57+
CompletableFuture<?> result = session.sendAndWait(
58+
new MessageOptions().setPrompt("test"), 5000);
59+
60+
assertNotNull(result, "sendAndWait should return a future, not throw");
61+
var ex = assertThrows(ExecutionException.class,
62+
() -> result.get(1, TimeUnit.SECONDS));
63+
assertInstanceOf(RejectedExecutionException.class, ex.getCause());
64+
}
65+
}

0 commit comments

Comments
 (0)