Skip to content

Commit ce201fb

Browse files
bchapuisclaude
andcommitted
Add read-only mailbox browsing for email addresses
Let users navigate the conversations their workflows have recorded for an email address. Writes still happen only through nodes, so this surface is strictly read-only. - MailboxDO: listThreads, listThreadAttachments, getAttachment read methods - API: GET endpoints under /:organizationId/emails/:id for threads, thread detail, message bodies, and attachment downloads (org + email scoped, blobs served with nosniff + CSP sandbox + forced-download headers) - Web: per-address master-detail inbox page, reached from the Emails list - Shared mailbox browsing types Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent ef6e710 commit ce201fb

8 files changed

Lines changed: 922 additions & 3 deletions

File tree

apps/api/src/durable-objects/mailbox-do.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,62 @@ describe("MailboxDO", () => {
152152
expect(messages).toHaveLength(1);
153153
});
154154

155+
it("lists threads scoped to an address", async () => {
156+
const other = "email-2";
157+
const { threadId: t1 } = await mailbox.ingestInbound(
158+
inbound({ subject: "First" })
159+
);
160+
const { threadId: t2 } = await mailbox.ingestInbound(
161+
inbound({ subject: "Second", fromEmail: "other@example.com" })
162+
);
163+
// A thread on a different address must not leak into the listing.
164+
await mailbox.ingestInbound(
165+
inbound({ emailId: other, subject: "Elsewhere" })
166+
);
167+
168+
const threads = await mailbox.listThreads(EMAIL_ID, 100, 0);
169+
expect(new Set(threads.map((t) => t.id))).toEqual(new Set([t1, t2]));
170+
171+
const elsewhere = await mailbox.listThreads(other, 100, 0);
172+
expect(elsewhere).toHaveLength(1);
173+
expect(elsewhere[0].subject).toBe("Elsewhere");
174+
});
175+
176+
it("lists and fetches attachments scoped to a thread", async () => {
177+
const messageId = uuidv7();
178+
const attachmentId = uuidv7();
179+
const r2Key = `${EMAIL_ID}/${messageId}/attachments/0-report.pdf`;
180+
const { threadId } = await mailbox.ingestInbound(
181+
inbound({
182+
messageId,
183+
attachments: [
184+
{
185+
id: attachmentId,
186+
filename: "report.pdf",
187+
contentType: "application/pdf",
188+
sizeBytes: 1234,
189+
r2Key,
190+
contentId: null,
191+
},
192+
],
193+
})
194+
);
195+
196+
const rows = await mailbox.listThreadAttachments(threadId);
197+
expect(rows).toHaveLength(1);
198+
expect(rows[0]).toMatchObject({
199+
id: attachmentId,
200+
messageId,
201+
filename: "report.pdf",
202+
sizeBytes: 1234,
203+
r2Key,
204+
});
205+
206+
const att = await mailbox.getAttachment(attachmentId);
207+
expect(att?.r2Key).toBe(r2Key);
208+
expect(await mailbox.getAttachment("missing")).toBeUndefined();
209+
});
210+
155211
it("prunes inactive threads and reports their R2 prefixes", async () => {
156212
const args = inbound();
157213
const { threadId, messageId } = await mailbox.ingestInbound(args);

apps/api/src/durable-objects/mailbox-do.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ export interface MailboxMessageRow {
9292
createdAt: number;
9393
}
9494

95+
export interface MailboxAttachmentRow {
96+
id: string;
97+
messageId: string;
98+
filename: string;
99+
contentType: string;
100+
sizeBytes: number;
101+
r2Key: string;
102+
}
103+
95104
export class MailboxDO extends DurableObject<Bindings> {
96105
private initialized = false;
97106

@@ -364,6 +373,84 @@ export class MailboxDO extends DurableObject<Bindings> {
364373
return { threadId };
365374
}
366375

376+
/**
377+
* List threads belonging to one email address, newest activity first. Powers
378+
* the read-only mailbox browser; the index on `last_message_at` keeps this
379+
* cheap even as a mailbox accumulates conversations.
380+
*/
381+
async listThreads(
382+
emailId: string,
383+
limit: number,
384+
offset: number
385+
): Promise<MailboxThreadRow[]> {
386+
this.ensureSchema();
387+
const rows = this.ctx.storage.sql
388+
.exec(
389+
`SELECT id, email_id, subject, from_email, last_message_at, created_at
390+
FROM threads WHERE email_id = ?
391+
ORDER BY last_message_at DESC LIMIT ? OFFSET ?`,
392+
emailId,
393+
limit,
394+
offset
395+
)
396+
.toArray() as Record<string, unknown>[];
397+
return rows.map((r) => ({
398+
id: r.id as string,
399+
emailId: r.email_id as string,
400+
subject: r.subject as string,
401+
fromEmail: r.from_email as string,
402+
lastMessageAt: Number(r.last_message_at),
403+
createdAt: Number(r.created_at),
404+
}));
405+
}
406+
407+
/** Attachment metadata for every message in a thread, for the detail view. */
408+
async listThreadAttachments(
409+
threadId: string
410+
): Promise<MailboxAttachmentRow[]> {
411+
this.ensureSchema();
412+
const rows = this.ctx.storage.sql
413+
.exec(
414+
`SELECT a.id, a.message_id, a.filename, a.content_type, a.size_bytes, a.r2_key
415+
FROM attachments a JOIN messages m ON m.id = a.message_id
416+
WHERE m.thread_id = ?`,
417+
threadId
418+
)
419+
.toArray() as Record<string, unknown>[];
420+
return rows.map((r) => ({
421+
id: r.id as string,
422+
messageId: r.message_id as string,
423+
filename: r.filename as string,
424+
contentType: r.content_type as string,
425+
sizeBytes: Number(r.size_bytes),
426+
r2Key: r.r2_key as string,
427+
}));
428+
}
429+
430+
/** Single attachment by id, used to resolve its R2 blob for download. */
431+
async getAttachment(
432+
attachmentId: string
433+
): Promise<MailboxAttachmentRow | undefined> {
434+
this.ensureSchema();
435+
const rows = this.ctx.storage.sql
436+
.exec(
437+
`SELECT id, message_id, filename, content_type, size_bytes, r2_key
438+
FROM attachments WHERE id = ? LIMIT 1`,
439+
attachmentId
440+
)
441+
.toArray() as Record<string, unknown>[];
442+
if (rows.length === 0) return undefined;
443+
const r = rows[0];
444+
return {
445+
id: r.id as string,
446+
messageId: r.message_id as string,
447+
filename: r.filename as string,
448+
contentType: r.content_type as string,
449+
sizeBytes: Number(r.size_bytes),
450+
r2Key: r.r2_key as string,
451+
};
452+
}
453+
367454
async getThread(threadId: string): Promise<MailboxThreadRow | undefined> {
368455
this.ensureSchema();
369456
const rows = this.ctx.storage.sql

apps/api/src/routes/emails.ts

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import type {
33
CreateEmailResponse,
44
DeleteEmailResponse,
55
GetEmailResponse,
6+
GetMailboxThreadResponse,
67
ListEmailsResponse,
8+
ListMailboxThreadsResponse,
9+
MailboxAttachmentSummary,
710
UpdateEmailRequest,
811
UpdateEmailResponse,
912
} from "@dafthunk/types";
@@ -23,6 +26,7 @@ import {
2326
updateEmail,
2427
} from "../db";
2528
import type { EmailRow } from "../db/schema";
29+
import { inboxKeys } from "../support-storage";
2630
import {
2731
formatEmailAddress,
2832
generateEmailHandle,
@@ -45,6 +49,21 @@ emailRoutes.use("*", jwtMiddleware);
4549

4650
const nameSchema = z.string().trim().min(1, "Email name is required").max(120);
4751

52+
// Defense-in-depth headers shared by every blob response. Bodies and
53+
// attachments may carry attacker-influenced content, so they must never render
54+
// same-origin: nosniff + a CSP sandbox + a forced download disposition together
55+
// neutralise script execution and top-level navigation. The frontend reads
56+
// these via `fetch()`, which ignores the headers, so rendering still works.
57+
const blobSecurityHeaders = (
58+
contentType: string,
59+
filename: string
60+
): Record<string, string> => ({
61+
"Content-Type": contentType,
62+
"X-Content-Type-Options": "nosniff",
63+
"Content-Security-Policy": "sandbox",
64+
"Content-Disposition": `attachment; filename="${filename.replace(/["\r\n]/g, "")}"`,
65+
});
66+
4867
const toEmailPayload = (
4968
email: Pick<EmailRow, "id" | "name" | "handle" | "createdAt" | "updatedAt">,
5069
domain: string
@@ -231,4 +250,190 @@ emailRoutes.delete("/:id", async (c) => {
231250
return c.json(response);
232251
});
233252

253+
// ── Read-only mailbox browsing ──────────────────────────────────────────────
254+
//
255+
// These endpoints let a user navigate the conversations recorded for one of
256+
// their email addresses. They are strictly read-only: messages are written
257+
// exclusively by workflow nodes through the Mailbox Durable Object. Each
258+
// endpoint first confirms the address belongs to the caller's organization,
259+
// then reads from the per-org mailbox (`mailbox:{organizationId}`).
260+
261+
const mailboxStub = (env: ApiContext["Bindings"], organizationId: string) =>
262+
env.MAILBOX.get(env.MAILBOX.idFromName(`mailbox:${organizationId}`));
263+
264+
/** List the conversations recorded for an email address (newest first). */
265+
emailRoutes.get("/:id/threads", async (c) => {
266+
const id = c.req.param("id");
267+
const organizationId = c.get("organizationId")!;
268+
const db = createDatabase(c.env.DB);
269+
270+
const email = await getEmail(db, id, organizationId);
271+
if (!email) {
272+
return c.json({ error: "Email not found" }, 404);
273+
}
274+
275+
const threads = await mailboxStub(c.env, organizationId).listThreads(
276+
id,
277+
200,
278+
0
279+
);
280+
281+
const response: ListMailboxThreadsResponse = {
282+
threads: threads.map((t) => ({
283+
id: t.id,
284+
subject: t.subject,
285+
fromEmail: t.fromEmail,
286+
lastMessageAt: t.lastMessageAt,
287+
createdAt: t.createdAt,
288+
})),
289+
};
290+
return c.json(response);
291+
});
292+
293+
/** A single conversation with its messages and attachment metadata. */
294+
emailRoutes.get("/:id/threads/:threadId", async (c) => {
295+
const id = c.req.param("id");
296+
const threadId = c.req.param("threadId");
297+
const organizationId = c.get("organizationId")!;
298+
const db = createDatabase(c.env.DB);
299+
300+
// These reads are independent: the ownership check (email exists, thread
301+
// belongs to it) gates the *response*, not the fetches, and the DO is already
302+
// scoped to the caller's org. Fetch them concurrently and validate after.
303+
const stub = mailboxStub(c.env, organizationId);
304+
const [email, thread, messages, attachmentRows] = await Promise.all([
305+
getEmail(db, id, organizationId),
306+
stub.getThread(threadId),
307+
stub.listThreadMessages(threadId),
308+
stub.listThreadAttachments(threadId),
309+
]);
310+
311+
if (!email) {
312+
return c.json({ error: "Email not found" }, 404);
313+
}
314+
// Confirm the thread really belongs to this address; the DO is per-org so a
315+
// mismatch means the caller addressed the wrong mailbox slot.
316+
if (!thread || thread.emailId !== id) {
317+
return c.json({ error: "Thread not found" }, 404);
318+
}
319+
320+
const byMessage = new Map<string, MailboxAttachmentSummary[]>();
321+
for (const a of attachmentRows) {
322+
const list = byMessage.get(a.messageId) ?? [];
323+
list.push({
324+
id: a.id,
325+
filename: a.filename,
326+
contentType: a.contentType,
327+
sizeBytes: a.sizeBytes,
328+
});
329+
byMessage.set(a.messageId, list);
330+
}
331+
332+
const response: GetMailboxThreadResponse = {
333+
thread: {
334+
id: thread.id,
335+
subject: thread.subject,
336+
fromEmail: thread.fromEmail,
337+
lastMessageAt: thread.lastMessageAt,
338+
createdAt: thread.createdAt,
339+
},
340+
messages: messages.map((m) => ({
341+
id: m.id,
342+
direction: m.direction,
343+
fromEmail: m.fromEmail,
344+
toEmail: m.toEmail,
345+
subject: m.subject,
346+
snippet: m.snippet,
347+
hasHtml: m.hasHtml,
348+
hasText: m.hasText,
349+
attachmentCount: m.attachmentCount,
350+
createdAt: m.createdAt,
351+
attachments: byMessage.get(m.id) ?? [],
352+
})),
353+
};
354+
return c.json(response);
355+
});
356+
357+
/** Stream a message body part (text or html) from R2. */
358+
emailRoutes.get(
359+
"/:id/messages/:messageId/body",
360+
zValidator(
361+
"query",
362+
z.object({ part: z.enum(["text", "html"]).default("text") })
363+
),
364+
async (c) => {
365+
const id = c.req.param("id");
366+
const messageId = c.req.param("messageId");
367+
const organizationId = c.get("organizationId")!;
368+
const db = createDatabase(c.env.DB);
369+
370+
const email = await getEmail(db, id, organizationId);
371+
if (!email) {
372+
return c.json({ error: "Email not found" }, 404);
373+
}
374+
375+
const { part } = c.req.valid("query");
376+
const keys = inboxKeys(id, messageId);
377+
const bodyPart =
378+
part === "html"
379+
? {
380+
key: keys.htmlBody,
381+
filename: "body.html",
382+
contentType: "text/html; charset=utf-8",
383+
}
384+
: {
385+
key: keys.textBody,
386+
filename: "body.txt",
387+
contentType: "text/plain; charset=utf-8",
388+
};
389+
390+
// R2 keys are namespaced by emailId, so a body only resolves when the
391+
// message genuinely lives under the address the caller just authorized.
392+
const obj = await c.env.INBOXES.get(bodyPart.key);
393+
if (!obj) {
394+
return c.json({ error: "Body part not stored" }, 404);
395+
}
396+
397+
return new Response(obj.body, {
398+
status: 200,
399+
headers: blobSecurityHeaders(bodyPart.contentType, bodyPart.filename),
400+
});
401+
}
402+
);
403+
404+
/** Download a single attachment blob from R2. */
405+
emailRoutes.get("/:id/attachments/:attachmentId", async (c) => {
406+
const id = c.req.param("id");
407+
const attachmentId = c.req.param("attachmentId");
408+
const organizationId = c.get("organizationId")!;
409+
const db = createDatabase(c.env.DB);
410+
411+
const email = await getEmail(db, id, organizationId);
412+
if (!email) {
413+
return c.json({ error: "Email not found" }, 404);
414+
}
415+
416+
const att = await mailboxStub(c.env, organizationId).getAttachment(
417+
attachmentId
418+
);
419+
// The R2 key is prefixed with the owning emailId; require it to match the
420+
// addressed mailbox so one address can't read another's blobs.
421+
if (!att || !att.r2Key.startsWith(`${id}/`)) {
422+
return c.json({ error: "Attachment not found" }, 404);
423+
}
424+
425+
const obj = await c.env.INBOXES.get(att.r2Key);
426+
if (!obj) {
427+
return c.json({ error: "Attachment blob missing" }, 404);
428+
}
429+
430+
return new Response(obj.body, {
431+
status: 200,
432+
headers: {
433+
...blobSecurityHeaders(att.contentType, att.filename),
434+
"Content-Length": String(att.sizeBytes),
435+
},
436+
});
437+
});
438+
234439
export default emailRoutes;

0 commit comments

Comments
 (0)