Skip to content

Commit 8f1ae3d

Browse files
Transition to disconnected state on unexpected process/connection death
All SDKs now properly transition their connection state to 'disconnected' when the child process exits unexpectedly or the TCP connection drops: - Node.js: onClose/onError handlers in attachConnectionHandlers() - Go: onClose callback fired from readLoop() on unexpected exit - Python: on_close callback fired from _read_loop() on unexpected exit - .NET: rpc.Completion continuation sets _disconnected flag Includes unit tests for all four SDKs verifying the state transition. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent c2930b4 commit 8f1ae3d

9 files changed

Lines changed: 223 additions & 0 deletions

File tree

dotnet/src/Client.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public sealed partial class CopilotClient : IDisposable, IAsyncDisposable
6363
private readonly CopilotClientOptions _options;
6464
private readonly ILogger _logger;
6565
private Task<Connection>? _connectionTask;
66+
private volatile bool _disconnected;
6667
private bool _disposed;
6768
private readonly int? _optionsPort;
6869
private readonly string? _optionsHost;
@@ -199,6 +200,7 @@ public Task StartAsync(CancellationToken cancellationToken = default)
199200
async Task<Connection> StartCoreAsync(CancellationToken ct)
200201
{
201202
_logger.LogDebug("Starting Copilot client");
203+
_disconnected = false;
202204

203205
Task<Connection> result;
204206

@@ -590,6 +592,7 @@ public ConnectionState State
590592
if (_connectionTask == null) return ConnectionState.Disconnected;
591593
if (_connectionTask.IsFaulted) return ConnectionState.Error;
592594
if (!_connectionTask.IsCompleted) return ConnectionState.Connecting;
595+
if (_disconnected) return ConnectionState.Disconnected;
593596
return ConnectionState.Connected;
594597
}
595598
}
@@ -1198,6 +1201,9 @@ private async Task<Connection> ConnectToServerAsync(Process? cliProcess, string?
11981201
rpc.AddLocalRpcMethod("hooks.invoke", handler.OnHooksInvoke);
11991202
rpc.StartListening();
12001203

1204+
// Transition state to Disconnected if the JSON-RPC connection drops
1205+
_ = rpc.Completion.ContinueWith(_ => _disconnected = true, TaskScheduler.Default);
1206+
12011207
_rpc = new ServerRpc(rpc);
12021208

12031209
return new Connection(rpc, cliProcess, tcpClient, networkStream, stderrBuffer);

dotnet/test/ClientTests.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,4 +374,42 @@ public async Task ListModels_WithCustomHandler_WorksWithoutStart()
374374
Assert.Single(models);
375375
Assert.Equal("no-start-model", models[0].Id);
376376
}
377+
378+
[Fact]
379+
public async Task State_Should_Transition_To_Disconnected_When_Process_Is_Killed()
380+
{
381+
using var client = new CopilotClient(new CopilotClientOptions { UseStdio = true });
382+
383+
try
384+
{
385+
await client.StartAsync();
386+
Assert.Equal(ConnectionState.Connected, client.State);
387+
388+
// Use reflection to reach the child process inside the private Connection object
389+
var taskField = typeof(CopilotClient)
390+
.GetField("_connectionTask", System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance)!;
391+
var task = (Task)taskField.GetValue(client)!;
392+
await task; // ensure it's completed
393+
// Task<Connection>.Result via reflection
394+
var resultProp = task.GetType().GetProperty("Result")!;
395+
var connection = resultProp.GetValue(task)!;
396+
var processProp = connection.GetType().GetProperty("CliProcess")!;
397+
var process = (System.Diagnostics.Process)processProp.GetValue(connection)!;
398+
399+
process.Kill();
400+
401+
// Wait for ContinueWith callback to set _disconnected
402+
for (var i = 0; i < 50; i++)
403+
{
404+
if (client.State == ConnectionState.Disconnected) break;
405+
await Task.Delay(100);
406+
}
407+
408+
Assert.Equal(ConnectionState.Disconnected, client.State);
409+
}
410+
finally
411+
{
412+
try { await client.ForceStopAsync(); } catch { /* process already dead */ }
413+
}
414+
}
377415
}

