Skip to content

Commit 89d2236

Browse files
stephentoub and Copilot committed
Address review feedback: fix Go deadlock, bounded channel, per-handler errors
Go SDK:
- Use channel-based event queue to avoid deadlocking the JSON-RPC readLoop. Broadcast handlers (tool calls, permission requests) now run on a dedicated consumer goroutine instead of inline on the readLoop, preventing deadlock when they issue RPC requests.
- Close event channel on Disconnect for clean shutdown.
- Fix unit tests to use channel-based dispatch with goroutines, properly validating serialization guarantees.

.NET SDK:
- Switch from unbounded to bounded channel (1024, Wait mode) to prevent unbounded memory growth under high event throughput.
- Use GetInvocationList() for per-handler error catching so one handler exception does not prevent remaining subscribers from seeing the event.
- Add exception filters (when ex is not OperationCanceledException) to avoid masking cooperative cancellation.
- Add dedicated test snapshot files for new tests so they don't interfere with existing snapshot-based tests.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 9940278 commit 89d2236

6 files changed

Lines changed: 180 additions & 69 deletions

File tree

dotnet/src/Session.cs

Lines changed: 25 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -72,12 +72,18 @@ public sealed partial class CopilotSession : IAsyncDisposable
7272
private int _isDisposed;
7373

7474
/// <summary>
75-
/// Unbounded channel that serializes event dispatch. <see cref="DispatchEvent"/>
75+
/// Bounded channel that serializes event dispatch. <see cref="DispatchEvent"/>
7676
/// enqueues; a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues
7777
/// and invokes handlers one at a time, preserving arrival order.
78+
/// When the channel is full, writers will asynchronously wait, providing backpressure
79+
/// instead of unbounded buffering.
7880
/// </summary>
79-
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateUnbounded<SessionEvent>(
80-
new() { SingleReader = true });
81+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateBounded<SessionEvent>(
82+
new BoundedChannelOptions(1024)
83+
{
84+
SingleReader = true,
85+
FullMode = BoundedChannelFullMode.Wait
86+
});
8187

8288
/// <summary>
8389
/// Gets the unique identifier for this session.
@@ -315,19 +321,27 @@ private async Task ProcessEventsAsync()
315321
{
316322
await HandleBroadcastEventAsync(sessionEvent);
317323
}
318-
catch (Exception ex)
324+
catch (Exception ex) when (ex is not OperationCanceledException)
319325
{
320326
LogBroadcastHandlerError(ex);
321327
}
322328

323-
// Invoke user handlers serially. Catch exceptions to keep the loop alive.
324-
try
329+
// Invoke user handlers serially. Catch exceptions per handler to keep the
330+
// loop alive and ensure all subscribers see the event even if one fails.
331+
var handlers = EventHandlers;
332+
if (handlers is not null)
325333
{
326-
EventHandlers?.Invoke(sessionEvent);
327-
}
328-
catch (Exception ex)
329-
{
330-
LogEventHandlerError(ex);
334+
foreach (var handler in handlers.GetInvocationList())
335+
{
336+
try
337+
{
338+
handler.DynamicInvoke(sessionEvent);
339+
}
340+
catch (Exception ex) when (ex is not OperationCanceledException)
341+
{
342+
LogEventHandlerError(ex);
343+
}
344+
}
331345
}
332346
}
333347
}

dotnet/test/SessionTests.cs

