Skip to content

Commit fc20dc2

Browse files
committed
fix(run-engine): move snapshot side effects and ttl ack out of the start-attempt transaction
1 parent 92277a5 commit fc20dc2

1 file changed

Lines changed: 33 additions & 23 deletions

File tree

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -459,29 +459,27 @@ export class RunAttemptSystem {
459459
},
460460
});
461461

462-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, {
463-
run,
464-
snapshot: {
465-
executionStatus: "EXECUTING",
466-
description: `Attempt created, starting execution${
467-
isWarmStart ? " (warm start)" : ""
468-
}`,
469-
},
470-
previousSnapshotId: latestSnapshot.id,
471-
environmentId: latestSnapshot.environmentId,
472-
environmentType: latestSnapshot.environmentType,
473-
projectId: latestSnapshot.projectId,
474-
organizationId: latestSnapshot.organizationId,
475-
batchId: latestSnapshot.batchId ?? undefined,
476-
completedWaitpoints: latestSnapshot.completedWaitpoints,
477-
workerId,
478-
runnerId,
479-
});
480-
481-
if (taskRun.ttl) {
482-
//don't expire the run, it's going to execute
483-
await this.$.worker.ack(`expireRun:${taskRun.id}`);
484-
}
462+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshotMutation(
463+
tx,
464+
{
465+
run,
466+
snapshot: {
467+
executionStatus: "EXECUTING",
468+
description: `Attempt created, starting execution${
469+
isWarmStart ? " (warm start)" : ""
470+
}`,
471+
},
472+
previousSnapshotId: latestSnapshot.id,
473+
environmentId: latestSnapshot.environmentId,
474+
environmentType: latestSnapshot.environmentType,
475+
projectId: latestSnapshot.projectId,
476+
organizationId: latestSnapshot.organizationId,
477+
batchId: latestSnapshot.batchId ?? undefined,
478+
completedWaitpoints: latestSnapshot.completedWaitpoints,
479+
workerId,
480+
runnerId,
481+
}
482+
);
485483

486484
return { updatedRun: run, snapshot: newSnapshot };
487485
},
@@ -510,6 +508,18 @@ export class RunAttemptSystem {
510508

511509
const { updatedRun, snapshot } = result;
512510

511+
if (taskRun.ttl) {
512+
//don't expire the run, it's going to execute
513+
await this.$.worker.ack(`expireRun:${taskRun.id}`);
514+
}
515+
516+
// Side effects must only run against a durably committed snapshot row.
517+
await this.executionSnapshotSystem.scheduleSnapshotSideEffects({
518+
snapshot,
519+
runId: taskRun.id,
520+
completedWaitpoints: latestSnapshot.completedWaitpoints,
521+
});
522+
513523
this.$.eventBus.emit("runAttemptStarted", {
514524
time: new Date(),
515525
run: {

0 commit comments

Comments
 (0)