Skip to content

Commit bb0654e

Browse files
Finalize v2 timer projection schema
1 parent 5ab39ad commit bb0654e

13 files changed

Lines changed: 82 additions & 72 deletions

src/V2/Models/WorkflowRunTimerEntry.php

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212

1313
class WorkflowRunTimerEntry extends Model
1414
{
15-
public const LEGACY_SCHEMA_VERSION = 0;
16-
1715
public const CURRENT_SCHEMA_VERSION = 1;
1816

1917
public $incrementing = false;
@@ -52,7 +50,6 @@ public function run(): BelongsTo
5250
public function toTimerPayload(): array
5351
{
5452
$payload = is_array($this->payload) ? $this->payload : [];
55-
$schemaVersion = $this->schemaVersion();
5653
$status = self::stringValue($payload['status'] ?? $this->status);
5754
$sourceStatus = self::stringValue($payload['source_status'] ?? $this->source_status)
5855
?? $status;
@@ -75,23 +72,18 @@ public function toTimerPayload(): array
7572
$payload['condition_definition_fingerprint'] = $this->condition_definition_fingerprint;
7673
$payload['history_authority'] = $historyAuthority;
7774
$payload['history_unsupported_reason'] = $historyUnsupportedReason;
78-
$payload['row_status'] = self::rowStatus(
79-
$payload['row_status'] ?? null,
80-
$historyAuthority,
81-
$sourceStatus,
82-
$schemaVersion,
83-
);
75+
$payload['row_status'] = self::rowStatus($payload['row_status'] ?? null);
8476
$payload['diagnostic_only'] = self::diagnosticOnly($historyAuthority);
8577
$payload['created_at'] = self::timestamp($payload['created_at'] ?? null);
8678

8779
return $payload;
8880
}
8981

90-
public function schemaVersion(): int
82+
public function schemaVersion(): ?int
9183
{
9284
return is_int($this->schema_version)
9385
? $this->schema_version
94-
: self::LEGACY_SCHEMA_VERSION;
86+
: null;
9587
}
9688

9789
public function usesCurrentSchema(): bool
@@ -117,24 +109,9 @@ private static function diagnosticOnly(mixed $historyAuthority): bool
117109
&& $historyAuthority !== 'typed_history';
118110
}
119111

