Skip to content

Commit e116168

Browse files
gewenyu99 and claude committed
feat(orchestrator): run telemetry — the responsiveness A/B spine
Queue transitions emit orchestrator task enqueued/started/completed/skipped/failed with the resolved model, attempts, duration, time-to-first-task and the gap between consecutive starts — responsiveness is the dark launch's headline metric. agent completed/aborted carry per-task type, id, model, tokens and cost. The run-end reflection remark fires once, on the task that is last in the queue when it starts. analytics tags every event with the variant. Closes #628 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 0d02315 commit e116168

7 files changed

Lines changed: 245 additions & 16 deletions

File tree

src/lib/agent/agent-interface.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,11 @@ export async function runAgent(
683683
abortCases?: readonly AbortCaseMatcher[];
684684
/** Request the end-of-run reflection remark. Defaults to true. */
685685
requestRemark?: boolean;
686+
/**
687+
* Extra properties attached to this run's `agent completed` / `agent
688+
* aborted` events (e.g. the orchestrator's task type and id).
689+
*/
690+
analyticsProperties?: Record<string, unknown>;
686691
},
687692
middleware?: {
688693
onMessage(message: any): void;
@@ -761,9 +766,27 @@ export async function runAgent(
761766
analytics.capture(WIZARD_REMARK_EVENT_NAME, { remark });
762767
}
763768

769+
// Token usage comes from the SDK result message and is per agent run —
770+
// for the orchestrator that means per task, the secondary cost to watch.
771+
const usage = lastResultMessage?.usage as
772+
| {
773+
input_tokens?: number;
774+
output_tokens?: number;
775+
cache_creation_input_tokens?: number;
776+
cache_read_input_tokens?: number;
777+
}
778+
| undefined;
764779
analytics.wizardCapture('agent completed', {
765780
duration_ms: durationMs,
766781
duration_seconds: durationSeconds,
782+
model: agentConfig.model,
783+
num_turns: lastResultMessage?.num_turns,
784+
total_cost_usd: lastResultMessage?.total_cost_usd,
785+
input_tokens: usage?.input_tokens,
786+
output_tokens: usage?.output_tokens,
787+
cache_creation_input_tokens: usage?.cache_creation_input_tokens,
788+
cache_read_input_tokens: usage?.cache_read_input_tokens,
789+
...config?.analyticsProperties,
767790
});
768791
try {
769792
middleware?.finalize(lastResultMessage, durationMs);
@@ -1177,6 +1200,8 @@ export async function runAgent(
11771200
analytics.wizardCapture('agent aborted', {
11781201
duration_ms: durationMs,
11791202
duration_seconds: Math.round(durationMs / 1000),
1203+
model: agentConfig.model,
1204+
...config?.analyticsProperties,
11801205
});
11811206
}
11821207
}

src/lib/agent/agent-runner.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,9 @@ async function bootstrapProgram(
368368
// fork decision reads the flags.
369369
const wizardFlags = await analytics.getAllFlagsForWizard();
370370
const wizardMetadata = buildWizardMetadata(wizardFlags);
371+
// Tag every wizard event with the variant so runs segment in PostHog; the
372+
// orchestrator arm overwrites this with its own variant when it forks.
373+
analytics.setTag('variant', wizardMetadata.VARIANT);
371374

372375
const mcpUrl = session.localMcp
373376
? 'http://localhost:8787/mcp'

src/lib/programs/orchestrator/__tests__/agent-prompt-loader.test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
buildRegistry,
88
parseAgentPrompt,
99
resolveTask,
10+
taskModel,
1011
type AgentPrompt,
1112
type AgentRegistry,
1213
type OrchestratorPromptContext,
@@ -206,6 +207,23 @@ describe('resolveTask', () => {
206207
});
207208
});
208209

