Skip to content

Commit a450075

Browse files
committed
feat: add FilesystemSink for telemetry audit mode
1 parent 13b34a3 commit a450075

3 files changed

Lines changed: 225 additions & 0 deletions

File tree

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import type { ResourceAttributes } from '../schemas/common-attributes';
2+
import { FilesystemSink } from '../sinks/filesystem-sink';
3+
import { mkdir, readFile, rm } from 'fs/promises';
4+
import { randomUUID } from 'node:crypto';
5+
import { tmpdir } from 'node:os';
6+
import { join } from 'node:path';
7+
import { afterAll, beforeEach, describe, expect, it } from 'vitest';
8+
9+
const sessionId = randomUUID();
10+
const testDir = join(tmpdir(), `agentcore-fs-sink-${randomUUID()}`);
11+
const outputDir = join(testDir, 'telemetry');
12+
13+
const resource: ResourceAttributes = {
14+
'service.name': 'agentcore-cli',
15+
'service.version': '1.0.0',
16+
'agentcore-cli.installation_id': randomUUID(),
17+
'agentcore-cli.session_id': sessionId,
18+
'agentcore-cli.mode': 'cli',
19+
'os.type': 'Linux',
20+
'os.version': '6.1.0',
21+
'host.arch': 'x64',
22+
'node.version': process.version,
23+
};
24+
25+
function createSink(opts: { dir?: string; log?: (msg: string) => void } = {}) {
26+
return new FilesystemSink({ outputDir: opts.dir ?? outputDir, resource, log: opts.log });
27+
}
28+
29+
function expectFilePath(sink: FilesystemSink): string {
30+
const p = sink.filePath;
31+
if (p === undefined) throw new Error('expected filePath to be defined');
32+
return p;
33+
}
34+
35+
describe('FilesystemSink', () => {
36+
beforeEach(async () => {
37+
await rm(testDir, { recursive: true, force: true });
38+
await mkdir(testDir, { recursive: true });
39+
});
40+
afterAll(() => rm(testDir, { recursive: true, force: true }));
41+
42+
it('filePath is undefined before any records', () => {
43+
const sink = createSink();
44+
expect(sink.filePath).toBeUndefined();
45+
});
46+
47+
it('derives file name from first record command_group', () => {
48+
const sink = createSink();
49+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
50+
expect(sink.filePath).toBe(join(outputDir, `deploy-${sessionId}.json`));
51+
});
52+
53+
it('uses "unknown" as entrypoint when command_group is absent', () => {
54+
const sink = createSink();
55+
sink.record(100, { command: 'orphan' });
56+
expect(sink.filePath).toBe(join(outputDir, `unknown-${sessionId}.json`));
57+
});
58+
59+
it('entrypoint stays fixed after first record', () => {
60+
const sink = createSink();
61+
sink.record(100, { command_group: 'deploy', command: 'deploy' });
62+
sink.record(200, { command_group: 'status', command: 'status' });
63+
expect(sink.filePath).toBe(join(outputDir, `deploy-${sessionId}.json`));
64+
});
65+
66+
it('flush writes buffered entries as a JSON array', async () => {
67+
const sink = createSink();
68+
sink.record(42, { command_group: 'deploy', command: 'deploy', exit_reason: 'success' });
69+
70+
await sink.flush();
71+
72+
const path = expectFilePath(sink);
73+
const written = JSON.parse(await readFile(path, 'utf-8'));
74+
expect(written).toHaveLength(1);
75+
expect(written[0]).toMatchObject({
76+
metric_name: 'cli.command_run',
77+
resource,
78+
attributes: { command_group: 'deploy', command: 'deploy', exit_reason: 'success', duration_ms: 42 },
79+
});
80+
});
81+
82+
it('accumulates multiple records into a single file', async () => {
83+
const sink = createSink();
84+
sink.record(10, { command_group: 'add', command: 'add.agent' });
85+
sink.record(20, { command_group: 'add', command: 'add.memory' });
86+
87+
await sink.flush();
88+
89+
const path = expectFilePath(sink);
90+
const written = JSON.parse(await readFile(path, 'utf-8'));
91+
expect(written).toHaveLength(2);
92+
expect(written[0].attributes.duration_ms).toBe(10);
93+
expect(written[1].attributes.duration_ms).toBe(20);
94+
});
95+
96+
it('creates output directory if it does not exist', async () => {
97+
const nested = join(testDir, 'deep', 'nested', 'telemetry');
98+
const sink = createSink({ dir: nested });
99+
sink.record(1, { command_group: 'status', command: 'status' });
100+
101+
await sink.flush();
102+
103+
const path = expectFilePath(sink);
104+
const written = JSON.parse(await readFile(path, 'utf-8'));
105+
expect(written).toHaveLength(1);
106+
});
107+
108+
it('flush is a no-op when no records exist', async () => {
109+
const sink = createSink();
110+
await expect(sink.flush()).resolves.toBeUndefined();
111+
});
112+
113+
it('shutdown writes entries and logs audit message', async () => {
114+
const logged: string[] = [];
115+
const sink = createSink({ log: msg => logged.push(msg) });
116+
sink.record(99, { command_group: 'invoke', command: 'invoke' });
117+
118+
await sink.shutdown();
119+
120+
const path = expectFilePath(sink);
121+
const written = JSON.parse(await readFile(path, 'utf-8'));
122+
expect(written).toHaveLength(1);
123+
expect(logged).toHaveLength(1);
124+
expect(logged[0]).toContain('[audit mode]');
125+
expect(logged[0]).toContain(path);
126+
});
127+
128+
it('shutdown does not log when no records were written', async () => {
129+
const logged: string[] = [];
130+
const sink = createSink({ log: msg => logged.push(msg) });
131+
132+
await sink.shutdown();
133+
134+
expect(logged).toHaveLength(0);
135+
});
136+
137+
it('shutdown after flush with no new records still logs', async () => {
138+
const logged: string[] = [];
139+
const sink = createSink({ log: msg => logged.push(msg) });
140+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
141+
await sink.flush();
142+
143+
await sink.shutdown();
144+
145+
expect(logged).toHaveLength(1);
146+
expect(logged[0]).toContain('[audit mode]');
147+
});
148+
149+
it('subsequent flushes include all records', async () => {
150+
const sink = createSink();
151+
sink.record(1, { command_group: 'deploy', command: 'deploy' });
152+
await sink.flush();
153+
154+
sink.record(2, { command_group: 'deploy', command: 'deploy' });
155+
await sink.flush();
156+
157+
const path = expectFilePath(sink);
158+
const written = JSON.parse(await readFile(path, 'utf-8'));
159+
expect(written).toHaveLength(2);
160+
});
161+
});

