Skip to content

Commit a340728

Browse files
authored
feat(engine): enqueue fast path; skip the queue under certain conditions (#3299)
## Summary Currently, every triggered run follows a two-step path through Redis: 1. **Enqueue** — A Lua script atomically adds the message to a queue sorted set (ordered by priority-adjusted timestamp) 2. **Dequeue** — A debounced `processQueueForWorkerQueue` job fires ~500ms later, checks concurrency limits, removes the message from the sorted set, and pushes it to a worker queue (Redis list) where workers pick it up via `BLPOP` This means every run pays at least ~500ms of latency between being triggered and being available for a worker to execute, even when the queue is empty and concurrency is wide open. ### What changed The enqueue Lua scripts now atomically decide whether to **skip the queue sorted set entirely** and push directly to the worker queue. This happens inside the same Lua script that handles normal enqueue, so the decision is atomic with respect to concurrency bookkeeping. A run takes the **fast path** when all of these are true: - **Fast path is enabled** for this worker queue (gated per `WorkerInstanceGroup`) - **No available messages** in the queue (`ZRANGEBYSCORE` finds nothing with score ≤ now) — this respects priority ordering and allows fast path even when the queue has future-scored messages (e.g. nacked retries with delay) - **Environment concurrency** has capacity - **Queue concurrency** has capacity (including per-concurrency-key limits for CK queues) When the fast path is taken: - The message is stored and pushed directly to the worker queue (`RPUSH`) - Concurrency slots are claimed (`SADD` to the same sets used by the normal dequeue path) - The `processQueueForWorkerQueue` job is **not scheduled** (no work to do) - TTL sorted set is skipped (the `expireRun` worker job handles TTL independently) When any condition fails, the existing slow path runs unchanged. ### Rollout gating - **Development environments**: Fast path is always enabled - **Production environments**: Gated by a new `enableFastPath` boolean on `WorkerInstanceGroup` (defaults to `false`), allowing region-by-region rollout ### Rolling deploy safety Each process registers its own Lua scripts via `defineCommand` (identified by SHA hash). Old and new processes never share scripts. The Redis data structures are fully compatible in both directions — ack, nack, and release operations work identically regardless of which path a message took. ## Test plan - [x] Fast path taken when queue is empty and concurrency available - [x] Slow path when `enableFastPath` is false - [x] Slow path when queue has available messages (respects priority ordering) - [x] Fast path when queue only has future-scored messages - [x] Slow path when env concurrency is full - [x] Fast-path message can be acknowledged correctly - [x] Fast-path message can be nacked and re-enqueued to the queue sorted set - [x] Run all existing run-queue tests (ack, nack, CK, concurrency sweeper, dequeue) to verify no regressions - [x] Typecheck passes for run-engine and webapp
1 parent 0e63f83 commit a340728

File tree

11 files changed

+762
-60
lines changed

11 files changed

+762
-60
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Reduce run start latency by skipping the intermediate queue when concurrency is available. This optimization is rolled out per-region and enabled automatically for development environments.

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,9 @@ export class DefaultQueueManager implements QueueManager {
295295
async getWorkerQueue(
296296
environment: AuthenticatedEnvironment,
297297
regionOverride?: string
298-
): Promise<string | undefined> {
298+
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined> {
299299
if (environment.type === "DEVELOPMENT") {
300-
return environment.id;
300+
return { masterQueue: environment.id, enableFastPath: true };
301301
}
302302

303303
const workerGroupService = new WorkerGroupService({
@@ -320,7 +320,10 @@ export class DefaultQueueManager implements QueueManager {
320320
throw new ServiceValidationError("No worker group found");
321321
}
322322

323-
return workerGroup.masterQueue;
323+
return {
324+
masterQueue: workerGroup.masterQueue,
325+
enableFastPath: workerGroup.enableFastPath,
326+
};
324327
}
325328
}
326329

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,12 @@ export class RunEngineTriggerTaskService {
298298

299299
const depth = parentRun ? parentRun.depth + 1 : 0;
300300

301-
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
301+
const workerQueueResult = await this.queueConcern.getWorkerQueue(
302+
environment,
303+
body.options?.region
304+
);
305+
const workerQueue = workerQueueResult?.masterQueue;
306+
const enableFastPath = workerQueueResult?.enableFastPath ?? false;
302307

303308
// Build annotations for this run
304309
const triggerSource = options.triggerSource ?? "api";
@@ -352,6 +357,7 @@ export class RunEngineTriggerTaskService {
352357
queue: queueName,
353358
lockedQueueId,
354359
workerQueue,
360+
enableFastPath,
355361
isTest: body.options?.test ?? false,
356362
delayUntil,
357363
queuedAt: delayUntil ? undefined : new Date(),

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export interface QueueManager {
7070
getWorkerQueue(
7171
env: AuthenticatedEnvironment,
7272
regionOverride?: string
73-
): Promise<string | undefined>;
73+
): Promise<{ masterQueue: string; enableFastPath: boolean } | undefined>;
7474
}
7575

7676
export interface PayloadProcessor {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "public"."WorkerInstanceGroup" ADD COLUMN "enableFastPath" BOOLEAN NOT NULL DEFAULT false;

internal-packages/database/prisma/schema.prisma

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,6 +1333,10 @@ model WorkerInstanceGroup {
13331333
13341334
workloadType WorkloadType @default(CONTAINER)
13351335
1336+
/// When true, runs enqueued to this worker queue may skip the intermediate queue
1337+
/// and be pushed directly to the worker queue when concurrency is available.
1338+
enableFastPath Boolean @default(false)
1339+
13361340
createdAt DateTime @default(now())
13371341
updatedAt DateTime @updatedAt
13381342
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ export class RunEngine {
462462
cliVersion,
463463
concurrencyKey,
464464
workerQueue,
465+
enableFastPath,
465466
queue,
466467
lockedQueueId,
467468
isTest,
@@ -799,6 +800,7 @@ export class RunEngine {
799800
tx: prisma,
800801
skipRunLock: true,
801802
includeTtl: true,
803+
enableFastPath,
802804
});
803805
} catch (enqueueError) {
804806
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export class EnqueueSystem {
3636
runnerId,
3737
skipRunLock,
3838
includeTtl = false,
39+
enableFastPath = false,
3940
}: {
4041
run: TaskRun;
4142
env: MinimalAuthenticatedEnvironment;
@@ -57,6 +58,8 @@ export class EnqueueSystem {
5758
skipRunLock?: boolean;
5859
/** When true, include TTL in the queued message (only for first enqueue from trigger). Default false. */
5960
includeTtl?: boolean;
61+
/** When true, allow the queue to push directly to worker queue if concurrency is available. */
62+
enableFastPath?: boolean;
6063
}) {
6164
const prisma = tx ?? this.$.prisma;
6265

@@ -98,6 +101,7 @@ export class EnqueueSystem {
98101
await this.$.runQueue.enqueueMessage({
99102
env,
100103
workerQueue,
104+
enableFastPath,
101105
message: {
102106
runId: run.id,
103107
taskIdentifier: run.taskIdentifier,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ export type TriggerParams = {
181181
cliVersion?: string;
182182
concurrencyKey?: string;
183183
workerQueue?: string;
184+
/** When true, the run queue may push directly to the worker queue if concurrency is available.
185+
* Gated per WorkerInstanceGroup (production) or always true (development). */
186+
enableFastPath?: boolean;
184187
queue: string;
185188
lockedQueueId?: string;
186189
isTest: boolean;

0 commit comments

Comments
 (0)