Skip to content

Commit 486902a

Browse files
gewenyu99 and claude committed
feat(orchestrator): enqueue_task, complete_task, read_handoffs tools with guards
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 1b9165b commit 486902a

3 files changed

Lines changed: 401 additions & 0 deletions

File tree

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import * as fs from 'fs';
2+
import * as os from 'os';
3+
import * as path from 'path';
4+
import { QueueStore } from '@lib/programs/orchestrator/queue';
5+
import {
6+
applyComplete,
7+
applyEnqueue,
8+
applyReadHandoffs,
9+
checkEnqueueGuards,
10+
type OrchestratorToolsContext,
11+
} from '@lib/programs/orchestrator/queue-tools';
12+
13+
/**
 * Create a new, uniquely named directory under the OS temp dir and return its
 * path. Each call yields a different directory whose name starts with
 * `queue-tools-test-`.
 */
function tmpDir(): string {
  const prefix = path.join(os.tmpdir(), 'queue-tools-test-');
  return fs.mkdtempSync(prefix);
}
16+
17+
// Task types handed to every test context as `validTypes`.
const VALID: string[] = ['install', 'init', 'capture'];
18+
19+
// Guard checks in isolation. Every case gets a QueueStore built over its own
// throwaway temp directory, which is removed again afterwards.
describe('checkEnqueueGuards', () => {
  let workDir: string;
  let queue: QueueStore;
  let toolsCtx: OrchestratorToolsContext;

  beforeEach(() => {
    workDir = tmpDir();
    queue = new QueueStore(workDir, 'run-1');
    toolsCtx = { store: queue, validTypes: VALID };
  });

  afterEach(() => {
    fs.rmSync(workDir, { recursive: true, force: true });
  });

  it('rejects an unknown type', () => {
    // 'nope' is not one of the VALID task types.
    const verdict = checkEnqueueGuards(toolsCtx, {
      type: 'nope',
      reason: 'x',
    });
    expect(verdict).toMatchObject({ ok: false, guard: 'unknown-type' });
  });

  it('rejects an unknown dependency', () => {
    // Nothing has been enqueued, so 'ghost' cannot resolve to a task.
    const verdict = checkEnqueueGuards(toolsCtx, {
      type: 'init',
      dependsOn: ['ghost'],
      reason: 'x',
    });
    expect(verdict).toMatchObject({ ok: false, guard: 'unknown-dep' });
  });

  it('trips dedup on the same type and inputs', () => {
    // Seed the queue directly, then ask the guard about an identical request.
    queue.enqueue({ type: 'install', inputs: { pkg: 'posthog-js' } });
    const verdict = checkEnqueueGuards(toolsCtx, {
      type: 'install',
      inputs: { pkg: 'posthog-js' },
      reason: 'x',
    });
    expect(verdict).toMatchObject({ ok: false, guard: 'dedup' });
  });

  it('allows a valid enqueue', () => {
    const verdict = checkEnqueueGuards(toolsCtx, {
      type: 'init',
      reason: 'x',
    });
    expect(verdict).toEqual({ ok: true });
  });
});
61+
62+
// Behaviour of applyEnqueue / applyComplete / applyReadHandoffs against a real
// QueueStore built over a per-test temp directory.
describe('apply functions', () => {
  let workDir: string;
  let queue: QueueStore;
  let toolsCtx: OrchestratorToolsContext;

  beforeEach(() => {
    workDir = tmpDir();
    queue = new QueueStore(workDir, 'run-1');
    toolsCtx = { store: queue, validTypes: VALID };
  });

  afterEach(() => {
    fs.rmSync(workDir, { recursive: true, force: true });
  });

  it('attributes a seed enqueue to the orchestrator', () => {
    // No currentTaskId on the context: the caller is the seed.
    const result = applyEnqueue(toolsCtx, { type: 'install', reason: 'seed' });
    expect(result.ok).toBe(true);
    if (result.ok) {
      expect(result.task.enqueuedBy).toBe('orchestrator');
    }
  });

  it('attributes a follow-up enqueue to the running task', () => {
    const parentTask = queue.enqueue({ type: 'init' });
    toolsCtx.currentTaskId = parentTask.id;
    const result = applyEnqueue(toolsCtx, {
      type: 'capture',
      reason: 'follow-up',
    });
    expect(result.ok).toBe(true);
    if (result.ok) {
      expect(result.task.enqueuedBy).toBe(parentTask.id);
    }
  });

  it('complete_task fails when no task is running', () => {
    // The context has no currentTaskId here.
    const result = applyComplete(toolsCtx, {
      status: 'done',
      handoff: { goals: 'g', did: 'd', forNextAgent: 'n' },
    });
    expect(result.ok).toBe(false);
  });

  it('complete_task marks the running task done and stores the handoff', () => {
    const task = queue.enqueue({ type: 'install' });
    toolsCtx.currentTaskId = task.id;
    queue.start(task.id);
    const result = applyComplete(toolsCtx, {
      status: 'done',
      handoff: { goals: 'g', did: 'added sdk', forNextAgent: 'env next' },
    });
    expect(result.ok).toBe(true);
    expect(queue.get(task.id)?.status).toBe('done');
    expect(queue.readHandoff(task.id)?.did).toBe('added sdk');
  });

  it('read_handoffs returns a dependency handoff for the running task', () => {
    // Finish a dependency first so it has a handoff on record.
    const dependency = queue.enqueue({ type: 'install' });
    queue.start(dependency.id);
    queue.complete(dependency.id, {
      goals: 'g',
      did: 'installed',
      forNextAgent: 'now init',
    });

    // The task under test depends on it and is the "current" task.
    const task = queue.enqueue({ type: 'init', dependsOn: [dependency.id] });
    toolsCtx.currentTaskId = task.id;

    const found = applyReadHandoffs(toolsCtx, {});
    expect(found).toHaveLength(1);
    expect(found[0].did).toBe('installed');
  });
});
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/**
2+
* Orchestrator MCP tools, registered into the existing `wizard-tools` server when
3+
* a queue is present. They let the orchestrator agent and task agents grow the
4+
* queue, report completion with a structured handoff, and read prior handoffs.
5+
*
6+
* The guard logic and the apply functions are plain, exported, and unit-tested.
7+
* `buildOrchestratorTools` wraps them in the SDK `tool()` shape.
8+
*/
9+
import { z } from 'zod';
10+
import { analytics } from '../../../utils/analytics';
11+
import type { QueueStore, QueuedTask, TaskHandoff } from './queue';
12+
13+
/** Everything the orchestrator tool functions need in order to act on a queue. */
export interface OrchestratorToolsContext {
  /** The queue the tools read from and write to. */
  store: QueueStore;

