Skip to content

Commit 9940278

Browse files
stephentoubCopilot
andcommitted
Serialize event dispatch in .NET and Go SDKs
Event handlers, tool handlers, and permission handlers are now invoked serially in event-arrival order. This prevents concurrent handler execution, preserves event ordering guarantees, and ensures broadcast work (tool calls, permission requests) completes before user handlers see the event. .NET changes: - Add Channel<SessionEvent> with single-reader consumer loop that serializes all user code: broadcast handlers, tool handlers, permission handlers, and event handlers. - Change HandleBroadcastEventAsync from async void to async Task so broadcast work is awaited inline by the consumer loop. - Thread ILogger through from CopilotClient to CopilotSession; use LoggerMessage source generator for AOT-friendly diagnostics. - Log dropped events (during dispose) and handler exceptions instead of silently swallowing them. - Complete the event channel in DisposeAsync; document caller responsibility to ensure the session is idle before disposing. Go changes: - Remove `go` keyword from executeToolAndRespond and executePermissionAndRespond calls in handleBroadcastEvent, so broadcast work runs inline on the JSON-RPC read loop goroutine. - Update Disconnect docs for caller-ensures-idle contract. Tests: - .NET: add serial dispatch assertion to existing E2E test; add two new E2E tests (handler exception resilience, DisposeAsync from handler). - Go: add two unit tests (serial dispatch, handler panic resilience). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 062b61c commit 9940278

5 files changed

Lines changed: 224 additions & 18 deletions

File tree

dotnet/src/Client.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ public async Task<CopilotSession> CreateSessionAsync(SessionConfig config, Cance
407407

408408
// Create and register the session before issuing the RPC so that
409409
// events emitted by the CLI (e.g. session.start) are not dropped.
410-
var session = new CopilotSession(sessionId, connection.Rpc);
410+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
411411
session.RegisterTools(config.Tools ?? []);
412412
session.RegisterPermissionHandler(config.OnPermissionRequest);
413413
if (config.OnUserInputRequest != null)
@@ -511,7 +511,7 @@ public async Task<CopilotSession> ResumeSessionAsync(string sessionId, ResumeSes
511511

512512
// Create and register the session before issuing the RPC so that
513513
// events emitted by the CLI (e.g. session.start) are not dropped.
514-
var session = new CopilotSession(sessionId, connection.Rpc);
514+
var session = new CopilotSession(sessionId, connection.Rpc, _logger);
515515
session.RegisterTools(config.Tools ?? []);
516516
session.RegisterPermissionHandler(config.OnPermissionRequest);
517517
if (config.OnUserInputRequest != null)

dotnet/src/Session.cs

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
*--------------------------------------------------------------------------------------------*/
44

55
using Microsoft.Extensions.AI;
6+
using Microsoft.Extensions.Logging;
67
using StreamJsonRpc;
78
using System.Text.Json;
89
using System.Text.Json.Nodes;
910
using System.Text.Json.Serialization;
11+
using System.Threading.Channels;
1012
using GitHub.Copilot.SDK.Rpc;
1113

1214
namespace GitHub.Copilot.SDK;
@@ -55,19 +57,28 @@ public sealed partial class CopilotSession : IAsyncDisposable
5557
/// <summary>
5658
/// Multicast delegate used as a thread-safe, insertion-ordered handler list.
5759
/// The compiler-generated add/remove accessors use a lock-free CAS loop over the backing field.
58-
/// Dispatch reads the field once (inherent snapshot, no allocation).
60+
/// Invocation is serialized by the single-reader event channel consumer loop.
5961
/// Expected handler count is small (typically 1–3), so Delegate.Combine/Remove cost is negligible.
6062
/// </summary>
6163
private event SessionEventHandler? EventHandlers;
6264
private readonly Dictionary<string, AIFunction> _toolHandlers = [];
6365
private readonly JsonRpc _rpc;
66+
private readonly ILogger _logger;
6467
private volatile PermissionRequestHandler? _permissionHandler;
6568
private volatile UserInputHandler? _userInputHandler;
6669
private SessionHooks? _hooks;
6770
private readonly SemaphoreSlim _hooksLock = new(1, 1);
6871
private SessionRpc? _sessionRpc;
6972
private int _isDisposed;
7073

74+
/// <summary>
75+
/// Unbounded channel that serializes event dispatch. <see cref="DispatchEvent"/>
76+
/// enqueues; a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues
77+
/// and invokes handlers one at a time, preserving arrival order.
78+
/// </summary>
79+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
80+
new() { SingleReader = true });
81+
7182
/// <summary>
7283
/// Gets the unique identifier for this session.
7384
/// </summary>
@@ -93,15 +104,18 @@ public sealed partial class CopilotSession : IAsyncDisposable
93104
/// </summary>
94105
/// <param name="sessionId">The unique identifier for this session.</param>
95106
/// <param name="rpc">The JSON-RPC connection to the Copilot CLI.</param>
107+
/// <param name="logger">Logger for diagnostics.</param>
96108
/// <param name="workspacePath">The workspace path if infinite sessions are enabled.</param>
97109
/// <remarks>
98110
/// This constructor is internal. Use <see cref="CopilotClient.CreateSessionAsync"/> to create sessions.
99111
/// </remarks>
100-
internal CopilotSession(string sessionId, JsonRpc rpc, string? workspacePath = null)
112+
internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? workspacePath = null)
101113
{
102114
SessionId = sessionId;
103115
_rpc = rpc;
116+
_logger = logger;
104117
WorkspacePath = workspacePath;
118+
_ = ProcessEventsAsync();
105119
}
106120

