Skip to content

Commit df58164

Browse files
committed
feat: add FilesystemSink for telemetry audit mode
1 parent 13b34a3 commit df58164

3 files changed

Lines changed: 226 additions & 0 deletions

File tree

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import { createTempConfig } from '../../__tests__/helpers/temp-config';
2+
import { resolveResourceAttributes } from '../config';
3+
import type { ResourceAttributes } from '../schemas/common-attributes';
4+
import { FilesystemSink } from '../sinks/filesystem-sink';
5+
import { readFile } from 'fs/promises';
6+
import { join } from 'node:path';
7+
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest';
8+
9+
const tmp = createTempConfig('fs-sink');
10+
const outputDir = join(tmp.configDir, 'telemetry');
11+
12+
let resource: ResourceAttributes;
13+
const ORIGINAL_CONFIG_DIR = process.env.AGENTCORE_CONFIG_DIR;
14+
15+
beforeAll(async () => {
16+
process.env.AGENTCORE_CONFIG_DIR = tmp.configDir;
17+
await tmp.setup();
18+
resource = await resolveResourceAttributes('cli');
19+
});
20+
21+
afterAll(async () => {
22+
if (ORIGINAL_CONFIG_DIR === undefined) {
23+
delete process.env.AGENTCORE_CONFIG_DIR;
24+
} else {
25+
process.env.AGENTCORE_CONFIG_DIR = ORIGINAL_CONFIG_DIR;
26+
}
27+
await tmp.cleanup();
28+
});
29+
30+
function createSink(opts: { dir?: string; log?: (msg: string) => void } = {}) {
31+
return new FilesystemSink({ outputDir: opts.dir ?? outputDir, resource, log: opts.log });
32+
}
33+
34+
function expectFilePath(sink: FilesystemSink): string {
35+
const p = sink.filePath;
36+
if (p === undefined) throw new Error('expected filePath to be defined');
37+
return p;
38+
}
39+
40+
describe('FilesystemSink', () => {
41+
beforeEach(() => tmp.setup());
42+
43+
it('derives file name from first record command_group and session id', () => {
44+
const sink = createSink();
45+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
46+
expect(sink.filePath).toBe(join(outputDir, `deploy-${resource['agentcore-cli.session_id']}.json`));
47+
});
48+
49+
it('uses "unknown" as entrypoint when command_group is absent', () => {
50+
const sink = createSink();
51+
sink.record(100, { command: 'orphan' });
52+
expect(sink.filePath).toBe(join(outputDir, `unknown-${resource['agentcore-cli.session_id']}.json`));
53+
});
54+
55+
it('entrypoint stays fixed after first record', () => {
56+
const sink = createSink();
57+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
58+
sink.record(200, { command_group: 'status', command: 'status' });
59+
expect(sink.filePath).toContain('deploy-');
60+
});
61+
62+
it('flush writes buffered entries as a JSON array', async () => {
63+
const sink = createSink();
64+
sink.record(42, { command_group: 'deploy', command: 'deploy', exit_reason: 'success' });
65+
66+
await sink.flush();
67+
68+
const path = expectFilePath(sink);
69+
const written = JSON.parse(await readFile(path, 'utf-8'));
70+
expect(written).toHaveLength(1);
71+
expect(written[0]).toMatchObject({
72+
metric_name: 'cli.command_run',
73+
resource,
74+
attributes: {
75+
command_group: 'deploy',
76+
command: 'deploy',
77+
exit_reason: 'success',
78+
duration_ms: 42,
79+
},
80+
});
81+
});
82+
83+
it('accumulates multiple records into a single file', async () => {
84+
const sink = createSink();
85+
sink.record(10, { command_group: 'add', command: 'add.agent' });
86+
sink.record(20, { command_group: 'add', command: 'add.memory' });
87+
88+
await sink.flush();
89+
90+
const path = expectFilePath(sink);
91+
const written = JSON.parse(await readFile(path, 'utf-8'));
92+
expect(written).toHaveLength(2);
93+
expect(written[0].attributes.duration_ms).toBe(10);
94+
expect(written[1].attributes.duration_ms).toBe(20);
95+
});
96+
97+
it('creates output directory if it does not exist', async () => {
98+
const nested = join(tmp.testDir, 'deep', 'nested', 'telemetry');
99+
const sink = createSink({ dir: nested });
100+
sink.record(1, { command_group: 'status', command: 'status' });
101+
102+
await sink.flush();
103+
104+
const path = expectFilePath(sink);
105+
const written = JSON.parse(await readFile(path, 'utf-8'));
106+
expect(written).toHaveLength(1);
107+
});
108+
109+
it('flush is a no-op when no records exist', async () => {
110+
const sink = createSink();
111+
await expect(sink.flush()).resolves.toBeUndefined();
112+
});
113+
114+
it('shutdown writes entries and logs audit message', async () => {
115+
const logged: string[] = [];
116+
const sink = createSink({ log: msg => logged.push(msg) });
117+
sink.record(99, { command_group: 'invoke', command: 'invoke' });
118+
119+
await sink.shutdown();
120+
121+
const path = expectFilePath(sink);
122+
const written = JSON.parse(await readFile(path, 'utf-8'));
123+
expect(written).toHaveLength(1);
124+
expect(logged).toHaveLength(1);
125+
expect(logged[0]).toContain('[audit mode]');
126+
expect(logged[0]).toContain(path);
127+
});
128+
129+
it('shutdown does not log when no records were written', async () => {
130+
const logged: string[] = [];
131+
const sink = createSink({ log: msg => logged.push(msg) });
132+
133+
await sink.shutdown();
134+
135+
expect(logged).toHaveLength(0);
136+
});
137+
138+
it('shutdown after flush with no new records still logs', async () => {
139+
const logged: string[] = [];
140+
const sink = createSink({ log: msg => logged.push(msg) });
141+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
142+
await sink.flush();
143+
144+
await sink.shutdown();
145+
146+
expect(logged).toHaveLength(1);
147+
expect(logged[0]).toContain('[audit mode]');
148+
});
149+
150+
it('subsequent flushes include all records', async () => {
151+
const sink = createSink();
152+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
153+
await sink.flush();
154+
155+
sink.record(2, { command_group: 'deploy', command: 'deploy' });
156+
await sink.flush();
157+
158+
const path = expectFilePath(sink);
159+
const written = JSON.parse(await readFile(path, 'utf-8'));
160+
expect(written).toHaveLength(2);
161+
});
162+
});

