Skip to content

Commit 96e8fbb

Browse files
fanlvshentongmartin
authored andcommitted
feat(adk): add agent teams middleware with mailbox-based multi-agent coordination
Add a new `team` middleware package that enables multiple agents to collaborate within a team via file-backed mailbox message passing and shared task lists. Key components: - Team lifecycle management (leader/teammate creation, shutdown, registry) - File-backed mailbox system with per-agent inboxes and poll-based pump - Source router for dispatching messages to agent TurnLoops - Protocol layer with structured message types (DM, broadcast, shutdown, idle, plan-approval) - Team tools: Agent (spawn teammates), SendMessage, TeamCreate, TeamDelete - Team config store for persistent team/member metadata Also enhance the `plantask` middleware: - Add programmatic task API (TaskInput/CreateTask/UpdateTask/ListTasks/GetTask) - Add task reminder system that injects periodic reminders after N assistant turns - Refactor task CRUD tools to use shared task API internally - Expand test coverage for task operations Change-Id: I41e0dd3be788da2a8a5c4b211106b4a26b0aa2e8
1 parent e5e2b18 commit 96e8fbb

58 files changed

Lines changed: 13322 additions & 397 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,9 @@ CLAUDE.md
6363

6464
# Internal dev setup (not for public repo)
6565
/scripts/dev_setup_internal.sh
66+
*.local.md
67+
**/settings.local.json
68+
# Specs directories
69+
*/specs
70+
/todos
71+

adk/middlewares/plantask/backend_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ package plantask
1818

1919
import (
2020
"context"
21-
"errors"
21+
"fmt"
22+
"os"
2223
"path/filepath"
2324
"strings"
2425
"sync"
@@ -58,7 +59,7 @@ func (b *inMemoryBackend) Read(ctx context.Context, req *ReadRequest) (*fspkg.Fi
5859

5960
content, ok := b.files[req.FilePath]
6061
if !ok {
61-
return nil, errors.New("file not found")
62+
return nil, fmt.Errorf("%w: %s", os.ErrNotExist, req.FilePath)
6263
}
6364
return &fspkg.FileContent{Content: content}, nil
6465
}
@@ -75,6 +76,11 @@ func (b *inMemoryBackend) Delete(ctx context.Context, req *DeleteRequest) error
7576
b.mu.Lock()
7677
defer b.mu.Unlock()
7778

78-
delete(b.files, req.FilePath)
79+
prefix := req.FilePath + "/"
80+
for k := range b.files {
81+
if k == req.FilePath || strings.HasPrefix(k, prefix) {
82+
delete(b.files, k)
83+
}
84+
}
7985
return nil
8086
}

adk/middlewares/plantask/plantask.go

Lines changed: 227 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,167 @@ import (
2424
"github.com/cloudwego/eino/adk"
2525
)
2626

27-
// Config is the configuration for the tool search middleware.
27+
// Config is the core configuration for the plantask middleware.
28+
// Team-specific extensions are injected via Option functions.
2829
type Config struct {
30+
// Backend is the storage backend for reading and writing task files.
2931
Backend Backend
32+
// BaseDir is the root directory where task files are stored.
3033
BaseDir string
3134
}
3235

