Skip to content

Commit 0fb2869

Browse files
committed
feat(engine): enqueue fast path; skip the queue under certain conditions
1 parent b075678 commit 0fb2869

File tree

10 files changed

+754
-58
lines changed

10 files changed

+754
-58
lines changed

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,9 @@ export class DefaultQueueManager implements QueueManager {
254254
async getWorkerQueue(
255255
environment: AuthenticatedEnvironment,
256256
regionOverride?: string
257-
): Promise<string | undefined> {
257+
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined> {
258258
if (environment.type === "DEVELOPMENT") {
259-
return environment.id;
259+
return { masterQueue: environment.id, enableFastPath: true };
260260
}
261261

262262
const workerGroupService = new WorkerGroupService({
@@ -279,7 +279,10 @@ export class DefaultQueueManager implements QueueManager {
279279
throw new ServiceValidationError("No worker group found");
280280
}
281281

282-
return workerGroup.masterQueue;
282+
return {
283+
masterQueue: workerGroup.masterQueue,
284+
enableFastPath: workerGroup.enableFastPath,
285+
};
283286
}
284287
}
285288

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,12 @@ export class RunEngineTriggerTaskService {
290290

291291
const depth = parentRun ? parentRun.depth + 1 : 0;
292292

293-
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
293+
const workerQueueResult = await this.queueConcern.getWorkerQueue(
294+
environment,
295+
body.options?.region
296+
);
297+
const workerQueue = workerQueueResult?.masterQueue;
298+
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
294299

295300
// Build annotations for this run
296301
const triggerSource = options.triggerSource ?? "api";
@@ -344,6 +349,7 @@ export class RunEngineTriggerTaskService {
344349
queue: queueName,
345350
lockedQueueId,
346351
workerQueue,
352+
enableFastPath,
347353
isTest: body.options?.test ?? false,
348354
delayUntil,
349355
queuedAt: delayUntil ? undefined : new Date(),

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export interface QueueManager {
7070
getWorkerQueue(
7171
env: AuthenticatedEnvironment,
7272
regionOverride?: string
73-
): Promise<string | undefined>;
73+
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined>;
7474
}
7575

7676
export interface PayloadProcessor {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."WorkerInstanceGroup" ADD COLUMN "enableFastPath" BOOLEAN NOT NULL DEFAULT false;

internal-packages/database/prisma/schema.prisma

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,6 +1331,10 @@ model WorkerInstanceGroup {
13311331
13321332
workloadType WorkloadType @default(CONTAINER)
13331333
1334+
/// When true, runs enqueued to this worker queue may skip the intermediate queue
1335+
/// and be pushed directly to the worker queue when concurrency is available.
1336+
enableFastPath Boolean @default(false)
1337+
13341338
createdAt DateTime @default(now())
13351339
updatedAt DateTime @updatedAt
13361340
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ export class RunEngine {
462462
cliVersion,
463463
concurrencyKey,
464464
workerQueue,
465+
enableFastPath,
465466
queue,
466467
lockedQueueId,
467468
isTest,
@@ -799,6 +800,7 @@ export class RunEngine {
799800
tx: prisma,
800801
skipRunLock: true,
801802
includeTtl: true,
803+
enableFastPath,
802804
});
803805
} catch (enqueueError) {
804806
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class EnqueueSystem {
3636
runnerId,
3737
skipRunLock,
3838
includeTtl = false,
39+
enableFastPath = false,
3940
}: {
4041
run: TaskRun;
4142
env: MinimalAuthenticatedEnvironment;
@@ -57,6 +58,8 @@ export class EnqueueSystem {
5758
skipRunLock?: boolean;
5859
/** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */
5960
includeTtl?: boolean;
61+
/** When true, allow the queue to push directly to worker queue if concurrency is available. */
62+
enableFastPath?: boolean;
6063
}) {
6164
const prisma = tx ?? this.$.prisma;
6265

@@ -98,6 +101,7 @@ export class EnqueueSystem {
98101
await this.$.runQueue.enqueueMessage({
99102
env,
100103
workerQueue,
104+
enableFastPath,
101105
message: {
102106
runId: run.id,
103107
taskIdentifier: run.taskIdentifier,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ export type TriggerParams = {
181181
cliVersion?: string;
182182
concurrencyKey?: string;
183183
workerQueue?: string;
184+
/** When true, the run queue may push directly to the worker queue if concurrency is available.
185+
* Gated per WorkerInstanceGroup (production) or always true (development). */
186+
enableFastPath?: boolean;
184187
queue: string;
185188
lockedQueueId?: string;
186189
isTest: boolean;

0 commit comments

Comments
 (0)