Skip to content

Commit 9483092

Browse files
committed
refactor(webapp): prefix mollifier env vars with TRIGGER_
All MOLLIFIER_* env vars renamed to TRIGGER_MOLLIFIER_*. The mollifier primitive is generic — buffer + drainer + trip evaluator with no trigger-specific assumptions at the redis-worker layer — but this PR's webapp wiring is specifically the trigger-task mollifier, with PII-sensitive payload handling and trigger-flow semantics. If we later mollify another surface (deploys, schedules, etc.) those will want their own env-var namespace; pre-prefixing now avoids a breaking rename later. Renames are mechanical: schema keys in env.server.ts, env.* references across the v3/mollifier* modules, and a handful of doc-comment mentions. The bootstrap fallback that has DRAINER_ENABLED default to the ENABLED value is updated to read TRIGGER_MOLLIFIER_ENABLED from process.env too. Code-side naming (classes, file names, the literal word "mollifier") stays unchanged — the rename is env-var only.
1 parent ae714c6 commit 9483092

6 files changed

Lines changed: 709 additions & 41 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,46 +1052,46 @@ const EnvironmentSchema = z
10521052
.optional()
10531053
.transform((v) => v ?? process.env.REDIS_PASSWORD),
10541054
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1055-
MOLLIFIER_ENABLED: z.string().default("0"),
1055+
TRIGGER_MOLLIFIER_ENABLED: z.string().default("0"),
10561056
// Separate switch for the drainer (consumer side) so it can be split
10571057
// off onto a dedicated worker service. Unset → inherits
1058-
// MOLLIFIER_ENABLED, so single-container self-hosters don't have to
1058+
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
10591059
// flip two switches. In multi-replica deployments, set this to "0"
10601060
// explicitly on every replica except the one dedicated drainer
10611061
// service — otherwise every replica's polling loop races for the
1062-
// same buffer entries. `MOLLIFIER_ENABLED` is still the master kill
1063-
// switch; setting this to "1" while `MOLLIFIER_ENABLED` is "0" is a
1062+
// same buffer entries. `TRIGGER_MOLLIFIER_ENABLED` is still the master kill
1063+
// switch; setting this to "1" while `TRIGGER_MOLLIFIER_ENABLED` is "0" is a
10641064
// no-op because the gate-side singleton refuses to construct a
10651065
// buffer when the system is off.
1066-
MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.MOLLIFIER_ENABLED ?? "0"),
1067-
MOLLIFIER_SHADOW_MODE: z.string().default("0"),
1068-
MOLLIFIER_REDIS_HOST: z
1066+
TRIGGER_MOLLIFIER_DRAINER_ENABLED: z.string().default(process.env.TRIGGER_MOLLIFIER_ENABLED ?? "0"),
1067+
TRIGGER_MOLLIFIER_SHADOW_MODE: z.string().default("0"),
1068+
TRIGGER_MOLLIFIER_REDIS_HOST: z
10691069
.string()
10701070
.optional()
10711071
.transform((v) => v ?? process.env.REDIS_HOST),
1072-
MOLLIFIER_REDIS_PORT: z.coerce
1072+
TRIGGER_MOLLIFIER_REDIS_PORT: z.coerce
10731073
.number()
10741074
.optional()
10751075
.transform(
10761076
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined),
10771077
),
1078-
MOLLIFIER_REDIS_USERNAME: z
1078+
TRIGGER_MOLLIFIER_REDIS_USERNAME: z
10791079
.string()
10801080
.optional()
10811081
.transform((v) => v ?? process.env.REDIS_USERNAME),
1082-
MOLLIFIER_REDIS_PASSWORD: z
1082+
TRIGGER_MOLLIFIER_REDIS_PASSWORD: z
10831083
.string()
10841084
.optional()
10851085
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1086-
MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1087-
MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1088-
MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
1089-
MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
1090-
MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1091-
MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
1092-
MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
1093-
MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
1094-
MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
1086+
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
1087+
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1088+
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
1089+
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
1090+
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
1091+
TRIGGER_MOLLIFIER_ENTRY_TTL_S: z.coerce.number().int().positive().default(600),
1092+
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
1093+
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
1094+
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
10951095

