Skip to content

Commit 5484f0b

Browse files
gewenyu99claude
andcommitted
feat(orchestrator): executor drain-loop scheduler
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 00153f0 commit 5484f0b

2 files changed

Lines changed: 265 additions & 0 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import * as fs from 'fs';
2+
import * as os from 'os';
3+
import * as path from 'path';
4+
import {
5+
QueueStore,
6+
type QueuedTask,
7+
type TaskHandoff,
8+
} from '@lib/programs/orchestrator/queue';
9+
import { drainQueue, type RunTask } from '@lib/programs/orchestrator/executor';
10+
11+
jest.mock('@utils/analytics', () => ({
12+
analytics: { captureException: jest.fn(), wizardCapture: jest.fn() },
13+
}));
14+
import { analytics } from '@utils/analytics';
15+
16+
const HANDOFF: TaskHandoff = { goals: 'g', did: 'd', forNextAgent: 'n' };
17+
18+
function tmpDir(): string {
19+
return fs.mkdtempSync(path.join(os.tmpdir(), 'executor-test-'));
20+
}
21+
22+
describe('drainQueue', () => {
23+
let dir: string;
24+
let q: QueueStore;
25+
26+
beforeEach(() => {
27+
dir = tmpDir();
28+
q = new QueueStore(dir, 'run-1');
29+
});
30+
31+
afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));
32+
33+
const completing: RunTask = (task) => {
34+
q.complete(task.id, HANDOFF);
35+
return Promise.resolve();
36+
};
37+
38+
it('runs a single task to done and drains', async () => {
39+
const a = q.enqueue({ type: 'install' });
40+
await drainQueue(q, completing, { maxStarts: 50 });
41+
expect(q.get(a.id)?.status).toBe('done');
42+
expect(q.isDrained()).toBe(true);
43+
});
44+
45+
it('runs a dependent task only after its dependency completes', async () => {
46+
const order: string[] = [];
47+
const a = q.enqueue({ type: 'install' });
48+
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });
49+
const runner: RunTask = (task) => {
50+
order.push(task.type);
51+
q.complete(task.id, HANDOFF);
52+
return Promise.resolve();
53+
};
54+
await drainQueue(q, runner, { maxStarts: 50 });
55+
expect(order).toEqual(['install', 'init']);
56+
expect(q.get(b.id)?.status).toBe('done');
57+
});
58+
59+
it('runs independent branches concurrently; the graph is the only schedule', async () => {
60+
let active = 0;
61+
let maxActive = 0;
62+
const runner: RunTask = async (task) => {
63+
active += 1;
64+
maxActive = Math.max(maxActive, active);
65+
await new Promise((r) => setTimeout(r, 5));
66+
q.complete(task.id, HANDOFF);
67+
active -= 1;
68+
};
69+
const a = q.enqueue({ type: 'install' });
70+
const b = q.enqueue({ type: 'init' });
71+
q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] });
72+
await drainQueue(q, runner, { maxStarts: 50 });
73+
// install and init overlap; capture waits for both.
74+
expect(maxActive).toBe(2);
75+
expect(q.summary().done).toBe(3);
76+
});
77+
78+
it('starts a dependent the moment its dependency finishes, not in waves', async () => {
79+
const startedAt: Record<string, number> = {};
80+
let clock = 0;
81+
const runner: RunTask = async (task) => {
82+
startedAt[task.type] = clock++;
83+
// slow holds the wave open; fast finishes early and unblocks after-fast.
84+
const delay = task.type === 'slow' ? 30 : 5;
85+
await new Promise((r) => setTimeout(r, delay));
86+
q.complete(task.id, HANDOFF);
87+
};
88+
q.enqueue({ type: 'slow' });
89+
const fast = q.enqueue({ type: 'fast' });
90+
q.enqueue({ type: 'after-fast', dependsOn: [fast.id] });
91+
await drainQueue(q, runner, { maxStarts: 50 });
92+
// after-fast started while slow was still running.
93+
expect(startedAt['after-fast']).toBeDefined();
94+
expect(q.summary().done).toBe(3);
95+
});
96+
97+
it('retries a task that ends without reporting, then fails it', async () => {
98+
const a = q.enqueue({ type: 'install', maxAttempts: 2 });
99+
const noReport: RunTask = async () => {
100+
/* agent never calls complete_task */
101+
};
102+
await drainQueue(q, noReport, { maxStarts: 50 });
103+
expect(q.get(a.id)?.status).toBe('failed');
104+
expect(q.get(a.id)?.attempts).toBe(2);
105+
});
106+
107+
it('succeeds on a retry within the attempt budget', async () => {
108+
let calls = 0;
109+
const a = q.enqueue({ type: 'install', maxAttempts: 3 });
110+
const flaky: RunTask = (task: QueuedTask) => {
111+
calls += 1;
112+
if (calls >= 2) q.complete(task.id, HANDOFF);
113+
return Promise.resolve();
114+
};
115+
await drainQueue(q, flaky, { maxStarts: 50 });
116+
expect(q.get(a.id)?.status).toBe('done');
117+
expect(calls).toBe(2);
118+
});
119+
120+
it('captures and fails a task whose runner throws', async () => {
121+
const a = q.enqueue({ type: 'install', maxAttempts: 1 });
122+
const throwing: RunTask = () => Promise.reject(new Error('agent exploded'));
123+
await drainQueue(q, throwing, { maxStarts: 50 });
124+
expect(q.get(a.id)?.status).toBe('failed');
125+
expect(analytics.captureException).toHaveBeenCalled();
126+
});
127+
128+
it('does not run a task whose dependency failed', async () => {
129+
const a = q.enqueue({ type: 'install', maxAttempts: 1 });
130+
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });
131+
const runner: RunTask = (task) => {
132+
if (task.type === 'init') q.complete(task.id, HANDOFF);
133+
// install never reports, so it fails after its single attempt.
134+
return Promise.resolve();
135+
};
136+
await drainQueue(q, runner, { maxStarts: 50 });
137+
expect(q.get(a.id)?.status).toBe('failed');
138+
expect(q.get(b.id)?.status).toBe('pending');
139+
expect(q.isDrained()).toBe(true);
140+
});
141+
142+
it('terminates via the start backstop instead of looping forever', async () => {
143+
const a = q.enqueue({ type: 'install', maxAttempts: 999 });
144+
const neverReports: RunTask = async () => {
145+
/* would retry forever without the backstop */
146+
};
147+
await drainQueue(q, neverReports, { maxStarts: 3 });
148+
expect(q.get(a.id)?.attempts).toBeLessThanOrEqual(3);
149+
});
150+
});
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* The executor drains the queue. It starts every runnable task (dependencies
3+
* satisfied) as soon as it becomes runnable — parallelism is decided by the
4+
* task graph, not by an executor knob. Each task runs through an injected
5+
* `runTask` function and reports its outcome via `complete_task`; a task that
6+
* ends without reporting is retried while attempts remain, then failed. A
7+
* `maxStarts` backstop guarantees termination.
8+
*
9+
* The drain loop is independent of how a task actually runs. `runTask` is
10+
* injected: the real one spins up a fresh agent, the tests use a fake.
11+
*/
12+
import { analytics } from '../../../utils/analytics';
13+
import { logToFile } from '../../../utils/debug';
14+
import { TaskStatus, type QueueStore, type QueuedTask } from './queue';
15+
16+
/** Per-task agent configuration the resolver produces from a task's type. */
17+
export interface ResolvedTask {
18+
model: string;
19+
allowedTools: readonly string[];
20+
disallowedTools: readonly string[];
21+
/** Mini-skills to install before the task runs (the HOW). */
22+
skills: readonly string[];
23+
prompt: string;
24+
}
25+
26+
/** Resolves a queued task to what the agent needs. The real one is markdown-backed. */
27+
export type TaskResolver = (
28+
task: QueuedTask,
29+
store: QueueStore,
30+
) => ResolvedTask;
31+
32+
/** Runs one task's agent. It is expected to drive the task to a terminal state
33+
* (via the task agent calling complete_task). */
34+
export type RunTask = (task: QueuedTask) => Promise<void>;
35+
36+
export interface DrainOptions {
37+
/** Backstop against a pathological always-one-more-pending loop. */
38+
maxStarts: number;
39+
}
40+
41+
export const DEFAULT_DRAIN_OPTIONS: DrainOptions = {
42+
maxStarts: 200,
43+
};
44+
45+
async function runOne(
46+
store: QueueStore,
47+
runTask: RunTask,
48+
task: QueuedTask,
49+
): Promise<void> {
50+
store.start(task.id);
51+
try {
52+
await runTask(task);
53+
} catch (error) {
54+
// The task threw rather than reporting. The outcome check below handles
55+
// the queue; the exception itself should never be silent.
56+
logToFile(`[executor] runTask threw for ${task.type}:`, error);
57+
analytics.captureException(
58+
error instanceof Error ? error : new Error(String(error)),
59+
{ step: 'orchestrator_run_task', task_type: task.type },
60+
);
61+
}
62+
63+
const after = store.get(task.id);
64+
if (!after) return;
65+
66+
if (after.status === TaskStatus.Running) {
67+
// The agent ended without calling complete_task. Retry or fail.
68+
if (after.attempts < after.maxAttempts) {
69+
store.requeue(task.id);
70+
} else {
71+
store.fail(task.id, {
72+
type: 'no-report',
73+
message: 'Task ended without calling complete_task.',
74+
});
75+
}
76+
return;
77+
}
78+
79+
if (
80+
after.status === TaskStatus.Failed &&
81+
after.attempts < after.maxAttempts
82+
) {
83+
store.requeue(task.id);
84+
}
85+
}
86+
87+
/**
88+
* Drain the queue to a terminal state. Every runnable task starts the moment
89+
* its dependencies finish; independent branches run concurrently. Returns when
90+
* every task is done, failed, or blocked by a failed dependency, or when the
91+
* start backstop trips.
92+
*/
93+
export async function drainQueue(
94+
store: QueueStore,
95+
runTask: RunTask,
96+
opts: DrainOptions = DEFAULT_DRAIN_OPTIONS,
97+
): Promise<void> {
98+
const running = new Map<string, Promise<void>>();
99+
let starts = 0;
100+
101+
for (;;) {
102+
for (const task of store.nextRunnable()) {
103+
if (++starts > opts.maxStarts) break;
104+
// runOne marks the task in_progress synchronously, so the next
105+
// nextRunnable() call no longer offers it.
106+
const p = runOne(store, runTask, task).finally(() =>
107+
running.delete(task.id),
108+
);
109+
running.set(task.id, p);
110+
}
111+
if (running.size === 0) break;
112+
// Wake on the first finish; it may have unblocked dependents or requeued.
113+
await Promise.race(running.values());
114+
}
115+
}

0 commit comments

Comments
 (0)