Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/fix-batch-waitpoint-lock-contention.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Reduce lock contention when processing large `batchTriggerAndWait` batches. Previously, each batch item acquired a Redis lock on the parent run just to insert a `TaskRunWaitpoint` row, which caused `LockAcquisitionTimeoutError`s under high concurrency (880 errors/24h in prod). Since `blockRunWithCreatedBatch` already transitions the parent to `EXECUTING_WITH_WAITPOINTS` before items are processed, the per-item lock is unnecessary. The new `blockRunWithWaitpointLockless` method performs the idempotent CTE insert (and schedules a timeout job when a timeout is supplied) without acquiring the lock.
37 changes: 26 additions & 11 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -728,17 +728,32 @@ export class RunEngine {

//triggerAndWait or batchTriggerAndWait
if (resumeParentOnCompletion && parentTaskRunId && taskRun.associatedWaitpoint) {
//this will block the parent run from continuing until this waitpoint is completed (and removed)
await this.waitpointSystem.blockRunWithWaitpoint({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
organizationId: environment.organization.id,
batch,
workerId,
runnerId,
tx: prisma,
});
if (batch) {
// Batch path: lockless insert. The parent is already EXECUTING_WITH_WAITPOINTS
// from blockRunWithCreatedBatch, so we only need to insert the TaskRunWaitpoint
// row without acquiring the parent run lock. This avoids lock contention when
// processing large batches with high concurrency.
await this.waitpointSystem.blockRunWithWaitpointLockless({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
batch,
tx: prisma,
});
} else {
// Single triggerAndWait: acquire the parent run lock to safely transition
// the snapshot and insert the waitpoint
await this.waitpointSystem.blockRunWithWaitpoint({
runId: parentTaskRunId,
waitpoints: taskRun.associatedWaitpoint.id,
projectId: taskRun.associatedWaitpoint.projectId,
organizationId: environment.organization.id,
batch,
workerId,
runnerId,
tx: prisma,
});
}
}

if (taskRun.delayUntil) {
Expand Down
79 changes: 79 additions & 0 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,85 @@ export class WaitpointSystem {
});
}

/**
 * Lockless version of blockRunWithWaitpoint for batch item processing.
 *
 * When processing batchTriggerAndWait items, blockRunWithCreatedBatch has already
 * transitioned the parent run to EXECUTING_WITH_WAITPOINTS before any items are
 * processed. Per-item calls to blockRunWithWaitpoint would all compete for the same
 * parent run lock just to insert a TaskRunWaitpoint row — causing lock contention
 * and LockAcquisitionTimeoutError with large batches.
 *
 * This method performs only the CTE insert (which is idempotent via ON CONFLICT DO
 * NOTHING) and timeout scheduling, without acquiring the parent run lock. It does
 * not create or change any run snapshot, so it relies on the caller having already
 * put the parent run into a blocked state.
 *
 * Options:
 * - `runId`: the run being blocked (the parent run for batch items).
 * - `waitpoints`: a single waitpoint id or a list of ids. An empty list is a no-op.
 * - `projectId`: written to the `projectId` column of each inserted row.
 * - `timeout`: when set, a `finishWaitpoint` job is enqueued per waitpoint with
 *   `availableAt` equal to this date.
 * - `spanIdToComplete`: stored on each inserted row (NULL when omitted).
 * - `batch`: batch id and optional index stored on each inserted row.
 * - `tx`: Prisma client or transaction to run the insert on; defaults to the
 *   system's own client.
 *
 * @returns Resolves once the rows are inserted and any timeout jobs are enqueued.
 */
async blockRunWithWaitpointLockless({
  runId,
  waitpoints,
  projectId,
  timeout,
  spanIdToComplete,
  batch,
  tx,
}: {
  runId: string;
  waitpoints: string | string[];
  projectId: string;
  timeout?: Date;
  spanIdToComplete?: string;
  batch: { id: string; index?: number };
  tx?: PrismaClientOrTransaction;
}): Promise<void> {
  const prisma = tx ?? this.$.prisma;
  const $waitpoints = typeof waitpoints === "string" ? [waitpoints] : waitpoints;

  // Nothing to block on. Prisma.join() throws a TypeError when given an empty
  // array, so return before building the query rather than failing on an
  // input that simply requires no work.
  if ($waitpoints.length === 0) {
    return;
  }

  // Insert the blocking connections and the historical run connections.
  // No lock needed: ON CONFLICT DO NOTHING makes concurrent inserts safe,
  // and the parent snapshot is already EXECUTING_WITH_WAITPOINTS from
  // blockRunWithCreatedBatch.
  // Selecting from "Waitpoint" means ids with no matching row insert nothing.
  await prisma.$queryRaw`
    WITH inserted AS (
      INSERT INTO "TaskRunWaitpoint" ("id", "taskRunId", "waitpointId", "projectId", "createdAt", "updatedAt", "spanIdToComplete", "batchId", "batchIndex")
      SELECT
        gen_random_uuid(),
        ${runId},
        w.id,
        ${projectId},
        NOW(),
        NOW(),
        ${spanIdToComplete ?? null},
        ${batch.id},
        ${batch.index ?? null}
      FROM "Waitpoint" w
      WHERE w.id IN (${Prisma.join($waitpoints)})
      ON CONFLICT DO NOTHING
      RETURNING "waitpointId"
    ),
    connected_runs AS (
      INSERT INTO "_WaitpointRunConnections" ("A", "B")
      SELECT ${runId}, w.id
      FROM "Waitpoint" w
      WHERE w.id IN (${Prisma.join($waitpoints)})
      ON CONFLICT DO NOTHING
    )
    SELECT COUNT(*) FROM inserted`;

  // Schedule timeout jobs if needed. The job id is derived from the waitpoint
  // id alone, so every call for the same waitpoint uses the same job id.
  if (timeout) {
    for (const waitpoint of $waitpoints) {
      await this.$.worker.enqueue({
        id: `finishWaitpoint.${waitpoint}`,
        job: "finishWaitpoint",
        payload: {
          waitpointId: waitpoint,
          error: JSON.stringify(timeoutError(timeout)),
        },
        availableAt: timeout,
      });
    }
  }
}

/**
* Blocks a run with a waitpoint and immediately completes the waitpoint.
*
Expand Down
Loading