Skip to content

Commit ba13cca

Browse files
fix(adk): fix ack channel leak in concurrent preempt during TurnLoop shutdown
When drainAll runs during TurnLoop cleanup, a concurrent Push caller that has already called holdRunLoop but has not yet called requestPreempt can add an ack channel to pendingAckList after drainAll has cleared it. This orphaned ack is never closed, causing the Push caller to hang.

Add a `drained` flag to preemptSignal. drainAll sets it, and requestPreempt checks it — if drained, the ack is closed immediately instead of being appended to pendingAckList.

Also fix the test to stop the loop concurrently with wg.Wait, since the run loop may be blocked on buffer.Receive after processing all preempts, and Stop is needed to unblock it and trigger drainAll.

Change-Id: I8d58b0e1478d2ae6f89e06d420b9252e43aa9cd6
1 parent eab218d commit ba13cca

2 files changed

Lines changed: 16 additions & 7 deletions

File tree

adk/turn_loop.go

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -191,6 +191,7 @@ type preemptSignal struct {
191191
agentCancelOpts []AgentCancelOption
192192
pendingAckList []chan struct{}
193193
notify chan struct{}
194+
drained bool
194195

195196
currentTC any
196197
currentRunCtx context.Context
@@ -223,12 +224,13 @@ func (s *preemptSignal) holdAndGetTurn() (context.Context, any) {
223224
}
224225

225226
// requestPreempt records a preempt request and wakes both waiters.
226-
// If holdCount is 0, no one is listening — close the ack immediately as a no-op.
227+
// If holdCount is 0 or the signal has been drained, no one is listening —
228+
// close the ack immediately as a no-op.
227229
func (s *preemptSignal) requestPreempt(ack chan struct{}, opts ...AgentCancelOption) {
228230
s.mu.Lock()
229231
defer s.mu.Unlock()
230232

231-
if s.holdCount <= 0 {
233+
if s.drained || s.holdCount <= 0 {
232234
if ack != nil {
233235
close(ack)
234236
}
@@ -343,11 +345,13 @@ func (s *preemptSignal) endTurnAndUnhold() {
343345
// drainAll forcefully resets all preemptSignal state and closes any pending
344346
// ack channels. Called during TurnLoop cleanup to prevent ack channels from
345347
// leaking when the run loop exits (e.g. due to Stop) while a Push caller
346-
// still holds a reference.
348+
// still holds a reference. After drainAll, the signal is marked drained, so
349+
// any subsequent requestPreempt call (even one preceded by holdRunLoop) closes its ack immediately instead of queuing it.
347350
func (s *preemptSignal) drainAll() {
348351
s.mu.Lock()
349352
defer s.mu.Unlock()
350353

354+
s.drained = true
351355
s.holdCount = 0
352356
s.currentTC = nil
353357
s.currentRunCtx = nil

adk/turn_loop_test.go

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3122,17 +3122,22 @@ func TestTurnLoop_ConcurrentPreemptsDuringTurn(t *testing.T) {
31223122
if ok && ack != nil {
31233123
select {
31243124
case <-ack:
3125-
case <-time.After(30 * time.Second):
3125+
case <-time.After(5 * time.Second):
31263126
t.Error("ack channel not closed within timeout")
31273127
}
31283128
}
31293129
}(i)
31303130
}
31313131

3132-
wg.Wait()
3133-
time.Sleep(200 * time.Millisecond)
3132+
// Stop the loop concurrently. The run loop may be blocked on
3133+
// buffer.Receive after processing all preempts; Stop unblocks it
3134+
// and triggers drainAll which closes any orphaned ack channels.
3135+
go func() {
3136+
time.Sleep(500 * time.Millisecond)
3137+
loop.Stop(WithImmediate())
3138+
}()
31343139

3135-
loop.Stop(WithImmediate())
3140+
wg.Wait()
31363141
result := loop.Wait()
31373142
assert.NoError(t, result.ExitReason)
31383143
assert.True(t, atomic.LoadInt32(&genInputCount) >= 2, "should have had at least the initial turn + one preempted turn")

0 commit comments

Comments
 (0)