|
| 1 | +import { createHash } from "node:crypto"; |
| 2 | +import { MollifierDrainer, serialiseSnapshot } from "@trigger.dev/redis-worker"; |
| 3 | +import { env } from "~/env.server"; |
| 4 | +import { logger } from "~/services/logger.server"; |
| 5 | +import { singleton } from "~/utils/singleton"; |
| 6 | +import { getMollifierBuffer } from "./mollifierBuffer.server"; |
| 7 | +import type { BufferedTriggerPayload } from "./bufferedTriggerPayload.server"; |
| 8 | + |
/**
 * Builds the mollifier drainer for this process and starts its polling loop.
 *
 * Ordering is deliberate: the buffer lookup and the shutdown-timeout
 * validation both run BEFORE `drainer.start()`, so any misconfiguration throws
 * before the loop begins polling.
 *
 * @returns The started drainer.
 * @throws Error when `getMollifierBuffer()` yields no buffer.
 * @throws Error when `MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS` leaves less than
 *   `shutdownMarginMs` of headroom below `GRACEFUL_SHUTDOWN_TIMEOUT`.
 */
function initializeMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> {
  const buffer = getMollifierBuffer();
  if (!buffer) {
    // Unreachable in normal config: getMollifierDrainer() gates on the
    // same env flag as getMollifierBuffer(). If we hit this, fail loud
    // — the operator has set MOLLIFIER_ENABLED=1 on a worker pod but
    // the buffer can't initialise (e.g. MOLLIFIER_REDIS_HOST resolves
    // to nothing). Crashing surfaces the misconfig immediately rather
    // than silently leaving entries un-drained.
    throw new Error("MollifierDrainer initialised without a buffer — env vars inconsistent");
  }

  // Validate BEFORE start() so a misconfigured shutdown timeout fails
  // loud at module-load time and the singleton is never cached. If start()
  // ran first and the throw propagated out, the loop would already be
  // polling with no SIGTERM handler registered by the caller — exactly
  // the failure mode the validation is supposed to prevent.
  //
  // The SIGTERM handler in worker.server.ts is sync fire-and-forget:
  // `drainer.stop({ timeoutMs })` returns a promise that keeps the event
  // loop alive, but in cluster mode the primary runs its own
  // GRACEFUL_SHUTDOWN_TIMEOUT and will call `process.exit(0)`
  // independently. If the drainer's deadline exceeds the primary's, the
  // drainer is cut off mid-wait — "log a warning on timeout" turns into
  // "hard exit with no log". 1s margin gives the primary room to finish
  // its own teardown after the drainer settles.
  const shutdownMarginMs = 1_000;
  const latestAllowedDrainTimeoutMs = env.GRACEFUL_SHUTDOWN_TIMEOUT - shutdownMarginMs;
  // Strict `>`: a timeout sitting exactly `shutdownMarginMs` below the
  // primary's deadline IS "at least" the margin below it, which is what the
  // error message promises operators, so that configuration is accepted.
  if (env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS > latestAllowedDrainTimeoutMs) {
    throw new Error(
      `MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS (${env.MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS}) must be at least ${shutdownMarginMs}ms below GRACEFUL_SHUTDOWN_TIMEOUT (${env.GRACEFUL_SHUTDOWN_TIMEOUT}); otherwise the primary's hard exit shadows the drainer's deadline.`,
    );
  }

  logger.debug("Initializing mollifier drainer", {
    concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
    maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
  });

  // Phase 1 handler: no-op ack. The trigger has ALREADY been written to
  // Postgres via engine.trigger (dual-write at the call site). Popping +
  // acking here proves the dequeue mechanism works end-to-end without
  // duplicating the work. Phase 2 will replace this with an engine.trigger
  // replay that performs the actual Postgres write.
  const drainer = new MollifierDrainer<BufferedTriggerPayload>({
    buffer,
    handler: async (input) => {
      // Hash the (re-serialised, canonical) payload on the drain side rather
      // than on the trigger hot path. Burst-time CPU stays with engine.trigger;
      // the drainer is the natural place for the audit-equivalence checksum.
      // Re-serialisation is identity for the BufferedTriggerPayload shape
      // (only strings/numbers/plain objects), so this hash matches what the
      // call site wrote into Redis.
      const reserialised = serialiseSnapshot(input.payload);
      const payloadHash = createHash("sha256").update(reserialised).digest("hex");
      logger.info("mollifier.drained", {
        runId: input.runId,
        envId: input.envId,
        orgId: input.orgId,
        taskId: input.payload.taskId,
        attempts: input.attempts,
        ageMs: Date.now() - input.createdAt.getTime(),
        // Report real bytes, not `.length`: on a string `.length` counts
        // UTF-16 code units, which under-reports any non-ASCII payload and
        // disagrees with the UTF-8 bytes the hash above consumed.
        payloadBytes: Buffer.byteLength(reserialised),
        payloadHash,
      });
    },
    concurrency: env.MOLLIFIER_DRAIN_CONCURRENCY,
    maxAttempts: env.MOLLIFIER_DRAIN_MAX_ATTEMPTS,
    maxOrgsPerTick: env.MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
    // A no-op handler shouldn't throw, but if something does (e.g. an
    // unexpected deserialise failure), don't loop — let it FAIL terminally
    // so the entry is observable in metrics.
    isRetryable: () => false,
  });

  // Only reached once every precondition above has passed.
  drainer.start();
  return drainer;
}
| 89 | + |
/**
 * Accessor for the mollifier drainer.
 *
 * @returns `null` unless `env.MOLLIFIER_ENABLED` is exactly `"1"`; otherwise
 *   whatever the `singleton` helper returns for the `"mollifierDrainer"` key,
 *   using `initializeMollifierDrainer` as the factory (presumably created once
 *   and reused — confirm against `~/utils/singleton`).
 */
export function getMollifierDrainer(): MollifierDrainer<BufferedTriggerPayload> | null {
  const mollifierEnabled = env.MOLLIFIER_ENABLED === "1";
  return mollifierEnabled ? singleton("mollifierDrainer", initializeMollifierDrainer) : null;
}
0 commit comments