210+
describe('taskModel', () => {
211+
const prompt = parseAgentPrompt(
212+
'---\nmodel: prompt-model\n---\nx',
213+
'capture',
214+
);
215+
216+
it('prefers the enqueue override, then the prompt, then the default', () => {
217+
const registry = registryOf([prompt]);
218+
const task = { type: 'capture' };
219+
expect(taskModel(registry, { ...task, model: 'override' } as never)).toBe(
220+
'override',
221+
);
222+
expect(taskModel(registry, task as never)).toBe('prompt-model');
223+
expect(taskModel(registryOf([]), task as never)).toBe('claude-sonnet-4-6');
224+
});
225+
});
226+
209227
describe('assembleTaskPrompt', () => {
210228
const ctx: OrchestratorPromptContext = {
211229
projectId: 1,

src/lib/programs/orchestrator/__tests__/queue.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ import {
77
type TaskHandoff,
88
} from '@lib/programs/orchestrator/queue';
99

10+
jest.mock('@utils/analytics', () => ({
11+
analytics: { captureException: jest.fn(), wizardCapture: jest.fn() },
12+
}));
13+
1014
function tmpDir(): string {
1115
return fs.mkdtempSync(path.join(os.tmpdir(), 'queue-test-'));
1216
}
@@ -132,4 +136,40 @@ describe('QueueStore', () => {
132136
expect(file.tasks[0].status).toBe('done');
133137
expect(file.tasks[0].handoff?.did).toBe('d');
134138
});
139+
140+
it('notifies the transition listener with post-transition task state', () => {
141+
const seen: Array<{ event: string; status: string; attempts: number }> = [];
142+
const listened = new QueueStore(dir, 'run-2', {
143+
onTransition: (event, task) =>
144+
seen.push({ event, status: task.status, attempts: task.attempts }),
145+
});
146+
147+
const t = listened.enqueue({ type: 'install' });
148+
listened.start(t.id);
149+
listened.fail(t.id, { type: 'API_ERROR', message: 'boom' });
150+
listened.requeue(t.id);
151+
listened.start(t.id);
152+
listened.complete(t.id);
153+
154+
expect(seen).toEqual([
155+
{ event: 'enqueue', status: 'pending', attempts: 0 },
156+
{ event: 'start', status: 'running', attempts: 1 },
157+
{ event: 'fail', status: 'failed', attempts: 1 },
158+
{ event: 'requeue', status: 'pending', attempts: 1 },
159+
{ event: 'start', status: 'running', attempts: 2 },
160+
{ event: 'complete', status: 'done', attempts: 2 },
161+
]);
162+
});
163+
164+
it('a throwing listener does not break transitions', () => {
165+
const listened = new QueueStore(dir, 'run-3', {
166+
onTransition: () => {
167+
throw new Error('listener boom');
168+
},
169+
});
170+
const t = listened.enqueue({ type: 'install' });
171+
listened.start(t.id);
172+
listened.complete(t.id);
173+
expect(listened.get(t.id)?.status).toBe('done');
174+
});
135175
});

src/lib/programs/orchestrator/agent-prompt-loader.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,16 @@ function exampleReference(ctx: OrchestratorPromptContext): string | null {
4949
return `A reference PostHog integration for this framework is at \`${ctx.examplePath}\`. It shows the target implementation pattern. Reference its patterns and conventions, adapting them to this codebase.`;
5050
}
5151

52+
/** The framework's rules ship with the reference skill; every task follows them. */
53+
function commandmentsReference(ctx: OrchestratorPromptContext): string | null {
54+
if (!ctx.commandmentsPath) return null;
55+
return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`;
56+
}
57+
58+
const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;
59+
60+
const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`;
61+
5262
/**
5363
* Points the agent at its installed task instructions (the HOW). They live under
5464
* the wizard's run dir, not `.claude/skills/`, so the SDK does not auto-load
@@ -60,16 +70,6 @@ function skillReference(paths: readonly string[]): string | null {
6070
return `Your task instructions are at ${list}. Read them before you start and follow them. They are wizard scaffolding, not part of the project.`;
6171
}
6272

63-
/** The framework's rules ship with the reference skill; every task follows them. */
64-
function commandmentsReference(ctx: OrchestratorPromptContext): string | null {
65-
if (!ctx.commandmentsPath) return null;
66-
return `Framework rules for this integration are at \`${ctx.commandmentsPath}\`. Read them before you edit and follow them.`;
67-
}
68-
69-
const TASK_BASICS = `You are one isolated task in a larger PostHog workflow, run as a fresh agent with no memory of the other tasks beyond the context you are given. Do only your task, then report exactly once by calling complete_task with a structured handoff: what your goal was, what you did, and what the next agent should know. When you are given context from previous steps, trust it — those agents already did their work, so do not re-verify or re-read what their handoffs tell you. Build on it and move fast. Read a file before you edit it, so your own changes do not duplicate what is already there. Work only within this project's own directory; nothing outside it is part of your task. If your task does not apply to this project — there is genuinely nothing for it to do — report it with status \`skipped\` and say why, rather than marking it done.`;
70-
71-
const SEED_BASICS = `You are the orchestrator. Plan the work and seed the queue with enqueue_task — each call returns an id you can pass as a dependency to a later task. Give each task a short label for the UI — the action in a few words, not file names, class names, or other specifics. You are not a task yourself: do not call complete_task and do not edit the project.`;
72-
7373
/** A task agent's full prompt: injected basics, then the authored intent. */
7474
export function assembleTaskPrompt(
7575
ctx: OrchestratorPromptContext,
@@ -315,9 +315,14 @@ export function resolveTask(
315315
.join('\n\n');
316316

317317
return {
318-
model: task.model ?? prompt.model ?? DEFAULT_TASK_MODEL,
318+
model: taskModel(registry, task),
319319
...agentRunTools(prompt),
320320
prompt: body,
321321
skills: prompt.skills,
322322
};
323323
}
324+
325+
/** The model a task runs on: enqueue override, then prompt frontmatter, then default. */
326+
export function taskModel(registry: AgentRegistry, task: QueuedTask): string {
327+
return task.model ?? registry.get(task.type)?.model ?? DEFAULT_TASK_MODEL;
328+
}

src/lib/programs/orchestrator/orchestrator-runner.ts

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,20 @@ import { logToFile } from '../../../utils/debug';
2727
import type { ProgramConfig } from '../program-step';
2828
import type { BootstrapResult } from '../../agent/agent-runner';
2929
import type { WizardRunOptions } from '../../../utils/types';
30-
import { QueueStore, QUEUE_DIR_NAME, TaskStatus } from './queue';
30+
import {
31+
QueueStore,
32+
QUEUE_DIR_NAME,
33+
TaskStatus,
34+
type QueuedTask,
35+
} from './queue';
3136
import { drainQueue, type RunTask } from './executor';
3237
import {
3338
agentRunTools,
3439
assembleSeedPrompt,
3540
assembleTaskPrompt,
3641
loadAgentRegistry,
3742
resolveTask,
43+
taskModel,
3844
type OrchestratorPromptContext,
3945
} from './agent-prompt-loader';
4046

@@ -73,7 +79,6 @@ export async function runOrchestrator(
7379
boot: BootstrapResult,
7480
): Promise<void> {
7581
const runId = randomUUID();
76-
const store = new QueueStore(session.installDir, runId);
7782

7883
const options = sessionRunOptions(session);
7984

@@ -91,6 +96,74 @@ export async function runOrchestrator(
9196
);
9297
}
9398

99+
// Every wizard event from here on carries the variant, so orchestrator runs
100+
// segment cleanly from the linear baseline.
101+
analytics.setTag('variant', 'orchestrator');
102+
103+
// Responsiveness is the headline metric of the dark launch: time to first
104+
// visible progress, and no single step dominating wall-clock. Track it from
105+
// queue transitions, with the resolved model so cheap work is attributable
106+
// to cheap models.
107+
const runStartMs = Date.now();
108+
let firstStartMs: number | undefined;
109+
let lastStartMs: number | undefined;
110+
const durationMs = (t: QueuedTask) =>
111+
t.startedAt && t.finishedAt
112+
? Date.parse(t.finishedAt) - Date.parse(t.startedAt)
113+
: undefined;
114+
115+
const store = new QueueStore(session.installDir, runId, {
116+
onTransition: (event, task) => {
117+
const base = {
118+
type: task.type,
119+
model: taskModel(registry, task),
120+
attempts: task.attempts,
121+
};
122+
switch (event) {
123+
case 'enqueue':
124+
analytics.wizardCapture('orchestrator task enqueued', {
125+
type: task.type,
126+
enqueued_by: task.enqueuedBy,
127+
dynamic: task.enqueuedBy !== 'orchestrator',
128+
});
129+
break;
130+
case 'start': {
131+
const now = Date.now();
132+
analytics.wizardCapture('orchestrator task started', {
133+
...base,
134+
ms_since_run_start: now - runStartMs,
135+
gap_since_prev_start_ms:
136+
lastStartMs === undefined ? undefined : now - lastStartMs,
137+
});
138+
firstStartMs ??= now;
139+
lastStartMs = now;
140+
break;
141+
}
142+
case 'complete':
143+
analytics.wizardCapture('orchestrator task completed', {
144+
...base,
145+
duration_ms: durationMs(task),
146+
});
147+
break;
148+
case 'skip':
149+
analytics.wizardCapture('orchestrator task skipped', {
150+
...base,
151+
duration_ms: durationMs(task),
152+
});
153+
break;
154+
case 'fail':
155+
analytics.wizardCapture('orchestrator task failed', {
156+
...base,
157+
duration_ms: durationMs(task),
158+
error: task.error?.type,
159+
});
160+
break;
161+
case 'requeue':
162+
break;
163+
}
164+
},
165+
});
166+
94167
// Give task agents the framework's finished reference integration to match,
95168
// the same EXAMPLE.md the linear flow uses. Install it under the run dir rather
96169
// than .claude/skills so its "do everything" workflow is not auto-loaded as a
@@ -191,6 +264,7 @@ export async function runOrchestrator(
191264
successMessage: 'Planned the integration',
192265
additionalFeatureQueue: [],
193266
requestRemark: false,
267+
analyticsProperties: { task_type: 'seed' },
194268
},
195269
);
196270
if (seedResult.error) {
@@ -211,6 +285,7 @@ export async function runOrchestrator(
211285
// its agent prompt (the WHAT) and the mini-skills it needs (the HOW), then
212286
// runs on its own model and tools.
213287
const taskSkillsRoot = path.join(QUEUE_DIR_NAME, 'skills');
288+
let remarkRequested = false;
214289
const runTask: RunTask = async (task) => {
215290
renderQueue();
216291
try {
@@ -236,6 +311,18 @@ export async function runOrchestrator(
236311
);
237312
}
238313
}
314+
// The run-end reflection fires once, on the task that is last in the
315+
// queue when it starts — nothing else pending or running alongside it.
316+
const isLastTask = !store
317+
.list()
318+
.some(
319+
(t) =>
320+
t.id !== task.id &&
321+
(t.status === TaskStatus.Pending ||
322+
t.status === TaskStatus.Running),
323+
);
324+
const requestRemark = isLastTask && !remarkRequested;
325+
if (requestRemark) remarkRequested = true;
239326
await runAgent(
240327
{
241328
...agent,
@@ -249,12 +336,12 @@ export async function runOrchestrator(
249336
// Empty messages suppress the per-task spinner lines (the spinner renders
250337
// only when a message is set); the queue panel shows progress. Errors
251338
// still surface — runAgent stops the spinner with its own error text.
252-
// No per-task remark — the reflection would fire on every task.
253339
{
254340
spinnerMessage: '',
255341
successMessage: '',
256342
additionalFeatureQueue: [],
257-
requestRemark: false,
343+
requestRemark,
344+
analyticsProperties: { task_type: task.type, task_id: task.id },
258345
},
259346
);
260347
} finally {
@@ -281,6 +368,10 @@ export async function runOrchestrator(
281368
tasks_total: summary.total,
282369
tasks_done: summary.done,
283370
tasks_failed: summary.failed,
371+
tasks_skipped: summary.skipped,
372+
total_duration_ms: Date.now() - runStartMs,
373+
time_to_first_task_ms:
374+
firstStartMs === undefined ? undefined : firstStartMs - runStartMs,
284375
});
285376

286377
// The build step flags any unresolved conflict in its handoff; surface the

0 commit comments

Comments
 (0)