  /**
   * The task types known to the registry. An enqueue_task call naming any
   * other type is rejected.
   */
  validTypes: readonly string[];

  /**
   * Id of the task this tool server is bound to. Every task agent is given its
   * own wizard-tools server, which keeps attribution correct while independent
   * tasks run in parallel. Left unset for the seed, because the seed is not a
   * task.
   */
  currentTaskId?: string;
}
24+
25+
/** Arguments accepted by the enqueue_task tool. */
export interface EnqueueArgs {
  /** Task type; must be one of the context's `validTypes`. */
  type: string;

  /** Inputs for the task. Treated as `{}` when omitted; part of the dedup key. */
  inputs?: Record<string, unknown>;

  /** Ids of tasks this one depends on. Each must already exist in the store. */
  dependsOn?: string[];

  /** Optional model value, forwarded unchanged to the store on enqueue. */
  model?: string;

  /** One line on why the task is needed. */
  reason: string;
}
32+
33+
/**
 * Outcome of validating an enqueue request: either a pass, or the name of the
 * guard that rejected it together with a human-readable explanation.
 */
export type GuardResult =
  | { ok: true }
  | {
      ok: false;
      /** Which guard rejected the request, e.g. `'unknown-type'`. */
      guard: string;
      /** Explanation suitable for showing to the calling agent. */
      message: string;
    };