120-
private static function rowStatus(
121-
mixed $value,
122-
?string $historyAuthority,
123-
?string $sourceStatus,
124-
int $schemaVersion,
125-
): ?string {
126-
$rowStatus = self::stringValue($value);
127-
128-
if ($rowStatus !== null || $schemaVersion !== self::LEGACY_SCHEMA_VERSION) {
129-
return $rowStatus;
130-
}
131-
132-
return in_array($historyAuthority, [
133-
'mutable_open_fallback',
134-
'unsupported_terminal_without_history',
135-
], true)
136-
? $sourceStatus
137-
: null;
112+
private static function rowStatus(mixed $value): ?string
113+
{
114+
return self::stringValue($value);
138115
}
139116

140117
private static function stringValue(mixed $value): ?string

src/V2/Support/HealthCheck.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,12 @@ private static function selectedRunProjectionCheck(array $projections): array
134134
'timer_needs_rebuild' => $timerNeedsRebuild,
135135
'timer_missing_runs_with_timers' => self::integer($timers['missing_runs_with_timers'] ?? 0),
136136
'timer_stale_projected_runs' => self::integer($timers['stale_projected_runs'] ?? 0),
137-
'timer_legacy_schema_runs' => self::integer($timers['legacy_schema_runs'] ?? 0),
138-
'timer_legacy_schema_rows' => self::integer($timers['legacy_schema_rows'] ?? 0),
137+
'timer_schema_version_mismatch_runs' => self::integer(
138+
$timers['schema_version_mismatch_runs'] ?? 0
139+
),
140+
'timer_schema_version_mismatch_rows' => self::integer(
141+
$timers['schema_version_mismatch_rows'] ?? 0
142+
),
139143
'timer_orphaned' => self::integer($timers['orphaned'] ?? 0),
140144
'lineage_needs_rebuild' => $lineageNeedsRebuild,
141145
'lineage_missing_runs_with_lineage' => self::integer($lineage['missing_runs_with_lineage'] ?? 0),

src/V2/Support/OperatorMetrics.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,12 @@ private static function runTimerProjectionMetrics(): array
316316
'projected_runs_with_timers' => $drift['projected_runs_with_timers'],
317317
'missing_runs_with_timers' => $drift['missing_runs_with_timers'],
318318
'stale_projected_runs' => $drift['stale_projected_runs'],
319-
'legacy_schema_runs' => $drift['legacy_schema_runs'],
320-
'legacy_schema_rows' => $timerModel::query()
321-
->where('schema_version', WorkflowRunTimerEntry::LEGACY_SCHEMA_VERSION)
319+
'schema_version_mismatch_runs' => $drift['schema_version_mismatch_runs'],
320+
'schema_version_mismatch_rows' => $timerModel::query()
321+
->where(static function ($query): void {
322+
$query->whereNull('schema_version')
323+
->orWhere('schema_version', '!=', WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION);
324+
})
322325
->count(),
323326
'orphaned' => $orphaned,
324327
'needs_rebuild' => $drift['missing_runs_with_timers'] + $drift['stale_projected_runs'] + $orphaned,

src/V2/Support/RunTimerProjector.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public static function snapshotForRun(WorkflowRun $run): array
131131
* has_canonical: bool,
132132
* missing: bool,
133133
* stale: bool,
134-
* legacy_schema: bool,
134+
* schema_version_mismatch: bool,
135135
* reasons: list<string>
136136
* }
137137
*/
@@ -142,14 +142,14 @@ public static function driftStatusForRun(WorkflowRun $run): array
142142
$hasProjection = $projected->isNotEmpty();
143143
$hasCanonical = $canonicalTimers !== [];
144144
$reasons = self::rebuildReasons($projected, $canonicalTimers);
145-
$legacySchema = in_array('legacy_schema', $reasons, true);
145+
$schemaVersionMismatch = in_array('schema_version_mismatch', $reasons, true);
146146

147147
return [
148148
'has_projection' => $hasProjection,
149149
'has_canonical' => $hasCanonical,
150150
'missing' => $hasCanonical && ! $hasProjection,
151151
'stale' => $hasProjection && $reasons !== [],
152-
'legacy_schema' => $legacySchema,
152+
'schema_version_mismatch' => $schemaVersionMismatch,
153153
'reasons' => $reasons,
154154
];
155155
}
@@ -225,7 +225,7 @@ private static function rebuildReasons(EloquentCollection $projected, array $can
225225
}
226226

227227
if ($projected->isNotEmpty() && ! self::projectedRowsUseCurrentSchema($projected)) {
228-
$reasons[] = 'legacy_schema';
228+
$reasons[] = 'schema_version_mismatch';
229229
}
230230

231231
if ($projected->isNotEmpty() && ! self::projectionMatchesSnapshot($projected, $canonical)) {

src/V2/Support/SelectedRunProjectionDrift.php

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public static function timelineRunIdsNeedingRebuild(array $runIds = [], ?string
129129
* projected_runs_with_timers: int,
130130
* missing_runs_with_timers: int,
131131
* stale_projected_runs: int,
132-
* legacy_schema_runs: int
132+
* schema_version_mismatch_runs: int
133133
* }
134134
*/
135135
public static function timerMetrics(array $runIds = [], ?string $instanceId = null): array
@@ -141,7 +141,7 @@ public static function timerMetrics(array $runIds = [], ?string $instanceId = nu
141141
'projected_runs_with_timers' => $analysis['projected_runs_with_canonical'],
142142
'missing_runs_with_timers' => count($analysis['missing_run_ids']),
143143
'stale_projected_runs' => count($analysis['stale_run_ids']),
144-
'legacy_schema_runs' => count($analysis['legacy_schema_run_ids']),
144+
'schema_version_mismatch_runs' => count($analysis['schema_version_mismatch_run_ids']),
145145
];
146146
}
147147

