Skip to content

Commit b200adf

Browse files
#300: thread history-event timestamps through replay loop so Workflow::now() advances
WorkflowFiberContext::setTime() now accepts an optional Fiber, so the executor can seed deterministic time from outside the fiber. WorkflowExecution::start() and ::send()/::throw() take an optional CarbonInterface $eventTime that is applied to the fiber before it resumes. The fiber must be seeded before $fiber->start() because the workflow body runs synchronously inside start(), so the first Workflow::now() call lands before we return to the executor. WorkflowExecutor and QueryStateReplayer pass the relevant recorded_at at each resume point: activity completion, activity execution result, condition wait resolve/timeout, timer fire, signal receipt, signal timeout, side effect, version marker, memo/search-attribute upsert, child workflow completion, parallel-group failure. Inline timer and inline signal timeout also advance now() to their synthesized fired_at. Adds: - Unit test for the Fiber-scoped time API. - Feature test proving now() at start returns run->started_at and now() after an activity returns ActivityCompleted.recorded_at. Also updates the workflow.v2.stub scaffold to demonstrate Workflow::now() and warn against using Laravel's now()/Carbon::now() inside a workflow. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9b12d96 commit b200adf

8 files changed

Lines changed: 373 additions & 63 deletions

File tree

src/Commands/stubs/workflow.v2.stub

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@ class {{ class }} extends Workflow
1414
{
1515
// Example:
1616
//
17+
// $startedAt = Workflow::now();
1718
// $result = Workflow::activity(MyActivity::class, $input);
1819
// Workflow::timer('5 seconds');
1920
// Workflow::upsertMemo(['stage' => 'done']);
2021
//
2122
// return $result;
23+
//
24+
// Determinism: prefer Workflow::now() over Laravel's now() inside a
25+
// workflow body. Workflow::now() advances with history events during
26+
// replay, so the same workflow run always observes the same timeline.
2227
return null;
2328
}
2429
}

src/V2/Support/QueryStateReplayer.php

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public function replayState(WorkflowRun $run): ReplayState
5757
$this->syncWorkflowCursor($workflow, 1);
5858
$entryMethod = EntryMethod::forWorkflow($workflow);
5959
$arguments = $workflow->resolveMethodDependencies($run->workflowArguments(), $entryMethod);
60-
$workflowExecution = WorkflowExecution::start($workflow, $arguments);
60+
$workflowExecution = WorkflowExecution::start($workflow, $arguments, $run->started_at);
6161

