Skip to content

Commit 95173fd

Browse files
authored
Merge pull request #40 from github/edburns/dd-2758695-virtual-threads-accept-executor
Accept optional Executor on CopilotClientOptions to mitigate pool starvation
2 parents 6ccc5e3 + 4fdef27 commit 95173fd

File tree

10 files changed

+808
-216
lines changed

10 files changed

+808
-216
lines changed

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,23 @@ implementation 'com.github:copilot-sdk-java:0.2.1-java.0'
6969
import com.github.copilot.sdk.CopilotClient;
7070
import com.github.copilot.sdk.events.AssistantMessageEvent;
7171
import com.github.copilot.sdk.events.SessionUsageInfoEvent;
72+
import com.github.copilot.sdk.json.CopilotClientOptions;
7273
import com.github.copilot.sdk.json.MessageOptions;
7374
import com.github.copilot.sdk.json.PermissionHandler;
7475
import com.github.copilot.sdk.json.SessionConfig;
7576

77+
import java.util.concurrent.Executors;
78+
7679
public class CopilotSDK {
7780
public static void main(String[] args) throws Exception {
7881
var lastMessage = new String[]{null};
7982

8083
// Create and start client
81-
try (var client = new CopilotClient()) {
84+
try (var client = new CopilotClient()) { // JDK 25+: comment out this line
85+
// JDK 25+: uncomment the following 3 lines for virtual thread support
86+
// var options = new CopilotClientOptions()
87+
// .setExecutor(Executors.newVirtualThreadPerTaskExecutor());
88+
// try (var client = new CopilotClient(options)) {
8289
client.start().get();
8390

8491
// Create a session

src/main/java/com/github/copilot/sdk/CopilotClient.java

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.CompletionException;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
17+
import java.util.concurrent.RejectedExecutionException;
1618
import java.util.concurrent.TimeUnit;
1719
import java.util.logging.Level;
1820
import java.util.logging.Logger;
@@ -150,42 +152,51 @@ public CompletableFuture<Void> start() {
150152
private CompletableFuture<Connection> startCore() {
151153
LOG.fine("Starting Copilot client");
152154

153-
return CompletableFuture.supplyAsync(() -> {
154-
try {
155-
JsonRpcClient rpc;
156-
Process process = null;
157-
158-
if (optionsHost != null && optionsPort != null) {
159-
// External server (TCP)
160-
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
161-
} else {
162-
// Child process (stdio or TCP)
163-
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
164-
process = processInfo.process();
165-
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
166-
processInfo.port());
167-
}
155+
Executor exec = options.getExecutor();
156+
try {
157+
return exec != null
158+
? CompletableFuture.supplyAsync(this::startCoreBody, exec)
159+
: CompletableFuture.supplyAsync(this::startCoreBody);
160+
} catch (RejectedExecutionException e) {
161+
return CompletableFuture.failedFuture(e);
162+
}
163+
}
168164

169-
Connection connection = new Connection(rpc, process);
165+
private Connection startCoreBody() {
166+
try {
167+
JsonRpcClient rpc;
168+
Process process = null;
169+
170+
if (optionsHost != null && optionsPort != null) {
171+
// External server (TCP)
172+
rpc = serverManager.connectToServer(null, optionsHost, optionsPort);
173+
} else {
174+
// Child process (stdio or TCP)
175+
CliServerManager.ProcessInfo processInfo = serverManager.startCliServer();
176+
process = processInfo.process();
177+
rpc = serverManager.connectToServer(process, processInfo.port() != null ? "localhost" : null,
178+
processInfo.port());
179+
}
170180

171-
// Register handlers for server-to-client calls
172-
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch);
173-
dispatcher.registerHandlers(rpc);
181+
Connection connection = new Connection(rpc, process);
174182

175-
// Verify protocol version
176-
verifyProtocolVersion(connection);
183+
// Register handlers for server-to-client calls
184+
RpcHandlerDispatcher dispatcher = new RpcHandlerDispatcher(sessions, lifecycleManager::dispatch,
185+
options.getExecutor());
186+
dispatcher.registerHandlers(rpc);
177187

178-
LOG.info("Copilot client connected");
179-
return connection;
180-
} catch (Exception e) {
181-
String stderr = serverManager.getStderrOutput();
182-
if (!stderr.isEmpty()) {
183-
throw new CompletionException(
184-
new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
185-
}
186-
throw new CompletionException(e);
188+
// Verify protocol version
189+
verifyProtocolVersion(connection);
190+
191+
LOG.info("Copilot client connected");
192+
return connection;
193+
} catch (Exception e) {
194+
String stderr = serverManager.getStderrOutput();
195+
if (!stderr.isEmpty()) {
196+
throw new CompletionException(new IOException("CLI process exited unexpectedly. stderr: " + stderr, e));
187197
}
188-
});
198+
throw new CompletionException(e);
199+
}
189200
}
190201

191202
private static final int MIN_PROTOCOL_VERSION = 2;
@@ -228,15 +239,27 @@ private void verifyProtocolVersion(Connection connection) throws Exception {
228239
*/
229240
public CompletableFuture<Void> stop() {
230241
var closeFutures = new ArrayList<CompletableFuture<Void>>();
242+
Executor exec = options.getExecutor();
231243

232244
for (CopilotSession session : new ArrayList<>(sessions.values())) {
233-
closeFutures.add(CompletableFuture.runAsync(() -> {
245+
Runnable closeTask = () -> {
234246
try {
235247
session.close();
236248
} catch (Exception e) {
237249
LOG.log(Level.WARNING, "Error closing session " + session.getSessionId(), e);
238250
}
239-
}));
251+
};
252+
CompletableFuture<Void> future;
253+
try {
254+
future = exec != null
255+
? CompletableFuture.runAsync(closeTask, exec)
256+
: CompletableFuture.runAsync(closeTask);
257+
} catch (RejectedExecutionException e) {
258+
LOG.log(Level.WARNING, "Executor rejected session close task; closing inline", e);
259+
closeTask.run();
260+
future = CompletableFuture.completedFuture(null);
261+
}
262+
closeFutures.add(future);
240263
}
241264
sessions.clear();
242265

@@ -329,6 +352,9 @@ public CompletableFuture<CopilotSession> createSession(SessionConfig config) {
329352
: java.util.UUID.randomUUID().toString();
330353

331354
var session = new CopilotSession(sessionId, connection.rpc);
355+
if (options.getExecutor() != null) {
356+
session.setExecutor(options.getExecutor());
357+
}
332358
SessionRequestBuilder.configureSession(session, config);
333359
sessions.put(sessionId, session);
334360

@@ -399,6 +425,9 @@ public CompletableFuture<CopilotSession> resumeSession(String sessionId, ResumeS
399425
return ensureConnected().thenCompose(connection -> {
400426
// Register the session before the RPC call to avoid missing early events.
401427
var session = new CopilotSession(sessionId, connection.rpc);
428+
if (options.getExecutor() != null) {
429+
session.setExecutor(options.getExecutor());
430+
}
402431
SessionRequestBuilder.configureSession(session, config);
403432
sessions.put(sessionId, session);
404433

src/main/java/com/github/copilot/sdk/CopilotSession.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Set;
1414
import java.util.concurrent.CompletableFuture;
1515
import java.util.concurrent.ConcurrentHashMap;
16+
import java.util.concurrent.Executor;
1617
import java.util.concurrent.RejectedExecutionException;
1718
import java.util.concurrent.ScheduledExecutorService;
1819
import java.util.concurrent.ScheduledFuture;
@@ -125,6 +126,7 @@ public final class CopilotSession implements AutoCloseable {
125126
private volatile EventErrorPolicy eventErrorPolicy = EventErrorPolicy.PROPAGATE_AND_LOG_ERRORS;
126127
private volatile Map<String, java.util.function.Function<String, CompletableFuture<String>>> transformCallbacks;
127128
private final ScheduledExecutorService timeoutScheduler;
129+
private volatile Executor executor;
128130

129131
/** Tracks whether this session instance has been terminated via close(). */
130132
private volatile boolean isTerminated = false;
@@ -170,6 +172,14 @@ public final class CopilotSession implements AutoCloseable {
170172
this.timeoutScheduler = executor;
171173
}
172174

175+
/**
176+
* Sets the executor for internal async operations. Package-private; called by
177+
* CopilotClient after construction.
178+
*/
179+
void setExecutor(Executor executor) {
180+
this.executor = executor;
181+
}
182+
173183
/**
174184
* Gets the unique identifier for this session.
175185
*
@@ -673,7 +683,7 @@ private void handleBroadcastEventAsync(AbstractSessionEvent event) {
673683
*/
674684
private void executeToolAndRespondAsync(String requestId, String toolName, String toolCallId, Object arguments,
675685
ToolDefinition tool) {
676-
CompletableFuture.runAsync(() -> {
686+
Runnable task = () -> {
677687
try {
678688
JsonNode argumentsNode = arguments instanceof JsonNode jn
679689
? jn
@@ -718,7 +728,17 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
718728
LOG.log(Level.WARNING, "Error sending tool error for requestId=" + requestId, sendEx);
719729
}
720730
}
721-
});
731+
};
732+
try {
733+
if (executor != null) {
734+
CompletableFuture.runAsync(task, executor);
735+
} else {
736+
CompletableFuture.runAsync(task);
737+
}
738+
} catch (RejectedExecutionException e) {
739+
LOG.log(Level.WARNING, "Executor rejected tool task for requestId=" + requestId + "; running inline", e);
740+
task.run();
741+
}
722742
}
723743

724744
/**
@@ -727,7 +747,7 @@ private void executeToolAndRespondAsync(String requestId, String toolName, Strin
727747
*/
728748
private void executePermissionAndRespondAsync(String requestId, PermissionRequest permissionRequest,
729749
PermissionHandler handler) {
730-
CompletableFuture.runAsync(() -> {
750+
Runnable task = () -> {
731751
try {
732752
var invocation = new PermissionInvocation();
733753
invocation.setSessionId(sessionId);
@@ -766,7 +786,17 @@ private void executePermissionAndRespondAsync(String requestId, PermissionReques
766786
LOG.log(Level.WARNING, "Error sending permission denied for requestId=" + requestId, sendEx);
767787
}
768788
}
769-
});
789+
};
790+
try {
791+
if (executor != null) {
792+
CompletableFuture.runAsync(task, executor);
793+
} else {
794+
CompletableFuture.runAsync(task);
795+
}
796+
} catch (RejectedExecutionException e) {
797+
LOG.log(Level.WARNING, "Executor rejected perm task for requestId=" + requestId + "; running inline", e);
798+
task.run();
799+
}
770800
}
771801

772802
/**

src/main/java/com/github/copilot/sdk/RpcHandlerDispatcher.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import java.util.Collections;
1010
import java.util.Map;
1111
import java.util.concurrent.CompletableFuture;
12+
import java.util.concurrent.Executor;
13+
import java.util.concurrent.RejectedExecutionException;
1214
import java.util.logging.Level;
1315
import java.util.logging.Logger;
1416

@@ -45,6 +47,7 @@ final class RpcHandlerDispatcher {
4547

4648
private final Map<String, CopilotSession> sessions;
4749
private final LifecycleEventDispatcher lifecycleDispatcher;
50+
private final Executor executor;
4851

4952
/**
5053
* Creates a dispatcher with session registry and lifecycle dispatcher.
@@ -53,10 +56,14 @@ final class RpcHandlerDispatcher {
5356
* the session registry to look up sessions by ID
5457
* @param lifecycleDispatcher
5558
* callback for dispatching lifecycle events
59+
* @param executor
60+
* the executor for async dispatch, or {@code null} for default
5661
*/
57-
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher) {
62+
RpcHandlerDispatcher(Map<String, CopilotSession> sessions, LifecycleEventDispatcher lifecycleDispatcher,
63+
Executor executor) {
5864
this.sessions = sessions;
5965
this.lifecycleDispatcher = lifecycleDispatcher;
66+
this.executor = executor;
6067
}
6168

6269
/**
@@ -118,7 +125,7 @@ private void handleLifecycleEvent(JsonNode params) {
118125
}
119126

120127
private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params) {
121-
CompletableFuture.runAsync(() -> {
128+
runAsync(() -> {
122129
try {
123130
String sessionId = params.get("sessionId").asText();
124131
String toolCallId = params.get("toolCallId").asText();
@@ -178,7 +185,7 @@ private void handleToolCall(JsonRpcClient rpc, String requestId, JsonNode params
178185
}
179186

180187
private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
181-
CompletableFuture.runAsync(() -> {
188+
runAsync(() -> {
182189
try {
183190
String sessionId = params.get("sessionId").asText();
184191
JsonNode permissionRequest = params.get("permissionRequest");
@@ -222,7 +229,7 @@ private void handlePermissionRequest(JsonRpcClient rpc, String requestId, JsonNo
222229

223230
private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNode params) {
224231
LOG.fine("Received userInput.request: " + params);
225-
CompletableFuture.runAsync(() -> {
232+
runAsync(() -> {
226233
try {
227234
String sessionId = params.get("sessionId").asText();
228235
String question = params.get("question").asText();
@@ -278,7 +285,7 @@ private void handleUserInputRequest(JsonRpcClient rpc, String requestId, JsonNod
278285
}
279286

280287
private void handleHooksInvoke(JsonRpcClient rpc, String requestId, JsonNode params) {
281-
CompletableFuture.runAsync(() -> {
288+
runAsync(() -> {
282289
try {
283290
String sessionId = params.get("sessionId").asText();
284291
String hookType = params.get("hookType").asText();
@@ -321,7 +328,7 @@ interface LifecycleEventDispatcher {
321328
}
322329

323330
private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, JsonNode params) {
324-
CompletableFuture.runAsync(() -> {
331+
runAsync(() -> {
325332
try {
326333
final long requestIdLong;
327334
try {
@@ -359,4 +366,17 @@ private void handleSystemMessageTransform(JsonRpcClient rpc, String requestId, J
359366
}
360367
});
361368
}
369+
370+
private void runAsync(Runnable task) {
371+
try {
372+
if (executor != null) {
373+
CompletableFuture.runAsync(task, executor);
374+
} else {
375+
CompletableFuture.runAsync(task);
376+
}
377+
} catch (RejectedExecutionException e) {
378+
LOG.log(Level.WARNING, "Executor rejected handler task; running inline", e);
379+
task.run();
380+
}
381+
}
362382
}

0 commit comments

Comments
 (0)