Skip to content

Commit 4d1d57c

Browse files
stephentoub and Copilot committed
Serialize event dispatch in .NET and Go SDKs
In .NET, StreamJsonRpc dispatches notifications concurrently on the thread pool. The old code invoked user event handlers inline from DispatchEvent, which meant handlers could run concurrently and out of order.

In Go, the JSON-RPC read loop is single-threaded, so user handlers were already serialized. However, broadcast handlers (tool calls, permission requests) ran inline on the read loop, which deadlocked when a handler issued an RPC request back through the same connection.

This PR decouples user handler dispatch from the transport by routing events through a channel (Go) / Channel<T> (.NET). A single consumer goroutine/task drains the channel and invokes user handlers serially, in FIFO order. This matches the guarantees provided by the Node.js and Python SDKs (which get natural serialization from their single-threaded event loops) while fitting Go's and .NET's multi-threaded runtimes.

Broadcast handlers (tool calls, permission requests) are fired as fire-and-forget directly from the dispatch entry point, outside the channel, so a stalled handler cannot block event delivery. This matches the existing Node.js (void this._executeToolAndRespond()) and Python (asyncio.ensure_future()) behavior.
Go changes:
- Add eventCh channel to Session; start processEvents consumer goroutine
- dispatchEvent enqueues to channel and fires broadcast handler goroutine
- Close channel on Disconnect to stop the consumer
- Update unit tests and E2E tests for async delivery

.NET changes:
- Add unbounded Channel<SessionEvent> to CopilotSession; start ProcessEventsAsync consumer task in constructor
- DispatchEvent enqueues to channel and fires broadcast handler task
- Complete channel on DisposeAsync
- Per-handler error catching via ImmutableArray iteration
- Cache handler array snapshot to avoid repeated allocation
- Inline broadcast error handling into HandleBroadcastEventAsync
- Update Should_Receive_Session_Events test to await async delivery
- Add Handler_Exception_Does_Not_Halt_Event_Delivery test
- Add DisposeAsync_From_Handler_Does_Not_Deadlock test

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 062b61c commit 4d1d57c

8 files changed

Lines changed: 386 additions & 135 deletions

File tree

dotnet/src/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance
407407

408408
// Create and register the session before issuing the RPC so that
409409
// events emitted by the CLI (e.g. session.start) are not dropped.
410-
var session = new CopilotSession(sessionId, connection.Rpc);
410+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
411411
session.RegisterTools(config.Tools ?? []);
412412
session.RegisterPermissionHandler(config.OnPermissionRequest);
413413
if (config.OnUserInputRequest != null)
@@ -511,7 +511,7 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes
511511

512512
// Create and register the session before issuing the RPC so that
513513
// events emitted by the CLI (e.g. session.start) are not dropped.
514-
var session = new CopilotSession(sessionId, connection.Rpc);
514+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
515515
session.RegisterTools(config.Tools ?? []);
516516
session.RegisterPermissionHandler(config.OnPermissionRequest);
517517
if (config.OnUserInputRequest != null)

dotnet/src/Session.cs

Lines changed: 113 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
* Copyright (c) Microsoft Corporation. All rights reserved.
33
*--------------------------------------------------------------------------------------------*/
44

5+
using GitHub.Copilot.SDK.Rpc;
56
using Microsoft.Extensions.AI;
7+
using Microsoft.Extensions.Logging;
68
using StreamJsonRpc;
9+
using System.Collections.Immutable;
710
using System.Text.Json;
811
using System.Text.Json.Nodes;
912
using System.Text.Json.Serialization;
10-
using GitHub.Copilot.SDK.Rpc;
13+
using System.Threading.Channels;
1114

1215
namespace GitHub.Copilot.SDK;
1316

