Skip to content

Commit f8d51ae

Browse files
gewenyu99 and claude committed
feat(orchestrator): in-memory queue + disk persistence (QueueStore)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7112748 commit f8d51ae

4 files changed

Lines changed: 401 additions & 20 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import * as fs from 'fs';
2+
import * as os from 'os';
3+
import * as path from 'path';
4+
import {
5+
QueueStore,
6+
type QueueFile,
7+
type TaskHandoff,
8+
} from '@lib/programs/orchestrator/queue';
9+
10+
/**
 * Creates a fresh, uniquely named directory under the OS temp dir for one test.
 *
 * @returns Absolute path of the new directory; its name starts with `queue-test-`.
 */
function tmpDir(): string {
  const prefix = path.join(os.tmpdir(), 'queue-test-');
  return fs.mkdtempSync(prefix);
}
13+
14+
/**
 * Behavioural tests for QueueStore: dependency-driven scheduling, retries,
 * handoffs, drain detection, and the queue.json mirror on disk.
 *
 * Every test runs against a fresh store rooted in its own temp directory,
 * which is removed again afterwards.
 */
describe('QueueStore', () => {
  let workDir: string;
  let store: QueueStore;

  /** Ids of the tasks the store currently offers as runnable, in store order. */
  const runnableIds = (): string[] =>
    store.nextRunnable().map((task) => task.id);

  beforeEach(() => {
    workDir = tmpDir();
    store = new QueueStore(workDir, 'run-1');
  });

  afterEach(() => {
    fs.rmSync(workDir, { recursive: true, force: true });
  });

  it('enqueues a pending task with defaults', () => {
    const task = store.enqueue({ type: 'install' });

    expect(task.status).toBe('pending');
    expect(task.attempts).toBe(0);
    expect(task.maxAttempts).toBe(2);
    expect(task.enqueuedBy).toBe('orchestrator');
    expect(task.dependsOn).toEqual([]);
    expect(store.list()).toHaveLength(1);
  });

  it('only marks a task runnable once its dependencies are done', () => {
    const install = store.enqueue({ type: 'install' });
    const init = store.enqueue({ type: 'init', dependsOn: [install.id] });

    // Before the dependency finishes, only the dependency itself is offered.
    expect(runnableIds()).toEqual([install.id]);

    store.start(install.id);
    store.complete(install.id);
    expect(runnableIds()).toEqual([init.id]);
  });

  it('returns every runnable task; the graph alone decides parallelism', () => {
    const install = store.enqueue({ type: 'install' });
    const init = store.enqueue({ type: 'init' });
    store.enqueue({ type: 'capture', dependsOn: [install.id, init.id] });

    // Both independent tasks are runnable at once; the dependent one is not.
    // Ids are random UUIDs, so compare order-insensitively.
    const expectedIds = [install.id, init.id].sort();
    expect(runnableIds().sort()).toEqual(expectedIds);

    store.start(install.id);
    // An in-progress task is no longer offered.
    expect(runnableIds()).toEqual([init.id]);
  });

  it('treats a skipped dependency as satisfied', () => {
    const install = store.enqueue({ type: 'install' });
    const init = store.enqueue({ type: 'init', dependsOn: [install.id] });

    store.start(install.id);
    store.skip(install.id);
    expect(runnableIds()).toEqual([init.id]);
  });

  it('start increments attempts and supports within-run retry while attempts remain', () => {
    const task = store.enqueue({ type: 'install', maxAttempts: 2 });

    store.start(task.id);
    expect(store.get(task.id)?.attempts).toBe(1);

    store.fail(task.id, { type: 'API_ERROR', message: 'boom' });
    expect(store.get(task.id)?.status).toBe('failed');

    // Retry: attempts (1) < maxAttempts (2), so requeue and run again.
    store.requeue(task.id);
    expect(store.get(task.id)?.status).toBe('pending');

    store.start(task.id);
    expect(store.get(task.id)?.attempts).toBe(2);
  });

  it('completing a task records and reads back a structured handoff', () => {
    const task = store.enqueue({ type: 'install' });
    const handoff: TaskHandoff = {
      goals: 'install the sdk',
      did: 'added posthog-js',
      forNextAgent: 'env vars not set yet',
      filesTouched: ['package.json'],
    };

    store.start(task.id);
    store.complete(task.id, handoff);

    expect(store.get(task.id)?.status).toBe('done');
    expect(store.readHandoff(task.id)).toEqual(handoff);
    expect(store.readHandoffsByType('install')).toEqual([handoff]);
  });

  it('is drained when a pending task is blocked by a failed dependency', () => {
    const install = store.enqueue({ type: 'install' });
    store.enqueue({ type: 'init', dependsOn: [install.id] });

    expect(store.isDrained()).toBe(false);

    store.start(install.id);
    store.fail(install.id, { type: 'API_ERROR', message: 'boom' });

    // init can never run now, and nothing is in progress.
    expect(store.nextRunnable()).toHaveLength(0);
    expect(store.isDrained()).toBe(true);
  });

  it('reflects every transition to queue.json, handoffs included', () => {
    const install = store.enqueue({ type: 'install' });
    store.start(install.id);
    store.complete(install.id, {
      goals: 'g',
      did: 'd',
      forNextAgent: 'n',
    });

    // Read the file straight from disk rather than through the store.
    const raw = fs.readFileSync(store.queuePath, 'utf8');
    const persisted = JSON.parse(raw) as QueueFile;

    expect(persisted.version).toBe(1);
    expect(persisted.runId).toBe('run-1');
    expect(persisted.tasks).toHaveLength(1);
    expect(persisted.tasks[0].status).toBe('done');
    expect(persisted.tasks[0].handoff?.did).toBe('d');
  });
});
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/**
2+
* The orchestrator task queue.
3+
*
4+
* In memory, synchronous, single-owner: one Node process drives the run, so
5+
* there is no locking. The queue imposes no execution policy — `nextRunnable`
6+
* returns every pending task whose dependencies are satisfied, and how many of
7+
* those run at once is decided by the task graph, not the queue.
8+
*
9+
* Every transition rewrites `<installDir>/.posthog-wizard/queue.json`, a small
10+
* file holding the whole queue, handoffs included. Today it is the run's
11+
* log and the report's source; later it is the resume point.
12+
*/
13+
import * as fs from 'fs';
14+
import * as path from 'path';
15+
import { randomUUID } from 'crypto';
16+
import { writeJsonAtomic } from '../../../utils/atomic-ledger';
17+
18+
/**
 * Lifecycle state of a queued task.
 *
 * - `pending`: waiting to run (set on enqueue and on requeue).
 * - `in_progress`: started and not yet finished.
 * - `done`: finished via `complete`.
 * - `skipped`: finished via `skip`; counts as satisfied for dependents.
 * - `failed`: finished via `fail`; dependents stay blocked.
 */
