From 76796c9f4eb11a158540f6673d59873c7e5d3f7d Mon Sep 17 00:00:00 2001 From: Ravish Date: Wed, 8 Apr 2026 10:57:06 -0700 Subject: [PATCH 1/3] feat: add WithAgentInactivityTimeout option for terminating stalled agents Adds a new RequestHandlerOption that bounds how long the server will wait for the next event from an agent's producer before terminating its execution. The timer is reset whenever the agent writes an event to the event pipe, so steady producers are unaffected. Implementation: - New ErrAgentInactivityTimeout sentinel in internal/taskexec, re-exported from a2asrv so callers can detect the condition with errors.Is. - New activityTrackingWriter wraps the producer's eventpipe.Writer and signals on every successful Write. Wrapping is a no-op when the option is not configured, so existing users pay no cost. - New watcher goroutine inside runProducerConsumer, started only when the inactivity timeout is positive. On firing, it returns ErrAgentInactivityTimeout from the errgroup which becomes the cancellation cause on the producer and consumer contexts via context.Cause, exactly like the existing TestRunProducerConsumer_CausePropagation pattern. - Plumbed through LocalManagerConfig.AgentInactivityTimeout and DistributedManagerConfig.AgentInactivityTimeout for cluster-mode parity. Heartbeats from workqueue.Heartbeater remain independent. Defaults: 0 disables the watcher, preserving prior behavior. Tests: - 7 unit tests in execution_handler_test.go covering producer stall -> ErrAgentInactivityTimeout, activity signals reset the timer, zero timeout does not start watcher, and activityTrackingWriter behavior (nil tracker, success, failure, non-blocking signal). - 1 end-to-end test in manager_test.go that exercises localManager with the option configured and asserts the AgentExecutor sees ErrAgentInactivityTimeout via context.Cause. Fixes #78 --- a2asrv/handler.go | 49 +++-- internal/taskexec/api.go | 7 + internal/taskexec/distributed_manager.go | 6 + internal/taskexec/execution_handler.go | 77 ++++++++ internal/taskexec/execution_handler_test.go | 195 +++++++++++++++++++- internal/taskexec/local_manager.go | 56 ++++-- internal/taskexec/manager_test.go | 57 +++++- internal/taskexec/work_queue_handler.go | 44 +++-- 8 files changed, 449 insertions(+), 42 deletions(-) diff --git a/a2asrv/handler.go b/a2asrv/handler.go index 9df75c12..eedf3b4d 100644 --- a/a2asrv/handler.go +++ b/a2asrv/handler.go @@ -19,6 +19,7 @@ import ( "fmt" "iter" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -30,6 +31,12 @@ import ( "github.com/a2aproject/a2a-go/v2/log" ) +// ErrAgentInactivityTimeout is the cancellation cause set on an agent's +// execution context when no events are written to the event pipe for the +// duration configured via [WithAgentInactivityTimeout]. Callers can detect +// this condition with errors.Is. +var ErrAgentInactivityTimeout = taskexec.ErrAgentInactivityTimeout + // RequestHandler defines a transport-agnostic interface for handling incoming A2A requests. type RequestHandler interface { // GetTask handles the 'GetTask' protocol method. @@ -81,6 +88,8 @@ type defaultRequestHandler struct { workQueue workqueue.Queue reqContextInterceptors []ExecutorContextInterceptor + agentInactivityTimeout time.Duration + authenticatedCardProducer ExtendedAgentCardProducer capabilities *a2a.AgentCapabilities } @@ -122,6 +131,22 @@ func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption { } } +// WithAgentInactivityTimeout configures the maximum time the server will wait +// for the next event from an agent's producer before terminating its execution. +// The timer is reset whenever the agent writes an event to the event pipe. +// When the timeout fires the producer and consumer contexts are canceled with +// [ErrAgentInactivityTimeout] as the cause, and the task is finalized via the +// existing failure path. +// +// A value of 0 (default) or negative disables the inactivity watcher and +// preserves prior behavior. The timeout applies in both single-process and +// cluster modes. +func WithAgentInactivityTimeout(d time.Duration) RequestHandlerOption { + return func(ih *InterceptedHandler, h *defaultRequestHandler) { + h.agentInactivityTimeout = d + } +} + // WithConcurrencyConfig allows to set limits on the number of concurrent executions. func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption { return func(ih *InterceptedHandler, h *defaultRequestHandler) { @@ -186,12 +211,13 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request panic("TaskStore and QueueManager must be provided for cluster mode") } h.execManager = taskexec.NewDistributedManager(taskexec.DistributedManagerConfig{ - WorkQueue: h.workQueue, - TaskStore: h.taskStore, - QueueManager: h.queueManager, - ConcurrencyConfig: h.concurrencyConfig, - Factory: execFactory, - PanicHandler: h.panicHandler, + WorkQueue: h.workQueue, + TaskStore: h.taskStore, + QueueManager: h.queueManager, + ConcurrencyConfig: h.concurrencyConfig, + Factory: execFactory, + PanicHandler: h.panicHandler, + AgentInactivityTimeout: h.agentInactivityTimeout, }) } else { if h.queueManager == nil { @@ -204,11 +230,12 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request execFactory.taskStore = h.taskStore } h.execManager = taskexec.NewLocalManager(taskexec.LocalManagerConfig{ - QueueManager: h.queueManager, - ConcurrencyConfig: h.concurrencyConfig, - Factory: execFactory, - TaskStore: h.taskStore, - PanicHandler: h.panicHandler, + QueueManager: h.queueManager, + ConcurrencyConfig: h.concurrencyConfig, + Factory: execFactory, + TaskStore: h.taskStore, + PanicHandler: h.panicHandler, + AgentInactivityTimeout: h.agentInactivityTimeout, }) } diff --git a/internal/taskexec/api.go b/internal/taskexec/api.go index 8ac5c574..5c18cf96 100644 --- a/internal/taskexec/api.go +++ b/internal/taskexec/api.go @@ -16,6 +16,7 @@ package taskexec import ( "context" + "errors" "iter" "github.com/a2aproject/a2a-go/v2/a2a" @@ -23,6 +24,12 @@ import ( "github.com/a2aproject/a2a-go/v2/internal/eventpipe" ) +// ErrAgentInactivityTimeout is the cancellation cause set on the producer +// context when an agent fails to write any events to the event pipe within +// the configured inactivity timeout window. Callers can check for this +// condition with errors.Is. +var ErrAgentInactivityTimeout = errors.New("agent inactivity timeout") + // Manager provides an API for executing and canceling tasks. type Manager interface { // Resubscribe is used to resubscribe to events of an active execution. diff --git a/internal/taskexec/distributed_manager.go b/internal/taskexec/distributed_manager.go index adf9715c..118ed393 100644 --- a/internal/taskexec/distributed_manager.go +++ b/internal/taskexec/distributed_manager.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -38,6 +39,11 @@ type DistributedManagerConfig struct { ConcurrencyConfig limiter.ConcurrencyConfig Logger *slog.Logger PanicHandler PanicHandlerFn + // AgentInactivityTimeout, if positive, terminates an execution when the + // agent's producer has not written any events to the pipe for the + // configured duration. The terminating cause is [ErrAgentInactivityTimeout]. + // A value of 0 disables the watcher, preserving prior behavior. + AgentInactivityTimeout time.Duration } type distributedManager struct { diff --git a/internal/taskexec/execution_handler.go b/internal/taskexec/execution_handler.go index a99f988b..5a986f9a 100644 --- a/internal/taskexec/execution_handler.go +++ b/internal/taskexec/execution_handler.go @@ -82,6 +82,53 @@ func (h *executionHandler) processEvents(ctx context.Context) (a2a.SendMessageRe type eventProducerFn func(context.Context) error type eventConsumerFn func(context.Context) (a2a.SendMessageResult, error) +// activityTracker is wired into the producer's event pipe via +// [newActivityTrackingWriter] so that successful writes signal the inactivity +// watcher started in [runProducerConsumer]. A nil tracker disables tracking. +type activityTracker struct { + signal chan<- struct{} +} + +func (t *activityTracker) record() { + if t == nil { + return + } + select { + case t.signal <- struct{}{}: + default: // non-blocking; the watcher only needs an "activity happened" hint + } +} + +// newActivityTrackingWriter wraps an [eventpipe.Writer] so each successful +// write signals the provided tracker. A nil tracker returns the writer +// unchanged so callers without an inactivity timeout configured pay no cost. +func newActivityTrackingWriter(inner eventpipe.Writer, tracker *activityTracker) eventpipe.Writer { + if tracker == nil { + return inner + } + return &activityTrackingWriter{inner: inner, tracker: tracker} +} + +type activityTrackingWriter struct { + inner eventpipe.Writer + tracker *activityTracker +} + +func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) error { + if err := w.inner.Write(ctx, event); err != nil { + return err + } + w.tracker.record() + return nil +} + +// inactivityConfig configures the inactivity watcher started by +// [runProducerConsumer]. A zero or negative timeout disables the watcher. +type inactivityConfig struct { + timeout time.Duration + signal <-chan struct{} +} + // runProducerConsumer starts producer and consumer goroutines in an error group and waits // for both of them to finish or one of them to fail. If both complete successfuly and consumer produces a result, // the result is returned, otherwise an error is returned. @@ -91,9 +138,39 @@ func runProducerConsumer( consumer eventConsumerFn, heartbeater workqueue.Heartbeater, panicHandler PanicHandlerFn, + inactivity inactivityConfig, ) (a2a.SendMessageResult, error) { group, ctx := errgroup.WithContext(ctx) + // errgroup already wraps its derived context with [context.WithCancelCause] + // internally and uses the goroutine's returned error as the cancellation + // cause. Returning [ErrAgentInactivityTimeout] from the watcher is enough + // to surface it via [context.Cause] on the producer and consumer ctx, and + // the existing TestRunProducerConsumer_CausePropagation regression test + // covers the same pattern. + if inactivity.timeout > 0 && inactivity.signal != nil { + group.Go(func() error { + timer := time.NewTimer(inactivity.timeout) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-inactivity.signal: + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(inactivity.timeout) + case <-timer.C: + return ErrAgentInactivityTimeout + } + } + }) + } + if heartbeater != nil { group.Go(func() error { timer := time.NewTicker(heartbeater.HeartbeatInterval()) diff --git a/internal/taskexec/execution_handler_test.go b/internal/taskexec/execution_handler_test.go index d920c81d..f59c22eb 100644 --- a/internal/taskexec/execution_handler_test.go +++ b/internal/taskexec/execution_handler_test.go @@ -16,9 +16,11 @@ package taskexec import ( "context" + "errors" "fmt" "strings" "testing" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/taskstore" @@ -122,7 +124,7 @@ func TestRunProducerConsumer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler) + result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler, inactivityConfig{}) if tc.wantErr != nil && err == nil { t.Fatalf("expected error, got %v", result) } @@ -153,8 +155,199 @@ func TestRunProducerConsumer_CausePropagation(t *testing.T) { }, nil, nil, + inactivityConfig{}, ) if gotProducerErr != consumerErr { t.Fatalf("expected producer error = %s, got %s", consumerErr, gotProducerErr) } } + +// TestRunProducerConsumer_InactivityTimeout exercises the inactivity watcher +// added for issue #78. It uses a tight timeout and a synthetic activity +// channel to keep the test fast and deterministic without sleeping for the +// real timeout window. +func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { + t.Parallel() + + t.Run("producer stalls without writing events", func(t *testing.T) { + t.Parallel() + signal := make(chan struct{}, 1) + var producerCause error + + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + <-ctx.Done() + producerCause = context.Cause(ctx) + return ctx.Err() + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + nil, + nil, + inactivityConfig{ + timeout: 50 * time.Millisecond, + signal: signal, + }, + ) + if !errors.Is(err, ErrAgentInactivityTimeout) { + t.Fatalf("runProducerConsumer() error = %v, want errors.Is(_, ErrAgentInactivityTimeout)", err) + } + if !errors.Is(producerCause, ErrAgentInactivityTimeout) { + t.Fatalf("context.Cause(producerCtx) = %v, want errors.Is(_, ErrAgentInactivityTimeout)", producerCause) + } + }) + + t.Run("activity signals reset the timer", func(t *testing.T) { + t.Parallel() + signal := make(chan struct{}, 1) + + producerStarted := make(chan struct{}) + releaseProducer := make(chan struct{}) + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + close(producerStarted) + // Send activity signals at half the timeout interval, so the + // watcher's timer keeps getting reset and never fires. + ticker := time.NewTicker(25 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-releaseProducer: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + select { + case signal <- struct{}{}: + default: + } + } + } + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + <-producerStarted + // Hold the consumer open long enough that, without timer + // resets, the watcher would have fired multiple times. + time.Sleep(200 * time.Millisecond) + close(releaseProducer) + return a2a.NewMessage(a2a.MessageRoleUser), nil + }, + nil, + nil, + inactivityConfig{ + timeout: 50 * time.Millisecond, + signal: signal, + }, + ) + if err != nil { + t.Fatalf("runProducerConsumer() error = %v, want nil (timer should have been reset)", err) + } + }) + + t.Run("zero timeout does not start watcher", func(t *testing.T) { + t.Parallel() + // With timeout=0 the watcher must not be started at all. We verify + // by running a long-lived producer and asserting completion is + // driven only by the consumer's result. + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + time.Sleep(20 * time.Millisecond) + return a2a.NewMessage(a2a.MessageRoleUser), nil + }, + nil, + nil, + inactivityConfig{}, + ) + if err != nil { + t.Fatalf("runProducerConsumer() error = %v, want nil", err) + } + }) +} + +// TestActivityTrackingWriter ensures the writer wrapper signals on +// successful writes only and is non-blocking when the signal channel is +// full (the watcher only needs the "activity happened" hint). +func TestActivityTrackingWriter(t *testing.T) { + t.Parallel() + + t.Run("nil tracker returns inner writer unchanged", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + got := newActivityTrackingWriter(inner, nil) + if got != inner { + t.Fatalf("newActivityTrackingWriter(_, nil) = %v, want inner writer", got) + } + }) + + t.Run("successful write signals tracker", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + signal := make(chan struct{}, 1) + w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + + if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err != nil { + t.Fatalf("Write() error = %v, want nil", err) + } + select { + case <-signal: + default: + t.Fatalf("expected signal after successful Write") + } + }) + + t.Run("failed write does not signal", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{err: errors.New("boom")} + signal := make(chan struct{}, 1) + w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + + if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err == nil { + t.Fatalf("Write() error = nil, want non-nil") + } + select { + case <-signal: + t.Fatalf("expected no signal after failed Write") + default: + } + }) + + t.Run("signal is non-blocking when channel is full", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + signal := make(chan struct{}, 1) + // Pre-fill the channel so a non-blocking send must drop the new signal. + signal <- struct{}{} + w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + + // Should return without blocking even though the channel is full. + done := make(chan error, 1) + go func() { + done <- w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)) + }() + select { + case err := <-done: + if err != nil { + t.Fatalf("Write() error = %v, want nil", err) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("Write() blocked when signal channel was full") + } + }) +} + +type fakeWriter struct { + err error +} + +func (w *fakeWriter) Write(_ context.Context, _ a2a.Event) error { + return w.err +} diff --git a/internal/taskexec/local_manager.go b/internal/taskexec/local_manager.go index edebf2f9..888b0a86 100644 --- a/internal/taskexec/local_manager.go +++ b/internal/taskexec/local_manager.go @@ -21,6 +21,7 @@ import ( "runtime/debug" "sync" "sync/atomic" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -50,10 +51,11 @@ var ( // Both cancelations and executions are started in detached context and run until completion. // The type is suitable only for single-process execution management. type localManager struct { - queueManager eventqueue.Manager - factory Factory - store taskstore.Store - panicHandler PanicHandlerFn + queueManager eventqueue.Manager + factory Factory + store taskstore.Store + panicHandler PanicHandlerFn + inactivityTimeout time.Duration mu sync.Mutex executions map[a2a.TaskID]*localExecution @@ -89,18 +91,24 @@ type LocalManagerConfig struct { Factory Factory TaskStore taskstore.Store PanicHandler PanicHandlerFn + // AgentInactivityTimeout, if positive, terminates an execution when the + // agent's producer has not written any events to the pipe for the + // configured duration. The terminating cause is [ErrAgentInactivityTimeout]. + // A value of 0 disables the watcher, preserving prior behavior. + AgentInactivityTimeout time.Duration } // NewLocalManager is a [localManager] constructor function. func NewLocalManager(cfg LocalManagerConfig) Manager { manager := &localManager{ - queueManager: cfg.QueueManager, - factory: cfg.Factory, - store: cfg.TaskStore, - panicHandler: cfg.PanicHandler, - limiter: newConcurrencyLimiter(cfg.ConcurrencyConfig), - executions: make(map[a2a.TaskID]*localExecution), - cancelations: make(map[a2a.TaskID]*cancelation), + queueManager: cfg.QueueManager, + factory: cfg.Factory, + store: cfg.TaskStore, + panicHandler: cfg.PanicHandler, + inactivityTimeout: cfg.AgentInactivityTimeout, + limiter: newConcurrencyLimiter(cfg.ConcurrencyConfig), + executions: make(map[a2a.TaskID]*localExecution), + cancelations: make(map[a2a.TaskID]*cancelation), } if manager.queueManager == nil { manager.queueManager = eventqueue.NewInMemoryManager() @@ -108,6 +116,22 @@ func NewLocalManager(cfg LocalManagerConfig) Manager { return manager } +// inactivityConfig builds the watcher configuration and the matching activity +// tracker that should be used to wrap the producer's writer. When the +// inactivity timeout is not configured both return values are zero, in which +// case [newActivityTrackingWriter] is a no-op and the watcher goroutine is +// not started. +func (m *localManager) inactivityConfig() (*activityTracker, inactivityConfig) { + if m.inactivityTimeout <= 0 { + return nil, inactivityConfig{} + } + signal := make(chan struct{}, 1) + return &activityTracker{signal: signal}, inactivityConfig{ + timeout: m.inactivityTimeout, + signal: signal, + } +} + func newCancelation(req *a2a.CancelTaskRequest) *cancelation { return &cancelation{req: req, result: newPromise()} } @@ -283,12 +307,15 @@ func (m *localManager) handleExecution(ctx context.Context, execution *localExec }, handleErrorFn: processor.ProcessError, } + tracker, inactivity := m.inactivityConfig() + producerWriter := newActivityTrackingWriter(execution.pipe.Writer, tracker) result, err := runProducerConsumer( ctx, - func(ctx context.Context) error { return executor.Execute(ctx, execution.pipe.Writer) }, + func(ctx context.Context) error { return executor.Execute(ctx, producerWriter) }, handler.processEvents, nil, m.panicHandler, + inactivity, ) cleaner.Cleanup(ctx, result, err) @@ -327,12 +354,15 @@ func (m *localManager) handleCancel(ctx context.Context, cancel *cancelation) { handleEventFn: processor.Process, handleErrorFn: func(ctx context.Context, err error) (a2a.SendMessageResult, error) { return nil, err }, } + tracker, inactivity := m.inactivityConfig() + producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) result, err := runProducerConsumer( ctx, - func(ctx context.Context) error { return canceler.Cancel(ctx, pipe.Writer) }, + func(ctx context.Context) error { return canceler.Cancel(ctx, producerWriter) }, handler.processEvents, nil, m.panicHandler, + inactivity, ) cleaner.Cleanup(ctx, result, err) diff --git a/internal/taskexec/manager_test.go b/internal/taskexec/manager_test.go index 17bcca4d..67b83d42 100644 --- a/internal/taskexec/manager_test.go +++ b/internal/taskexec/manager_test.go @@ -128,12 +128,13 @@ func (e *testProcessor) ProcessError(ctx context.Context, err error) (a2a.SendMe type testExecutor struct { *testProcessor - executeCalled chan struct{} - executeErr error - queue eventpipe.Writer - contextCanceled bool - block chan struct{} - emitTask *a2a.Task + executeCalled chan struct{} + executeErr error + queue eventpipe.Writer + contextCanceled bool + contextCauseObserver func(error) + block chan struct{} + emitTask *a2a.Task } var _ Executor = (*testExecutor)(nil) @@ -151,6 +152,9 @@ func (e *testExecutor) Execute(ctx context.Context, queue eventpipe.Writer) erro case <-e.block: case <-ctx.Done(): e.contextCanceled = true + if e.contextCauseObserver != nil { + e.contextCauseObserver(context.Cause(ctx)) + } return ctx.Err() } } @@ -826,3 +830,44 @@ func TestManager_GetExecution(t *testing.T) { t.Fatal("manager.Resubscribe() succeeded for finished execution, want error") } } + +// TestManager_AgentInactivityTimeout verifies that a localManager configured +// with AgentInactivityTimeout terminates an executor that stalls without +// writing any events. The executor's context must be canceled with +// [ErrAgentInactivityTimeout] as the cause. Regression test for #78. +func TestManager_AgentInactivityTimeout(t *testing.T) { + t.Parallel() + ctx := t.Context() + + executor := newExecutor() + executor.block = make(chan struct{}) + defer close(executor.block) + + var executorCause error + executor.contextCauseObserver = func(c error) { executorCause = c } + + manager := NewLocalManager(LocalManagerConfig{ + Factory: newStaticFactory(executor, nil), + TaskStore: testutil.NewTestTaskStore(), + AgentInactivityTimeout: 50 * time.Millisecond, + }) + + subscription, err := manager.Execute(ctx, &a2a.SendMessageRequest{Message: a2a.NewMessage(a2a.MessageRoleUser)}) + if err != nil { + t.Fatalf("manager.Execute() error = %v, want nil", err) + } + executionResult, _ := consumeEvents(t, subscription) + + // Wait for the executor to be invoked. + <-executor.executeCalled + // The executor blocks without writing events. The watcher should + // fire after AgentInactivityTimeout and cancel the executor's ctx. + <-executionResult + + if !executor.contextCanceled { + t.Fatalf("executor.contextCanceled = false, want true") + } + if !errors.Is(executorCause, ErrAgentInactivityTimeout) { + t.Fatalf("context.Cause(executorCtx) = %v, want errors.Is(_, ErrAgentInactivityTimeout)", executorCause) + } +} diff --git a/internal/taskexec/work_queue_handler.go b/internal/taskexec/work_queue_handler.go index fb0cf887..199d6ea7 100644 --- a/internal/taskexec/work_queue_handler.go +++ b/internal/taskexec/work_queue_handler.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -28,18 +29,20 @@ import ( ) type workQueueHandler struct { - queueManager eventqueue.Manager - taskStore taskstore.Store - factory Factory - panicHandler PanicHandlerFn + queueManager eventqueue.Manager + taskStore taskstore.Store + factory Factory + panicHandler PanicHandlerFn + inactivityTimeout time.Duration } func newWorkQueueHandler(cfg DistributedManagerConfig) *workQueueHandler { backend := &workQueueHandler{ - queueManager: cfg.QueueManager, - taskStore: cfg.TaskStore, - factory: cfg.Factory, - panicHandler: cfg.PanicHandler, + queueManager: cfg.QueueManager, + taskStore: cfg.TaskStore, + factory: cfg.Factory, + panicHandler: cfg.PanicHandler, + inactivityTimeout: cfg.AgentInactivityTimeout, } if cfg.Logger == nil { cfg.Logger = slog.Default() @@ -60,6 +63,9 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa pipe := eventpipe.NewLocal() defer pipe.Close() + tracker, inactivity := b.inactivityConfig() + producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) + var eventProducer eventProducerFn var eventProcessor Processor var cleaner Cleaner @@ -73,7 +79,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa if err != nil { return nil, fmt.Errorf("executor setup failed: %w", err) } - eventProducer = func(ctx context.Context) error { return executor.Execute(ctx, pipe.Writer) } + eventProducer = func(ctx context.Context) error { return executor.Execute(ctx, producerWriter) } eventProcessor = processor cleaner = localCleaner @@ -85,7 +91,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa if err != nil { return nil, fmt.Errorf("canceler setup failed: %w", err) } - eventProducer = func(ctx context.Context) error { return canceler.Cancel(ctx, pipe.Writer) } + eventProducer = func(ctx context.Context) error { return canceler.Cancel(ctx, producerWriter) } eventProcessor = processor cleaner = localCleaner @@ -117,7 +123,23 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa heartbeater = hb } - result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler) + result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler, inactivity) cleaner.Cleanup(ctx, result, err) return result, err } + +// inactivityConfig builds the watcher configuration for [runProducerConsumer] +// and the matching activity tracker that should wrap the producer's writer. +// Returns a nil tracker and zero config when the timeout is not configured, +// in which case the watcher goroutine is not started and the writer wrapping +// is a no-op. +func (b *workQueueHandler) inactivityConfig() (*activityTracker, inactivityConfig) { + if b.inactivityTimeout <= 0 { + return nil, inactivityConfig{} + } + signal := make(chan struct{}, 1) + return &activityTracker{signal: signal}, inactivityConfig{ + timeout: b.inactivityTimeout, + signal: signal, + } +} From 724c5ce08f7f6668c30cf20e1d06f81b5a89f364 Mon Sep 17 00:00:00 2001 From: Ravish Date: Wed, 29 Apr 2026 13:02:14 -0700 Subject: [PATCH 2/3] refactor(taskexec): address review feedback on inactivity tracker - Collapse activityTracker + inactivityConfig into a single inactivityTracker type with a config field, constructed via the package-level newInactivityTracker(timeout) function. This removes the duplicated inactivityConfig() methods on localManager and workQueueHandler. - Rename inactivityConfig.signal -> writeRecorded so the watcher loop reads as "<-cfg.writeRecorded" (a write was recorded) rather than the prior "<-cfg.signal" (which read like a signal of inactivity). - Drop the timer Stop+drain dance. As of Go 1.23 a receive after Stop or Reset is guaranteed not to deliver a stale tick, so a single timer.Reset(cfg.timeout) is sufficient. - Reduce sleep/timeout values in the inactivity tests so they cost less when run with -count: timeout 50ms -> 20ms, activity ticker 25ms -> 10ms, hold sleep 200ms -> 80ms (still 4x timeout), zero timeout sleep 20ms -> 5ms, deadlock guard 100ms -> 50ms. Verified stable under -race -count=20. --- internal/taskexec/execution_handler.go | 64 +++++++++++++-------- internal/taskexec/execution_handler_test.go | 63 +++++++++----------- internal/taskexec/local_manager.go | 24 ++------ internal/taskexec/work_queue_handler.go | 20 +------ 4 files changed, 75 insertions(+), 96 deletions(-) diff --git a/internal/taskexec/execution_handler.go b/internal/taskexec/execution_handler.go index 5a986f9a..b15bb49b 100644 --- a/internal/taskexec/execution_handler.go +++ b/internal/taskexec/execution_handler.go @@ -82,27 +82,46 @@ func (h *executionHandler) processEvents(ctx context.Context) (a2a.SendMessageRe type eventProducerFn func(context.Context) error type eventConsumerFn func(context.Context) (a2a.SendMessageResult, error) -// activityTracker is wired into the producer's event pipe via -// [newActivityTrackingWriter] so that successful writes signal the inactivity -// watcher started in [runProducerConsumer]. A nil tracker disables tracking. -type activityTracker struct { - signal chan<- struct{} +// inactivityTracker bundles the inactivity watcher's configuration with the +// activity-recording channel. Callers obtain one via [newInactivityTracker] +// (returns nil when disabled), wrap the producer's writer with +// [newActivityTrackingWriter], and pass the tracker to [runProducerConsumer] +// to start the watcher. A nil tracker disables tracking end-to-end: the writer +// wrapper is a no-op and the watcher goroutine is not started. +type inactivityTracker struct { + config inactivityConfig + writeRecorded chan struct{} } -func (t *activityTracker) record() { +// newInactivityTracker returns a tracker for the given timeout. A non-positive +// timeout returns nil, disabling inactivity tracking. +func newInactivityTracker(timeout time.Duration) *inactivityTracker { + if timeout <= 0 { + return nil + } + signal := make(chan struct{}, 1) + return &inactivityTracker{ + config: inactivityConfig{timeout: timeout, writeRecorded: signal}, + writeRecorded: signal, + } +} + +// record signals that a producer write just succeeded. Non-blocking: the +// watcher only needs an "activity happened" hint, so a full channel is fine. +func (t *inactivityTracker) record() { if t == nil { return } select { - case t.signal <- struct{}{}: - default: // non-blocking; the watcher only needs an "activity happened" hint + case t.writeRecorded <- struct{}{}: + default: } } // newActivityTrackingWriter wraps an [eventpipe.Writer] so each successful // write signals the provided tracker. A nil tracker returns the writer // unchanged so callers without an inactivity timeout configured pay no cost. -func newActivityTrackingWriter(inner eventpipe.Writer, tracker *activityTracker) eventpipe.Writer { +func newActivityTrackingWriter(inner eventpipe.Writer, tracker *inactivityTracker) eventpipe.Writer { if tracker == nil { return inner } @@ -111,7 +130,7 @@ func newActivityTrackingWriter(inner eventpipe.Writer, tracker *activityTracker) type activityTrackingWriter struct { inner eventpipe.Writer - tracker *activityTracker + tracker *inactivityTracker } func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) error { @@ -125,8 +144,8 @@ func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) err // inactivityConfig configures the inactivity watcher started by // [runProducerConsumer]. A zero or negative timeout disables the watcher. type inactivityConfig struct { - timeout time.Duration - signal <-chan struct{} + timeout time.Duration + writeRecorded <-chan struct{} } // runProducerConsumer starts producer and consumer goroutines in an error group and waits @@ -138,7 +157,7 @@ func runProducerConsumer( consumer eventConsumerFn, heartbeater workqueue.Heartbeater, panicHandler PanicHandlerFn, - inactivity inactivityConfig, + inactivity *inactivityTracker, ) (a2a.SendMessageResult, error) { group, ctx := errgroup.WithContext(ctx) @@ -148,22 +167,21 @@ func runProducerConsumer( // to surface it via [context.Cause] on the producer and consumer ctx, and // the existing TestRunProducerConsumer_CausePropagation regression test // covers the same pattern. - if inactivity.timeout > 0 && inactivity.signal != nil { + if inactivity != nil && inactivity.config.timeout > 0 && inactivity.config.writeRecorded != nil { + cfg := inactivity.config group.Go(func() error { - timer := time.NewTimer(inactivity.timeout) + // As of Go 1.23 a single Reset is sufficient: receives from t.C + // after Stop or Reset are guaranteed not to deliver a stale tick, + // so the previous Stop+drain dance is no longer required. + // See https://pkg.go.dev/time#Timer.Reset. + timer := time.NewTimer(cfg.timeout) defer timer.Stop() for { select { case <-ctx.Done(): return ctx.Err() - case <-inactivity.signal: - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } - timer.Reset(inactivity.timeout) + case <-cfg.writeRecorded: + timer.Reset(cfg.timeout) case <-timer.C: return ErrAgentInactivityTimeout } diff --git a/internal/taskexec/execution_handler_test.go b/internal/taskexec/execution_handler_test.go index f59c22eb..6426dacc 100644 --- a/internal/taskexec/execution_handler_test.go +++ b/internal/taskexec/execution_handler_test.go @@ -124,7 +124,7 @@ func TestRunProducerConsumer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler, inactivityConfig{}) + result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler, nil) if tc.wantErr != nil && err == nil { t.Fatalf("expected error, got %v", result) } @@ -155,7 +155,7 @@ func TestRunProducerConsumer_CausePropagation(t *testing.T) { }, nil, nil, - inactivityConfig{}, + nil, ) if gotProducerErr != consumerErr { t.Fatalf("expected producer error = %s, got %s", consumerErr, gotProducerErr) @@ -171,7 +171,7 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { t.Run("producer stalls without writing events", func(t *testing.T) { t.Parallel() - signal := make(chan struct{}, 1) + tracker := newInactivityTracker(20 * time.Millisecond) var producerCause error _, err := runProducerConsumer( @@ -187,10 +187,7 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { }, nil, nil, - inactivityConfig{ - timeout: 50 * time.Millisecond, - signal: signal, - }, + tracker, ) if !errors.Is(err, ErrAgentInactivityTimeout) { t.Fatalf("runProducerConsumer() error = %v, want errors.Is(_, ErrAgentInactivityTimeout)", err) @@ -202,7 +199,7 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { t.Run("activity signals reset the timer", func(t *testing.T) { t.Parallel() - signal := make(chan struct{}, 1) + tracker := newInactivityTracker(20 * time.Millisecond) producerStarted := make(chan struct{}) releaseProducer := make(chan struct{}) @@ -212,7 +209,7 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { close(producerStarted) // Send activity signals at half the timeout interval, so the // watcher's timer keeps getting reset and never fires. - ticker := time.NewTicker(25 * time.Millisecond) + ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for { select { @@ -221,27 +218,22 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { case <-ctx.Done(): return ctx.Err() case <-ticker.C: - select { - case signal <- struct{}{}: - default: - } + tracker.record() } } }, func(ctx context.Context) (a2a.SendMessageResult, error) { <-producerStarted - // Hold the consumer open long enough that, without timer - // resets, the watcher would have fired multiple times. - time.Sleep(200 * time.Millisecond) + // Hold the consumer open long enough (4x the timeout) that, + // without timer resets, the watcher would have fired + // multiple times. + time.Sleep(80 * time.Millisecond) close(releaseProducer) return a2a.NewMessage(a2a.MessageRoleUser), nil }, nil, nil, - inactivityConfig{ - timeout: 50 * time.Millisecond, - signal: signal, - }, + tracker, ) if err != nil { t.Fatalf("runProducerConsumer() error = %v, want nil (timer should have been reset)", err) @@ -250,9 +242,10 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { t.Run("zero timeout does not start watcher", func(t *testing.T) { t.Parallel() - // With timeout=0 the watcher must not be started at all. We verify - // by running a long-lived producer and asserting completion is - // driven only by the consumer's result. + // With timeout=0 newInactivityTracker returns nil, so the watcher + // must not be started at all. We verify by running a long-lived + // producer and asserting completion is driven only by the + // consumer's result. _, err := runProducerConsumer( t.Context(), func(ctx context.Context) error { @@ -260,12 +253,12 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { return ctx.Err() }, func(ctx context.Context) (a2a.SendMessageResult, error) { - time.Sleep(20 * time.Millisecond) + time.Sleep(5 * time.Millisecond) return a2a.NewMessage(a2a.MessageRoleUser), nil }, nil, nil, - inactivityConfig{}, + newInactivityTracker(0), ) if err != nil { t.Fatalf("runProducerConsumer() error = %v, want nil", err) @@ -291,14 +284,14 @@ func TestActivityTrackingWriter(t *testing.T) { t.Run("successful write signals tracker", func(t *testing.T) { t.Parallel() inner := &fakeWriter{} - signal := make(chan struct{}, 1) - w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + tracker := newInactivityTracker(time.Hour) // any positive timeout + w := newActivityTrackingWriter(inner, tracker) if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err != nil { t.Fatalf("Write() error = %v, want nil", err) } select { - case <-signal: + case <-tracker.config.writeRecorded: default: t.Fatalf("expected signal after successful Write") } @@ -307,14 +300,14 @@ func TestActivityTrackingWriter(t *testing.T) { t.Run("failed write does not signal", func(t *testing.T) { t.Parallel() inner := &fakeWriter{err: errors.New("boom")} - signal := make(chan struct{}, 1) - w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + tracker := newInactivityTracker(time.Hour) + w := newActivityTrackingWriter(inner, tracker) if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err == nil { t.Fatalf("Write() error = nil, want non-nil") } select { - case <-signal: + case <-tracker.config.writeRecorded: t.Fatalf("expected no signal after failed Write") default: } @@ -323,10 +316,10 @@ func TestActivityTrackingWriter(t *testing.T) { t.Run("signal is non-blocking when channel is full", func(t *testing.T) { t.Parallel() inner := &fakeWriter{} - signal := make(chan struct{}, 1) + tracker := newInactivityTracker(time.Hour) // Pre-fill the channel so a non-blocking send must drop the new signal. - signal <- struct{}{} - w := newActivityTrackingWriter(inner, &activityTracker{signal: signal}) + tracker.writeRecorded <- struct{}{} + w := newActivityTrackingWriter(inner, tracker) // Should return without blocking even though the channel is full. done := make(chan error, 1) @@ -338,7 +331,7 @@ func TestActivityTrackingWriter(t *testing.T) { if err != nil { t.Fatalf("Write() error = %v, want nil", err) } - case <-time.After(100 * time.Millisecond): + case <-time.After(50 * time.Millisecond): t.Fatalf("Write() blocked when signal channel was full") } }) diff --git a/internal/taskexec/local_manager.go b/internal/taskexec/local_manager.go index 888b0a86..4a371f0b 100644 --- a/internal/taskexec/local_manager.go +++ b/internal/taskexec/local_manager.go @@ -116,22 +116,6 @@ func NewLocalManager(cfg LocalManagerConfig) Manager { return manager } -// inactivityConfig builds the watcher configuration and the matching activity -// tracker that should be used to wrap the producer's writer. When the -// inactivity timeout is not configured both return values are zero, in which -// case [newActivityTrackingWriter] is a no-op and the watcher goroutine is -// not started. -func (m *localManager) inactivityConfig() (*activityTracker, inactivityConfig) { - if m.inactivityTimeout <= 0 { - return nil, inactivityConfig{} - } - signal := make(chan struct{}, 1) - return &activityTracker{signal: signal}, inactivityConfig{ - timeout: m.inactivityTimeout, - signal: signal, - } -} - func newCancelation(req *a2a.CancelTaskRequest) *cancelation { return &cancelation{req: req, result: newPromise()} } @@ -307,7 +291,7 @@ func (m *localManager) handleExecution(ctx context.Context, execution *localExec }, handleErrorFn: processor.ProcessError, } - tracker, inactivity := m.inactivityConfig() + tracker := newInactivityTracker(m.inactivityTimeout) producerWriter := newActivityTrackingWriter(execution.pipe.Writer, tracker) result, err := runProducerConsumer( ctx, @@ -315,7 +299,7 @@ func (m *localManager) handleExecution(ctx context.Context, execution *localExec handler.processEvents, nil, m.panicHandler, - inactivity, + tracker, ) cleaner.Cleanup(ctx, result, err) @@ -354,7 +338,7 @@ func (m *localManager) handleCancel(ctx context.Context, cancel *cancelation) { handleEventFn: processor.Process, handleErrorFn: func(ctx context.Context, err error) (a2a.SendMessageResult, error) { return nil, err }, } - tracker, inactivity := m.inactivityConfig() + tracker := newInactivityTracker(m.inactivityTimeout) producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) result, err := runProducerConsumer( ctx, @@ -362,7 +346,7 @@ func (m *localManager) handleCancel(ctx context.Context, cancel *cancelation) { handler.processEvents, nil, m.panicHandler, - inactivity, + tracker, ) cleaner.Cleanup(ctx, result, err) diff --git a/internal/taskexec/work_queue_handler.go b/internal/taskexec/work_queue_handler.go index 199d6ea7..c09e9ce5 100644 --- a/internal/taskexec/work_queue_handler.go +++ b/internal/taskexec/work_queue_handler.go @@ -63,7 +63,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa pipe := eventpipe.NewLocal() defer pipe.Close() - tracker, inactivity := b.inactivityConfig() + tracker := newInactivityTracker(b.inactivityTimeout) producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) var eventProducer eventProducerFn @@ -123,23 +123,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa heartbeater = hb } - result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler, inactivity) + result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler, tracker) cleaner.Cleanup(ctx, result, err) return result, err } - -// inactivityConfig builds the watcher configuration for [runProducerConsumer] -// and the matching activity tracker that should wrap the producer's writer. -// Returns a nil tracker and zero config when the timeout is not configured, -// in which case the watcher goroutine is not started and the writer wrapping -// is a no-op. -func (b *workQueueHandler) inactivityConfig() (*activityTracker, inactivityConfig) { - if b.inactivityTimeout <= 0 { - return nil, inactivityConfig{} - } - signal := make(chan struct{}, 1) - return &activityTracker{signal: signal}, inactivityConfig{ - timeout: b.inactivityTimeout, - signal: signal, - } -} From aed1fde89620d272ac95491bda4b738c7ac3d9e9 Mon Sep 17 00:00:00 2001 From: Yaroslav Shevchuk Date: Mon, 4 May 2026 12:54:12 +0000 Subject: [PATCH 3/3] cleanup --- internal/taskexec/execution_handler.go | 19 ++++--------------- internal/taskexec/execution_handler_test.go | 11 ++--------- internal/taskexec/manager_test.go | 4 ---- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/internal/taskexec/execution_handler.go b/internal/taskexec/execution_handler.go index b15bb49b..c9f65c8f 100644 --- a/internal/taskexec/execution_handler.go +++ b/internal/taskexec/execution_handler.go @@ -101,7 +101,7 @@ func newInactivityTracker(timeout time.Duration) *inactivityTracker { } signal := make(chan struct{}, 1) return &inactivityTracker{ - config: inactivityConfig{timeout: timeout, writeRecorded: signal}, + config: inactivityConfig{timeout: timeout}, writeRecorded: signal, } } @@ -144,8 +144,7 @@ func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) err // inactivityConfig configures the inactivity watcher started by // [runProducerConsumer]. A zero or negative timeout disables the watcher. type inactivityConfig struct { - timeout time.Duration - writeRecorded <-chan struct{} + timeout time.Duration } // runProducerConsumer starts producer and consumer goroutines in an error group and waits @@ -161,26 +160,16 @@ func runProducerConsumer( ) (a2a.SendMessageResult, error) { group, ctx := errgroup.WithContext(ctx) - // errgroup already wraps its derived context with [context.WithCancelCause] - // internally and uses the goroutine's returned error as the cancellation - // cause. Returning [ErrAgentInactivityTimeout] from the watcher is enough - // to surface it via [context.Cause] on the producer and consumer ctx, and - // the existing TestRunProducerConsumer_CausePropagation regression test - // covers the same pattern. - if inactivity != nil && inactivity.config.timeout > 0 && inactivity.config.writeRecorded != nil { + if inactivity != nil && inactivity.config.timeout > 0 && inactivity.writeRecorded != nil { cfg := inactivity.config group.Go(func() error { - // As of Go 1.23 a single Reset is sufficient: receives from t.C - // after Stop or Reset are guaranteed not to deliver a stale tick, - // so the previous Stop+drain dance is no longer required. - // See https://pkg.go.dev/time#Timer.Reset. timer := time.NewTimer(cfg.timeout) defer timer.Stop() for { select { case <-ctx.Done(): return ctx.Err() - case <-cfg.writeRecorded: + case <-inactivity.writeRecorded: timer.Reset(cfg.timeout) case <-timer.C: return ErrAgentInactivityTimeout diff --git a/internal/taskexec/execution_handler_test.go b/internal/taskexec/execution_handler_test.go index 6426dacc..4e2f46b5 100644 --- a/internal/taskexec/execution_handler_test.go +++ b/internal/taskexec/execution_handler_test.go @@ -162,10 +162,6 @@ func TestRunProducerConsumer_CausePropagation(t *testing.T) { } } -// TestRunProducerConsumer_InactivityTimeout exercises the inactivity watcher -// added for issue #78. It uses a tight timeout and a synthetic activity -// channel to keep the test fast and deterministic without sleeping for the -// real timeout window. func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { t.Parallel() @@ -266,9 +262,6 @@ func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { }) } -// TestActivityTrackingWriter ensures the writer wrapper signals on -// successful writes only and is non-blocking when the signal channel is -// full (the watcher only needs the "activity happened" hint). func TestActivityTrackingWriter(t *testing.T) { t.Parallel() @@ -291,7 +284,7 @@ func TestActivityTrackingWriter(t *testing.T) { t.Fatalf("Write() error = %v, want nil", err) } select { - case <-tracker.config.writeRecorded: + case <-tracker.writeRecorded: default: t.Fatalf("expected signal after successful Write") } @@ -307,7 +300,7 @@ func TestActivityTrackingWriter(t *testing.T) { t.Fatalf("Write() error = nil, want non-nil") } select { - case <-tracker.config.writeRecorded: + case <-tracker.writeRecorded: t.Fatalf("expected no signal after failed Write") default: } diff --git a/internal/taskexec/manager_test.go b/internal/taskexec/manager_test.go index 67b83d42..6a4e8314 100644 --- a/internal/taskexec/manager_test.go +++ b/internal/taskexec/manager_test.go @@ -831,10 +831,6 @@ func TestManager_GetExecution(t *testing.T) { } } -// TestManager_AgentInactivityTimeout verifies that a localManager configured -// with AgentInactivityTimeout terminates an executor that stalls without -// writing any events. The executor's context must be canceled with -// [ErrAgentInactivityTimeout] as the cause. Regression test for #78. func TestManager_AgentInactivityTimeout(t *testing.T) { t.Parallel() ctx := t.Context()