9 changes: 9 additions & 0 deletions .changeset/mollifier-redis-worker-primitives.md
@@ -0,0 +1,9 @@
---
"@trigger.dev/redis-worker": patch
---

Add MollifierBuffer and MollifierDrainer primitives for trigger burst smoothing.

MollifierBuffer (`accept`, `pop`, `ack`, `requeue`, `fail`, `evaluateTrip`) is a per-env FIFO over Redis with atomic Lua transitions for status tracking. `evaluateTrip` is a sliding-window trip evaluator the webapp gate uses to detect per-env trigger bursts.
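
To make the sliding-window idea concrete, here is a minimal in-memory sketch. It is not the `MollifierBuffer.evaluateTrip` implementation, which runs against Redis; the class name `SlidingWindowTrip`, the option names, and the reading of "hold" as "stay tripped for `holdMs` after the last crossing" are all assumptions made for illustration.

```typescript
// Hypothetical in-memory sketch of a sliding-window trip evaluator.
// Not the MollifierBuffer implementation, which runs against Redis.
type TripOptions = { windowMs: number; threshold: number; holdMs: number };
type TripResult = { tripped: boolean; count: number };

class SlidingWindowTrip {
  private readonly opts: TripOptions;
  // Per-env timestamps of recent triggers (assumed bookkeeping).
  private readonly hits = new Map<string, number[]>();
  // Per-env deadline until which the env counts as tripped.
  private readonly trippedUntil = new Map<string, number>();

  constructor(opts: TripOptions) {
    this.opts = opts;
  }

  evaluate(envId: string, nowMs: number): TripResult {
    // Keep only the hits that fall inside the window ending at nowMs.
    const cutoff = nowMs - this.opts.windowMs;
    const recent = (this.hits.get(envId) ?? []).filter((t) => t > cutoff);
    recent.push(nowMs);
    this.hits.set(envId, recent);

    // Crossing the threshold (strictly more hits than allowed) extends the hold.
    if (recent.length > this.opts.threshold) {
      this.trippedUntil.set(envId, nowMs + this.opts.holdMs);
    }
    const tripped = (this.trippedUntil.get(envId) ?? 0) > nowMs;
    return { tripped, count: recent.length };
  }
}
```

A caller would construct one evaluator and call `evaluate(envId, Date.now())` per trigger. Passing the clock in as an argument keeps the sketch deterministic under test.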

MollifierDrainer pops entries through a polling loop with a user-supplied handler. The loop survives transient Redis errors via capped exponential backoff (up to 5s), and per-env pop failures don't poison the rest of the batch — one env's blip is logged and counted as failed for that tick. Rotation is two-level: orgs at the top, envs within each org. The buffer maintains `mollifier:orgs` and `mollifier:org-envs:${orgId}` atomically with per-env queues, so the drainer walks orgs → envs directly without an in-memory cache. The `maxOrgsPerTick` option (default 500) caps how many orgs are scheduled per tick; for each picked org, one env is popped (rotating round-robin within the org). An org with N envs gets the same per-tick scheduling slot as an org with 1 env, so tenant-level drainage throughput is determined by org count rather than env count.
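
The two-level rotation described above can be sketched as follows. This is an in-memory illustration only: the real drainer reads `mollifier:orgs` and `mollifier:org-envs:${orgId}` from Redis, whereas this sketch takes a plain `Map`, and the class name `TwoLevelRotation` and its cursor bookkeeping are assumptions rather than the drainer's actual code.

```typescript
// Hypothetical sketch of org -> env round-robin scheduling.
type Pick = { orgId: string; envId: string };

class TwoLevelRotation {
  private readonly maxOrgsPerTick: number;
  private orgCursor = 0;
  private readonly envCursors = new Map<string, number>();

  constructor(maxOrgsPerTick: number) {
    this.maxOrgsPerTick = maxOrgsPerTick;
  }

  // Returns at most one env per org, for at most maxOrgsPerTick orgs.
  pick(orgEnvs: Map<string, string[]>): Pick[] {
    const orgIds = Array.from(orgEnvs.keys()).sort();
    if (orgIds.length === 0) return [];

    const take = Math.min(this.maxOrgsPerTick, orgIds.length);
    const picked: Pick[] = [];
    for (let i = 0; i < take; i++) {
      const orgId = orgIds[(this.orgCursor + i) % orgIds.length];
      const envs = orgEnvs.get(orgId) ?? [];
      if (envs.length === 0) continue;
      // Rotate within the org so each env gets a turn on successive ticks.
      const cursor = (this.envCursors.get(orgId) ?? 0) % envs.length;
      picked.push({ orgId, envId: envs[cursor] });
      this.envCursors.set(orgId, (cursor + 1) % envs.length);
    }
    // Advance the top-level cursor so orgs beyond the cap are reached next tick.
    this.orgCursor = (this.orgCursor + take) % orgIds.length;
    return picked;
  }
}
```

With this shape, an org that has three envs and an org that has one env each contribute exactly one pop per tick, which is the fairness property the changeset describes.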
6 changes: 6 additions & 0 deletions .server-changes/magic-link-email-validation.md
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Validate email format on the magic link login form.
6 changes: 6 additions & 0 deletions .server-changes/mollifier-burst-protection.md
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Lay the groundwork for an opt-in burst-protection layer on the trigger hot path. This release ships **monitoring only** — operators can observe per-env trigger storms via two opt-in modes, but no trigger calls are diverted or rate-limited yet (active burst smoothing follows in a later release). All new env vars are prefixed `TRIGGER_MOLLIFIER_*` and default off, so existing deployments see no behaviour change. With `TRIGGER_MOLLIFIER_SHADOW_MODE=1`, each trigger evaluates a per-env rate counter and logs `mollifier.would_mollify` when the threshold is crossed. With `TRIGGER_MOLLIFIER_ENABLED=1` plus a per-org `mollifierEnabled` flag, over-threshold triggers are also recorded in a Redis audit buffer alongside the normal `engine.trigger` call, drained by a background no-op consumer. The drainer has its own switch (`TRIGGER_MOLLIFIER_DRAINER_ENABLED`) so multi-replica deployments can pin the polling loop to a single worker service while every replica still produces into the buffer; unset, it inherits `TRIGGER_MOLLIFIER_ENABLED` so single-container self-hosters need only one flag. Drainer misconfiguration (shutdown-timeout reconciliation against `GRACEFUL_SHUTDOWN_TIMEOUT`, or `TRIGGER_MOLLIFIER_ENABLED=1` with no buffer Redis) now throws `MollifierConfigurationError` at boot and crashes the process, so the misconfig surfaces to the orchestrator instead of disappearing into a log line; transient init failures (Redis blip) are still logged-and-swallowed. Emits the `mollifier.decisions` OTel counter for per-env rate visibility.
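
As a rough illustration of how the three switches interact, the sketch below resolves them from a plain environment object. The function name `resolveMollifierFlags` and the returned shape are hypothetical; the webapp parses these variables through its Zod `EnvironmentSchema`, not through this helper. The sketch only encodes the rules stated above: everything defaults off, the drainer flag inherits `TRIGGER_MOLLIFIER_ENABLED` when unset, and `TRIGGER_MOLLIFIER_ENABLED` acts as the master switch.

```typescript
// Hypothetical helper; names and return shape are assumptions for illustration.
type MollifierFlags = {
  enabled: boolean;
  shadowMode: boolean;
  drainerEnabled: boolean;
};

function resolveMollifierFlags(env: Record<string, string | undefined>): MollifierFlags {
  const enabledRaw = env.TRIGGER_MOLLIFIER_ENABLED ?? "0";
  const enabled = enabledRaw === "1";
  const shadowMode = (env.TRIGGER_MOLLIFIER_SHADOW_MODE ?? "0") === "1";

  // Unset drainer flag inherits the main flag, so one switch is enough
  // for single-container deployments.
  const drainerRaw = env.TRIGGER_MOLLIFIER_DRAINER_ENABLED ?? enabledRaw;

  // The main flag is the master switch: a drainer flag of "1" has no effect
  // while the mollifier itself is off.
  const drainerEnabled = enabled && drainerRaw === "1";

  return { enabled, shadowMode, drainerEnabled };
}
```

In a multi-replica deployment this corresponds to setting `TRIGGER_MOLLIFIER_ENABLED=1` everywhere and `TRIGGER_MOLLIFIER_DRAINER_ENABLED=0` on every replica except the dedicated drainer service.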
23 changes: 23 additions & 0 deletions .server-changes/otel-attribute-utf16-sanitization.md
@@ -0,0 +1,23 @@
---
area: webapp
type: fix
---

Recover from ClickHouse `JSONEachRow` parse failures caused by lone
UTF-16 surrogates in OTel attribute strings (`Cannot parse JSON object
here ... ParallelParsingBlockInputFormat`).

`ClickhouseEventRepository.#flushBatch` and `#flushLlmMetricsBatch` now
retry once after sanitizing every row in the batch: any string value
containing a lone surrogate is replaced with `"[invalid-utf16]"`. If
the sanitizer touched no fields (the parse error isn't a surrogate
issue) or the retry still fails, the batch is dropped without further
ClickHouse round-trips, `permanentlyDroppedBatches` increments, and an
error log with a 1KB sample row is emitted. Non-parse errors propagate
unchanged.

Detection reuses `detectBadJsonStrings` via `JSON.stringify(value)`,
with a latent regex bug fixed: the low-surrogate hex nibble matched
`[cd]` instead of `[c-f]`, missing the U+DE00–U+DFFF half of the range
and false-flagging common emoji pairs. Healthy batches pay zero scan
cost — the check only runs when ClickHouse has already rejected.
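
The sanitizing pass can be pictured with the sketch below. It is an assumption-laden illustration, not the repository's code: the PR detects bad strings by running `detectBadJsonStrings` over `JSON.stringify(value)`, whereas this sketch scans UTF-16 code units directly, and the names `hasLoneSurrogate` and `sanitizeRow` are hypothetical. Only the placeholder string `"[invalid-utf16]"` comes from the description above.

```typescript
// Hypothetical sketch: replace any string holding a lone surrogate.
const PLACEHOLDER = "[invalid-utf16]";

function hasLoneSurrogate(value: string): boolean {
  for (let i = 0; i < value.length; i++) {
    const code = value.charCodeAt(i);
    if (code >= 0xd800 && code <= 0xdbff) {
      // High surrogate: must be followed by a low surrogate (U+DC00 to U+DFFF).
      const next = value.charCodeAt(i + 1); // NaN past the end of the string
      if (next >= 0xdc00 && next <= 0xdfff) {
        i++; // skip the valid pair
        continue;
      }
      return true;
    }
    if (code >= 0xdc00 && code <= 0xdfff) {
      return true; // low surrogate with no preceding high surrogate
    }
  }
  return false;
}

// Walks a row recursively and counts how many string fields were replaced,
// so the caller can tell whether a retry is worth attempting.
function sanitizeRow(value: unknown, stats: { touched: number }): unknown {
  if (typeof value === "string") {
    if (hasLoneSurrogate(value)) {
      stats.touched++;
      return PLACEHOLDER;
    }
    return value;
  }
  if (Array.isArray(value)) {
    return value.map((item) => sanitizeRow(item, stats));
  }
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, item] of Object.entries(value as Record<string, unknown>)) {
      out[key] = sanitizeRow(item, stats);
    }
    return out;
  }
  return value;
}
```

A `touched` count of zero after sanitizing a rejected batch corresponds to the "parse error isn't a surrogate issue" case, where retrying would only repeat the failure.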
3 changes: 3 additions & 0 deletions apps/webapp/app/entry.server.tsx
@@ -6,6 +6,7 @@ import isbot from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { PassThrough } from "stream";
import * as Worker from "~/services/worker.server";
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
import { bootstrap } from "./bootstrap";
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
import {
@@ -247,6 +248,8 @@ Worker.init().catch((error) => {
logError(error);
});

initMollifierDrainerWorker();

bootstrap().catch((error) => {
logError(error);
});
41 changes: 41 additions & 0 deletions apps/webapp/app/env.server.ts
@@ -1054,6 +1054,47 @@ const EnvironmentSchema = z
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

TRIGGER_MOLLIFIER_ENABLED: z.string().default("0"),
// Separate switch for the drainer (consumer side) so it can be split
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. In multi-replica deployments, set this to "0"
// explicitly on every replica except the one dedicated drainer
// service — otherwise every replica's polling loop races for the
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
// no-op because the gate-side singleton refuses to construct a
// buffer when the system is off.
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
TRIGGER_MOLLIFIER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
TRIGGER_MOLLIFIER_REDIS_PORT: z.coerce
.number()
.optional()
.transform(
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
),
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
TRIGGER_MOLLIFIER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
.int()
21 changes: 18 additions & 3 deletions apps/webapp/app/routes/login.magic/route.tsx
@@ -101,17 +101,32 @@ export async function action({ request }: ActionFunctionArgs) {

const payload = Object.fromEntries(await clonedRequest.formData());

- const data = z
+ const result = z
.discriminatedUnion("action", [
z.object({
action: z.literal("send"),
- email: z.string().trim().toLowerCase(),
+ email: z.string().trim().toLowerCase().email(),
}),
z.object({
action: z.literal("reset"),
}),
])
- .parse(payload);
+ .safeParse(payload);

if (!result.success) {
const session = await getUserSession(request);
session.set("auth:error", {
message: "Please enter a valid email address.",
});

return redirect("/login/magic", {
headers: {
"Set-Cookie": await commitSession(session),
},
});
}

const data = result.data;

switch (data.action) {
case "send": {
114 changes: 114 additions & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
@@ -40,6 +40,18 @@ import type {
TriggerTaskRequest,
TriggerTaskValidator,
} from "../types";
import { env } from "~/env.server";
import {
evaluateGate as defaultEvaluateGate,
type GateOutcome,
type MollifierEvaluateGate,
} from "~/v3/mollifier/mollifierGate.server";
import {
getMollifierBuffer as defaultGetMollifierBuffer,
type MollifierGetBuffer,
} from "~/v3/mollifier/mollifierBuffer.server";
import { buildBufferedTriggerPayload } from "~/v3/mollifier/bufferedTriggerPayload.server";
import { serialiseSnapshot } from "@trigger.dev/redis-worker";
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";

class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
@@ -59,6 +71,14 @@ export class RunEngineTriggerTaskService {
private readonly traceEventConcern: TraceEventConcern;
private readonly triggerRacepointSystem: TriggerRacepointSystem;
private readonly metadataMaximumSize: number;
// Mollifier hooks are DI'd so tests can drive the call-site's mollify branch
// deterministically (stub the gate to return mollify, inject a real or fake
// buffer, force the global-enabled predicate to true so the call site
// doesn't short-circuit on an unset env). In production all three default
// to the live module-level singletons + env read.
private readonly evaluateGate: MollifierEvaluateGate;
private readonly getMollifierBuffer: MollifierGetBuffer;
private readonly isMollifierGloballyEnabled: () => boolean;

constructor(opts: {
prisma: PrismaClientOrTransaction;
@@ -71,6 +91,9 @@ export class RunEngineTriggerTaskService {
tracer: Tracer;
metadataMaximumSize: number;
triggerRacepointSystem?: TriggerRacepointSystem;
evaluateGate?: MollifierEvaluateGate;
getMollifierBuffer?: MollifierGetBuffer;
isMollifierGloballyEnabled?: () => boolean;
}) {
this.prisma = opts.prisma;
this.engine = opts.engine;
@@ -82,6 +105,10 @@ export class RunEngineTriggerTaskService {
this.traceEventConcern = opts.traceEventConcern;
this.metadataMaximumSize = opts.metadataMaximumSize;
this.triggerRacepointSystem = opts.triggerRacepointSystem ?? new NoopTriggerRacepointSystem();
this.evaluateGate = opts.evaluateGate ?? defaultEvaluateGate;
this.getMollifierBuffer = opts.getMollifierBuffer ?? defaultGetMollifierBuffer;
this.isMollifierGloballyEnabled =
opts.isMollifierGloballyEnabled ?? (() => env.TRIGGER_MOLLIFIER_ENABLED === "1");
}

public async call({
@@ -316,6 +343,25 @@ export class RunEngineTriggerTaskService {
taskKind: taskKind ?? "STANDARD",
};

// Short-circuit before the gate when mollifier is globally off (the
// default for every deployment that hasn't opted in). Avoids the
// GateInputs allocation, the deps spread inside `evaluateGate`, and
// the `mollifier.decisions{outcome=pass_through}` OTel increment on
// every trigger — `triggerTask` is the highest-throughput code path
// in the system. The check goes through a DI'd predicate so unit
// tests that inject a custom `evaluateGate` can also override the
// gate-on check (the default reads `env.TRIGGER_MOLLIFIER_ENABLED`,
// which is "0" in CI where no .env file is present).
const mollifierOutcome: GateOutcome | null = this.isMollifierGloballyEnabled()
? await this.evaluateGate({
envId: environment.id,
orgId: environment.organizationId,
taskId,
orgFeatureFlags:
(environment.organization.featureFlags as Record<string, unknown> | null) ?? null,
})
: null;

try {
return await this.traceEventConcern.traceRun(
triggerRequest,
@@ -328,6 +374,74 @@ export class RunEngineTriggerTaskService {

const payloadPacket = await this.payloadProcessor.process(triggerRequest);

// Phase 1 dual-write: if the org has the mollifier feature flag
// enabled and the per-env trip evaluator says divert, write the
// canonical replay payload to the buffer AND continue through
// engine.trigger as normal. The buffer entry is an audit/preview
// copy; the drainer's no-op handler consumes it to prove the
// dequeue mechanism works. Phase 2 will replace engine.trigger
// (below) with a synthesised 200 response and rely on the
// drainer to perform the Postgres write via replay.
if (mollifierOutcome?.action === "mollify") {
const buffer = this.getMollifierBuffer();
if (buffer) {
const canonicalPayload = buildBufferedTriggerPayload({
runFriendlyId,
taskId,
envId: environment.id,
envType: environment.type,
envSlug: environment.slug,
orgId: environment.organizationId,
orgSlug: environment.organization.slug,
projectId: environment.projectId,
projectRef: environment.project.externalRef,
body,
idempotencyKey: idempotencyKey ?? null,
idempotencyKeyExpiresAt: idempotencyKey
? idempotencyKeyExpiresAt ?? null
: null,
tags,
parentRunFriendlyId: parentRun?.friendlyId ?? null,
traceContext: event.traceContext,
triggerSource,
triggerAction,
serviceOptions: options,
createdAt: new Date(),
});

try {
const serialisedPayload = serialiseSnapshot(canonicalPayload);
await buffer.accept({
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
payload: serialisedPayload,
});
// Light log on the hot path — keep this synchronous work
// O(1) per trigger. The drainer computes the payload hash
// off-path; operators correlate `mollifier.buffered` →
// `mollifier.drained` by runId.
logger.debug("mollifier.buffered", {
runId: runFriendlyId,
envId: environment.id,
orgId: environment.organizationId,
taskId,
payloadBytes: serialisedPayload.length,
});
} catch (err) {
// Fail-open: buffer write must never block the customer's
// trigger. engine.trigger below is the primary write path
// in Phase 1 — the customer still gets a valid run.
logger.error("mollifier.buffer_accept_failed", {
runId: runFriendlyId,
envId: environment.id,
taskId,
err: err instanceof Error ? err.message : String(err),
});
}
}
}

const taskRun = await this.engine.trigger(
{
friendlyId: runFriendlyId,
21 changes: 21 additions & 0 deletions apps/webapp/app/services/worker.server.ts
@@ -1,3 +1,24 @@
/**
* ⚠️ LEGACY — Graphile-worker / ZodWorker setup. Do not touch.
*
* This file wires the original background-job system the webapp was
* built on (`@internal/zod-worker` → graphile-worker → Postgres). It is
* now in deprecation mode: every task in `workerCatalog` below is
* annotated with `@deprecated, moved to <new home>` and the live jobs
* for new features all run on `@trigger.dev/redis-worker` instead.
*
* Where to put new things:
* - Background jobs / queues → use redis-worker, alongside
* `~/v3/commonWorker.server.ts`, `~/v3/alertsWorker.server.ts`, or
* `~/v3/batchTriggerWorker.server.ts`.
* - Run lifecycle → `@internal/run-engine` via `~/v3/runEngine.server`.
* - Custom polling loops with their own Redis connection → keep them
* in their own lifecycle module (e.g. `~/v3/mollifierDrainerWorker.server.ts`)
* and wire the bootstrap from `entry.server.tsx`. Don't reach into
* `init()` below.
*
* Edit only when removing legacy paths.
*/
import { ZodWorker } from "@internal/zod-worker";
import { DeliverEmailSchema } from "emails";
import { z } from "zod";
22 changes: 18 additions & 4 deletions apps/webapp/app/utils/detectBadJsonStrings.ts
@@ -1,3 +1,17 @@
/**
* Detects unpaired UTF-16 surrogate escape sequences in JSON-encoded text.
*
* Returns true if the input contains a `\uD8XX`/`\uD9XX`/`\uDAXX`/`\uDBXX`
* high-surrogate escape not immediately followed by a `\uDC..`–`\uDF..` low
* surrogate, or a `\uDC..`–`\uDF..` low surrogate not immediately preceded by
* a high surrogate. Strict JSON parsers (e.g. ClickHouse `JSONEachRow`)
* reject input containing such sequences.
*
* Surrogate hex ranges (case-insensitive — inputs from `JSON.stringify` are
* lowercase):
* - High surrogate (U+D800–U+DBFF): `\uD[8-B][0-9A-F][0-9A-F]`
* - Low surrogate (U+DC00–U+DFFF): `\uD[C-F][0-9A-F][0-9A-F]`
*/
export function detectBadJsonStrings(jsonString: string): boolean {
// Fast path: skip everything if no \u
let idx = jsonString.indexOf("\\u");
@@ -13,7 +27,7 @@ export function detectBadJsonStrings(jsonString: string): boolean {
if (jsonString[idx + 1] === "u" && jsonString[idx + 2] === "d") {
const third = jsonString[idx + 3];

- // High surrogate check
+ // High surrogate check — third nibble is 8, 9, a, or b (U+D800–U+DBFF)
if (
/[89ab]/.test(third) &&
/[0-9a-f]/.test(jsonString[idx + 4]) &&
@@ -28,17 +42,17 @@ export function detectBadJsonStrings(jsonString: string): boolean {
jsonString[idx + 6] !== "\\" ||
jsonString[idx + 7] !== "u" ||
jsonString[idx + 8] !== "d" ||
- !/[cd]/.test(jsonString[idx + 9]) ||
+ !/[c-f]/.test(jsonString[idx + 9]) ||
!/[0-9a-f]/.test(jsonString[idx + 10]) ||
!/[0-9a-f]/.test(jsonString[idx + 11])
) {
return true; // Incomplete high surrogate
}
}

- // Low surrogate check
+ // Low surrogate check — third nibble is c, d, e, or f (U+DC00–U+DFFF)
if (
- (third === "c" || third === "d") &&
+ /[c-f]/.test(third) &&
/[0-9a-f]/.test(jsonString[idx + 4]) &&
/[0-9a-f]/.test(jsonString[idx + 5])
) {