Skip to content

Commit 0144e55

Browse files
committed
feat: rework email queue step retry logic
Now we mark emails to be retried in one iteration, and retry them in the next iteration.
1 parent 2181e08 commit 0144e55

3 files changed

Lines changed: 490 additions & 23 deletions

File tree

apps/backend/src/app/api/latest/emails/outbox/crud.tsx

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,25 @@ function prismaModelToCrud(prismaModel: EmailOutbox): EmailOutboxCrud["Server"][
5757
to = { type: "custom-emails", emails: recipient?.emails ?? [] };
5858
}
5959

60+
// Convert sendAttemptErrors from DB format (camelCase) to API format (snake_case)
61+
const sendAttemptErrors = prismaModel.sendAttemptErrors
62+
? (prismaModel.sendAttemptErrors as Array<{
63+
attemptNumber: number,
64+
timestamp: string,
65+
externalMessage: string,
66+
externalDetails: Record<string, unknown>,
67+
internalMessage: string,
68+
internalDetails: Record<string, unknown>,
69+
}>).map(e => ({
70+
attempt_number: e.attemptNumber,
71+
timestamp: e.timestamp,
72+
external_message: e.externalMessage,
73+
external_details: e.externalDetails,
74+
internal_message: e.internalMessage,
75+
internal_details: e.internalDetails,
76+
}))
77+
: null;
78+
6079
// Base fields present on all emails
6180
const base = {
6281
id: prismaModel.id,
@@ -68,6 +87,9 @@ function prismaModelToCrud(prismaModel: EmailOutbox): EmailOutboxCrud["Server"][
6887
variables: (prismaModel.extraRenderVariables ?? {}) as Record<string, any>,
6988
skip_deliverability_check: prismaModel.shouldSkipDeliverabilityCheck,
7089
scheduled_at_millis: prismaModel.scheduledAt.getTime(),
90+
failed_send_attempt_count: prismaModel.failedSendAttemptCount,
91+
next_send_retry_at_millis: prismaModel.nextSendRetryAt?.getTime() ?? null,
92+
send_attempt_errors: sendAttemptErrors,
7193
// Default flags (overridden in specific statuses)
7294
is_paused: false,
7395
has_rendered: false,
@@ -395,13 +417,16 @@ export const emailOutboxCrudHandlers = createLazyProxy(() => createCrudHandlers(
395417
// If content changed, reset rendering and sending state
396418
if (needsRerenderReset) {
397419
set("isQueued", Prisma.sql`false`);
420+
// Reset retry fields (failedSendAttemptCount to 0, others to null)
421+
set("failedSendAttemptCount", Prisma.sql`0`);
398422
setNull(
399423
"renderedByWorkerId", "startedRenderingAt", "finishedRenderingAt",
400424
"renderErrorExternalMessage", "renderErrorExternalDetails",
401425
"renderErrorInternalMessage", "renderErrorInternalDetails",
402426
"renderedHtml", "renderedText", "renderedSubject",
403427
"renderedIsTransactional", "renderedNotificationCategoryId",
404428
"startedSendingAt", "finishedSendingAt",
429+
"nextSendRetryAt", "sendAttemptErrors",
405430
"sendServerErrorExternalMessage", "sendServerErrorExternalDetails",
406431
"sendServerErrorInternalMessage", "sendServerErrorInternalDetails",
407432
"skippedReason", "skippedDetails", "canHaveDeliveryInfo",
@@ -494,6 +519,9 @@ function parseEmailOutboxFromJson(j: Record<string, unknown>): EmailOutbox {
494519
scheduledAtIfNotYetQueued: dateOrNull("scheduledAtIfNotYetQueued"),
495520
startedSendingAt: dateOrNull("startedSendingAt"),
496521
finishedSendingAt: dateOrNull("finishedSendingAt"),
522+
failedSendAttemptCount: j.failedSendAttemptCount as number,
523+
nextSendRetryAt: dateOrNull("nextSendRetryAt"),
524+
sendAttemptErrors: j.sendAttemptErrors as Prisma.JsonValue,
497525
sentAt: dateOrNull("sentAt"),
498526
sendServerErrorExternalMessage: j.sendServerErrorExternalMessage as string | null,
499527
sendServerErrorExternalDetails: j.sendServerErrorExternalDetails as Prisma.JsonValue,

apps/backend/src/lib/email-queue-step.tsx

Lines changed: 127 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,40 @@ import { filterUndefined } from "@stackframe/stack-shared/dist/utils/objects";
1515
import { Result } from "@stackframe/stack-shared/dist/utils/results";
1616
import { traceSpan } from "@stackframe/stack-shared/dist/utils/telemetry";
1717
import { randomUUID } from "node:crypto";
18-
import { lowLevelSendEmailDirectViaProvider } from "./emails-low-level";
18+
import { lowLevelSendEmailDirectWithoutRetries } from "./emails-low-level";
1919

2020
const MAX_RENDER_BATCH = 50;
2121

22+
const MAX_SEND_ATTEMPTS = 3;
23+
24+
const RETRY_BACKOFF_BASE_MS = 2000;
25+
26+
const calculateRetryBackoffMs = (attemptCount: number): number => {
27+
return (Math.random() + 0.5) * RETRY_BACKOFF_BASE_MS * Math.pow(2, attemptCount);
28+
};
29+
30+
/**
31+
* Structure for tracking errors from each send attempt.
32+
* Mirrors the pattern used for sendServerError* fields.
33+
* Uses Prisma.InputJsonValue-compatible types for DB storage.
34+
*/
35+
type SendAttemptError = {
36+
attemptNumber: number,
37+
timestamp: string,
38+
externalMessage: string,
39+
externalDetails: Prisma.InputJsonObject,
40+
internalMessage: string,
41+
internalDetails: Prisma.InputJsonObject,
42+
};
43+
44+
const appendSendAttemptError =(
45+
existingErrors: SendAttemptError[] | null | undefined,
46+
newError: SendAttemptError
47+
): SendAttemptError[] => {
48+
const errors = existingErrors ?? [];
49+
return [...errors, newError];
50+
};
51+
2252
// Track if email queue has run at least once since server start (used to suppress first-run delta warnings in dev)
2353
const emailQueueFirstRunKey = Symbol.for("__stack_email_queue_first_run_completed");
2454

@@ -481,6 +511,7 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
481511
AND "finishedRenderingAt" IS NOT NULL
482512
AND "renderedHtml" IS NOT NULL
483513
AND "scheduledAt" <= NOW()
514+
AND "nextSendRetryAt" IS NULL
484515
RETURNING "id";
485516
`;
486517
return {
@@ -489,11 +520,16 @@ async function queueReadyEmails(): Promise<{ queuedCount: number }> {
489520
}
490521

491522
async function prepareSendPlan(deltaSeconds: number): Promise<TenancySendBatch[]> {
523+
// Find tenancies with emails ready to send (either new emails or emails ready for retry)
492524
const tenancyIds = await globalPrismaClient.emailOutbox.findMany({
493525
where: {
494-
isQueued: true,
495526
isPaused: false,
496-
startedSendingAt: null,
527+
OR: [
528+
// Normal case: queued, not started, and no pending retry
529+
{ startedSendingAt: null, isQueued: true, nextSendRetryAt: null },
530+
// Retry case: past retry time
531+
{ nextSendRetryAt: { lte: new Date() } },
532+
],
497533
},
498534
distinct: ["tenancyId"],
499535
select: { tenancyId: true },
@@ -524,16 +560,23 @@ async function claimEmailsForSending(tx: PrismaClientTransaction, tenancyId: str
524560
SELECT "tenancyId", "id"
525561
FROM "EmailOutbox"
526562
WHERE "tenancyId" = ${tenancyId}::uuid
527-
AND "isQueued" = TRUE
528563
AND "isPaused" = FALSE
529564
AND "finishedRenderingAt" IS NOT NULL
530-
AND "startedSendingAt" IS NULL
565+
AND (
566+
-- Normal case: queued, not started, and no pending retry
567+
("startedSendingAt" IS NULL AND "isQueued" = TRUE AND "nextSendRetryAt" IS NULL)
568+
OR
569+
-- Retry case: past retry time
570+
("nextSendRetryAt" IS NOT NULL AND "nextSendRetryAt" <= NOW())
571+
)
531572
ORDER BY "priority" DESC, "scheduledAt" ASC, "createdAt" ASC
532573
LIMIT ${limit}
533574
FOR UPDATE SKIP LOCKED
534575
)
535576
UPDATE "EmailOutbox" AS e
536-
SET "startedSendingAt" = NOW()
577+
SET
578+
"startedSendingAt" = NOW(),
579+
"nextSendRetryAt" = NULL
537580
FROM selected
538581
WHERE e."tenancyId" = selected."tenancyId" AND e."id" = selected."id"
539582
RETURNING e.*;
@@ -641,7 +684,7 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
641684

642685
const result = getEnvBoolean("STACK_EMAIL_BRANCHING_DISABLE_QUEUE_SENDING")
643686
? Result.error({ errorType: "email-sending-disabled", canRetry: false, message: "Email sending is disabled", rawError: new Error("Email sending is disabled") })
644-
: await lowLevelSendEmailDirectViaProvider({
687+
: await lowLevelSendEmailDirectWithoutRetries({
645688
tenancyId: context.tenancy.id,
646689
emailConfig: context.emailConfig,
647690
to: resolution.emails,
@@ -651,24 +694,86 @@ async function processSingleEmail(context: TenancyProcessingContext, row: EmailO
651694
});
652695

653696
if (result.status === "error") {
654-
await globalPrismaClient.emailOutbox.update({
655-
where: {
656-
tenancyId_id: {
697+
const newAttemptCount = row.failedSendAttemptCount + 1;
698+
const canRetry = result.error.canRetry && newAttemptCount < MAX_SEND_ATTEMPTS;
699+
700+
// Build error entry for this attempt
701+
const errorEntry: SendAttemptError = {
702+
attemptNumber: newAttemptCount,
703+
timestamp: new Date().toISOString(),
704+
externalMessage: result.error.message ?? result.error.errorType,
705+
externalDetails: { errorType: result.error.errorType },
706+
internalMessage: result.error.message ?? result.error.errorType,
707+
internalDetails: { rawError: errorToNiceString(result.error.rawError), errorType: result.error.errorType },
708+
};
709+
const updatedErrors = appendSendAttemptError(row.sendAttemptErrors as SendAttemptError[] | null, errorEntry);
710+
711+
if (canRetry) {
712+
// Schedule retry: unclaim the email and set nextSendRetryAt
713+
const backoffMs = calculateRetryBackoffMs(newAttemptCount);
714+
await globalPrismaClient.emailOutbox.update({
715+
where: {
716+
tenancyId_id: {
717+
tenancyId: row.tenancyId,
718+
id: row.id,
719+
},
720+
finishedSendingAt: null,
721+
},
722+
data: {
723+
startedSendingAt: null, // Unclaim the email
724+
isQueued: false, // Prevent normal queue path from picking it up
725+
failedSendAttemptCount: newAttemptCount,
726+
nextSendRetryAt: new Date(Date.now() + backoffMs),
727+
sendAttemptErrors: updatedErrors as Prisma.InputJsonArray,
728+
},
729+
});
730+
} else {
731+
// Mark as permanent failure - distinguish between "attempts exhausted" and "permanent error from provider"
732+
const isAttemptsExhausted = result.error.canRetry && newAttemptCount >= MAX_SEND_ATTEMPTS;
733+
const failureReason = isAttemptsExhausted ? "attempts_exhausted" : "permanent_error";
734+
735+
if (isAttemptsExhausted) {
736+
captureError("email-queue-step-retries-exhausted", new StackAssertionError(`Email failed after ${newAttemptCount} attempts`, {
737+
emailId: row.id,
657738
tenancyId: row.tenancyId,
658-
id: row.id,
739+
errorType: result.error.errorType,
740+
errorMessage: result.error.message,
741+
allAttemptErrors: updatedErrors,
742+
}));
743+
}
744+
745+
const externalMessage = isAttemptsExhausted
746+
? "Email could not be delivered after multiple attempts. Please verify your email configuration and try again."
747+
: result.error.message;
748+
749+
await globalPrismaClient.emailOutbox.update({
750+
where: {
751+
tenancyId_id: {
752+
tenancyId: row.tenancyId,
753+
id: row.id,
754+
},
755+
finishedSendingAt: null,
659756
},
660-
finishedSendingAt: null,
661-
},
662-
data: {
663-
finishedSendingAt: new Date(),
664-
canHaveDeliveryInfo: false,
665-
sendServerErrorExternalMessage: result.error.message,
666-
sendServerErrorExternalDetails: { errorType: result.error.errorType },
667-
sendServerErrorInternalMessage: result.error.message,
668-
sendServerErrorInternalDetails: { rawError: errorToNiceString(result.error.rawError), errorType: result.error.errorType },
669-
},
670-
});
757+
data: {
758+
finishedSendingAt: new Date(),
759+
canHaveDeliveryInfo: false,
760+
failedSendAttemptCount: newAttemptCount,
761+
sendAttemptErrors: updatedErrors as Prisma.InputJsonArray,
762+
sendServerErrorExternalMessage: externalMessage,
763+
sendServerErrorExternalDetails: { errorType: result.error.errorType },
764+
sendServerErrorInternalMessage: result.error.message,
765+
sendServerErrorInternalDetails: {
766+
rawError: errorToNiceString(result.error.rawError),
767+
errorType: result.error.errorType,
768+
attemptCount: newAttemptCount,
769+
failureReason,
770+
allAttemptErrors: updatedErrors as Json[],
771+
},
772+
},
773+
});
774+
}
671775
} else {
776+
// Success - mark as sent (don't increment failedSendAttemptCount since this wasn't a failure)
672777
await globalPrismaClient.emailOutbox.update({
673778
where: {
674779
tenancyId_id: {

0 commit comments

Comments
 (0)