Skip to content

Commit c5d1111

Browse files
committed
Add e2e tests for DO-to-DO sync via miniflare
Tests the LinkProcessorDO ↔ SyncBackendDO sync path end-to-end: store creation, link ingestion, duplicate detection, concurrent ingests, cross-org isolation, fetch trigger path, queue batch handler with real DOs, and mixed-org batch routing.
1 parent 5fa4a50 commit c5d1111

1 file changed

Lines changed: 320 additions & 0 deletions

File tree

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
import { env } from "cloudflare:test";
2+
import { describe, it, expect, beforeAll } from "vitest";
3+
4+
import type { OrgId } from "@/cf-worker/db/branded";
5+
import type { LinkQueueMessage } from "@/cf-worker/link-processor/types";
6+
import { handleQueueBatch } from "@/cf-worker/queue-handler";
7+
8+
import { signupUser } from "./helpers";
9+
import type { UserInfo } from "./helpers";
10+
11+
function getLinkProcessorStub(orgId: string) {
12+
const id = env.LINK_PROCESSOR_DO.idFromName(orgId);
13+
return env.LINK_PROCESSOR_DO.get(id);
14+
}
15+
16+
function makeQueueMessage(
17+
url: string,
18+
storeId: string,
19+
opts?: { source?: string; sourceMeta?: string | null }
20+
): LinkQueueMessage {
21+
return {
22+
url,
23+
storeId: storeId as OrgId,
24+
source: opts?.source ?? "api",
25+
sourceMeta: opts?.sourceMeta ?? null,
26+
};
27+
}
28+
29+
function createMockBatch(
30+
messages: Array<{ body: LinkQueueMessage; id?: string }>
31+
) {
32+
const mockMessages = messages.map((m, i) => {
33+
let acked = false;
34+
let retried = false;
35+
return {
36+
body: m.body,
37+
id: m.id ?? `msg-${i}`,
38+
timestamp: new Date(),
39+
attempts: 1,
40+
ack() {
41+
acked = true;
42+
},
43+
retry() {
44+
retried = true;
45+
},
46+
get _acked() {
47+
return acked;
48+
},
49+
get _retried() {
50+
return retried;
51+
},
52+
};
53+
});
54+
55+
const batch = {
56+
messages: mockMessages,
57+
queue: "cloudstash-link-queue",
58+
} as unknown as MessageBatch<LinkQueueMessage>;
59+
60+
return { batch, mockMessages };
61+
}
62+
63+
describe("DO-to-DO Sync E2E", () => {
64+
let user: UserInfo;
65+
66+
beforeAll(async () => {
67+
user = await signupUser("do-sync-user@test.com", "DO Sync User");
68+
});
69+
70+
describe("cold boot sync", () => {
71+
it("ingests a link on a fresh LinkProcessorDO", async () => {
72+
const stub = getLinkProcessorStub(user.orgId);
73+
const msg = makeQueueMessage("https://example.com/cold-boot", user.orgId);
74+
75+
const result = await stub.ingestAndProcess(msg);
76+
77+
expect(result.status).toBe("ingested");
78+
expect(result.linkId).toBeDefined();
79+
});
80+
});
81+
82+
describe("warm DO sync", () => {
83+
it("ingests a second link on an already-initialized DO", async () => {
84+
const stub = getLinkProcessorStub(user.orgId);
85+
const msg = makeQueueMessage("https://example.com/warm-do", user.orgId);
86+
87+
const result = await stub.ingestAndProcess(msg);
88+
89+
expect(result.status).toBe("ingested");
90+
expect(result.linkId).toBeDefined();
91+
});
92+
});
93+
94+
describe("duplicate detection", () => {
95+
it("detects duplicate URLs via livestore state", async () => {
96+
const stub = getLinkProcessorStub(user.orgId);
97+
const url = "https://example.com/duplicate-test";
98+
const msg = makeQueueMessage(url, user.orgId);
99+
100+
const first = await stub.ingestAndProcess(msg);
101+
expect(first.status).toBe("ingested");
102+
103+
const second = await stub.ingestAndProcess(msg);
104+
expect(second.status).toBe("duplicate");
105+
expect(second.linkId).toBe(first.linkId);
106+
});
107+
});
108+
109+
describe("concurrent ingests", () => {
110+
it("ingests multiple links simultaneously", async () => {
111+
const stub = getLinkProcessorStub(user.orgId);
112+
const urls = [
113+
"https://example.com/concurrent-1",
114+
"https://example.com/concurrent-2",
115+
"https://example.com/concurrent-3",
116+
];
117+
118+
const results = await Promise.all(
119+
urls.map((url) =>
120+
stub.ingestAndProcess(makeQueueMessage(url, user.orgId))
121+
)
122+
);
123+
124+
for (const result of results) {
125+
expect(result.status).toBe("ingested");
126+
expect(result.linkId).toBeDefined();
127+
}
128+
129+
const linkIds = results.map((r) => r.linkId);
130+
const uniqueIds = new Set(linkIds);
131+
expect(uniqueIds.size).toBe(urls.length);
132+
});
133+
});
134+
135+
describe("invalid URL handling", () => {
136+
it("rejects invalid URLs", async () => {
137+
const stub = getLinkProcessorStub(user.orgId);
138+
const msg = makeQueueMessage("not-a-url", user.orgId);
139+
140+
const result = await stub.ingestAndProcess(msg);
141+
142+
expect(result.status).toBe("invalid_url");
143+
});
144+
});
145+
146+
describe("cross-org isolation", () => {
147+
let otherUser: UserInfo;
148+
149+
beforeAll(async () => {
150+
otherUser = await signupUser(
151+
"do-sync-other@test.com",
152+
"DO Sync Other User"
153+
);
154+
});
155+
156+
it("different orgs have independent stores", async () => {
157+
const url = "https://example.com/cross-org-test";
158+
159+
const stubA = getLinkProcessorStub(user.orgId);
160+
const stubB = getLinkProcessorStub(otherUser.orgId);
161+
162+
const resultA = await stubA.ingestAndProcess(
163+
makeQueueMessage(url, user.orgId)
164+
);
165+
const resultB = await stubB.ingestAndProcess(
166+
makeQueueMessage(url, otherUser.orgId)
167+
);
168+
169+
expect(resultA.status).toBe("ingested");
170+
expect(resultB.status).toBe("ingested");
171+
expect(resultA.linkId).not.toBe(resultB.linkId);
172+
});
173+
});
174+
175+
describe("fetch trigger path (SyncBackendDO → LinkProcessorDO)", () => {
176+
it("initializes store and subscription via fetch", async () => {
177+
const triggerUser = await signupUser(
178+
"do-sync-trigger@test.com",
179+
"Trigger User"
180+
);
181+
const stub = getLinkProcessorStub(triggerUser.orgId);
182+
183+
const res = await stub.fetch(
184+
`https://link-processor/?storeId=${triggerUser.orgId}`
185+
);
186+
187+
expect(res.status).toBe(200);
188+
expect(await res.text()).toBe("OK");
189+
190+
const ingestResult = await stub.ingestAndProcess(
191+
makeQueueMessage("https://example.com/after-trigger", triggerUser.orgId)
192+
);
193+
expect(ingestResult.status).toBe("ingested");
194+
});
195+
196+
it("returns 400 without storeId", async () => {
197+
const stub = getLinkProcessorStub(user.orgId);
198+
199+
const res = await stub.fetch("https://link-processor/");
200+
201+
expect(res.status).toBe(400);
202+
expect(await res.text()).toBe("Missing storeId");
203+
});
204+
});
205+
206+
describe("queue batch handler (real DOs)", () => {
207+
it("processes a single-message batch through the real queue handler", async () => {
208+
const queueUser = await signupUser(
209+
"do-sync-queue@test.com",
210+
"Queue User"
211+
);
212+
const { batch, mockMessages } = createMockBatch([
213+
{
214+
body: makeQueueMessage(
215+
"https://example.com/queue-single",
216+
queueUser.orgId
217+
),
218+
},
219+
]);
220+
221+
await handleQueueBatch(batch, env);
222+
223+
expect(mockMessages[0]._acked).toBe(true);
224+
expect(mockMessages[0]._retried).toBe(false);
225+
});
226+
227+
it("processes a multi-message batch", async () => {
228+
const batchUser = await signupUser(
229+
"do-sync-batch@test.com",
230+
"Batch User"
231+
);
232+
const { batch, mockMessages } = createMockBatch([
233+
{
234+
body: makeQueueMessage(
235+
"https://example.com/batch-1",
236+
batchUser.orgId
237+
),
238+
},
239+
{
240+
body: makeQueueMessage(
241+
"https://example.com/batch-2",
242+
batchUser.orgId
243+
),
244+
},
245+
{
246+
body: makeQueueMessage(
247+
"https://example.com/batch-3",
248+
batchUser.orgId
249+
),
250+
},
251+
]);
252+
253+
await handleQueueBatch(batch, env);
254+
255+
for (const msg of mockMessages) {
256+
expect(msg._acked).toBe(true);
257+
expect(msg._retried).toBe(false);
258+
}
259+
});
260+
261+
it("handles mixed-org batches routing to separate DOs", async () => {
262+
const orgA = await signupUser("do-sync-mix-a@test.com", "Mix User A");
263+
const orgB = await signupUser("do-sync-mix-b@test.com", "Mix User B");
264+
265+
const url = "https://example.com/mixed-org-batch";
266+
const { batch, mockMessages } = createMockBatch([
267+
{ body: makeQueueMessage(url, orgA.orgId) },
268+
{ body: makeQueueMessage(url, orgB.orgId) },
269+
]);
270+
271+
await handleQueueBatch(batch, env);
272+
273+
for (const msg of mockMessages) {
274+
expect(msg._acked).toBe(true);
275+
}
276+
277+
const stubA = getLinkProcessorStub(orgA.orgId);
278+
const dupA = await stubA.ingestAndProcess(
279+
makeQueueMessage(url, orgA.orgId)
280+
);
281+
expect(dupA.status).toBe("duplicate");
282+
283+
const stubB = getLinkProcessorStub(orgB.orgId);
284+
const dupB = await stubB.ingestAndProcess(
285+
makeQueueMessage(url, orgB.orgId)
286+
);
287+
expect(dupB.status).toBe("duplicate");
288+
});
289+
});
290+
291+
describe("sequential ingestion after queue batch", () => {
292+
it("DO accepts direct ingestAndProcess after being initialized via queue batch", async () => {
293+
const seqUser = await signupUser(
294+
"do-sync-seq@test.com",
295+
"Sequential User"
296+
);
297+
298+
const { batch } = createMockBatch([
299+
{
300+
body: makeQueueMessage(
301+
"https://example.com/seq-queue",
302+
seqUser.orgId
303+
),
304+
},
305+
]);
306+
await handleQueueBatch(batch, env);
307+
308+
const stub = getLinkProcessorStub(seqUser.orgId);
309+
const direct = await stub.ingestAndProcess(
310+
makeQueueMessage("https://example.com/seq-direct", seqUser.orgId)
311+
);
312+
expect(direct.status).toBe("ingested");
313+
314+
const dup = await stub.ingestAndProcess(
315+
makeQueueMessage("https://example.com/seq-queue", seqUser.orgId)
316+
);
317+
expect(dup.status).toBe("duplicate");
318+
});
319+
});
320+
});

0 commit comments

Comments
 (0)