Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ export class DefaultQueueManager implements QueueManager {
async getWorkerQueue(
environment: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined> {
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined> {
if (environment.type === "DEVELOPMENT") {
return environment.id;
return { masterQueue: environment.id, enableFastPath: true };
}

const workerGroupService = new WorkerGroupService({
Expand All @@ -279,7 +279,10 @@ export class DefaultQueueManager implements QueueManager {
throw new ServiceValidationError("No worker group found");
}

return workerGroup.masterQueue;
return {
masterQueue: workerGroup.masterQueue,
enableFastPath: workerGroup.enableFastPath,
};
}
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,12 @@ export class RunEngineTriggerTaskService {

const depth = parentRun ? parentRun.depth + 1 : 0;

const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
const workerQueueResult = await this.queueConcern.getWorkerQueue(
environment,
body.options?.region
);
const workerQueue = workerQueueResult?.masterQueue;
const enableFastPath = workerQueueResult?.enableFastPath ?? false;

// Build annotations for this run
const triggerSource = options.triggerSource ?? "api";
Expand Down Expand Up @@ -344,6 +349,7 @@ export class RunEngineTriggerTaskService {
queue: queueName,
lockedQueueId,
workerQueue,
enableFastPath,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export interface QueueManager {
getWorkerQueue(
env: AuthenticatedEnvironment,
regionOverride?: string
): Promise<string | undefined>;
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined>;
}

export interface PayloadProcessor {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "public"."WorkerInstanceGroup" ADD COLUMN "enableFastPath" BOOLEAN NOT NULL DEFAULT false;
4 changes: 4 additions & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1331,6 +1331,10 @@ model WorkerInstanceGroup {

workloadType WorkloadType @default(CONTAINER)

/// When true, runs enqueued to this worker queue may skip the intermediate queue
/// and be pushed directly to the worker queue when concurrency is available.
enableFastPath Boolean @default(false)

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
Expand Down
2 changes: 2 additions & 0 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ export class RunEngine {
cliVersion,
concurrencyKey,
workerQueue,
enableFastPath,
queue,
lockedQueueId,
isTest,
Expand Down Expand Up @@ -799,6 +800,7 @@ export class RunEngine {
tx: prisma,
skipRunLock: true,
includeTtl: true,
enableFastPath,
});
} catch (enqueueError) {
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class EnqueueSystem {
runnerId,
skipRunLock,
includeTtl = false,
enableFastPath = false,
}: {
run: TaskRun;
env: MinimalAuthenticatedEnvironment;
Expand All @@ -57,6 +58,8 @@ export class EnqueueSystem {
skipRunLock?: boolean;
/** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */
includeTtl?: boolean;
/** When true, allow the queue to push directly to worker queue if concurrency is available. */
enableFastPath?: boolean;
}) {
const prisma = tx ?? this.$.prisma;

Expand Down Expand Up @@ -98,6 +101,7 @@ export class EnqueueSystem {
await this.$.runQueue.enqueueMessage({
env,
workerQueue,
enableFastPath,
message: {
runId: run.id,
taskIdentifier: run.taskIdentifier,
Expand Down
3 changes: 3 additions & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ export type TriggerParams = {
cliVersion?: string;
concurrencyKey?: string;
workerQueue?: string;
/** When true, the run queue may push directly to the worker queue if concurrency is available.
* Gated per WorkerInstanceGroup (production) or always true (development). */
enableFastPath?: boolean;
queue: string;
lockedQueueId?: string;
isTest: boolean;
Expand Down
Loading
Loading