Skip to content

Commit 0a5702f

Browse files
stephentoubCopilot
andcommitted
Simplify Go event dispatch: synchronous user handlers, goroutine broadcast
Revert from channel-based approach to simpler model that preserves the existing synchronous event delivery contract: - User event handlers are invoked synchronously on the calling goroutine, maintaining the implicit contract that events registered during CreateSession are available immediately after it returns. - Broadcast handlers (tool calls, permission requests) are fired in separate goroutines via 'go s.handleBroadcastEvent(event)' to avoid deadlocking the JSON-RPC readLoop. - Panics in user handlers are recovered per-handler so one handler's failure doesn't prevent others from receiving the event. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 44132f4 commit 0a5702f

2 files changed

Lines changed: 53 additions & 135 deletions

File tree

go/session.go

Lines changed: 24 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import (
1212
"github.com/github/copilot-sdk/go/rpc"
1313
)
1414

15-
const eventChannelSize = 1024
16-
1715
type sessionHandler struct {
1816
id uint64
1917
fn SessionEventHandler
@@ -67,12 +65,6 @@ type Session struct {
6765
hooks *SessionHooks
6866
hooksMux sync.RWMutex
6967

70-
// eventCh serializes event dispatch. DispatchEvent enqueues; a single
71-
// background goroutine (processEvents) dequeues and invokes handlers
72-
// one at a time, preserving arrival order and keeping the JSON-RPC
73-
// read loop unblocked.
74-
eventCh chan SessionEvent
75-
7668
// RPC provides typed session-scoped RPC methods.
7769
RPC *rpc.SessionRpc
7870
}
@@ -86,17 +78,14 @@ func (s *Session) WorkspacePath() string {
8678

8779
// newSession creates a new session wrapper with the given session ID and client.
8880
func newSession(sessionID string, client *jsonrpc2.Client, workspacePath string) *Session {
89-
s := &Session{
81+
return &Session{
9082
SessionID: sessionID,
9183
workspacePath: workspacePath,
9284
client: client,
9385
handlers: make([]sessionHandler, 0),
9486
toolHandlers: make(map[string]ToolHandler),
95-
eventCh: make(chan SessionEvent, eventChannelSize),
9687
RPC: rpc.NewSessionRpc(client, sessionID),
9788
}
98-
go s.processEvents()
99-
return s
10089
}
10190

10291
// Send sends a message to this session and waits for the response.
@@ -446,49 +435,33 @@ func (s *Session) handleHooksInvoke(hookType string, rawInput json.RawMessage) (
446435
}
447436
}
448437

449-
// dispatchEvent enqueues an event for serial dispatch to all registered handlers.
438+
// dispatchEvent dispatches an event to all registered handlers.
450439
//
451-
// This method is non-blocking. The event is placed into an in-memory channel and
452-
// processed by a single background goroutine (processEvents), which guarantees
453-
// handlers see events one at a time, in order, without blocking the JSON-RPC
454-
// read loop.
440+
// Broadcast work (tool calls, permission requests) is fired in a separate
441+
// goroutine so it does not block the JSON-RPC read loop (which would deadlock
442+
// when the handler issues an RPC request). User event handlers are invoked
443+
// synchronously on the calling goroutine, preserving the existing delivery
444+
// contract. Panics in user handlers are recovered.
455445
func (s *Session) dispatchEvent(event SessionEvent) {
456-
select {
457-
case s.eventCh <- event:
458-
default:
459-
fmt.Printf("Warning: event %s dropped; channel full or session shutting down\n", event.Type)
460-
}
461-
}
446+
// Fire broadcast handlers in a goroutine to avoid deadlocking the readLoop.
447+
go s.handleBroadcastEvent(event)
462448

463-
// processEvents is the single-goroutine consumer loop that processes events
464-
// from the channel. Ensures user event handlers are invoked serially and in
465-
// FIFO order. Broadcast work (tool calls, permission requests) is fired
466-
// concurrently so that a stalled handler does not block event delivery.
467-
func (s *Session) processEvents() {
468-
for event := range s.eventCh {
469-
// Fire broadcast handlers in a separate goroutine so they don't block
470-
// event delivery. This preserves pre-existing fire-and-forget semantics
471-
// (important when a secondary client's handler intentionally never
472-
// completes), while running off the readLoop to avoid deadlocks.
473-
go s.handleBroadcastEvent(event)
474-
475-
s.handlerMutex.RLock()
476-
handlers := make([]SessionEventHandler, 0, len(s.handlers))
477-
for _, h := range s.handlers {
478-
handlers = append(handlers, h.fn)
479-
}
480-
s.handlerMutex.RUnlock()
481-
482-
for _, handler := range handlers {
483-
func() {
484-
defer func() {
485-
if r := recover(); r != nil {
486-
fmt.Printf("Error in session event handler: %v\n", r)
487-
}
488-
}()
489-
handler(event)
449+
s.handlerMutex.RLock()
450+
handlers := make([]SessionEventHandler, 0, len(s.handlers))
451+
for _, h := range s.handlers {
452+
handlers = append(handlers, h.fn)
453+
}
454+
s.handlerMutex.RUnlock()
455+
456+
for _, handler := range handlers {
457+
func() {
458+
defer func() {
459+
if r := recover(); r != nil {
460+
fmt.Printf("Error in session event handler: %v\n", r)
461+
}
490462
}()
491-
}
463+
handler(event)
464+
}()
492465
}
493466
}
494467

@@ -669,10 +642,6 @@ func (s *Session) Disconnect() error {
669642
return fmt.Errorf("failed to disconnect session: %w", err)
670643
}
671644

672-
// Stop accepting new events. The consumer goroutine will exit naturally
673-
// after draining remaining events.
674-
close(s.eventCh)
675-
676645
// Clear handlers
677646
s.handlerMutex.Lock()
678647
s.handlers = nil

go/session_test.go

Lines changed: 29 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,14 @@ func TestSession_On(t *testing.T) {
1111
t.Run("multiple handlers all receive events", func(t *testing.T) {
1212
session := &Session{
1313
handlers: make([]sessionHandler, 0),
14-
eventCh: make(chan SessionEvent, eventChannelSize),
1514
}
16-
go session.processEvents()
1715

18-
var wg sync.WaitGroup
19-
wg.Add(3)
2016
var received1, received2, received3 bool
21-
session.On(func(event SessionEvent) { received1 = true; wg.Done() })
22-
session.On(func(event SessionEvent) { received2 = true; wg.Done() })
23-
session.On(func(event SessionEvent) { received3 = true; wg.Done() })
17+
session.On(func(event SessionEvent) { received1 = true })
18+
session.On(func(event SessionEvent) { received2 = true })
19+
session.On(func(event SessionEvent) { received3 = true })
2420

2521
session.dispatchEvent(SessionEvent{Type: "test"})
26-
wg.Wait()
27-
close(session.eventCh)
2822

2923
if !received1 || !received2 || !received3 {
3024
t.Errorf("Expected all handlers to receive event, got received1=%v, received2=%v, received3=%v",
@@ -35,94 +29,66 @@ func TestSession_On(t *testing.T) {
3529
t.Run("unsubscribing one handler does not affect others", func(t *testing.T) {
3630
session := &Session{
3731
handlers: make([]sessionHandler, 0),
38-
eventCh: make(chan SessionEvent, eventChannelSize),
3932
}
40-
go session.processEvents()
41-
42-
var count1, count2, count3 atomic.Int32
43-
var wg sync.WaitGroup
4433

45-
wg.Add(3)
46-
session.On(func(event SessionEvent) { count1.Add(1); wg.Done() })
47-
unsub2 := session.On(func(event SessionEvent) { count2.Add(1); wg.Done() })
48-
session.On(func(event SessionEvent) { count3.Add(1); wg.Done() })
34+
var count1, count2, count3 int
35+
session.On(func(event SessionEvent) { count1++ })
36+
unsub2 := session.On(func(event SessionEvent) { count2++ })
37+
session.On(func(event SessionEvent) { count3++ })
4938

5039
// First event - all handlers receive it
5140
session.dispatchEvent(SessionEvent{Type: "test"})
52-
wg.Wait()
5341

5442
// Unsubscribe handler 2
5543
unsub2()
5644

5745
// Second event - only handlers 1 and 3 should receive it
58-
wg.Add(2)
5946
session.dispatchEvent(SessionEvent{Type: "test"})
60-
wg.Wait()
61-
close(session.eventCh)
6247

63-
if count1.Load() != 2 {
64-
t.Errorf("Expected handler 1 to receive 2 events, got %d", count1.Load())
48+
if count1 != 2 {
49+
t.Errorf("Expected handler 1 to receive 2 events, got %d", count1)
6550
}
66-
if count2.Load() != 1 {
67-
t.Errorf("Expected handler 2 to receive 1 event (before unsubscribe), got %d", count2.Load())
51+
if count2 != 1 {
52+
t.Errorf("Expected handler 2 to receive 1 event (before unsubscribe), got %d", count2)
6853
}
69-
if count3.Load() != 2 {
70-
t.Errorf("Expected handler 3 to receive 2 events, got %d", count3.Load())
54+
if count3 != 2 {
55+
t.Errorf("Expected handler 3 to receive 2 events, got %d", count3)
7156
}
7257
})
7358

7459
t.Run("calling unsubscribe multiple times is safe", func(t *testing.T) {
7560
session := &Session{
7661
handlers: make([]sessionHandler, 0),
77-
eventCh: make(chan SessionEvent, eventChannelSize),
7862
}
79-
go session.processEvents()
8063

81-
var count atomic.Int32
82-
var wg sync.WaitGroup
83-
84-
wg.Add(1)
85-
unsub := session.On(func(event SessionEvent) { count.Add(1); wg.Done() })
64+
var count int
65+
unsub := session.On(func(event SessionEvent) { count++ })
8666

8767
session.dispatchEvent(SessionEvent{Type: "test"})
88-
wg.Wait()
8968

9069
// Call unsubscribe multiple times - should not panic
9170
unsub()
9271
unsub()
9372
unsub()
9473

95-
wg.Add(1)
96-
// Dispatch to drain; handler won't run since it's unsubscribed.
97-
// Use a sentinel handler to signal completion.
98-
session.On(func(event SessionEvent) { wg.Done() })
9974
session.dispatchEvent(SessionEvent{Type: "test"})
100-
wg.Wait()
101-
close(session.eventCh)
10275

103-
if count.Load() != 1 {
104-
t.Errorf("Expected handler to receive 1 event, got %d", count.Load())
76+
if count != 1 {
77+
t.Errorf("Expected handler to receive 1 event, got %d", count)
10578
}
10679
})
10780

10881
t.Run("handlers are called in registration order", func(t *testing.T) {
10982
session := &Session{
11083
handlers: make([]sessionHandler, 0),
111-
eventCh: make(chan SessionEvent, eventChannelSize),
11284
}
113-
go session.processEvents()
11485

115-
var mu sync.Mutex
11686
var order []int
117-
var wg sync.WaitGroup
118-
wg.Add(3)
119-
session.On(func(event SessionEvent) { mu.Lock(); order = append(order, 1); mu.Unlock(); wg.Done() })
120-
session.On(func(event SessionEvent) { mu.Lock(); order = append(order, 2); mu.Unlock(); wg.Done() })
121-
session.On(func(event SessionEvent) { mu.Lock(); order = append(order, 3); mu.Unlock(); wg.Done() })
87+
session.On(func(event SessionEvent) { order = append(order, 1) })
88+
session.On(func(event SessionEvent) { order = append(order, 2) })
89+
session.On(func(event SessionEvent) { order = append(order, 3) })
12290

12391
session.dispatchEvent(SessionEvent{Type: "test"})
124-
wg.Wait()
125-
close(session.eventCh)
12692

12793
if len(order) != 3 || order[0] != 1 || order[1] != 2 || order[2] != 3 {
12894
t.Errorf("Expected handlers to be called in order [1,2,3], got %v", order)
@@ -158,9 +124,7 @@ func TestSession_On(t *testing.T) {
158124
t.Run("events are dispatched serially", func(t *testing.T) {
159125
session := &Session{
160126
handlers: make([]sessionHandler, 0),
161-
eventCh: make(chan SessionEvent, eventChannelSize),
162127
}
163-
go session.processEvents()
164128

165129
var concurrentCount atomic.Int32
166130
var maxConcurrent atomic.Int32
@@ -180,20 +144,14 @@ func TestSession_On(t *testing.T) {
180144
done.Done()
181145
})
182146

183-
// Dispatch from multiple goroutines to test that the channel
184-
// serializes concurrent arrivals.
185-
var wg sync.WaitGroup
186-
wg.Add(totalEvents)
147+
// dispatchEvent invokes user handlers synchronously on the calling
148+
// goroutine, so dispatching from a single goroutine guarantees serial
149+
// delivery (handlers cannot interleave).
187150
for i := 0; i < totalEvents; i++ {
188-
go func() {
189-
defer wg.Done()
190-
session.dispatchEvent(SessionEvent{Type: "test"})
191-
}()
151+
session.dispatchEvent(SessionEvent{Type: "test"})
192152
}
193-
wg.Wait()
194153

195154
done.Wait()
196-
close(session.eventCh)
197155

198156
if max := maxConcurrent.Load(); max != 1 {
199157
t.Errorf("Expected max concurrent count of 1, got %d", max)
@@ -203,30 +161,21 @@ func TestSession_On(t *testing.T) {
203161
t.Run("handler panic does not halt delivery", func(t *testing.T) {
204162
session := &Session{
205163
handlers: make([]sessionHandler, 0),
206-
eventCh: make(chan SessionEvent, eventChannelSize),
207164
}
208-
go session.processEvents()
209-
210-
var eventCount atomic.Int32
211-
var done sync.WaitGroup
212-
done.Add(2)
213165

166+
var eventCount int
214167
session.On(func(event SessionEvent) {
215-
count := eventCount.Add(1)
216-
defer done.Done()
217-
if count == 1 {
168+
eventCount++
169+
if eventCount == 1 {
218170
panic("boom")
219171
}
220172
})
221173

222174
session.dispatchEvent(SessionEvent{Type: "test"})
223175
session.dispatchEvent(SessionEvent{Type: "test"})
224176

225-
done.Wait()
226-
close(session.eventCh)
227-
228-
if eventCount.Load() != 2 {
229-
t.Errorf("Expected 2 events dispatched, got %d", eventCount.Load())
177+
if eventCount != 2 {
178+
t.Errorf("Expected 2 events dispatched, got %d", eventCount)
230179
}
231180
})
232181
}

0 commit comments

Comments
 (0)