Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-blocking-waitpoint-race-condition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix a race condition in the waitpoint system where a run could be blocked by a completed waitpoint but never be resumed because of an PostgreSQL MVCC issue. This was most likely to occur when creating a waitpoint via `wait.forToken()` at the exact same moment as completing the token with `wait.completeToken()`. Other types of waitpoints (timed, child runs) were not affected.
39 changes: 33 additions & 6 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,22 @@ export class WaitpointSystem {

/**
* Prevents a run from continuing until the waitpoint is completed.
*
* This method uses two separate SQL statements intentionally:
*
* 1. A CTE that INSERTs TaskRunWaitpoint rows (blocking connections) and
* _WaitpointRunConnections rows (historical connections).
*
* 2. A separate SELECT that checks if any of the requested waitpoints are still PENDING.
*
* These MUST be separate statements because of PostgreSQL MVCC in READ COMMITTED isolation:
* each statement gets its own snapshot. If a concurrent `completeWaitpoint` commits between
* the CTE starting and finishing, the CTE's snapshot won't see the COMPLETED status. By using
* a separate SELECT, we get a fresh snapshot that reflects the latest committed state.
*
* The pending check queries ALL requested waitpoint IDs (not just the ones actually inserted
* by the CTE). This is intentional: if a TaskRunWaitpoint row already existed (ON CONFLICT
* DO NOTHING skipped the insert), a still-PENDING waitpoint should still count as blocking.
*/
async blockRunWithWaitpoint({
runId,
Expand Down Expand Up @@ -399,8 +415,10 @@ export class WaitpointSystem {
return await this.$.runLock.lock("blockRunWithWaitpoint", [runId], async () => {
let snapshot: TaskRunExecutionSnapshot = await getLatestExecutionSnapshot(prisma, runId);

//block the run with the waitpoints, returning how many waitpoints are pending
const insert = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
// Insert the blocking connections and the historical run connections.
// We use a CTE to do both inserts atomically. Data-modifying CTEs are
// always executed regardless of whether they're referenced in the outer query.
await prisma.$queryRaw`
WITH inserted AS (
INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
SELECT
Expand All @@ -425,12 +443,21 @@ export class WaitpointSystem {
WHERE w.id IN (${Prisma.join($waitpoints)})
ON CONFLICT DO NOTHING
)
SELECT COUNT(*) FROM inserted`;

// Check if the run is actually blocked using a separate query.
// This MUST be a separate statement from the CTE above because in READ COMMITTED
// isolation, each statement gets its own snapshot. The CTE's snapshot is taken when
// it starts, so if a concurrent completeWaitpoint commits during the CTE, the CTE
// won't see it. This fresh query gets a new snapshot that reflects the latest commits.
const pendingCheck = await prisma.$queryRaw<{ pending_count: BigInt }[]>`
SELECT COUNT(*) as pending_count
FROM inserted i
JOIN "Waitpoint" w ON w.id = i."waitpointId"
WHERE w.status = 'PENDING';`;
FROM "Waitpoint"
WHERE id IN (${Prisma.join($waitpoints)})
AND status = 'PENDING'
`;

const isRunBlocked = Number(insert.at(0)?.pending_count ?? 0) > 0;
const isRunBlocked = Number(pendingCheck.at(0)?.pending_count ?? 0) > 0;

let newStatus: TaskRunExecutionStatus = "SUSPENDED";
if (
Expand Down
Loading