@@ -296,7 +296,7 @@ private static function runIdsNeedingRebuild(array $analysis): array
296296
* projected_runs_with_canonical: int,
297297
* missing_run_ids: list<string>,
298298
* stale_run_ids: list<string>,
299-
* legacy_schema_run_ids: list<string>
299+
* schema_version_mismatch_run_ids: list<string>
300300
* }
301301
*/
302302
private static function timerAnalysis(array $runIds, ?string $instanceId): array
@@ -305,13 +305,13 @@ private static function timerAnalysis(array $runIds, ?string $instanceId): array
305305
$projectedRunsWithCanonical = 0;
306306
$missingRunIds = [];
307307
$staleRunIds = [];
308-
$legacySchemaRunIds = [];
308+
$schemaVersionMismatchRunIds = [];
309309

310310
self::runQuery(['timerEntries', 'timers', 'historyEvents'], $runIds, $instanceId)
311311
->chunkById(100, static function ($runs) use (
312-
&$legacySchemaRunIds,
313312
&$missingRunIds,
314313
&$projectedRunsWithCanonical,
314+
&$schemaVersionMismatchRunIds,
315315
&$runsWithCanonical,
316316
&$staleRunIds,
317317
): void {
@@ -334,8 +334,8 @@ private static function timerAnalysis(array $runIds, ?string $instanceId): array
334334
$staleRunIds[] = $run->id;
335335
}
336336

337-
if ($status['legacy_schema']) {
338-
$legacySchemaRunIds[] = $run->id;
337+
if ($status['schema_version_mismatch']) {
338+
$schemaVersionMismatchRunIds[] = $run->id;
339339
}
340340
}
341341
}, 'id');
@@ -345,7 +345,7 @@ private static function timerAnalysis(array $runIds, ?string $instanceId): array
345345
'projected_runs_with_canonical' => $projectedRunsWithCanonical,
346346
'missing_run_ids' => array_values(array_unique($missingRunIds)),
347347
'stale_run_ids' => array_values(array_unique($staleRunIds)),
348-
'legacy_schema_run_ids' => array_values(array_unique($legacySchemaRunIds)),
348+
'schema_version_mismatch_run_ids' => array_values(array_unique($schemaVersionMismatchRunIds)),
349349
];
350350
}
351351

src/V2/Support/SelectedRunSnapshot.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public static function timelineDriftStatus(WorkflowRun $run): array
116116
* has_canonical: bool,
117117
* missing: bool,
118118
* stale: bool,
119-
* legacy_schema: bool,
119+
* schema_version_mismatch: bool,
120120
* reasons: list<string>
121121
* }
122122
*/

src/migrations/2026_04_11_000140_create_workflow_run_timer_entries_table.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public function up(): void
1919
->index();
2020
$table->string('timer_id', 191);
2121
$table->unsignedSmallInteger('schema_version')
22-
->default(WorkflowRunTimerEntry::LEGACY_SCHEMA_VERSION);
22+
->default(WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION);
2323
$table->unsignedInteger('position');
2424
$table->unsignedInteger('sequence')
2525
->nullable()