36+
37+
/**
 * Serialize plain JSON-like data to a canonical string. Object keys are written
 * in sorted order at every depth, so two values that differ only in key
 * insertion order produce identical text.
 *
 * Values JSON cannot represent are handled the way `JSON.stringify` handles
 * them: object properties whose value is `undefined`, a function, or a symbol
 * are left out, and such values become `null` inside arrays and at the top
 * level. This keeps `{ a: 1, b: undefined }` and `{ a: 1 }` identical.
 *
 * Keys are ordered by UTF-16 code unit rather than by locale collation. A
 * locale comparison may report distinct keys as equal (for example two
 * canonically equivalent spellings of an accented letter), which would let
 * insertion order leak into the output.
 *
 * @param value - Plain data: primitives, arrays, and plain objects.
 * @returns The canonical string form of `value`.
 */
function stableStringify(value: unknown): string {
  if (value === null || typeof value !== 'object') {
    // JSON.stringify returns undefined (not a string) for undefined, functions
    // and symbols; fall back to the text JSON uses for them inside arrays.
    return JSON.stringify(value) ?? 'null';
  }

  if (Array.isArray(value)) {
    // Array.from visits holes as undefined, so sparse arrays serialize too.
    const items = Array.from(value, (item: unknown) => stableStringify(item));
    return `[${items.join(',')}]`;
  }

  const members = Object.entries(value as Record<string, unknown>)
    .filter(([, v]) => {
      const kind = typeof v;
      return kind !== 'undefined' && kind !== 'function' && kind !== 'symbol';
    })
    .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0))
    .map(([k, v]) => `${JSON.stringify(k)}:${stableStringify(v)}`);

  return `{${members.join(',')}}`;
}
47+
48+
/**
 * Build the identity used for duplicate detection: the task type and the
 * canonical serialization of its inputs, joined by `::`.
 */
function dedupKey(type: string, inputs: Record<string, unknown>): string {
  const canonicalInputs = stableStringify(inputs);
  return [type, canonicalInputs].join('::');
}
51+
52+
/**
 * Decide whether an enqueue request may enter the queue. Only structural
 * properties are checked, in this order:
 *
 * 1. `unknown-type` — `args.type` is not listed in `ctx.validTypes`.
 * 2. `unknown-dep` — an id in `args.dependsOn` is not found by `ctx.store.get`.
 * 3. `dedup` — a task whose status is not `'failed'` already has the same type
 *    and the same inputs.
 *
 * How much work runs, and in what shape, is left to the task graph; nothing
 * here acts as a limit.
 *
 * @param ctx - Store and valid task types to validate against.
 * @param args - The enqueue request being checked.
 * @returns `{ ok: true }` when every check passes, otherwise the first guard
 * that failed along with a human-readable message.
 */
export function checkEnqueueGuards(
  ctx: OrchestratorToolsContext,
  args: EnqueueArgs,
): GuardResult {
  // The queue is listed once, up front, before any check runs.
  const existing = ctx.store.list();

  const reject = (guard: string, message: string): GuardResult => ({
    ok: false,
    guard,
    message,
  });

  const typeIsKnown = ctx.validTypes.includes(args.type);
  if (!typeIsKnown) {
    const validList = ctx.validTypes.join(', ');
    return reject(
      'unknown-type',
      `Unknown task type "${args.type}". Valid types: ${validList}.`,
    );
  }

  const requestedDeps = args.dependsOn ?? [];
  for (const depId of requestedDeps) {
    const found = ctx.store.get(depId);
    if (!found) {
      return reject(
        'unknown-dep',
        `Dependency "${depId}" is not a known task id.`,
      );
    }
  }

  // Failed tasks do not count as duplicates, so the same work can be retried.
  const wantedKey = dedupKey(args.type, args.inputs ?? {});
  for (const task of existing) {
    if (task.status === 'failed') continue;
    if (dedupKey(task.type, task.inputs) === wantedKey) {
      return reject(
        'dedup',
        `A "${args.type}" task with these inputs already exists.`,
      );
    }
  }

  return { ok: true };
}
98+
99+
/**
 * Outcome of `applyEnqueue`: the newly queued task on success, or the guard
 * that rejected the request together with its message.
 */
export type EnqueueResult =
  | {
      ok: true;
      /** The task as returned by the store's `enqueue`. */
      task: QueuedTask;
    }
  | {
      ok: false;
      guard: string;
      message: string;
    };
