Skip to content

Commit 22d19b7

Browse files
Honor start command context in control plane
1 parent d6831f3 commit 22d19b7

4 files changed

Lines changed: 60 additions & 7 deletions

File tree

src/V2/Contracts/WorkflowControlPlane.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ interface WorkflowControlPlane
3636
* - business_key: string|null — caller-supplied business key
3737
* - labels: array<string, string>|null — visibility labels
3838
* - memo: array<string, mixed>|null — non-indexed metadata
39+
* - command_context: \Workflow\V2\CommandContext|null — recorded command attribution/context
3940
* - duplicate_start_policy: 'reject_duplicate'|'return_existing_active'
4041
*
4142
* @return array{

src/V2/Support/CacheLongPollWakeStore.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public function signalTask(WorkflowTask $task): void
143143
default => [],
144144
};
145145

146-
$this->signal(...$channels);
146+
$this->signal(...$this->normalizeChannels($channels));
147147
}
148148

149149
/**
@@ -206,7 +206,7 @@ private function signalTaskWithNamespace(WorkflowTask $task, ?string $namespace)
206206
default => [],
207207
};
208208

209-
$this->signal(...$channels);
209+
$this->signal(...$this->normalizeChannels($channels));
210210
}
211211

212212
private function namespaceForTask(WorkflowTask $task): ?string

src/V2/Support/DefaultWorkflowControlPlane.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
4646
$memo = $options['memo'] ?? null;
4747
$searchAttributes = $options['search_attributes'] ?? null;
4848
$namespace = $this->resolveNamespace($options);
49+
$commandContext = $this->commandContext($options);
4950
$executionTimeoutSeconds = isset($options['execution_timeout_seconds']) ? (int) $options['execution_timeout_seconds'] : null;
5051
$runTimeoutSeconds = isset($options['run_timeout_seconds']) ? (int) $options['run_timeout_seconds'] : null;
5152
$duplicatePolicy = ($options['duplicate_start_policy'] ?? null) === 'return_existing_active'
@@ -72,6 +73,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
7273
$memo,
7374
$searchAttributes,
7475
$namespace,
76+
$commandContext,
7577
$executionTimeoutSeconds,
7678
$runTimeoutSeconds,
7779
$duplicatePolicy,
@@ -93,7 +95,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
9395
true
9496
);
9597

96-
$command = WorkflowCommand::record($instance, $currentRun, $this->commandAttributes([
98+
$command = WorkflowCommand::record($instance, $currentRun, $this->commandAttributes($commandContext, [
9799
'command_type' => CommandType::Start->value,
98100
'target_scope' => 'instance',
99101
'status' => $canReturnExisting
@@ -193,7 +195,7 @@ public function start(string $workflowType, ?string $instanceId = null, array $o
193195
'last_history_sequence' => 0,
194196
]);
195197

196-
$command = WorkflowCommand::record($instance, $run, $this->commandAttributes([
198+
$command = WorkflowCommand::record($instance, $run, $this->commandAttributes($commandContext, [
197199
'command_type' => CommandType::Start->value,
198200
'target_scope' => 'instance',
199201
'status' => CommandStatus::Accepted->value,
@@ -797,9 +799,9 @@ private function classPropertyDefault(string $workflowClass, string $property):
797799
* @param array<string, mixed> $attributes
798800
* @return array<string, mixed>
799801
*/
800-
private function commandAttributes(array $attributes): array
802+
private function commandAttributes(CommandContext $commandContext, array $attributes): array
801803
{
802-
return array_merge(CommandContext::controlPlane()->attributes(), $attributes);
804+
return array_merge($commandContext->attributes(), $attributes);
803805
}
804806

805807
/**

tests/Feature/V2/V2WorkflowControlPlaneTest.php

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,19 +133,69 @@ public function testStartWithAutoGeneratedInstanceId(): void
133133
$this->assertNotNull($result['workflow_run_id']);
134134
}
135135

136+
public function testStartSupportsCommandContext(): void
137+
{
138+
$result = $this->controlPlane->start('remote-workflow-type', 'ctrl-plane-start-context-1', [
139+
'connection' => 'redis',
140+
'queue' => 'default',
141+
'command_context' => CommandContext::controlPlane()->with([
142+
'caller' => [
143+
'type' => 'server',
144+
'label' => 'Standalone Server',
145+
],
146+
'server' => [
147+
'namespace' => 'default',
148+
'command' => 'start',
149+
],
150+
]),
151+
]);
152+
153+
$this->assertTrue($result['started']);
154+
155+
$command = WorkflowCommand::query()
156+
->where('workflow_instance_id', 'ctrl-plane-start-context-1')
157+
->where('outcome', 'started_new')
158+
->firstOrFail();
159+
160+
$this->assertSame('control_plane', $command->source);
161+
$this->assertSame('server', $command->commandContext()['caller']['type'] ?? null);
162+
$this->assertSame('Standalone Server', $command->commandContext()['caller']['label'] ?? null);
163+
$this->assertSame('default', $command->commandContext()['server']['namespace'] ?? null);
164+
$this->assertSame('start', $command->commandContext()['server']['command'] ?? null);
165+
}
166+
136167
public function testStartRejectsDuplicate(): void
137168
{
138169
$this->controlPlane->start('remote-workflow-type', 'ctrl-plane-dup-1', [
139170
'connection' => 'redis',
140171
'queue' => 'default',
141172
]);
142173

143-
$result = $this->controlPlane->start('remote-workflow-type', 'ctrl-plane-dup-1');
174+
$result = $this->controlPlane->start('remote-workflow-type', 'ctrl-plane-dup-1', [
175+
'command_context' => CommandContext::controlPlane()->with([
176+
'caller' => [
177+
'type' => 'server',
178+
'label' => 'Standalone Server',
179+
],
180+
'server' => [
181+
'namespace' => 'default',
182+
'command' => 'start',
183+
],
184+
]),
185+
]);
144186

145187
$this->assertFalse($result['started']);
146188
$this->assertSame('ctrl-plane-dup-1', $result['workflow_instance_id']);
147189
$this->assertSame('rejected_duplicate', $result['outcome']);
148190
$this->assertSame('instance_already_started', $result['reason']);
191+
192+
$command = WorkflowCommand::query()
193+
->where('workflow_instance_id', 'ctrl-plane-dup-1')
194+
->where('outcome', 'rejected_duplicate')
195+
->firstOrFail();
196+
197+
$this->assertSame('server', $command->commandContext()['caller']['type'] ?? null);
198+
$this->assertSame('default', $command->commandContext()['server']['namespace'] ?? null);
149199
}
150200

151201
public function testStartReturnExistingActive(): void

0 commit comments

Comments
 (0)