Skip to content

Commit 31bfb20

Browse files
Eliminate gap-lock deadlocks in run-scoped projector cleanup
Replace the destructive `DELETE WHERE workflow_run_id = ? AND id NOT IN (...)` shape across the four run-scoped projectors with a snapshot-read followed by a point-lock DELETE keyed on the clustered index. The legacy shape drives an InnoDB scan through the workflow_run_id secondary index and takes next-key (record + gap) locks across the whole run, so two concurrent projector runs for the same workflow can form a lock cycle (#399 / #425). The new helper sorts the stale id list before delete to give two siblings a consistent acquisition order, and makes up to five attempts (i.e. at most four retries) with capped exponential backoff on deadlock / lock-wait-timeout to cover the residual case where a sibling held a lock the snapshot did not anticipate. Refs #399 #425 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4bb484b commit 31bfb20

6 files changed

Lines changed: 233 additions & 36 deletions

File tree

src/V2/Support/RunLineageProjector.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,7 @@ public static function project(
3838
$projected[] = self::projectEntry($lineageModel, $run, $entry, 'child', $position, $seen);
3939
}
4040

41-
$staleQuery = $lineageModel::query()
42-
->where('workflow_run_id', $run->id);
43-
44-
if ($seen === []) {
45-
$staleQuery->delete();
46-
} else {
47-
$staleQuery->whereNotIn('id', $seen)
48-
->delete();
49-
}
41+
StaleProjectionCleanup::forRun($lineageModel, $run->id, $seen);
5042

5143
$run->unsetRelation('lineageEntries');
5244

src/V2/Support/RunTimelineProjector.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,7 @@ public static function project(WorkflowRun $run, ?array $entries = null): array
6363
$projected[] = $row;
6464
}
6565

66-
$staleQuery = $entryModel::query()
67-
->where('workflow_run_id', $run->id);
68-
69-
if ($seen === []) {
70-
$staleQuery->delete();
71-
} else {
72-
$staleQuery->whereNotIn('id', $seen)
73-
->delete();
74-
}
66+
StaleProjectionCleanup::forRun($entryModel, $run->id, $seen);
7567

7668
$run->unsetRelation('timelineEntries');
7769

src/V2/Support/RunTimerProjector.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,7 @@ public static function project(WorkflowRun $run, ?array $timers = null): array
6666
$projected[] = $row;
6767
}
6868

69-
$staleQuery = $entryModel::query()
70-
->where('workflow_run_id', $run->id);
71-
72-
if ($seen === []) {
73-
$staleQuery->delete();
74-
} else {
75-
$staleQuery->whereNotIn('id', $seen)
76-
->delete();
77-
}
69+
StaleProjectionCleanup::forRun($entryModel, $run->id, $seen);
7870

7971
$run->unsetRelation('timerEntries');
8072

src/V2/Support/RunWaitProjector.php

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,7 @@ public static function project(WorkflowRun $run, ?array $waits = null): array
6969
$projected[] = $row;
7070
}
7171

72-
$staleQuery = $waitModel::query()
73-
->where('workflow_run_id', $run->id);
74-
75-
if ($seen === []) {
76-
$staleQuery->delete();
77-
} else {
78-
$staleQuery->whereNotIn('id', $seen)
79-
->delete();
80-
}
72+
StaleProjectionCleanup::forRun($waitModel, $run->id, $seen);
8173

8274
$run->unsetRelation('waits');
8375

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Illuminate\Database\Eloquent\Model;
8+
use Illuminate\Database\QueryException;
9+
use Throwable;
10+
11+
/**
 * Removes stale projection rows for a workflow run without taking secondary-index
 * gap locks.
 *
 * The legacy `DELETE FROM tbl WHERE workflow_run_id = ? AND id NOT IN (...)` shape
 * drives the scan through the `workflow_run_id` secondary index. Under InnoDB
 * REPEATABLE-READ that takes next-key (record + gap) locks over the scanned
 * range, so two concurrent projector runs for the same run can form a lock cycle
 * (#399 / #425). This helper resolves the reconcile step via a read of the
 * primary keys followed by a DELETE keyed on `id`, so the delete targets
 * individual rows rather than an index range.
 *
 * Stale ids are sorted (plain byte-wise string order) before the DELETE so two
 * concurrent projectors that compute overlapping (but non-identical) stale sets
 * touch rows in the same order. A short retry loop covers the residual case
 * where a sibling projector held a lock the snapshot did not anticipate.
 *
 * The retry loop is only used when the connection has no open transaction. A
 * deadlock rolls back the surrounding transaction on the database side, so
 * retrying just this step would silently run it (and everything the caller
 * does afterwards) outside the transaction the caller believes it is in. In
 * that situation the exception is rethrown so the owner of the transaction can
 * retry the whole unit of work.
 */