Lines changed: 0 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -470,9 +470,6 @@ await WaitForAsync(() =>
470470
[Fact]
471471
public async Task Handler_Exception_Does_Not_Halt_Event_Delivery()
472472
{
473-
// Reuse an existing snapshot — this test only customizes the C# handler.
474-
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
475-
476473
var session = await CreateSessionAsync();
477474
var eventCount = 0;
478475
var gotIdle = new TaskCompletionSource();
@@ -500,9 +497,6 @@ public async Task Handler_Exception_Does_Not_Halt_Event_Delivery()
500497
[Fact]
501498
public async Task DisposeAsync_From_Handler_Does_Not_Deadlock()
502499
{
503-
// Reuse an existing snapshot — this test only customizes the C# handler.
504-
await Ctx.ConfigureForTestAsync("session", "should_have_stateful_conversation");
505-
506500
var session = await CreateSessionAsync();
507501
var disposed = new TaskCompletionSource();
508502

go/session.go

Lines changed: 55 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/github/copilot-sdk/go/rpc"
1313
)
1414

15+
const eventChannelSize = 1024
16+
1517
type sessionHandler struct {
1618
id uint64
1719
fn SessionEventHandler
@@ -65,6 +67,12 @@ type Session struct {
6567
hooks *SessionHooks
6668
hooksMux sync.RWMutex
6769

70+
// eventCh serializes event dispatch. DispatchEvent enqueues; a single
71+
// background goroutine (processEvents) dequeues and invokes handlers
72+
// one at a time, preserving arrival order and keeping the JSON-RPC
73+
// read loop unblocked.
74+
eventCh chan SessionEvent
75+
6876
// RPC provides typed session-scoped RPC methods.
6977
RPC *rpc.SessionRpc
7078
}
@@ -78,14 +86,17 @@ func (s *Session) WorkspacePath() string {
7886

7987
// newSession creates a new session wrapper with the given session ID and client.
8088
func newSession(sessionID string, client *jsonrpc2.Client, workspacePath string) *Session {
81-
return &Session{
89+
s := &Session{
8290
SessionID: sessionID,
8391
workspacePath: workspacePath,
8492
client: client,
8593
handlers: make([]sessionHandler, 0),
8694
toolHandlers: make(map[string]ToolHandler),
95+
eventCh: make(chan SessionEvent, eventChannelSize),
8796
RPC: rpc.NewSessionRpc(client, sessionID),
8897
}
98+
go s.processEvents()
99+
return s
89100
}
90101

91102
// Send sends a message to this session and waits for the response.
@@ -435,42 +446,56 @@ func (s *Session) handleHooksInvoke(hookType string, rawInput json.RawMessage) (
435446
}
436447
}
437448

438-
// dispatchEvent dispatches an event to all registered handlers.
449+
// dispatchEvent enqueues an event for serial dispatch to all registered handlers.
439450
//
440-
// Events are processed serially on the JSON-RPC read loop goroutine:
441-
// broadcast work (tool calls, permission requests) is executed inline before
442-
// user handlers, and each handler completes before the next is invoked.
443-
// Panics in user handlers are recovered to prevent crashing the event dispatcher.
451+
// This method is non-blocking. The event is placed into an in-memory channel and
452+
// processed by a single background goroutine (processEvents), which guarantees
453+
// handlers see events one at a time, in order, without blocking the JSON-RPC
454+
// read loop.
444455
func (s *Session) dispatchEvent(event SessionEvent) {
445-
// Handle broadcast request events inline (serialized with user handlers)
446-
s.handleBroadcastEvent(event)
447-
448-
s.handlerMutex.RLock()
449-
handlers := make([]SessionEventHandler, 0, len(s.handlers))
450-
for _, h := range s.handlers {
451-
handlers = append(handlers, h.fn)
456+
select {
457+
case s.eventCh <- event:
458+
default:
459+
fmt.Printf("Warning: event %s dropped; channel full or session shutting down\n", event.Type)
452460
}
453-
s.handlerMutex.RUnlock()
454-
455-
for _, handler := range handlers {
456-
// Call handler - don't let panics crash the dispatcher
457-
func() {
458-
defer func() {
459-
if r := recover(); r != nil {
460-
fmt.Printf("Error in session event handler: %v\n", r)
461-
}
461+
}
462+
463+
// processEvents is the single-goroutine consumer loop that processes events
464+
// from the channel. Ensures all user code — event handlers, tool handlers,
465+
// permission handlers — is invoked serially and in FIFO order. Broadcast work
466+
// (tool calls, permission requests) completes before user handlers see the event.
467+
func (s *Session) processEvents() {
468+
for event := range s.eventCh {
469+
// Handle broadcast request events (serialized with user handlers).
470+
// Runs off the readLoop goroutine so RPC requests won't deadlock.
471+
s.handleBroadcastEvent(event)
472+
473+
s.handlerMutex.RLock()
474+
handlers := make([]SessionEventHandler, 0, len(s.handlers))
475+
for _, h := range s.handlers {
476+
handlers = append(handlers, h.fn)
477+
}
478+
s.handlerMutex.RUnlock()
479+
480+
for _, handler := range handlers {
481+
func() {
482+
defer func() {
483+
if r := recover(); r != nil {
484+
fmt.Printf("Error in session event handler: %v\n", r)
485+
}
486+
}()
487+
handler(event)
462488
}()
463-
handler(event)
464-
}()
489+
}
465490
}
466491
}
467492

468493
// handleBroadcastEvent handles broadcast request events by executing local handlers
469494
// and responding via RPC. This implements the protocol v3 broadcast model where tool
470495
// calls and permission requests are broadcast as session events to all clients.
471496
//
472-
// Handlers are executed inline (not in a separate goroutine) so that all user code
473-
// — tool handlers, permission handlers, and event handlers — runs serially.
497+
// Handlers are executed on the processEvents goroutine (not the JSON-RPC read loop)
498+
// so that RPC responses can be received without deadlocking.
474499
func (s *Session) handleBroadcastEvent(event SessionEvent) {
475500
switch event.Type {
476501
case ExternalToolRequested:
@@ -641,6 +666,10 @@ func (s *Session) Disconnect() error {
641666
return fmt.Errorf("failed to disconnect session: %w", err)
642667
}
643668

669+
// Stop accepting new events. The consumer goroutine will exit naturally
670+
// after draining remaining events.
671+
close(s.eventCh)
672+
644673
// Clear handlers
645674
s.handlerMutex.Lock()
646675
s.handlers = nil

0 commit comments

Comments
 (0)