@@ -52,22 +55,27 @@ namespace GitHub.Copilot.SDK;
5255
/// </example>
5356
public sealed partial class CopilotSession : IAsyncDisposable
5457
{
55-
/// <summary>
56-
/// Multicast delegate used as a thread-safe, insertion-ordered handler list.
57-
/// The compiler-generated add/remove accessors use a lock-free CAS loop over the backing field.
58-
/// Dispatch reads the field once (inherent snapshot, no allocation).
59-
/// Expected handler count is small (typically 1–3), so Delegate.Combine/Remove cost is negligible.
60-
/// </summary>
61-
private event SessionEventHandler? EventHandlers;
6258
private readonly Dictionary<string, AIFunction> _toolHandlers = [];
6359
private readonly JsonRpc _rpc;
60+
private readonly ILogger _logger;
61+
6462
private volatile PermissionRequestHandler? _permissionHandler;
6563
private volatile UserInputHandler? _userInputHandler;
64+
private ImmutableArray<SessionEventHandler> _eventHandlers = ImmutableArray<SessionEventHandler>.Empty;
65+
6666
private SessionHooks? _hooks;
6767
private readonly SemaphoreSlim _hooksLock = new(1, 1);
6868
private SessionRpc? _sessionRpc;
6969
private int _isDisposed;
7070

71+
/// <summary>
72+
/// Channel that serializes event dispatch. <see cref="DispatchEvent"/> enqueues;
73+
/// a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues and
74+
/// invokes handlers one at a time, preserving arrival order.
75+
/// </summary>
76+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
77+
new() { SingleReader = true });
78+
7179
/// <summary>
7280
/// Gets the unique identifier for this session.
7381
/// </summary>
@@ -93,15 +101,20 @@ public sealed partial class CopilotSession : IAsyncDisposable
93101
/// </summary>
94102
/// <param name="sessionId">The unique identifier for this session.</param>
95103
/// <param name="rpc">The JSON-RPC connection to the Copilot CLI.</param>
104+
/// <param name="logger">Logger for diagnostics.</param>
96105
/// <param name="workspacePath">The workspace path if infinite sessions are enabled.</param>
97106
/// <remarks>
98107
/// This constructor is internal. Use <see cref="CopilotClient.CreateSessionAsync"/> to create sessions.
99108
/// </remarks>
100-
internal CopilotSession(string sessionId, JsonRpc rpc, string? workspacePath = null)
109+
internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? workspacePath = null)
101110
{
102111
SessionId = sessionId;
103112
_rpc = rpc;
113+
_logger = logger;
104114
WorkspacePath = workspacePath;
115+
116+
// Start the asynchronous processing loop.
117+
_ = ProcessEventsAsync();
105118
}
106119

