
feat: add WithAgentInactivityTimeout option for terminating stalled agents #315

Merged
yarolegovich merged 3 commits into a2aproject:main from ravyg:feat/78-agent-inactivity-timeout
May 4, 2026

Conversation

@ravyg (Contributor) commented Apr 8, 2026

Adds a new RequestHandlerOption that bounds how long the server will wait for the next event from an agent's producer before terminating its execution. The timer is reset whenever the agent writes an event to the event pipe, so steady producers are unaffected.

Approach was discussed and confirmed on the issue: #78 (comment).

Implementation

  • New ErrAgentInactivityTimeout sentinel in internal/taskexec, re-exported from a2asrv so callers can detect the condition with errors.Is.
  • New activityTrackingWriter wraps the producer's eventpipe.Writer and signals on every successful Write. Wrapping is a no-op when the option is not configured, so existing users pay no cost.
  • New watcher goroutine inside runProducerConsumer, started only when the inactivity timeout is positive. On firing, it returns ErrAgentInactivityTimeout from the errgroup which becomes the cancellation cause on the producer and consumer contexts via context.Cause, exactly like the existing TestRunProducerConsumer_CausePropagation pattern.
  • Plumbed through LocalManagerConfig.AgentInactivityTimeout and DistributedManagerConfig.AgentInactivityTimeout for cluster-mode parity. Heartbeats from workqueue.Heartbeater remain independent.

Usage

handler := a2asrv.NewHandler(
    myExecutor,
    a2asrv.WithAgentInactivityTimeout(30 * time.Second),
)

When the agent stops emitting events for the configured duration, the executor's context is canceled and the task is finalized via the existing failure path. Callers can detect the cause:

if errors.Is(err, a2asrv.ErrAgentInactivityTimeout) {
    // handle inactivity termination
}

Defaults and compatibility

  • Default 0 disables the watcher, preserving prior behavior. Existing users see no change.
  • Applies to both localManager and the cluster-mode workQueueHandler for parity.
  • workqueue.Heartbeater heartbeats are independent — they continue to fire on their own timer, unaffected by the inactivity watcher.

Tests

  • 7 unit tests in execution_handler_test.go:
    • Producer stall → ErrAgentInactivityTimeout returned from runProducerConsumer and surfaced as context.Cause on the producer ctx
    • Activity signals reset the timer (producer emits at half the timeout interval, watcher never fires)
    • Zero timeout does not start the watcher goroutine
    • activityTrackingWriter: nil tracker is a no-op, successful write signals, failed write does not signal, non-blocking when signal channel is full
  • 1 end-to-end test in manager_test.go (TestManager_AgentInactivityTimeout) that constructs a localManager with the option configured, runs a stalled AgentExecutor, and asserts the executor sees ErrAgentInactivityTimeout via context.Cause.

All existing tests in internal/taskexec/... and a2asrv/... continue to pass. go vet ./... clean. Full repo go test ./... green.

Fixes #78

@gemini-code-assist (Bot) left a comment


Code Review

This pull request introduces an agent inactivity timeout mechanism to terminate executions when an agent fails to write events within a configured duration. Key changes include the addition of the WithAgentInactivityTimeout option, an activityTrackingWriter to monitor event pipe writes, and a watcher goroutine in the execution handler to manage the timeout logic. I have no feedback to provide.

@yarolegovich (Member) commented

Thanks for the PR. I had a look last week and at a high level the changes look good. I want to review them carefully before merging because of the part of the code they change. Will try my best to finish the review.

