Commit ae46e3f

feat(server): New TTL system, enforce max queue length limits, lazy waitpoint creation (#2980)
This PR implements a new run TTL system and queue size limits to prevent unbounded queue growth, which can put queues into a "death spiral" where they can never catch up. The main defense against this situation is to enforce a maximum TTL on all runs (e.g. up to 14 days): runs that have been queued for that long are auto-expired, making room for newer runs to execute.

This required a new TTL system that can handle higher workloads and is now deeply integrated into the RunQueue. When runs are enqueued with a TTL, they are added to both their normal queue and the TTL queue. When runs are dequeued normally, they are removed from both their normal queue and the TTL queue; when runs are dequeued by the TTL system, they are removed from their normal queue as well. Both of these dequeues happen automatically, so there is no race condition. TTL expiration is also made reliable by expiring runs via a Redis worker, which is enqueued atomically inside the TTL dequeue Lua script.

### Optional associated waitpoints

Additionally, this PR implements an optimization: runs that aren't triggered with a dependent parent run no longer create an associated waitpoint. The associated waitpoint is instead lazily created if a dependent run wants to wait for the child run post-facto (via debounce or idempotency), which is a rare but possible situation. This means fewer waitpoint creations, and also fewer waitpoint completions, for runs with no dependencies.

### Environment Queue Limits

Prevents any single queue from growing too large by enforcing queue size limits at trigger time:

- Queue size checks happen at trigger time; runs are rejected if the queue would exceed its limit
- The dashboard shows queue limits on both the Queues page and a new Limits page
- Queue size checks are cached in memory to reduce Redis load

### Batch trigger fixes

Previously, when a batch item could not be created for whatever reason (e.g. queue limits), the run was never created, which meant a stalled run when using `batchTriggerAndWait`. We've updated the system to handle this differently: when a batch item cannot be triggered and converted into a run, we eventually (after 8 retries over up to 30s) create a "pre-failed" run with the error details, correctly resolving the `batchTriggerAndWait`.
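The dual-queue bookkeeping described above can be sketched with an in-memory model. This is an illustrative assumption, not the actual Redis/Lua RunQueue implementation: `enqueueRun`, `dequeueRun`, and `expireDueRuns` are hypothetical names standing in for the real operations.

```typescript
type RunId = string;

interface QueueState {
  queue: RunId[]; // the run's normal queue (FIFO)
  ttlQueue: Map<RunId, number>; // runId -> expiry timestamp (ms)
}

function enqueueRun(state: QueueState, runId: RunId, ttlMs?: number, now = Date.now()): void {
  state.queue.push(runId);
  // Runs enqueued with a TTL are tracked in BOTH the normal queue and the TTL queue.
  if (ttlMs !== undefined) state.ttlQueue.set(runId, now + ttlMs);
}

function dequeueRun(state: QueueState): RunId | undefined {
  const runId = state.queue.shift();
  // A normal dequeue also removes the run from the TTL queue, so the
  // TTL system can never expire a run that has already been picked up.
  if (runId !== undefined) state.ttlQueue.delete(runId);
  return runId;
}

function expireDueRuns(state: QueueState, now = Date.now()): RunId[] {
  const expired: RunId[] = [];
  for (const [runId, expiresAt] of state.ttlQueue) {
    if (expiresAt <= now) {
      // A TTL dequeue removes the run from its normal queue too.
      state.ttlQueue.delete(runId);
      state.queue = state.queue.filter((id) => id !== runId);
      expired.push(runId);
    }
  }
  return expired;
}
```

In the real system both removals happen inside one Lua script, which is what makes them race-free; here the single-threaded model plays the same role.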
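The retry-then-pre-fail flow from the batch trigger fixes can be sketched as follows. The function and service names (`createRun`, `createPreFailedRun`) are hypothetical stand-ins, and the backoff schedule is an assumption; the source only states 8 retries over up to 30s.

```typescript
async function triggerBatchItemWithFallback(
  createRun: () => Promise<string>,
  createPreFailedRun: (error: unknown) => Promise<string>,
  maxAttempts = 8,
  baseDelayMs = 250,
  maxDelayMs = 30_000
): Promise<string> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await createRun();
    } catch (error) {
      lastError = error;
      // Exponential backoff, capped so the whole window stays bounded (~30s in the PR).
      const delay = Math.min(baseDelayMs * 2 ** attempt, maxDelayMs);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  // All retries exhausted: create a "pre-failed" run carrying the error so that
  // batchTriggerAndWait resolves with a failure instead of stalling forever.
  return createPreFailedRun(lastError);
}
```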
1 parent 69dc7bc commit ae46e3f

44 files changed (+5490 -627 lines)


apps/webapp/app/env.server.ts (16 additions, 0 deletions)

```diff
@@ -547,6 +547,9 @@ const EnvironmentSchema = z
   MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
   MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
+  QUEUE_SIZE_CACHE_TTL_MS: z.coerce.number().int().optional().default(1_000), // 1 second
+  QUEUE_SIZE_CACHE_MAX_SIZE: z.coerce.number().int().optional().default(5_000),
+  QUEUE_SIZE_CACHE_ENABLED: z.coerce.number().int().optional().default(1),
   MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
   MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
@@ -603,6 +606,19 @@ const EnvironmentSchema = z
   RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
   RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),

+  // TTL System settings for automatic run expiration
+  RUN_ENGINE_TTL_SYSTEM_DISABLED: BoolEnv.default(false),
+  RUN_ENGINE_TTL_SYSTEM_SHARD_COUNT: z.coerce.number().int().optional(),
+  RUN_ENGINE_TTL_SYSTEM_POLL_INTERVAL_MS: z.coerce.number().int().default(1_000),
+  RUN_ENGINE_TTL_SYSTEM_BATCH_SIZE: z.coerce.number().int().default(100),
+  RUN_ENGINE_TTL_WORKER_CONCURRENCY: z.coerce.number().int().default(1),
+  RUN_ENGINE_TTL_WORKER_BATCH_MAX_SIZE: z.coerce.number().int().default(50),
+  RUN_ENGINE_TTL_WORKER_BATCH_MAX_WAIT_MS: z.coerce.number().int().default(5_000),
+
+  /** Optional maximum TTL for all runs (e.g. "14d"). If set, runs without an explicit TTL
+   * will use this as their TTL, and runs with a TTL larger than this will be clamped. */
+  RUN_ENGINE_DEFAULT_MAX_TTL: z.string().optional(),
+
   RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
   RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
   RUN_ENGINE_RUN_LOCK_MAX_RETRIES: z.coerce.number().int().default(10),
```
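The clamping behaviour described in the `RUN_ENGINE_DEFAULT_MAX_TTL` comment can be sketched like this. The duration parser here is a simplified stand-in (the real webapp may use a different duration library), so treat both function names and the supported units as assumptions:

```typescript
// Simplified stand-in duration parser; supports s/m/h/d only.
const UNIT_MS: Record<string, number> = {
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

function parseTtlMs(ttl: string): number {
  const match = /^(\d+)([smhd])$/.exec(ttl);
  if (!match) throw new Error(`Invalid TTL: ${ttl}`);
  return Number(match[1]) * UNIT_MS[match[2]];
}

/**
 * Runs without an explicit TTL inherit the configured max;
 * runs with a larger TTL are clamped down to it.
 */
function resolveTtlMs(
  runTtl: string | undefined,
  maxTtl: string | undefined
): number | undefined {
  if (maxTtl === undefined) return runTtl !== undefined ? parseTtlMs(runTtl) : undefined;
  const maxMs = parseTtlMs(maxTtl);
  if (runTtl === undefined) return maxMs;
  return Math.min(parseTtlMs(runTtl), maxMs);
}
```

For example, with `RUN_ENGINE_DEFAULT_MAX_TTL="14d"`, a run triggered with `ttl: "30d"` would be clamped to 14 days, and a run with no TTL would get 14 days.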

apps/webapp/app/presenters/v3/EnvironmentQueuePresenter.server.ts (7 additions, 0 deletions)

```diff
@@ -1,6 +1,7 @@
 import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
 import { marqs } from "~/v3/marqs/index.server";
 import { engine } from "~/v3/runEngine.server";
+import { getQueueSizeLimit } from "~/v3/utils/queueLimits.server";
 import { BasePresenter } from "./basePresenter.server";

 export type Environment = {
@@ -9,6 +10,7 @@ export type Environment = {
   concurrencyLimit: number;
   burstFactor: number;
   runsEnabled: boolean;
+  queueSizeLimit: number | null;
 };

 export class EnvironmentQueuePresenter extends BasePresenter {
@@ -30,19 +32,24 @@ export class EnvironmentQueuePresenter extends BasePresenter {
       },
       select: {
         runsEnabled: true,
+        maximumDevQueueSize: true,
+        maximumDeployedQueueSize: true,
       },
     });

     if (!organization) {
       throw new Error("Organization not found");
     }

+    const queueSizeLimit = getQueueSizeLimit(environment.type, organization);
+
     return {
       running,
       queued,
       concurrencyLimit: environment.maximumConcurrencyLimit,
       burstFactor: environment.concurrencyLimitBurstFactor.toNumber(),
       runsEnabled: environment.type === "DEVELOPMENT" || organization.runsEnabled,
+      queueSizeLimit,
     };
   }
 }
```
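The `getQueueSizeLimit` / `getQueueSizeLimitSource` helpers imported in this diff are not shown in the commit. A plausible sketch, based only on the organization fields selected here (`maximumDevQueueSize`, `maximumDeployedQueueSize`) and the `"override" | "default"` source values seen later, might look like the following. This is an assumption about the real `~/v3/utils/queueLimits.server` module, not its actual code:

```typescript
type EnvType = "DEVELOPMENT" | "STAGING" | "PREVIEW" | "PRODUCTION";

interface OrgQueueLimits {
  maximumDevQueueSize: number | null;
  maximumDeployedQueueSize: number | null;
}

// Hypothetical server-wide fallbacks (e.g. from MAXIMUM_DEV_QUEUE_SIZE /
// MAXIMUM_DEPLOYED_QUEUE_SIZE env vars).
interface ServerDefaults {
  dev?: number;
  deployed?: number;
}

function getQueueSizeLimit(
  type: EnvType,
  org: OrgQueueLimits,
  defaults: ServerDefaults = {}
): number | null {
  // An org-level override wins; otherwise fall back to the server default.
  if (type === "DEVELOPMENT") return org.maximumDevQueueSize ?? defaults.dev ?? null;
  return org.maximumDeployedQueueSize ?? defaults.deployed ?? null;
}

function getQueueSizeLimitSource(type: EnvType, org: OrgQueueLimits): "override" | "default" {
  const override =
    type === "DEVELOPMENT" ? org.maximumDevQueueSize : org.maximumDeployedQueueSize;
  return override != null ? "override" : "default";
}
```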

apps/webapp/app/presenters/v3/LimitsPresenter.server.ts (83 additions, 61 deletions)

```diff
@@ -1,4 +1,5 @@
 import { Ratelimit } from "@upstash/ratelimit";
+import { RuntimeEnvironmentType } from "@trigger.dev/database";
 import { createHash } from "node:crypto";
 import { env } from "~/env.server";
 import { getCurrentPlan } from "~/services/platform.v3.server";
@@ -12,6 +13,8 @@ import { BasePresenter } from "./basePresenter.server";
 import { singleton } from "~/utils/singleton";
 import { logger } from "~/services/logger.server";
 import { CheckScheduleService } from "~/v3/services/checkSchedule.server";
+import { engine } from "~/v3/runEngine.server";
+import { getQueueSizeLimit, getQueueSizeLimitSource } from "~/v3/utils/queueLimits.server";

 // Create a singleton Redis client for rate limit queries
 const rateLimitRedisClient = singleton("rateLimitQueryRedisClient", () =>
@@ -66,8 +69,7 @@ export type LimitsResult = {
   logRetentionDays: QuotaInfo | null;
   realtimeConnections: QuotaInfo | null;
   batchProcessingConcurrency: QuotaInfo;
-  devQueueSize: QuotaInfo;
-  deployedQueueSize: QuotaInfo;
+  queueSize: QuotaInfo;
   metricDashboards: QuotaInfo | null;
   metricWidgetsPerDashboard: QuotaInfo | null;
   queryPeriodDays: QuotaInfo | null;
@@ -87,11 +89,13 @@ export class LimitsPresenter extends BasePresenter {
     organizationId,
     projectId,
     environmentId,
+    environmentType,
     environmentApiKey,
   }: {
     organizationId: string;
     projectId: string;
     environmentId: string;
+    environmentType: RuntimeEnvironmentType;
     environmentApiKey: string;
   }): Promise<LimitsResult> {
     // Get organization with all limit-related fields
@@ -175,6 +179,30 @@ export class LimitsPresenter extends BasePresenter {
       batchRateLimitConfig
     );

+    // Get current queue size for this environment
+    // We need the runtime environment fields for the engine query
+    const runtimeEnv = await this._replica.runtimeEnvironment.findFirst({
+      where: { id: environmentId },
+      select: {
+        id: true,
+        maximumConcurrencyLimit: true,
+        concurrencyLimitBurstFactor: true,
+      },
+    });
+
+    let currentQueueSize = 0;
+    if (runtimeEnv) {
+      const engineEnv = {
+        id: runtimeEnv.id,
+        type: environmentType,
+        maximumConcurrencyLimit: runtimeEnv.maximumConcurrencyLimit,
+        concurrencyLimitBurstFactor: runtimeEnv.concurrencyLimitBurstFactor,
+        organization: { id: organizationId },
+        project: { id: projectId },
+      };
+      currentQueueSize = (await engine.lengthOfEnvQueue(engineEnv)) ?? 0;
+    }
+
     // Get plan-level limits
     const schedulesLimit = limits?.schedules?.number ?? null;
     const teamMembersLimit = limits?.teamMembers?.number ?? null;
@@ -217,72 +245,72 @@ export class LimitsPresenter extends BasePresenter {
       schedules:
         schedulesLimit !== null
           ? {
-            name: "Schedules",
-            description: "Maximum number of schedules per project",
-            limit: schedulesLimit,
-            currentUsage: scheduleCount,
-            source: "plan",
-            canExceed: limits?.schedules?.canExceed,
-            isUpgradable: true,
-          }
+              name: "Schedules",
+              description: "Maximum number of schedules per project",
+              limit: schedulesLimit,
+              currentUsage: scheduleCount,
+              source: "plan",
+              canExceed: limits?.schedules?.canExceed,
+              isUpgradable: true,
+            }
           : null,
       teamMembers:
         teamMembersLimit !== null
           ? {
-            name: "Team members",
-            description: "Maximum number of team members in this organization",
-            limit: teamMembersLimit,
-            currentUsage: organization._count.members,
-            source: "plan",
-            canExceed: limits?.teamMembers?.canExceed,
-            isUpgradable: true,
-          }
+              name: "Team members",
+              description: "Maximum number of team members in this organization",
+              limit: teamMembersLimit,
+              currentUsage: organization._count.members,
+              source: "plan",
+              canExceed: limits?.teamMembers?.canExceed,
+              isUpgradable: true,
+            }
           : null,
       alerts:
         alertsLimit !== null
           ? {
-            name: "Alert channels",
-            description: "Maximum number of alert channels per project",
-            limit: alertsLimit,
-            currentUsage: alertChannelCount,
-            source: "plan",
-            canExceed: limits?.alerts?.canExceed,
-            isUpgradable: true,
-          }
+              name: "Alert channels",
+              description: "Maximum number of alert channels per project",
+              limit: alertsLimit,
+              currentUsage: alertChannelCount,
+              source: "plan",
+              canExceed: limits?.alerts?.canExceed,
+              isUpgradable: true,
+            }
           : null,
       branches:
         branchesLimit !== null
           ? {
-            name: "Preview branches",
-            description: "Maximum number of active preview branches per project",
-            limit: branchesLimit,
-            currentUsage: activeBranchCount,
-            source: "plan",
-            canExceed: limits?.branches?.canExceed,
-            isUpgradable: true,
-          }
+              name: "Preview branches",
+              description: "Maximum number of active preview branches per project",
+              limit: branchesLimit,
+              currentUsage: activeBranchCount,
+              source: "plan",
+              canExceed: limits?.branches?.canExceed,
+              isUpgradable: true,
+            }
           : null,
       logRetentionDays:
         logRetentionDaysLimit !== null
           ? {
-            name: "Log retention",
-            description: "Number of days logs are retained",
-            limit: logRetentionDaysLimit,
-            currentUsage: 0, // Not applicable - this is a duration, not a count
-            source: "plan",
-          }
+              name: "Log retention",
+              description: "Number of days logs are retained",
+              limit: logRetentionDaysLimit,
+              currentUsage: 0, // Not applicable - this is a duration, not a count
+              source: "plan",
+            }
           : null,
       realtimeConnections:
         realtimeConnectionsLimit !== null
           ? {
-            name: "Realtime connections",
-            description: "Maximum concurrent Realtime connections",
-            limit: realtimeConnectionsLimit,
-            currentUsage: 0, // Would need to query realtime service for this
-            source: "plan",
-            canExceed: limits?.realtimeConcurrentConnections?.canExceed,
-            isUpgradable: true,
-          }
+              name: "Realtime connections",
+              description: "Maximum concurrent Realtime connections",
+              limit: realtimeConnectionsLimit,
+              currentUsage: 0, // Would need to query realtime service for this
+              source: "plan",
+              canExceed: limits?.realtimeConcurrentConnections?.canExceed,
+              isUpgradable: true,
+            }
           : null,
       batchProcessingConcurrency: {
         name: "Batch processing concurrency",
@@ -293,19 +321,13 @@ export class LimitsPresenter extends BasePresenter {
         canExceed: true,
         isUpgradable: true,
       },
-      devQueueSize: {
-        name: "Dev queue size",
-        description: "Maximum pending runs in development environments",
-        limit: organization.maximumDevQueueSize ?? null,
-        currentUsage: 0, // Would need to query Redis for this
-        source: organization.maximumDevQueueSize ? "override" : "default",
-      },
-      deployedQueueSize: {
-        name: "Deployed queue size",
-        description: "Maximum pending runs in deployed environments",
-        limit: organization.maximumDeployedQueueSize ?? null,
-        currentUsage: 0, // Would need to query Redis for this
-        source: organization.maximumDeployedQueueSize ? "override" : "default",
+      queueSize: {
+        name: "Max queued runs",
+        description: "Maximum pending runs per individual queue in this environment",
+        limit: getQueueSizeLimit(environmentType, organization),
+        currentUsage: currentQueueSize,
+        source: getQueueSizeLimitSource(environmentType, organization),
+        isUpgradable: true,
       },
       metricDashboards:
         metricDashboardsLimit !== null
```
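The commit message mentions "in-memory caching for queue size checks to reduce Redis load", bounded by the `QUEUE_SIZE_CACHE_TTL_MS` and `QUEUE_SIZE_CACHE_MAX_SIZE` env vars added in `env.server.ts`. A minimal sketch of such a cache, assuming a simple TTL plus FIFO eviction policy (the actual implementation is not shown in this commit and may differ):

```typescript
class QueueSizeCache {
  private entries = new Map<string, { size: number; expiresAt: number }>();

  constructor(
    private ttlMs: number, // QUEUE_SIZE_CACHE_TTL_MS
    private maxSize: number // QUEUE_SIZE_CACHE_MAX_SIZE
  ) {}

  get(queueKey: string, now = Date.now()): number | undefined {
    const entry = this.entries.get(queueKey);
    if (!entry) return undefined;
    if (entry.expiresAt <= now) {
      // Stale entry: drop it so the caller does a fresh Redis read.
      this.entries.delete(queueKey);
      return undefined;
    }
    return entry.size;
  }

  set(queueKey: string, size: number, now = Date.now()): void {
    // Evict the oldest entry when full (simple FIFO eviction for this sketch;
    // Map iteration order is insertion order).
    if (!this.entries.has(queueKey) && this.entries.size >= this.maxSize) {
      const oldest = this.entries.keys().next().value;
      if (oldest !== undefined) this.entries.delete(oldest);
    }
    this.entries.set(queueKey, { size, expiresAt: now + this.ttlMs });
  }
}
```

A short TTL (1s by default) means a queue length read from Redis can be reused across a burst of triggers without letting a full queue appear empty for long.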

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.limits/route.tsx (9 additions, 6 deletions)

```diff
@@ -82,6 +82,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
     organizationId: project.organizationId,
     projectId: project.id,
     environmentId: environment.id,
+    environmentType: environment.type,
     environmentApiKey: environment.apiKey,
   })
 );
@@ -507,9 +508,8 @@ function QuotasSection({
   // Include batch processing concurrency
   quotaRows.push(quotas.batchProcessingConcurrency);

-  // Add queue size quotas if set
-  if (quotas.devQueueSize.limit !== null) quotaRows.push(quotas.devQueueSize);
-  if (quotas.deployedQueueSize.limit !== null) quotaRows.push(quotas.deployedQueueSize);
+  // Add queue size quota if set
+  if (quotas.queueSize.limit !== null) quotaRows.push(quotas.queueSize);

   // Metric & query quotas
   if (quotas.metricDashboards) quotaRows.push(quotas.metricDashboards);
@@ -565,8 +565,11 @@ function QuotaRow({
   const isDurationQuota = quota.name === "Log retention" || quota.name === "Query period";
   const isPerItemQuota = quota.name === "Charts per dashboard";
   const isRetentionQuota = isDurationQuota || isPerItemQuota;
+  const isQueueSizeQuota = quota.name === "Max queued runs";
+  const hideCurrentUsage = isRetentionQuota || isQueueSizeQuota;
+
   const percentage =
-    !isRetentionQuota && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;
+    !hideCurrentUsage && quota.limit && quota.limit > 0 ? quota.currentUsage / quota.limit : null;

   // Special handling for duration-based quotas (Log retention, Query period)
   if (isDurationQuota) {
@@ -667,10 +670,10 @@ function QuotaRow({
   alignment="right"
   className={cn(
     "tabular-nums",
-    isRetentionQuota ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
+    hideCurrentUsage ? "text-text-dimmed" : getUsageColorClass(percentage, "usage")
   )}
 >
-  {isRetentionQuota ? "–" : formatNumber(quota.currentUsage)}
+  {hideCurrentUsage ? "–" : formatNumber(quota.currentUsage)}
 </TableCell>
 <TableCell alignment="right">
   <SourceBadge source={quota.source} />
```