Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mollifier-configurable-constants.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

Make mollifier buffer and drainer internals configurable. `MollifierBuffer` now accepts `ackGraceTtlSeconds`, `maxRetriesPerRequest`, `reconnectStepMs`, and `reconnectMaxMs` options, and `MollifierDrainer` accepts `maxBackoffMs` and `backoffFloorMs`. All default to their previous hardcoded values, so existing behaviour is unchanged.
74 changes: 73 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ const EnvironmentSchema = z
// off onto a dedicated worker service. Unset → inherits
// TRIGGER_MOLLIFIER_ENABLED, so single-container self-hosters don't have to
// flip two switches. Multi-replica drainers are correct — `popAndMarkDraining`
// is an atomic ZPOPMIN + status flip in one Lua call, so only one replica
// is an atomic RPOP + status flip in one Lua call, so only one replica
// can win any given entry — but inefficient: polling load (SMEMBERS +
// per-env scans) multiplies by N, and `TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY`
// is per-process so engine load also multiplies. Splitting the drainer
Expand Down Expand Up @@ -1137,6 +1137,78 @@ const EnvironmentSchema = z
.int()
.positive()
.default(5 * 60_000),
// Bounds for one stale-sweep pass (see mollifierStaleSweep.server.ts).
// Max entries scanned per env and max orgs visited per tick — together
// they cap the Redis traffic / wall-time of a single sweep pass.
TRIGGER_MOLLIFIER_STALE_SWEEP_MAX_ENTRIES_PER_ENV: z.coerce
.number()
.int()
.positive()
.default(1000),
TRIGGER_MOLLIFIER_STALE_SWEEP_MAX_ORGS_PER_PASS: z.coerce
.number()
.int()
.positive()
.default(100),

// --- Mollifier buffer internals (wired into MollifierBuffer in
// mollifierBuffer.server.ts). ---
// Grace TTL applied to the entry hash on drainer ack so direct reads
// (retrieve, trace) have a safety net while PG replica lag settles.
TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS: z.coerce.number().int().positive().default(30),
// ioredis per-request retry limit on the buffer's Redis client.
TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST: z.coerce
.number()
.int()
.positive()
.default(20),
// ioredis reconnect backoff envelope for the buffer client: the base
// grows by `STEP_MS` per attempt, capped at `MAX_MS`, then equal-jittered.
TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS: z.coerce.number().int().positive().default(50),
TRIGGER_MOLLIFIER_REDIS_RECONNECT_MAX_MS: z.coerce.number().int().positive().default(1000),

// --- Mollifier drainer loop internals (wired into MollifierDrainer in
// mollifierDrainer.server.ts). ---
// Tick gap when a tick drained nothing; under backlog ticks run back-to-back.
TRIGGER_MOLLIFIER_DRAIN_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(100),
// Cap on the drainer's exponential backoff after consecutive runOnce errors.
TRIGGER_MOLLIFIER_DRAIN_MAX_BACKOFF_MS: z.coerce.number().int().positive().default(5_000),
// Floor for the drainer's backoff base (so a tiny poll interval doesn't
// collapse the backoff to near-zero during a sustained outage).
TRIGGER_MOLLIFIER_DRAIN_BACKOFF_FLOOR_MS: z.coerce.number().int().positive().default(100),
// Required margin between the drainer's shutdown deadline and
// GRACEFUL_SHUTDOWN_TIMEOUT; boot fails loud if the timeout leaves less.
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_MARGIN_MS: z.coerce.number().int().positive().default(1_000),

// How often the draining-tracker ZSET cardinality is polled into the gauge.
TRIGGER_MOLLIFIER_DRAINING_GAUGE_INTERVAL_MS: z.coerce
.number()
.int()
.positive()
.default(15_000),

