Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/run-engine-single-ttl-path.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Route TTL expiration through the batch TTL path only. Removes the redundant per-run `expireRun` worker job, leaving the batch consumer as the single mechanism that flips runs to `EXPIRED` when their TTL elapses while still queued.
6 changes: 1 addition & 5 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,6 @@ export class RunEngine {
}
} else {
try {
if (taskRun.ttl) {
await this.ttlSystem.scheduleExpireRun({ runId: taskRun.id, ttl: taskRun.ttl });
}

await this.enqueueSystem.enqueueRun({
run: taskRun,
env: environment,
Expand All @@ -812,7 +808,7 @@ export class RunEngine {
enableFastPath,
});
} catch (enqueueError) {
this.logger.error("engine.trigger(): failed to schedule TTL or enqueue run", {
this.logger.error("engine.trigger(): failed to enqueue run", {
runId: taskRun.id,
friendlyId: taskRun.friendlyId,
taskIdentifier: taskRun.taskIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { startSpan } from "@internal/tracing";
import { SystemResources } from "./systems.js";
import { PrismaClientOrTransaction, TaskRun } from "@trigger.dev/database";
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic";
import { EnqueueSystem } from "./enqueueSystem.js";
import { ServiceValidationError } from "../errors.js";

Expand Down Expand Up @@ -145,12 +144,17 @@ export class DelayedRunSystem {
}

// Now we need to enqueue the run into the RunQueue
// Skip the lock in enqueueRun since we already hold it
// Skip the lock in enqueueRun since we already hold it.
// includeTtl: true so the run's TTL is armed from the moment it enters
// the queue (not from taskRun.createdAt). The TTL system tracks runs
// that are queued and have never started — delayed runs are first
// enqueued here, so this is the correct point to arm TTL.
await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
batchId: run.batchId ?? undefined,
skipRunLock: true,
includeTtl: true,
});

const queuedAt = new Date();
Expand Down Expand Up @@ -183,18 +187,6 @@ export class DelayedRunSystem {
},
});

if (run.ttl) {
const expireAt = parseNaturalLanguageDuration(run.ttl);

if (expireAt) {
await this.$.worker.enqueue({
id: `expireRun:${runId}`,
job: "expireRun",
payload: { runId },
availableAt: expireAt,
});
}
}
});
}

Expand Down
45 changes: 37 additions & 8 deletions internal-packages/run-engine/src/engine/tests/delays.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ describe("RunEngine delays", () => {
},
queue: {
redis: redisOptions,
ttlSystem: {
pollIntervalMs: 100,
batchSize: 10,
batchMaxWaitMs: 100,
},
},
runLock: {
redis: redisOptions,
Expand Down Expand Up @@ -230,7 +235,21 @@ describe("RunEngine delays", () => {
taskIdentifier
);

// TTL only expires runs still queued waiting on a concurrency slot.
// Once the delay elapses, the run gets enqueued; saturate env concurrency
// so it stays queued so the new TTL path can expire it.
await engine.runQueue.updateEnvConcurrencyLimits({
...authenticatedEnvironment,
maximumConcurrencyLimit: 0,
});

const enqueuedAfterDelayTimes: number[] = [];
engine.eventBus.on("runEnqueuedAfterDelay", () => {
enqueuedAfterDelayTimes.push(Date.now());
});

//trigger the run
const triggerTime = Date.now();
const run = await engine.trigger(
{
number: 1,
Expand All @@ -247,7 +266,7 @@ describe("RunEngine delays", () => {
queue: "task/test-task",
isTest: false,
tags: [],
delayUntil: new Date(Date.now() + 1000),
delayUntil: new Date(triggerTime + 1000),
ttl: "2s",
},
prisma
Expand All @@ -259,7 +278,7 @@ describe("RunEngine delays", () => {
expect(executionData.snapshot.executionStatus).toBe("DELAYED");
expect(run.status).toBe("DELAYED");

//wait for 1 seconds
//wait so the delay elapses and the run is enqueued
await setTimeout(2_500);

//should now be queued
Expand All @@ -273,19 +292,29 @@ describe("RunEngine delays", () => {

expect(run2.status).toBe("PENDING");

//wait for 3 seconds
await setTimeout(3_000);
// TTL is armed at queue-enter time (not from triggerTime). With a 2s TTL
// and a 1s delay, the run becomes eligible to expire ~3s after trigger.
// Confirm the TTL was not armed against triggerTime (i.e. didn't already
// fire while still DELAYED), and that the run only expires after the
// queue-enter timestamp + ttl has elapsed.
expect(enqueuedAfterDelayTimes.length).toBe(1);
const enqueuedAt = enqueuedAfterDelayTimes[0]!;
expect(enqueuedAt - triggerTime).toBeGreaterThanOrEqual(1000);

//should now be expired
const executionData3 = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData3);
expect(executionData3.snapshot.executionStatus).toBe("FINISHED");
//wait so the TTL fires (counted from when the run was enqueued)
await setTimeout(3_000);

// Status comes from the DB; the batch TTL path does not create
// execution snapshots, so getRunExecutionData may still show QUEUED.
const run3 = await prisma.taskRun.findFirstOrThrow({
where: { id: run.id },
});

expect(run3.status).toBe("EXPIRED");
assertNonNullable(run3.expiredAt);
// The expiry must happen after enqueue + ttl, not after trigger + ttl.
// Allow a small tolerance for poll interval + batch wait.
expect(run3.expiredAt.getTime()).toBeGreaterThanOrEqual(enqueuedAt + 2_000);
} finally {
await engine.quit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ describe("RunEngine lazy waitpoint creation", () => {
ttlSystem: {
pollIntervalMs: 100,
batchSize: 10,
batchMaxWaitMs: 100,
},
},
runLock: {
Expand All @@ -434,6 +435,12 @@ describe("RunEngine lazy waitpoint creation", () => {

await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

// TTL only expires runs still queued waiting on a concurrency slot.
await engine.runQueue.updateEnvConcurrencyLimits({
...authenticatedEnvironment,
maximumConcurrencyLimit: 0,
});

// Trigger a standalone run with TTL (no waitpoint)
const run = await engine.trigger(
{
Expand Down Expand Up @@ -467,11 +474,15 @@ describe("RunEngine lazy waitpoint creation", () => {
// Wait for TTL to expire
await setTimeout(1_500);

// Verify run expired successfully (no throw)
const executionData = await engine.getRunExecutionData({ runId: run.id });
assertNonNullable(executionData);
expect(executionData.run.status).toBe("EXPIRED");
expect(executionData.snapshot.executionStatus).toBe("FINISHED");
// Verify run expired successfully (no throw).
// The batch TTL path does not create execution snapshots, so check
// the status directly from the database rather than via
// getRunExecutionData.
const expiredRun = await prisma.taskRun.findUnique({
where: { id: run.id },
select: { status: true },
});
expect(expiredRun?.status).toBe("EXPIRED");
} finally {
await engine.quit();
}
Expand Down
Loading
Loading