export type TaskStatus = 'pending' | 'in_progress' | 'done' | 'skipped' | 'failed';
24+
25+
/** One task as held in memory and serialised into queue.json. */
export interface QueuedTask {
  /** Unique id, generated when the task is enqueued. */
  id: string;
  /** Caller-supplied task type; the queue only uses it to group handoffs. */
  type: string;
  status: TaskStatus;
  /** Ids of tasks that must be `done` or `skipped` before this one is runnable. */
  dependsOn: string[];
  /** Caller-supplied payload; opaque to the queue. */
  inputs: Record<string, unknown>;
  /** Optional value passed through from enqueue; presumably the model to run the task with (confirm with callers). */
  model?: string;
  /** Number of times the task has been started. */
  attempts: number;
  /** Attempt budget recorded on the task. */
  maxAttempts: number;
  /** The structured handoff the task reported when it finished. */
  handoff?: TaskHandoff;
  /** 'orchestrator' for seeded tasks, or the id of the task that enqueued this one. */
  enqueuedBy: string;
  /** Caller-supplied depth, 0 when omitted; presumably how many enqueue hops from a seeded task (confirm). */
  depth: number;
  /** ISO-8601 timestamp taken when the task was enqueued. */
  createdAt: string;
  /** ISO-8601 timestamp of the most recent start; cleared on requeue. */
  startedAt?: string;
  /** ISO-8601 timestamp of the most recent finish; cleared on requeue. */
  finishedAt?: string;
  /** Failure details recorded when the task is failed. */
  error?: {
    type: string;
    message: string;
  };
}
44+
45+
/** Shape of the queue.json document: the whole queue for one run. */
export interface QueueFile {
  /** Format version of the file; currently always 1. */
  version: 1;
  /** Id of the run that owns this queue. */
  runId: string;
  /** Every task in the queue, handoffs included. */
  tasks: QueuedTask[];
}
50+
51+
/**
 * The structured note a task leaves behind for the next agent.
 *
 * Field meanings are inferred from their names; the queue stores and returns
 * the object without inspecting it.
 */