10961096
BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
10971097
.number()
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { MollifierBuffer } from "@trigger.dev/redis-worker";
2+
import { env } from "~/env.server";
3+
import { logger } from "~/services/logger.server";
4+
import { singleton } from "~/utils/singleton";
5+
6+
// DI seam type for consumers (e.g. triggerTask.server.ts) that need a
7+
// nullable buffer accessor at construction time.
8+
export type MollifierGetBuffer = () => MollifierBuffer | null;
9+
10+
function initializeMollifierBuffer(): MollifierBuffer {
11+
logger.debug("Initializing mollifier buffer", {
12+
host: env.TRIGGER_MOLLIFIER_REDIS_HOST,
13+
});
14+
15+
return new MollifierBuffer({
16+
redisOptions: {
17+
keyPrefix: "",
18+
host: env.TRIGGER_MOLLIFIER_REDIS_HOST,
19+
port: env.TRIGGER_MOLLIFIER_REDIS_PORT,
20+
username: env.TRIGGER_MOLLIFIER_REDIS_USERNAME,
21+
password: env.TRIGGER_MOLLIFIER_REDIS_PASSWORD,
22+
enableAutoPipelining: true,
23+
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
24+
},
25+
entryTtlSeconds: env.TRIGGER_MOLLIFIER_ENTRY_TTL_S,
26+
});
27+
}
28+
29+
export function getMollifierBuffer(): MollifierBuffer | null {
30+
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
31+
return singleton("mollifierBuffer", initializeMollifierBuffer);
32+
}

apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
1111
if (!buffer) {
1212
// Unreachable in normal config: getMollifierDrainer() gates on the
1313
// same env flag as getMollifierBuffer(). If we hit this, fail loud
14-
// — the operator has set MOLLIFIER_ENABLED=1 on a worker pod but
15-
// the buffer can't initialise (e.g. MOLLIFIER_REDIS_HOST resolves
14+
// — the operator has set TRIGGER_MOLLIFIER_ENABLED=1 on a worker pod but
15+
// the buffer can't initialise (e.g. TRIGGER_MOLLIFIER_REDIS_HOST resolves
1616
// to nothing). Crashing surfaces the misconfig immediately rather
1717
// than silently leaving entries un-drained.
1818
throw new Error("MollifierDrainer initialised without a buffer — env vars inconsistent");
@@ -24,7 +24,7 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
2424
// polling with no SIGTERM handler registered by the caller — exactly
2525
// the failure mode the validation is supposed to prevent.
2626
//
27-
// The SIGTERM handler in worker.server.ts is sync fire-and-forget:
27+
// The SIGTERM handler in mollifierDrainerWorker.server.ts is sync fire-and-forget:
2828
// `drainer.stop({ timeoutMs })` returns a promise that keeps the event
2929
// loop alive, but in cluster mode the primary runs its own
3030
// GRACEFUL_SHUTDOWN_TIMEOUT and will call `process.exit(0)`
@@ -34,17 +34,17 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
3434
// its own teardown after the drainer settles.
3535
const shutdownMarginMs = 1_000;
3636
if (
37-
env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS >=
37+
env.TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS >=
3838
env.GRACEFUL_SHUTDOWN_TIMEOUT - shutdownMarginMs
3939
) {
4040
throw new Error(
41-
`MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS (${env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS}) must be at least ${shutdownMarginMs}ms below GRACEFUL_SHUTDOWN_TIMEOUT (${env.GRACEFUL_SHUTDOWN_TIMEOUT}); otherwise the primary's hard exit shadows the drainer's deadline.`,
41+
`TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS (${env.TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS}) must be at least ${shutdownMarginMs}ms below GRACEFUL_SHUTDOWN_TIMEOUT (${env.GRACEFUL_SHUTDOWN_TIMEOUT}); otherwise the primary's hard exit shadows the drainer's deadline.`,
4242
);
4343
}
4444

4545
logger.debug("Initializing mollifier drainer", {
46-
concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
47-
maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
46+
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
47+
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
4848
});
4949

5050
// Phase 1 handler: no-op ack. The trigger has ALREADY been written to
@@ -74,9 +74,9 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
7474
payloadHash,
7575
});
7676
},
77-
concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
78-
maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
79-
maxOrgsPerTick: env.MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
77+
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
78+
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
79+
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
8080
// A no-op handler shouldn't throw, but if something does (e.g. an
8181
// unexpected deserialise failure), don't loop — let it FAIL terminally
8282
// so the entry is observable in metrics.
@@ -88,12 +88,12 @@ function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload>
8888

