feat: add WithAgentInactivityTimeout option for terminating stalled a… #315
Conversation
Code Review
This pull request introduces an agent inactivity timeout mechanism to terminate executions when an agent fails to write events within a configured duration. Key changes include the addition of the WithAgentInactivityTimeout option, an activityTrackingWriter to monitor event pipe writes, and a watcher goroutine in the execution handler to manage the timeout logic. I have no feedback to provide.
Thanks for the PR. I had a look last week and at a high level the changes look good. I want to review these carefully before merging because of which part of the code this changes. I'll try my best to finish the review.
```go
// inactivity timeout is not configured both return values are zero, in which
// case [newActivityTrackingWriter] is a no-op and the watcher goroutine is
// not started.
func (m *localManager) inactivityConfig() (*activityTracker, inactivityConfig) {
```
The method is duplicated and only uses `inactivityTimeout`. Let's make it a package-level free function, `newInactivityTracker`. I don't mind it being called `newInactivityConfig` either, but it feels more natural to create a tracker with the config as a field, which callers will then use.
```go
select {
case <-ctx.Done():
	return ctx.Err()
case <-inactivity.signal:
```
nit: I'd rename this to `inactivity.writeRecorded` or something like that; `<-inactivity.signal` reads as though a signal of inactivity was received.
```go
	return ctx.Err()
case <-inactivity.signal:
	if !timer.Stop() {
		select {
```
The drain is not necessary as of Go 1.23+. From the docs:

> For a chan-based timer created with NewTimer(d), as of Go 1.23, any receive from t.C after Stop has returned is guaranteed to block rather than receive a stale time value from before the Stop; if the program has not received from t.C already and the timer is running, Stop is guaranteed to return true.

A single `Reset` should be enough:

> For a chan-based timer created with NewTimer, as of Go 1.23, any receive from t.C after Reset has returned is guaranteed not to receive a time value corresponding to the previous timer settings.
```go
<-producerStarted
// Hold the consumer open long enough that, without timer
// resets, the watcher would have fired multiple times.
time.Sleep(200 * time.Millisecond)
```
Let's please reduce those sleep times where possible. I prompted an AI to suggest lower values; I don't vouch for all of them, please apply where reasonable:

- Inactivity timeout: 50ms → 20ms (all 3 tests)
- Ticker interval: 25ms → 10ms
- Consumer sleep ("activity resets"): 200ms → 80ms (still 4x the timeout)
- Consumer sleep ("zero timeout"): 20ms → 5ms
- Deadlock guard: 100ms → 50ms

I prefer to start small and increase if flaky; these quickly compound when tests are run with a higher `-count`.
…gents

Adds a new RequestHandlerOption that bounds how long the server will wait for the next event from an agent's producer before terminating its execution. The timer is reset whenever the agent writes an event to the event pipe, so steady producers are unaffected.

Implementation:
- New ErrAgentInactivityTimeout sentinel in internal/taskexec, re-exported from a2asrv so callers can detect the condition with errors.Is.
- New activityTrackingWriter wraps the producer's eventpipe.Writer and signals on every successful Write. Wrapping is a no-op when the option is not configured, so existing users pay no cost.
- New watcher goroutine inside runProducerConsumer, started only when the inactivity timeout is positive. On firing, it returns ErrAgentInactivityTimeout from the errgroup, which becomes the cancellation cause on the producer and consumer contexts via context.Cause, exactly like the existing TestRunProducerConsumer_CausePropagation pattern.
- Plumbed through LocalManagerConfig.AgentInactivityTimeout and DistributedManagerConfig.AgentInactivityTimeout for cluster-mode parity. Heartbeats from workqueue.Heartbeater remain independent.

Defaults: 0 disables the watcher, preserving prior behavior.

Tests:
- 7 unit tests in execution_handler_test.go covering producer stall -> ErrAgentInactivityTimeout, activity signals reset the timer, zero timeout does not start the watcher, and activityTrackingWriter behavior (nil tracker, success, failure, non-blocking signal).
- 1 end-to-end test in manager_test.go that exercises localManager with the option configured and asserts the AgentExecutor sees ErrAgentInactivityTimeout via context.Cause.

Fixes a2aproject#78
- Collapse activityTracker + inactivityConfig into a single inactivityTracker type with a config field, constructed via the package-level newInactivityTracker(timeout) function. This removes the duplicated inactivityConfig() methods on localManager and workQueueHandler.
- Rename inactivityConfig.signal -> writeRecorded so the watcher loop reads as "<-cfg.writeRecorded" (a write was recorded) rather than the prior "<-cfg.signal" (which read like a signal of inactivity).
- Drop the timer Stop+drain dance. As of Go 1.23 a receive after Stop or Reset is guaranteed not to deliver a stale tick, so a single timer.Reset(cfg.timeout) is sufficient.
- Reduce sleep/timeout values in the inactivity tests so they cost less when run with -count: timeout 50ms -> 20ms, activity ticker 25ms -> 10ms, hold sleep 200ms -> 80ms (still 4x timeout), zero timeout sleep 20ms -> 5ms, deadlock guard 100ms -> 50ms. Verified stable under -race -count=20.
Force-pushed from 9430185 to 724c5ce
Thanks for the thorough review @yarolegovich! Pushed two commits.
Adds a new `RequestHandlerOption` that bounds how long the server will wait for the next event from an agent's producer before terminating its execution. The timer is reset whenever the agent writes an event to the event pipe, so steady producers are unaffected.

Approach was discussed and confirmed on the issue: #78 (comment).
Implementation
- New `ErrAgentInactivityTimeout` sentinel in `internal/taskexec`, re-exported from `a2asrv` so callers can detect the condition with `errors.Is`.
- New `activityTrackingWriter` wraps the producer's `eventpipe.Writer` and signals on every successful `Write`. Wrapping is a no-op when the option is not configured, so existing users pay no cost.
- New watcher goroutine inside `runProducerConsumer`, started only when the inactivity timeout is positive. On firing, it returns `ErrAgentInactivityTimeout` from the errgroup, which becomes the cancellation cause on the producer and consumer contexts via `context.Cause`, exactly like the existing `TestRunProducerConsumer_CausePropagation` pattern.
- Plumbed through `LocalManagerConfig.AgentInactivityTimeout` and `DistributedManagerConfig.AgentInactivityTimeout` for cluster-mode parity. Heartbeats from `workqueue.Heartbeater` remain independent.

Usage
When the agent stops emitting events for the configured duration, the executor's context is canceled and the task is finalized via the existing failure path. Callers can detect the cause:
Defaults and compatibility
- `0` disables the watcher, preserving prior behavior. Existing users see no change.
- The option is plumbed through both `localManager` and the cluster-mode `workQueueHandler` for parity.
- `workqueue.Heartbeater` heartbeats are independent; they continue to fire on their own timer, unaffected by the inactivity watcher.

Tests
- Unit tests in `execution_handler_test.go`:
  - `ErrAgentInactivityTimeout` returned from `runProducerConsumer` and surfaced as `context.Cause` on the producer ctx
  - `activityTrackingWriter`: nil tracker is a no-op, successful write signals, failed write does not signal, non-blocking when the signal channel is full
- End-to-end test in `manager_test.go` (`TestManager_AgentInactivityTimeout`) that constructs a `localManager` with the option configured, runs a stalled `AgentExecutor`, and asserts the executor sees `ErrAgentInactivityTimeout` via `context.Cause`.

All existing tests in `internal/taskexec/...` and `a2asrv/...` continue to pass. `go vet ./...` clean. Full repo `go test ./...` green.

Fixes #78