Skip to content

Commit 48e0be9

Browse files
gewenyu99claude
andcommitted
feat(orchestrator): in-memory queue + disk persistence (QueueStore)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7112748 commit 48e0be9

4 files changed

Lines changed: 398 additions & 20 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import * as fs from 'fs';
2+
import * as os from 'os';
3+
import * as path from 'path';
4+
import {
5+
QueueStore,
6+
type QueueFile,
7+
type TaskHandoff,
8+
} from '@lib/programs/orchestrator/queue';
9+
10+
function tmpDir(): string {
11+
return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-'));
12+
}
13+
14+
describe('QueueStore', () => {
15+
let dir: string;
16+
let q: QueueStore;
17+
18+
beforeEach(() => {
19+
dir = tmpDir();
20+
q = new QueueStore(dir, 'run-1');
21+
});
22+
23+
afterEach(() => {
24+
fs.rmSync(dir, { recursive: true, force: true });
25+
});
26+
27+
it('enqueues a pending task with defaults', () => {
28+
const t = q.enqueue({ type: 'install' });
29+
expect(t.status).toBe('pending');
30+
expect(t.attempts).toBe(0);
31+
expect(t.maxAttempts).toBe(2);
32+
expect(t.enqueuedBy).toBe('orchestrator');
33+
expect(t.dependsOn).toEqual([]);
34+
expect(q.list()).toHaveLength(1);
35+
});
36+
37+
it('only marks a task runnable once its dependencies are done', () => {
38+
const a = q.enqueue({ type: 'install' });
39+
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });
40+
41+
expect(q.nextRunnable().map((t) => t.id)).toEqual([a.id]);
42+
43+
q.start(a.id);
44+
q.complete(a.id);
45+
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
46+
});
47+
48+
it('returns every runnable task; the graph alone decides parallelism', () => {
49+
const a = q.enqueue({ type: 'install' });
50+
const b = q.enqueue({ type: 'init' });
51+
q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] });
52+
53+
// Both independent tasks are runnable at once; the dependent one is not.
54+
expect(
55+
q
56+
.nextRunnable()
57+
.map((t) => t.id)
58+
.sort(),
59+
).toEqual([a.id, b.id].sort());
60+
61+
q.start(a.id);
62+
// An in-progress task is no longer offered.
63+
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
64+
});
65+
66+
it('treats a skipped dependency as satisfied', () => {
67+
const a = q.enqueue({ type: 'install' });
68+
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });
69+
70+
q.start(a.id);
71+
q.skip(a.id);
72+
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
73+
});
74+
75+
it('start increments attempts and supports within-run retry while attempts remain', () => {
76+
const t = q.enqueue({ type: 'install', maxAttempts: 2 });
77+
q.start(t.id);
78+
expect(q.get(t.id)?.attempts).toBe(1);
79+
80+
q.fail(t.id, { type: 'API_ERROR', message: 'boom' });
81+
expect(q.get(t.id)?.status).toBe('failed');
82+
83+
// Retry: attempts (1) < maxAttempts (2), so requeue and run again.
84+
q.requeue(t.id);
85+
expect(q.get(t.id)?.status).toBe('pending');
86+
q.start(t.id);
87+
expect(q.get(t.id)?.attempts).toBe(2);
88+
});
89+
90+
it('completing a task records and reads back a structured handoff', () => {
91+
const t = q.enqueue({ type: 'install' });
92+
const handoff: TaskHandoff = {
93+
goals: 'install the sdk',
94+
did: 'added posthog-js',
95+
forNextAgent: 'env vars not set yet',
96+
filesTouched: ['package.json'],
97+
};
98+
q.start(t.id);
99+
q.complete(t.id, handoff);
100+
101+
expect(q.get(t.id)?.status).toBe('done');
102+
expect(q.readHandoff(t.id)).toEqual(handoff);
103+
expect(q.readHandoffsByType('install')).toEqual([handoff]);
104+
});
105+
106+
it('is drained when a pending task is blocked by a failed dependency', () => {
107+
const a = q.enqueue({ type: 'install' });
108+
q.enqueue({ type: 'init', dependsOn: [a.id] });
109+
110+
expect(q.isDrained()).toBe(false);
111+
q.start(a.id);
112+
q.fail(a.id, { type: 'API_ERROR', message: 'boom' });
113+
114+
// init can never run now, and nothing is in progress.
115+
expect(q.nextRunnable()).toHaveLength(0);
116+
expect(q.isDrained()).toBe(true);
117+
});
118+
119+
it('reflects every transition to queue.json, handoffs included', () => {
120+
const a = q.enqueue({ type: 'install' });
121+
q.start(a.id);
122+
q.complete(a.id, {
123+
goals: 'g',
124+
did: 'd',
125+
forNextAgent: 'n',
126+
});
127+
128+
const file = JSON.parse(fs.readFileSync(q.queuePath, 'utf8')) as QueueFile;
129+
expect(file.version).toBe(1);
130+
expect(file.runId).toBe('run-1');
131+
expect(file.tasks).toHaveLength(1);
132+
expect(file.tasks[0].status).toBe('done');
133+
expect(file.tasks[0].handoff?.did).toBe('d');
134+
});
135+
});
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/**
2+
* The orchestrator task queue.
3+
*
4+
* In memory, synchronous, single-owner: one Node process drives the run, so
5+
* there is no locking. The queue imposes no execution policy — `nextRunnable`
6+
* returns every pending task whose dependencies are satisfied, and how many of
7+
* those run at once is decided by the task graph, not the queue.
8+
*
9+
* Every transition rewrites `<installDir>/.posthog-wizard/queue.json`, a small
10+
* file holding the whole queue, handoffs included. Today it is the run's
11+
* log and the report's source; later it is the resume point.
12+
*/
13+
import * as fs from 'fs';
14+
import * as path from 'path';
15+
import { randomUUID } from 'crypto';
16+
import { writeJsonAtomic } from '../../../utils/atomic-ledger';
17+
18+
export type TaskStatus =
19+
| 'pending'
20+
| 'in_progress'
21+
| 'done'
22+
| 'skipped'
23+
| 'failed';
24+
25+
export interface QueuedTask {
26+
id: string;
27+
type: string;
28+
status: TaskStatus;
29+
dependsOn: string[];
30+
inputs: Record<string, unknown>;
31+
model?: string;
32+
attempts: number;
33+
maxAttempts: number;
34+
/** The structured handoff the task reported on completion. */
35+
handoff?: TaskHandoff;
36+
/** 'orchestrator' for seeded tasks, or the id of the task that enqueued this one. */
37+
enqueuedBy: string;
38+
createdAt: string;
39+
startedAt?: string;
40+
finishedAt?: string;
41+
error?: { type: string; message: string };
42+
}
43+
44+
export interface QueueFile {
45+
version: 1;
46+
runId: string;
47+
tasks: QueuedTask[];
48+
}
49+
50+
/** The structured handoff a task leaves for the next agent. */
51+
export interface TaskHandoff {
52+
goals: string;
53+
did: string;
54+
forNextAgent: string;
55+
filesTouched?: string[];
56+
}
57+
58+
export interface EnqueueInput {
59+
type: string;
60+
inputs?: Record<string, unknown>;
61+
dependsOn?: string[];
62+
model?: string;
63+
maxAttempts?: number;
64+
enqueuedBy?: string;
65+
}
66+
67+
export const QUEUE_DIR_NAME = '.posthog-wizard';
68+
const DEFAULT_MAX_ATTEMPTS = 2;
69+
70+
function nowIso(): string {
71+
return new Date().toISOString();
72+
}
73+
74+
export class QueueStore {
75+
private tasks: QueuedTask[] = [];
76+
77+
readonly runId: string;
78+
readonly queuePath: string;
79+
80+
constructor(installDir: string, runId: string) {
81+
this.runId = runId;
82+
const dir = path.join(installDir, QUEUE_DIR_NAME);
83+
this.queuePath = path.join(dir, 'queue.json');
84+
fs.mkdirSync(dir, { recursive: true });
85+
}
86+
87+
// ── Reads ───────────────────────────────────────────────────────────
88+
89+
list(): readonly QueuedTask[] {
90+
return this.tasks;
91+
}
92+
93+
get(id: string): QueuedTask | undefined {
94+
return this.tasks.find((t) => t.id === id);
95+
}
96+
97+
/**
98+
* Every pending task whose dependencies are all satisfied (`done` or
99+
* `skipped`). A skipped dependency does not block downstream work.
100+
*/
101+
nextRunnable(): QueuedTask[] {
102+
const doneIds = new Set(
103+
this.tasks
104+
.filter((t) => t.status === 'done' || t.status === 'skipped')
105+
.map((t) => t.id),
106+
);
107+
return this.tasks.filter(
108+
(t) => t.status === 'pending' && t.dependsOn.every((d) => doneIds.has(d)),
109+
);
110+
}
111+
112+
/**
113+
* True when no task is in progress and none can be started. Either everything
114+
* is terminal, or the only pending tasks are blocked by a failed dependency.
115+
*/
116+
isDrained(): boolean {
117+
if (this.tasks.some((t) => t.status === 'in_progress')) return false;
118+
return this.nextRunnable().length === 0;
119+
}
120+
121+
summary(): Record<TaskStatus, number> & { total: number } {
122+
const counts: Record<TaskStatus, number> = {
123+
pending: 0,
124+
in_progress: 0,
125+
done: 0,
126+
skipped: 0,
127+
failed: 0,
128+
};
129+
for (const t of this.tasks) counts[t.status] += 1;
130+
return { ...counts, total: this.tasks.length };
131+
}
132+
133+
readHandoff(id: string): TaskHandoff | null {
134+
return this.get(id)?.handoff ?? null;
135+
}
136+
137+
/** Handoffs of completed tasks of a given type, oldest first. */
138+
readHandoffsByType(type: string): TaskHandoff[] {
139+
return this.tasks
140+
.filter((t) => t.type === type && t.handoff)
141+
.map((t) => t.handoff as TaskHandoff);
142+
}
143+
144+
// ── Transitions (each one reflected to queue.json) ──────────────────
145+
146+
enqueue(input: EnqueueInput): QueuedTask {
147+
const task: QueuedTask = {
148+
id: randomUUID(),
149+
type: input.type,
150+
status: 'pending',
151+
dependsOn: input.dependsOn ?? [],
152+
inputs: input.inputs ?? {},
153+
model: input.model,
154+
attempts: 0,
155+
maxAttempts: input.maxAttempts ?? DEFAULT_MAX_ATTEMPTS,
156+
enqueuedBy: input.enqueuedBy ?? 'orchestrator',
157+
createdAt: nowIso(),
158+
};
159+
this.tasks.push(task);
160+
this.reflect();
161+
return task;
162+
}
163+
164+
start(id: string): QueuedTask {
165+
const t = this.require(id);
166+
t.status = 'in_progress';
167+
t.startedAt = nowIso();
168+
t.attempts += 1;
169+
this.reflect();
170+
return t;
171+
}
172+
173+
complete(id: string, handoff?: TaskHandoff): QueuedTask {
174+
return this.finish(id, 'done', handoff);
175+
}
176+
177+
/** Terminal: the agent could not do the task. Not done, not failed. */
178+
skip(id: string, handoff?: TaskHandoff): QueuedTask {
179+
return this.finish(id, 'skipped', handoff);
180+
}
181+
182+
fail(
183+
id: string,
184+
error: { type: string; message: string },
185+
handoff?: TaskHandoff,
186+
): QueuedTask {
187+
const t = this.require(id);
188+
t.error = error;
189+
return this.finish(id, 'failed', handoff);
190+
}
191+
192+
/** Put a failed/in-progress task back to pending for a retry within the run. */
193+
requeue(id: string): QueuedTask {
194+
const t = this.require(id);
195+
t.status = 'pending';
196+
t.startedAt = undefined;
197+
t.finishedAt = undefined;
198+
this.reflect();
199+
return t;
200+
}
201+
202+
// ── Internals ───────────────────────────────────────────────────────
203+
204+
private finish(
205+
id: string,
206+
status: 'done' | 'skipped' | 'failed',
207+
handoff?: TaskHandoff,
208+
): QueuedTask {
209+
const t = this.require(id);
210+
if (handoff) t.handoff = handoff;
211+
t.status = status;
212+
t.finishedAt = nowIso();
213+
this.reflect();
214+
return t;
215+
}
216+
217+
private reflect(): void {
218+
const file: QueueFile = {
219+
version: 1,
220+
runId: this.runId,
221+
tasks: this.tasks,
222+
};
223+
writeJsonAtomic(this.queuePath, file);
224+
}
225+
226+
private require(id: string): QueuedTask {
227+
const t = this.get(id);
228+
if (!t) throw new Error(`No task ${id} in the queue`);
229+
return t;
230+
}
231+
}

0 commit comments

Comments
 (0)