src/cli/telemetry/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export type { TelemetryPreference } from './config.js';
33
export { TelemetryClient, CANCELLED } from './client.js';
44
export { type MetricSink, CompositeSink } from './sinks/metric-sink.js';
55
export { OtelMetricSink, type OtelMetricSinkConfig } from './sinks/otel-metric-sink.js';
6+
export { FilesystemSink, type FilesystemSinkConfig } from './sinks/filesystem-sink.js';
67
export { classifyError, isUserError } from './error-classification.js';
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import type { ResourceAttributes } from '../schemas/common-attributes.js';
2+
import type { MetricSink } from './metric-sink.js';
3+
import { mkdir, writeFile } from 'fs/promises';
4+
import { join } from 'path';
5+
6+
export interface FilesystemSinkConfig {
7+
outputDir: string;
8+
resource: ResourceAttributes;
9+
log?: (message: string) => void;
10+
}
11+
12+
interface MetricEntry {
13+
metric_name: 'cli.command_run';
14+
resource: ResourceAttributes;
15+
attributes: Record<string, string | number>;
16+
}
17+
18+
export class FilesystemSink implements MetricSink {
19+
private readonly entries: MetricEntry[] = [];
20+
private readonly outputDir: string;
21+
private readonly resource: ResourceAttributes;
22+
private readonly sessionId: string;
23+
private readonly log: (message: string) => void;
24+
private entrypoint: string | undefined;
25+
26+
constructor(config: FilesystemSinkConfig) {
27+
this.outputDir = config.outputDir;
28+
this.resource = config.resource;
29+
this.sessionId = config.resource['agentcore-cli.session_id'];
30+
this.log = config.log ?? (msg => console.error(msg));
31+
}
32+
33+
get filePath(): string | undefined {
34+
if (this.entrypoint === undefined) return undefined;
35+
return join(this.outputDir, `${this.entrypoint}-${this.sessionId}.json`);
36+
}
37+
38+
record(value: number, attrs: Record<string, string | number>): void {
39+
this.entrypoint ??= String(attrs.command_group ?? 'unknown');
40+
this.entries.push({
41+
metric_name: 'cli.command_run',
42+
resource: this.resource,
43+
attributes: { ...attrs, duration_ms: value },
44+
});
45+
}
46+
47+
async flush(): Promise<void> {
48+
await this.writeEntries();
49+
}
50+
51+
async shutdown(): Promise<void> {
52+
await this.writeEntries();
53+
if (this.filePath !== undefined) {
54+
this.log(`[audit mode] Telemetry written to ${this.filePath}`);
55+
}
56+
}
57+
58+
private async writeEntries(): Promise<void> {
59+
if (this.entries.length === 0 || this.filePath === undefined) return;
60+
await mkdir(this.outputDir, { recursive: true });
61+
await writeFile(this.filePath, JSON.stringify(this.entries, null, 2));
62+
}
63+
}

0 commit comments

Comments
 (0)