102+
103+
/**
 * Validate an enqueue request and, if it passes, add the task to the store.
 *
 * The new task is attributed to `ctx.currentTaskId` when one is set and to
 * `'orchestrator'` otherwise. Omitted `inputs` and `dependsOn` are stored as an
 * empty object and an empty list.
 *
 * @param ctx - Store, valid task types, and the calling task's id if any.
 * @param args - The enqueue request.
 * @returns The queued task on success, or the failing guard's result as-is.
 */
export function applyEnqueue(
  ctx: OrchestratorToolsContext,
  args: EnqueueArgs,
): EnqueueResult {
  const verdict = checkEnqueueGuards(ctx, args);
  if (!verdict.ok) {
    return verdict;
  }

  const inputs = args.inputs ?? {};
  const dependsOn = args.dependsOn ?? [];
  const enqueuedBy = ctx.currentTaskId ?? 'orchestrator';

  const task = ctx.store.enqueue({
    type: args.type,
    inputs,
    dependsOn,
    model: args.model,
    enqueuedBy,
  });

  return { ok: true, task };
}
119+
120+
/** Outcome of `applyComplete`: success, or a message explaining the refusal. */
export type CompleteResult =
  | { ok: true }
  | { ok: false; message: string };
121+
122+
/**
 * Record the outcome of the task this context is bound to.
 *
 * - `'failed'` calls `store.fail` with a `self-reported` error whose message is
 *   the handoff's `forNextAgent` text, plus the handoff itself.
 * - `'skipped'` calls `store.skip` with the handoff.
 * - anything else calls `store.complete` with the handoff.
 *
 * Nothing is written, and `ok: false` is returned, when the context has no
 * `currentTaskId` or when that id does not resolve to a task in the store.
 *
 * @param ctx - Store plus the id of the task reporting completion.
 * @param args - Reported status and the structured handoff.
 * @returns `{ ok: true }` once the store was updated, otherwise a message.
 */
export function applyComplete(
  ctx: OrchestratorToolsContext,
  args: { status: 'done' | 'failed' | 'skipped'; handoff: TaskHandoff },
): CompleteResult {
  const id = ctx.currentTaskId;
  if (!id) {
    return {
      ok: false,
      message: 'complete_task can only be called by a running task agent.',
    };
  }

  // A stale or foreign id must not reach the store's mutating methods.
  if (!ctx.store.get(id)) {
    return {
      ok: false,
      message: `complete_task: task "${id}" is not in the queue.`,
    };
  }

  const { status, handoff } = args;
  switch (status) {
    case 'failed':
      ctx.store.fail(
        id,
        { type: 'self-reported', message: handoff.forNextAgent },
        handoff,
      );
      break;
    case 'skipped':
      ctx.store.skip(id, handoff);
      break;
    default:
      ctx.store.complete(id, handoff);
  }
  return { ok: true };
}
146+
147+
/**
 * Look up handoffs left by earlier tasks. The first filter that is set wins:
 *
 * 1. `taskId` — the handoff of that one task, or an empty list if it has none.
 * 2. `type` — whatever `store.readHandoffsByType` returns for that type.
 * 3. neither — the handoffs of the current task's dependencies, in
 *    `dependsOn` order, skipping dependencies that have no handoff.
 *
 * An empty list is returned in case 3 when the context has no `currentTaskId`
 * or the id does not resolve to a task.
 *
 * @param ctx - Store plus the id of the calling task, if any.
 * @param args - Optional `taskId` / `type` filters.
 * @returns The matching handoffs; never contains `null` or `undefined`.
 */
export function applyReadHandoffs(
  ctx: OrchestratorToolsContext,
  args: { type?: string; taskId?: string },
): TaskHandoff[] {
  const { taskId, type } = args;

  if (taskId) {
    const single = ctx.store.readHandoff(taskId);
    return single ? [single] : [];
  }

  if (type) {
    return ctx.store.readHandoffsByType(type);
  }

  const self = ctx.currentTaskId
    ? ctx.store.get(ctx.currentTaskId)
    : undefined;
  if (!self) return [];

  // `!= null` drops both null and undefined, matching the falsy check used for
  // the single-task lookup, so a missing handoff is never returned as an entry.
  return self.dependsOn
    .map((depId) => ctx.store.readHandoff(depId))
    .filter((entry): entry is TaskHandoff => entry != null);
}
166+
167+
// Zod fields for the handoff object accepted by complete_task: three required
// narrative strings and an optional list of file paths.
const HANDOFF_SHAPE = {
  goals: z.string().describe('What this task was asked to achieve.'),
  did: z.string().describe('What you actually did.'),
  forNextAgent: z.string().describe('What the next agent should know.'),
  filesTouched: z.array(z.string()).optional(),
};
173+
174+
/**
 * Shape of the SDK's `tool()` factory as this module uses it: a name, a
 * description, a schema, and a handler go in, and an opaque tool value comes
 * out.
 */
