Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ export class RunEngineTriggerTaskService {

if (!queueSizeGuard.ok) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
15 changes: 14 additions & 1 deletion apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ScheduleEngine } from "@internal/schedule-engine";
import type { TriggerScheduledTaskErrorType } from "@internal/schedule-engine";
import { stringifyIO } from "@trigger.dev/core/v3";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
Expand All @@ -8,6 +9,7 @@ import { singleton } from "~/utils/singleton";
import { TriggerTaskService } from "./services/triggerTask.server";
import { meter, tracer } from "./tracer.server";
import { workerQueue } from "~/services/worker.server";
import { ServiceValidationError } from "./services/common.server";

export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);

Expand Down Expand Up @@ -111,9 +113,20 @@ function createScheduleEngine() {

return { success: !!result };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
let errorType: TriggerScheduledTaskErrorType = "SYSTEM_ERROR";

if (
error instanceof ServiceValidationError &&
errorMessage.includes("queue size limit for this environment has been reached")
) {
errorType = "QUEUE_LIMIT";
}

return {
success: false,
error: error instanceof Error ? error.message : String(error),
error: errorMessage,
errorType,
};
}
},
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ export class BatchTriggerV3Service extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/v3/services/common.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(message: string, public status?: number) {
constructor(
message: string,
public status?: number,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
}
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/triggerTaskV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ export class TriggerTaskServiceV1 extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ export function runStatusFromError(
}
}

export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(
message: string,
public status?: number,
public metadata?: Record<string, unknown>
public metadata?: Record<string, unknown>,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
Expand Down
10 changes: 5 additions & 5 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2545,7 +2545,7 @@ export class RunQueue {
return;
}

this.logger.info("Processing concurrency keys from stream", {
this.logger.debug("Processing concurrency keys from stream", {
keys: uniqueKeys,
});

Expand Down Expand Up @@ -2615,22 +2615,22 @@ export class RunQueue {
}

private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
this.logger.info("Processing concurrency set with runs", {
this.logger.debug("Processing concurrency set with runs", {
concurrencyKey,
runIds: runIds.slice(0, 5), // Log first 5 for debugging,
runIds: runIds.slice(0, 5),
runIdsLength: runIds.length,
});

// Call the callback to determine which runs are completed
const completedRuns = await this.options.concurrencySweeper?.callback(runIds);

if (!completedRuns) {
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
return;
}

if (completedRuns.length === 0) {
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
return;
}

Expand Down
25 changes: 18 additions & 7 deletions internal-packages/schedule-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,28 @@ export class ScheduleEngine {

span.setAttribute("trigger_success", true);
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
const isQueueLimit = result.errorType === "QUEUE_LIMIT";

if (isQueueLimit) {
this.logger.warn("Scheduled task trigger skipped due to queue limit", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
}

this.scheduleExecutionFailureCounter.add(1, {
environment_type: environmentType,
schedule_type: scheduleType,
error_type: "task_failure",
error_type: isQueueLimit ? "queue_limit" : "task_failure",
});

span.setAttribute("trigger_success", false);
Expand Down
8 changes: 7 additions & 1 deletion internal-packages/schedule-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ export type TriggerScheduledTaskParams = {
exactScheduleTime?: Date;
};

export type TriggerScheduledTaskErrorType = "QUEUE_LIMIT" | "SYSTEM_ERROR";

export interface TriggerScheduledTaskCallback {
(params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>;
(params: TriggerScheduledTaskParams): Promise<{
success: boolean;
error?: string;
errorType?: TriggerScheduledTaskErrorType;
}>;
}

export interface ScheduleEngineOptions {
Expand Down
1 change: 1 addition & 0 deletions internal-packages/schedule-engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export type {
ScheduleEngineOptions,
TriggerScheduleParams,
TriggerScheduledTaskCallback,
TriggerScheduledTaskErrorType,
} from "./engine/types.js";
23 changes: 17 additions & 6 deletions packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,8 @@ class Worker<TCatalog extends WorkerCatalog> {
const errorMessage = error instanceof Error ? error.message : String(error);

const shouldLogError = catalogItem.logErrors ?? true;
const errorLogLevel =
error && typeof error === "object" && "logLevel" in error ? error.logLevel : undefined;

const logAttributes = {
name: this.options.name,
Expand All @@ -906,10 +908,14 @@ class Worker<TCatalog extends WorkerCatalog> {
errorMessage,
};

if (shouldLogError) {
this.logger.error(`Worker error processing item`, logAttributes);
} else {
if (!shouldLogError) {
this.logger.info(`Worker failed to process item`, logAttributes);
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker error processing item`, logAttributes);
} else if (errorLogLevel === "info") {
this.logger.info(`Worker error processing item`, logAttributes);
} else {
this.logger.error(`Worker error processing item`, logAttributes);
}

// Attempt requeue logic.
Expand All @@ -922,13 +928,18 @@ class Worker<TCatalog extends WorkerCatalog> {
const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

if (!retryDelay) {
if (shouldLogError) {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
if (!shouldLogError || errorLogLevel === "info") {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
Expand Down
Loading