Skip to content

Commit dcd5427

Browse files
Fix bounded timeline projection rebuild
Bounded Waterline timeline reads now repair missing run timeline projection rows before returning a limited window, preserving the projected fast path when rows already exist. Issue: zorporation/durable-workflow#539 Loop-ID: build-03
1 parent 3e6d471 commit dcd5427

2 files changed

Lines changed: 66 additions & 0 deletions

File tree

src/V2/Support/RunTimelineProjector.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,21 @@ private static function projectedWindow(WorkflowRun $run, int $limit): array
151151
$total = (int) (clone $baseQuery)->count();
152152

153153
if ($total === 0) {
154+
$canonicalTimeline = HistoryTimeline::fromHistory($run);
155+
156+
if ($canonicalTimeline !== []) {
157+
$reprojected = self::project($run, $canonicalTimeline);
158+
159+
return self::boundedTimelinePayload(
160+
collect($reprojected)
161+
->map(static fn (WorkflowTimelineEntry $entry): array => $entry->toTimelinePayload())
162+
->values()
163+
->all(),
164+
$limit,
165+
'workflow_run_timeline_entries_rebuilt_window',
166+
);
167+
}
168+
154169
return [
155170
'source' => 'workflow_run_timeline_entries_window',
156171
'timeline' => [],
@@ -176,6 +191,19 @@ private static function projectedWindow(WorkflowRun $run, int $limit): array
176191
];
177192
}
178193

194+
/**
195+
* @param list<array<string, mixed>> $timeline
196+
* @return array{source: string, timeline: list<array<string, mixed>>, total_count: int}
197+
*/
198+
private static function boundedTimelinePayload(array $timeline, int $limit, string $source): array
199+
{
200+
return [
201+
'source' => $source,
202+
'timeline' => array_values(array_slice($timeline, -$limit)),
203+
'total_count' => count($timeline),
204+
];
205+
}
206+
179207
/**
180208
* @return class-string<WorkflowTimelineEntry>
181209
*/

tests/Feature/V2/V2HistoryTimelineTest.php

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,44 @@ public function testTimelineProjectorCanReadBoundedProjectedWindowWithoutHistory
721721
);
722722
}
723723

724+
public function testBoundedTimelineRebuildsMissingProjectionRowsBeforeReturningWindow(): void
725+
{
726+
Queue::fake();
727+
728+
$workflow = WorkflowStub::make(TestGreetingWorkflow::class, 'timeline-projection-window-rebuild');
729+
$workflow->start('Taylor');
730+
$runId = $workflow->runId();
731+
732+
$this->assertNotNull($runId);
733+
734+
$this->drainReadyTasks();
735+
$this->assertTrue($workflow->refresh()->completed());
736+
737+
/** @var WorkflowRun $run */
738+
$run = WorkflowRun::query()->findOrFail($runId);
739+
$historyCount = $run->historyEvents()
740+
->count();
741+
742+
$this->assertGreaterThan(2, $historyCount);
743+
744+
WorkflowTimelineEntry::query()
745+
->where('workflow_run_id', $runId)
746+
->delete();
747+
748+
$snapshot = RunTimelineProjector::snapshotForRun($run->fresh(), 2);
749+
750+
$this->assertSame('workflow_run_timeline_entries_rebuilt_window', $snapshot['source']);
751+
$this->assertSame($historyCount, $snapshot['total_count']);
752+
$this->assertCount(2, $snapshot['timeline']);
753+
$this->assertSame(['ActivityCompleted', 'WorkflowCompleted'], array_column($snapshot['timeline'], 'type'));
754+
$this->assertSame(
755+
$historyCount,
756+
WorkflowTimelineEntry::query()
757+
->where('workflow_run_id', $runId)
758+
->count(),
759+
);
760+
}
761+
724762
public function testTimelineKeepsTimerSnapshotsWhenTimerRowDrifts(): void
725763
{
726764
Queue::fake();

0 commit comments

Comments
 (0)