Skip to content

Commit 6a45adf

Browse files
fix(adk): skip saving checkpoint when TurnLoop is idle (#916)
* fix(adk): skip saving checkpoint when TurnLoop is idle

  When Stop() is called on an idle TurnLoop (no active agent run, no unhandled items, no canceled items), the resulting checkpoint contains no meaningful state. Skip saving such checkpoints to avoid unnecessary store writes.

  - Add isIdle check in cleanup() before checkpoint save decision
  - Add TestTurnLoop_StopWhileIdle_SkipsCheckpoint test

  Change-Id: I6aeaff5ed5833a971cb95298193fdb96d904baf8

* fix(internal): merge id2State in PopulateInterruptState instead of replacing

  PopulateInterruptState merged id2Addr entries one by one but replaced id2State wholesale. In a parallel workflow resume, two goroutines share the same globalResumeInfo. If one goroutine's compose graph called PopulateInterruptState (replacing id2State with compose-only entries) before the other goroutine looked up its outer-level entry, the lookup returned a zero-value InterruptState with State=nil, triggering the 'has no state' panic in ChatModelAgent.Resume.

  Change id2State handling to merge entry by entry, consistent with id2Addr.

  Change-Id: Ia21f65289bff7beb2bc383fb033926ad9c92d7e7

* fix(adk): keep watching for cancel escalation after stopSig.done

  When watchStopSignal entered the stopSig.done branch, it processed the initial cancel and then blocked on <-done (turn completion), never looping back to check notify. This meant a subsequent Stop() call with a higher cancel mode (e.g. CancelImmediate) was never forwarded to the agent, causing TestTurnLoop_Stop_EscalatesCancelMode to time out.

  Replace the blocking <-done with an inner loop that selects on both done and notify, so escalation signals are always delivered. Also apply the generation-based dedup check consistent with the notify branch.

  Change-Id: Ia6a04d00a2b44625ffbcb625ff0e559c12ed145f
1 parent 2117056 commit 6a45adf

3 files changed

Lines changed: 60 additions & 9 deletions

File tree

adk/turn_loop.go

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,14 +1228,31 @@ func (l *TurnLoop[T]) watchStopSignal(done <-chan struct{}, agentCancelFunc Agen
12281228
}
12291229
}
12301230
case <-l.stopSig.done:
1231-
_, opts := l.stopSig.check()
1232-
_, contributed := agentCancelFunc(opts...)
1233-
if contributed && !stoppedClosed {
1234-
close(stoppedDone)
1235-
stoppedClosed = true
1231+
gen, opts := l.stopSig.check()
1232+
if gen != lastGen {
1233+
lastGen = gen
1234+
_, contributed := agentCancelFunc(opts...)
1235+
if contributed && !stoppedClosed {
1236+
close(stoppedDone)
1237+
stoppedClosed = true
1238+
}
1239+
}
1240+
for {
1241+
select {
1242+
case <-done:
1243+
return
1244+
case <-l.stopSig.notify:
1245+
gen, opts := l.stopSig.check()
1246+
if gen != lastGen {
1247+
lastGen = gen
1248+
_, contributed := agentCancelFunc(opts...)
1249+
if contributed && !stoppedClosed {
1250+
close(stoppedDone)
1251+
stoppedClosed = true
1252+
}
1253+
}
1254+
}
12361255
}
1237-
<-done
1238-
return
12391256
}
12401257
}
12411258
}
@@ -1380,7 +1397,8 @@ func (l *TurnLoop[T]) cleanup(ctx context.Context) {
13801397

13811398
unhandled := l.buffer.TakeAll()
13821399
checkpointID := l.config.CheckpointID
1383-
shouldSaveCheckpoint := l.config.Store != nil && checkpointID != "" && l.stopSig.isStopped()
1400+
isIdle := len(l.checkPointRunnerBytes) == 0 && len(unhandled) == 0 && len(l.canceledItems) == 0
1401+
shouldSaveCheckpoint := l.config.Store != nil && checkpointID != "" && l.stopSig.isStopped() && !isIdle
13841402
if shouldSaveCheckpoint {
13851403
cp := &turnLoopCheckpoint[T]{
13861404
RunnerCheckpoint: l.checkPointRunnerBytes,

adk/turn_loop_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,34 @@ func TestTurnLoop_StopWithoutCheckpointIDDoesNotPersist(t *testing.T) {
14841484
assert.Empty(t, store.m, "no checkpoint should be saved when CheckpointID is not configured")
14851485
}
14861486

1487+
// TestTurnLoop_StopWhileIdle_SkipsCheckpoint starts a TurnLoop configured with
// a checkpoint store and the CheckpointID "idle-session", calls Stop without
// pushing any items, and waits for the loop to exit. It asserts that the exit
// reason is not an error and that the store holds no entry under that
// checkpoint ID.
func TestTurnLoop_StopWhileIdle_SkipsCheckpoint(t *testing.T) {
1488+
ctx := context.Background()
1489+
store := &deletableCheckpointStore{
1490+
turnLoopCheckpointStore: turnLoopCheckpointStore{m: make(map[string][]byte)},
1491+
}
1492+
cpID := "idle-session"
1493+
1494+
loop := newAndRunTurnLoop(ctx, TurnLoopConfig[string]{
1495+
Store: store,
1496+
CheckpointID: cpID,
1497+
GenInput: func(ctx context.Context, _ *TurnLoop[string], items []string) (*GenInputResult[string], error) {
1498+
return &GenInputResult[string]{Input: &AgentInput{}, Consumed: items}, nil
1499+
},
1500+
PrepareAgent: func(ctx context.Context, _ *TurnLoop[string], consumed []string) (Agent, error) {
1501+
return &turnLoopMockAgent{name: "test"}, nil
1502+
},
1503+
})
1504+
1505+
loop.Stop()
1506+
exit := loop.Wait()
1507+
assert.NoError(t, exit.ExitReason)
1508+
1509+
store.mu.Lock()
1510+
defer store.mu.Unlock()
1511+
_, exists := store.m[cpID]
1512+
assert.False(t, exists, "no checkpoint should be saved when TurnLoop is idle")
1513+
}
1514+
14871515
func TestTurnLoop_StopBetweenTurnsAndResume(t *testing.T) {
14881516
ctx := context.Background()
14891517
store := &turnLoopCheckpointStore{m: make(map[string][]byte)}

internal/core/address.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,12 @@ func PopulateInterruptState(ctx context.Context, id2Addr map[string]Address,
281281
for id, addr := range id2Addr {
282282
rInfo.id2Addr[id] = addr
283283
}
284-
rInfo.id2State = id2State
284+
if rInfo.id2State == nil {
285+
rInfo.id2State = make(map[string]InterruptState)
286+
}
287+
for id, state := range id2State {
288+
rInfo.id2State[id] = state
289+
}
285290
} else {
286291
rInfo = &globalResumeInfo{
287292
id2Addr: id2Addr,

0 commit comments

Comments
 (0)