Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<version>5.14.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.17.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
55 changes: 40 additions & 15 deletions src/main/java/com/github/copilot/sdk/CopilotSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -121,6 +124,7 @@ public final class CopilotSession implements AutoCloseable {
private volatile EventErrorHandler eventErrorHandler;
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
private final ScheduledExecutorService timeoutScheduler;

/** Tracks whether this session instance has been terminated via close(). */
private volatile boolean isTerminated = false;
Expand Down Expand Up @@ -157,6 +161,13 @@ public final class CopilotSession implements AutoCloseable {
this.sessionId = sessionId;
this.rpc = rpc;
this.workspacePath = workspacePath;
var executor = new ScheduledThreadPoolExecutor(1, r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
executor.setRemoveOnCancelPolicy(true);
this.timeoutScheduler = executor;
}

/**
Expand Down Expand Up @@ -407,29 +418,41 @@ public CompletableFuture<AssistantMessageEvent> sendAndWait(MessageOptions optio
return null;
});

// Set up timeout with daemon thread so it doesn't prevent JVM exit
var scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
var t = new Thread(r, "sendAndWait-timeout");
t.setDaemon(true);
return t;
});
scheduler.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
}
scheduler.shutdown();
}, timeoutMs, TimeUnit.MILLISECONDS);

var result = new CompletableFuture<AssistantMessageEvent>();

// Schedule timeout on the shared session-level scheduler.
// Per Javadoc, timeoutMs <= 0 means "no timeout".
ScheduledFuture<?> timeoutTask = null;
if (timeoutMs > 0) {
try {
timeoutTask = timeoutScheduler.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(
new TimeoutException("sendAndWait timed out after " + timeoutMs + "ms"));
}
}, timeoutMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
try {
subscription.close();
} catch (IOException closeEx) {
e.addSuppressed(closeEx);
}
result.completeExceptionally(e);
return result;
}
}

// When inner future completes, run cleanup and propagate to result
final ScheduledFuture<?> taskToCancel = timeoutTask;
future.whenComplete((r, ex) -> {
try {
subscription.close();
} catch (IOException e) {
LOG.log(Level.SEVERE, "Error closing subscription", e);
}
scheduler.shutdown();
if (taskToCancel != null) {
taskToCancel.cancel(false);
}
if (!result.isDone()) {
if (ex != null) {
result.completeExceptionally(ex);
Expand Down Expand Up @@ -1303,6 +1326,8 @@ public void close() {
isTerminated = true;
}

timeoutScheduler.shutdownNow();

try {
rpc.invoke("session.destroy", Map.of("sessionId", sessionId), Void.class).get(5, TimeUnit.SECONDS);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.json.MessageOptions;

/**
 * Regression test for the race between {@code sendAndWait()} and
 * {@code close()}.
 * <p>
 * The race window: {@code close()} shuts down the timeout scheduler after
 * {@code ensureNotTerminated()} has passed but before
 * {@code timeoutScheduler.schedule()} runs, so the schedule call throws
 * {@link RejectedExecutionException}. If that exception is not handled it
 * escapes {@code sendAndWait()} directly, the event subscription is leaked and
 * the caller never receives a completed future.
 */
public class SchedulerShutdownRaceTest {

    /**
     * With the timeout scheduler already shut down, {@code sendAndWait} must hand
     * back a future that fails with {@link RejectedExecutionException} instead of
     * throwing that exception at the call site.
     *
     * @throws Exception
     *             if the reflective session setup fails
     */
    @SuppressWarnings("unchecked")
    @Test
    void sendAndWaitShouldReturnFailedFutureWhenSchedulerIsShutDown() throws Exception {
        // Every RPC call yields a future that never completes, so send() stays
        // pending instead of failing on a null return value.
        var pendingRpc = mock(JsonRpcClient.class);
        when(pendingRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());

        // The three-argument constructor is not public, hence the reflection.
        var sessionCtor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class,
                String.class);
        sessionCtor.setAccessible(true);
        CopilotSession session = sessionCtor.newInstance("race-test", pendingRpc, null);

        // Simulate the race: the scheduler is dead but the session is not flagged
        // as terminated, so sendAndWait gets past its termination check.
        shutDownTimeoutScheduler(session);

        CompletableFuture<?> outcome = session.sendAndWait(new MessageOptions().setPrompt("test"), 5000);

        assertNotNull(outcome, "sendAndWait should return a future, not throw");
        ExecutionException failure = assertThrows(ExecutionException.class,
                () -> outcome.get(1, TimeUnit.SECONDS));
        assertInstanceOf(RejectedExecutionException.class, failure.getCause());
    }

    /**
     * Shuts down the session's private {@code timeoutScheduler} via reflection
     * without going through {@code close()}.
     *
     * @param session
     *            the session whose scheduler is stopped
     * @throws ReflectiveOperationException
     *             if the field cannot be found or read
     */
    private static void shutDownTimeoutScheduler(CopilotSession session) throws ReflectiveOperationException {
        var field = CopilotSession.class.getDeclaredField("timeoutScheduler");
        field.setAccessible(true);
        ((ScheduledExecutorService) field.get(session)).shutdownNow();
    }
}
146 changes: 146 additions & 0 deletions src/test/java/com/github/copilot/sdk/TimeoutEdgeCaseTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.json.MessageOptions;

/**
 * Regression tests for timeout edge cases in {@link CopilotSession#sendAndWait}.
 * <p>
 * Both tests guard against problems of a per-call
 * {@code ScheduledExecutorService}:
 * <ol>
 * <li>A timeout that fires after {@code close()} and completes the returned
 * future with a {@code TimeoutException}.</li>
 * <li>A new {@code sendAndWait-timeout} thread being started for every
 * {@code sendAndWait} call instead of one scheduler thread being shared.</li>
 * </ol>
 */
public class TimeoutEdgeCaseTest {

    /**
     * Builds a {@link JsonRpcClient} over an input stream whose {@code read()}
     * sleeps until interrupted and an in-memory output stream, so no response ever
     * arrives for an {@code invoke()} call.
     *
     * @return a client that never receives a response
     * @throws Exception
     *             if the reflective constructor lookup or call fails
     */
    private JsonRpcClient createHangingRpcClient() throws Exception {
        InputStream neverReadable = new InputStream() {
            @Override
            public int read() throws IOException {
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt flag set for whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                // Report end-of-stream whether the sleep ended or was interrupted.
                return -1;
            }
        };
        ByteArrayOutputStream discardedWrites = new ByteArrayOutputStream();

        var clientCtor = JsonRpcClient.class.getDeclaredConstructor(InputStream.class, java.io.OutputStream.class,
                Socket.class, Process.class);
        clientCtor.setAccessible(true);
        return clientCtor.newInstance(neverReadable, discardedWrites, null, null);
    }

    /**
     * After {@code close()}, the future returned by {@code sendAndWait} must NOT be
     * completed by a stale timeout.
     * <p>
     * Failure mode being guarded against: a timeout task that {@code close()} does
     * not cancel fires after 2 seconds, i.e. while {@code close()} is still waiting
     * (up to 5 seconds) on the {@code session.destroy} RPC, and completes the
     * future with {@code TimeoutException}.
     * <p>
     * Expected behavior: {@code close()} cancels pending timeouts before the
     * blocking RPC call, so the future remains incomplete.
     *
     * @throws Exception
     *             if the hanging RPC client cannot be created or closed
     */
    @Test
    void testTimeoutDoesNotFireAfterSessionClose() throws Exception {
        JsonRpcClient rpc = createHangingRpcClient();
        try (CopilotSession session = new CopilotSession("test-timeout-id", rpc)) {
            CompletableFuture<AssistantMessageEvent> pending = session
                    .sendAndWait(new MessageOptions().setPrompt("hello"), 2000);

            assertFalse(pending.isDone(), "Future should be pending before timeout fires");

            // Explicit close: it waits on session.destroy for longer than the 2s
            // timeout, so a timeout task that survived close() would have fired by
            // the time this call returns.
            session.close();

            assertFalse(pending.isDone(), "Future should not be completed by a timeout after session is closed. "
                    + "The per-call ScheduledExecutorService leaked a TimeoutException.");
        } finally {
            // Runs after the session resource has been closed.
            rpc.close();
        }
    }

    /**
     * A shared scheduler should reuse a single thread across multiple
     * {@code sendAndWait} calls, rather than starting a new thread per call.
     * <p>
     * Failure mode being guarded against: two calls create two
     * {@code sendAndWait-timeout} threads.
     * <p>
     * Expected behavior: the second call adds no further scheduler thread.
     *
     * @throws Exception
     *             if the hanging RPC client cannot be created or closed
     */
    @Test
    void testSendAndWaitReusesTimeoutThread() throws Exception {
        JsonRpcClient rpc = createHangingRpcClient();
        try (CopilotSession session = new CopilotSession("test-thread-count-id", rpc)) {
            long baselineCount = countTimeoutThreads();

            CompletableFuture<AssistantMessageEvent> firstCall = session
                    .sendAndWait(new MessageOptions().setPrompt("hello1"), 30000);

            // Short pause before sampling the thread list.
            Thread.sleep(100);
            long afterFirst = countTimeoutThreads();
            assertTrue(afterFirst > baselineCount,
                    "Expected at least one new sendAndWait-timeout thread after first call. " + "Baseline: "
                            + baselineCount + ", after: " + afterFirst);

            CompletableFuture<AssistantMessageEvent> secondCall = session
                    .sendAndWait(new MessageOptions().setPrompt("hello2"), 30000);

            Thread.sleep(100);
            long afterSecond = countTimeoutThreads();
            assertTrue(afterFirst == afterSecond,
                    "Shared scheduler should reuse the same thread — no new threads after second call. "
                            + "After first: " + afterFirst + ", after second: " + afterSecond);

            firstCall.cancel(true);
            secondCall.cancel(true);
        } finally {
            // Runs after the session resource has been closed.
            rpc.close();
        }
    }

    /**
     * Counts the live threads whose name contains "sendAndWait-timeout".
     *
     * @return the number of matching threads that are currently alive
     */
    private long countTimeoutThreads() {
        long live = 0;
        for (Thread candidate : Thread.getAllStackTraces().keySet()) {
            if (candidate.getName().contains("sendAndWait-timeout") && candidate.isAlive()) {
                live++;
            }
        }
        return live;
    }
}
47 changes: 47 additions & 0 deletions src/test/java/com/github/copilot/sdk/ZeroTimeoutContractTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
*--------------------------------------------------------------------------------------------*/

package com.github.copilot.sdk;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Test;

import com.github.copilot.sdk.events.AssistantMessageEvent;
import com.github.copilot.sdk.json.MessageOptions;

/**
 * Verifies the documented contract that {@code timeoutMs <= 0} means "no
 * timeout" in {@link CopilotSession#sendAndWait(MessageOptions, long)}.
 * <p>
 * Both halves of the contract are exercised: {@code timeoutMs == 0} and a
 * negative value. The negative case matters because a scheduler treats a
 * negative delay as "run immediately", so an implementation that scheduled the
 * timeout unconditionally would fail the future straight away.
 */
public class ZeroTimeoutContractTest {

    /** How long to wait for an (incorrectly) scheduled timeout to fire. */
    private static final long SETTLE_MILLIS = 200;

    /**
     * A timeout of exactly zero must leave the returned future pending.
     *
     * @throws Exception
     *             if the reflective session setup fails or the wait is interrupted
     */
    @Test
    void sendAndWaitWithZeroTimeoutShouldNotTimeOut() throws Exception {
        assertFutureStaysPending("zero-timeout-test", 0);
    }

    /**
     * A negative timeout must also leave the returned future pending.
     *
     * @throws Exception
     *             if the reflective session setup fails or the wait is interrupted
     */
    @Test
    void sendAndWaitWithNegativeTimeoutShouldNotTimeOut() throws Exception {
        assertFutureStaysPending("negative-timeout-test", -1);
    }

    /**
     * Creates a session backed by an RPC mock whose calls never complete, invokes
     * {@code sendAndWait} with the given timeout and asserts that the returned
     * future is still pending after a short settle period.
     * <p>
     * The session is deliberately not closed: the mock never completes any RPC,
     * so closing would only add a wait on the {@code session.destroy} call.
     *
     * @param sessionId
     *            id given to the session under test
     * @param timeoutMs
     *            the non-positive timeout passed to {@code sendAndWait}
     * @throws Exception
     *             if the reflective session setup fails or the wait is interrupted
     */
    @SuppressWarnings("unchecked")
    private static void assertFutureStaysPending(String sessionId, long timeoutMs) throws Exception {
        // Build a session via reflection (the three-argument constructor is not public)
        var ctor = CopilotSession.class.getDeclaredConstructor(String.class, JsonRpcClient.class, String.class);
        ctor.setAccessible(true);

        // Every RPC returns a future that never completes, so only a timeout
        // could finish the sendAndWait future during this test.
        var mockRpc = mock(JsonRpcClient.class);
        when(mockRpc.invoke(any(), any(), any())).thenReturn(new CompletableFuture<>());

        var session = ctor.newInstance(sessionId, mockRpc, null);

        CompletableFuture<AssistantMessageEvent> result = session.sendAndWait(new MessageOptions().setPrompt("test"),
                timeoutMs);

        // Give the scheduler a chance to fire if it was (incorrectly) scheduled
        Thread.sleep(SETTLE_MILLIS);

        // The future should still be pending — not timed out
        assertFalse(result.isDone(),
                "Future should not be done; timeoutMs=" + timeoutMs + " means no timeout per Javadoc");
    }
}
Loading