Skip to content

Commit a3c60af

Browse files
committed
feat(telemetry): persist and restore telemetry data across sessions
load persisted telemetry from DB in use-workflow-events and recovery add session file reader for claude telemetry data emit telemetry updates when restoring crashed workflows
1 parent 25f6d32 commit a3c60af

File tree

6 files changed

+107
-55
lines changed

6 files changed

+107
-55
lines changed

src/cli/tui/routes/workflow/hooks/use-workflow-events.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,23 @@ export function useWorkflowEvents(options: UseWorkflowEventsOptions): UseWorkflo
8484
const monitor = AgentMonitorService.getInstance()
8585
const monitoringId = Number(controllerState.controllerConfig.monitoringId) || undefined
8686
const controllerAgent = monitoringId ? monitor.getAgent(monitoringId) : undefined
87+
88+
// Load persisted telemetry from DB, fall back to zeros if not available
89+
const persistedTelemetry = controllerAgent?.telemetry
90+
8791
actions.setControllerState({
8892
id: controllerState.controllerConfig.agentId,
8993
name: controllerState.controllerConfig.agentId, // Name may not be persisted, use agentId as fallback
9094
engine: controllerAgent?.engine ?? 'unknown',
9195
model: controllerAgent?.modelName,
92-
telemetry: { tokensIn: 0, tokensOut: 0 }, // Default telemetry, will be updated if controller runs
96+
telemetry: persistedTelemetry
97+
? {
98+
tokensIn: persistedTelemetry.tokensIn,
99+
tokensOut: persistedTelemetry.tokensOut,
100+
cached: persistedTelemetry.cached,
101+
cost: persistedTelemetry.cost,
102+
}
103+
: { tokensIn: 0, tokensOut: 0 },
93104
monitoringId,
94105
})
95106
}

src/infra/engines/providers/claude/execution/runner.ts

Lines changed: 72 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,74 @@
11
import * as path from 'node:path';
22
import { homedir } from 'node:os';
3+
import { readFileSync, existsSync } from 'node:fs';
34

45
import { spawnProcess } from '../../../../process/spawn.js';
56
import { buildClaudeExecCommand } from './commands.js';
67
import { metadata } from '../metadata.js';
78
import { expandHomeDir } from '../../../../../shared/utils/index.js';
89
import { ENV } from '../config.js';
9-
import { createTelemetryCapture } from '../../../../../shared/telemetry/index.js';
1010
import { debug } from '../../../../../shared/logging/logger.js';
1111
import type { ParsedTelemetry } from '../../../core/types.js';
1212
import {
1313
formatThinking,
1414
formatCommand,
1515
formatResult,
1616
formatStatus,
17-
formatDuration,
18-
formatCost,
19-
formatTokens,
20-
addMarker,
21-
SYMBOL_BULLET,
2217
} from '../../../../../shared/formatters/outputMarkers.js';
2318