go/client.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,6 +1227,11 @@ func (c *Client) startCLIServer(ctx context.Context) error {
12271227
// Create JSON-RPC client immediately
12281228
c.client = jsonrpc2.NewClient(stdin, stdout)
12291229
c.client.SetProcessDone(c.processDone, c.processErrorPtr)
1230+
c.client.SetOnClose(func() {
1231+
c.startStopMux.Lock()
1232+
defer c.startStopMux.Unlock()
1233+
c.state = StateDisconnected
1234+
})
12301235
c.RPC = rpc.NewServerRpc(c.client)
12311236
c.setupNotificationHandler()
12321237
c.client.Start()
@@ -1342,6 +1347,11 @@ func (c *Client) connectViaTcp(ctx context.Context) error {
13421347
if c.processDone != nil {
13431348
c.client.SetProcessDone(c.processDone, c.processErrorPtr)
13441349
}
1350+
c.client.SetOnClose(func() {
1351+
c.startStopMux.Lock()
1352+
defer c.startStopMux.Unlock()
1353+
c.state = StateDisconnected
1354+
})
13451355
c.RPC = rpc.NewServerRpc(c.client)
13461356
c.setupNotificationHandler()
13471357
c.client.Start()

go/internal/jsonrpc2/jsonrpc2.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ type Client struct {
6161
processDone chan struct{} // closed when the underlying process exits
6262
processError error // set before processDone is closed
6363
processErrorMu sync.RWMutex // protects processError
64+
onClose func() // called when the read loop exits unexpectedly
6465
}
6566

6667
// NewClient creates a new JSON-RPC client
@@ -293,9 +294,22 @@ func (c *Client) sendMessage(message any) error {
293294
return nil
294295
}
295296

297+
// SetOnClose sets a callback invoked when the read loop exits unexpectedly
298+
// (e.g. the underlying connection or process was lost).
299+
func (c *Client) SetOnClose(fn func()) {
300+
c.onClose = fn
301+
}
302+
296303
// readLoop reads messages from stdout in a background goroutine
297304
func (c *Client) readLoop() {
298305
defer c.wg.Done()
306+
defer func() {
307+
// If still running, the read loop exited unexpectedly (process died or
308+
// connection dropped). Notify the caller so it can update its state.
309+
if c.onClose != nil && c.running.Load() {
310+
c.onClose()
311+
}
312+
}()
299313

300314
reader := bufio.NewReader(c.stdout)
301315

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package jsonrpc2
2+
3+
import (
4+
"io"
5+
"sync"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestOnCloseCalledOnUnexpectedExit(t *testing.T) {
11+
stdinR, stdinW := io.Pipe()
12+
stdoutR, stdoutW := io.Pipe()
13+
defer stdinR.Close()
14+
15+
client := NewClient(stdinW, stdoutR)
16+
17+
var called bool
18+
var mu sync.Mutex
19+
client.SetOnClose(func() {
20+
mu.Lock()
21+
called = true
22+
mu.Unlock()
23+
})
24+
25+
client.Start()
26+
27+
// Simulate unexpected process death by closing the stdout writer
28+
stdoutW.Close()
29+
30+
// Wait for readLoop to detect the close and invoke the callback
31+
time.Sleep(200 * time.Millisecond)
32+
33+
mu.Lock()
34+
defer mu.Unlock()
35+
if !called {
36+
t.Error("expected onClose to be called when read loop exits unexpectedly")
37+
}
38+
}
39+
40+
func TestOnCloseNotCalledOnIntentionalStop(t *testing.T) {
41+
stdinR, stdinW := io.Pipe()
42+
stdoutR, stdoutW := io.Pipe()
43+
defer stdinR.Close()
44+
defer stdoutW.Close()
45+
46+
client := NewClient(stdinW, stdoutR)
47+
48+
var called bool
49+
var mu sync.Mutex
50+
client.SetOnClose(func() {
51+
mu.Lock()
52+
called = true
53+
mu.Unlock()
54+
})
55+
56+
client.Start()
57+
58+
// Intentional stop — should set running=false before closing stdout,
59+
// so the readLoop should NOT invoke onClose.
60+
client.Stop()
61+
62+
time.Sleep(200 * time.Millisecond)
63+
64+
mu.Lock()
65+
defer mu.Unlock()
66+
if called {
67+
t.Error("onClose should not be called on intentional Stop()")
68+
}
69+
}

nodejs/test/client.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,4 +484,23 @@ describe("CopilotClient", () => {
484484
expect(models).toEqual(customModels);
485485
});
486486
});
487+
488+
describe("unexpected disconnection", () => {
489+
it("transitions to disconnected when child process is killed", async () => {
490+
const client = new CopilotClient();
491+
await client.start();
492+
onTestFinished(() => client.forceStop());
493+
494+
expect(client.getState()).toBe("connected");
495+
496+
// Kill the child process to simulate unexpected termination
497+
const proc = (client as any).cliProcess as import("node:child_process").ChildProcess;
498+
proc.kill();
499+
500+
// Wait for the connection.onClose handler to fire
501+
await vi.waitFor(() => {
502+
expect(client.getState()).toBe("disconnected");
503+
});
504+
});
505+
});
487506
});