export interface TaskHandoff {
  /** What the task set out to do. */
  goals: string;
  /** What the task actually did. */
  did: string;
  /** Anything the following agent should know. */
  forNextAgent: string;
  /** Files the task touched, when reported. */
  filesTouched?: string[];
}
58+
59+
/**
 * Arguments for enqueueing a task. Only `type` is required; the queue fills in
 * defaults for everything that is omitted.
 */
export interface EnqueueInput {
  /** Task type to record on the queued task. */
  type: string;
  /** Payload for the task; an empty object when omitted. */
  inputs?: Record<string, unknown>;
  /** Ids of tasks that must finish first; no dependencies when omitted. */
  dependsOn?: string[];
  /** Passed through to the queued task unchanged. */
  model?: string;
  /** Attempt budget; the queue's default applies when omitted. */
  maxAttempts?: number;
  /** Who enqueued the task; 'orchestrator' when omitted. */
  enqueuedBy?: string;
  /** Depth to record on the task; 0 when omitted. */
  depth?: number;
}
68+
69+
/** Name of the directory, created under the install dir, that holds queue.json. */
export const QUEUE_DIR_NAME = '.posthog-wizard';

/** Attempt budget given to a task when the enqueue call does not specify one. */
const DEFAULT_MAX_ATTEMPTS = 2;
71+
72+
/**
 * Current wall-clock time as an ISO-8601 string.
 *
 * @returns The result of `Date.prototype.toISOString()` for the present moment.
 */
function nowIso(): string {
  const now = new Date();
  return now.toISOString();
}
75+
76+
/**
 * In-memory task queue for a single orchestrator run, mirrored to disk.
 *
 * All state lives in `tasks`. Every transition (enqueue, start, complete, skip,
 * fail, requeue) rewrites the whole queue to `queuePath` through
 * `writeJsonAtomic`; this class never reads the file back.
 *
 * The store applies no execution policy: it does not cap concurrency, does not
 * validate which status a transition starts from, and does not compare
 * `attempts` against `maxAttempts`. Callers decide when to start, retry or
 * give up.
 */
export class QueueStore {
  /** Tasks in enqueue order; every read is answered from this array. */
  private tasks: QueuedTask[] = [];

  readonly runId: string;
  readonly queuePath: string;

  /**
   * @param installDir Directory under which the `QUEUE_DIR_NAME` folder is created.
   * @param runId Identifier written into queue.json alongside the tasks.
   */
  constructor(installDir: string, runId: string) {
    this.runId = runId;
    const queueDir = path.join(installDir, QUEUE_DIR_NAME);
    this.queuePath = path.join(queueDir, 'queue.json');
    // Create the directory up front so the first transition can write straight away.
    fs.mkdirSync(queueDir, { recursive: true });
  }

  // ── Reads ───────────────────────────────────────────────────────────

  /** All tasks in enqueue order. This is the live internal array, not a copy. */
  list(): readonly QueuedTask[] {
    return this.tasks;
  }

  /** The task with the given id, or `undefined` when there is none. */
  get(id: string): QueuedTask | undefined {
    return this.tasks.find((task) => task.id === id);
  }

  /**
   * Every pending task whose dependencies are all satisfied (`done` or
   * `skipped`). A skipped dependency does not block downstream work; a
   * dependency id that matches no task is never satisfied.
   */
  nextRunnable(): QueuedTask[] {
    const satisfied = new Set<string>();
    for (const task of this.tasks) {
      if (task.status === 'done' || task.status === 'skipped') {
        satisfied.add(task.id);
      }
    }
    return this.tasks.filter(
      (task) =>
        task.status === 'pending' &&
        task.dependsOn.every((dep) => satisfied.has(dep)),
    );
  }

  /**
   * True when no task is in progress and none can be started. Any task still
   * pending at that point is blocked by a dependency that is not `done` or
   * `skipped` (for example one that failed).
   */
  isDrained(): boolean {
    if (this.tasks.some((task) => task.status === 'in_progress')) return false;
    return this.nextRunnable().length === 0;
  }

  /** Number of tasks in each status, plus the overall total. */
  summary(): Record<TaskStatus, number> & { total: number } {
    const counts: Record<TaskStatus, number> = {
      pending: 0,
      in_progress: 0,
      done: 0,
      skipped: 0,
      failed: 0,
    };
    for (const task of this.tasks) counts[task.status] += 1;
    return { ...counts, total: this.tasks.length };
  }

