Skip to content

Commit 1657f9c

Browse files
stephentoubCopilot
andcommitted
Restore channel-based dispatch with fire-and-forget broadcast handlers
The channel approach is the right design for .NET: - Non-blocking enqueue decouples the StreamJsonRpc reader thread from user handler execution (StreamJsonRpc dispatches concurrently). - Single-reader channel guarantees FIFO ordering and serial dispatch. - Broadcast handlers (tool calls, permissions) fire-and-forget directly from DispatchEvent, outside the channel, so a stalled handler cannot block event delivery. Update Should_Receive_Session_Events to await the session.start event instead of asserting synchronous delivery, since the channel consumer delivers events asynchronously. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent a0db1a8 commit 1657f9c

2 files changed

Lines changed: 45 additions & 47 deletions

File tree

dotnet/src/Session.cs

Lines changed: 34 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using System.Text.Json.Nodes;
1010
using System.Text.Json.Serialization;
1111
using System.Threading;
12+
using System.Threading.Channels;
1213
using GitHub.Copilot.SDK.Rpc;
1314

1415
namespace GitHub.Copilot.SDK;
@@ -72,22 +73,18 @@ public sealed partial class CopilotSession : IAsyncDisposable
7273
private int _isDisposed;
7374

7475
/// <summary>
75-
/// Guards <see cref="_eventQueue"/> and <see cref="_isDispatching"/>.
76+
/// Bounded channel that serializes event dispatch. <see cref="DispatchEvent"/>
77+
/// enqueues; a single background consumer (<see cref="ProcessEventsAsync"/>) dequeues
78+
/// and invokes handlers one at a time, preserving arrival order.
79+
/// When the channel is full, writers will asynchronously wait, providing backpressure
80+
/// instead of unbounded buffering.
7681
/// </summary>
77-
private readonly object _dispatchLock = new();
78-
79-
/// <summary>
80-
/// FIFO queue that buffers events arriving while the drain loop is already
81-
/// running. Together with <see cref="_isDispatching"/>, this guarantees both
82-
/// serial dispatch and strict arrival-order delivery.
83-
/// </summary>
84-
private readonly Queue<SessionEvent> _eventQueue = new();
85-
86-
/// <summary>
87-
/// True while a thread is draining <see cref="_eventQueue"/>. Only one thread
88-
/// at a time runs the drain loop; subsequent callers enqueue and return.
89-
/// </summary>
90-
private bool _isDispatching;
82+
private readonly Channel<SessionEvent> _eventChannel = Channel.CreateBounded<SessionEvent>(
83+
new BoundedChannelOptions(1024)
84+
{
85+
SingleReader = true,
86+
FullMode = BoundedChannelFullMode.Wait
87+
});
9188

9289
/// <summary>
9390
/// Gets the unique identifier for this session.
@@ -125,6 +122,7 @@ internal CopilotSession(string sessionId, JsonRpc rpc, ILogger logger, string? w
125122
_rpc = rpc;
126123
_logger = logger;
127124
WorkspacePath = workspacePath;
125+
_ = ProcessEventsAsync();
128126
}
129127

130128
private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationToken cancellationToken)
@@ -289,61 +287,46 @@ public IDisposable On(SessionEventHandler handler)
289287
}
290288