tests/Feature/V2/V2OperatorMetricsTest.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,14 +277,14 @@ public function testSnapshotCountsStaleRunSummaryProjectionRows(): void
277277
public function testSnapshotCountsSelectedRunProjectionDrift(): void
278278
{
279279
$missingWaitRun = $this->createRunWithSummary(
280-
instanceId: 'metrics-wait-missing-instance',
280+
instanceId: 'metrics-wait-missing-i',
281281
runId: '01JMETRICSPROJWAITMISS01',
282282
status: 'waiting',
283283
statusBucket: 'running',
284284
livenessState: 'waiting_for_signal',
285285
);
286286
$projectedWaitRun = $this->createRunWithSummary(
287-
instanceId: 'metrics-wait-projected-instance',
287+
instanceId: 'metrics-wait-projected-i',
288288
runId: '01JMETRICSPROJWAITDONE01',
289289
status: 'waiting',
290290
statusBucket: 'running',
@@ -395,6 +395,7 @@ public function testSnapshotCountsSelectedRunProjectionDrift(): void
395395
'workflow_run_id' => $projectedWaitRun->id,
396396
'workflow_instance_id' => $projectedWaitRun->workflow_instance_id,
397397
'timer_id' => 'projection-timer-projected',
398+
'schema_version' => WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION - 1,
398399
'position' => 0,
399400
'sequence' => 12,
400401
'status' => 'fired',
@@ -424,6 +425,7 @@ public function testSnapshotCountsSelectedRunProjectionDrift(): void
424425
'workflow_run_id' => '01JMETRICSPROJTIMERGONE01',
425426
'workflow_instance_id' => 'metrics-timer-orphan-instance',
426427
'timer_id' => 'projection-timer-orphan',
428+
'schema_version' => WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION - 1,
427429
'position' => 0,
428430
'status' => 'pending',
429431
'source_status' => 'pending',
@@ -513,8 +515,8 @@ public function testSnapshotCountsSelectedRunProjectionDrift(): void
513515
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['projected_runs_with_timers']);
514516
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['missing_runs_with_timers']);
515517
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['stale_projected_runs']);
516-
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['legacy_schema_runs']);
517-
$this->assertSame(2, $snapshot['projections']['run_timer_entries']['legacy_schema_rows']);
518+
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['schema_version_mismatch_runs']);
519+
$this->assertSame(2, $snapshot['projections']['run_timer_entries']['schema_version_mismatch_rows']);
518520
$this->assertSame(1, $snapshot['projections']['run_timer_entries']['orphaned']);
519521
$this->assertSame(3, $snapshot['projections']['run_timer_entries']['needs_rebuild']);
520522

tests/Feature/V2/V2RunDetailViewTest.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -727,10 +727,10 @@ public function testRunDetailViewReadsTimerRowsFromRebuildableProjection(): void
727727
]);
728728
}
729729