type SdkTool = (
  name: string,
  description: string,
  // A plain object of zod fields is what the SDK takes as the schema.
  schema: Record<string, z.ZodTypeAny>,
  handler: (args: never) => unknown,
) => unknown;
181+
182+
/**
 * Wrap a string as a tool result holding a single text content item.
 *
 * @param text - The text to return to the caller.
 * @param isError - Whether the result reports a failure; defaults to `false`.
 */
function textResult(text: string, isError = false) {
  const textItem = { type: 'text' as const, text };
  return { isError, content: [textItem] };
}
185+
186+
/**
 * Wrap the plain apply functions as SDK tools. createWizardToolsServer calls
 * this only when a queue context is present.
 *
 * Three tools are produced, in this order: `enqueue_task`, `complete_task`,
 * `read_handoffs`. Every handler answers with a text result; a rejected enqueue
 * or a refused completion is returned as an error result, and a rejected
 * enqueue is additionally reported to analytics.
 *
 * @param tool - The SDK's `tool()` factory.
 * @param ctx - Context every handler closes over.
 * @returns The three tool values exactly as `tool` returned them.
 */
export function buildOrchestratorTools(
  tool: SdkTool,
  ctx: OrchestratorToolsContext,
): unknown[] {
  const onEnqueue = (args: EnqueueArgs) => {
    const outcome = applyEnqueue(ctx, args);
    if (outcome.ok) {
      // Only the new task's id is sent back to the caller.
      return textResult(JSON.stringify({ id: outcome.task.id }));
    }
    analytics.wizardCapture('orchestrator guard tripped', {
      guard: outcome.guard,
      type: args.type,
    });
    return textResult(outcome.message, true);
  };

  const enqueueTask = tool(
    'enqueue_task',
    'Add a task to the orchestrator queue. Use it to seed work and to enqueue follow-up work you discover. Keep tasks small and discrete.',
    {
      type: z
        .string()
        .describe(`The task type. One of: ${ctx.validTypes.join(', ')}.`),
      inputs: z.record(z.unknown()).optional(),
      dependsOn: z
        .array(z.string())
        .optional()
        .describe('Task ids that must be done before this task runs.'),
      model: z.string().optional(),
      reason: z.string().describe('One line on why this task is needed.'),
    },
    onEnqueue as (args: never) => unknown,
  );

  const onComplete = (args: {
    status: 'done' | 'failed' | 'skipped';
    handoff: TaskHandoff;
  }) => {
    const outcome = applyComplete(ctx, args);
    return outcome.ok ? textResult('ok') : textResult(outcome.message, true);
  };

  const completeTask = tool(
    'complete_task',
    "Report the outcome of your task. Always call this exactly once when you finish, with a structured handoff for the next agent. Use status 'skipped' when the task does not apply to this project and you cannot do it (say why in the handoff) — not 'done'.",
    {
      status: z.enum(['done', 'failed', 'skipped']),
      handoff: z.object(HANDOFF_SHAPE),
    },
    onComplete as (args: never) => unknown,
  );

  const onReadHandoffs = (args: { type?: string; taskId?: string }) => {
    const found = applyReadHandoffs(ctx, args);
    // Pretty-printed with two-space indentation.
    return textResult(JSON.stringify(found, null, 2));
  };

  const readHandoffs = tool(
    'read_handoffs',
    'Read structured handoffs from earlier tasks. With no argument, returns the handoffs of your dependencies.',
    {
      type: z.string().optional(),
      taskId: z.string().optional(),
    },
    onReadHandoffs as (args: never) => unknown,
  );

  return [enqueueTask, completeTask, readHandoffs];
}

0 commit comments

Comments
 (0)