Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 38 additions & 11 deletions a2asrv/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"iter"
"log/slog"
"time"

"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue"
Expand All @@ -30,6 +31,12 @@ import (
"github.com/a2aproject/a2a-go/v2/log"
)

// ErrAgentInactivityTimeout is the cancellation cause set on an agent's
// execution context when no events are written to the event pipe for the
// duration configured via [WithAgentInactivityTimeout]. Callers can detect
// this condition with errors.Is.
//
// It is the very same error value as taskexec.ErrAgentInactivityTimeout,
// re-exported from this package because taskexec lives under internal/ and
// cannot be imported from outside the module.
var ErrAgentInactivityTimeout = taskexec.ErrAgentInactivityTimeout

// RequestHandler defines a transport-agnostic interface for handling incoming A2A requests.
type RequestHandler interface {
// GetTask handles the 'GetTask' protocol method.
Expand Down Expand Up @@ -81,6 +88,8 @@ type defaultRequestHandler struct {
workQueue workqueue.Queue
reqContextInterceptors []ExecutorContextInterceptor

agentInactivityTimeout time.Duration

authenticatedCardProducer ExtendedAgentCardProducer
capabilities *a2a.AgentCapabilities
}
Expand Down Expand Up @@ -122,6 +131,22 @@ func WithExecutionPanicHandler(handler func(r any) error) RequestHandlerOption {
}
}

// WithAgentInactivityTimeout sets an upper bound on how long the server waits
// for an agent's producer to emit its next event before the execution is
// terminated. Every event the agent writes to the event pipe restarts the
// countdown. Once the timeout elapses, the producer and consumer contexts are
// canceled with [ErrAgentInactivityTimeout] as the cause and the task is
// finalized through the existing failure path.
//
// Zero (the default) and negative durations turn the inactivity watcher off,
// keeping the previous behavior. The setting is honored in both
// single-process and cluster modes.
func WithAgentInactivityTimeout(d time.Duration) RequestHandlerOption {
	return func(_ *InterceptedHandler, handler *defaultRequestHandler) {
		// Stored as given; the sign check happens where the watcher is created.
		handler.agentInactivityTimeout = d
	}
}

// WithConcurrencyConfig allows to set limits on the number of concurrent executions.
func WithConcurrencyConfig(config limiter.ConcurrencyConfig) RequestHandlerOption {
return func(ih *InterceptedHandler, h *defaultRequestHandler) {
Expand Down Expand Up @@ -186,12 +211,13 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request
panic("TaskStore and QueueManager must be provided for cluster mode")
}
h.execManager = taskexec.NewDistributedManager(taskexec.DistributedManagerConfig{
WorkQueue: h.workQueue,
TaskStore: h.taskStore,
QueueManager: h.queueManager,
ConcurrencyConfig: h.concurrencyConfig,
Factory: execFactory,
PanicHandler: h.panicHandler,
WorkQueue: h.workQueue,
TaskStore: h.taskStore,
QueueManager: h.queueManager,
ConcurrencyConfig: h.concurrencyConfig,
Factory: execFactory,
PanicHandler: h.panicHandler,
AgentInactivityTimeout: h.agentInactivityTimeout,
})
} else {
if h.queueManager == nil {
Expand All @@ -204,11 +230,12 @@ func NewHandler(executor AgentExecutor, options ...RequestHandlerOption) Request
execFactory.taskStore = h.taskStore
}
h.execManager = taskexec.NewLocalManager(taskexec.LocalManagerConfig{
QueueManager: h.queueManager,
ConcurrencyConfig: h.concurrencyConfig,
Factory: execFactory,
TaskStore: h.taskStore,
PanicHandler: h.panicHandler,
QueueManager: h.queueManager,
ConcurrencyConfig: h.concurrencyConfig,
Factory: execFactory,
TaskStore: h.taskStore,
PanicHandler: h.panicHandler,
AgentInactivityTimeout: h.agentInactivityTimeout,
})
}

Expand Down
7 changes: 7 additions & 0 deletions internal/taskexec/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ package taskexec

import (
"context"
"errors"
"iter"

"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv/taskstore"
"github.com/a2aproject/a2a-go/v2/internal/eventpipe"
)

// ErrAgentInactivityTimeout is the cancellation cause set on the producer
// context when an agent fails to write any events to the event pipe within
// the configured inactivity timeout window. Callers can check for this
// condition with errors.Is.
//
// It is a sentinel created once with errors.New, so it is matched by
// identity rather than by its message text.
var ErrAgentInactivityTimeout = errors.New("agent inactivity timeout")