// --- Pre-gate idempotency claim (idempotencyClaim.server.ts). ---
// TTL on the claim key (and the upper clamp on the customer-derived
// claim TTL), how long a waiter blocks before timing out, and the
// waiter poll interval.
TRIGGER_MOLLIFIER_CLAIM_TTL_SECONDS: z.coerce.number().int().positive().default(30),
TRIGGER_MOLLIFIER_CLAIM_WAIT_MS: z.coerce.number().int().positive().default(5_000),
TRIGGER_MOLLIFIER_CLAIM_POLL_MS: z.coerce.number().int().positive().default(25),

// --- Buffered-run mutate-with-fallback wait loop (mutateWithFallback.server.ts). ---
// Ceiling on the wait-for-materialisation loop, initial poll gap, the
// backoff ceiling, and the exponential growth factor (a float).
TRIGGER_MOLLIFIER_MUTATE_SAFETY_NET_MS: z.coerce.number().int().positive().default(2_000),
TRIGGER_MOLLIFIER_MUTATE_POLL_STEP_MS: z.coerce.number().int().positive().default(20),
TRIGGER_MOLLIFIER_MUTATE_MAX_POLL_STEP_MS: z.coerce.number().int().positive().default(250),
TRIGGER_MOLLIFIER_MUTATE_BACKOFF_FACTOR: z.coerce.number().gt(1).default(1.7),

// --- Buffered-run metadata CAS retry loop (applyMetadataMutation.server.ts). ---
// Retry budget for concurrent metadata writers, and the jittered
// conflict-backoff envelope: random in [0, base + attempt * step) ms.
TRIGGER_MOLLIFIER_METADATA_MAX_RETRIES: z.coerce.number().int().positive().default(12),
TRIGGER_MOLLIFIER_METADATA_BACKOFF_BASE_MS: z.coerce.number().int().positive().default(5),
TRIGGER_MOLLIFIER_METADATA_BACKOFF_STEP_MS: z.coerce.number().int().positive().default(5),

BATCH_TRIGGER_PROCESS_JOB_VISIBILITY_TIMEOUT_MS: z.coerce
.number()
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ export async function routeOperationsToRun(
environmentId: env.id,
organizationId: env.organizationId,
maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE,
maxRetries: appEnv.TRIGGER_MOLLIFIER_METADATA_MAX_RETRIES,
backoffBaseMs: appEnv.TRIGGER_MOLLIFIER_METADATA_BACKOFF_BASE_MS,
backoffStepMs: appEnv.TRIGGER_MOLLIFIER_METADATA_BACKOFF_STEP_MS,
body: { operations },
})
);
Expand Down Expand Up @@ -190,6 +193,9 @@ const { action } = createActionApiRoute(
environmentId: env.id,
organizationId: env.organizationId,
maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE,
maxRetries: appEnv.TRIGGER_MOLLIFIER_METADATA_MAX_RETRIES,
backoffBaseMs: appEnv.TRIGGER_MOLLIFIER_METADATA_BACKOFF_BASE_MS,
backoffStepMs: appEnv.TRIGGER_MOLLIFIER_METADATA_BACKOFF_STEP_MS,
body: { metadata: body.metadata, operations: body.operations },
});

Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { env as appEnv } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
Expand Down Expand Up @@ -62,6 +63,10 @@ const { action } = createActionApiRoute(
},
synthesisedResponse: () => json({ id: runId }, { status: 200 }),
abortSignal: getRequestAbortSignal(),
safetyNetMs: appEnv.TRIGGER_MOLLIFIER_MUTATE_SAFETY_NET_MS,
pollStepMs: appEnv.TRIGGER_MOLLIFIER_MUTATE_POLL_STEP_MS,
maxPollStepMs: appEnv.TRIGGER_MOLLIFIER_MUTATE_MAX_POLL_STEP_MS,
backoffFactor: appEnv.TRIGGER_MOLLIFIER_MUTATE_BACKOFF_FACTOR,
});

