Skip to content

Commit cab9fae

Browse files
author
deepshekhardas
committed
feat: implement 4 medium-level improvements
- Feat: Expose more trigger options in MCP trigger-task tool - Refactor: Remove deprecated 'id' field from SCHEDULE_ATTEMPT message - Test: Add ResourceMonitor unit tests for memory scaling - Fix: Handle processKeepAlive in runTimelineMetrics to prevent stale fork metrics
1 parent 4b1f911 commit cab9fae

6 files changed

Lines changed: 292 additions & 6 deletions

File tree

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -928,7 +928,6 @@ export class SharedQueueConsumer {
928928
machine,
929929
nextAttemptNumber,
930930
// identifiers
931-
id: "placeholder", // TODO: Remove this completely in a future release
932931
envId: lockedTaskRun.runtimeEnvironment.id,
933932
envType: lockedTaskRun.runtimeEnvironment.type,
934933
orgId: lockedTaskRun.runtimeEnvironment.organizationId,

packages/cli-v3/src/dev/mcpServer.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,53 @@ server.tool(
5757
}
5858
})
5959
.describe("The payload to pass to the task run, must be a valid JSON"),
60-
// TODO: expose more parameteres from the trigger options
60+
delay: z
61+
.string()
62+
.optional()
63+
.describe("Delay before the task run starts, e.g. '1m', '30s', '2h', or an ISO 8601 date"),
64+
ttl: z
65+
.union([z.string(), z.number()])
66+
.optional()
67+
.describe(
68+
"Time-to-live: how long the run remains valid before it starts, e.g. '1h' or seconds as a number"
69+
),
70+
tags: z
71+
.array(z.string())
72+
.optional()
73+
.describe("Tags to attach to the task run for filtering and organization"),
74+
queue: z.string().optional().describe("The queue name to use for this task run"),
75+
maxAttempts: z
76+
.number()
77+
.int()
78+
.optional()
79+
.describe("Maximum number of retry attempts for this task run"),
80+
idempotencyKey: z
81+
.string()
82+
.optional()
83+
.describe("Idempotency key for deduplication of task runs"),
84+
concurrencyKey: z
85+
.string()
86+
.optional()
87+
.describe("Concurrency key for controlling concurrent execution"),
88+
priority: z.number().optional().describe("Priority of the task run (higher = more priority)"),
89+
test: z.boolean().optional().describe("Whether this is a test run"),
6190
},
62-
async ({ id, payload }) => {
91+
async ({ id, payload, delay, ttl, tags, queue, maxAttempts, idempotencyKey, concurrencyKey, priority, test }) => {
92+
const options: Record<string, unknown> = {};
93+
94+
if (delay !== undefined) options.delay = delay;
95+
if (ttl !== undefined) options.ttl = ttl;
96+
if (tags !== undefined) options.tags = tags;
97+
if (queue !== undefined) options.queue = { name: queue };
98+
if (maxAttempts !== undefined) options.maxAttempts = maxAttempts;
99+
if (idempotencyKey !== undefined) options.idempotencyKey = idempotencyKey;
100+
if (concurrencyKey !== undefined) options.concurrencyKey = concurrencyKey;
101+
if (priority !== undefined) options.priority = priority;
102+
if (test !== undefined) options.test = test;
103+
63104
const result = await sdkApiClient.triggerTask(id, {
64105
payload,
106+
options: Object.keys(options).length > 0 ? options : undefined,
65107
});
66108

67109
const taskRunUrl = `${dashboardUrl}/projects/v3/${projectRef}/runs/${result.id}`;

packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana
3737
this._metrics = [];
3838
}
3939

40-
// TODO: handle this when processKeepAlive is enabled
4140
#seedMetricsFromEnvironment(isWarmStartOverride?: boolean) {
4241
const forkStartTime = getEnvVar("TRIGGER_PROCESS_FORK_START_TIME");
4342
const warmStart = getEnvVar("TRIGGER_WARM_START");
@@ -46,12 +45,21 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana
4645

4746
if (typeof forkStartTime === "string" && !isWarmStart) {
4847
const forkStartTimeMs = parseInt(forkStartTime, 10);
48+
const forkDuration = Date.now() - forkStartTimeMs;
49+
50+
// When processKeepAlive is enabled, the process is reused across multiple runs.
51+
// The TRIGGER_PROCESS_FORK_START_TIME env var from the original cold start persists
52+
// in the process environment and becomes stale. Skip registration if the fork time
53+
// is unreasonably old (> 60s), which indicates a kept-alive process.
54+
if (forkDuration > 60_000) {
55+
return;
56+
}
4957

5058
this.registerMetric({
5159
name: "trigger.dev/start",
5260
event: "fork",
5361
attributes: {
54-
duration: Date.now() - forkStartTimeMs,
62+
duration: forkDuration,
5563
},
5664
timestamp: forkStartTimeMs,
5765
});

packages/core/src/v3/schemas/api.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@ import {
66
MachinePresetName,
77
SerializedError,
88
TaskRunError,
9+
TaskEventKindSchema,
10+
TaskEventLevelSchema,
11+
TaskEventStatusSchema,
912
} from "./common.js";
13+
import { TaskEventStyle } from "./style.js";
14+
1015
import { BackgroundWorkerMetadata } from "./resources.js";
1116
import { DequeuedMessage, MachineResources } from "./runEngine.js";
1217

@@ -1597,3 +1602,31 @@ export const AppendToStreamResponseBody = z.object({
15971602
message: z.string().optional(),
15981603
});
15991604
export type AppendToStreamResponseBody = z.infer<typeof AppendToStreamResponseBody>;
1605+
1606+
export const TaskEventSchema = z.object({
1607+
id: z.string(),
1608+
runId: z.string(),
1609+
traceId: z.string(),
1610+
spanId: z.string(),
1611+
parentId: z.string().nullish(),
1612+
message: z.string(),
1613+
kind: TaskEventKindSchema,
1614+
level: TaskEventLevelSchema,
1615+
status: TaskEventStatusSchema,
1616+
startTime: z.coerce.date(),
1617+
duration: z.number(),
1618+
isError: z.boolean(),
1619+
isCancelled: z.boolean(),
1620+
properties: z.record(z.unknown()).optional(),
1621+
metadata: z.record(z.unknown()).optional(),
1622+
style: TaskEventStyle.optional(),
1623+
});
1624+
1625+
export type TaskEventSchema = z.infer<typeof TaskEventSchema>;
1626+
1627+
export const RunEventsResponseSchema = z.object({
1628+
events: z.array(TaskEventSchema),
1629+
});
1630+
1631+
export type RunEventsResponseSchema = z.infer<typeof RunEventsResponseSchema>;
1632+

packages/core/src/v3/schemas/messages.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
5151
machine: MachinePreset,
5252
nextAttemptNumber: z.number().optional(),
5353
// identifiers
54-
id: z.string().optional(), // TODO: Remove this completely in a future release
5554
envId: z.string(),
5655
envType: EnvironmentType,
5756
orgId: z.string(),
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import { describe, test, expect, vi, beforeEach, afterEach } from "vitest";
2+
import { ResourceMonitor } from "./resourceMonitor.js";
3+
4+
// Mock node:child_process
5+
vi.mock("node:child_process", () => ({
6+
exec: vi.fn(),
7+
}));
8+
9+
// Mock node:v8
10+
vi.mock("node:v8", () => ({
11+
getHeapStatistics: vi.fn(() => ({
12+
total_heap_size: 50 * 1024 * 1024,
13+
total_heap_size_executable: 0,
14+
total_physical_size: 50 * 1024 * 1024,
15+
total_available_size: 100 * 1024 * 1024,
16+
used_heap_size: 25 * 1024 * 1024,
17+
heap_size_limit: 200 * 1024 * 1024,
18+
malloced_memory: 0,
19+
peak_malloced_memory: 0,
20+
does_zap_garbage: 0,
21+
number_of_native_contexts: 1,
22+
number_of_detached_contexts: 0,
23+
total_global_handles_size: 0,
24+
used_global_handles_size: 0,
25+
external_memory: 0,
26+
})),
27+
}));
28+
29+
import os from "node:os";
30+
import { exec } from "node:child_process";
31+
32+
const mockedExec = vi.mocked(exec);
33+
34+
describe("ResourceMonitor", () => {
35+
beforeEach(() => {
36+
vi.clearAllMocks();
37+
38+
// Default mocks for os module
39+
vi.spyOn(os, "totalmem").mockReturnValue(4 * 1024 * 1024 * 1024); // 4 GB
40+
vi.spyOn(os, "freemem").mockReturnValue(2 * 1024 * 1024 * 1024); // 2 GB free
41+
vi.spyOn(os, "cpus").mockReturnValue([
42+
{ model: "test", speed: 2400, times: { user: 0, nice: 0, sys: 0, idle: 0, irq: 0 } },
43+
{ model: "test", speed: 2400, times: { user: 0, nice: 0, sys: 0, idle: 0, irq: 0 } },
44+
]);
45+
46+
// Mock process.memoryUsage
47+
vi.spyOn(process, "memoryUsage").mockReturnValue({
48+
rss: 100 * 1024 * 1024, // 100 MB RSS
49+
heapTotal: 50 * 1024 * 1024,
50+
heapUsed: 25 * 1024 * 1024,
51+
external: 0,
52+
arrayBuffers: 0,
53+
});
54+
});
55+
56+
afterEach(() => {
57+
vi.restoreAllMocks();
58+
});
59+
60+
test("should return properly formatted system memory metrics", async () => {
61+
// Mock disk metrics (du command fails on non-Linux, so simulate failure)
62+
mockedExec.mockImplementation(((
63+
cmd: string,
64+
callback?: (error: Error | null, result: { stdout: string; stderr: string }) => void
65+
) => {
66+
if (callback) {
67+
callback(new Error("Command not available"), { stdout: "", stderr: "" });
68+
}
69+
return {} as any;
70+
}) as any);
71+
72+
const monitor = new ResourceMonitor({
73+
dirName: "/tmp",
74+
ctx: {},
75+
verbose: false,
76+
});
77+
78+
const payload = await monitor.getResourceSnapshotPayload();
79+
80+
// System memory should reflect our mocked values
81+
// 4GB total, 2GB free = 50% used
82+
expect(parseFloat(payload.system.memory.percentUsed)).toBeCloseTo(50.0, 0);
83+
expect(parseFloat(payload.system.memory.freeGB)).toBeCloseTo(2.0, 0);
84+
});
85+
86+
test("should calculate node process memory percentage correctly", async () => {
87+
mockedExec.mockImplementation(((
88+
cmd: string,
89+
callback?: (error: Error | null, result: { stdout: string; stderr: string }) => void
90+
) => {
91+
if (callback) {
92+
callback(new Error("Command not available"), { stdout: "", stderr: "" });
93+
}
94+
return {} as any;
95+
}) as any);
96+
97+
const monitor = new ResourceMonitor({
98+
dirName: "/tmp",
99+
ctx: {},
100+
verbose: false,
101+
});
102+
103+
const payload = await monitor.getResourceSnapshotPayload();
104+
105+
// 100 MB RSS out of 4 GB total = ~2.44%
106+
const nodeMemPercent = parseFloat(payload.process.node.memoryUsagePercent);
107+
expect(nodeMemPercent).toBeCloseTo(2.4, 0);
108+
109+
// RSS should be ~100 MB
110+
const nodeMemMB = parseFloat(payload.process.node.memoryUsageMB);
111+
expect(nodeMemMB).toBeCloseTo(100.0, 0);
112+
});
113+
114+
test("should calculate heap usage percentage correctly", async () => {
115+
mockedExec.mockImplementation(((
116+
cmd: string,
117+
callback?: (error: Error | null, result: { stdout: string; stderr: string }) => void
118+
) => {
119+
if (callback) {
120+
callback(new Error("Command not available"), { stdout: "", stderr: "" });
121+
}
122+
return {} as any;
123+
}) as any);
124+
125+
const monitor = new ResourceMonitor({
126+
dirName: "/tmp",
127+
ctx: {},
128+
verbose: false,
129+
});
130+
131+
const payload = await monitor.getResourceSnapshotPayload();
132+
133+
// 25 MB used / 200 MB limit = 12.5%
134+
const heapPercent = parseFloat(payload.process.node.heapUsagePercent);
135+
expect(heapPercent).toBeCloseTo(12.5, 0);
136+
expect(payload.process.node.isNearHeapLimit).toBe(false);
137+
});
138+
139+
test("should detect near heap limit condition", async () => {
140+
// Override getHeapStatistics to return near-limit values
141+
const { getHeapStatistics } = await import("node:v8");
142+
vi.mocked(getHeapStatistics).mockReturnValue({
143+
total_heap_size: 180 * 1024 * 1024,
144+
total_heap_size_executable: 0,
145+
total_physical_size: 180 * 1024 * 1024,
146+
total_available_size: 20 * 1024 * 1024,
147+
used_heap_size: 170 * 1024 * 1024, // 85% of 200MB limit
148+
heap_size_limit: 200 * 1024 * 1024,
149+
malloced_memory: 0,
150+
peak_malloced_memory: 0,
151+
does_zap_garbage: 0,
152+
number_of_native_contexts: 1,
153+
number_of_detached_contexts: 0,
154+
total_global_handles_size: 0,
155+
used_global_handles_size: 0,
156+
external_memory: 0,
157+
});
158+
159+
mockedExec.mockImplementation(((
160+
cmd: string,
161+
callback?: (error: Error | null, result: { stdout: string; stderr: string }) => void
162+
) => {
163+
if (callback) {
164+
callback(new Error("Command not available"), { stdout: "", stderr: "" });
165+
}
166+
return {} as any;
167+
}) as any);
168+
169+
const monitor = new ResourceMonitor({
170+
dirName: "/tmp",
171+
ctx: {},
172+
verbose: false,
173+
});
174+
175+
const payload = await monitor.getResourceSnapshotPayload();
176+
177+
// 170/200 = 85% > 80% threshold
178+
expect(payload.process.node.isNearHeapLimit).toBe(true);
179+
});
180+
181+
test("should include constraint information", async () => {
182+
mockedExec.mockImplementation(((
183+
cmd: string,
184+
callback?: (error: Error | null, result: { stdout: string; stderr: string }) => void
185+
) => {
186+
if (callback) {
187+
callback(new Error("Command not available"), { stdout: "", stderr: "" });
188+
}
189+
return {} as any;
190+
}) as any);
191+
192+
const monitor = new ResourceMonitor({
193+
dirName: "/tmp",
194+
ctx: {},
195+
verbose: false,
196+
});
197+
198+
const payload = await monitor.getResourceSnapshotPayload();
199+
200+
expect(payload.constraints).toBeDefined();
201+
expect(payload.constraints.cpu).toBe(2); // 2 CPUs mocked
202+
expect(payload.constraints.memoryGB).toBe(4); // 4 GB mocked
203+
expect(payload.timestamp).toBeDefined();
204+
});
205+
});

0 commit comments

Comments
 (0)