Skip to content

Commit d625a22

Browse files
giovaborgognoclaude
andcommitted
test(events): add DLQ service integration tests (Phase 12)
Add testcontainer-based tests for DeadLetterService and DeadLetterManagementService covering DLQ entry creation, field correctness, pagination, filtering, and discard flow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f269a5e commit d625a22

File tree

2 files changed

+489
-0
lines changed

2 files changed

+489
-0
lines changed
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
import { describe, expect, vi } from "vitest";
2+
3+
// Mock the db prisma client (required for webapp service imports)
4+
vi.mock("~/db.server", () => ({
5+
prisma: {},
6+
$replica: {},
7+
}));
8+
9+
vi.mock("~/services/platform.v3.server", async (importOriginal) => {
10+
const actual = (await importOriginal()) as Record<string, unknown>;
11+
return {
12+
...actual,
13+
getEntitlement: vi.fn(),
14+
};
15+
});
16+
17+
import { setupAuthenticatedEnvironment } from "@internal/run-engine/tests";
18+
import { postgresTest } from "@internal/testcontainers";
19+
import { generateFriendlyId } from "@trigger.dev/core/v3/isomorphic";
20+
import { DeadLetterManagementService } from "../../app/v3/services/events/deadLetterManagement.server";
21+
import { ServiceValidationError } from "../../app/v3/services/common.server";
22+
23+
vi.setConfig({ testTimeout: 120_000 });
24+
25+
/**
26+
* Helper: create a TaskRun in the database so DeadLetterEvent can reference it.
27+
*/
28+
async function createTaskRun(
29+
prisma: any,
30+
env: { id: string; projectId: string; organization: { id: string } },
31+
taskIdentifier: string
32+
) {
33+
const runId = generateFriendlyId("run");
34+
return prisma.taskRun.create({
35+
data: {
36+
id: runId,
37+
friendlyId: runId,
38+
number: 1,
39+
taskIdentifier,
40+
payload: JSON.stringify({ test: true }),
41+
payloadType: "application/json",
42+
traceId: "trace_" + runId,
43+
spanId: "span_" + runId,
44+
queue: `task/${taskIdentifier}`,
45+
status: "COMPLETED_WITH_ERRORS",
46+
runtimeEnvironmentId: env.id,
47+
projectId: env.projectId,
48+
organizationId: env.organization.id,
49+
engine: "V2",
50+
},
51+
});
52+
}
53+
54+
/**
55+
* Helper: create a DeadLetterEvent directly in the database.
56+
*/
57+
async function createDeadLetterEvent(
58+
prisma: any,
59+
env: { id: string; projectId: string },
60+
run: { id: string },
61+
overrides: {
62+
eventType?: string;
63+
status?: "PENDING" | "RETRIED" | "DISCARDED";
64+
payload?: object;
65+
createdAt?: Date;
66+
} = {}
67+
) {
68+
const dleId = generateFriendlyId("dle");
69+
return prisma.deadLetterEvent.create({
70+
data: {
71+
id: dleId,
72+
friendlyId: dleId,
73+
eventType: overrides.eventType ?? "test.event",
74+
payload: overrides.payload ?? { key: "value" },
75+
taskSlug: "test-task",
76+
failedRunId: run.id,
77+
error: { message: "test error" },
78+
attemptCount: 1,
79+
sourceEventId: "src_" + dleId,
80+
projectId: env.projectId,
81+
environmentId: env.id,
82+
status: overrides.status ?? "PENDING",
83+
...(overrides.createdAt && { createdAt: overrides.createdAt }),
84+
},
85+
});
86+
}
87+
88+
describe("DeadLetterManagementService", () => {
89+
postgresTest(
90+
"List DLQ entries with pagination",
91+
async ({ prisma }) => {
92+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
93+
94+
// Create 5 DLQ entries, each needs its own TaskRun (foreign key)
95+
const entries = [];
96+
for (let i = 0; i < 5; i++) {
97+
const run = await createTaskRun(prisma, env, `task-${i}`);
98+
const dle = await createDeadLetterEvent(prisma, env, run, {
99+
eventType: "paginated.event",
100+
// Stagger createdAt so ordering is deterministic
101+
createdAt: new Date(Date.now() - (4 - i) * 1000),
102+
});
103+
entries.push(dle);
104+
}
105+
106+
const service = new DeadLetterManagementService(prisma);
107+
108+
// Page 1: limit 2
109+
const page1 = await service.list({
110+
projectId: env.projectId,
111+
environmentId: env.id,
112+
limit: 2,
113+
});
114+
115+
expect(page1.data).toHaveLength(2);
116+
expect(page1.pagination.hasMore).toBe(true);
117+
expect(page1.pagination.cursor).toBeDefined();
118+
expect(page1.pagination.cursor).not.toBeNull();
119+
120+
// Page 2: use cursor from page 1
121+
const page2 = await service.list({
122+
projectId: env.projectId,
123+
environmentId: env.id,
124+
limit: 2,
125+
cursor: page1.pagination.cursor!,
126+
});
127+
128+
expect(page2.data).toHaveLength(2);
129+
expect(page2.pagination.hasMore).toBe(true);
130+
131+
// Page 3: last item
132+
const page3 = await service.list({
133+
projectId: env.projectId,
134+
environmentId: env.id,
135+
limit: 2,
136+
cursor: page2.pagination.cursor!,
137+
});
138+
139+
expect(page3.data).toHaveLength(1);
140+
expect(page3.pagination.hasMore).toBe(false);
141+
expect(page3.pagination.cursor).toBeNull();
142+
143+
// All 5 entries across all pages, no duplicates
144+
const allIds = [
145+
...page1.data.map((d: any) => d.id),
146+
...page2.data.map((d: any) => d.id),
147+
...page3.data.map((d: any) => d.id),
148+
];
149+
expect(new Set(allIds).size).toBe(5);
150+
}
151+
);
152+
153+
postgresTest(
154+
"List with eventType filter",
155+
async ({ prisma }) => {
156+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
157+
158+
// Create entries with different event types
159+
const run1 = await createTaskRun(prisma, env, "task-alpha");
160+
const run2 = await createTaskRun(prisma, env, "task-beta");
161+
const run3 = await createTaskRun(prisma, env, "task-gamma");
162+
163+
await createDeadLetterEvent(prisma, env, run1, { eventType: "order.created" });
164+
await createDeadLetterEvent(prisma, env, run2, { eventType: "user.signed_up" });
165+
await createDeadLetterEvent(prisma, env, run3, { eventType: "order.created" });
166+
167+
const service = new DeadLetterManagementService(prisma);
168+
169+
const orderEvents = await service.list({
170+
projectId: env.projectId,
171+
environmentId: env.id,
172+
eventType: "order.created",
173+
});
174+
175+
expect(orderEvents.data).toHaveLength(2);
176+
expect(orderEvents.data.every((d: any) => d.eventType === "order.created")).toBe(true);
177+
178+
const userEvents = await service.list({
179+
projectId: env.projectId,
180+
environmentId: env.id,
181+
eventType: "user.signed_up",
182+
});
183+
184+
expect(userEvents.data).toHaveLength(1);
185+
expect(userEvents.data[0].eventType).toBe("user.signed_up");
186+
}
187+
);
188+
189+
postgresTest(
190+
"List with status filter",
191+
async ({ prisma }) => {
192+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
193+
194+
const run1 = await createTaskRun(prisma, env, "task-p1");
195+
const run2 = await createTaskRun(prisma, env, "task-p2");
196+
const run3 = await createTaskRun(prisma, env, "task-r1");
197+
198+
await createDeadLetterEvent(prisma, env, run1, { status: "PENDING" });
199+
await createDeadLetterEvent(prisma, env, run2, { status: "PENDING" });
200+
await createDeadLetterEvent(prisma, env, run3, { status: "RETRIED" });
201+
202+
const service = new DeadLetterManagementService(prisma);
203+
204+
const pendingOnly = await service.list({
205+
projectId: env.projectId,
206+
environmentId: env.id,
207+
status: "PENDING",
208+
});
209+
210+
expect(pendingOnly.data).toHaveLength(2);
211+
expect(pendingOnly.data.every((d: any) => d.status === "PENDING")).toBe(true);
212+
213+
const retriedOnly = await service.list({
214+
projectId: env.projectId,
215+
environmentId: env.id,
216+
status: "RETRIED",
217+
});
218+
219+
expect(retriedOnly.data).toHaveLength(1);
220+
expect(retriedOnly.data[0].status).toBe("RETRIED");
221+
}
222+
);
223+
224+
postgresTest(
225+
"Discard marks entry as DISCARDED",
226+
async ({ prisma }) => {
227+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
228+
229+
const run = await createTaskRun(prisma, env, "discard-task");
230+
const dle = await createDeadLetterEvent(prisma, env, run, {
231+
eventType: "invoice.failed",
232+
status: "PENDING",
233+
});
234+
235+
const service = new DeadLetterManagementService(prisma);
236+
237+
const result = await service.discard(dle.id, env);
238+
239+
expect(result.id).toBe(dle.id);
240+
expect(result.status).toBe("DISCARDED");
241+
242+
// Verify in DB
243+
const updated = await prisma.deadLetterEvent.findUnique({
244+
where: { id: dle.id },
245+
});
246+
247+
expect(updated).toBeDefined();
248+
expect(updated!.status).toBe("DISCARDED");
249+
expect(updated!.processedAt).toBeDefined();
250+
expect(updated!.processedAt).not.toBeNull();
251+
}
252+
);
253+
254+
postgresTest(
255+
"Discard nonexistent ID returns error",
256+
async ({ prisma }) => {
257+
const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
258+
259+
const service = new DeadLetterManagementService(prisma);
260+
261+
await expect(service.discard("dle_nonexistent_fake_id", env)).rejects.toThrow(
262+
ServiceValidationError
263+
);
264+
265+
await expect(service.discard("dle_nonexistent_fake_id", env)).rejects.toThrow(
266+
"Dead letter event not found or already processed"
267+
);
268+
}
269+
);
270+
});

0 commit comments

Comments
 (0)