Skip to content

Commit 48ff891

Browse files
improve test coverage
1 parent bfb8182 commit 48ff891

6 files changed

Lines changed: 657 additions & 0 deletions

File tree

src/clone.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { describe, it, expect } from "vitest";
2+
import { deepClone } from "./clone.js";
3+
4+
describe("deepClone", () => {
5+
it("returns an equal but distinct nested structure", () => {
6+
const input = { a: 1, nested: { b: [2, 3] } };
7+
const copy = deepClone(input);
8+
expect(copy).toEqual(input);
9+
expect(copy).not.toBe(input);
10+
expect(copy.nested).not.toBe(input.nested);
11+
expect(copy.nested.b).not.toBe(input.nested.b);
12+
});
13+
});

src/error.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { describe, it, expect } from "vitest";
2+
import { TxOBError, ErrorUnprocessableEventHandler } from "./error.js";
3+
4+
describe("TxOBError", () => {
5+
it("sets message and optional backoffUntil", () => {
6+
const until = new Date("2030-01-01");
7+
const err = new TxOBError("rate limited", { backoffUntil: until });
8+
expect(err.message).toBe("rate limited");
9+
expect(err.backoffUntil).toBe(until);
10+
});
11+
12+
it("preserves cause when provided", () => {
13+
const cause = new Error("upstream");
14+
const err = new TxOBError("wrapped", { cause });
15+
expect(err.cause).toBe(cause);
16+
});
17+
});
18+
19+
describe("ErrorUnprocessableEventHandler", () => {
20+
it("wraps the original error and exposes it", () => {
21+
const inner = new Error("bad payload");
22+
const err = new ErrorUnprocessableEventHandler(inner);
23+
expect(err.message).toBe("unprocessable event handler: bad payload");
24+
expect(err.error).toBe(inner);
25+
});
26+
});