Comment thread internal/taskexec/local_manager.go Outdated
// inactivity timeout is not configured both return values are zero, in which
// case [newActivityTrackingWriter] is a no-op and the watcher goroutine is
// not started.
func (m *localManager) inactivityConfig() (*activityTracker, inactivityConfig) {
The method is duplicated and only uses inactivityTimeout. Let's make it a package-level free function newInactivityTracker. I don't mind it being called newInactivityConfig either, but it feels more natural to create a tracker with the config as a field which callers will use.

Comment thread internal/taskexec/execution_handler.go Outdated
select {
case <-ctx.Done():
return ctx.Err()
case <-inactivity.signal:
nit: I'd rename to inactivity.writeRecorded or something like that, <-inactivity.signal reads like a signal of inactivity was received

Comment thread internal/taskexec/execution_handler.go Outdated
return ctx.Err()
case <-inactivity.signal:
if !timer.Stop() {
select {
drain is not necessary as of Go 1.23+, from docs:

For a chan-based timer created with NewTimer(d), as of Go 1.23, any receive from t.C after Stop has returned is guaranteed to block rather than receive a stale time value from before the Stop; if the program has not received from t.C already and the timer is running, Stop is guaranteed to return true.

and just a single Reset should be enough:

For a chan-based timer created with NewTimer, as of Go 1.23,
any receive from t.C after Reset has returned is guaranteed not
to receive a time value corresponding to the previous timer settings

<-producerStarted
// Hold the consumer open long enough that, without timer
// resets, the watcher would have fired multiple times.
time.Sleep(200 * time.Millisecond)
let's please reduce those sleep times where possible. prompted AI to give lower values, don't vouch for all suggestions, please apply where reasonable:

- Inactivity timeout: 50ms → 20ms (all 3 tests)
- Ticker interval: 25ms → 10ms
- Consumer sleep ("activity resets"): 200ms → 80ms (still 4x the timeout)
- Consumer sleep ("zero timeout"): 20ms → 5ms
- Deadlock guard: 100ms → 50ms

I prefer to start small and increase if flaky. these quickly compound when tests are run with higher -count

ravyg added 2 commits April 29, 2026 12:50
feat: add WithAgentInactivityTimeout option for terminating stalled agents

Adds a new RequestHandlerOption that bounds how long the server will wait
for the next event from an agent's producer before terminating its
execution. The timer is reset whenever the agent writes an event to the
event pipe, so steady producers are unaffected.

Implementation:

- New ErrAgentInactivityTimeout sentinel in internal/taskexec, re-exported
  from a2asrv so callers can detect the condition with errors.Is.
- New activityTrackingWriter wraps the producer's eventpipe.Writer and
  signals on every successful Write. Wrapping is a no-op when the option
  is not configured, so existing users pay no cost.
- New watcher goroutine inside runProducerConsumer, started only when
  the inactivity timeout is positive. On firing, it returns
  ErrAgentInactivityTimeout from the errgroup which becomes the
  cancellation cause on the producer and consumer contexts via
  context.Cause, exactly like the existing TestRunProducerConsumer_CausePropagation
  pattern.
- Plumbed through LocalManagerConfig.AgentInactivityTimeout and
  DistributedManagerConfig.AgentInactivityTimeout for cluster-mode parity.
  Heartbeats from workqueue.Heartbeater remain independent.

Defaults: 0 disables the watcher, preserving prior behavior.

Tests:

- 7 unit tests in execution_handler_test.go covering producer stall ->
  ErrAgentInactivityTimeout, activity signals reset the timer, zero
  timeout does not start watcher, and activityTrackingWriter behavior
  (nil tracker, success, failure, non-blocking signal).
- 1 end-to-end test in manager_test.go that exercises localManager with
  the option configured and asserts the AgentExecutor sees
  ErrAgentInactivityTimeout via context.Cause.

Fixes a2aproject#78

refactor(taskexec): address review feedback on inactivity tracker

- Collapse activityTracker + inactivityConfig into a single
  inactivityTracker type with a config field, constructed via the
  package-level newInactivityTracker(timeout) function. This removes
  the duplicated inactivityConfig() methods on localManager and
  workQueueHandler.
- Rename inactivityConfig.signal -> writeRecorded so the watcher loop
  reads as "<-cfg.writeRecorded" (a write was recorded) rather than
  the prior "<-cfg.signal" (which read like a signal of inactivity).
- Drop the timer Stop+drain dance. As of Go 1.23 a receive after Stop
  or Reset is guaranteed not to deliver a stale tick, so a single
  timer.Reset(cfg.timeout) is sufficient.
- Reduce sleep/timeout values in the inactivity tests so they cost
  less when run with -count: timeout 50ms -> 20ms, activity ticker
  25ms -> 10ms, hold sleep 200ms -> 80ms (still 4x timeout), zero
  timeout sleep 20ms -> 5ms, deadlock guard 100ms -> 50ms. Verified
  stable under -race -count=20.
@ravyg ravyg force-pushed the feat/78-agent-inactivity-timeout branch from 9430185 to 724c5ce on April 29, 2026, 20:02
@ravyg (Contributor, Author) commented Apr 29, 2026

Thanks for the thorough review @yarolegovich! Pushed two commits:

  • Rebased onto latest main (02de8b1); resolved a small conflict in local_manager.go between the new fast-client exiting flag (fix: extra query and fast client rejection in local executor #330) and the activity-tracking writer.
  • refactor(taskexec): address review feedback on inactivity tracker addressing all four comments:
    1. Collapsed activityTracker + inactivityConfig into a single inactivityTracker type with a config field, constructed via package-level newInactivityTracker(timeout). The duplicated inactivityConfig() methods on localManager and workQueueHandler are gone.
    2. Renamed inactivityConfig.signal → writeRecorded so the watcher loop reads naturally (<-cfg.writeRecorded).
    3. Dropped the Stop+drain dance — single timer.Reset(cfg.timeout) per Go 1.23+ semantics, with a comment + docs link.
    4. Reduced test timings to your suggested values. Verified stable under go test ./internal/taskexec -race -count=20.

Full module go test ./... is green. Let me know if anything else.

@yarolegovich yarolegovich merged commit 357bb11 into a2aproject:main May 4, 2026
4 checks passed

Successfully merging this pull request may close these issues.

[Feature] Add an option for configuring agent inactivity timeouts