107120
private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
@@ -186,7 +199,7 @@ public async Task<string> SendAsync(MessageOptions options, CancellationToken ca
186199
CancellationToken cancellationToken = default)
187200
{
188201
var effectiveTimeout = timeout ?? TimeSpan.FromSeconds(60);
189-
var tcs = new TaskCompletionSource<AssistantMessageEvent?>();
202+
var tcs = new TaskCompletionSource<AssistantMessageEvent?>(TaskCreationOptions.RunContinuationsAsynchronously);
190203
AssistantMessageEvent? lastAssistantMessage = null;
191204

192205
void Handler(SessionEvent evt)
@@ -236,7 +249,9 @@ void Handler(SessionEvent evt)
236249
/// Multiple handlers can be registered and will all receive events.
237250
/// </para>
238251
/// <para>
239-
/// Handler exceptions are allowed to propagate so they are not lost.
252+
/// Handlers are invoked serially in event-arrival order on a background thread.
253+
/// A handler will never be called concurrently with itself or with other handlers
254+
/// on the same session.
240255
/// </para>
241256
/// </remarks>
242257
/// <example>
@@ -259,27 +274,53 @@ void Handler(SessionEvent evt)
259274
/// </example>
260275
public IDisposable On(SessionEventHandler handler)
261276
{
262-
EventHandlers += handler;
263-
return new ActionDisposable(() => EventHandlers -= handler);
277+
ImmutableInterlocked.Update(ref _eventHandlers, array => array.Add(handler));
278+
return new ActionDisposable(() => ImmutableInterlocked.Update(ref _eventHandlers, array => array.Remove(handler)));
264279
}
265280

266281
/// <summary>
267-
/// Dispatches an event to all registered handlers.
282+
/// Enqueues an event for serial dispatch to all registered handlers.
268283
/// </summary>
269284
/// <param name="sessionEvent">The session event to dispatch.</param>
270285
/// <remarks>
271-
/// This method is internal. Handler exceptions are allowed to propagate so they are not lost.
272-
/// Broadcast request events (external_tool.requested, permission.requested) are handled
273-
/// internally before being forwarded to user handlers.
286+
/// This method is non-blocking. Broadcast request events (external_tool.requested,
287+
/// permission.requested) are fired concurrently so that a stalled handler does not
288+
/// block event delivery. The event is then placed into an in-memory channel and
289+
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
290+
/// which guarantees user handlers see events one at a time, in order.
274291
/// </remarks>
275292
internal void DispatchEvent(SessionEvent sessionEvent)
276293
{
277-
// Handle broadcast request events (protocol v3) before dispatching to user handlers.
278-
// Fire-and-forget: the response is sent asynchronously via RPC.
279-
HandleBroadcastEventAsync(sessionEvent);
294+
// Fire broadcast work concurrently (fire-and-forget with error logging).
295+
// This is done outside the channel so broadcast handlers don't block the
296+
// consumer loop — important when a secondary client's handler intentionally
297+
// never completes (multi-client permission scenario).
298+
_ = HandleBroadcastEventAsync(sessionEvent);
299+
300+
// Queue the event for serial processing by user handlers.
301+
_eventChannel.Writer.TryWrite(sessionEvent);
302+
}
280303

281-
// Reading the field once gives us a snapshot; delegates are immutable.
282-
EventHandlers?.Invoke(sessionEvent);
304+
/// <summary>
305+
/// Single-reader consumer loop that processes events from the channel.
306+
/// Ensures user event handlers are invoked serially and in FIFO order.
307+
/// </summary>
308+
private async Task ProcessEventsAsync()
309+
{
310+
// ConfigureAwait(false): this loop is started from the constructor, so without it the
// awaits would resume on whatever SynchronizationContext was current at construction
// (e.g. a UI thread), and every user handler would run there instead of on a background thread.
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync().ConfigureAwait(false))
311+
{
312+
foreach (var handler in _eventHandlers)
313+
{
314+
try
315+
{
316+
handler(sessionEvent);
317+
}
318+
catch (Exception ex)
319+
{
320+
LogEventHandlerError(ex);
321+
}
322+
}
323+
}
283324
}
284325