107121
private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
@@ -236,7 +250,9 @@ void Handler(SessionEvent evt)
236250
/// Multiple handlers can be registered and will all receive events.
237251
/// </para>
238252
/// <para>
239-
/// Handler exceptions are allowed to propagate so they are not lost.
253+
/// Handlers are invoked serially in event-arrival order on a background thread.
254+
/// A handler will never be called concurrently with itself or with other handlers
255+
/// on the same session.
240256
/// </para>
241257
/// </remarks>
242258
/// <example>
@@ -264,22 +280,56 @@ public IDisposable On(SessionEventHandler handler)
264280
}
265281

266282
/// <summary>
267-
/// Dispatches an event to all registered handlers.
283+
/// Enqueues an event for serial dispatch to all registered handlers.
268284
/// </summary>
269285
/// <param name="sessionEvent">The session event to dispatch.</param>
270286
/// <remarks>
271-
/// This method is internal. Handler exceptions are allowed to propagate so they are not lost.
287+
/// This method is non-blocking. The event is placed into an in-memory channel and
288+
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
289+
/// which guarantees handlers see events one at a time, in order.
272290
/// Broadcast request events (external_tool.requested, permission.requested) are handled
273291
/// internally before being forwarded to user handlers.
274292
/// </remarks>
275293
internal void DispatchEvent(SessionEvent sessionEvent)
276294
{
277-
// Handle broadcast request events (protocol v3) before dispatching to user handlers.
278-
// Fire-and-forget: the response is sent asynchronously via RPC.
279-
HandleBroadcastEventAsync(sessionEvent);
295+
if (!_eventChannel.Writer.TryWrite(sessionEvent))
296+
{
297+
LogEventDropped(sessionEvent.Type);
298+
}
299+
}
300+
301+
/// <summary>
302+
/// Single-reader consumer loop that processes events from the channel.
303+
/// Ensures all user code — event handlers, tool handlers, permission handlers —
304+
/// is invoked serially and in FIFO order. Broadcast work (tool calls, permission
305+
/// requests) is awaited inline before dispatching to user handlers.
306+
/// </summary>
307+
private async Task ProcessEventsAsync()
308+
{
309+
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync())
310+
{
311+
// Await broadcast work inline so tool/permission handlers are serialized
312+
// with everything else. If two tool requests arrive back-to-back, the second
313+
// won't start until the first completes.
314+
try
315+
{
316+
await HandleBroadcastEventAsync(sessionEvent);
317+
}
318+
catch (Exception ex)
319+
{
320+
LogBroadcastHandlerError(ex);
321+
}
280322

281-
// Reading the field once gives us a snapshot; delegates are immutable.
282-
EventHandlers?.Invoke(sessionEvent);
323+
// Invoke user handlers serially. Catch exceptions to keep the loop alive.
324+
try
325+
{
326+
EventHandlers?.Invoke(sessionEvent);
327+
}
328+
catch (Exception ex)
329+
{
330+
LogEventHandlerError(ex);
331+
}
332+
}
283333
}
284334