6262
if (! $workflowExecution->valid()) {
6363
$this->syncWorkflowCursor($workflow, 0);
@@ -81,9 +81,15 @@ public function replayState(WorkflowRun $run): ReplayState
8181
if ($activityCompletion !== null) {
8282
$this->syncWorkflowCursor($workflow, $sequence + 1);
8383
if ($activityCompletion->event_type === HistoryEventType::ActivityCompleted) {
84-
$current = $workflowExecution->send($this->activityResult($activityCompletion, $run));
84+
$current = $workflowExecution->send(
85+
$this->activityResult($activityCompletion, $run),
86+
$activityCompletion->recorded_at,
87+
);
8588
} else {
86-
$current = $workflowExecution->throw($this->activityException($activityCompletion, null, $run));
89+
$current = $workflowExecution->throw(
90+
$this->activityException($activityCompletion, null, $run),
91+
$activityCompletion->recorded_at,
92+
);
8793
}
8894

8995
++$sequence;
@@ -120,9 +126,15 @@ public function replayState(WorkflowRun $run): ReplayState
120126

121127
$this->syncWorkflowCursor($workflow, $sequence + 1);
122128
if ($execution->status === ActivityStatus::Completed) {
123-
$current = $workflowExecution->send($execution->activityResult());
129+
$current = $workflowExecution->send(
130+
$execution->activityResult(),
131+
$execution->closed_at,
132+
);
124133
} else {
125-
$current = $workflowExecution->throw($this->activityException(null, $execution, $run));
134+
$current = $workflowExecution->throw(
135+
$this->activityException(null, $execution, $run),
136+
$execution->closed_at,
137+
);
126138
}
127139

128140
++$sequence;
@@ -143,7 +155,8 @@ public function replayState(WorkflowRun $run): ReplayState
143155

144156
$this->syncWorkflowCursor($workflow, $sequence + 1);
145157
$current = $workflowExecution->send(
146-
$resolutionEvent->event_type === HistoryEventType::ConditionWaitSatisfied
158+
$resolutionEvent->event_type === HistoryEventType::ConditionWaitSatisfied,
159+
$resolutionEvent->recorded_at,
147160
);
148161

149162
++$sequence;
@@ -154,9 +167,9 @@ public function replayState(WorkflowRun $run): ReplayState
154167
if ($current instanceof TimerCall) {
155168
WorkflowStepHistory::assertCompatible($run, $sequence, WorkflowStepHistory::TIMER);
156169

157-
if ($this->timerFiredEvent($run, $sequence) !== null) {
170+
if (($timerFired = $this->timerFiredEvent($run, $sequence)) !== null) {
158171
$this->syncWorkflowCursor($workflow, $sequence + 1);
159-
$current = $workflowExecution->send(true);
172+
$current = $workflowExecution->send(true, $timerFired->recorded_at);
160173

161174
++$sequence;
162175

@@ -186,7 +199,7 @@ public function replayState(WorkflowRun $run): ReplayState
186199
WorkflowStepHistory::assertTypedHistoryRecorded($run, $sequence, WorkflowStepHistory::TIMER);
187200

188201
$this->syncWorkflowCursor($workflow, $sequence + 1);
189-
$current = $workflowExecution->send(true);
202+
$current = $workflowExecution->send(true, $timer->fired_at);
190203

191204
++$sequence;
192205

@@ -205,7 +218,10 @@ public function replayState(WorkflowRun $run): ReplayState
205218
}
206219

207220
$this->syncWorkflowCursor($workflow, $sequence + 1);
208-
$current = $workflowExecution->send($this->sideEffectResult($sideEffectEvent, $run));
221+
$current = $workflowExecution->send(
222+
$this->sideEffectResult($sideEffectEvent, $run),
223+
$sideEffectEvent->recorded_at,
224+
);
209225

210226
++$sequence;
211227

@@ -216,15 +232,16 @@ public function replayState(WorkflowRun $run): ReplayState
216232
$this->applyRecordedUpdates($run, $workflow, $sequence);
217233
WorkflowStepHistory::assertCompatible($run, $sequence, WorkflowStepHistory::VERSION_MARKER);
218234

235+
$versionEvent = $this->versionMarkerEvent($run, $sequence);
219236
$resolution = VersionResolver::resolve(
220237
$run,
221-
$this->versionMarkerEvent($run, $sequence),
238+
$versionEvent,
222239
$current,
223240
$sequence,
224241
);
225242

226243
$this->syncWorkflowCursor($workflow, $sequence + ($resolution->advancesSequence ? 1 : 0));
227-
$current = $workflowExecution->send($resolution->version);
244+
$current = $workflowExecution->send($resolution->version, $versionEvent?->recorded_at);
228245

229246
if ($resolution->advancesSequence) {
230247
++$sequence;
@@ -245,7 +262,7 @@ public function replayState(WorkflowRun $run): ReplayState
245262
}
246263

247264
$this->syncWorkflowCursor($workflow, $sequence + 1);
248-
$current = $workflowExecution->send(null);
265+
$current = $workflowExecution->send(null, $upsertEvent->recorded_at);
249266

250267
++$sequence;
251268

@@ -260,7 +277,10 @@ public function replayState(WorkflowRun $run): ReplayState
260277

261278
if ($signalEvent !== null) {
262279
$this->syncWorkflowCursor($workflow, $sequence + 1);
263-
$current = $workflowExecution->send($this->signalValue($signalEvent, $run));
280+
$current = $workflowExecution->send(
281+
$this->signalValue($signalEvent, $run),
282+
$signalEvent->recorded_at,
283+
);
264284

265285
++$sequence;
266286

@@ -269,10 +289,10 @@ public function replayState(WorkflowRun $run): ReplayState
269289

270290
if (
271291
$current->timeoutSeconds !== null
272-
&& $this->signalTimeoutFiredEvent($run, $sequence, $current->name) !== null
292+
&& ($signalTimeoutFired = $this->signalTimeoutFiredEvent($run, $sequence, $current->name)) !== null
273293
) {
274294
$this->syncWorkflowCursor($workflow, $sequence + 1);
275-
$current = $workflowExecution->send(null);
295+
$current = $workflowExecution->send(null, $signalTimeoutFired->recorded_at);
276296

277297
++$sequence;
278298

@@ -294,11 +314,13 @@ public function replayState(WorkflowRun $run): ReplayState
294314
$this->syncWorkflowCursor($workflow, $sequence + 1);
295315
if ($resolutionEvent->event_type === HistoryEventType::ChildRunCompleted) {
296316
$current = $workflowExecution->send(
297-
ChildRunHistory::outputForResolution($resolutionEvent, $childRun)
317+
ChildRunHistory::outputForResolution($resolutionEvent, $childRun),
318+
$resolutionEvent->recorded_at,
298319
);
299320
} else {
300321
$current = $workflowExecution->throw(
301-
ChildRunHistory::exceptionForResolution($resolutionEvent, $childRun)
322+
ChildRunHistory::exceptionForResolution($resolutionEvent, $childRun),
323+
$resolutionEvent->recorded_at,
302324
);
303325
}
304326

@@ -324,9 +346,15 @@ public function replayState(WorkflowRun $run): ReplayState
324346

325347
$this->syncWorkflowCursor($workflow, $sequence + 1);
326348
if ($childStatus === RunStatus::Completed) {
327-
$current = $workflowExecution->send(ChildRunHistory::outputForChildRun($childRun));
349+
$current = $workflowExecution->send(
350+
ChildRunHistory::outputForChildRun($childRun),
351+
$childRun->closed_at,
352+
);
328353
} else {
329-
$current = $workflowExecution->throw(ChildRunHistory::exceptionForChildRun($childRun));
354+
$current = $workflowExecution->throw(
355+
ChildRunHistory::exceptionForChildRun($childRun),
356+
$childRun->closed_at,
357+
);
330358
}
331359

332360
++$sequence;
@@ -528,7 +556,10 @@ public function replayState(WorkflowRun $run): ReplayState
528556

529557
if ($failure !== null) {
530558
$this->syncWorkflowCursor($workflow, $sequence + $groupSize);
531-
$current = $workflowExecution->throw($failure['exception']);
559+
$failureTime = isset($failure['recorded_at']) && is_int($failure['recorded_at']) && $failure['recorded_at'] !== PHP_INT_MAX
560+
? \Carbon\Carbon::createFromTimestampMs($failure['recorded_at'])
561+
: null;
562+
$current = $workflowExecution->throw($failure['exception'], $failureTime);
532563
$sequence += $groupSize;
533564

534565
continue;

src/V2/Support/WorkflowExecution.php

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Workflow\V2\Support;
66

7+
use Carbon\CarbonInterface;
78
use Fiber;
89
use Throwable;
910
use Workflow\V2\Exceptions\StraightLineWorkflowRequiredException;
@@ -21,13 +22,14 @@ private function __construct(
2122
/**
2223
* @param array<int|string, mixed> $arguments
2324
*/
24-
public static function start(Workflow $workflow, array $arguments): self
25+
public static function start(Workflow $workflow, array $arguments, ?CarbonInterface $eventTime = null): self
2526
{
2627
$entryMethod = EntryMethod::forWorkflow($workflow);
2728

2829
return self::startCallback(
2930
static fn (): mixed => $workflow->{$entryMethod->getName()}(...$arguments),
3031
straightLineError: StraightLineWorkflowRequiredException::forWorkflow($workflow::class),
32+
eventTime: $eventTime,
3133
);
3234
}
3335

@@ -38,6 +40,7 @@ public static function startCallback(
3840
callable $callback,
3941
array $arguments = [],
4042
?StraightLineWorkflowRequiredException $straightLineError = null,
43+
?CarbonInterface $eventTime = null,
4144
): self {
4245
$fiber = new Fiber(static function () use ($callback, $arguments): mixed {
4346
WorkflowFiberContext::enter();
@@ -49,6 +52,10 @@ public static function startCallback(
4952
}
5053
});
5154

55+
if ($eventTime !== null) {
56+
WorkflowFiberContext::setTime($eventTime, $fiber);
57+
}
58+
5259
$current = $fiber->start();
5360

5461
if ($fiber->isSuspended()) {
@@ -69,6 +76,11 @@ public function current(): mixed
6976
return $this->current;
7077
}
7178

79+
public function fiber(): ?Fiber
80+
{
81+
return $this->fiber;
82+
}
83+
7284
public function valid(): bool
7385
{
7486
if ($this->fiber instanceof Fiber) {
@@ -78,12 +90,16 @@ public function valid(): bool
7890
return false;
7991
}
8092

81-
public function send(mixed $value): mixed
93+
public function send(mixed $value, ?CarbonInterface $eventTime = null): mixed
8294
{
8395
if (! $this->fiber instanceof Fiber) {
8496
return null;
8597
}
8698

99+
if ($eventTime !== null) {
100+
WorkflowFiberContext::setTime($eventTime, $this->fiber);
101+
}
102+
87103
$result = $this->fiber->resume($value);
88104

89105
if ($this->fiber->isSuspended()) {
@@ -98,12 +114,16 @@ public function send(mixed $value): mixed
98114
return null;
99115
}
100116

101-
public function throw(Throwable $throwable): mixed
117+
public function throw(Throwable $throwable, ?CarbonInterface $eventTime = null): mixed
102118
{
103119
if (! $this->fiber instanceof Fiber) {
104120
throw $throwable;
105121
}
106122

123+
if ($eventTime !== null) {
124+
WorkflowFiberContext::setTime($eventTime, $this->fiber);
125+
}
126+
107127
$result = $this->fiber->throw($throwable);
108128

109129
if ($this->fiber->isSuspended()) {

0 commit comments

Comments
 (0)