src/cli/telemetry/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export type { TelemetryPreference } from './config.js';
33
export { TelemetryClient, CANCELLED } from './client.js';
44
export { type MetricSink, CompositeSink } from './sinks/metric-sink.js';
55
export { OtelMetricSink, type OtelMetricSinkConfig } from './sinks/otel-metric-sink.js';
6+
export { FilesystemSink, type FilesystemSinkConfig } from './sinks/filesystem-sink.js';
67
export { classifyError, isUserError } from './error-classification.js';
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import type { ResourceAttributes } from '../schemas/common-attributes.js';
2+
import type { MetricSink } from './metric-sink.js';
3+
import { mkdir, writeFile } from 'fs/promises';
4+
import { join } from 'path';
5+
6+
export interface FilesystemSinkConfig {
7+
outputDir: string;
8+
resource: ResourceAttributes;
9+
log?: (message: string) => void;
10+
}
11+
12+
interface MetricEntry {
13+
metric_name: 'cli.command_run';
14+
resource: ResourceAttributes;
15+
attributes: Record<string, string | number>;
16+
}
17+
18+
export class FilesystemSink implements MetricSink {
19+
private readonly entries: MetricEntry[] = [];
20+
private readonly outputDir: string;
21+
private readonly resource: ResourceAttributes;
22+
private readonly sessionId: string;
23+
private readonly log: (message: string) => void;
24+
private entrypoint: string | undefined;
25+
26+
constructor(config: FilesystemSinkConfig) {
27+
this.outputDir = config.outputDir;
28+
this.resource = config.resource;
29+
this.sessionId = config.resource['agentcore-cli.session_id'];
30+
this.log = config.log ?? (msg => console.error(msg));
31+
}
32+
33+
get filePath(): string | undefined {
34+
if (this.entrypoint === undefined) return undefined;
35+
return join(this.outputDir, `${this.entrypoint}-${this.sessionId}.json`);
36+
}
37+
38+
record(value: number, attrs: Record<string, string | number>): void {
39+
this.entrypoint ??= String(attrs.command_group ?? 'unknown');
40+
this.entries.push({
41+
metric_name: 'cli.command_run',
42+
resource: this.resource,
43+
attributes: { ...attrs, duration_ms: value },
44+
});
45+
}
46+
47+
async flush(): Promise<void> {
48+
await this.writeEntries();
49+
}
50+
51+
async shutdown(): Promise<void> {
52+
await this.writeEntries();
53+
if (this.filePath !== undefined) {
54+
this.log(`[audit mode] Telemetry written to ${this.filePath}`);
55+
}
56+
}
57+
58+
private async writeEntries(): Promise<void> {
59+
if (this.entries.length === 0 || this.filePath === undefined) return;
60+
await mkdir(this.outputDir, { recursive: true });
61+
await writeFile(this.filePath, JSON.stringify(this.entries, null, 2));
62+
}
63+
}

0 commit comments

Comments
 (0)