Skip to content

Commit d267105

Browse files
gewenyu99claude
andcommitted
feat(orchestrator): executor drain-loop scheduler
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 54d0c57 commit d267105

2 files changed

Lines changed: 219 additions & 0 deletions

File tree

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import * as fs from 'fs';
2+
import * as os from 'os';
3+
import * as path from 'path';
4+
import {
5+
QueueStore,
6+
type QueuedTask,
7+
type TaskHandoff,
8+
} from '@lib/programs/orchestrator/queue';
9+
import { drainQueue, type RunTask } from '@lib/programs/orchestrator/executor';
10+
11+
const HANDOFF: TaskHandoff = { goals: 'g', did: 'd', forNextAgent: 'n' };
12+
13+
function tmpDir(): string {
14+
return fs.mkdtempSync(path.join(os.tmpdir(), 'executor-test-'));
15+
}
16+
17+
describe('drainQueue', () => {
18+
let dir: string;
19+
let q: QueueStore;
20+
21+
beforeEach(() => {
22+
dir = tmpDir();
23+
q = new QueueStore(dir, 'run-1');
24+
});
25+
26+
afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));
27+
28+
const completing: RunTask = async (task) => {
29+
await q.complete(task.id, HANDOFF);
30+
};
31+
32+
it('runs a single task to done and drains', async () => {
33+
const a = await q.enqueue({ type: 'install' });
34+
await drainQueue(q, completing, { cap: 1, maxIterations: 50 });
35+
expect(q.get(a.id)?.status).toBe('done');
36+
expect(q.isDrained()).toBe(true);
37+
});
38+
39+
it('runs a dependent task only after its dependency completes', async () => {
40+
const order: string[] = [];
41+
const a = await q.enqueue({ type: 'install' });
42+
const b = await q.enqueue({ type: 'init', dependsOn: [a.id] });
43+
const runner: RunTask = async (task) => {
44+
order.push(task.type);
45+
await q.complete(task.id, HANDOFF);
46+
};
47+
await drainQueue(q, runner, { cap: 1, maxIterations: 50 });
48+
expect(order).toEqual(['install', 'init']);
49+
expect(q.get(b.id)?.status).toBe('done');
50+
});
51+
52+
it('retries a task that ends without reporting, then fails it', async () => {
53+
const a = await q.enqueue({ type: 'install', maxAttempts: 2 });
54+
const noReport: RunTask = async () => {
55+
/* agent never calls complete_task */
56+
};
57+
await drainQueue(q, noReport, { cap: 1, maxIterations: 50 });
58+
expect(q.get(a.id)?.status).toBe('failed');
59+
expect(q.get(a.id)?.attempts).toBe(2);
60+
});
61+
62+
it('succeeds on a retry within the attempt budget', async () => {
63+
let calls = 0;
64+
const a = await q.enqueue({ type: 'install', maxAttempts: 3 });
65+
const flaky: RunTask = async (task: QueuedTask) => {
66+
calls += 1;
67+
if (calls >= 2) await q.complete(task.id, HANDOFF);
68+
};
69+
await drainQueue(q, flaky, { cap: 1, maxIterations: 50 });
70+
expect(q.get(a.id)?.status).toBe('done');
71+
expect(calls).toBe(2);
72+
});
73+
74+
it('does not run a task whose dependency failed', async () => {
75+
const a = await q.enqueue({ type: 'install', maxAttempts: 1 });
76+
const b = await q.enqueue({ type: 'init', dependsOn: [a.id] });
77+
const runner: RunTask = async (task) => {
78+
if (task.type === 'init') await q.complete(task.id, HANDOFF);
79+
// install never reports, so it fails after its single attempt.
80+
};
81+
await drainQueue(q, runner, { cap: 1, maxIterations: 50 });
82+
expect(q.get(a.id)?.status).toBe('failed');
83+
expect(q.get(b.id)?.status).toBe('pending');
84+
expect(q.isDrained()).toBe(true);
85+
});
86+
87+
it('respects the concurrency cap', async () => {
88+
let active = 0;
89+
let maxActive = 0;
90+
const runner: RunTask = async (task) => {
91+
active += 1;
92+
maxActive = Math.max(maxActive, active);
93+
await new Promise((r) => setTimeout(r, 5));
94+
await q.complete(task.id, HANDOFF);
95+
active -= 1;
96+
};
97+
await q.enqueue({ type: 'install' });
98+
await q.enqueue({ type: 'init' });
99+
await drainQueue(q, runner, { cap: 2, maxIterations: 50 });
100+
expect(maxActive).toBe(2);
101+
});
102+
103+
it('runs independent tasks one at a time at cap 1', async () => {
104+
let active = 0;
105+
let maxActive = 0;
106+
const runner: RunTask = async (task) => {
107+
active += 1;
108+
maxActive = Math.max(maxActive, active);
109+
await new Promise((r) => setTimeout(r, 5));
110+
await q.complete(task.id, HANDOFF);
111+
active -= 1;
112+
};
113+
await q.enqueue({ type: 'install' });
114+
await q.enqueue({ type: 'init' });
115+
await drainQueue(q, runner, { cap: 1, maxIterations: 50 });
116+
expect(maxActive).toBe(1);
117+
});
118+
119+
it('terminates via the iteration backstop instead of looping forever', async () => {
120+
const a = await q.enqueue({ type: 'install', maxAttempts: 999 });
121+
const neverReports: RunTask = async () => {
122+
/* would retry forever without the backstop */
123+
};
124+
await drainQueue(q, neverReports, { cap: 1, maxIterations: 3 });
125+
expect(q.get(a.id)?.attempts).toBeLessThanOrEqual(3);
126+
});
127+
});
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* The executor drains the queue. It is the scheduler: it picks runnable tasks
3+
* (dependencies satisfied), respects the concurrency cap, runs each through a
4+
* `runTask` function, and reads the outcome the task reported through
5+
* `complete_task`. A task that ends without reporting is retried while attempts
6+
* remain, then failed. A `maxIterations` backstop guarantees termination.
7+
*
8+
* The drain loop is independent of how a task actually runs. `runTask` is
9+
* injected: the real one spins up a fresh agent, the tests use a fake.
10+
*/
11+
import type { QueueStore, QueuedTask } from './queue';
12+
13+
/** Per-task agent configuration the resolver produces from a task's type. */
14+
export interface ResolvedTask {
15+
model: string;
16+
allowedTools: readonly string[];
17+
disallowedTools: readonly string[];
18+
prompt: string;
19+
}
20+
21+
/** Resolves a queued task to what the agent needs. The real one is markdown-backed. */
22+
export type TaskResolver = (
23+
task: QueuedTask,
24+
store: QueueStore,
25+
) => ResolvedTask;
26+
27+
/** Runs one task's agent. It is expected to drive the task to a terminal state
28+
* (via the task agent calling complete_task). */
29+
export type RunTask = (task: QueuedTask) => Promise<void>;
30+
31+
export interface DrainOptions {
32+
/** Max tasks running at once. Default 1 (sequential) for the prototype. */
33+
cap: number;
34+
/** Backstop against a pathological always-one-more-pending loop. */
35+
maxIterations: number;
36+
}
37+
38+
export const DEFAULT_DRAIN_OPTIONS: DrainOptions = {
39+
cap: 1,
40+
maxIterations: 200,
41+
};
42+
43+
async function runOne(
44+
store: QueueStore,
45+
runTask: RunTask,
46+
task: QueuedTask,
47+
): Promise<void> {
48+
await store.start(task.id);
49+
try {
50+
await runTask(task);
51+
} catch {
52+
// The task threw rather than reporting. The outcome check below handles it.
53+
}
54+
55+
const after = store.get(task.id);
56+
if (!after) return;
57+
58+
if (after.status === 'in_progress') {
59+
// The agent ended without calling complete_task. Retry or fail.
60+
if (after.attempts < after.maxAttempts) {
61+
await store.requeue(task.id);
62+
} else {
63+
await store.fail(task.id, {
64+
type: 'no-report',
65+
message: 'Task ended without calling complete_task.',
66+
});
67+
}
68+
return;
69+
}
70+
71+
if (after.status === 'failed' && after.attempts < after.maxAttempts) {
72+
await store.requeue(task.id);
73+
}
74+
}
75+
76+
/**
77+
* Drain the queue to a terminal state. Returns when every task is done, failed,
78+
* or blocked by a failed dependency, or when the iteration backstop trips.
79+
*/
80+
export async function drainQueue(
81+
store: QueueStore,
82+
runTask: RunTask,
83+
opts: DrainOptions = DEFAULT_DRAIN_OPTIONS,
84+
): Promise<void> {
85+
let iterations = 0;
86+
while (!store.isDrained()) {
87+
if (++iterations > opts.maxIterations) break;
88+
const runnable = store.nextRunnable(opts.cap);
89+
if (runnable.length === 0) break;
90+
await Promise.all(runnable.map((t) => runOne(store, runTask, t)));
91+
}
92+
}

0 commit comments

Comments
 (0)