diff --git a/a2asrv/handler.go b/a2asrv/handler.go index 9df75c12..eedf3b4d 100644 --- a/a2asrv/handler.go +++ b/a2asrv/handler.go @@ -19,6 +19,7 @@ import ( "fmt" "iter" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -30,6 +31,12 @@ import ( "github.com/a2aproject/a2a-go/v2/log" ) +// ErrAgentInactivityTimeout is the cancellation cause set on an agent's +// execution context when no events are written to the event pipe for the +// duration configured via [WithAgentInactivityTimeout]. Callers can detect +// this condition with errors.Is. +var ErrAgentInactivityTimeout = taskexec.ErrAgentInactivityTimeout + // RequestHandler defines a transport-agnostic interface for handling incoming A2A requests. type RequestHandler interface { // GetTask handles the 'GetTask' protocol method. @@ -81,6 +88,8 @@ type defaultRequestHandler struct { workQueue workqueue.Queue reqContextInterceptors []ExecutorContextInterceptor + agentInactivityTimeout time.Duration + authenticatedCardProducer ExtendedAgentCardProducer capabilities *a2a.AgentCapabilities } @@ -122,6 +131,22 @@ func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption { } } +// WithAgentInactivityTimeout configures the maximum time the server will wait +// for the next event from an agent's producer before terminating its execution. +// The timer is reset whenever the agent writes an event to the event pipe. +// When the timeout fires the producer and consumer contexts are canceled with +// [ErrAgentInactivityTimeout] as the cause, and the task is finalized via the +// existing failure path. +// +// A value of 0 (default) or negative disables the inactivity watcher and +// preserves prior behavior. The timeout applies in both single-process and +// cluster modes. +func WithAgentInactivityTimeout(d time.Duration) RequestHandlerOption { + return func(ih *InterceptedHandler, h *defaultRequestHandler) { + h.agentInactivityTimeout = d + } +} + // WithConcurrencyConfig allows to set limits on the number of concurrent executions. func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption { return func(ih *InterceptedHandler, h *defaultRequestHandler) { @@ -186,12 +211,13 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request panic("TaskStore and QueueManager must be provided for cluster mode") } h.execManager = taskexec.NewDistributedManager(taskexec.DistributedManagerConfig{ - WorkQueue: h.workQueue, - TaskStore: h.taskStore, - QueueManager: h.queueManager, - ConcurrencyConfig: h.concurrencyConfig, - Factory: execFactory, - PanicHandler: h.panicHandler, + WorkQueue: h.workQueue, + TaskStore: h.taskStore, + QueueManager: h.queueManager, + ConcurrencyConfig: h.concurrencyConfig, + Factory: execFactory, + PanicHandler: h.panicHandler, + AgentInactivityTimeout: h.agentInactivityTimeout, }) } else { if h.queueManager == nil { @@ -204,11 +230,12 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request execFactory.taskStore = h.taskStore } h.execManager = taskexec.NewLocalManager(taskexec.LocalManagerConfig{ - QueueManager: h.queueManager, - ConcurrencyConfig: h.concurrencyConfig, - Factory: execFactory, - TaskStore: h.taskStore, - PanicHandler: h.panicHandler, + QueueManager: h.queueManager, + ConcurrencyConfig: h.concurrencyConfig, + Factory: execFactory, + TaskStore: h.taskStore, + PanicHandler: h.panicHandler, + AgentInactivityTimeout: h.agentInactivityTimeout, }) } diff --git a/internal/taskexec/api.go b/internal/taskexec/api.go index 8ac5c574..5c18cf96 100644 --- a/internal/taskexec/api.go +++ b/internal/taskexec/api.go @@ -16,6 +16,7 @@ package taskexec import ( "context" + "errors" "iter" "github.com/a2aproject/a2a-go/v2/a2a" @@ -23,6 +24,12 @@ import ( "github.com/a2aproject/a2a-go/v2/internal/eventpipe" ) +// ErrAgentInactivityTimeout is the cancellation cause set on the producer +// context when an agent fails to write any events to the event pipe within +// the configured inactivity timeout window. Callers can check for this +// condition with errors.Is. +var ErrAgentInactivityTimeout = errors.New("agent inactivity timeout") + // Manager provides an API for executing and canceling tasks. type Manager interface { // Resubscribe is used to resubscribe to events of an active execution. diff --git a/internal/taskexec/distributed_manager.go b/internal/taskexec/distributed_manager.go index adf9715c..118ed393 100644 --- a/internal/taskexec/distributed_manager.go +++ b/internal/taskexec/distributed_manager.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -38,6 +39,11 @@ type DistributedManagerConfig struct { ConcurrencyConfig limiter.ConcurrencyConfig Logger *slog.Logger PanicHandler PanicHandlerFn + // AgentInactivityTimeout, if positive, terminates an execution when the + // agent's producer has not written any events to the pipe for the + // configured duration. The terminating cause is [ErrAgentInactivityTimeout]. + // A value of 0 disables the watcher, preserving prior behavior. + AgentInactivityTimeout time.Duration } type distributedManager struct { diff --git a/internal/taskexec/execution_handler.go b/internal/taskexec/execution_handler.go index a99f988b..c9f65c8f 100644 --- a/internal/taskexec/execution_handler.go +++ b/internal/taskexec/execution_handler.go @@ -82,6 +82,71 @@ func (h *executionHandler) processEvents(ctx context.Context) (a2a.SendMessageRe type eventProducerFn func(context.Context) error type eventConsumerFn func(context.Context) (a2a.SendMessageResult, error) +// inactivityTracker bundles the inactivity watcher's configuration with the +// activity-recording channel. Callers obtain one via [newInactivityTracker] +// (returns nil when disabled), wrap the producer's writer with +// [newActivityTrackingWriter], and pass the tracker to [runProducerConsumer] +// to start the watcher. A nil tracker disables tracking end-to-end: the writer +// wrapper is a no-op and the watcher goroutine is not started. +type inactivityTracker struct { + config inactivityConfig + writeRecorded chan struct{} +} + +// newInactivityTracker returns a tracker for the given timeout. A non-positive +// timeout returns nil, disabling inactivity tracking. +func newInactivityTracker(timeout time.Duration) *inactivityTracker { + if timeout <= 0 { + return nil + } + signal := make(chan struct{}, 1) + return &inactivityTracker{ + config: inactivityConfig{timeout: timeout}, + writeRecorded: signal, + } +} + +// record signals that a producer write just succeeded. Non-blocking: the +// watcher only needs an "activity happened" hint, so a full channel is fine. +func (t *inactivityTracker) record() { + if t == nil { + return + } + select { + case t.writeRecorded <- struct{}{}: + default: + } +} + +// newActivityTrackingWriter wraps an [eventpipe.Writer] so each successful +// write signals the provided tracker. A nil tracker returns the writer +// unchanged so callers without an inactivity timeout configured pay no cost. +func newActivityTrackingWriter(inner eventpipe.Writer, tracker *inactivityTracker) eventpipe.Writer { + if tracker == nil { + return inner + } + return &activityTrackingWriter{inner: inner, tracker: tracker} +} + +type activityTrackingWriter struct { + inner eventpipe.Writer + tracker *inactivityTracker +} + +func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) error { + if err := w.inner.Write(ctx, event); err != nil { + return err + } + w.tracker.record() + return nil +} + +// inactivityConfig configures the inactivity watcher started by +// [runProducerConsumer]. A zero or negative timeout disables the watcher. +type inactivityConfig struct { + timeout time.Duration +} + // runProducerConsumer starts producer and consumer goroutines in an error group and waits // for both of them to finish or one of them to fail. If both complete successfuly and consumer produces a result, // the result is returned, otherwise an error is returned. @@ -91,9 +156,28 @@ func runProducerConsumer( consumer eventConsumerFn, heartbeater workqueue.Heartbeater, panicHandler PanicHandlerFn, + inactivity *inactivityTracker, ) (a2a.SendMessageResult, error) { group, ctx := errgroup.WithContext(ctx) + if inactivity != nil && inactivity.config.timeout > 0 && inactivity.writeRecorded != nil { + cfg := inactivity.config + group.Go(func() error { + timer := time.NewTimer(cfg.timeout) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-inactivity.writeRecorded: + timer.Reset(cfg.timeout) + case <-timer.C: + return ErrAgentInactivityTimeout + } + } + }) + } + if heartbeater != nil { group.Go(func() error { timer := time.NewTicker(heartbeater.HeartbeatInterval()) diff --git a/internal/taskexec/execution_handler_test.go b/internal/taskexec/execution_handler_test.go index d920c81d..4e2f46b5 100644 --- a/internal/taskexec/execution_handler_test.go +++ b/internal/taskexec/execution_handler_test.go @@ -16,9 +16,11 @@ package taskexec import ( "context" + "errors" "fmt" "strings" "testing" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/taskstore" @@ -122,7 +124,7 @@ func TestRunProducerConsumer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler) + result, err := runProducerConsumer(t.Context(), tc.producer, tc.consumer, nil, tc.panicHandler, nil) if tc.wantErr != nil && err == nil { t.Fatalf("expected error, got %v", result) } @@ -153,8 +155,185 @@ func TestRunProducerConsumer_CausePropagation(t *testing.T) { }, nil, nil, + nil, ) if gotProducerErr != consumerErr { t.Fatalf("expected producer error = %s, got %s", consumerErr, gotProducerErr) } } + +func TestRunProducerConsumer_InactivityTimeout(t *testing.T) { + t.Parallel() + + t.Run("producer stalls without writing events", func(t *testing.T) { + t.Parallel() + tracker := newInactivityTracker(20 * time.Millisecond) + var producerCause error + + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + <-ctx.Done() + producerCause = context.Cause(ctx) + return ctx.Err() + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + nil, + nil, + tracker, + ) + if !errors.Is(err, ErrAgentInactivityTimeout) { + t.Fatalf("runProducerConsumer() error = %v, want errors.Is(_, ErrAgentInactivityTimeout)", err) + } + if !errors.Is(producerCause, ErrAgentInactivityTimeout) { + t.Fatalf("context.Cause(producerCtx) = %v, want errors.Is(_, ErrAgentInactivityTimeout)", producerCause) + } + }) + + t.Run("activity signals reset the timer", func(t *testing.T) { + t.Parallel() + tracker := newInactivityTracker(20 * time.Millisecond) + + producerStarted := make(chan struct{}) + releaseProducer := make(chan struct{}) + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + close(producerStarted) + // Send activity signals at half the timeout interval, so the + // watcher's timer keeps getting reset and never fires. + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-releaseProducer: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + tracker.record() + } + } + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + <-producerStarted + // Hold the consumer open long enough (4x the timeout) that, + // without timer resets, the watcher would have fired + // multiple times. + time.Sleep(80 * time.Millisecond) + close(releaseProducer) + return a2a.NewMessage(a2a.MessageRoleUser), nil + }, + nil, + nil, + tracker, + ) + if err != nil { + t.Fatalf("runProducerConsumer() error = %v, want nil (timer should have been reset)", err) + } + }) + + t.Run("zero timeout does not start watcher", func(t *testing.T) { + t.Parallel() + // With timeout=0 newInactivityTracker returns nil, so the watcher + // must not be started at all. We verify by running a long-lived + // producer and asserting completion is driven only by the + // consumer's result. + _, err := runProducerConsumer( + t.Context(), + func(ctx context.Context) error { + <-ctx.Done() + return ctx.Err() + }, + func(ctx context.Context) (a2a.SendMessageResult, error) { + time.Sleep(5 * time.Millisecond) + return a2a.NewMessage(a2a.MessageRoleUser), nil + }, + nil, + nil, + newInactivityTracker(0), + ) + if err != nil { + t.Fatalf("runProducerConsumer() error = %v, want nil", err) + } + }) +} + +func TestActivityTrackingWriter(t *testing.T) { + t.Parallel() + + t.Run("nil tracker returns inner writer unchanged", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + got := newActivityTrackingWriter(inner, nil) + if got != inner { + t.Fatalf("newActivityTrackingWriter(_, nil) = %v, want inner writer", got) + } + }) + + t.Run("successful write signals tracker", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + tracker := newInactivityTracker(time.Hour) // any positive timeout + w := newActivityTrackingWriter(inner, tracker) + + if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err != nil { + t.Fatalf("Write() error = %v, want nil", err) + } + select { + case <-tracker.writeRecorded: + default: + t.Fatalf("expected signal after successful Write") + } + }) + + t.Run("failed write does not signal", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{err: errors.New("boom")} + tracker := newInactivityTracker(time.Hour) + w := newActivityTrackingWriter(inner, tracker) + + if err := w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)); err == nil { + t.Fatalf("Write() error = nil, want non-nil") + } + select { + case <-tracker.writeRecorded: + t.Fatalf("expected no signal after failed Write") + default: + } + }) + + t.Run("signal is non-blocking when channel is full", func(t *testing.T) { + t.Parallel() + inner := &fakeWriter{} + tracker := newInactivityTracker(time.Hour) + // Pre-fill the channel so a non-blocking send must drop the new signal. + tracker.writeRecorded <- struct{}{} + w := newActivityTrackingWriter(inner, tracker) + + // Should return without blocking even though the channel is full. + done := make(chan error, 1) + go func() { + done <- w.Write(t.Context(), a2a.NewMessage(a2a.MessageRoleUser)) + }() + select { + case err := <-done: + if err != nil { + t.Fatalf("Write() error = %v, want nil", err) + } + case <-time.After(50 * time.Millisecond): + t.Fatalf("Write() blocked when signal channel was full") + } + }) +} + +type fakeWriter struct { + err error +} + +func (w *fakeWriter) Write(_ context.Context, _ a2a.Event) error { + return w.err +} diff --git a/internal/taskexec/local_manager.go b/internal/taskexec/local_manager.go index edebf2f9..4a371f0b 100644 --- a/internal/taskexec/local_manager.go +++ b/internal/taskexec/local_manager.go @@ -21,6 +21,7 @@ import ( "runtime/debug" "sync" "sync/atomic" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -50,10 +51,11 @@ var ( // Both cancelations and executions are started in detached context and run until completion. // The type is suitable only for single-process execution management. type localManager struct { - queueManager eventqueue.Manager - factory Factory - store taskstore.Store - panicHandler PanicHandlerFn + queueManager eventqueue.Manager + factory Factory + store taskstore.Store + panicHandler PanicHandlerFn + inactivityTimeout time.Duration mu sync.Mutex executions map[a2a.TaskID]*localExecution @@ -89,18 +91,24 @@ type LocalManagerConfig struct { Factory Factory TaskStore taskstore.Store PanicHandler PanicHandlerFn + // AgentInactivityTimeout, if positive, terminates an execution when the + // agent's producer has not written any events to the pipe for the + // configured duration. The terminating cause is [ErrAgentInactivityTimeout]. + // A value of 0 disables the watcher, preserving prior behavior. + AgentInactivityTimeout time.Duration } // NewLocalManager is a [localManager] constructor function. func NewLocalManager(cfg LocalManagerConfig) Manager { manager := &localManager{ - queueManager: cfg.QueueManager, - factory: cfg.Factory, - store: cfg.TaskStore, - panicHandler: cfg.PanicHandler, - limiter: newConcurrencyLimiter(cfg.ConcurrencyConfig), - executions: make(map[a2a.TaskID]*localExecution), - cancelations: make(map[a2a.TaskID]*cancelation), + queueManager: cfg.QueueManager, + factory: cfg.Factory, + store: cfg.TaskStore, + panicHandler: cfg.PanicHandler, + inactivityTimeout: cfg.AgentInactivityTimeout, + limiter: newConcurrencyLimiter(cfg.ConcurrencyConfig), + executions: make(map[a2a.TaskID]*localExecution), + cancelations: make(map[a2a.TaskID]*cancelation), } if manager.queueManager == nil { manager.queueManager = eventqueue.NewInMemoryManager() @@ -283,12 +291,15 @@ func (m *localManager) handleExecution(ctx context.Context, execution *localExec }, handleErrorFn: processor.ProcessError, } + tracker := newInactivityTracker(m.inactivityTimeout) + producerWriter := newActivityTrackingWriter(execution.pipe.Writer, tracker) result, err := runProducerConsumer( ctx, - func(ctx context.Context) error { return executor.Execute(ctx, execution.pipe.Writer) }, + func(ctx context.Context) error { return executor.Execute(ctx, producerWriter) }, handler.processEvents, nil, m.panicHandler, + tracker, ) cleaner.Cleanup(ctx, result, err) @@ -327,12 +338,15 @@ func (m *localManager) handleCancel(ctx context.Context, cancel *cancelation) { handleEventFn: processor.Process, handleErrorFn: func(ctx context.Context, err error) (a2a.SendMessageResult, error) { return nil, err }, } + tracker := newInactivityTracker(m.inactivityTimeout) + producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) result, err := runProducerConsumer( ctx, - func(ctx context.Context) error { return canceler.Cancel(ctx, pipe.Writer) }, + func(ctx context.Context) error { return canceler.Cancel(ctx, producerWriter) }, handler.processEvents, nil, m.panicHandler, + tracker, ) cleaner.Cleanup(ctx, result, err) diff --git a/internal/taskexec/manager_test.go b/internal/taskexec/manager_test.go index 17bcca4d..6a4e8314 100644 --- a/internal/taskexec/manager_test.go +++ b/internal/taskexec/manager_test.go @@ -128,12 +128,13 @@ func (e *testProcessor) ProcessError(ctx context.Context, err error) (a2a.SendMe type testExecutor struct { *testProcessor - executeCalled chan struct{} - executeErr error - queue eventpipe.Writer - contextCanceled bool - block chan struct{} - emitTask *a2a.Task + executeCalled chan struct{} + executeErr error + queue eventpipe.Writer + contextCanceled bool + contextCauseObserver func(error) + block chan struct{} + emitTask *a2a.Task } var _ Executor = (*testExecutor)(nil) @@ -151,6 +152,9 @@ func (e *testExecutor) Execute(ctx context.Context, queue eventpipe.Writer) erro case <-e.block: case <-ctx.Done(): e.contextCanceled = true + if e.contextCauseObserver != nil { + e.contextCauseObserver(context.Cause(ctx)) + } return ctx.Err() } } @@ -826,3 +830,40 @@ func TestManager_GetExecution(t *testing.T) { t.Fatal("manager.Resubscribe() succeeded for finished execution, want error") } } + +func TestManager_AgentInactivityTimeout(t *testing.T) { + t.Parallel() + ctx := t.Context() + + executor := newExecutor() + executor.block = make(chan struct{}) + defer close(executor.block) + + var executorCause error + executor.contextCauseObserver = func(c error) { executorCause = c } + + manager := NewLocalManager(LocalManagerConfig{ + Factory: newStaticFactory(executor, nil), + TaskStore: testutil.NewTestTaskStore(), + AgentInactivityTimeout: 50 * time.Millisecond, + }) + + subscription, err := manager.Execute(ctx, &a2a.SendMessageRequest{Message: a2a.NewMessage(a2a.MessageRoleUser)}) + if err != nil { + t.Fatalf("manager.Execute() error = %v, want nil", err) + } + executionResult, _ := consumeEvents(t, subscription) + + // Wait for the executor to be invoked. + <-executor.executeCalled + // The executor blocks without writing events. The watcher should + // fire after AgentInactivityTimeout and cancel the executor's ctx. + <-executionResult + + if !executor.contextCanceled { + t.Fatalf("executor.contextCanceled = false, want true") + } + if !errors.Is(executorCause, ErrAgentInactivityTimeout) { + t.Fatalf("context.Cause(executorCtx) = %v, want errors.Is(_, ErrAgentInactivityTimeout)", executorCause) + } +} diff --git a/internal/taskexec/work_queue_handler.go b/internal/taskexec/work_queue_handler.go index fb0cf887..c09e9ce5 100644 --- a/internal/taskexec/work_queue_handler.go +++ b/internal/taskexec/work_queue_handler.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "log/slog" + "time" "github.com/a2aproject/a2a-go/v2/a2a" "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue" @@ -28,18 +29,20 @@ import ( ) type workQueueHandler struct { - queueManager eventqueue.Manager - taskStore taskstore.Store - factory Factory - panicHandler PanicHandlerFn + queueManager eventqueue.Manager + taskStore taskstore.Store + factory Factory + panicHandler PanicHandlerFn + inactivityTimeout time.Duration } func newWorkQueueHandler(cfg DistributedManagerConfig) *workQueueHandler { backend := &workQueueHandler{ - queueManager: cfg.QueueManager, - taskStore: cfg.TaskStore, - factory: cfg.Factory, - panicHandler: cfg.PanicHandler, + queueManager: cfg.QueueManager, + taskStore: cfg.TaskStore, + factory: cfg.Factory, + panicHandler: cfg.PanicHandler, + inactivityTimeout: cfg.AgentInactivityTimeout, } if cfg.Logger == nil { cfg.Logger = slog.Default() @@ -60,6 +63,9 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa pipe := eventpipe.NewLocal() defer pipe.Close() + tracker := newInactivityTracker(b.inactivityTimeout) + producerWriter := newActivityTrackingWriter(pipe.Writer, tracker) + var eventProducer eventProducerFn var eventProcessor Processor var cleaner Cleaner @@ -73,7 +79,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa if err != nil { return nil, fmt.Errorf("executor setup failed: %w", err) } - eventProducer = func(ctx context.Context) error { return executor.Execute(ctx, pipe.Writer) } + eventProducer = func(ctx context.Context) error { return executor.Execute(ctx, producerWriter) } eventProcessor = processor cleaner = localCleaner @@ -85,7 +91,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa if err != nil { return nil, fmt.Errorf("canceler setup failed: %w", err) } - eventProducer = func(ctx context.Context) error { return canceler.Cancel(ctx, pipe.Writer) } + eventProducer = func(ctx context.Context) error { return canceler.Cancel(ctx, producerWriter) } eventProcessor = processor cleaner = localCleaner @@ -117,7 +123,7 @@ func (b *workQueueHandler) handle(ctx context.Context, payload *workqueue.Payloa heartbeater = hb } - result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler) + result, err := runProducerConsumer(ctx, eventProducer, handler.processEvents, heartbeater, b.panicHandler, tracker) cleaner.Cleanup(ctx, result, err) return result, err }