730-
public function testRunDetailViewRebuildsLegacyProjectedTimerRowsWithoutRowStatus(): void
730+
public function testRunDetailViewRebuildsNonCurrentProjectedTimerRowsWithoutRowStatus(): void
731731
{
732732
$instance = WorkflowInstance::query()->create([
733-
'id' => 'detail-projected-timer-compat',
733+
'id' => 'detail-timer-proj-compat',
734734
'workflow_class' => TestTimerWorkflow::class,
735735
'workflow_type' => 'test-timer-workflow',
736736
'run_count' => 1,
@@ -789,7 +789,7 @@ public function testRunDetailViewRebuildsLegacyProjectedTimerRowsWithoutRowStatu
789789
'workflow_run_id' => $run->id,
790790
'workflow_instance_id' => $instance->id,
791791
'timer_id' => $timer->id,
792-
'schema_version' => WorkflowRunTimerEntry::LEGACY_SCHEMA_VERSION,
792+
'schema_version' => WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION - 1,
793793
'position' => 0,
794794
'sequence' => 1,
795795
'status' => 'pending',
@@ -809,7 +809,10 @@ public function testRunDetailViewRebuildsLegacyProjectedTimerRowsWithoutRowStatu
809809
$detail = RunDetailView::forRun($run->fresh(['summary', 'timerEntries']));
810810

811811
$this->assertSame('workflow_run_timer_entries_rebuilt', $detail['timers_projection_source']);
812-
$this->assertSame(['legacy_schema'], $detail['timers_projection_rebuild_reasons']);
812+
$this->assertSame(
813+
['schema_version_mismatch', 'stale_projection'],
814+
$detail['timers_projection_rebuild_reasons'],
815+
);
813816
$this->assertCount(1, $detail['timers']);
814817
$this->assertSame('pending', $detail['timers'][0]['status']);
815818
$this->assertSame('pending', $detail['timers'][0]['source_status']);

tests/Unit/Commands/V2RebuildProjectionsCommandTest.php

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -421,13 +421,13 @@ public function testNeedsRebuildOptionIncludesRunsMissingLineageProjectionRows()
421421

422422
public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjectionPayloads(): void
423423
{
424-
[, $waitRun] = $this->createWaitingRun('projection-command-stale-wait-selected');
425-
[, $timelineRun] = $this->createCompletedRun('projection-command-stale-timeline-selected');
426-
[, $lineageRun] = $this->createCompletedRun('projection-command-stale-lineage-selected');
427-
[, $timerRun] = $this->createWaitingRun('projection-command-stale-timer-selected');
424+
[, $waitRun] = $this->createWaitingRun('proj-stale-wait-sel');
425+
[, $timelineRun] = $this->createCompletedRun('proj-stale-timeline-sel');
426+
[, $lineageRun] = $this->createCompletedRun('proj-stale-lineage-sel');
427+
[, $timerRun] = $this->createWaitingRun('proj-stale-timer-sel');
428428

429429
ActivityExecution::query()->create([
430-
'id' => 'projection-command-stale-wait-activity',
430+
'id' => 'proj-stale-wait-activity',
431431
'workflow_run_id' => $waitRun->id,
432432
'sequence' => 1,
433433
'activity_class' => 'ProjectionSelectedWaitActivity',
@@ -451,7 +451,7 @@ public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjection
451451
],
452452
);
453453
WorkflowTimer::query()->create([
454-
'id' => 'projection-command-stale-timer',
454+
'id' => 'proj-stale-timer',
455455
'workflow_run_id' => $timerRun->id,
456456
'sequence' => 1,
457457
'status' => 'pending',
@@ -483,15 +483,15 @@ public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjection
483483
WorkflowRunLineageEntry::query()
484484
->where('workflow_run_id', $lineageRun->id)
485485
->update([
486-
'related_workflow_run_id' => 'projection-stale-child-run-wrong',
486+
'related_workflow_run_id' => 'projection-stale-child-bad',
487487
]);
488488
$timerEntry = WorkflowRunTimerEntry::query()
489489
->where('workflow_run_id', $timerRun->id)
490490
->firstOrFail();
491491
$timerPayload = $timerEntry->payload;
492492
unset($timerPayload['row_status']);
493493
$timerEntry->forceFill([
494-
'schema_version' => WorkflowRunTimerEntry::LEGACY_SCHEMA_VERSION,
494+
'schema_version' => WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION - 1,
495495
'payload' => $timerPayload,
496496
])->save();
497497

@@ -507,7 +507,7 @@ public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjection
507507
]);
508508
$this->assertDatabaseHas('workflow_run_waits', [
509509
'workflow_run_id' => $waitRun->id,
510-
'wait_id' => 'activity:projection-command-stale-wait-activity',
510+
'wait_id' => 'activity:proj-stale-wait-activity',
511511
'target_type' => 'projection.selected.wait',
512512
]);
513513
$this->assertDatabaseMissing('workflow_run_timeline_entries', [
@@ -521,7 +521,7 @@ public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjection
521521
]);
522522
$this->assertDatabaseMissing('workflow_run_lineage_entries', [
523523
'workflow_run_id' => $lineageRun->id,
524-
'related_workflow_run_id' => 'projection-stale-child-run-wrong',
524+
'related_workflow_run_id' => 'projection-stale-child-bad',
525525
]);
526526
$this->assertDatabaseHas('workflow_run_lineage_entries', [
527527
'workflow_run_id' => $lineageRun->id,
@@ -530,7 +530,7 @@ public function testNeedsRebuildOptionIncludesRunsWithStaleSelectedRunProjection
530530
]);
531531
$this->assertDatabaseHas('workflow_run_timer_entries', [
532532
'workflow_run_id' => $timerRun->id,
533-
'timer_id' => 'projection-command-stale-timer',
533+
'timer_id' => 'proj-stale-timer',
534534
'schema_version' => WorkflowRunTimerEntry::CURRENT_SCHEMA_VERSION,
535535
]);
536536
}

0 commit comments

Comments
 (0)