final class StaleProjectionCleanup
{
    /** Total number of attempts (first try included) before giving up. */
    private const MAX_ATTEMPTS = 5;

    /**
     * Upper bound on ids bound into a single DELETE ... WHERE id IN (...), kept
     * well below common driver placeholder limits.
     */
    private const DELETE_CHUNK_SIZE = 500;

    /**
     * Deletes every row of $model belonging to $runId whose id is not in $seen.
     *
     * An empty $seen removes all rows for the run. Deadlock / lock-wait-timeout
     * errors are retried with backoff up to MAX_ATTEMPTS attempts, unless the
     * connection is inside a transaction (see class comment).
     *
     * @param class-string<Model> $model
     * @param list<string> $seen ids that must be kept
     *
     * @throws QueryException when the failure is not a concurrency error, when
     *                        attempts are exhausted, or when a concurrency error
     *                        occurs inside an open transaction
     */
    public static function forRun(string $model, string $runId, array $seen): void
    {
        for ($attempt = 1; $attempt <= self::MAX_ATTEMPTS; $attempt++) {
            try {
                self::pruneOnce($model, $runId, $seen);

                return;
            } catch (QueryException $e) {
                $retryable = $attempt < self::MAX_ATTEMPTS
                    && self::isConcurrencyError($e)
                    && ! self::insideTransaction($model);

                if (! $retryable) {
                    throw $e;
                }

                usleep(self::backoffMicroseconds($attempt));
            }
        }
    }

    /**
     * Performs one read-then-delete pass.
     *
     * @param class-string<Model> $model
     * @param list<string> $seen
     */
    private static function pruneOnce(string $model, string $runId, array $seen): void
    {
        /** @var list<string> $existingIds */
        $existingIds = $model::query()
            ->where('workflow_run_id', $runId)
            ->pluck('id')
            ->all();

        if ($existingIds === []) {
            return;
        }

        $staleIds = $seen === []
            ? $existingIds
            : array_values(array_diff($existingIds, $seen));

        if ($staleIds === []) {
            return;
        }

        // SORT_STRING gives a strict byte-wise order. The default flag compares
        // numeric-looking strings as numbers (e.g. '1e1' equals '10'), which
        // would make the ordering depend on the shape of the ids.
        sort($staleIds, SORT_STRING);

        // Chunks are issued in ascending id order, so the overall acquisition
        // order stays consistent; sets up to DELETE_CHUNK_SIZE are one statement.
        foreach (array_chunk($staleIds, self::DELETE_CHUNK_SIZE) as $chunk) {
            $model::query()
                ->whereIn('id', $chunk)
                ->delete();
        }
    }

    /**
     * Reports whether the model's connection currently has an open transaction.
     *
     * @param class-string<Model> $model
     */
    private static function insideTransaction(string $model): bool
    {
        return (new $model())->getConnection()->transactionLevel() > 0;
    }

    /**
     * Classifies an exception as a deadlock / lock-wait-timeout style failure by
     * SQLSTATE, driver error code, or (as a fallback) message text.
     */
    private static function isConcurrencyError(Throwable $e): bool
    {
        $sqlState = (string) ($e->errorInfo[0] ?? '');
        $driverCode = (int) ($e->errorInfo[1] ?? 0);

        if ($sqlState === '40001' || $sqlState === '40P01') {
            return true;
        }

        // MySQL: 1213 deadlock, 1205 lock wait timeout
        if ($driverCode === 1213 || $driverCode === 1205) {
            return true;
        }

        $message = strtolower($e->getMessage());

        return str_contains($message, 'deadlock')
            || str_contains($message, 'lock wait timeout')
            || str_contains($message, 'try restarting transaction');
    }

    /**
     * Delay before the next attempt, in microseconds.
     *
     * @param int $attempt 1-based number of the attempt that just failed
     */
    private static function backoffMicroseconds(int $attempt): int
    {
        // 5ms, 10ms, 20ms, 40ms (capped) with up to 2ms of jitter to break ties.
        $base = min(40_000, 5_000 * (1 << ($attempt - 1)));

        return $base + random_int(0, 2_000);
    }
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Unit\V2;
6+
7+
use Illuminate\Support\Str;
8+
use Tests\TestCase;
9+
use Workflow\V2\Models\WorkflowInstance;
10+
use Workflow\V2\Models\WorkflowRun;
11+
use Workflow\V2\Models\WorkflowTimelineEntry;
12+
use Workflow\V2\Support\StaleProjectionCleanup;
13+
14+
/**
 * Exercises StaleProjectionCleanup::forRun() against WorkflowTimelineEntry rows:
 * partial pruning, full pruning, the no-rows case, and isolation between runs.
 */
final class StaleProjectionCleanupTest extends TestCase
{
    /** Table the timeline-entry presence/absence assertions are made against. */
    private const TIMELINE_TABLE = 'workflow_run_timeline_entries';

