Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mollifier-drain-batch-size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

`MollifierDrainer` accepts a `drainBatchSize` option (default 1) that controls how many entries are popped per env per tick — in-flight handlers remain capped by the global `concurrency`.
6 changes: 6 additions & 0 deletions .server-changes/mollifier-drain-batch-size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Wire `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE` (default 50) so single-env bursts drain at the full `DRAIN_CONCURRENCY` budget per tick instead of one entry per tick.
10 changes: 10 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,16 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Per-env per-tick pop cap. The drainer rotates one env per org per
// tick; this bounds how many entries it pops from that env before
// dispatching them through the shared `DRAIN_CONCURRENCY`-bounded
// limiter. Default matches `DRAIN_CONCURRENCY` so a single-env burst
// uses the full handler-parallelism budget — for 20k buffered on one
// env this is the difference between ~17m (one-pop-per-tick × ~50ms)
// and ~20s (400 ticks × concurrent engine.trigger). Org/env fairness
// is preserved because the per-tick env selection is unchanged; only
// the in-env pop count grows.
TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE: z.coerce.number().int().positive().default(50),
// Periodic sweep that scans buffer queue LISTs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
logger.debug("Initializing mollifier drainer", {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
});

const drainer = new MollifierDrainer<MollifierSnapshot>({
Expand All @@ -81,6 +82,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
isRetryable: isRetryablePgError,
});

Expand Down
Loading