285326
/// <summary>
@@ -355,37 +396,44 @@ internal async Task<PermissionRequestResult> HandlePermissionRequestAsync(JsonEl
355396
/// Implements the protocol v3 broadcast model where tool calls and permission requests
356397
/// are broadcast as session events to all clients.
357398
/// </summary>
358-
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
399+
private async Task HandleBroadcastEventAsync(SessionEvent sessionEvent)
359400
{
360-
switch (sessionEvent)
401+
try
361402
{
362-
case ExternalToolRequestedEvent toolEvent:
363-
{
364-
var data = toolEvent.Data;
365-
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
366-
return;
367-
368-
var tool = GetTool(data.ToolName);
369-
if (tool is null)
370-
return; // This client doesn't handle this tool; another client will.
371-
372-
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
373-
break;
374-
}
375-
376-
case PermissionRequestedEvent permEvent:
377-
{
378-
var data = permEvent.Data;
379-
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
380-
return;
381-
382-
var handler = _permissionHandler;
383-
if (handler is null)
384-
return; // This client doesn't handle permissions; another client will.
385-
386-
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
387-
break;
388-
}
403+
switch (sessionEvent)
404+
{
405+
case ExternalToolRequestedEvent toolEvent:
406+
{
407+
var data = toolEvent.Data;
408+
if (string.IsNullOrEmpty(data.RequestId) || string.IsNullOrEmpty(data.ToolName))
409+
return;
410+
411+
var tool = GetTool(data.ToolName);
412+
if (tool is null)
413+
return; // This client doesn't handle this tool; another client will.
414+
415+
await ExecuteToolAndRespondAsync(data.RequestId, data.ToolName, data.ToolCallId, data.Arguments, tool);
416+
break;
417+
}
418+
419+
case PermissionRequestedEvent permEvent:
420+
{
421+
var data = permEvent.Data;
422+
if (string.IsNullOrEmpty(data.RequestId) || data.PermissionRequest is null)
423+
return;
424+
425+
var handler = _permissionHandler;
426+
if (handler is null)
427+
return; // This client doesn't handle permissions; another client will.
428+
429+
await ExecutePermissionAndRespondAsync(data.RequestId, data.PermissionRequest, handler);
430+
break;
431+
}
432+
}
433+
}
434+
catch (Exception ex) when (ex is not OperationCanceledException)
435+
{
436+
LogBroadcastHandlerError(ex);
389437
}
390438
}
391439

@@ -703,6 +751,11 @@ public async Task LogAsync(string message, SessionLogRequestLevel? level = null,
703751
/// <returns>A task representing the dispose operation.</returns>
704752
/// <remarks>
705753
/// <para>
754+
/// The caller should ensure the session is idle (e.g., <see cref="SendAndWaitAsync"/>
755+
/// has returned) before disposing. If the session is not idle, in-flight event handlers
756+
/// or tool handlers may observe failures.
757+
/// </para>
758+
/// <para>
706759
/// Session state on disk (conversation history, planning state, artifacts) is
707760
/// preserved, so the conversation can be resumed later by calling
708761
/// <see cref="CopilotClient.ResumeSessionAsync"/> with the session ID. To
@@ -731,6 +784,8 @@ public async ValueTask DisposeAsync()
731784
return;
732785
}
733786

787+
_eventChannel.Writer.TryComplete();
788+
734789
try
735790
{
736791
await InvokeRpcAsync<object>(
@@ -745,12 +800,18 @@ await InvokeRpcAsync<object>(
745800
// Connection is broken or closed
746801
}
747802

748-
EventHandlers = null;
803+
// Atomically swap in an empty handler list so no handler references outlive the session.
// InterlockedExchange returns the PREVIOUS array; assigning that result back to the field
// would restore the handlers that were just cleared, so the return value is discarded.
ImmutableInterlocked.InterlockedExchange(ref _eventHandlers, ImmutableArray<SessionEventHandler>.Empty);
749804
_toolHandlers.Clear();
750805

751806
_permissionHandler = null;
752807
}
753808

809+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in broadcast event handler")]
810+
private partial void LogBroadcastHandlerError(Exception exception);
811+
812+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
813+
private partial void LogEventHandlerError(Exception exception);
814+
754815
internal record SendMessageRequest
755816
{
756817
public string SessionId { get; init; } = string.Empty;

dotnet/test/Harness/TestHelper.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public static class TestHelper
1010
CopilotSession session,
1111
TimeSpan? timeout = null)
1212
{
13-
var tcs = new TaskCompletionSource<AssistantMessageEvent>();
13+
var tcs = new TaskCompletionSource<AssistantMessageEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
1414
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));
1515

1616
AssistantMessageEvent? finalAssistantMessage = null;
@@ -78,7 +78,7 @@ public static async Task<T> GetNextEventOfTypeAsync<T>(
7878
CopilotSession session,
7979
TimeSpan? timeout = null) where T : SessionEvent
8080
{
81-
var tcs = new TaskCompletionSource<T>();
81+
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
8282
using var cts = new CancellationTokenSource(timeout ?? TimeSpan.FromSeconds(60));
8383

8484
using var subscription = session.On(evt =>

dotnet/test/MultiClientTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ public async Task Both_Clients_See_Tool_Request_And_Completion_Events()
109109
});
110110

111111
// Set up event waiters BEFORE sending the prompt to avoid race conditions
112-
var client1Requested = new TaskCompletionSource<bool>();
113-
var client2Requested = new TaskCompletionSource<bool>();
114-
var client1Completed = new TaskCompletionSource<bool>();
115-
var client2Completed = new TaskCompletionSource<bool>();
112+
var client1Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
113+
var client2Requested = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
114+
var client1Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
115+
var client2Completed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
116116

117117
using var sub1 = session1.On(evt =>
118118
{

0 commit comments

Comments
 (0)