src/mongodb/client.test.ts

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
import { vi, describe, it, expect, beforeEach } from "vitest";
2+
import { ObjectId } from "mongodb";
3+
import type { TxOBEvent } from "../processor.js";
4+
import { createProcessorClient, createWakeupEmitter } from "./client.js";
5+
6+
const fixedNow = new Date("2024-06-01T12:00:00.000Z");
7+
8+
vi.mock("../date.js", () => ({
9+
getDate: () => fixedNow,
10+
}));
11+
12+
const eventSchemas = {
13+
TestEvent: {
14+
"~standard": {
15+
version: 1 as const,
16+
vendor: "test",
17+
validate: (value: unknown) => ({
18+
value:
19+
typeof value === "object" && value !== null
20+
? (value as Record<string, unknown>)
21+
: {},
22+
}),
23+
},
24+
},
25+
};
26+
27+
function createMongoMocks() {
28+
const toArray = vi.fn();
29+
const findChain = {
30+
project: vi.fn().mockReturnThis(),
31+
limit: vi.fn().mockReturnThis(),
32+
sort: vi.fn().mockReturnThis(),
33+
toArray,
34+
};
35+
const findOneAndUpdate = vi.fn();
36+
const updateOne = vi.fn();
37+
const insertOne = vi.fn();
38+
const watch = vi.fn();
39+
40+
const collection = vi.fn(() => ({
41+
find: vi.fn(() => findChain),
42+
findOneAndUpdate,
43+
updateOne,
44+
insertOne,
45+
watch,
46+
}));
47+
48+
const db = vi.fn(() => ({
49+
collection,
50+
}));
51+
52+
const session = {
53+
withTransaction: vi.fn(async (fn: () => Promise<void>) => {
54+
await fn();
55+
}),
56+
};
57+
58+
const mongo = {
59+
db,
60+
withSession: vi.fn(async (fn: (s: typeof session) => Promise<void>) => {
61+
await fn(session);
62+
}),
63+
};
64+
65+
return {
66+
mongo: mongo as any,
67+
toArray,
68+
findOneAndUpdate,
69+
updateOne,
70+
insertOne,
71+
watch,
72+
collection,
73+
session,
74+
};
75+
}
76+
77+
beforeEach(() => {
78+
vi.clearAllMocks();
79+
});
80+
81+
describe("createProcessorClient (MongoDB)", () => {
82+
it("getEventsToProcess queries with ready-to-process filter", async () => {
83+
const rows = [{ id: "e1", errors: 0 }];
84+
const { mongo, toArray, collection } = createMongoMocks();
85+
toArray.mockResolvedValue(rows);
86+
87+
const client = createProcessorClient({
88+
mongo,
89+
db: "app",
90+
collection: "outbox",
91+
limit: 50,
92+
eventSchemas,
93+
});
94+
95+
const result = await client.getEventsToProcess({ maxErrors: 3 });
96+
97+
expect(mongo.db).toHaveBeenCalledWith("app");
98+
expect(collection).toHaveBeenCalledWith("outbox");
99+
expect(result).toEqual(rows);
100+
101+
const coll = collection.mock.results[0].value;
102+
expect(coll.find).toHaveBeenCalledWith({
103+
processed_at: null,
104+
$and: [
105+
{
106+
$or: [
107+
{ backoff_until: null },
108+
{ backoff_until: { $lt: fixedNow } },
109+
],
110+
},
111+
{
112+
$or: [{ lock: null }, { lock: { $exists: false } }],
113+
},
114+
],
115+
errors: { $lt: 3 },
116+
});
117+
const findReturn = coll.find.mock.results[0].value;
118+
expect(findReturn.project).toHaveBeenCalledWith({ id: 1, errors: 1 });
119+
expect(findReturn.limit).toHaveBeenCalledWith(50);
120+
expect(findReturn.sort).toHaveBeenCalledWith("timestamp", "asc");
121+
});
122+
123+
it("transaction runs getEventByIdForUpdateSkipLocked and returns row", async () => {
124+
const { mongo, findOneAndUpdate } = createMongoMocks();
125+
const doc = {
126+
id: "1",
127+
timestamp: fixedNow,
128+
type: "TestEvent",
129+
data: {},
130+
correlation_id: "c",
131+
handler_results: {},
132+
errors: 0,
133+
backoff_until: null,
134+
processed_at: null,
135+
};
136+
findOneAndUpdate.mockResolvedValue({ value: doc });
137+
138+
const client = createProcessorClient({
139+
mongo,
140+
db: "app",
141+
eventSchemas,
142+
});
143+
144+
let loaded: (typeof doc) | null = null;
145+
await client.transaction(async (tx) => {
146+
loaded = (await tx.getEventByIdForUpdateSkipLocked("1", {
147+
maxErrors: 5,
148+
})) as (typeof doc) | null;
149+
});
150+
151+
expect(loaded).toEqual(doc);
152+
expect(findOneAndUpdate).toHaveBeenCalledOnce();
153+
const [filter, update, opts] = findOneAndUpdate.mock.calls[0];
154+
expect(filter).toMatchObject({ id: "1" });
155+
expect(update.$set.lock).toBeInstanceOf(ObjectId);
156+
expect(opts.returnDocument).toBe("after");
157+
expect(opts.session).toBeDefined();
158+
});
159+
160+
it("getEventByIdForUpdateSkipLocked returns null when no document", async () => {
161+
const { mongo, findOneAndUpdate } = createMongoMocks();
162+
findOneAndUpdate.mockResolvedValue(null);
163+
164+
const client = createProcessorClient({
165+
mongo,
166+
db: "app",
167+
eventSchemas,
168+
});
169+
170+
let loaded: unknown;
171+
await client.transaction(async (tx) => {
172+
loaded = await tx.getEventByIdForUpdateSkipLocked("missing", {
173+
maxErrors: 1,
174+
});
175+
});
176+
177+
expect(loaded).toBeNull();
178+
});
179+
180+
it("updateEvent and createEvent call driver with expected payloads", async () => {
181+
const { mongo, updateOne, insertOne } = createMongoMocks();
182+
183+
const client = createProcessorClient({
184+
mongo,
185+
db: "app",
186+
eventSchemas,
187+
});
188+
189+
const event = {
190+
id: "e1",
191+
timestamp: fixedNow,
192+
type: "TestEvent" as const,
193+
data: { x: 1 },
194+
correlation_id: "corr",
195+
handler_results: { h: {} },
196+
errors: 1,
197+
backoff_until: null,
198+
processed_at: null,
199+
} as unknown as TxOBEvent<"TestEvent", Record<string, unknown>>;
200+
201+
await client.transaction(async (tx) => {
202+
await tx.updateEvent(event);
203+
await tx.createEvent({
204+
id: "e2",
205+
timestamp: fixedNow,
206+
type: "TestEvent",
207+
data: {},
208+
correlation_id: "c2",
209+
handler_results: {},
210+
errors: 0,
211+
});
212+
});
213+
214+
expect(updateOne).toHaveBeenCalledWith(
215+
{ id: "e1" },
216+
{
217+
$set: {
218+
handler_results: event.handler_results,
219+
errors: 1,
220+
processed_at: null,
221+
backoff_until: null,
222+
lock: null,
223+
},
224+
},
225+
expect.objectContaining({ session: expect.anything() }),
226+
);
227+
228+
expect(insertOne).toHaveBeenCalledWith(
229+
{
230+
id: "e2",
231+
timestamp: fixedNow,
232+
type: "TestEvent",
233+
data: {},
234+
correlation_id: "c2",
235+
handler_results: {},
236+
errors: 0,
237+
processed_at: null,
238+
backoff_until: null,
239+
lock: null,
240+
},
241+
expect.objectContaining({ session: expect.anything() }),
242+
);
243+
});
244+
});
245+
246+
describe("createWakeupEmitter (MongoDB)", () => {
247+
it("emits wakeup on insert change and closes the stream", async () => {
248+
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
249+
const changeStream = {
250+
on: vi.fn((ev: string, fn: (...args: unknown[]) => void) => {
251+
(handlers[ev] ??= []).push(fn);
252+
}),
253+
close: vi.fn().mockResolvedValue(undefined),
254+
};
255+
256+
const collection = vi.fn(() => ({
257+
watch: vi.fn(() => changeStream),
258+
}));
259+
260+
const mongo = {
261+
db: vi.fn(() => ({ collection })),
262+
} as any;
263+
264+
const emitter = await createWakeupEmitter({
265+
mongo,
266+
db: "app",
267+
collection: "events",
268+
});
269+
270+
const wakeups: number[] = [];
271+
emitter.on("wakeup", () => wakeups.push(1));
272+
273+
for (const fn of handlers["change"] ?? []) {
274+
fn({});
275+
}
276+
expect(wakeups).toHaveLength(1);
277+
278+
await emitter.close();
279+
expect(changeStream.close).toHaveBeenCalledOnce();
280+
});
281+
282+
it("forwards change stream errors to error listeners", async () => {
283+
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
284+
const changeStream = {
285+
on: vi.fn((ev: string, fn: (...args: unknown[]) => void) => {
286+
(handlers[ev] ??= []).push(fn);
287+
}),
288+
close: vi.fn().mockResolvedValue(undefined),
289+
};
290+
291+
const mongo = {
292+
db: vi.fn(() => ({
293+
collection: vi.fn(() => ({
294+
watch: vi.fn(() => changeStream),
295+
})),
296+
})),
297+
} as any;
298+
299+
const emitter = await createWakeupEmitter({ mongo, db: "app" });
300+
const errors: unknown[] = [];
301+
const ee = emitter as unknown as import("node:events").EventEmitter;
302+
ee.on("error", (e: unknown) => errors.push(e));
303+
304+
const streamErr = new Error("replica set required");
305+
for (const fn of handlers["error"] ?? []) {
306+
fn(streamErr);
307+
}
308+
expect(errors).toEqual([streamErr]);
309+
});
310+
311+
it("emits error when change stream closes", async () => {
312+
const handlers: Record<string, ((...args: unknown[]) => void)[]> = {};
313+
const changeStream = {
314+
on: vi.fn((ev: string, fn: (...args: unknown[]) => void) => {
315+
(handlers[ev] ??= []).push(fn);
316+
}),
317+
close: vi.fn().mockResolvedValue(undefined),
318+
};
319+
320+
const mongo = {
321+
db: vi.fn(() => ({
322+
collection: vi.fn(() => ({
323+
watch: vi.fn(() => changeStream),
324+
})),
325+
})),
326+
} as any;
327+
328+
const emitter = await createWakeupEmitter({ mongo, db: "app" });
329+
const errors: unknown[] = [];
330+
const ee = emitter as unknown as import("node:events").EventEmitter;
331+
ee.on("error", (e: unknown) => errors.push(e));
332+
333+
for (const fn of handlers["close"] ?? []) {
334+
fn();
335+
}
336+
expect(errors).toHaveLength(1);
337+
expect((errors[0] as Error).message).toBe("MongoDB Change Stream closed");
338+
});
339+
340+
it("allows unregistering wakeup listener via off", async () => {
341+
const changeStream = {
342+
on: vi.fn(),
343+
close: vi.fn().mockResolvedValue(undefined),
344+
};
345+
const mongo = {
346+
db: vi.fn(() => ({
347+
collection: vi.fn(() => ({
348+
watch: vi.fn(() => changeStream),
349+
})),
350+
})),
351+
} as any;
352+
353+
const emitter = await createWakeupEmitter({ mongo, db: "app" });
354+
const fn = () => {};
355+
emitter.on("wakeup", fn);
356+
emitter.off("wakeup", fn);
357+
});
358+
});

0 commit comments

Comments
 (0)