Skip to content

Commit 973ed97

Browse files
Own child workflow namespace projection
1 parent bb0654e commit 973ed97

8 files changed

Lines changed: 405 additions & 10 deletions

src/Providers/WorkflowServiceProvider.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@
3030
use Workflow\V2\Contracts\WorkflowControlPlane;
3131
use Workflow\V2\Contracts\WorkflowTaskBridge;
3232
use Workflow\V2\Models\WorkflowHistoryEvent;
33+
use Workflow\V2\Models\WorkflowLink;
34+
use Workflow\V2\Models\WorkflowRunLineageEntry;
3335
use Workflow\V2\Models\WorkflowTask;
3436
use Workflow\V2\Observers\WorkflowHistoryEventObserver;
37+
use Workflow\V2\Observers\WorkflowLinkObserver;
38+
use Workflow\V2\Observers\WorkflowRunLineageEntryObserver;
3539
use Workflow\V2\Observers\WorkflowTaskObserver;
3640
use Workflow\V2\Support\CacheLongPollWakeStore;
3741
use Workflow\V2\Support\DefaultActivityTaskBridge;
@@ -120,8 +124,10 @@ public function boot(): void
120124
*/
121125
private function registerLongPollObservers(): void
122126
{
123-
// Register observers for long-poll wake signals
127+
// Register projection and long-poll wake observers.
124128
// Note: Laravel's observe() is idempotent - calling it multiple times is safe
129+
WorkflowLink::observe(WorkflowLinkObserver::class);
130+
WorkflowRunLineageEntry::observe(WorkflowRunLineageEntryObserver::class);
125131
WorkflowTask::observe(WorkflowTaskObserver::class);
126132
WorkflowHistoryEvent::observe(WorkflowHistoryEventObserver::class);
127133
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Observers;
6+
7+
use Workflow\V2\Models\WorkflowLink;
8+
use Workflow\V2\Support\ChildWorkflowNamespaceProjection;
9+
10+
class WorkflowLinkObserver
11+
{
12+
public function __construct(
13+
private readonly ChildWorkflowNamespaceProjection $projection,
14+
) {}
15+
16+
public function created(WorkflowLink $link): void
17+
{
18+
$this->projection->projectLink($link);
19+
}
20+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Observers;
6+
7+
use Workflow\V2\Models\WorkflowRunLineageEntry;
8+
use Workflow\V2\Support\ChildWorkflowNamespaceProjection;
9+
10+
class WorkflowRunLineageEntryObserver
11+
{
12+
public function __construct(
13+
private readonly ChildWorkflowNamespaceProjection $projection,
14+
) {}
15+
16+
public function created(WorkflowRunLineageEntry $entry): void
17+
{
18+
$this->projection->projectLineageEntry($entry);
19+
}
20+
21+
public function updated(WorkflowRunLineageEntry $entry): void
22+
{
23+
$this->projection->projectLineageEntry($entry);
24+
}
25+
}
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Support;
6+
7+
use Illuminate\Database\Eloquent\Builder;
8+
use Workflow\V2\Contracts\LongPollWakeStore;
9+
use Workflow\V2\Enums\TaskType;
10+
use Workflow\V2\Models\WorkflowInstance;
11+
use Workflow\V2\Models\WorkflowLink;
12+
use Workflow\V2\Models\WorkflowRun;
13+
use Workflow\V2\Models\WorkflowRunLineageEntry;
14+
use Workflow\V2\Models\WorkflowTask;
15+
16+
/**
17+
* @api Stable class surface consumed by the standalone workflow-server.
18+
*/
19+
final class ChildWorkflowNamespaceProjection
20+
{
21+
public function __construct(
22+
private readonly LongPollWakeStore $wakeStore,
23+
) {}
24+
25+
public function projectLink(WorkflowLink $link): void
26+
{
27+
if ($link->link_type !== 'child_workflow') {
28+
return;
29+
}
30+
31+
$this->bindChildWorkflow(
32+
self::stringValue($link->parent_workflow_instance_id ?? null),
33+
self::stringValue($link->parent_workflow_run_id ?? null),
34+
self::stringValue($link->child_workflow_instance_id ?? null),
35+
self::stringValue($link->child_workflow_run_id ?? null),
36+
);
37+
}
38+
39+
public function projectLineageEntry(WorkflowRunLineageEntry $entry): void
40+
{
41+
if ($entry->direction !== 'child' || $entry->link_type !== 'child_workflow') {
42+
return;
43+
}
44+
45+
$this->bindChildWorkflow(
46+
self::stringValue($entry->workflow_instance_id ?? null),
47+
self::stringValue($entry->workflow_run_id ?? null),
48+
self::stringValue($entry->related_workflow_instance_id ?? null),
49+
self::stringValue($entry->related_workflow_run_id ?? null),
50+
);
51+
}
52+
53+
private function bindChildWorkflow(
54+
?string $parentInstanceId,
55+
?string $parentRunId,
56+
?string $childInstanceId,
57+
?string $childRunId,
58+
): void {
59+
if ($childInstanceId === null) {
60+
return;
61+
}
62+
63+
$namespace = $this->namespaceForParent($parentInstanceId, $parentRunId);
64+
65+
if ($namespace === null) {
66+
return;
67+
}
68+
69+
$updated = 0;
70+
71+
/** @var class-string<WorkflowInstance> $instanceModel */
72+
$instanceModel = ConfiguredV2Models::resolve('instance_model', WorkflowInstance::class);
73+
$updated += $instanceModel::query()
74+
->whereKey($childInstanceId)
75+
->where(static function (Builder $query): void {
76+
$query->whereNull('namespace')
77+
->orWhere('namespace', '');
78+
})
79+
->update(['namespace' => $namespace]);
80+
81+
if ($childRunId !== null) {
82+
/** @var class-string<WorkflowRun> $runModel */
83+
$runModel = ConfiguredV2Models::resolve('run_model', WorkflowRun::class);
84+
$updated += $runModel::query()
85+
->whereKey($childRunId)
86+
->where(static function (Builder $query): void {
87+
$query->whereNull('namespace')
88+
->orWhere('namespace', '');
89+
})
90+
->update(['namespace' => $namespace]);
91+
92+
/** @var class-string<WorkflowTask> $taskModel */
93+
$taskModel = ConfiguredV2Models::resolve('task_model', WorkflowTask::class);
94+
$updated += $taskModel::query()
95+
->where('workflow_run_id', $childRunId)
96+
->where(static function (Builder $query): void {
97+
$query->whereNull('namespace')
98+
->orWhere('namespace', '');
99+
})
100+
->update(['namespace' => $namespace]);
101+
}
102+
103+
if ($updated > 0) {
104+
$this->signalChildWorkflowTasks($childRunId, $namespace);
105+
}
106+
}
107+
108+
private function namespaceForParent(?string $parentInstanceId, ?string $parentRunId): ?string
109+
{
110+
/** @var class-string<WorkflowRun> $runModel */
111+
$runModel = ConfiguredV2Models::resolve('run_model', WorkflowRun::class);
112+
113+
if ($parentRunId !== null) {
114+
$namespace = $runModel::query()
115+
->whereKey($parentRunId)
116+
->value('namespace');
117+
118+
if (is_string($namespace) && $namespace !== '') {
119+
return $namespace;
120+
}
121+
}
122+
123+
if ($parentInstanceId === null) {
124+
return null;
125+
}
126+
127+
/** @var class-string<WorkflowInstance> $instanceModel */
128+
$instanceModel = ConfiguredV2Models::resolve('instance_model', WorkflowInstance::class);
129+
130+
$namespace = $instanceModel::query()
131+
->whereKey($parentInstanceId)
132+
->value('namespace');
133+
134+
return is_string($namespace) && $namespace !== ''
135+
? $namespace
136+
: null;
137+
}
138+
139+
private function signalChildWorkflowTasks(?string $childRunId, string $namespace): void
140+
{
141+
if ($childRunId === null) {
142+
return;
143+
}
144+
145+
/** @var class-string<WorkflowTask> $taskModel */
146+
$taskModel = ConfiguredV2Models::resolve('task_model', WorkflowTask::class);
147+
148+
/** @var iterable<int, WorkflowTask> $tasks */
149+
$tasks = $taskModel::query()
150+
->where('workflow_run_id', $childRunId)
151+
->get();
152+
153+
foreach ($tasks as $task) {
154+
$taskType = $task->task_type instanceof TaskType
155+
? $task->task_type->value
156+
: (string) $task->task_type;
157+
158+
$channels = $taskType === TaskType::Activity->value
159+
? $this->wakeStore->activityTaskPollChannels($namespace, $task->connection, $task->queue)
160+
: $this->wakeStore->workflowTaskPollChannels($namespace, $task->connection, $task->queue);
161+
162+
$this->wakeStore->signal(...$channels);
163+
}
164+
}
165+
166+
private static function stringValue(mixed $value): ?string
167+
{
168+
return is_string($value) && $value !== ''
169+
? $value
170+
: null;
171+
}
172+
}

src/V2/Support/DefaultWorkflowTaskBridge.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,6 +1007,7 @@ private function applyStartChildWorkflow(
10071007
$childInstance = WorkflowInstance::query()->create([
10081008
'workflow_class' => $workflowType,
10091009
'workflow_type' => $workflowType,
1010+
'namespace' => $run->namespace,
10101011
'reserved_at' => $now,
10111012
'started_at' => $now,
10121013
'run_count' => 1,
@@ -1018,6 +1019,7 @@ private function applyStartChildWorkflow(
10181019
'run_number' => 1,
10191020
'workflow_class' => $workflowType,
10201021
'workflow_type' => $workflowType,
1022+
'namespace' => $run->namespace,
10211023
'status' => RunStatus::Pending->value,
10221024
'compatibility' => $run->compatibility ?? WorkerCompatibility::current(),
10231025
'payload_codec' => $run->payload_codec ?? CodecRegistry::defaultCodec(),
@@ -1207,6 +1209,7 @@ private function applyContinueAsNew(
12071209
'run_number' => $run->run_number + 1,
12081210
'workflow_class' => $workflowType,
12091211
'workflow_type' => $workflowType,
1212+
'namespace' => $run->namespace,
12101213
'business_key' => $run->business_key,
12111214
'visibility_labels' => $run->visibility_labels,
12121215
'memo' => $run->memo,

src/V2/Support/WorkflowExecutor.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,6 +1506,7 @@ private function scheduleChildWorkflow(
15061506
$childInstance = WorkflowInstance::query()->create([
15071507
'workflow_class' => $childWorkflowCall->workflow,
15081508
'workflow_type' => $workflowType,
1509+
'namespace' => $run->namespace,
15091510
'reserved_at' => $now,
15101511
'started_at' => $now,
15111512
'run_count' => 1,
@@ -1517,6 +1518,7 @@ private function scheduleChildWorkflow(
15171518
'run_number' => 1,
15181519
'workflow_class' => $childWorkflowCall->workflow,
15191520
'workflow_type' => $workflowType,
1521+
'namespace' => $run->namespace,
15201522
'business_key' => null,
15211523
'visibility_labels' => null,
15221524
'status' => RunStatus::Pending->value,
@@ -2185,6 +2187,7 @@ private function continueAsNew(
21852187
'run_number' => $run->run_number + 1,
21862188
'workflow_class' => $workflowClass,
21872189
'workflow_type' => $run->workflow_type,
2190+
'namespace' => $run->namespace,
21882191
'business_key' => $run->business_key,
21892192
'visibility_labels' => $run->visibility_labels,
21902193
'memo' => $run->memo,

0 commit comments

Comments
 (0)