8989
// Returns a configured-but-stopped drainer. Callers MUST register their
9090
// SIGTERM / SIGINT shutdown handlers before invoking `drainer.start()` —
91-
// see `apps/webapp/app/services/worker.server.ts`. Starting inside the
92-
// singleton factory would put the polling loop ahead of handler
93-
// registration, leaving a narrow window where a SIGTERM landing between
94-
// `start()` and `process.once("SIGTERM", ...)` would skip the graceful
95-
// stop. The split is intentional.
91+
// see `apps/webapp/app/v3/mollifierDrainerWorker.server.ts`. Starting
92+
// inside the singleton factory would put the polling loop ahead of
93+
// handler registration, leaving a narrow window where a SIGTERM landing
94+
// between `start()` and `process.once("SIGTERM", ...)` would skip the
95+
// graceful stop. The split is intentional.
9696
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
97-
if (env.MOLLIFIER_ENABLED !== "1") return null;
97+
if (env.TRIGGER_MOLLIFIER_ENABLED !== "1") return null;
9898
return singleton("mollifierDrainer", initializeMollifierDrainer);
9999
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import { env } from "~/env.server";
2+
import { logger } from "~/services/logger.server";
3+
import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";
4+
import { getMollifierBuffer } from "./mollifierBuffer.server";
5+
import { createRealTripEvaluator } from "./mollifierTripEvaluator.server";
6+
import {
7+
recordDecision,
8+
type DecisionOutcome,
9+
type DecisionReason,
10+
} from "./mollifierTelemetry.server";
11+
12+
// `count` is the fleet-wide fixed-window counter for the env (INCR with a
13+
// PEXPIRE armed on the first tick of each window — see
14+
// `mollifierEvaluateTrip` in `packages/redis-worker/src/mollifier/buffer.ts`).
15+
// All webapp replicas pointing at the same Redis share the key
16+
// `mollifier:rate:${envId}`, so the threshold is the fleet-wide ceiling
17+
// rather than a per-instance one. At a window boundary an env can briefly
18+
// admit up to ~2x threshold across the fleet before tripping (fixed-window
19+
// not sliding-window). The tripped marker is refreshed on every overage
20+
// call, so a sustained burst holds the divert state until the rate falls
21+
// below threshold within a window.
22+
export type TripDecision =
23+
| { divert: false }
24+
| {
25+
divert: true;
26+
reason: "per_env_rate";
27+
count: number;
28+
threshold: number;
29+
windowMs: number;
30+
holdMs: number;
31+
};
32+
33+
export type GateOutcome =
34+
| { action: "pass_through" }
35+
| { action: "mollify"; decision: Extract<TripDecision, { divert: true }> }
36+
| { action: "shadow_log"; decision: Extract<TripDecision, { divert: true }> };
37+
38+
export type GateInputs = {
39+
envId: string;
40+
orgId: string;
41+
taskId: string;
42+
// Org-scoped flag overrides — taken from `Organization.featureFlags` on the
43+
// AuthenticatedEnvironment at the call site. The repo-wide `flag()` helper
44+
// queries the global `FeatureFlag` table; passing per-org overrides lets the
45+
// mollifier opt in a single org without touching the global row, matching
46+
// the pattern used by `canAccessAi`, `canAccessPrivateConnections`, and the
47+
// compute-template beta gate.
48+
orgFeatureFlags: Record<string, unknown> | null;
49+
};
50+
51+
export type TripEvaluator = (inputs: GateInputs) => Promise<TripDecision>;
52+
53+
// DI seam type for consumers (e.g. triggerTask.server.ts) that inject the
54+
// gate at construction time. Deliberately narrower than `evaluateGate`'s
55+
// real signature — no `deps` param — because consumers only call it with
56+
// inputs and rely on the module-level defaults.
57+
export type MollifierEvaluateGate = (inputs: GateInputs) => Promise<GateOutcome>;
58+
59+
export type GateDependencies = {
60+
isMollifierEnabled: () => boolean;
61+
isShadowModeOn: () => boolean;
62+
resolveOrgFlag: (inputs: GateInputs) => Promise<boolean>;
63+
evaluator: TripEvaluator;
64+
logShadow: (
65+
inputs: GateInputs,
66+
decision: Extract<TripDecision, { divert: true }>,
67+
) => void;
68+
logMollified: (
69+
inputs: GateInputs,
70+
decision: Extract<TripDecision, { divert: true }>,
71+
) => void;
72+
recordDecision: (outcome: DecisionOutcome, reason?: DecisionReason) => void;
73+
};
74+
75+
// `options` is a thunk so env reads happen per-evaluation, not at module load.
76+
// Don't "simplify" to a plain object — Phase 2 dynamic config relies on the
77+
// gate observing whichever env values are live at trigger time.
78+
const defaultEvaluator = createRealTripEvaluator({
79+
getBuffer: () => getMollifierBuffer(),
80+
options: () => ({
81+
windowMs: env.TRIGGER_MOLLIFIER_TRIP_WINDOW_MS,
82+
threshold: env.TRIGGER_MOLLIFIER_TRIP_THRESHOLD,
83+
holdMs: env.TRIGGER_MOLLIFIER_HOLD_MS,
84+
}),
85+
});
86+
87+
function logDivertDecision(
88+
message: "mollifier.would_mollify" | "mollifier.mollified",
89+
inputs: GateInputs,
90+
decision: Extract<TripDecision, { divert: true }>,
91+
): void {
92+
logger.info(message, {
93+
envId: inputs.envId,
94+
orgId: inputs.orgId,
95+
taskId: inputs.taskId,
96+
reason: decision.reason,
97+
count: decision.count,
98+
threshold: decision.threshold,
99+
windowMs: decision.windowMs,
100+
holdMs: decision.holdMs,
101+
});
102+
}
103+
104+
// Resolve the per-org mollifier flag purely from the in-memory
105+
// `Organization.featureFlags` JSON. No DB query — `triggerTask` is the
106+
// trigger hot path and the webapp CLAUDE.md forbids adding Prisma calls
107+
// there. The fleet-wide kill switch lives in `TRIGGER_MOLLIFIER_ENABLED`; rollout
108+
// is per-org via the JSON, matching the pattern used by `canAccessAi`,
109+
// `hasComputeAccess`, etc. There is no global `FeatureFlag` table read
110+
// in this path by design.
111+
export function makeResolveMollifierFlag(): (inputs: GateInputs) => Promise<boolean> {
112+
return (inputs) => {
113+
const override = inputs.orgFeatureFlags?.[FEATURE_FLAG.mollifierEnabled];
114+
if (override !== undefined) {
115+
const parsed = FeatureFlagCatalog[FEATURE_FLAG.mollifierEnabled].safeParse(override);
116+
if (parsed.success) {
117+
return Promise.resolve(parsed.data);
118+
}
119+
}
120+
return Promise.resolve(false);
121+
};
122+
}
123+
124+
const resolveMollifierFlag = makeResolveMollifierFlag();
125+
126+
export const defaultGateDependencies: GateDependencies = {
127+
isMollifierEnabled: () => env.TRIGGER_MOLLIFIER_ENABLED === "1",
128+
isShadowModeOn: () => env.TRIGGER_MOLLIFIER_SHADOW_MODE === "1",
129+
resolveOrgFlag: resolveMollifierFlag,
130+
evaluator: defaultEvaluator,
131+
logShadow: (inputs, decision) =>
132+
logDivertDecision("mollifier.would_mollify", inputs, decision),
133+
logMollified: (inputs, decision) =>
134+
logDivertDecision("mollifier.mollified", inputs, decision),
135+
recordDecision,
136+
};
137+
138+
export async function evaluateGate(
139+
inputs: GateInputs,
140+
deps: Partial<GateDependencies> = {},
141+
): Promise<GateOutcome> {
142+
const d = { ...defaultGateDependencies, ...deps };
143+
144+
if (!d.isMollifierEnabled()) {
145+
d.recordDecision("pass_through");
146+
return { action: "pass_through" };
147+
}
148+
149+
// Fail open: a transient DB error resolving the per-org flag must not
150+
// block triggers. Mirror the evaluator's fail-open posture in
151+
// `mollifierTripEvaluator.server.ts`.
152+
let orgFlagEnabled: boolean;
153+
try {
154+
orgFlagEnabled = await d.resolveOrgFlag(inputs);
155+
} catch (error) {
156+
logger.warn("mollifier.resolve_org_flag_failed", {
157+
envId: inputs.envId,
158+
orgId: inputs.orgId,
159+
taskId: inputs.taskId,
160+
error: error instanceof Error ? error.message : String(error),
161+
});
162+
orgFlagEnabled = false;
163+
}
164+
const shadowOn = d.isShadowModeOn();
165+
166+
if (!orgFlagEnabled && !shadowOn) {
167+
d.recordDecision("pass_through");
168+
return { action: "pass_through" };
169+
}
170+
171+
// Fail open on evaluator errors too. The default `createRealTripEvaluator`
172+
// catches its own errors and returns `{ divert: false }`, but injected or
173+
// future evaluators may not — keep the contract symmetric with the org
174+
// flag resolution above so the trigger hot path can never be broken by a
175+
// gate-internal failure.
176+
let decision: TripDecision;
177+
try {
178+
decision = await d.evaluator(inputs);
179+
} catch (error) {
180+
logger.warn("mollifier.evaluator_failed", {
181+
envId: inputs.envId,
182+
orgId: inputs.orgId,
183+
taskId: inputs.taskId,
184+
error: error instanceof Error ? error.message : String(error),
185+
});
186+
decision = { divert: false };
187+
}
188+
if (!decision.divert) {
189+
d.recordDecision("pass_through");
190+
return { action: "pass_through" };
191+
}
192+
193+
if (orgFlagEnabled) {
194+
d.logMollified(inputs, decision);
195+
d.recordDecision("mollify", decision.reason);
196+
return { action: "mollify", decision };
197+
}
198+
199+
d.logShadow(inputs, decision);
200+
d.recordDecision("shadow_log", decision.reason);
201+
return { action: "shadow_log", decision };
202+
}

0 commit comments

Comments
 (0)