19+
/**
20+
* Get Claude session file path using claudeConfigDir
21+
*/
22+
function getSessionPath(sessionId: string, workingDir: string, claudeConfigDir: string): string {
23+
const slug = workingDir.replace(/\//g, '-');
24+
return path.join(claudeConfigDir, 'projects', slug, `${sessionId}.jsonl`);
25+
}
26+
27+
/**
28+
* Read telemetry from Claude session file (last assistant message with usage)
29+
*/
30+
function readSessionTelemetry(sessionPath: string): ParsedTelemetry | null {
31+
if (!existsSync(sessionPath)) {
32+
debug('[SESSION-READER] File not found: %s', sessionPath);
33+
return null;
34+
}
35+
36+
try {
37+
const content = readFileSync(sessionPath, 'utf-8');
38+
const lines = content.trim().split('\n').filter(Boolean);
39+
40+
for (let i = lines.length - 1; i >= 0; i--) {
41+
try {
42+
const entry = JSON.parse(lines[i]);
43+
if (entry.type === 'assistant' && entry.message?.usage) {
44+
const u = entry.message.usage;
45+
const input = u.input_tokens || 0;
46+
const output = u.output_tokens || 0;
47+
const cacheCreation = u.cache_creation_input_tokens || 0;
48+
const cacheRead = u.cache_read_input_tokens || 0;
49+
50+
const totalContext = input + cacheCreation + cacheRead;
51+
52+
debug('[SESSION-READER] input=%d, output=%d, cache_creation=%d, cache_read=%d, TOTAL=%d',
53+
input, output, cacheCreation, cacheRead, totalContext);
54+
55+
return {
56+
tokensIn: totalContext,
57+
tokensOut: output,
58+
cached: cacheCreation + cacheRead,
59+
cacheCreationTokens: cacheCreation,
60+
cacheReadTokens: cacheRead,
61+
};
62+
}
63+
} catch { /* skip malformed */ }
64+
}
65+
return null;
66+
} catch (err) {
67+
debug('[SESSION-READER] Error: %s', err);
68+
return null;
69+
}
70+
}
71+
2472
export interface RunClaudeOptions {
2573
prompt: string;
2674
workingDir: string;
@@ -181,68 +229,42 @@ export async function runClaude(options: RunClaudeOptions): Promise<RunClaudeRes
181229

182230
const { command, args } = buildClaudeExecCommand({ workingDir, resumeSessionId, model });
183231

184-
// Create telemetry capture instance
185-
const telemetryCapture = createTelemetryCapture('claude', model, prompt, workingDir);
186-
187-
// Track JSON error events (Claude may exit 0 even on errors)
232+
// Track state
188233
let capturedError: string | null = null;
189-
let sessionIdCaptured = false;
234+
let capturedSessionId: string | null = null;
190235
let stdoutBuffer = '';
191236

192237
const handleStreamLine = (line: string): void => {
193238
if (!line.trim()) return;
194239

195-
// Capture telemetry data
196-
telemetryCapture.captureFromStreamJson(line);
197-
198-
// Check for error events (Claude may exit 0 even on errors like invalid model)
199240
try {
200241
const json = JSON.parse(line);
201242

202-
// Capture session ID from first event that contains it
203-
if (!sessionIdCaptured && json.session_id && onSessionId) {
204-
sessionIdCaptured = true;
205-
onSessionId(json.session_id);
243+
// Capture session ID from first event
244+
if (!capturedSessionId && json.session_id) {
245+
capturedSessionId = json.session_id;
246+
onSessionId?.(json.session_id);
206247
}
207248

208-
// Check for error in result type
249+
// Check for errors
209250
if (json.type === 'result' && json.is_error && json.result && !capturedError) {
210251
capturedError = json.result;
211252
}
212-
// Check for error in assistant message
213253
if (json.type === 'assistant' && json.error && !capturedError) {
214-
const messageText = json.message?.content?.[0]?.text;
215-
capturedError = messageText || json.error;
254+
capturedError = json.message?.content?.[0]?.text || json.error;
216255
}
217-
} catch {
218-
// Ignore parse errors
219-
}
220256

221-
// Emit telemetry event if captured and callback provided
222-
if (onTelemetry) {
223-
const captured = telemetryCapture.getCaptured();
224-
if (captured && captured.tokens) {
225-
// Per Anthropic docs: total_input = input_tokens + cache_read + cache_creation
226-
// See: https://platform.claude.com/docs/en/build-with-claude/prompt-caching#tracking-cache-performance
227-
const totalIn = (captured.tokens.input ?? 0) + (captured.tokens.cached ?? 0);
228-
229-
debug('[TELEMETRY:2.5-RUNNER] [CLAUDE] Emitting telemetry via onTelemetry callback');
230-
debug('[TELEMETRY:2.5-RUNNER] [CLAUDE] CAPTURED: input=%d, output=%d, cached=%s',
231-
captured.tokens.input ?? 0,
232-
captured.tokens.output ?? 0,
233-
captured.tokens.cached ?? 'none');
234-
debug('[TELEMETRY:2.5-RUNNER] [CLAUDE] TOTAL CONTEXT: %d (input + cached), output=%d',
235-
totalIn,
236-
captured.tokens.output ?? 0);
237-
238-
onTelemetry({
239-
tokensIn: totalIn,
240-
tokensOut: captured.tokens.output ?? 0,
241-
cached: captured.tokens.cached,
242-
cost: captured.cost,
243-
duration: captured.duration,
244-
});
257+
// Result event = trigger to read telemetry from session file
258+
if (json.type === 'result' && onTelemetry && capturedSessionId) {
259+
const sessionPath = getSessionPath(capturedSessionId, workingDir, claudeConfigDir);
260+
debug('[SESSION-READER] Reading telemetry from: %s', sessionPath);
261+
const telemetry = readSessionTelemetry(sessionPath);
262+
if (telemetry) {
263+
onTelemetry(telemetry);
264+
}
245265
}
266+
} catch {
267+
// Ignore parse errors
246268
}
247269

248270
const formatted = formatStreamJsonLine(line);
@@ -351,9 +373,6 @@ export async function runClaude(options: RunClaudeOptions): Promise<RunClaudeRes
351373
throw new Error(errorMessage);
352374
}
353375

354-
// Log captured telemetry
355-
telemetryCapture.logCapturedTelemetry(result.exitCode);
356-
357376
return {
358377
stdout: result.stdout,
359378
stderr: result.stderr,

src/workflows/controller/init.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ export async function initControllerAgent(
8585
uniqueAgentId: agentId, // Required for ui callback to work
8686
engine: options?.engineOverride,
8787
model: options?.modelOverride,
88+
onTelemetry: options?.onTelemetry,
8889
});
8990
debug('[Controller] executeAgent returned: agentId=%s', result.agentId);
9091

src/workflows/controller/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ export interface InitControllerOptions {
3737
engineOverride?: string
3838
/** Model override (from workflow definition.options) - passed to executeAgent */
3939
modelOverride?: string
40+
/** Telemetry callback for first turn (so UI shows telemetry immediately) */
41+
onTelemetry?: (telemetry: import('../../agents/execution/types.js').ParsedTelemetry) => void
4042
}
4143

4244
/**

src/workflows/controller/view.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,11 @@ export async function runControllerView(
300300
},
301301
engineOverride: resolvedEngine,
302302
modelOverride: resolvedModel,
303+
onTelemetry: createTelemetryCallback({
304+
uniqueAgentId: '',
305+
emitter,
306+
isController: true,
307+
}),
303308
}
304309
);
305310

src/workflows/recovery/restore.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { getNextChainIndex } from '../indexing/lifecycle.js';
1313
import { loadAgentConfig } from '../../agents/runner/index.js';
1414
import { loadChainedPrompts } from '../../agents/runner/chained.js';
1515
import { getSelectedConditions, getSelectedTrack } from '../../shared/workflows/template.js';
16-
import { StatusService } from '../../agents/monitoring/index.js';
16+
import { StatusService, AgentMonitorService } from '../../agents/monitoring/index.js';
1717
import type { CrashRestoreContext, CrashRestoreResult } from './types.js';
1818

1919
/**
@@ -47,6 +47,20 @@ export async function restoreFromCrash(ctx: CrashRestoreContext): Promise<CrashR
4747
// 1. Register monitoring ID with emitter (for log panel connection)
4848
if (stepData.monitoringId !== undefined) {
4949
emitter.registerMonitoringId(uniqueAgentId, stepData.monitoringId);
50+
51+
// 1b. Load persisted telemetry from DB and emit it
52+
const monitor = AgentMonitorService.getInstance();
53+
const agentData = monitor.getAgent(stepData.monitoringId);
54+
if (agentData?.telemetry) {
55+
debug('[recovery/restore] Emitting persisted telemetry for step %d: tokensIn=%d, tokensOut=%d, cached=%d',
56+
stepIndex, agentData.telemetry.tokensIn, agentData.telemetry.tokensOut, agentData.telemetry.cached);
57+
emitter.updateAgentTelemetry(uniqueAgentId, {
58+
tokensIn: agentData.telemetry.tokensIn,
59+
tokensOut: agentData.telemetry.tokensOut,
60+
cached: agentData.telemetry.cached,
61+
cost: agentData.telemetry.cost,
62+
});
63+
}
5064
}
5165

5266
// 2. Update agent status to awaiting

0 commit comments

Comments
 (0)