Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/mollifier-drain-batch-size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/redis-worker": patch
---

`MollifierDrainer` accepts a `drainBatchSize` option (default 1) that lets a single env drain at full `concurrency`-parallelism per tick.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
6 changes: 6 additions & 0 deletions .server-changes/mollifier-drain-batch-size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Wire `TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE` (default 50) so single-env bursts drain at the full `DRAIN_CONCURRENCY` budget per tick instead of one entry per tick.
10 changes: 10 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,16 @@ const EnvironmentSchema = z
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),
TRIGGER_MOLLIFIER_DRAIN_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().positive().default(30_000),
TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK: z.coerce.number().int().positive().default(500),
// Per-env per-tick pop cap. The drainer rotates one env per org per
// tick; this bounds how many entries it pops from that env before
// dispatching them through the shared `DRAIN_CONCURRENCY`-bounded
// limiter. Default matches `DRAIN_CONCURRENCY` so a single-env burst
// uses the full handler-parallelism budget — for 20k buffered on one
// env this is the difference between ~17m (one-pop-per-tick × ~50ms)
// and ~20s (400 ticks × concurrent engine.trigger). Org/env fairness
// is preserved because the per-tick env selection is unchanged; only
// the in-env pop count grows.
TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE: z.coerce.number().int().positive().default(50),
// Periodic sweep that scans buffer queue LISTs for entries whose
// dwell exceeds the stale threshold. Independent of the drainer —
// its job is exactly to make a stuck/offline drainer visible to
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/mollifier/mollifierDrainer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
logger.debug("Initializing mollifier drainer", {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
});

const drainer = new MollifierDrainer<MollifierSnapshot>({
Expand All @@ -81,6 +82,7 @@ function initializeMollifierDrainer(): MollifierDrainer<MollifierSnapshot> {
concurrency: env.TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY,
maxAttempts: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS,
maxOrgsPerTick: env.TRIGGER_MOLLIFIER_DRAIN_MAX_ORGS_PER_TICK,
drainBatchSize: env.TRIGGER_MOLLIFIER_DRAIN_BATCH_SIZE,
isRetryable: isRetryablePgError,
});

Expand Down
Loading
Loading