python/copilot/client.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1405,6 +1405,7 @@ async def _connect_via_stdio(self) -> None:
14051405

14061406
# Create JSON-RPC client with the process
14071407
self._client = JsonRpcClient(self._process)
1408+
self._client.on_close = lambda: setattr(self, '_state', 'disconnected')
14081409
self._rpc = ServerRpc(self._client)
14091410

14101411
# Set up notification handler for session events
@@ -1492,6 +1493,7 @@ def wait(self, timeout=None):
14921493

14931494
self._process = SocketWrapper(sock_file, sock) # type: ignore
14941495
self._client = JsonRpcClient(self._process)
1496+
self._client.on_close = lambda: setattr(self, '_state', 'disconnected')
14951497
self._rpc = ServerRpc(self._client)
14961498

14971499
# Set up notification handler for session events

python/copilot/jsonrpc.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def __init__(self, process):
6060
self._process_exit_error: str | None = None
6161
self._stderr_output: list[str] = []
6262
self._stderr_lock = threading.Lock()
63+
self.on_close: Callable[[], None] | None = None
6364

6465
def start(self, loop: asyncio.AbstractEventLoop | None = None):
6566
"""Start listening for messages in background thread"""
@@ -211,6 +212,8 @@ def _read_loop(self):
211212
# Process exited or read failed - fail all pending requests
212213
if self._running:
213214
self._fail_pending_requests()
215+
if self.on_close is not None:
216+
self.on_close()
214217

215218
def _fail_pending_requests(self):
216219
"""Fail all pending requests when process exits"""

python/test_jsonrpc.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
import io
99
import json
10+
import os
11+
import threading
12+
import time
1013

1114
import pytest
1215

@@ -265,3 +268,62 @@ def test_read_message_multiple_messages_in_sequence(self):
265268

266269
result2 = client._read_message()
267270
assert result2 == message2
271+
272+
273+
class ClosingStream:
274+
"""Stream that immediately returns empty bytes (simulates process death / EOF)."""
275+
276+
def readline(self):
277+
return b""
278+
279+
def read(self, n: int) -> bytes:
280+
return b""
281+
282+
283+
class TestOnClose:
284+
"""Tests for the on_close callback when the read loop exits unexpectedly."""
285+
286+
def test_on_close_called_on_unexpected_exit(self):
287+
"""on_close fires when the stream closes while client is still running."""
288+
import asyncio
289+
290+
process = MockProcess()
291+
process.stdout = ClosingStream()
292+
293+
client = JsonRpcClient(process)
294+
295+
called = threading.Event()
296+
client.on_close = lambda: called.set()
297+
298+
loop = asyncio.new_event_loop()
299+
try:
300+
client.start(loop=loop)
301+
assert called.wait(timeout=2), "on_close was not called within 2 seconds"
302+
finally:
303+
loop.close()
304+
305+
def test_on_close_not_called_on_intentional_stop(self):
306+
"""on_close should not fire when stop() is called intentionally."""
307+
import asyncio
308+
309+
r_fd, w_fd = os.pipe()
310+
process = MockProcess()
311+
process.stdout = os.fdopen(r_fd, "rb")
312+
313+
client = JsonRpcClient(process)
314+
315+
called = threading.Event()
316+
client.on_close = lambda: called.set()
317+
318+
loop = asyncio.new_event_loop()
319+
try:
320+
client.start(loop=loop)
321+
322+
# Intentional stop sets _running = False before the thread sees EOF
323+
loop.run_until_complete(client.stop())
324+
os.close(w_fd)
325+
326+
time.sleep(0.5)
327+
assert not called.is_set(), "on_close should not be called on intentional stop"
328+
finally:
329+
loop.close()

0 commit comments

Comments
 (0)