-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathtaskRunProcess.test.ts
More file actions
155 lines (140 loc) · 5.08 KB
/
Copy pathtaskRunProcess.test.ts
File metadata and controls
155 lines (140 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import { TaskRunProcess, type TaskRunProcessOptions } from "./taskRunProcess.js";
import { describe, it, expect, vi } from "vitest";
import { UncaughtExceptionError, UnexpectedExitError } from "@trigger.dev/core/v3/errors";
import type {
TaskRunExecution,
TaskRunExecutionPayload,
WorkerManifest,
ServerBackgroundWorker,
MachinePresetResources,
} from "@trigger.dev/core/v3";
/**
 * Builds a `TaskRunProcessOptions` value for tests.
 *
 * The defaults are minimal stubs: a manifest whose entry points are
 * `/dev/null`, an empty server worker, an empty env and a 1 cpu / 1 memory
 * machine. Any key present in `overrides` replaces the matching default.
 *
 * @param overrides - Fields that should take precedence over the stub defaults.
 * @returns A fresh options object (new nested objects on every call).
 */
function createTaskRunProcessOptions(
  overrides: Partial<TaskRunProcessOptions> = {}
): TaskRunProcessOptions {
  // Only the fields listed here are populated; the double cast keeps the
  // stub small instead of spelling out a complete WorkerManifest.
  const stubManifest = {
    runtime: "node",
    workerEntryPoint: "/dev/null",
    configEntryPoint: "/dev/null",
    otelImportHook: {},
  } as unknown as WorkerManifest;

  const defaults: TaskRunProcessOptions = {
    workerManifest: stubManifest,
    serverWorker: {} as unknown as ServerBackgroundWorker,
    env: {},
    machineResources: { cpu: 1, memory: 1 } as MachinePresetResources,
  };

  // Overrides are spread last so they win over the defaults.
  return { ...defaults, ...overrides };
}
/**
 * Builds a stub `TaskRunExecution` for tests.
 *
 * Only the run id and the attempt number vary; every other field is a fixed
 * placeholder. The object is cast through `unknown`, so it is not checked
 * against the full `TaskRunExecution` shape.
 *
 * @param runId - Value stored in `run.id`.
 * @param attemptNumber - Value stored in `attempt.number`.
 * @returns A new execution object with fresh `Date` instances for its timestamps.
 */
function createExecution(runId: string, attemptNumber: number): TaskRunExecution {
  const runSection = {
    id: runId,
    payload: "{}",
    payloadType: "application/json",
    tags: [],
    isTest: false,
    createdAt: new Date(),
    startedAt: new Date(),
    maxAttempts: 3,
    version: "1",
    durationMs: 0,
    costInCents: 0,
    baseCostInCents: 0,
  };

  // Every attempt field other than `number` and `startedAt` is filled with
  // the literal placeholder "deprecated".
  const attemptSection = {
    number: attemptNumber,
    startedAt: new Date(),
    id: "deprecated",
    backgroundWorkerId: "deprecated",
    backgroundWorkerTaskId: "deprecated",
    status: "deprecated" as any,
  };

  const stub = {
    run: runSection,
    attempt: attemptSection,
    task: { id: "test-task", filePath: "test.ts" },
    queue: { id: "queue-1", name: "test-queue" },
    environment: { id: "env-1", slug: "test", type: "DEVELOPMENT" },
    organization: { id: "org-1", slug: "test-org", name: "Test Org" },
    project: { id: "proj-1", ref: "proj_test", slug: "test", name: "Test" },
    machine: { name: "small-1x", cpu: 0.5, memory: 0.5, centsPerMs: 0 },
  };

  return stub as unknown as TaskRunExecution;
}
describe("TaskRunProcess", () => {
  describe("execute() on a dead child process", () => {
    it("should reject when child process has already exited and IPC send is skipped", async () => {
      // How long execute() may stay pending before the test reports it as hung.
      const HANG_TIMEOUT_MS = 2000;

      const proc = new TaskRunProcess(createTaskRunProcessOptions());

      // Simulate a child process that has exited: _child exists but is not connected
      const fakeChild = {
        connected: false,
        killed: false,
        pid: 12345,
        kill: vi.fn(),
        on: vi.fn(),
        stdout: { on: vi.fn() },
        stderr: { on: vi.fn() },
      };

      // Set internal state to mimic a process whose child has crashed
      (proc as any)._child = fakeChild;
      (proc as any)._childPid = 12345;
      (proc as any)._isBeingKilled = false;

      const execution = createExecution("run-1", 2);

      // This should NOT hang forever - it should reject promptly.
      //
      // Failure mode being guarded against: execute() creates a promise, skips
      // the IPC send because _child.connected is false, then awaits that
      // promise, which never settles because the child is dead and
      // #handleExit already ran.
      //
      // Both outcomes of execute() are mapped to a tagged value so that the
      // race below never rejects and a hang can be told apart from a rejection.
      const settled = proc
        .execute(
          {
            payload: { execution, traceContext: {}, metrics: [] },
            messageId: "run_run-1",
            env: {},
          },
          true
        )
        .then(
          (v) => ({ type: "resolved" as const, value: v }),
          (e) => ({ type: "rejected" as const, error: e })
        );

      // The Promise.race with a timeout detects the hang. The timer handle is
      // kept so it can be cleared once the race settles; otherwise a live
      // timer would stay pending after every prompt settlement.
      let hangTimer: ReturnType<typeof setTimeout> | undefined;
      const hung = new Promise<{ type: "hung" }>((resolve) => {
        hangTimer = setTimeout(() => resolve({ type: "hung" as const }), HANG_TIMEOUT_MS);
      });

      const result = await Promise.race([settled, hung]).finally(() => clearTimeout(hangTimer));

      // Checked separately first so a hang is reported as a hang rather than
      // as a generic "expected rejected" mismatch.
      expect(result.type).not.toBe("hung");
      expect(result.type).toBe("rejected");

      // The guard narrows the union so `error` is accessible to the type checker.
      if (result.type === "rejected") {
        expect(result.error).toBeInstanceOf(UnexpectedExitError);
        expect(result.error.stderr).toContain("not connected");
      }
    });
  });

  describe("parseExecuteError(UncaughtExceptionError)", () => {
    it("returns INTERNAL_ERROR with TASK_RUN_UNCAUGHT_EXCEPTION + original message and stack", () => {
      const error = new UncaughtExceptionError(
        {
          name: "Error",
          message: "read ECONNRESET",
          stack:
            "Error: read ECONNRESET\n at TCP.onStreamRead (node:internal/stream_base_commons:216:20)",
        },
        "uncaughtException"
      );

      const result = TaskRunProcess.parseExecuteError(error);

      expect(result.type).toBe("INTERNAL_ERROR");
      expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
      expect(result.message).toBe("read ECONNRESET");
      expect(result.stackTrace).toContain("TCP.onStreamRead");
    });

    it("uses the same code for unhandledRejection origin", () => {
      // No stack is supplied here; only type, code and message are asserted.
      const error = new UncaughtExceptionError(
        { name: "TypeError", message: "boom" },
        "unhandledRejection"
      );

      const result = TaskRunProcess.parseExecuteError(error);

      expect(result.type).toBe("INTERNAL_ERROR");
      expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
      expect(result.message).toBe("boom");
    });
  });
});