285335
/// <summary>
@@ -355,7 +405,7 @@ internal async Task<PermissionRequestResult> HandlePermissionRequestAsync(JsonEl
355405
/// Implements the protocol v3 broadcast model where tool calls and permission requests
356406
/// are broadcast as session events to all clients.
357407
/// </summary>
358-
private async void HandleBroadcastEventAsync(SessionEvent sessionEvent)
408+
private async Task HandleBroadcastEventAsync(SessionEvent sessionEvent)
359409
{
360410
switch (sessionEvent)
361411
{
@@ -703,6 +753,11 @@ public async Task LogAsync(string message, SessionLogRequestLevel? level = null,
703753
/// <returns>A task representing the dispose operation.</returns>
704754
/// <remarks>
705755
/// <para>
756+
/// The caller should ensure the session is idle (e.g., <see cref="SendAndWaitAsync"/>
757+
/// has returned) before disposing. If the session is not idle, in-flight event handlers
758+
/// or tool handlers may observe failures.
759+
/// </para>
760+
/// <para>
706761
/// Session state on disk (conversation history, planning state, artifacts) is
707762
/// preserved, so the conversation can be resumed later by calling
708763
/// <see cref="CopilotClient.ResumeSessionAsync"/> with the session ID. To
@@ -731,6 +786,12 @@ public async ValueTask DisposeAsync()
731786
return;
732787
}
733788

789+
// Stop accepting new events. The consumer loop will exit naturally
790+
// after completing the current event (if any). The caller is expected
791+
// to have waited for the session to become idle before disposing;
792+
// if the session is not idle, in-flight handlers may see failures.
793+
_eventChannel.Writer.TryComplete();
794+
734795
try
735796
{
736797
await InvokeRpcAsync<object>(
@@ -751,6 +812,15 @@ await InvokeRpcAsync<object>(
751812
_permissionHandler = null;
752813
}
753814

815+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in broadcast event handler")]
816+
private partial void LogBroadcastHandlerError(Exception exception);
817+
818+
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
819+
private partial void LogEventHandlerError(Exception exception);
820+
821+
[LoggerMessage(Level = LogLevel.Warning, Message = "Event {EventType} dropped; session is shutting down")]
822+
private partial void LogEventDropped(string eventType);
823+
754824
internal record SendMessageRequest
755825
{
756826
public string SessionId { get; init; } = string.Empty;

dotnet/test/SessionTests.cs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,21 @@ public async Task Should_Receive_Session_Events()
258258

259259
var receivedEvents = new List<SessionEvent>();
260260
var idleReceived = new TaskCompletionSource<bool>();
261+
var concurrentCount = 0;
262+
var maxConcurrent = 0;
261263

262264
session.On(evt =>
263265
{
266+
// Track concurrent handler invocations to verify serial dispatch.
267+
var current = Interlocked.Increment(ref concurrentCount);
268+
var seenMax = Volatile.Read(ref maxConcurrent);
269+
if (current > seenMax)
270+
Interlocked.CompareExchange(ref maxConcurrent, current, seenMax);
271+
272+
Thread.Sleep(10);
273+
274+
Interlocked.Decrement(ref concurrentCount);
275+
264276
receivedEvents.Add(evt);
265277
if (evt is SessionIdleEvent)
266278
{
@@ -281,6 +293,9 @@ public async Task Should_Receive_Session_Events()
281293
Assert.Contains(receivedEvents, evt => evt is AssistantMessageEvent);
282294
Assert.Contains(receivedEvents, evt => evt is SessionIdleEvent);
283295

296+
// Events must be dispatched serially — never more than one handler invocation at a time.
297+
Assert.Equal(1, maxConcurrent);
298+
284299
// Verify the assistant response contains the expected answer
285300
var assistantMessage = await TestHelper.GetFinalAssistantMessageAsync(session);
286301
Assert.NotNull(assistantMessage);
@@ -452,6 +467,60 @@ await WaitForAsync(() =>
452467
Assert.Equal("notification", ephemeralEvent.Data.InfoType);
453468
}
454469

470+
[Fact]
471+
public async Task Handler_Exception_Does_Not_Halt_Event_Delivery()
472+
{
473+
// Reuse an existing snapshot — this test only customizes the C# handler.
474+
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
475+
476+
var session = await CreateSessionAsync();
477+
var eventCount = 0;
478+
var gotIdle = new TaskCompletionSource();
479+
480+
session.On(evt =>
481+
{
482+
eventCount++;
483+
484+
// Throw on the first event to verify the loop keeps going.
485+
if (eventCount == 1)
486+
throw new InvalidOperationException("boom");
487+
488+
if (evt is SessionIdleEvent)
489+
gotIdle.TrySetResult();
490+
});
491+
492+
await session.SendAsync(new MessageOptions { Prompt = "What is 1+1?" });
493+
494+
await gotIdle.Task.WaitAsync(TimeSpan.FromSeconds(30));
495+
496+
// Handler saw more than just the first (throwing) event.
497+
Assert.True(eventCount > 1);
498+
}
499+
500+
[Fact]
501+
public async Task DisposeAsync_From_Handler_Does_Not_Deadlock()
502+
{
503+
// Reuse an existing snapshot — this test only customizes the C# handler.
504+
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
505+
506+
var session = await CreateSessionAsync();
507+
var disposed = new TaskCompletionSource();
508+
509+
session.On(evt =>
510+
{
511+
if (evt is UserMessageEvent)
512+
{
513+
// Call DisposeAsync from within a handler — must not deadlock.
514+
session.DisposeAsync().AsTask().ContinueWith(_ => disposed.TrySetResult());
515+
}
516+
});
517+
518+
await session.SendAsync(new MessageOptions { Prompt = "What is 1+1?" });
519+
520+
// If this times out, we deadlocked.
521+
await disposed.Task.WaitAsync(TimeSpan.FromSeconds(10));
522+
}
523+
455524
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
456525
{
457526
var deadline = DateTime.UtcNow + timeout;

go/session.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -436,10 +436,13 @@ func (s *Session) handleHooksInvoke(hookType string, rawInput json.RawMessage) (
436436
}
437437

438438
// dispatchEvent dispatches an event to all registered handlers.
439-
// This is an internal method; handlers are called synchronously and any panics
440-
// are recovered to prevent crashing the event dispatcher.
439+
//
440+
// Events are processed serially on the JSON-RPC read loop goroutine:
441+
// broadcast work (tool calls, permission requests) is executed inline before
442+
// user handlers, and each handler completes before the next is invoked.
443+
// Panics in user handlers are recovered to prevent crashing the event dispatcher.
441444
func (s *Session) dispatchEvent(event SessionEvent) {
442-
// Handle broadcast request events internally (fire-and-forget)
445+
// Handle broadcast request events inline (serialized with user handlers)
443446
s.handleBroadcastEvent(event)
444447

445448
s.handlerMutex.RLock()
@@ -465,6 +468,9 @@ func (s *Session) dispatchEvent(event SessionEvent) {
465468
// handleBroadcastEvent handles broadcast request events by executing local handlers
466469
// and responding via RPC. This implements the protocol v3 broadcast model where tool
467470
// calls and permission requests are broadcast as session events to all clients.
471+
//
472+
// Handlers are executed inline (not in a separate goroutine) so that all user code
473+
// — tool handlers, permission handlers, and event handlers — runs serially.
468474
func (s *Session) handleBroadcastEvent(event SessionEvent) {
469475
switch event.Type {
470476
case ExternalToolRequested:
@@ -481,7 +487,7 @@ func (s *Session) handleBroadcastEvent(event SessionEvent) {
481487
if event.Data.ToolCallID != nil {
482488
toolCallID = *event.Data.ToolCallID
483489
}
484-
go s.executeToolAndRespond(*requestID, *toolName, toolCallID, event.Data.Arguments, handler)
490+
s.executeToolAndRespond(*requestID, *toolName, toolCallID, event.Data.Arguments, handler)
485491

486492
case PermissionRequested:
487493
requestID := event.Data.RequestID
@@ -492,7 +498,7 @@ func (s *Session) handleBroadcastEvent(event SessionEvent) {
492498
if handler == nil {
493499
return
494500
}
495-
go s.executePermissionAndRespond(*requestID, *event.Data.PermissionRequest, handler)
501+
s.executePermissionAndRespond(*requestID, *event.Data.PermissionRequest, handler)
496502
}
497503
}
498504

@@ -610,6 +616,10 @@ func (s *Session) GetMessages(ctx context.Context) ([]SessionEvent, error) {
610616
// Disconnect closes this session and releases all in-memory resources (event
611617
// handlers, tool handlers, permission handlers).
612618
//
619+
// The caller should ensure the session is idle (e.g., [Session.SendAndWait] has
620+
// returned) before disconnecting. If the session is not idle, in-flight event
621+
// handlers or tool handlers may observe failures.
622+
//
613623
// Session state on disk (conversation history, planning state, artifacts) is
614624
// preserved, so the conversation can be resumed later by calling
615625
// [Client.ResumeSession] with the session ID. To permanently remove all

0 commit comments

Comments
 (0)