Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions apps/server/src/diagnostics/ProcessResourceMonitor.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, expect, it } from "@effect/vitest";
import { assert, describe, it } from "@effect/vitest";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Option from "effect/Option";

Expand Down Expand Up @@ -60,10 +61,16 @@ describe("ProcessResourceMonitor", () => {
],
});

expect(samples.map((sample) => sample.pid)).toEqual([100, 101, 102]);
expect(samples.map((sample) => sample.depth)).toEqual([0, 1, 2]);
expect(samples[0]?.isServerRoot).toBe(true);
expect(samples[1]?.isServerRoot).toBe(false);
assert.deepStrictEqual(
samples.map((sample) => sample.pid),
[100, 101, 102],
);
assert.deepStrictEqual(
samples.map((sample) => sample.depth),
[0, 1, 2],
);
assert.equal(samples[0]?.isServerRoot, true);
assert.equal(samples[1]?.isServerRoot, false);
}),
);

Expand Down Expand Up @@ -112,18 +119,21 @@ describe("ProcessResourceMonitor", () => {
samples,
readAt: secondAt,
readAtMs: DateTime.toEpochMillis(secondAt),
windowMs: 60_000,
bucketMs: 10_000,
lastError: null,
windowMs: Duration.toMillis(Duration.minutes(1)),
bucketMs: Duration.toMillis(Duration.seconds(10)),
lastError: Option.none(),
});

expect(Option.isNone(result.error)).toBe(true);
expect(result.topProcesses).toHaveLength(1);
expect(result.topProcesses[0]?.avgCpuPercent).toBe(20);
expect(result.topProcesses[0]?.maxCpuPercent).toBe(30);
expect(result.topProcesses[0]?.cpuSecondsApprox).toBe(2);
expect(result.totalCpuSecondsApprox).toBe(2);
expect(result.buckets.some((bucket) => bucket.maxCpuPercent === 30)).toBe(true);
assert.equal(Option.isNone(result.error), true);
assert.equal(result.topProcesses.length, 1);
assert.equal(result.topProcesses[0]?.avgCpuPercent, 20);
assert.equal(result.topProcesses[0]?.maxCpuPercent, 30);
assert.equal(result.topProcesses[0]?.cpuSecondsApprox, 2);
assert.equal(result.totalCpuSecondsApprox, 2);
assert.equal(
result.buckets.some((bucket) => bucket.maxCpuPercent === 30),
true,
);
}),
);

Expand Down Expand Up @@ -172,15 +182,15 @@ describe("ProcessResourceMonitor", () => {
samples,
readAt: secondAt,
readAtMs: DateTime.toEpochMillis(secondAt),
windowMs: 60_000,
bucketMs: 10_000,
lastError: null,
windowMs: Duration.toMillis(Duration.minutes(1)),
bucketMs: Duration.toMillis(Duration.seconds(10)),
lastError: Option.none(),
});

expect(result.topProcesses).toHaveLength(1);
expect(result.topProcesses[0]?.isServerRoot).toBe(true);
expect(result.topProcesses[0]?.sampleCount).toBe(2);
expect(result.topProcesses[0]?.maxRssBytes).toBe(2_000);
assert.equal(result.topProcesses.length, 1);
assert.equal(result.topProcesses[0]?.isServerRoot, true);
assert.equal(result.topProcesses[0]?.sampleCount, 2);
assert.equal(result.topProcesses[0]?.maxRssBytes, 2_000);
}),
);

Expand Down Expand Up @@ -219,13 +229,35 @@ describe("ProcessResourceMonitor", () => {
samples,
readAt: sampledAt,
readAtMs: DateTime.toEpochMillis(sampledAt),
windowMs: 60_000,
bucketMs: 10_000,
lastError: null,
windowMs: Duration.toMillis(Duration.minutes(1)),
bucketMs: Duration.toMillis(Duration.seconds(10)),
lastError: Option.none(),
});

expect(result.topProcesses).toHaveLength(36);
expect(result.topProcesses.some((process) => process.command === "worker 34")).toBe(true);
assert.equal(result.topProcesses.length, 36);
assert.equal(
result.topProcesses.some((process) => process.command === "worker 34"),
true,
);
}),
);

