Skip to content

Commit 1eea2e9

Browse files
committed
feat: rework email queue step retry logic
Now we mark emails to be retried in one iteration, and retry them in the next iteration.
1 parent 5c9577b commit 1eea2e9

3 files changed

Lines changed: 491 additions & 23 deletions

File tree

apps/backend/src/app/api/latest/emails/outbox/crud.tsx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { KnownErrors } from "@stackframe/stack-shared";
77
import { emailOutboxCrud, EmailOutboxCrud } from "@stackframe/stack-shared/dist/interface/crud/email-outbox";
88
import { yupObject, yupString } from "@stackframe/stack-shared/dist/schema-fields";
99
import { StackAssertionError, StatusError, throwErr } from "@stackframe/stack-shared/dist/utils/errors";
10+
import { Json } from "@stackframe/stack-shared/dist/utils/json";
1011
import { createLazyProxy } from "@stackframe/stack-shared/dist/utils/proxies";
1112

1213
/**
@@ -57,6 +58,25 @@ function prismaModelToCrud(prismaModel: EmailOutbox): EmailOutboxCrud["Server"][
5758
to = { type: "custom-emails", emails: recipient?.emails ?? [] };
5859
}
5960

61+
// Convert sendAttemptErrors from DB format (camelCase) to API format (snake_case)
62+
const sendAttemptErrors = prismaModel.sendAttemptErrors
63+
? (prismaModel.sendAttemptErrors as Array<{
64+
attemptNumber: number,
65+
timestamp: string,
66+
externalMessage: string,
67+
externalDetails: Record<string, Json>,
68+
internalMessage: string,
69+
internalDetails: Record<string, Json>,
70+
}>).map(e => ({
71+
attempt_number: e.attemptNumber,
72+
timestamp: e.timestamp,
73+
external_message: e.externalMessage,
74+
external_details: e.externalDetails,
75+
internal_message: e.internalMessage,
76+
internal_details: e.internalDetails,
77+
}))
78+
: null;
79+
6080
// Base fields present on all emails
6181
const base = {
6282
id: prismaModel.id,
@@ -68,6 +88,9 @@ function prismaModelToCrud(prismaModel: EmailOutbox): EmailOutboxCrud["Server"][
6888
variables: (prismaModel.extraRenderVariables ?? {}) as Record<string, any>,
6989
skip_deliverability_check: prismaModel.shouldSkipDeliverabilityCheck,
7090
scheduled_at_millis: prismaModel.scheduledAt.getTime(),
91+
failed_send_attempt_count: prismaModel.failedSendAttemptCount,
92+
next_send_retry_at_millis: prismaModel.nextSendRetryAt?.getTime() ?? null,
93+
send_attempt_errors: sendAttemptErrors,
7194
// Default flags (overridden in specific statuses)
7295
is_paused: false,
7396
has_rendered: false,
@@ -395,13 +418,16 @@ export const emailOutboxCrudHandlers = createLazyProxy(() => createCrudHandlers(
395418
// If content changed, reset rendering and sending state
396419
if (needsRerenderReset) {
397420
set("isQueued", Prisma.sql`false`);
421+
// Reset retry fields (failedSendAttemptCount to 0, others to null)
422+
set("failedSendAttemptCount", Prisma.sql`0`);
398423
setNull(
399424
"renderedByWorkerId", "startedRenderingAt", "finishedRenderingAt",
400425
"renderErrorExternalMessage", "renderErrorExternalDetails",
401426
"renderErrorInternalMessage", "renderErrorInternalDetails",
402427
"renderedHtml", "renderedText", "renderedSubject",
403428
"renderedIsTransactional", "renderedNotificationCategoryId",
404429
"startedSendingAt", "finishedSendingAt",
430+
"nextSendRetryAt", "sendAttemptErrors",
405431
"sendServerErrorExternalMessage", "sendServerErrorExternalDetails",
406432
"sendServerErrorInternalMessage", "sendServerErrorInternalDetails",
407433
"skippedReason", "skippedDetails", "canHaveDeliveryInfo",
@@ -494,6 +520,9 @@ function parseEmailOutboxFromJson(j: Record<string, unknown>): EmailOutbox {
494520
scheduledAtIfNotYetQueued: dateOrNull("scheduledAtIfNotYetQueued"),
495521
startedSendingAt: dateOrNull("startedSendingAt"),
496522
finishedSendingAt: dateOrNull("finishedSendingAt"),
523+
failedSendAttemptCount: j.failedSendAttemptCount as number,
524+
nextSendRetryAt: dateOrNull("nextSendRetryAt"),
525+
sendAttemptErrors: j.sendAttemptErrors as Prisma.JsonValue,
497526
sentAt: dateOrNull("sentAt"),
498527
sendServerErrorExternalMessage: j.sendServerErrorExternalMessage as string | null,
499528
sendServerErrorExternalDetails: j.sendServerErrorExternalDetails as Prisma.JsonValue,

apps/backend/src/lib/email-queue-step.tsx

Lines changed: 127 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,40 @@ import { filterUndefined } from "@stackframe/stack-shared/dist/utils/objects";
1515
import { Result } from "@stackframe/stack-shared/dist/utils/results";
1616
import { traceSpan } from "@stackframe/stack-shared/dist/utils/telemetry";
1717
import { randomUUID } from "node:crypto";
18-
import { lowLevelSendEmailDirectViaProvider } from "./emails-low-level";
18+
import { lowLevelSendEmailDirectWithoutRetries } from "./emails-low-level";
1919

2020
const MAX_RENDER_BATCH = 50;
2121

22+
const MAX_SEND_ATTEMPTS = 3;
23+
24+
const RETRY_BACKOFF_BASE_MS = 2000;
25+
26+
const calculateRetryBackoffMs = (attemptCount: number): number => {
27+
return (Math.random() + 0.5) * RETRY_BACKOFF_BASE_MS * Math.pow(2, attemptCount);
28+
};
29+
30+
/**
31+
* Structure for tracking errors from each send attempt.
32+
* Mirrors the pattern used for sendServerError* fields.
33+
* Uses Prisma.InputJsonValue-compatible types for DB storage.
34+
*/
35+
type SendAttemptError = {
36+
attemptNumber: number,
37+
timestamp: string,
38+
externalMessage: string,
39+
externalDetails: Prisma.InputJsonObject,
40+
internalMessage: string,
41+
internalDetails: Prisma.InputJsonObject,
42+
};
43+
44+
const appendSendAttemptError =(
45+
existingErrors: SendAttemptError[] | null | undefined,
46+
newError: SendAttemptError
47+
): SendAttemptError[] => {
48+
const errors = existingErrors ?? [];
49+
return [...errors, newError];
50+
};
51+
2252
// Track if email queue has run at least once since server start (used to suppress first-run delta warnings in dev)
2353
const emailQueueFirstRunKey = Symbol.for("__stack_email_queue_first_run_completed");
2454