    public function testRemovesOnlyPrunedRowsAndLeavesSeenRowsIntact(): void
    {
        $run = $this->seedRun();

        $firstKept = $this->seedTimelineEntry($run, 'keep-a');
        $secondKept = $this->seedTimelineEntry($run, 'keep-b');
        $pruned = $this->seedTimelineEntry($run, 'stale');

        $seenIds = [$firstKept->id, $secondKept->id];

        StaleProjectionCleanup::forRun(WorkflowTimelineEntry::class, $run->id, $seenIds);

        // Both ids passed as "seen" must survive, in the order they were seeded.
        foreach ($seenIds as $keptId) {
            $this->assertDatabaseHas(self::TIMELINE_TABLE, [
                'id' => $keptId,
            ]);
        }

        $this->assertDatabaseMissing(self::TIMELINE_TABLE, [
            'id' => $pruned->id,
        ]);
    }

    public function testRemovesAllRowsWhenSeenIsEmpty(): void
    {
        $run = $this->seedRun();

        foreach (['doomed-a', 'doomed-b'] as $historyEventId) {
            $this->seedTimelineEntry($run, $historyEventId);
        }

        StaleProjectionCleanup::forRun(WorkflowTimelineEntry::class, $run->id, []);

        $this->assertSame(0, $this->countEntriesFor($run));
    }

    public function testNoopWhenNoRowsExistForRun(): void
    {
        $run = $this->seedRun();

        StaleProjectionCleanup::forRun(WorkflowTimelineEntry::class, $run->id, ['unused-id']);

        $this->assertSame(0, $this->countEntriesFor($run));
    }

    public function testDoesNotAffectRowsForOtherRuns(): void
    {
        $targetRun = $this->seedRun();
        $otherRun = $this->seedRun();

        $targetEntry = $this->seedTimelineEntry($targetRun, 'run-a-entry');
        $otherEntry = $this->seedTimelineEntry($otherRun, 'run-b-entry');

        StaleProjectionCleanup::forRun(WorkflowTimelineEntry::class, $targetRun->id, []);

        $this->assertDatabaseMissing(self::TIMELINE_TABLE, [
            'id' => $targetEntry->id,
        ]);
        $this->assertDatabaseHas(self::TIMELINE_TABLE, [
            'id' => $otherEntry->id,
        ]);
    }

    /**
     * Number of timeline entries currently stored for the given run.
     */
    private function countEntriesFor(WorkflowRun $run): int
    {
        return WorkflowTimelineEntry::query()
            ->where('workflow_run_id', $run->id)
            ->count();
    }

    /**
     * Creates a workflow instance and a first, pending run attached to it.
     */
    private function seedRun(): WorkflowRun
    {
        $instanceAttributes = [
            'id' => (string) Str::ulid(),
            'workflow_class' => 'App\\Fake\\Workflow',
            'workflow_type' => 'App\\Fake\\Workflow',
            'business_key' => null,
            'namespace' => 'default',
        ];

        $instance = WorkflowInstance::query()->create($instanceAttributes);

        $runAttributes = [
            'id' => (string) Str::ulid(),
            'workflow_instance_id' => $instance->id,
            'run_number' => 1,
            'workflow_class' => $instance->workflow_class,
            'workflow_type' => $instance->workflow_type,
            'status' => 'pending',
            'connection' => null,
            'queue' => null,
        ];

        return WorkflowRun::query()->create($runAttributes);
    }

    /**
     * Creates a timeline entry for the run; the id is the SHA-256 of
     * "<run id>|<history event id>".
     */
    private function seedTimelineEntry(WorkflowRun $run, string $historyEventId): WorkflowTimelineEntry
    {
        $entryId = hash('sha256', $run->id . '|' . $historyEventId);

        $attributes = [
            'id' => $entryId,
            'workflow_run_id' => $run->id,
            'workflow_instance_id' => $run->workflow_instance_id,
            'history_event_id' => $historyEventId,
            'sequence' => 0,
            'type' => 'TestEvent',
            'kind' => 'workflow',
            'entry_kind' => 'point',
            'source_kind' => null,
            'source_id' => null,
            'summary' => null,
            'recorded_at' => now(),
            'payload' => [],
        ];

        return WorkflowTimelineEntry::query()->create($attributes);
    }
}

0 commit comments

Comments
 (0)