// Checks that a `Some` value passed as `lastError` comes back on the result's
// `error` field as `Some({ message })` carrying the same string.
it.effect("maps the latest sampling error option into the response", () =>
Effect.sync(() => {
const readAt = DateTime.makeUnsafe("2026-05-05T10:00:00.000Z");
// No samples are supplied; only the error mapping is exercised here.
const result = aggregateProcessResourceHistory({
samples: [],
readAt,
readAtMs: DateTime.toEpochMillis(readAt),
windowMs: Duration.toMillis(Duration.minutes(1)),
bucketMs: Duration.toMillis(Duration.seconds(10)),
lastError: Option.some("ps failed"),
});

// Fail with an explicit message on `None` before `.value` is read below.
if (Option.isNone(result.error)) {
assert.fail("Expected response error");
}
assert.deepStrictEqual(result.error.value, { message: "ps failed" });
}),
);
});
65 changes: 38 additions & 27 deletions apps/server/src/diagnostics/ProcessResourceMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import type {
} from "@t3tools/contracts";
import * as Context from "effect/Context";
import * as DateTime from "effect/DateTime";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Ref from "effect/Ref";
import * as Schedule from "effect/Schedule";
import { ChildProcessSpawner } from "effect/unstable/process";

import {
Expand All @@ -19,8 +21,11 @@ import {
readProcessRows,
} from "./ProcessDiagnostics.ts";

const SAMPLE_INTERVAL_MS = 5_000;
const RETENTION_MS = 60 * 60_000;
const SAMPLE_INTERVAL = Duration.seconds(5);
const RETENTION = Duration.minutes(60);
const MIN_HISTORY_DURATION = Duration.seconds(1);
const SAMPLE_INTERVAL_MS = Duration.toMillis(SAMPLE_INTERVAL);
const RETENTION_MS = Duration.toMillis(RETENTION);
const MAX_RETAINED_SAMPLES = 20_000;

export interface ProcessResourceSample {
Expand All @@ -38,7 +43,7 @@ export interface ProcessResourceSample {

interface MonitorState {
readonly samples: ReadonlyArray<ProcessResourceSample>;
readonly lastError: string | null;
readonly lastError: Option.Option<string>;
}

export interface ProcessResourceMonitorShape {
Expand All @@ -60,8 +65,11 @@ function sampleKey(row: Pick<ProcessRow, "pid" | "command">): string {
return `${row.pid}:${row.command}`;
}

function findServerRootRow(rows: ReadonlyArray<ProcessRow>, serverPid: number): ProcessRow | null {
return rows.find((row) => row.pid === serverPid) ?? null;
/**
 * Looks up the row in `rows` whose `pid` equals `serverPid`.
 *
 * @param rows - Process rows to search.
 * @param serverPid - The pid to match against each row's `pid`.
 * @returns The result of `rows.find(...)` passed through
 * `Option.fromUndefinedOr`, so callers receive an `Option<ProcessRow>`
 * instead of a possibly-undefined row.
 */
function findServerRootRow(
rows: ReadonlyArray<ProcessRow>,
serverPid: number,
): Option.Option<ProcessRow> {
return Option.fromUndefinedOr(rows.find((row) => row.pid === serverPid));
}

export function collectMonitoredSamples(input: {
Expand All @@ -75,16 +83,16 @@ export function collectMonitoredSamples(input: {
const descendants = buildDescendantEntries(rows, input.serverPid);
const samples: ProcessResourceSample[] = [];

if (root) {
if (Option.isSome(root)) {
samples.push({
sampledAt: input.sampledAt,
sampledAtMs: input.sampledAtMs,
processKey: sampleKey(root),
pid: root.pid,
ppid: root.ppid,
command: root.command,
cpuPercent: root.cpuPercent,
rssBytes: root.rssBytes,
processKey: sampleKey(root.value),
pid: root.value.pid,
ppid: root.value.ppid,
command: root.value.command,
cpuPercent: root.value.cpuPercent,
rssBytes: root.value.rssBytes,
depth: 0,
isServerRoot: true,
});
Expand Down Expand Up @@ -166,11 +174,12 @@ function summarizeProcesses(
function buildBuckets(input: {
readonly samples: ReadonlyArray<ProcessResourceSample>;
readonly nowMs: number;
readonly windowMs: number;
readonly bucketMs: number;
readonly window: Duration.Duration;
readonly bucket: Duration.Duration;
}): ReadonlyArray<ServerProcessResourceHistoryBucket> {
const bucketMs = Math.max(1_000, input.bucketMs);
const windowStartMs = input.nowMs - input.windowMs;
const bucketMs = Duration.toMillis(Duration.max(MIN_HISTORY_DURATION, input.bucket));
const windowMs = Duration.toMillis(input.window);
const windowStartMs = input.nowMs - windowMs;
const buckets: ServerProcessResourceHistoryBucket[] = [];

for (let startedAtMs = windowStartMs; startedAtMs < input.nowMs; startedAtMs += bucketMs) {
Expand Down Expand Up @@ -220,10 +229,12 @@ export function aggregateProcessResourceHistory(input: {
readonly readAtMs: number;
readonly windowMs: number;
readonly bucketMs: number;
readonly lastError: string | null;
readonly lastError: Option.Option<string>;
}): ServerProcessResourceHistoryResult {
const windowMs = Math.max(1_000, input.windowMs);
const bucketMs = Math.max(1_000, input.bucketMs);
const window = Duration.max(MIN_HISTORY_DURATION, Duration.millis(input.windowMs));
const bucket = Duration.max(MIN_HISTORY_DURATION, Duration.millis(input.bucketMs));
const windowMs = Duration.toMillis(window);
const bucketMs = Duration.toMillis(bucket);
const minSampledAtMs = input.readAtMs - windowMs;
const samples = input.samples.filter((sample) => sample.sampledAtMs >= minSampledAtMs);
const topProcesses = summarizeProcesses(samples);
Expand All @@ -239,15 +250,15 @@ export function aggregateProcessResourceHistory(input: {
sampleIntervalMs: SAMPLE_INTERVAL_MS,
retainedSampleCount: input.samples.length,
totalCpuSecondsApprox,
buckets: buildBuckets({ samples, nowMs: input.readAtMs, windowMs, bucketMs }),
buckets: buildBuckets({ samples, nowMs: input.readAtMs, window, bucket }),
topProcesses,
error: input.lastError ? Option.some({ message: input.lastError }) : Option.none(),
error: Option.map(input.lastError, (message) => ({ message })),
};
}

export const make = Effect.fn("makeProcessResourceMonitor")(function* () {
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner;
const state = yield* Ref.make<MonitorState>({ samples: [], lastError: null });
const state = yield* Ref.make<MonitorState>({ samples: [], lastError: Option.none() });

const sampleOnce = Effect.gen(function* () {
const sampledAt = yield* DateTime.now;
Expand All @@ -263,20 +274,20 @@ export const make = Effect.fn("makeProcessResourceMonitor")(function* () {
});
yield* Ref.update(state, (current) => ({
samples: trimSamples([...current.samples, ...samples], sampledAtMs),
lastError: null,
lastError: Option.none(),
}));
}).pipe(
Effect.catch((error: unknown) =>
Ref.update(state, (current) => ({
...current,
lastError: error instanceof Error ? error.message : "Failed to sample process resources.",
lastError: Option.some(
error instanceof Error ? error.message : "Failed to sample process resources.",
),
})),
),
);

yield* Effect.forever(sampleOnce.pipe(Effect.andThen(Effect.sleep(SAMPLE_INTERVAL_MS)))).pipe(
Effect.forkScoped,
);
yield* sampleOnce.pipe(Effect.repeat(Schedule.spaced(SAMPLE_INTERVAL)), Effect.forkScoped);

const readHistory: ProcessResourceMonitorShape["readHistory"] = (input) =>
Effect.gen(function* () {
Expand Down
30 changes: 25 additions & 5 deletions packages/shared/src/KeyedCoalescingWorker.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { it } from "@effect/vitest";
import { describe, expect } from "vitest";
import { assert, describe, it } from "@effect/vitest";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";

Expand Down Expand Up @@ -47,12 +46,12 @@ describe("makeKeyedCoalescingWorker", () => {
yield* Deferred.succeed(releaseFirst, undefined);
yield* Deferred.await(secondStarted);

expect(yield* Deferred.isDone(drained)).toBe(false);
assert.equal(yield* Deferred.isDone(drained), false);

yield* Deferred.succeed(releaseSecond, undefined);
yield* Deferred.await(drained);

expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]);
assert.deepStrictEqual(processed, ["terminal-1:first", "terminal-1:second"]);
}),
),
);
Expand Down Expand Up @@ -90,7 +89,28 @@ describe("makeKeyedCoalescingWorker", () => {
yield* Deferred.await(secondProcessed);
yield* worker.drainKey("terminal-1");

expect(processed).toEqual(["terminal-1:first", "terminal-1:second"]);
assert.deepStrictEqual(processed, ["terminal-1:first", "terminal-1:second"]);
}),
),
);

// Enqueues a literal `undefined` value for a key and checks that draining the
// key still runs `process` exactly once with that value.
it.live("treats an undefined value as present queued work", () =>
Effect.scoped(
Effect.gen(function* () {
// Records every value handed to `process`, in call order.
const processed: Array<string | undefined> = [];

const worker = yield* makeKeyedCoalescingWorker<string, string | undefined, never, never>({
// The merge function discards the current value and keeps the incoming one.
merge: (_current, next) => next,
process: (_key, value) =>
Effect.sync(() => {
processed.push(value);
}),
});

yield* worker.enqueue("terminal-1", undefined);
yield* worker.drainKey("terminal-1");

// A single `undefined` entry shows the enqueued value was processed rather than skipped.
assert.deepStrictEqual(processed, [undefined]);
}),
),
);
Expand Down
Loading
Loading