// Manager provides an API for executing and canceling tasks.
type Manager interface {
// Resubscribe is used to resubscribe to events of an active execution.
Expand Down
6 changes: 6 additions & 0 deletions internal/taskexec/distributed_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"log/slog"
"time"

"github.com/a2aproject/a2a-go/v2/a2a"
"github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue"
Expand All @@ -38,6 +39,11 @@ type DistributedManagerConfig struct {
ConcurrencyConfig limiter.ConcurrencyConfig
Logger *slog.Logger
PanicHandler PanicHandlerFn
// AgentInactivityTimeout, if positive, terminates an execution when the
// agent's producer has not written any events to the pipe for the
// configured duration. The terminating cause is [ErrAgentInactivityTimeout].
// A zero or negative value disables the watcher, preserving prior behavior.
AgentInactivityTimeout time.Duration
}

type distributedManager struct {
Expand Down
84 changes: 84 additions & 0 deletions internal/taskexec/execution_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,71 @@ func (h *executionHandler) processEvents(ctx context.Context) (a2a.SendMessageRe
type eventProducerFn func(context.Context) error
type eventConsumerFn func(context.Context) (a2a.SendMessageResult, error)

// inactivityTracker bundles the inactivity watcher's configuration with the
// activity-recording channel. Callers obtain one via [newInactivityTracker]
// (returns nil when disabled), wrap the producer's writer with
// [newActivityTrackingWriter], and pass the tracker to [runProducerConsumer]
// to start the watcher. A nil tracker disables tracking end-to-end: the writer
// wrapper is a no-op and the watcher goroutine is not started.
type inactivityTracker struct {
// config holds the timeout the watcher uses for its timer.
config inactivityConfig
// writeRecorded is signaled (without blocking) after each successful
// producer write; the watcher resets its timer whenever it receives from it.
writeRecorded chan struct{}
}

// newInactivityTracker builds the tracker used to detect a stalled producer.
// Inactivity tracking is disabled for a zero or negative timeout, in which
// case nil is returned.
func newInactivityTracker(timeout time.Duration) *inactivityTracker {
	if timeout <= 0 {
		return nil
	}
	// Capacity of one: a single pending signal is all the watcher needs.
	tracker := &inactivityTracker{writeRecorded: make(chan struct{}, 1)}
	tracker.config.timeout = timeout
	return tracker
}

// record notes that the producer has just completed a write. It never blocks:
// the watcher only needs to learn that some activity occurred, so a signal
// the channel cannot take right now is simply dropped. Calling record on a
// nil tracker does nothing.
func (t *inactivityTracker) record() {
	if t != nil {
		var signal struct{}
		select {
		case t.writeRecorded <- signal:
		default:
			// Typically a signal is already pending; one is enough.
		}
	}
}

// newActivityTrackingWriter decorates an [eventpipe.Writer] so that the given
// tracker is signaled after every write that succeeds. With a nil tracker the
// original writer is handed back untouched, so no wrapper is allocated when
// no inactivity timeout is configured.
func newActivityTrackingWriter(inner eventpipe.Writer, tracker *inactivityTracker) eventpipe.Writer {
	if tracker == nil {
		return inner
	}
	wrapped := activityTrackingWriter{tracker: tracker, inner: inner}
	return &wrapped
}

// activityTrackingWriter is an [eventpipe.Writer] decorator that reports each
// successful write to an inactivityTracker.
type activityTrackingWriter struct {
// inner is the wrapped writer that performs the actual write.
inner eventpipe.Writer
// tracker is signaled after inner.Write returns without an error.
tracker *inactivityTracker
}

// Write forwards the event to the wrapped writer. An error from the wrapped
// writer is returned unchanged and is not counted as activity; only a
// successful write signals the tracker.
func (w *activityTrackingWriter) Write(ctx context.Context, event a2a.Event) error {
	err := w.inner.Write(ctx, event)
	if err == nil {
		w.tracker.record()
	}
	return err
}

// inactivityConfig configures the inactivity watcher started by
// [runProducerConsumer]. A zero or negative timeout disables the watcher.
type inactivityConfig struct {
// timeout is how long the watcher waits without a recorded write before it
// returns ErrAgentInactivityTimeout.
timeout time.Duration
}

// runProducerConsumer starts producer and consumer goroutines in an error group and waits
// for both of them to finish or one of them to fail. If both complete successfully and the consumer produces a result,
// the result is returned, otherwise an error is returned.
Expand All @@ -91,9 +156,28 @@ func runProducerConsumer(
consumer eventConsumerFn,
heartbeater workqueue.Heartbeater,
panicHandler PanicHandlerFn,
inactivity *inactivityTracker,
) (a2a.SendMessageResult, error) {
group, ctx := errgroup.WithContext(ctx)

if inactivity != nil && inactivity.config.timeout > 0 && inactivity.writeRecorded != nil {
cfg := inactivity.config
group.Go(func() error {
timer := time.NewTimer(cfg.timeout)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-inactivity.writeRecorded:
timer.Reset(cfg.timeout)
case <-timer.C:
return ErrAgentInactivityTimeout
}
}
})
}

if heartbeater != nil {
group.Go(func() error {
timer := time.NewTicker(heartbeater.HeartbeatInterval())
Expand Down
Loading
Loading