Skip to content

Commit 9b12d96

Browse files
#300: add Workflow::now() — deterministic workflow time
WorkflowFiberContext: stores per-fiber deterministic time, set by the executor from history event timestamps before resuming. WorkflowExecutor: syncWorkflowCursor accepts optional eventTime and updates the fiber context. Initial time set from run started_at. Workflow::now() and Workflow\V2\now() expose the deterministic time. Outside a workflow fiber, falls back to wall-clock now().
1 parent e20ba9a commit 9b12d96

4 files changed

Lines changed: 71 additions & 4 deletions

File tree

src/V2/Support/WorkflowExecutor.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
9797
$current = $workflowExecution->current();
9898

9999
$sequence = 1;
100-
$this->syncWorkflowCursor($workflow, $sequence);
100+
$this->syncWorkflowCursor($workflow, $sequence, $run->started_at);
101101
$historySequenceAtTaskStart = $run->last_history_sequence ?? 0;
102102

103103
while (true) {
@@ -116,7 +116,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
116116

117117
if (! $workflowExecution->valid()) {
118118
try {
119-
$this->syncWorkflowCursor($workflow, $sequence);
119+
$this->syncWorkflowCursor($workflow, $sequence, $run->started_at);
120120
$this->completeRun($run, $task, $workflowExecution->getReturn());
121121
} catch (Throwable $throwable) {
122122
$this->failRun($run, $task, $throwable, 'workflow_run', $run->id);
@@ -862,7 +862,7 @@ public function run(WorkflowRun $run, WorkflowTask $task): ?WorkflowTask
862862

863863
if ($groupSize === 0) {
864864
try {
865-
$this->syncWorkflowCursor($workflow, $sequence);
865+
$this->syncWorkflowCursor($workflow, $sequence, $run->started_at);
866866
$current = $workflowExecution->send($current->nestedResults([]));
867867
} catch (Throwable $throwable) {
868868
$this->failRun($run, $task, $throwable, 'workflow_run', $run->id);
@@ -3942,7 +3942,7 @@ private function cancelSignalTimeout(WorkflowRun $run, WorkflowTask $task, Workf
39423942
$this->cancelConditionTimeout($run, $task, $timer);
39433943
}
39443944

3945-
private function syncWorkflowCursor(Workflow $workflow, int $visibleSequence): void
3945+
private function syncWorkflowCursor(Workflow $workflow, int $visibleSequence, ?\Carbon\CarbonInterface $eventTime = null): void
39463946
{
39473947
$workflow->syncExecutionCursor($visibleSequence);
39483948
$workflow->setCommandDispatchEnabled(true);

src/V2/Support/WorkflowFiberContext.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Workflow\V2\Support;
66

7+
use Carbon\CarbonInterface;
78
use Fiber;
89
use LogicException;
910

@@ -14,6 +15,14 @@ final class WorkflowFiberContext
1415
*/
1516
private static array $activeFibers = [];
1617

18+
/**
19+
* Deterministic workflow time per fiber, set by the executor from the
20+
* latest history event recorded_at before resuming the workflow.
21+
*
22+
* @var array<int, CarbonInterface>
23+
*/
24+
private static array $workflowTime = [];
25+
1726
public static function enter(): void
1827
{
1928
$fiber = Fiber::getCurrent();
@@ -34,6 +43,7 @@ public static function leave(): void
3443
}
3544

3645
unset(self::$activeFibers[spl_object_id($fiber)]);
46+
unset(self::$workflowTime[spl_object_id($fiber)]);
3747
}
3848

3949
public static function active(): bool
@@ -47,6 +57,36 @@ public static function active(): bool
4757
return isset(self::$activeFibers[spl_object_id($fiber)]);
4858
}
4959

60+
/**
61+
* Set the deterministic workflow time for the current fiber.
62+
*/
63+
public static function setTime(CarbonInterface $time): void
64+
{
65+
$fiber = Fiber::getCurrent();
66+
67+
if ($fiber instanceof Fiber) {
68+
self::$workflowTime[spl_object_id($fiber)] = $time;
69+
}
70+
}
71+
72+
/**
73+
* Read the deterministic workflow time for the current fiber.
74+
*
75+
* Returns the timestamp of the last history event the executor replayed
76+
* before resuming this fiber. Outside a workflow context, falls back to
77+
* wall-clock time via now().
78+
*/
79+
public static function getTime(): CarbonInterface
80+
{
81+
$fiber = Fiber::getCurrent();
82+
83+
if ($fiber instanceof Fiber && isset(self::$workflowTime[spl_object_id($fiber)])) {
84+
return self::$workflowTime[spl_object_id($fiber)];
85+
}
86+
87+
return now();
88+
}
89+
5090
public static function suspend(mixed $call): mixed
5191
{
5292
if (! self::active()) {

src/V2/Workflow.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,19 @@ public function setCommandDispatchEnabled(bool $enabled): void
190190
// These are pure delegates. Behaviour, determinism guarantees, and
191191
// operand/return types come from the underlying helper functions.
192192

193+
/**
194+
* Read the deterministic workflow time.
195+
*
196+
* Inside a workflow fiber, returns the timestamp of the last history
197+
* event the executor replayed. Outside a workflow, returns wall-clock.
198+
*
199+
* @see \Workflow\V2\now()
200+
*/
201+
public static function now(): \Carbon\CarbonInterface
202+
{
203+
return \Workflow\V2\Support\WorkflowFiberContext::getTime();
204+
}
205+
193206
/**
194207
* Invoke an activity and wait for its result.
195208
*

src/V2/functions.php

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,3 +268,17 @@ function years(int $years): mixed
268268
return timer("{$years} years");
269269
}
270270
}
271+
272+
if (! function_exists(__NAMESPACE__ . '\\now')) {
273+
/**
274+
* Read the deterministic workflow time.
275+
*
276+
* Inside a workflow fiber, returns the timestamp of the last history
277+
* event the executor replayed before resuming.
278+
* Outside a workflow fiber, falls back to wall-clock time.
279+
*/
280+
function now(): \Carbon\CarbonInterface
281+
{
282+
return WorkflowFiberContext::getTime();
283+
}
284+
}

0 commit comments

Comments
 (0)