Skip to content

Commit a30152d

Browse files
Make schedule history sequence allocation concurrency-safe
WorkflowScheduleHistoryEvent::record() previously chose the next sequence with max(sequence)+1 without serialization, so concurrent lifecycle writers could pick the same slot and either hit the unique(workflow_schedule_id, sequence) constraint or leave audit ordering dependent on DB timing. This retries the write on unique- constraint violations so racing writers advance to the next free slot instead of failing or silently duplicating. Adds regression tests covering a single collision and a five-deep pileup to exercise the retry loop end-to-end against the unique index. Closes TD-069. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 01b6eef commit a30152d

2 files changed

Lines changed: 128 additions & 17 deletions

File tree

src/V2/Models/WorkflowScheduleHistoryEvent.php

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,22 @@
88
use Illuminate\Database\Eloquent\Concerns\HasUlids;
99
use Illuminate\Database\Eloquent\Model;
1010
use Illuminate\Database\Eloquent\Relations\BelongsTo;
11+
use Illuminate\Database\UniqueConstraintViolationException;
1112
use Workflow\V2\Enums\HistoryEventType;
1213
use Workflow\V2\Support\ConfiguredV2Models;
1314

1415
class WorkflowScheduleHistoryEvent extends Model
1516
{
1617
use HasUlids;
1718

19+
/**
20+
* Upper bound on retry iterations when racing for the next schedule
21+
* history sequence. Contention is expected to be low per-schedule, so this
22+
* is large enough to clear any realistic pileup without masking a bug
23+
* that would otherwise spin forever.
24+
*/
25+
private const SEQUENCE_RETRY_LIMIT = 32;
26+
1827
public $incrementing = false;
1928

2029
protected $table = 'workflow_schedule_history_events';
@@ -47,24 +56,41 @@ public static function record(
4756
HistoryEventType $eventType,
4857
array $payload = [],
4958
): self {
50-
$sequence = ((int) ConfiguredV2Models::query('schedule_history_event_model', self::class)
51-
->where('workflow_schedule_id', $schedule->id)
52-
->max('sequence')) + 1;
59+
$snapshot = self::snapshotPayload($schedule, $payload);
60+
$workflowInstanceId = self::stringValue($payload['workflow_instance_id'] ?? null);
61+
$workflowRunId = self::stringValue($payload['workflow_run_id'] ?? null);
62+
63+
for ($attempt = 1; $attempt <= self::SEQUENCE_RETRY_LIMIT; $attempt++) {
64+
$sequence = ((int) ConfiguredV2Models::query('schedule_history_event_model', self::class)
65+
->where('workflow_schedule_id', $schedule->id)
66+
->max('sequence')) + 1;
67+
68+
try {
69+
/** @var self $event */
70+
$event = ConfiguredV2Models::query('schedule_history_event_model', self::class)->create([
71+
'workflow_schedule_id' => $schedule->id,
72+
'schedule_id' => $schedule->schedule_id,
73+
'namespace' => $schedule->namespace,
74+
'sequence' => $sequence,
75+
'event_type' => $eventType->value,
76+
'payload' => $snapshot,
77+
'workflow_instance_id' => $workflowInstanceId,
78+
'workflow_run_id' => $workflowRunId,
79+
'recorded_at' => now(),
80+
]);
81+
82+
return $event;
83+
} catch (UniqueConstraintViolationException $e) {
84+
if ($attempt === self::SEQUENCE_RETRY_LIMIT) {
85+
throw $e;
86+
}
87+
// Another writer claimed this sequence first. Re-read the max
88+
// and try the next slot.
89+
}
90+
}
5391

54-
/** @var self $event */
55-
$event = ConfiguredV2Models::query('schedule_history_event_model', self::class)->create([
56-
'workflow_schedule_id' => $schedule->id,
57-
'schedule_id' => $schedule->schedule_id,
58-
'namespace' => $schedule->namespace,
59-
'sequence' => $sequence,
60-
'event_type' => $eventType->value,
61-
'payload' => self::snapshotPayload($schedule, $payload),
62-
'workflow_instance_id' => self::stringValue($payload['workflow_instance_id'] ?? null),
63-
'workflow_run_id' => self::stringValue($payload['workflow_run_id'] ?? null),
64-
'recorded_at' => now(),
65-
]);
66-
67-
return $event;
92+
// Unreachable: the loop either returns or re-throws at the final attempt.
93+
throw new \LogicException('Schedule history sequence allocation exhausted retries.');
6894
}
6995

7096
/**

tests/Feature/V2/V2ScheduleTest.php

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,91 @@ public function testScheduleLifecycleRecordsScheduleLevelHistoryEvents(): void
531531
$this->assertSame(1, $events[6]->payload['skipped_trigger_count']);
532532
}
533533

534+
public function testScheduleHistorySequenceRecoversFromConcurrentCollision(): void
535+
{
536+
$schedule = ScheduleManager::create(
537+
scheduleId: 'sequence-race-test',
538+
workflowClass: TestScheduledWorkflow::class,
539+
cronExpression: '* * * * *',
540+
);
541+
542+
// ScheduleCreated was written with sequence = 1. Simulate another
543+
// concurrent writer that has already claimed sequence = 2 before we
544+
// attempt to record the next lifecycle event — exactly the collision
545+
// the unique(workflow_schedule_id, sequence) constraint would raise.
546+
WorkflowScheduleHistoryEvent::query()->create([
547+
'workflow_schedule_id' => $schedule->id,
548+
'schedule_id' => $schedule->schedule_id,
549+
'namespace' => $schedule->namespace,
550+
'sequence' => 2,
551+
'event_type' => HistoryEventType::SchedulePaused->value,
552+
'payload' => ['reason' => 'pre-claimed by racing writer'],
553+
'recorded_at' => now(),
554+
]);
555+
556+
ScheduleManager::pause($schedule, 'operator hold');
557+
558+
$events = WorkflowScheduleHistoryEvent::query()
559+
->where('workflow_schedule_id', $schedule->id)
560+
->orderBy('sequence')
561+
->get();
562+
563+
// record() should have retried past the collision and landed on
564+
// sequence = 3, preserving ordering rather than blowing up.
565+
$this->assertSame([1, 2, 3], $events->pluck('sequence')->all());
566+
$this->assertSame(
567+
[
568+
HistoryEventType::ScheduleCreated->value,
569+
HistoryEventType::SchedulePaused->value,
570+
HistoryEventType::SchedulePaused->value,
571+
],
572+
$events->map(static fn (WorkflowScheduleHistoryEvent $event): string => $event->event_type->value)->all(),
573+
);
574+
$this->assertSame('pre-claimed by racing writer', $events[1]->payload['reason']);
575+
$this->assertSame('operator hold', $events[2]->payload['reason']);
576+
}
577+
578+
public function testConcurrentScheduleLifecycleWritesAllLandWithUniqueSequences(): void
579+
{
580+
$schedule = ScheduleManager::create(
581+
scheduleId: 'sequence-pileup-test',
582+
workflowClass: TestScheduledWorkflow::class,
583+
cronExpression: '* * * * *',
584+
);
585+
586+
// Pre-claim sequences 2..6 to force record() to retry five times
587+
// before landing on the next free slot. This exercises the retry
588+
// loop under a larger pileup than a single collision.
589+
for ($claimed = 2; $claimed <= 6; $claimed++) {
590+
WorkflowScheduleHistoryEvent::query()->create([
591+
'workflow_schedule_id' => $schedule->id,
592+
'schedule_id' => $schedule->schedule_id,
593+
'namespace' => $schedule->namespace,
594+
'sequence' => $claimed,
595+
'event_type' => HistoryEventType::SchedulePaused->value,
596+
'payload' => ['reason' => 'pileup ' . $claimed],
597+
'recorded_at' => now(),
598+
]);
599+
}
600+
601+
ScheduleManager::pause($schedule, 'post-pileup');
602+
603+
$sequences = WorkflowScheduleHistoryEvent::query()
604+
->where('workflow_schedule_id', $schedule->id)
605+
->orderBy('sequence')
606+
->pluck('sequence')
607+
->all();
608+
609+
$this->assertSame([1, 2, 3, 4, 5, 6, 7], $sequences);
610+
611+
$tail = WorkflowScheduleHistoryEvent::query()
612+
->where('workflow_schedule_id', $schedule->id)
613+
->where('sequence', 7)
614+
->first();
615+
$this->assertNotNull($tail);
616+
$this->assertSame('post-pileup', $tail->payload['reason']);
617+
}
618+
534619
public function testTriggerDeletedScheduleTracksSkipReason(): void
535620
{
536621
$schedule = ScheduleManager::create(

0 commit comments

Comments
 (0)