Skip to content

Commit 947f33d

Browse files
authored
fix: downgrade queue size limit errors to warnings (#3243)
Queue limit ServiceValidationErrors were being logged at error level. These are expected validation rejections, not bugs. - Add logLevel property to ServiceValidationError (webapp + run-engine) - Set logLevel: warn on all queue limit throws - Schedule engine: detect queue limit failures and log as warn - Redis-worker: respect logLevel on thrown errors
1 parent 2037254 commit 947f33d

File tree

11 files changed

+113
-53
lines changed

11 files changed

+113
-53
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,9 @@ export class RunEngineTriggerTaskService {
264264

265265
if (!queueSizeGuard.ok) {
266266
throw new ServiceValidationError(
267-
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
267+
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
268+
undefined,
269+
"warn"
268270
);
269271
}
270272
}

apps/webapp/app/v3/scheduleEngine.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ScheduleEngine } from "@internal/schedule-engine";
2+
import type { TriggerScheduledTaskErrorType } from "@internal/schedule-engine";
23
import { stringifyIO } from "@trigger.dev/core/v3";
34
import { prisma } from "~/db.server";
45
import { env } from "~/env.server";
@@ -8,6 +9,7 @@ import { singleton } from "~/utils/singleton";
89
import { TriggerTaskService } from "./services/triggerTask.server";
910
import { meter, tracer } from "./tracer.server";
1011
import { workerQueue } from "~/services/worker.server";
12+
import { ServiceValidationError } from "./services/common.server";
1113

1214
export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);
1315

@@ -113,9 +115,20 @@ function createScheduleEngine() {
113115

114116
return { success: !!result };
115117
} catch (error) {
118+
const errorMessage = error instanceof Error ? error.message : String(error);
119+
let errorType: TriggerScheduledTaskErrorType = "SYSTEM_ERROR";
120+
121+
if (
122+
error instanceof ServiceValidationError &&
123+
errorMessage.includes("queue size limit for this environment has been reached")
124+
) {
125+
errorType = "QUEUE_LIMIT";
126+
}
127+
116128
return {
117129
success: false,
118-
error: error instanceof Error ? error.message : String(error),
130+
error: errorMessage,
131+
errorType,
119132
};
120133
}
121134
},

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,9 @@ export class BatchTriggerV3Service extends BaseService {
251251

252252
if (!queueSizeGuard.isWithinLimits) {
253253
throw new ServiceValidationError(
254-
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
254+
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
255+
undefined,
256+
"warn"
255257
);
256258
}
257259

apps/webapp/app/v3/services/common.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
export type ServiceValidationErrorLevel = "error" | "warn" | "info";
2+
13
export class ServiceValidationError extends Error {
2-
constructor(message: string, public status?: number) {
4+
constructor(
5+
message: string,
6+
public status?: number,
7+
public logLevel?: ServiceValidationErrorLevel
8+
) {
39
super(message);
410
this.name = "ServiceValidationError";
511
}

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ export class TriggerTaskServiceV1 extends BaseService {
134134

135135
if (!queueSizeGuard.isWithinLimits) {
136136
throw new ServiceValidationError(
137-
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
137+
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
138+
undefined,
139+
"warn"
138140
);
139141
}
140142
}

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ export function runStatusFromError(
6969
}
7070
}
7171

72+
export type ServiceValidationErrorLevel = "error" | "warn" | "info";
73+
7274
export class ServiceValidationError extends Error {
7375
constructor(
7476
message: string,
7577
public status?: number,
76-
public metadata?: Record<string, unknown>
78+
public metadata?: Record<string, unknown>,
79+
public logLevel?: ServiceValidationErrorLevel
7780
) {
7881
super(message);
7982
this.name = "ServiceValidationError";

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2545,7 +2545,7 @@ export class RunQueue {
25452545
return;
25462546
}
25472547

2548-
this.logger.info("Processing concurrency keys from stream", {
2548+
this.logger.debug("Processing concurrency keys from stream", {
25492549
keys: uniqueKeys,
25502550
});
25512551

@@ -2615,22 +2615,22 @@ export class RunQueue {
26152615
}
26162616

26172617
private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
2618-
this.logger.info("Processing concurrency set with runs", {
2618+
this.logger.debug("Processing concurrency set with runs", {
26192619
concurrencyKey,
2620-
runIds: runIds.slice(0, 5), // Log first 5 for debugging,
2620+
runIds: runIds.slice(0, 5),
26212621
runIdsLength: runIds.length,
26222622
});
26232623

26242624
// Call the callback to determine which runs are completed
26252625
const completedRuns = await this.options.concurrencySweeper?.callback(runIds);
26262626

26272627
if (!completedRuns) {
2628-
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
2628+
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
26292629
return;
26302630
}
26312631

26322632
if (completedRuns.length === 0) {
2633-
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
2633+
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
26342634
return;
26352635
}
26362636

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -497,17 +497,28 @@ export class ScheduleEngine {
497497

498498
span.setAttribute("trigger_success", true);
499499
} else {
500-
this.logger.error("Failed to trigger scheduled task", {
501-
instanceId: params.instanceId,
502-
taskIdentifier: instance.taskSchedule.taskIdentifier,
503-
durationMs: triggerDuration,
504-
error: result.error,
505-
});
500+
const isQueueLimit = result.errorType === "QUEUE_LIMIT";
501+
502+
if (isQueueLimit) {
503+
this.logger.warn("Scheduled task trigger skipped due to queue limit", {
504+
instanceId: params.instanceId,
505+
taskIdentifier: instance.taskSchedule.taskIdentifier,
506+
durationMs: triggerDuration,
507+
error: result.error,
508+
});
509+
} else {
510+
this.logger.error("Failed to trigger scheduled task", {
511+
instanceId: params.instanceId,
512+
taskIdentifier: instance.taskSchedule.taskIdentifier,
513+
durationMs: triggerDuration,
514+
error: result.error,
515+
});
516+
}
506517

507518
this.scheduleExecutionFailureCounter.add(1, {
508519
environment_type: environmentType,
509520
schedule_type: scheduleType,
510-
error_type: "task_failure",
521+
error_type: isQueueLimit ? "queue_limit" : "task_failure",
511522
});
512523

513524
span.setAttribute("trigger_success", false);

internal-packages/schedule-engine/src/engine/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,14 @@ export type TriggerScheduledTaskParams = {
2424
exactScheduleTime?: Date;
2525
};
2626

27+
export type TriggerScheduledTaskErrorType = "QUEUE_LIMIT" | "SYSTEM_ERROR";
28+
2729
export interface TriggerScheduledTaskCallback {
28-
(params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>;
30+
(params: TriggerScheduledTaskParams): Promise<{
31+
success: boolean;
32+
error?: string;
33+
errorType?: TriggerScheduledTaskErrorType;
34+
}>;
2935
}
3036

3137
export interface ScheduleEngineOptions {

internal-packages/schedule-engine/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ export type {
33
ScheduleEngineOptions,
44
TriggerScheduleParams,
55
TriggerScheduledTaskCallback,
6+
TriggerScheduledTaskErrorType,
67
} from "./engine/types.js";

0 commit comments

Comments
 (0)