Skip to content

Commit 6f11ce0

Browse files
Make projector upserts idempotent under concurrent dispatch (#438, #436)
#438: parallel fan-out (e.g. 100 children completing in one all([...]) group) strands runs because RunTimelineProjector.updateOrCreate races on the projection PK. firstOrNew + save splits SELECT and INSERT, so two workers both observe no row and both INSERT, and the loser hits SQLSTATE 23000 ER_DUP_ENTRY for the timeline row's primary key. The dispatch path propagates the exception, the workflow task fails, and the parent stays in waiting indefinitely while the queue length keeps growing. Wrap the per-row updateOrCreate in IdempotentProjectionUpsert, which retries on unique-key violations with bounded jittered backoff. After the racing writer wins, our retry's firstOrNew finds the row and falls through to UPDATE — no analogous race exists on UPDATE because it matches an existing PK rather than asserting absence. Apply the helper to the four sibling projectors that share the same updateOrCreate shape (timeline / wait / timer / lineage / summary). Sibling DELETE-side races for these projectors were already addressed by StaleProjectionCleanup (#425). #436: a bare PHP Error (e.g. "Non-static method ... cannot be called statically") could not be restored for replay because is_subclass_of returns false when the class itself IS Error — Error and Exception implement Throwable independently, so the base-class fallback picked Exception::class and reflection raised "Cannot access protected property Error::$message" against the rehydrated instance. Switch to is_a(..., Error::class, true) so Error itself, not just its subclasses, picks the Error reflection target. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e8a7300 commit 6f11ce0

9 files changed

Lines changed: 370 additions & 6 deletions

src/V2/Support/FailureFactory.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,12 @@ private static function restoreThrowable(string $class, array $payload): Throwab
602602

603603
/** @var Throwable $throwable */
604604
$throwable = $reflection->newInstanceWithoutConstructor();
605-
$baseClass = is_subclass_of($class, Error::class) ? Error::class : Exception::class;
605+
// Throwable's protected message/code/file/line/trace properties are declared
606+
// independently on Error and Exception (siblings, not parent/child). Use is_a
607+
// with the allow_string flag so Error itself — not just its subclasses — picks
608+
// the Error::class reflection target. Falling back to Exception here would
609+
// raise "Cannot access protected property Error::$message" (#436).
610+
$baseClass = is_a($class, Error::class, true) ? Error::class : Exception::class;
606611

607612
self::setThrowableProperty($throwable, $baseClass, 'message', $payload['message']);
608613
self::setThrowableProperty($throwable, $baseClass, 'code', $payload['code']);
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Illuminate\Database\Eloquent\Model;
8+
use Illuminate\Database\QueryException;
9+
use LogicException;
10+
use Throwable;
11+
12+
/**
13+
* Wraps a per-row updateOrCreate with retry on unique-key violations so the
14+
* timeline / wait / timer / lineage / summary projectors stay correct when
15+
* two workers race the same projection row.
16+
*
17+
* The race shape (#438): updateOrCreate is firstOrNew + save. Two concurrent
18+
* callers both observe no row, both INSERT, and the loser hits SQLSTATE 23000
19+
* (MySQL/SQLite) or 23505 (Postgres) on the projection's primary or unique
20+
* key. On retry, firstOrNew sees the row the winning side just wrote and the
21+
* second call falls through to UPDATE, which has no analogous race because
22+
* UPDATE does not collide on existing primary keys.
23+
*
24+
* Sibling DELETE-side races for these projectors are handled by
25+
* {@see StaleProjectionCleanup} (#425).
26+
*/
27+
final class IdempotentProjectionUpsert
28+
{
29+
private const MAX_ATTEMPTS = 5;
30+
31+
/**
32+
* @template TModel of Model
33+
* @param class-string<TModel> $model
34+
* @param array<string, mixed> $key
35+
* @param array<string, mixed> $values
36+
* @return TModel
37+
*/
38+
public static function upsert(string $model, array $key, array $values): Model
39+
{
40+
for ($attempt = 1; $attempt <= self::MAX_ATTEMPTS; $attempt++) {
41+
try {
42+
/** @var TModel $row */
43+
$row = $model::query()->updateOrCreate($key, $values);
44+
45+
return $row;
46+
} catch (QueryException $e) {
47+
if (! self::isUniqueViolation($e) || $attempt === self::MAX_ATTEMPTS) {
48+
throw $e;
49+
}
50+
51+
usleep(self::backoffMicroseconds($attempt));
52+
}
53+
}
54+
55+
throw new LogicException('IdempotentProjectionUpsert exhausted attempts without resolving.');
56+
}
57+
58+
private static function isUniqueViolation(Throwable $e): bool
59+
{
60+
$errorInfo = $e instanceof QueryException ? ($e->errorInfo ?? null) : null;
61+
$sqlState = is_array($errorInfo) ? (string) ($errorInfo[0] ?? '') : '';
62+
$driverCode = is_array($errorInfo) ? (int) ($errorInfo[1] ?? 0) : 0;
63+
64+
// 23505 is the Postgres-specific unique_violation SQLSTATE; it never
65+
// overlaps with other constraint failures so we can short-circuit.
66+
if ($sqlState === '23505') {
67+
return true;
68+
}
69+
70+
// 23000 is the generic integrity-constraint family (MySQL, SQLite,
71+
// SQL Server). Narrow to driver codes that mean "duplicate key" so we
72+
// don't retry e.g. foreign-key or NOT NULL violations that won't clear
73+
// on the next pass.
74+
if ($sqlState === '23000') {
75+
// MySQL 1062 = ER_DUP_ENTRY
76+
// SQLite 19 = SQLITE_CONSTRAINT (covers UNIQUE; message-disambiguated below)
77+
// SQL Server 2627 = unique-constraint violation; 2601 = duplicate index key
78+
if ($driverCode === 1062 || $driverCode === 2627 || $driverCode === 2601) {
79+
return true;
80+
}
81+
82+
$message = strtolower($e->getMessage());
83+
84+
return str_contains($message, 'duplicate entry')
85+
|| str_contains($message, 'unique constraint failed')
86+
|| str_contains($message, 'duplicate key');
87+
}
88+
89+
return false;
90+
}
91+
92+
private static function backoffMicroseconds(int $attempt): int
93+
{
94+
// 2ms, 4ms, 8ms, 16ms (capped) with small jitter to break ties between
95+
// racing retriers.
96+
$base = min(16_000, 2_000 * (1 << ($attempt - 1)));
97+
98+
return $base + random_int(0, 1_000);
99+
}
100+
}

src/V2/Support/RunLineageProjector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ private static function projectEntry(
131131
$runId = self::stringValue($entry['workflow_run_id'] ?? null);
132132

133133
/** @var WorkflowRunLineageEntry $row */
134-
$row = $lineageModel::query()->updateOrCreate(
134+
$row = IdempotentProjectionUpsert::upsert(
135+
$lineageModel,
135136
[
136137
'id' => $projectionId,
137138
],

src/V2/Support/RunSummaryProjector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ public static function project(WorkflowRun $run): WorkflowRunSummary
302302
$selectedNextTask = $openUpdateWait !== null ? $openUpdateTask : $nextTask;
303303

304304
/** @var WorkflowRunSummary $summary */
305-
$summary = $summaryModel::query()->updateOrCreate(
305+
$summary = IdempotentProjectionUpsert::upsert(
306+
$summaryModel,
306307
[
307308
'id' => $run->id,
308309
],

src/V2/Support/RunTimelineProjector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public static function project(WorkflowRun $run, ?array $entries = null): array
3434
$seen[] = $projectionId;
3535

3636
/** @var WorkflowTimelineEntry $row */
37-
$row = $entryModel::query()->updateOrCreate(
37+
$row = IdempotentProjectionUpsert::upsert(
38+
$entryModel,
3839
[
3940
'id' => $projectionId,
4041
],

src/V2/Support/RunTimerProjector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public static function project(WorkflowRun $run, ?array $timers = null): array
3434
$seen[] = $projectionId;
3535

3636
/** @var WorkflowRunTimerEntry $row */
37-
$row = $entryModel::query()->updateOrCreate(
37+
$row = IdempotentProjectionUpsert::upsert(
38+
$entryModel,
3839
[
3940
'id' => $projectionId,
4041
],

src/V2/Support/RunWaitProjector.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ public static function project(WorkflowRun $run, ?array $waits = null): array
3030
$seen[] = $projectionId;
3131

3232
/** @var WorkflowRunWait $row */
33-
$row = $waitModel::query()->updateOrCreate(
33+
$row = IdempotentProjectionUpsert::upsert(
34+
$waitModel,
3435
[
3536
'id' => $projectionId,
3637
],
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Unit\V2;
6+
7+
use ArgumentCountError;
8+
use Error;
9+
use Exception;
10+
use RuntimeException;
11+
use Tests\TestCase;
12+
use TypeError;
13+
use Workflow\V2\Support\FailureFactory;
14+
15+
final class FailureFactoryRestoreTest extends TestCase
16+
{
17+
/**
18+
* Regression for #436. PHP's Throwable interface is implemented independently
19+
* by Exception and Error (siblings, not parent/child). The restorer used
20+
* is_subclass_of($class, Error::class) to decide which base-class reflection
21+
* surface owns the protected message/code/file/line/trace properties — but
22+
* is_subclass_of returns false when $class IS Error, so an activity that
23+
* threw a bare Error fell through to Exception's reflection target and
24+
* raised "Cannot access protected property Error::$message" during replay,
25+
* stranding the run in waiting.
26+
*
27+
* @dataProvider errorSubclassesProvider
28+
*/
29+
public function testRestoresErrorSubclassesWithoutFallingBackToExceptionBaseClass(string $class, string $message): void
30+
{
31+
$payload = FailureFactory::payload(new $class($message));
32+
33+
$restored = FailureFactory::restoreForReplay($payload);
34+
35+
$this->assertInstanceOf($class, $restored);
36+
$this->assertSame($message, $restored->getMessage());
37+
}
38+
39+
/**
40+
* @return iterable<string, array{0: string, 1: string}>
41+
*/
42+
public static function errorSubclassesProvider(): iterable
43+
{
44+
yield 'bare Error' => [Error::class, 'static call against instance method'];
45+
yield 'TypeError' => [TypeError::class, 'argument 1 must be of type string'];
46+
yield 'ArgumentCountError' => [ArgumentCountError::class, 'too few arguments to function'];
47+
}
48+
49+
public function testRestoresExceptionSubclassesUnchanged(): void
50+
{
51+
$original = new RuntimeException('still works for Exception side', 42);
52+
53+
$restored = FailureFactory::restoreForReplay(FailureFactory::payload($original));
54+
55+
$this->assertInstanceOf(RuntimeException::class, $restored);
56+
$this->assertSame('still works for Exception side', $restored->getMessage());
57+
$this->assertSame(42, $restored->getCode());
58+
}
59+
60+
public function testRestoresBaseException(): void
61+
{
62+
$original = new Exception('base exception sanity check');
63+
64+
$restored = FailureFactory::restoreForReplay(FailureFactory::payload($original));
65+
66+
$this->assertInstanceOf(Exception::class, $restored);
67+
$this->assertSame('base exception sanity check', $restored->getMessage());
68+
}
69+
}

0 commit comments

Comments
 (0)