36+
// Option configures optional behavior on the plantask middleware.
37+
type Option func(*middleware)
38+
39+
// WithTaskBaseDirResolver enables the shared-task mode used by team integration.
40+
// When set, resolveBaseDir calls this resolver instead of using baseDir directly.
41+
// The resolver should return the full path to the task storage directory.
42+
// When nil or returning "", single-agent baseDir is used as fallback.
43+
func WithTaskBaseDirResolver(resolver func(ctx context.Context) string) Option {
44+
return func(m *middleware) {
45+
m.taskBaseDirResolver = resolver
46+
}
47+
}
48+
49+
// WithAgentNameResolver sets the resolver for the current agent name.
50+
// This is only consulted in shared-task mode (enabled by WithTaskBaseDirResolver),
51+
// where it is used to auto-fill task ownership metadata such as
52+
// TaskAssignment.AssignedBy and the implicit owner for in_progress tasks.
53+
func WithAgentNameResolver(resolver func(ctx context.Context) string) Option {
54+
return func(m *middleware) {
55+
m.agentNameResolver = resolver
56+
}
57+
}
58+
59+
// WithTaskAssignedHook registers a callback invoked when TaskUpdate changes a
60+
// task's owner in shared-task mode (enabled by WithTaskBaseDirResolver).
61+
// The team middleware uses this to send task_assignment messages to the
62+
// assignee's mailbox.
63+
func WithTaskAssignedHook(hook func(ctx context.Context, assignment TaskAssignment) error) Option {
64+
return func(m *middleware) {
65+
m.onTaskAssigned = hook
66+
}
67+
}
68+
69+
// WithSharedTaskLock injects an external lock that replaces the per-instance
70+
// taskLock for all task operations. This is used by team integration so that
71+
// all agents in the same team serialize against a single shared lock.
72+
func WithSharedTaskLock(lock *sync.RWMutex) Option {
73+
return func(m *middleware) {
74+
m.sharedTaskLock = lock
75+
}
76+
}
77+
78+
// WithReminder configures task reminder injection. The interval specifies how
79+
// many assistant turns without TaskCreate/TaskUpdate before a reminder is
80+
// injected. Set to negative to disable. Default is 10.
81+
// When onReminder is non-nil, BeforeModelRewriteState calls onReminder with
82+
// the reminder text and leaves the current state untouched, instead of
83+
// injecting the reminder directly into state.Messages. Throttling is tracked
84+
// via an internal assistant-turn counter so repeated reminders are still
85+
// suppressed correctly.
86+
func WithReminder(interval int, onReminder func(ctx context.Context, reminderText string)) Option {
87+
return func(m *middleware) {
88+
m.reminderInterval = interval
89+
m.onReminder = onReminder
90+
}
91+
}
92+
93+
// TaskAssignment contains information about a task ownership change emitted by
94+
// the shared-task/team workflow.
95+
type TaskAssignment struct {
96+
TaskID string
97+
Subject string
98+
Description string
99+
Owner string // new owner (assignee)
100+
AssignedBy string // who set the owner (from context)
101+
}
102+
103+
// Middleware is a marker interface for identifying plantask middleware instances.
104+
// Used by team.NewRunner to detect if a plantask middleware is already present
105+
// in user-provided handlers to avoid duplicate injection.
106+
type Middleware interface {
107+
isPlanTaskMiddleware()
108+
109+
// UnassignOwnerTasks finds all tasks owned by the given owner, clears their
110+
// owner, reverts in_progress tasks to pending, and returns the unassigned task IDs.
111+
// This is used by the team layer when a teammate exits to release their tasks.
112+
UnassignOwnerTasks(ctx context.Context, owner string) ([]string, error)
113+
}
114+
115+
// isPlanTaskMiddleware implements the Middleware marker interface.
116+
func (m *middleware) isPlanTaskMiddleware() {}
117+
118+
// rwLock returns the effective read-write lock: the shared team lock when set,
119+
// otherwise the per-instance lock.
120+
func (m *middleware) rwLock() *sync.RWMutex {
121+
if m.sharedTaskLock != nil {
122+
return m.sharedTaskLock
123+
}
124+
return &m.taskLock
125+
}
126+
127+
// CreateTask creates a task with proper locking. It resolves the baseDir from
128+
// the context (team mode) or falls back to the configured baseDir.
129+
func (m *middleware) CreateTask(ctx context.Context, input *TaskInput) (string, error) {
130+
lock := m.rwLock()
131+
lock.Lock()
132+
defer lock.Unlock()
133+
134+
return createTaskLocked(ctx, m.backend, m.resolveBaseDir(ctx), input)
135+
}
136+
137+
// DeleteTask deletes a task with proper locking.
138+
func (m *middleware) DeleteTask(ctx context.Context, taskID string) error {
139+
lock := m.rwLock()
140+
lock.Lock()
141+
defer lock.Unlock()
142+
143+
return deleteTaskLocked(ctx, m.backend, m.resolveBaseDir(ctx), taskID)
144+
}
145+
146+
// UnassignOwnerTasks finds all tasks owned by the given owner, clears their owner,
147+
// reverts in_progress tasks to pending, and returns the unassigned task IDs.
148+
func (m *middleware) UnassignOwnerTasks(ctx context.Context, owner string) ([]string, error) {
149+
lock := m.rwLock()
150+
lock.Lock()
151+
defer lock.Unlock()
152+
153+
baseDir := m.resolveBaseDir(ctx)
154+
tasks, err := listTasks(ctx, m.backend, baseDir)
155+
if err != nil {
156+
return nil, fmt.Errorf("list tasks for unassign: %w", err)
157+
}
158+
159+
var unassigned []string
160+
for _, t := range tasks {
161+
if t.Owner != owner {
162+
continue
163+
}
164+
t.Owner = ""
165+
if t.Status == taskStatusInProgress {
166+
t.Status = taskStatusPending
167+
}
168+
if err := writeTask(ctx, m.backend, baseDir, t); err != nil {
169+
return nil, fmt.Errorf("unassign task #%s: %w", t.ID, err)
170+
}
171+
unassigned = append(unassigned, t.ID)
172+
}
173+
174+
return unassigned, nil
175+
}
176+
33177
// New creates a new plantask middleware that provides task management tools for agents.
34178
// It adds TaskCreate, TaskGet, TaskUpdate, and TaskList tools to the agent's tool set,
35179
// allowing agents to create and manage structured task lists during coding sessions.
36-
func New(ctx context.Context, config *Config) (adk.ChatModelAgentMiddleware, error) {
180+
//
181+
// Use Option functions to enable team-specific extensions:
182+
//
183+
// plantask.New(ctx, config,
184+
// plantask.WithTaskBaseDirResolver(resolver),
185+
// plantask.WithTaskAssignedHook(hook),
186+
// plantask.WithReminder(interval, callback))
187+
func New(ctx context.Context, config *Config, opts ...Option) (adk.ChatModelAgentMiddleware, error) {
37188
if config == nil {
38189
return nil, fmt.Errorf("config is required")
39190
}
@@ -44,27 +195,93 @@ func New(ctx context.Context, config *Config) (adk.ChatModelAgentMiddleware, err
44195
return nil, fmt.Errorf("baseDir is required")
45196
}
46197

47-
return &middleware{backend: config.Backend, baseDir: config.BaseDir}, nil
198+
m := &middleware{
199+
backend: config.Backend,
200+
baseDir: config.BaseDir,
201+
reminderInterval: defaultReminderInterval,
202+
}
203+
204+
for _, opt := range opts {
205+
opt(m)
206+
}
207+
208+
return m, nil
48209
}
49210

50211
type middleware struct {
51212
adk.BaseChatModelAgentMiddleware
52-
backend Backend
53-
baseDir string
213+
backend Backend
214+
baseDir string
215+
taskLock sync.RWMutex // protects all task read/write operations within this middleware instance
216+
sharedTaskLock *sync.RWMutex // when non-nil, used instead of taskLock (team mode cross-agent lock)
217+
218+
// Task reminder config (set via WithReminder) , 0 means disable
219+
reminderInterval int
220+
onReminder func(ctx context.Context, reminderText string)
221+
222+
// lastCallbackReminderAssistantCount stores the total number of assistant
223+
// messages in state.Messages at the time onReminder was last invoked.
224+
// Used to throttle subsequent reminders when onReminder is set, since the
225+
// callback path does not inject a _task_reminder marker into messages.
226+
lastCallbackReminderAssistantCount int
227+
228+
// Task assignment notification (set via WithTaskAssignedHook)
229+
onTaskAssigned func(ctx context.Context, assignment TaskAssignment) error
230+
231+
// Context resolvers (set via WithTaskBaseDirResolver / WithAgentNameResolver, nil in single-agent mode)
232+
taskBaseDirResolver func(ctx context.Context) string
233+
agentNameResolver func(ctx context.Context) string
234+
}
235+
236+
// resolveBaseDir returns the task storage directory at call time.
237+
// In shared-task mode, the taskBaseDirResolver provides the full path.
238+
func (m *middleware) resolveBaseDir(ctx context.Context) string {
239+
if m.taskBaseDirResolver != nil {
240+
if dir := m.taskBaseDirResolver(ctx); dir != "" {
241+
return dir
242+
}
243+
}
244+
return m.baseDir
245+
}
246+
247+
// usesSharedTaskMode returns true when task storage is resolved dynamically
248+
// from context and task operations should use the middleware-wide lock.
249+
// This is the mode used by team integration.
250+
func (m *middleware) usesSharedTaskMode() bool {
251+
return m.taskBaseDirResolver != nil
252+
}
253+
254+
// getAgentName returns the current agent name, or empty if not set.
255+
func (m *middleware) getAgentName(ctx context.Context) string {
256+
if m.agentNameResolver != nil {
257+
return m.agentNameResolver(ctx)
258+
}
259+
return ""
260+
}
261+
262+
func (m *middleware) getLock(turnLock *sync.RWMutex) *sync.RWMutex {
263+
if m.usesSharedTaskMode() {
264+
if m.sharedTaskLock != nil {
265+
return m.sharedTaskLock
266+
}
267+
return &m.taskLock
268+
}
269+
return turnLock
54270
}
55271

56272
func (m *middleware) BeforeAgent(ctx context.Context, runCtx *adk.ChatModelAgentContext) (context.Context, *adk.ChatModelAgentContext, error) {
57273
if runCtx == nil {
58274
return ctx, runCtx, nil
59275
}
60276

277+
turnLock := &sync.RWMutex{}
61278
nRunCtx := *runCtx
62-
lock := sync.Mutex{}
279+
// In shared-task mode, tools share m.sharedTaskLock (or m.taskLock as fallback); otherwise they share the per-turn lock.
63280
nRunCtx.Tools = append(nRunCtx.Tools,
64-
newTaskCreateTool(m.backend, m.baseDir, &lock),
65-
newTaskGetTool(m.backend, m.baseDir, &lock),
66-
newTaskUpdateTool(m.backend, m.baseDir, &lock),
67-
newTaskListTool(m.backend, m.baseDir, &lock),
281+
newTaskCreateTool(m, turnLock),
282+
newTaskGetTool(m, turnLock),
283+
newTaskUpdateTool(m, turnLock),
284+
newTaskListTool(m, turnLock),
68285
)
69286

70287
return ctx, &nRunCtx, nil

0 commit comments

Comments
 (0)