Skip to content

Commit 7c143dd

Browse files
committed
Ensure lazy waitpoints are always created and completed even if the existing run is already complete
1 parent d424754 commit 7c143dd

File tree

4 files changed

+336
-30
lines changed

4 files changed

+336
-30
lines changed

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ export class IdempotencyKeyConcern {
9494
});
9595
}
9696

97-
// If run already completed, return without blocking
98-
if (!associatedWaitpoint) {
99-
return { isCached: true, run: existingRun };
100-
}
101-
10297
await this.traceEventConcern.traceIdempotentRun(
10398
request,
10499
parentStore,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -540,11 +540,6 @@ export class RunEngine {
540540
});
541541
}
542542

543-
// If run already completed, return without blocking
544-
if (!waitpoint) {
545-
return debounceResult.run;
546-
}
547-
548543
// Call the onDebounced callback to create a span and get spanIdToComplete
549544
let spanIdToComplete: string | undefined;
550545
if (onDebounced) {
@@ -1460,8 +1455,8 @@ export class RunEngine {
14601455
* Gets an existing run waitpoint or creates one lazily.
14611456
* Used for debounce/idempotency when a late-arriving triggerAndWait caller
14621457
* needs to block on an existing run that was created without a waitpoint.
1463-
*
1464-
* Returns null if the run has already completed (caller should return result directly).
1458+
* When the run has already completed, creates the waitpoint and immediately
1459+
* completes it with the run's output/error so the parent can resume.
14651460
*/
14661461
async getOrCreateRunWaitpoint({
14671462
runId,
@@ -1471,7 +1466,7 @@ export class RunEngine {
14711466
runId: string;
14721467
projectId: string;
14731468
environmentId: string;
1474-
}): Promise<Waitpoint | null> {
1469+
}): Promise<Waitpoint> {
14751470
return this.waitpointSystem.getOrCreateRunWaitpoint({
14761471
runId,
14771472
projectId,

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
Prisma,
55
PrismaClientOrTransaction,
66
TaskQueue,
7+
TaskRun,
78
TaskRunExecutionSnapshot,
89
TaskRunExecutionStatus,
910
Waitpoint,
@@ -809,12 +810,37 @@ export class WaitpointSystem {
809810
};
810811
}
811812

813+
/**
814+
* Builds the waitpoint output payload from a completed run's stored output/error.
815+
*/
816+
#buildWaitpointOutputFromRun(
817+
run: Pick<TaskRun, "status" | "output" | "outputType" | "error">
818+
): { value: string; type?: string; isError: boolean } | undefined {
819+
if (run.status === "COMPLETED_SUCCESSFULLY") {
820+
if (run.output == null) {
821+
return undefined;
822+
}
823+
return {
824+
value: run.output,
825+
type: run.outputType ?? undefined,
826+
isError: false,
827+
};
828+
}
829+
if (isFinalRunStatus(run.status)) {
830+
return {
831+
value: JSON.stringify(run.error ?? {}),
832+
isError: true,
833+
};
834+
}
835+
return undefined;
836+
}
837+
812838
/**
813839
* Gets an existing run waitpoint or creates one lazily.
814840
* Used for debounce/idempotency when a late-arriving triggerAndWait caller
815841
* needs to block on an existing run that was created without a waitpoint.
816-
*
817-
* Returns null if the run has already completed (caller should return result directly).
842+
* When the run has already completed, creates the waitpoint and immediately
843+
* completes it with the run's output/error so the parent can resume.
818844
*/
819845
public async getOrCreateRunWaitpoint({
820846
runId,
@@ -824,7 +850,7 @@ export class WaitpointSystem {
824850
runId: string;
825851
projectId: string;
826852
environmentId: string;
827-
}): Promise<Waitpoint | null> {
853+
}): Promise<Waitpoint> {
828854
// Fast path: check if waitpoint already exists
829855
const run = await this.$.prisma.taskRun.findFirst({
830856
where: { id: runId },
@@ -839,15 +865,12 @@ export class WaitpointSystem {
839865
return run.associatedWaitpoint;
840866
}
841867

842-
// Run already completed - no waitpoint needed
843-
if (isFinalRunStatus(run.status)) {
844-
return null;
845-
}
846-
847-
// Need to create - use run lock to prevent races
868+
// Need to create - use run lock to prevent races (operational decisions use latest snapshot inside lock)
848869
return this.$.runLock.lock("getOrCreateRunWaitpoint", [runId], async () => {
870+
const prisma = this.$.prisma;
871+
849872
// Double-check after acquiring lock
850-
const runAfterLock = await this.$.prisma.taskRun.findFirst({
873+
const runAfterLock = await prisma.taskRun.findFirst({
851874
where: { id: runId },
852875
include: { associatedWaitpoint: true },
853876
});
@@ -860,19 +883,30 @@ export class WaitpointSystem {
860883
return runAfterLock.associatedWaitpoint;
861884
}
862885

863-
if (isFinalRunStatus(runAfterLock.status)) {
864-
return null;
865-
}
886+
// Operational decision: use latest execution snapshot, not TaskRun status
887+
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
866888

867889
// Create waitpoint and link to run atomically
868890
const waitpointData = this.buildRunAssociatedWaitpoint({ projectId, environmentId });
869891

870-
return this.$.prisma.waitpoint.create({
892+
const waitpoint = await prisma.waitpoint.create({
871893
data: {
872894
...waitpointData,
873895
completedByTaskRunId: runId,
874896
},
875897
});
898+
899+
// If run has already finished (per snapshot), complete the waitpoint immediately so the parent can resume
900+
if (snapshot.executionStatus === "FINISHED") {
901+
const output = this.#buildWaitpointOutputFromRun(runAfterLock);
902+
const completed = await this.completeWaitpoint({
903+
id: waitpoint.id,
904+
output,
905+
});
906+
return completed;
907+
}
908+
909+
return waitpoint;
876910
});
877911
}
878912
}

0 commit comments

Comments
 (0)