Skip to content

Commit 9e632f5

Browse files
juliusmarmingeJulius Marminge
andauthored
[codex] add diagnostics resource history (pingdotgg#2685)
Co-authored-by: Julius Marminge <julius@macmini.local>
1 parent 4120e94 commit 9e632f5

15 files changed

Lines changed: 1075 additions & 13 deletions

File tree

apps/server/src/diagnostics/ProcessDiagnostics.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
1515

1616
import { collectUint8StreamText } from "../stream/collectUint8StreamText.ts";
1717

18-
interface ProcessRow {
18+
export interface ProcessRow {
1919
readonly pid: number;
2020
readonly ppid: number;
2121
readonly pgid: number | null;
@@ -186,7 +186,7 @@ function parseWindowsProcessRows(output: string): ReadonlyArray<ProcessRow> {
186186
}
187187
}
188188

189-
function buildDescendantEntries(
189+
export function buildDescendantEntries(
190190
rows: ReadonlyArray<ProcessRow>,
191191
serverPid: number,
192192
): ReadonlyArray<ServerProcessDiagnosticsEntry> {
@@ -230,7 +230,7 @@ function buildDescendantEntries(
230230
return entries;
231231
}
232232

233-
function isDiagnosticsQueryProcess(row: ProcessRow, serverPid: number): boolean {
233+
export function isDiagnosticsQueryProcess(row: ProcessRow, serverPid: number): boolean {
234234
if (row.ppid !== serverPid) return false;
235235

236236
const command = row.command.trim();
@@ -370,7 +370,7 @@ function readWindowsProcessRows(): Effect.Effect<
370370
);
371371
}
372372

373-
const readProcessRows = (platform = process.platform) =>
373+
export const readProcessRows = (platform = process.platform) =>
374374
platform === "win32" ? readWindowsProcessRows() : readPosixProcessRows();
375375

376376
export function aggregateProcessDiagnostics(input: {
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
import { describe, expect, it } from "@effect/vitest";
2+
import * as DateTime from "effect/DateTime";
3+
import * as Effect from "effect/Effect";
4+
import * as Option from "effect/Option";
5+
6+
import {
7+
aggregateProcessResourceHistory,
8+
collectMonitoredSamples,
9+
} from "./ProcessResourceMonitor.ts";
10+
11+
describe("ProcessResourceMonitor", () => {
12+
it.effect("samples the server root process and descendants", () =>
13+
Effect.sync(() => {
14+
const sampledAt = DateTime.makeUnsafe("2026-05-05T10:00:00.000Z");
15+
const samples = collectMonitoredSamples({
16+
serverPid: 100,
17+
sampledAt,
18+
sampledAtMs: DateTime.toEpochMillis(sampledAt),
19+
rows: [
20+
{
21+
pid: 100,
22+
ppid: 1,
23+
pgid: 100,
24+
status: "S",
25+
cpuPercent: 2,
26+
rssBytes: 1_000,
27+
elapsed: "01:00",
28+
command: "t3 server",
29+
},
30+
{
31+
pid: 101,
32+
ppid: 100,
33+
pgid: 100,
34+
status: "S",
35+
cpuPercent: 10,
36+
rssBytes: 2_000,
37+
elapsed: "00:20",
38+
command: "codex app-server",
39+
},
40+
{
41+
pid: 102,
42+
ppid: 101,
43+
pgid: 100,
44+
status: "R",
45+
cpuPercent: 50,
46+
rssBytes: 3_000,
47+
elapsed: "00:05",
48+
command: "rg needle",
49+
},
50+
{
51+
pid: 200,
52+
ppid: 1,
53+
pgid: 200,
54+
status: "R",
55+
cpuPercent: 99,
56+
rssBytes: 9_000,
57+
elapsed: "00:05",
58+
command: "unrelated",
59+
},
60+
],
61+
});
62+
63+
expect(samples.map((sample) => sample.pid)).toEqual([100, 101, 102]);
64+
expect(samples.map((sample) => sample.depth)).toEqual([0, 1, 2]);
65+
expect(samples[0]?.isServerRoot).toBe(true);
66+
expect(samples[1]?.isServerRoot).toBe(false);
67+
}),
68+
);
69+
70+
it.effect("rolls samples up by process and CPU time", () =>
71+
Effect.sync(() => {
72+
const firstAt = DateTime.makeUnsafe("2026-05-05T10:00:00.000Z");
73+
const secondAt = DateTime.makeUnsafe("2026-05-05T10:00:05.000Z");
74+
const samples = [
75+
...collectMonitoredSamples({
76+
serverPid: 100,
77+
sampledAt: firstAt,
78+
sampledAtMs: DateTime.toEpochMillis(firstAt),
79+
rows: [
80+
{
81+
pid: 100,
82+
ppid: 1,
83+
pgid: 100,
84+
status: "S",
85+
cpuPercent: 10,
86+
rssBytes: 1_000,
87+
elapsed: "01:00",
88+
command: "t3 server",
89+
},
90+
],
91+
}),
92+
...collectMonitoredSamples({
93+
serverPid: 100,
94+
sampledAt: secondAt,
95+
sampledAtMs: DateTime.toEpochMillis(secondAt),
96+
rows: [
97+
{
98+
pid: 100,
99+
ppid: 1,
100+
pgid: 100,
101+
status: "S",
102+
cpuPercent: 30,
103+
rssBytes: 2_000,
104+
elapsed: "01:05",
105+
command: "t3 server",
106+
},
107+
],
108+
}),
109+
];
110+
111+
const result = aggregateProcessResourceHistory({
112+
samples,
113+
readAt: secondAt,
114+
readAtMs: DateTime.toEpochMillis(secondAt),
115+
windowMs: 60_000,
116+
bucketMs: 10_000,
117+
lastError: null,
118+
});
119+
120+
expect(Option.isNone(result.error)).toBe(true);
121+
expect(result.topProcesses).toHaveLength(1);
122+
expect(result.topProcesses[0]?.avgCpuPercent).toBe(20);
123+
expect(result.topProcesses[0]?.maxCpuPercent).toBe(30);
124+
expect(result.topProcesses[0]?.cpuSecondsApprox).toBe(2);
125+
expect(result.totalCpuSecondsApprox).toBe(2);
126+
expect(result.buckets.some((bucket) => bucket.maxCpuPercent === 30)).toBe(true);
127+
}),
128+
);
129+
130+
it.effect("keeps a process grouped when elapsed time drifts between samples", () =>
131+
Effect.sync(() => {
132+
const firstAt = DateTime.makeUnsafe("2026-05-05T10:00:00.400Z");
133+
const secondAt = DateTime.makeUnsafe("2026-05-05T10:00:05.900Z");
134+
const samples = [
135+
...collectMonitoredSamples({
136+
serverPid: 100,
137+
sampledAt: firstAt,
138+
sampledAtMs: DateTime.toEpochMillis(firstAt),
139+
rows: [
140+
{
141+
pid: 100,
142+
ppid: 1,
143+
pgid: 100,
144+
status: "S",
145+
cpuPercent: 1,
146+
rssBytes: 1_000,
147+
elapsed: "01:00",
148+
command: "t3 server",
149+
},
150+
],
151+
}),
152+
...collectMonitoredSamples({
153+
serverPid: 100,
154+
sampledAt: secondAt,
155+
sampledAtMs: DateTime.toEpochMillis(secondAt),
156+
rows: [
157+
{
158+
pid: 100,
159+
ppid: 1,
160+
pgid: 100,
161+
status: "S",
162+
cpuPercent: 2,
163+
rssBytes: 2_000,
164+
elapsed: "01:06",
165+
command: "t3 server",
166+
},
167+
],
168+
}),
169+
];
170+
171+
const result = aggregateProcessResourceHistory({
172+
samples,
173+
readAt: secondAt,
174+
readAtMs: DateTime.toEpochMillis(secondAt),
175+
windowMs: 60_000,
176+
bucketMs: 10_000,
177+
lastError: null,
178+
});
179+
180+
expect(result.topProcesses).toHaveLength(1);
181+
expect(result.topProcesses[0]?.isServerRoot).toBe(true);
182+
expect(result.topProcesses[0]?.sampleCount).toBe(2);
183+
expect(result.topProcesses[0]?.maxRssBytes).toBe(2_000);
184+
}),
185+
);
186+
187+
it.effect("returns all process summaries in the selected window", () =>
188+
Effect.sync(() => {
189+
const sampledAt = DateTime.makeUnsafe("2026-05-05T10:00:00.000Z");
190+
const samples = collectMonitoredSamples({
191+
serverPid: 100,
192+
sampledAt,
193+
sampledAtMs: DateTime.toEpochMillis(sampledAt),
194+
rows: [
195+
{
196+
pid: 100,
197+
ppid: 1,
198+
pgid: 100,
199+
status: "S",
200+
cpuPercent: 1,
201+
rssBytes: 1_000,
202+
elapsed: "01:00",
203+
command: "t3 server",
204+
},
205+
...Array.from({ length: 35 }, (_, index) => ({
206+
pid: 200 + index,
207+
ppid: index === 0 ? 100 : 199 + index,
208+
pgid: 100,
209+
status: "S",
210+
cpuPercent: 35 - index,
211+
rssBytes: 2_000 + index,
212+
elapsed: "00:10",
213+
command: `worker ${index}`,
214+
})),
215+
],
216+
});
217+
218+
const result = aggregateProcessResourceHistory({
219+
samples,
220+
readAt: sampledAt,
221+
readAtMs: DateTime.toEpochMillis(sampledAt),
222+
windowMs: 60_000,
223+
bucketMs: 10_000,
224+
lastError: null,
225+
});
226+
227+
expect(result.topProcesses).toHaveLength(36);
228+
expect(result.topProcesses.some((process) => process.command === "worker 34")).toBe(true);
229+
}),
230+
);
231+
});

0 commit comments

Comments
 (0)