291289
/// <summary>
292-
/// Dispatches an event to all registered handlers.
290+
/// Enqueues an event for serial dispatch to all registered handlers.
293291
/// </summary>
294292
/// <param name="sessionEvent">The session event to dispatch.</param>
295293
/// <remarks>
296-
/// Broadcast request events (external_tool.requested, permission.requested) are fired
297-
/// concurrently so that a stalled handler does not block event delivery.
298-
/// User event handlers are delivered via a queue-drain loop that guarantees both
299-
/// serial dispatch and strict FIFO ordering. The first caller to arrive runs the
300-
/// drain loop (synchronous delivery); concurrent callers enqueue and return.
294+
/// This method is non-blocking. Broadcast request events (external_tool.requested,
295+
/// permission.requested) are fired concurrently so that a stalled handler does not
296+
/// block event delivery. The event is then placed into an in-memory channel and
297+
/// processed by a single background consumer (<see cref="ProcessEventsAsync"/>),
298+
/// which guarantees user handlers see events one at a time, in order.
301299
/// </remarks>
302300
internal void DispatchEvent(SessionEvent sessionEvent)
303301
{
304302
// Fire broadcast work concurrently (fire-and-forget with error logging).
303+
// This is done outside the channel so broadcast handlers don't block the
304+
// consumer loop — important when a secondary client's handler intentionally
305+
// never completes (multi-client permission scenario).
305306
_ = HandleBroadcastEventSafe(sessionEvent);
306307

307-
lock (_dispatchLock)
308+
if (!_eventChannel.Writer.TryWrite(sessionEvent))
308309
{
309-
_eventQueue.Enqueue(sessionEvent);
310-
if (_isDispatching)
311-
{
312-
return;
313-
}
314-
_isDispatching = true;
310+
LogEventDropped(sessionEvent.Type);
315311
}
316-
317-
DrainEventQueue();
318312
}
319313

320314
/// <summary>
321-
/// Processes queued events one at a time, in FIFO order. Only one thread
322-
/// runs this loop at a time; it keeps draining until the queue is empty.
315+
/// Single-reader consumer loop that processes events from the channel.
316+
/// Ensures user event handlers are invoked serially and in FIFO order.
323317
/// </summary>
324-
private void DrainEventQueue()
318+
private async Task ProcessEventsAsync()
325319
{
326-
while (true)
320+
await foreach (var sessionEvent in _eventChannel.Reader.ReadAllAsync())
327321
{
328-
SessionEvent evt;
329-
lock (_dispatchLock)
330-
{
331-
if (_eventQueue.Count == 0)
332-
{
333-
_isDispatching = false;
334-
return;
335-
}
336-
evt = _eventQueue.Dequeue();
337-
}
338-
339322
var handlers = EventHandlers;
340323
if (handlers is not null)
341324
{
342325
foreach (var handler in handlers.GetInvocationList())
343326
{
344327
try
345328
{
346-
handler.DynamicInvoke(evt);
329+
handler.DynamicInvoke(sessionEvent);
347330
}
348331
catch (Exception ex) when (ex is not OperationCanceledException)
349332
{
@@ -824,6 +807,8 @@ public async ValueTask DisposeAsync()
824807
return;
825808
}
826809

810+
_eventChannel.Writer.TryComplete();
811+
827812
try
828813
{
829814
await InvokeRpcAsync<object>(
@@ -850,6 +835,9 @@ await InvokeRpcAsync<object>(
850835
[LoggerMessage(Level = LogLevel.Error, Message = "Unhandled exception in session event handler")]
851836
private partial void LogEventHandlerError(Exception exception);
852837

838+
[LoggerMessage(Level = LogLevel.Warning, Message = "Event {EventType} dropped; channel full")]
839+
private partial void LogEventDropped(string eventType);
840+
853841
internal record SendMessageRequest
854842
{
855843
public string SessionId { get; init; } = string.Empty;

dotnet/test/SessionTests.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,21 @@ public async Task Should_Receive_Session_Events()
249249
// session.start is emitted during the session.create RPC; if the session
250250
// weren't registered in the sessions map before the RPC, it would be dropped.
251251
var earlyEvents = new List<SessionEvent>();
252+
var sessionStartReceived = new TaskCompletionSource<bool>();
252253
var session = await CreateSessionAsync(new SessionConfig
253254
{
254-
OnEvent = evt => earlyEvents.Add(evt),
255+
OnEvent = evt =>
256+
{
257+
earlyEvents.Add(evt);
258+
if (evt is SessionStartEvent)
259+
sessionStartReceived.TrySetResult(true);
260+
},
255261
});
256262

263+
// session.start is dispatched asynchronously via the event channel;
264+
// wait briefly for the consumer to deliver it.
265+
var started = await Task.WhenAny(sessionStartReceived.Task, Task.Delay(TimeSpan.FromSeconds(5)));
266+
Assert.Equal(sessionStartReceived.Task, started);
257267
Assert.Contains(earlyEvents, evt => evt is SessionStartEvent);
258268

259269
var receivedEvents = new List<SessionEvent>();

0 commit comments

Comments
 (0)