Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/lib/programs/orchestrator/__tests__/queue-tools.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import { QueueStore } from '@lib/programs/orchestrator/queue';
import {
applyComplete,
applyEnqueue,
applyReadHandoffs,
checkEnqueueGuards,
type OrchestratorToolsContext,
} from '@lib/programs/orchestrator/queue-tools';

function tmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-tools-test-'));
}

const VALID = ['install', 'init', 'capture'];

describe('checkEnqueueGuards', () => {
let dir: string;
let store: QueueStore;
let ctx: OrchestratorToolsContext;

beforeEach(() => {
dir = tmpDir();
store = new QueueStore(dir, 'run-1');
ctx = { store, validTypes: VALID };
});

afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));

it('rejects an unknown type', () => {
const r = checkEnqueueGuards(ctx, { type: 'nope', reason: 'x' });
expect(r).toMatchObject({ ok: false, guard: 'unknown-type' });
});

it('rejects an unknown dependency', () => {
const r = checkEnqueueGuards(ctx, {
type: 'init',
dependsOn: ['ghost'],
reason: 'x',
});
expect(r).toMatchObject({ ok: false, guard: 'unknown-dep' });
});

it('trips dedup on the same type and inputs', () => {
store.enqueue({ type: 'install', inputs: { pkg: 'posthog-js' } });
const r = checkEnqueueGuards(ctx, {
type: 'install',
inputs: { pkg: 'posthog-js' },
reason: 'x',
});
expect(r).toMatchObject({ ok: false, guard: 'dedup' });
});

it('allows a valid enqueue', () => {
const r = checkEnqueueGuards(ctx, { type: 'init', reason: 'x' });
expect(r).toEqual({ ok: true });
});
});

describe('apply functions', () => {
let dir: string;
let store: QueueStore;
let ctx: OrchestratorToolsContext;

beforeEach(() => {
dir = tmpDir();
store = new QueueStore(dir, 'run-1');
ctx = { store, validTypes: VALID };
});

afterEach(() => fs.rmSync(dir, { recursive: true, force: true }));

it('attributes a seed enqueue to the orchestrator', () => {
const r = applyEnqueue(ctx, { type: 'install', reason: 'seed' });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.task.enqueuedBy).toBe('orchestrator');
});

it('attributes a follow-up enqueue to the running task', () => {
const parent = store.enqueue({ type: 'init' });
ctx.currentTaskId = parent.id;
const r = applyEnqueue(ctx, { type: 'capture', reason: 'follow-up' });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.task.enqueuedBy).toBe(parent.id);
});

it('complete_task fails when no task is running', () => {
const r = applyComplete(ctx, {
status: 'done',
handoff: { goals: 'g', did: 'd', forNextAgent: 'n' },
});
expect(r.ok).toBe(false);
});

it('complete_task marks the running task done and stores the handoff', () => {
const t = store.enqueue({ type: 'install' });
ctx.currentTaskId = t.id;
store.start(t.id);
const r = applyComplete(ctx, {
status: 'done',
handoff: { goals: 'g', did: 'added sdk', forNextAgent: 'env next' },
});
expect(r.ok).toBe(true);
expect(store.get(t.id)?.status).toBe('done');
expect(store.readHandoff(t.id)?.did).toBe('added sdk');
});

it('read_handoffs returns a dependency handoff for the running task', () => {
const dep = store.enqueue({ type: 'install' });
store.start(dep.id);
store.complete(dep.id, {
goals: 'g',
did: 'installed',
forNextAgent: 'now init',
});
const t = store.enqueue({ type: 'init', dependsOn: [dep.id] });
ctx.currentTaskId = t.id;

const handoffs = applyReadHandoffs(ctx, {});
expect(handoffs).toHaveLength(1);
expect(handoffs[0].did).toBe('installed');
});
});
135 changes: 135 additions & 0 deletions src/lib/programs/orchestrator/__tests__/queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import * as fs from 'fs';
import * as os from 'os';
import * as path from 'path';
import {
QueueStore,
type QueueFile,
type TaskHandoff,
} from '@lib/programs/orchestrator/queue';

function tmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-'));
}

describe('QueueStore', () => {
let dir: string;
let q: QueueStore;

beforeEach(() => {
dir = tmpDir();
q = new QueueStore(dir, 'run-1');
});

afterEach(() => {
fs.rmSync(dir, { recursive: true, force: true });
});

it('enqueues a pending task with defaults', () => {
const t = q.enqueue({ type: 'install' });
expect(t.status).toBe('pending');
expect(t.attempts).toBe(0);
expect(t.maxAttempts).toBe(2);
expect(t.enqueuedBy).toBe('orchestrator');
expect(t.dependsOn).toEqual([]);
expect(q.list()).toHaveLength(1);
});

it('only marks a task runnable once its dependencies are done', () => {
const a = q.enqueue({ type: 'install' });
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });

expect(q.nextRunnable().map((t) => t.id)).toEqual([a.id]);

q.start(a.id);
q.complete(a.id);
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
});

it('returns every runnable task; the graph alone decides parallelism', () => {
const a = q.enqueue({ type: 'install' });
const b = q.enqueue({ type: 'init' });
q.enqueue({ type: 'capture', dependsOn: [a.id, b.id] });

// Both independent tasks are runnable at once; the dependent one is not.
expect(
q
.nextRunnable()
.map((t) => t.id)
.sort(),
).toEqual([a.id, b.id].sort());

q.start(a.id);
// An in-progress task is no longer offered.
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
});

it('treats a skipped dependency as satisfied', () => {
const a = q.enqueue({ type: 'install' });
const b = q.enqueue({ type: 'init', dependsOn: [a.id] });

q.start(a.id);
q.skip(a.id);
expect(q.nextRunnable().map((t) => t.id)).toEqual([b.id]);
});

it('start increments attempts and supports within-run retry while attempts remain', () => {
const t = q.enqueue({ type: 'install', maxAttempts: 2 });
q.start(t.id);
expect(q.get(t.id)?.attempts).toBe(1);

q.fail(t.id, { type: 'API_ERROR', message: 'boom' });
expect(q.get(t.id)?.status).toBe('failed');

// Retry: attempts (1) < maxAttempts (2), so requeue and run again.
q.requeue(t.id);
expect(q.get(t.id)?.status).toBe('pending');
q.start(t.id);
expect(q.get(t.id)?.attempts).toBe(2);
});

it('completing a task records and reads back a structured handoff', () => {
const t = q.enqueue({ type: 'install' });
const handoff: TaskHandoff = {
goals: 'install the sdk',
did: 'added posthog-js',
forNextAgent: 'env vars not set yet',
filesTouched: ['package.json'],
};
q.start(t.id);
q.complete(t.id, handoff);

expect(q.get(t.id)?.status).toBe('done');
expect(q.readHandoff(t.id)).toEqual(handoff);
expect(q.readHandoffsByType('install')).toEqual([handoff]);
});

it('is drained when a pending task is blocked by a failed dependency', () => {
const a = q.enqueue({ type: 'install' });
q.enqueue({ type: 'init', dependsOn: [a.id] });

expect(q.isDrained()).toBe(false);
q.start(a.id);
q.fail(a.id, { type: 'API_ERROR', message: 'boom' });

// init can never run now, and nothing is in progress.
expect(q.nextRunnable()).toHaveLength(0);
expect(q.isDrained()).toBe(true);
});

it('reflects every transition to queue.json, handoffs included', () => {
const a = q.enqueue({ type: 'install' });
q.start(a.id);
q.complete(a.id, {
goals: 'g',
did: 'd',
forNextAgent: 'n',
});

const file = JSON.parse(fs.readFileSync(q.queuePath, 'utf8')) as QueueFile;
expect(file.version).toBe(1);
expect(file.runId).toBe('run-1');
expect(file.tasks).toHaveLength(1);
expect(file.tasks[0].status).toBe('done');
expect(file.tasks[0].handoff?.did).toBe('d');
});
});
Loading
Loading