if (outcome.kind === "not_found") {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { ServiceValidationError } from "~/v3/services/common.server";
Expand Down Expand Up @@ -311,7 +312,7 @@ export class IdempotencyKeyConcern {
const ttlSeconds = Math.max(
1,
Math.min(
30,
env.TRIGGER_MOLLIFIER_CLAIM_TTL_SECONDS,
Math.ceil((idempotencyKeyExpiresAt.getTime() - Date.now()) / 1000),
),
);
Expand All @@ -320,6 +321,8 @@ export class IdempotencyKeyConcern {
taskIdentifier: request.taskId,
idempotencyKey,
ttlSeconds,
safetyNetMs: env.TRIGGER_MOLLIFIER_CLAIM_WAIT_MS,
pollStepMs: env.TRIGGER_MOLLIFIER_CLAIM_POLL_MS,
});
if (outcome.kind === "resolved") {
// Another concurrent trigger committed first. Re-resolve via the
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ export class RunEngineTriggerTaskService {
idempotencyKey: idempotencyClaim.idempotencyKey,
token: idempotencyClaim.token,
runId: result.run.friendlyId,
ttlSeconds: env.TRIGGER_MOLLIFIER_CLAIM_TTL_SECONDS,
});
}
return result;
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export async function applyMetadataMutationToBufferedRun(input: {
body: Pick<FlushedRunMetadata, "metadata" | "operations">;
buffer?: MollifierBuffer | null;
maxRetries?: number;
// Jittered conflict-backoff envelope: random in [0, base + attempt * step) ms.
backoffBaseMs?: number;
backoffStepMs?: number;
}): Promise<ApplyMetadataMutationOutcome> {
const buffer = input.buffer ?? getMollifierBuffer();
if (!buffer) return { kind: "not_found" };
Expand All @@ -71,6 +74,8 @@ export async function applyMetadataMutationToBufferedRun(input: {
// with sub-percent failure probability; the cost is bounded (each
// retry is one Redis Lua call ~1ms).
const maxRetries = input.maxRetries ?? 12;
const backoffBaseMs = input.backoffBaseMs ?? 5;
const backoffStepMs = input.backoffStepMs ?? 5;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
const entry = await buffer.getEntry(input.runId);
if (!entry) return { kind: "not_found" };
Expand Down Expand Up @@ -192,7 +197,7 @@ export async function applyMetadataMutationToBufferedRun(input: {
observedVersion: entry.metadataVersion,
currentVersion: cas.currentVersion,
});
const backoffMs = Math.floor(Math.random() * (5 + attempt * 5));
const backoffMs = Math.floor(Math.random() * (backoffBaseMs + attempt * backoffStepMs));
await new Promise((resolve) => setTimeout(resolve, backoffMs));
}

Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierBuffer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ function initializeMollifierBuffer(): MollifierBuffer {
enableAutoPipelining: true,
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
ackGraceTtlSeconds: env.TRIGGER_MOLLIFIER_ACK_GRACE_TTL_SECONDS,
maxRetriesPerRequest: env.TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST,
reconnectStepMs: env.TRIGGER_MOLLIFIER_REDIS_RECONNECT_STEP_MS,
reconnectMaxMs: env.TRIGGER_MOLLIFIER_REDIS_RECONNECT_MAX_MS,
});
}

Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
// drainer is cut off mid-wait — "log a warning on timeout" turns into
// "hard exit with no log". 1s margin gives the primary room to finish
// its own teardown after the drainer settles.
const shutdownMarginMs = 1_000;
const shutdownMarginMs = env.TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_MARGIN_MS;
if (
env.TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS >=
env.GRACEFUL_SHUTDOWN_TIMEOUT - shutdownMarginMs
Expand All @@ -83,6 +83,9 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
pollIntervalMs: env.TRIGGER_MOLLIFIER_DRAIN_POLL_INTERVAL_MS,
maxBackoffMs: env.TRIGGER_MOLLIFIER_DRAIN_MAX_BACKOFF_MS,
backoffFloorMs: env.TRIGGER_MOLLIFIER_DRAIN_BACKOFF_FLOOR_MS,
isRetryable: isRetryablePgError,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,14 @@ export class MollifierStaleSweepState implements StaleSweepStateStore {
private readonly redis: Redis;
private readonly logger: Logger;

constructor(options: { redisOptions: RedisOptions; logger?: Logger }) {
constructor(options: {
redisOptions: RedisOptions;
logger?: Logger;
maxRetriesPerRequest?: number;
}) {
this.logger = options.logger ?? new Logger("MollifierStaleSweepState", "debug");
this.redis = createRedisClient(
{ ...options.redisOptions, maxRetriesPerRequest: 20 },
{ ...options.redisOptions, maxRetriesPerRequest: options.maxRetriesPerRequest ?? 20 },
{
onError: (error) => {
this.logger.error("MollifierStaleSweepState redis client error:", { error });
Expand Down
6 changes: 4 additions & 2 deletions apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const DEFAULT_SAFETY_NET_MS = 2_000;
// cadence for the whole safety-net budget.
export const DEFAULT_POLL_STEP_MS = 20;
export const DEFAULT_MAX_POLL_STEP_MS = 250;
const BACKOFF_FACTOR = 1.7;
export const DEFAULT_BACKOFF_FACTOR = 1.7;

export type MutateWithFallbackInput<TResponse> = {
runId: string;
Expand Down Expand Up @@ -52,6 +52,7 @@ export type MutateWithFallbackInput<TResponse> = {
safetyNetMs?: number;
pollStepMs?: number;
maxPollStepMs?: number;
backoffFactor?: number;
// Test injection.
getBuffer?: () => MollifierBuffer | null;
prismaWriter?: TaskRunReader;
Expand Down Expand Up @@ -179,6 +180,7 @@ export async function mutateWithFallback<TResponse>(
// for the real mutation.
const safetyNetMs = input.safetyNetMs ?? DEFAULT_SAFETY_NET_MS;
const maxPollStepMs = input.maxPollStepMs ?? DEFAULT_MAX_POLL_STEP_MS;
const backoffFactor = input.backoffFactor ?? DEFAULT_BACKOFF_FACTOR;
const random = input.random ?? Math.random;
const deadline = now() + safetyNetMs;
let step = input.pollStepMs ?? DEFAULT_POLL_STEP_MS;
Expand Down Expand Up @@ -210,7 +212,7 @@ export async function mutateWithFallback<TResponse>(
if (now() >= deadline) break;
const jittered = step + Math.floor(random() * step);
await sleep(jittered);
step = Math.min(Math.ceil(step * BACKOFF_FACTOR), maxPollStepMs);
step = Math.min(Math.ceil(step * backoffFactor), maxPollStepMs);
}

logger.warn("mollifier mutate-with-fallback: drainer resolution timed out", {
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/mollifierDrainerWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ export function initMollifierDrainerWorker(
// drainer because that's the loop creating the DRAINING entries
// — same pod, same Redis client lifecycle. Idempotent + unref'd
// so it's safe under dev hot-reload and doesn't block shutdown.
startMollifierDrainingGauge();
startMollifierDrainingGauge({
intervalMs: env.TRIGGER_MOLLIFIER_DRAINING_GAUGE_INTERVAL_MS,
});
}
} catch (error) {
// Deterministic misconfig (shutdown-timeout vs GRACEFUL_SHUTDOWN_TIMEOUT,
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/mollifierStaleSweepWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,15 @@ export function initMollifierStaleSweepWorker(): void {
enableAutoPipelining: true,
...(env.TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
maxRetriesPerRequest: env.TRIGGER_MOLLIFIER_REDIS_MAX_RETRIES_PER_REQUEST,
});

const handle = startStaleSweepInterval(
{
intervalMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_INTERVAL_MS,
staleThresholdMs: env.TRIGGER_MOLLIFIER_STALE_SWEEP_THRESHOLD_MS,
maxEntriesPerEnv: env.TRIGGER_MOLLIFIER_STALE_SWEEP_MAX_ENTRIES_PER_ENV,
maxOrgsPerPass: env.TRIGGER_MOLLIFIER_STALE_SWEEP_MAX_ORGS_PER_PASS,
},
{ state },
);
Expand Down
35 changes: 27 additions & 8 deletions packages/redis-worker/src/mollifier/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,25 @@ import { BufferEntry, BufferEntrySchema } from "./schemas.js";
export type MollifierBufferOptions = {
redisOptions: RedisOptions;
logger?: Logger;
// Grace TTL applied to the entry hash on drainer ack. The entry survives
// this long after materialisation so direct reads (retrieve, trace, etc.)
// have a safety net while PG replica lag settles. Defaults to 30s.
ackGraceTtlSeconds?: number;
// ioredis per-request retry limit on the buffer's Redis client. Defaults to 20.
maxRetriesPerRequest?: number;
// Reconnect backoff envelope (see `mollifierReconnectDelayMs`). Defaults
// to 50ms per attempt capped at 1000ms.
reconnectStepMs?: number;
reconnectMaxMs?: number;
};

// Grace TTL applied to the entry hash on drainer ack. The entry survives
// this long after materialisation so direct reads (retrieve, trace, etc.)
// have a safety net while PG replica lag settles.
const ACK_GRACE_TTL_SECONDS = 30;
// Default grace TTL applied to the entry hash on drainer ack.
const DEFAULT_ACK_GRACE_TTL_SECONDS = 30;
// Default ioredis reconnect backoff envelope for the buffer client.
const DEFAULT_RECONNECT_STEP_MS = 50;
const DEFAULT_RECONNECT_MAX_MS = 1000;
// Default ioredis per-request retry limit.
const DEFAULT_MAX_RETRIES_PER_REQUEST = 20;

// Observability-only sorted set of entries currently in DRAINING state
// (popped by the drainer, not yet acked/failed/requeued). Score is the
Expand All @@ -41,8 +54,10 @@ export const DRAINING_SET_KEY = "mollifier:draining";
export function mollifierReconnectDelayMs(
times: number,
random: () => number = Math.random,
stepMs: number = DEFAULT_RECONNECT_STEP_MS,
maxMs: number = DEFAULT_RECONNECT_MAX_MS,
): number {
const base = Math.min(times * 50, 1000);
const base = Math.min(times * stepMs, maxMs);
const half = Math.floor(base / 2);
return half + Math.round(random() * (base - half));
}
Expand Down Expand Up @@ -125,17 +140,21 @@ export type IdempotencyClaimResult =
export class MollifierBuffer {
private readonly redis: Redis;
private readonly logger: Logger;
private readonly ackGraceTtlSeconds: number;

constructor(options: MollifierBufferOptions) {
this.logger = options.logger ?? new Logger("MollifierBuffer", "debug");
this.ackGraceTtlSeconds = options.ackGraceTtlSeconds ?? DEFAULT_ACK_GRACE_TTL_SECONDS;
const reconnectStepMs = options.reconnectStepMs ?? DEFAULT_RECONNECT_STEP_MS;
const reconnectMaxMs = options.reconnectMaxMs ?? DEFAULT_RECONNECT_MAX_MS;

this.redis = createRedisClient(
{
...options.redisOptions,
retryStrategy(times) {
return mollifierReconnectDelayMs(times);
return mollifierReconnectDelayMs(times, Math.random, reconnectStepMs, reconnectMaxMs);
},
maxRetriesPerRequest: 20,
maxRetriesPerRequest: options.maxRetriesPerRequest ?? DEFAULT_MAX_RETRIES_PER_REQUEST,
},
{
onError: (error) => {
Expand Down Expand Up @@ -506,7 +525,7 @@ export class MollifierBuffer {
await this.redis.ackMollifierEntry(
`mollifier:entries:${runId}`,
DRAINING_SET_KEY,
String(ACK_GRACE_TTL_SECONDS),
String(this.ackGraceTtlSeconds),
runId,
);
}
Expand Down
Loading
Loading