  /** The handoff recorded on a task, or `null` when the task or handoff is missing. */
  readHandoff(id: string): TaskHandoff | null {
    return this.get(id)?.handoff ?? null;
  }

  /**
   * Handoffs recorded on tasks of the given type, in enqueue order. A task is
   * included whenever it carries a handoff, whatever its current status.
   */
  readHandoffsByType(type: string): TaskHandoff[] {
    return this.tasks.flatMap((task) =>
      task.type === type && task.handoff ? [task.handoff] : [],
    );
  }

  // ── Transitions (each one reflected to queue.json) ──────────────────

  /**
   * Adds a new pending task and persists the queue.
   *
   * `dependsOn` and `inputs` are shallow-copied so that later mutation of the
   * caller's array or object cannot change the queued task behind the store's
   * back.
   *
   * @returns The newly created task.
   */
  enqueue(input: EnqueueInput): QueuedTask {
    const task: QueuedTask = {
      id: randomUUID(),
      type: input.type,
      status: 'pending',
      dependsOn: [...(input.dependsOn ?? [])],
      inputs: { ...(input.inputs ?? {}) },
      model: input.model,
      attempts: 0,
      maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS,
      enqueuedBy: input.enqueuedBy ?? 'orchestrator',
      depth: input.depth ?? 0,
      createdAt: nowIso(),
    };
    this.tasks.push(task);
    this.reflect();
    return task;
  }

  /**
   * Marks a task in progress, stamps `startedAt` and counts one more attempt.
   * The current status and `maxAttempts` are not checked.
   *
   * @throws Error when no task has the given id.
   */
  start(id: string): QueuedTask {
    const task = this.require(id);
    task.status = 'in_progress';
    task.startedAt = nowIso();
    task.attempts += 1;
    this.reflect();
    return task;
  }

  /**
   * Terminal: the task finished successfully.
   *
   * @throws Error when no task has the given id.
   */
  complete(id: string, handoff?: TaskHandoff): QueuedTask {
    return this.finish(id, 'done', handoff);
  }

  /**
   * Terminal: the agent could not do the task. Not done, not failed.
   *
   * @throws Error when no task has the given id.
   */
  skip(id: string, handoff?: TaskHandoff): QueuedTask {
    return this.finish(id, 'skipped', handoff);
  }

  /**
   * Terminal: the task failed. Records `error` on the task.
   *
   * @throws Error when no task has the given id.
   */
  fail(
    id: string,
    error: { type: string; message: string },
    handoff?: TaskHandoff,
  ): QueuedTask {
    // Look the task up first so an unknown id throws before anything is mutated.
    const task = this.require(id);
    task.error = error;
    return this.finish(id, 'failed', handoff);
  }

  /**
   * Put a failed/in-progress task back to pending for a retry within the run.
   *
   * Only `status`, `startedAt` and `finishedAt` are reset; `attempts`, `error`
   * and `handoff` from the earlier attempt are kept. The current status and
   * `maxAttempts` are not checked.
   *
   * @throws Error when no task has the given id.
   */
  requeue(id: string): QueuedTask {
    const task = this.require(id);
    task.status = 'pending';
    task.startedAt = undefined;
    task.finishedAt = undefined;
    this.reflect();
    return task;
  }

  // ── Internals ───────────────────────────────────────────────────────

  /**
   * Shared tail of complete/skip/fail: sets the terminal status, stamps
   * `finishedAt` and persists. An omitted handoff leaves any existing one in place.
   */
  private finish(
    id: string,
    status: 'done' | 'skipped' | 'failed',
    handoff?: TaskHandoff,
  ): QueuedTask {
    const task = this.require(id);
    if (handoff) task.handoff = handoff;
    task.status = status;
    task.finishedAt = nowIso();
    this.reflect();
    return task;
  }

  /** Writes the whole queue to `queuePath`. */
  private reflect(): void {
    const file: QueueFile = {
      version: 1,
      runId: this.runId,
      tasks: this.tasks,
    };
    writeJsonAtomic(this.queuePath, file);
  }

  /** Like `get`, but throws instead of returning `undefined`. */
  private require(id: string): QueuedTask {
    const task = this.get(id);
    if (!task) throw new Error(`No task ${id} in the queue`);
    return task;
  }
}

0 commit comments

Comments
 (0)