Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/scheduled-run-region-display.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Scheduled runs now show under their correct region in the dashboard, run details, and the API, and match region filters, instead of appearing under a separate region.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/databa
import assertNever from "assert-never";
import { API_VERSIONS, CURRENT_API_VERSION, RunStatusUnspecifiedApiVersion } from "~/api/versions";
import { $replica, prisma } from "~/db.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import {
findRunByIdWithMollifierFallback,
Expand Down Expand Up @@ -519,7 +520,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
metadata,
region: run.workerQueue || undefined,
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
};
}

Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
import { findDisplayableEnvironment } from "~/models/runtimeEnvironment.server";
import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";
Expand Down Expand Up @@ -259,7 +260,7 @@ export class NextRunListPresenter {
name: run.queue.replace("task/", ""),
type: run.queue.startsWith("task/") ? "task" : "custom",
},
region: run.workerQueue ? run.workerQueue : undefined,
region: run.workerQueue ? baseWorkerQueue(run.workerQueue) : undefined,
taskKind: RunAnnotations.safeParse(run.annotations).data?.taskKind ?? "STANDARD",
};
}),
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
getUserProvidedIdempotencyKey,
} from "@trigger.dev/core/v3/serverOnly";
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { logger } from "~/services/logger.server";
import { rehydrateAttribute } from "~/v3/eventRepository/eventRepository.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
Expand Down Expand Up @@ -302,7 +303,7 @@ export class SpanPresenter extends BasePresenter {
location: true,
},
where: {
masterQueue: run.workerQueue,
masterQueue: baseWorkerQueue(run.workerQueue),
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
type SyntheticReplayTaskRun,
} from "~/v3/mollifier/syntheticReplayTaskRun.server";
import parseDuration from "parse-duration";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server";
import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server";
import { ReplayRunData } from "~/v3/replayTask";
Expand Down Expand Up @@ -209,7 +210,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
maxAttempts: run.maxAttempts,
maxDurationSeconds: run.maxDurationInSeconds,
machinePreset: run.machinePreset,
region: environment.type === "DEVELOPMENT" ? undefined : run.workerQueue,
region: environment.type === "DEVELOPMENT" ? undefined : baseWorkerQueue(run.workerQueue),
Comment thread
ericallam marked this conversation as resolved.
regions: regionsResult.regions,
ttlSeconds: run.ttl ? parseDuration(run.ttl, "s") ?? undefined : undefined,
idempotencyKey: run.idempotencyKey,
Expand Down
14 changes: 14 additions & 0 deletions apps/webapp/app/runEngine/concerns/workerQueueSplit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ import { FEATURE_FLAG, FeatureFlagCatalog } from "~/v3/featureFlags";
*/
export const SCHEDULED_WORKER_QUEUE_SUFFIX = ":scheduled";

/**
* Recover the base region a worker queue belongs to by stripping any split
* suffix (e.g. `us-nyc-3:scheduled` -> `us-nyc-3`). Region/masterQueue names are
* either `<name>` or `<projectId>-<name>` and never contain a colon, so the
* region is everything before the first `:`. Use this wherever a worker queue is
* read as a region — for display, filtering, or as a region override — so
* scheduled-split runs group under their real region instead of a phantom one.
* Idempotent; returns the input unchanged when there's no suffix.
*/
export function baseWorkerQueue(workerQueue: string): string {
const colon = workerQueue.indexOf(":");
return colon === -1 ? workerQueue : workerQueue.slice(0, colon);
}

/** `TriggerSource` value used for runs originating from a schedule. */
const SCHEDULE_TRIGGER_SOURCE = "schedule";

Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/services/runsReplicationService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import EventEmitter from "node:events";
import pLimit from "p-limit";
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import {
isClickHouseJsonParseError,
parseRowNumberFromError,
Expand Down Expand Up @@ -1121,7 +1122,7 @@ export class RunsReplicationService {
event === "delete" ? 1 : 0, // _is_deleted
run.concurrencyKey ?? "", // concurrency_key
run.bulkActionGroupIds ?? [], // bulk_action_group_ids
run.masterQueue ?? "", // worker_queue
baseWorkerQueue(run.masterQueue ?? ""), // worker_queue (region; strip any split suffix like `:scheduled`)
Comment thread
ericallam marked this conversation as resolved.
run.maxDurationInSeconds ?? null, // max_duration_in_seconds
annotations?.triggerSource ?? "", // trigger_source
annotations?.rootTriggerSource ?? "", // root_trigger_source
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/services/replayTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from "@trigger.dev/core/v3";
import { type TaskRun } from "@trigger.dev/database";
import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
import { baseWorkerQueue } from "~/runEngine/concerns/workerQueueSplit.server";
import { logger } from "~/services/logger.server";
import { BaseService } from "./baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server";
Expand Down Expand Up @@ -65,7 +66,9 @@ export class ReplayTaskRunService extends BaseService {
existingTaskRun.engine === "V1" ||
existingEnvironment.type === "DEVELOPMENT" ||
authenticatedEnvironment.type === "DEVELOPMENT";
const region = ignoreRegion ? undefined : overrideOptions.region ?? existingTaskRun.workerQueue;
const region = ignoreRegion
? undefined
: overrideOptions.region ?? baseWorkerQueue(existingTaskRun.workerQueue);
Comment thread
ericallam marked this conversation as resolved.

try {
const taskQueue = await this._prisma.taskQueue.findFirst({
Expand Down
37 changes: 37 additions & 0 deletions apps/webapp/test/workerQueueSplit.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, expect, it } from "vitest";
import {
baseWorkerQueue,
resolveScheduledQueueSplitEnabled,
workerQueueForRun,
workerQueueForClass,
Expand Down Expand Up @@ -98,6 +99,42 @@ describe("workerQueueForRun", () => {
});
});

describe("baseWorkerQueue", () => {
const region = "us-nyc-3";
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;

it("strips the scheduled split suffix back to the base region", () => {
expect(baseWorkerQueue(scheduled)).toBe(region);
});

it("leaves a base region untouched (idempotent)", () => {
expect(baseWorkerQueue(region)).toBe(region);
expect(baseWorkerQueue(baseWorkerQueue(scheduled))).toBe(region);
});

it("strips any future `:<class>` suffix, not just `:scheduled`", () => {
expect(baseWorkerQueue("us-nyc-3:priority")).toBe(region);
expect(baseWorkerQueue("us-nyc-3:a:b")).toBe(region);
});

it("handles the project-scoped masterQueue shape (`<projectId>-<name>`)", () => {
expect(baseWorkerQueue("proj_abc123-main:scheduled")).toBe("proj_abc123-main");
});

it("returns an empty string unchanged", () => {
expect(baseWorkerQueue("")).toBe("");
});

it("round-trips with workerQueueForRun: the split queue strips back to the region it came from", () => {
const enqueued = workerQueueForRun({
workerQueue: region,
rootTriggerSource: "schedule",
splitEnabled: true,
});
expect(baseWorkerQueue(enqueued)).toBe(region);
});
});

describe("workerQueueForClass", () => {
const region = "us-nyc-3";
const scheduled = `${region}${SCHEDULED_WORKER_QUEUE_SUFFIX}`;
Expand Down