@@ -481,6 +511,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
481511
AND "finishedRenderingAt" IS NOT NULL
482512
AND "renderedHtml" IS NOT NULL
483513
AND "scheduledAt" <= NOW()
514+
AND "nextSendRetryAt" IS NULL
484515
RETURNING "id";
485516
`;
486517
return {
@@ -489,11 +520,16 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
489520
}
490521

491522
async function prepareSendPlan(deltaSeconds: number): Promise<TenancySendBatch[]> {
523+
// Find tenancies with emails ready to send (either new emails or emails ready for retry)
492524
const tenancyIds = await globalPrismaClient.emailOutbox.findMany({
493525
where: {
494-
isQueued: true,
495526
isPaused: false,
496-
startedSendingAt: null,
527+
OR: [
528+
// Normal case: queued, not started, and no pending retry
529+
{ startedSendingAt: null, isQueued: true, nextSendRetryAt: null },
530+
// Retry case: past retry time
531+
{ nextSendRetryAt: { lte: new Date() } },
532+
],
497533
},
498534
distinct: ["tenancyId"],
499535
select: { tenancyId: true },
@@ -524,16 +560,23 @@ async function claimEmailsForSending(tx: PrismaClientTransaction, tenancyId: str
524560
SELECT "tenancyId", "id"
525561
FROM "EmailOutbox"
526562
WHERE "tenancyId" = ${tenancyId}::uuid
527-
AND "isQueued" = TRUE
528563
AND "isPaused" = FALSE
529564
AND "finishedRenderingAt" IS NOT NULL
530-
AND "startedSendingAt" IS NULL
565+
AND (
566+
-- Normal case: queued, not started, and no pending retry
567+
("startedSendingAt" IS NULL AND "isQueued" = TRUE AND "nextSendRetryAt" IS NULL)
568+
OR
569+
-- Retry case: past retry time
570+
("nextSendRetryAt" IS NOT NULL AND "nextSendRetryAt" <= NOW())
571+
)
531572
ORDER BY "priority" DESC, "scheduledAt" ASC, "createdAt" ASC
532573
LIMIT ${limit}
533574
FOR UPDATE SKIP LOCKED
534575
)
535576
UPDATE "EmailOutbox" AS e
536-
SET "startedSendingAt" = NOW()
577+
SET
578+
"startedSendingAt" = NOW(),
579+
"nextSendRetryAt" = NULL
537580
FROM selected
538581
WHERE e."tenancyId" = selected."tenancyId" AND e."id" = selected."id"
539582
RETURNING e.*;
@@ -641,7 +684,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
641684

642685
const result = getEnvBoolean("STACK_EMAIL_BRANCHING_DISABLE_QUEUE_SENDING")
643686
? Result.error({ errorType: "email-sending-disabled", canRetry: false, message: "Email sending is disabled", rawError: new Error("Email sending is disabled") })
644-
: await lowLevelSendEmailDirectViaProvider({
687+
: await lowLevelSendEmailDirectWithoutRetries({
645688
tenancyId: context.tenancy.id,
646689
emailConfig: context.emailConfig,
647690
to: resolution.emails,
@@ -651,24 +694,86 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
651694
});
652695

653696
if (result.status === "error") {
654-
await globalPrismaClient.emailOutbox.update({
655-
where: {
656-
tenancyId_id: {
697+
const newAttemptCount = row.failedSendAttemptCount + 1;
698+
const canRetry = result.error.canRetry && newAttemptCount < MAX_SEND_ATTEMPTS;
699+
700+
// Build error entry for this attempt
701+
const errorEntry: SendAttemptError = {
702+
attemptNumber: newAttemptCount,
703+
timestamp: new Date().toISOString(),
704+
externalMessage: result.error.message ?? result.error.errorType,
705+
externalDetails: { errorType: result.error.errorType },
706+
internalMessage: result.error.message ?? result.error.errorType,
707+
internalDetails: { rawError: errorToNiceString(result.error.rawError), errorType: result.error.errorType },
708+
};
709+
const updatedErrors = appendSendAttemptError(row.sendAttemptErrors as SendAttemptError[] | null, errorEntry);
710+
711+
if (canRetry) {
712+
// Schedule retry: unclaim the email and set nextSendRetryAt
713+
const backoffMs = calculateRetryBackoffMs(newAttemptCount);
714+
await globalPrismaClient.emailOutbox.update({
715+
where: {
716+
tenancyId_id: {
717+
tenancyId: row.tenancyId,
718+
id: row.id,
719+
},
720+
finishedSendingAt: null,
721+
},
722+
data: {
723+
startedSendingAt: null, // Unclaim the email
724+
isQueued: false, // Prevent normal queue path from picking it up
725+
failedSendAttemptCount: newAttemptCount,
726+
nextSendRetryAt: new Date(Date.now() + backoffMs),
727+
sendAttemptErrors: updatedErrors as Prisma.InputJsonArray,
728+
},
729+
});
730+
} else {
731+
// Mark as permanent failure - distinguish between "attempts exhausted" and "permanent error from provider"
732+
const isAttemptsExhausted = result.error.canRetry && newAttemptCount >= MAX_SEND_ATTEMPTS;
733+
const failureReason = isAttemptsExhausted ? "attempts_exhausted" : "permanent_error";
734+
735+
if (isAttemptsExhausted) {
736+
captureError("email-queue-step-retries-exhausted", new StackAssertionError(`Email failed after ${newAttemptCount} attempts`, {
737+
emailId: row.id,
657738
tenancyId: row.tenancyId,
658-
id: row.id,
739+
errorType: result.error.errorType,
740+
errorMessage: result.error.message,
741+
allAttemptErrors: updatedErrors,
742+
}));
743+
}
744+
745+
const externalMessage = isAttemptsExhausted
746+
? "Email could not be delivered after multiple attempts. Please verify your email configuration and try again."
747+
: result.error.message;
748+
749+
await globalPrismaClient.emailOutbox.update({
750+
where: {
751+
tenancyId_id: {
752+
tenancyId: row.tenancyId,
753+
id: row.id,
754+
},
755+
finishedSendingAt: null,
659756
},
660-
finishedSendingAt: null,
661-
},
662-
data: {
663-
finishedSendingAt: new Date(),
664-
canHaveDeliveryInfo: false,
665-
sendServerErrorExternalMessage: result.error.message,
666-
sendServerErrorExternalDetails: { errorType: result.error.errorType },
667-
sendServerErrorInternalMessage: result.error.message,
668-
sendServerErrorInternalDetails: { rawError: errorToNiceString(result.error.rawError), errorType: result.error.errorType },
669-
},
670-
});
757+
data: {
758+
finishedSendingAt: new Date(),
759+
canHaveDeliveryInfo: false,
760+
failedSendAttemptCount: newAttemptCount,
761+
sendAttemptErrors: updatedErrors as Prisma.InputJsonArray,
762+
sendServerErrorExternalMessage: externalMessage,
763+
sendServerErrorExternalDetails: { errorType: result.error.errorType },
764+
sendServerErrorInternalMessage: result.error.message,
765+
sendServerErrorInternalDetails: {
766+
rawError: errorToNiceString(result.error.rawError),
767+
errorType: result.error.errorType,
768+
attemptCount: newAttemptCount,
769+
failureReason,
770+
allAttemptErrors: updatedErrors as Json[],
771+
},
772+
},
773+
});
774+
}
671775
} else {
776+
// Success - mark as sent (don't increment failedSendAttemptCount since this wasn't a failure)
672777
await globalPrismaClient.emailOutbox.update({
673778
where: {
674779
tenancyId_id: {

0 commit comments

Comments
 (0)