Skip to content

Commit 6a4202e

Browse files
Conformance blocker: PHP signal/query workflows must advertise declared queries to the server (#607)
1 parent d1e0f35 commit 6a4202e

4 files changed

Lines changed: 245 additions & 6 deletions

File tree

docs/api-stability.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,11 @@ calls can send the required `lease_owner`, `workflow_task_attempt`,
216216
replays the supplied history export in query mode, validates query targets
217217
and arguments against the same contract as `WorkflowStub::queryWithArguments`,
218218
and returns either an encoded result envelope or a typed query failure for
219-
`WorkerProtocolClient` to complete or fail the query task.
219+
`WorkerProtocolClient` to complete or fail the query task. Standalone workers
220+
that register external workflow type keys can construct it with a
221+
`workflow_type => workflow_class` map; the executor uses that map to attach the
222+
PHP class and declared signal/query/update contract to service-mode query-task
223+
history before replay.
220224
Embedded package installs that need the `/webhooks` bridge contract must opt
221225
into embedded bridge mode explicitly; in that mode `poll*` methods return
222226
ready task opportunities as `tasks` lists and workers explicitly claim a task

docs/architecture/query-and-live-debug.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,10 @@ used by workflow and activity task polling.
190190
`WorkerProtocolClient::registerWorker()` advertises `query_tasks` by
191191
default for standalone PHP workers that register workflow types, and
192192
`pollQueryTasks()` sends a stable `poll_request_id` for every query
193-
poll attempt.
193+
poll attempt. When the standalone server only knows an external workflow type
194+
key, PHP workers should pass the same `workflow_type => workflow_class` registry
195+
to `WorkflowQueryTaskExecutor` so query-task replay can see the class's declared
196+
queries before invoking the handler.
194197

195198
Each leased query task carries `query_task_id`,
196199
`query_task_attempt`, `lease_owner`, `workflow_id`, `run_id`,

src/V2/Worker/WorkflowQueryTaskExecutor.php

Lines changed: 161 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
use Workflow\V2\Support\ExternalPayloads;
1515
use Workflow\V2\Support\HistoryExport;
1616
use Workflow\V2\Support\QueryStateReplayer;
17-
use Workflow\V2\Support\TypeRegistry;
1817
use Workflow\V2\Support\WorkerProtocolVersion;
18+
use Workflow\V2\Support\WorkflowDefinition;
1919
use Workflow\V2\Support\WorkflowQueryContract;
2020
use Workflow\V2\Support\WorkflowReplayer;
21+
use Workflow\V2\Workflow;
2122

2223
/**
2324
* Executes server-routed query tasks inside a standalone PHP worker process.
@@ -32,6 +33,19 @@ final class WorkflowQueryTaskExecutor
3233
{
3334
public const CAPABILITY = WorkerProtocolVersion::CAPABILITY_QUERY_TASKS;
3435

36+
/**
37+
* @var array<string, class-string<Workflow>>
38+
*/
39+
private readonly array $workflowClassesByType;
40+
41+
/**
42+
* @param array<string, class-string<Workflow>> $workflowClassesByType
43+
*/
44+
public function __construct(array $workflowClassesByType = [])
45+
{
46+
$this->workflowClassesByType = $this->normalizeWorkflowClasses($workflowClassesByType);
47+
}
48+
3549
/**
3650
* @param array<string, mixed> $task
3751
* @return array<string, mixed>
@@ -111,10 +125,153 @@ public function execute(array $task): array
111125
private function runFromTask(array $task): WorkflowRun
112126
{
113127
$historyExport = $task['history_export'] ?? null;
128+
$historyExport = is_array($historyExport) ? $historyExport : $this->historyExportFromTask($task);
114129

115-
return (new WorkflowReplayer())->runFromHistoryExport(
116-
is_array($historyExport) ? $historyExport : $this->historyExportFromTask($task),
117-
);
130+
$workflowType = $this->workflowTypeFromHistoryExport($historyExport)
131+
?? $this->stringValue($task['workflow_type'] ?? null);
132+
$workflowClass = $workflowType !== null
133+
? ($this->workflowClassesByType[$workflowType] ?? null)
134+
: null;
135+
136+
if ($workflowClass !== null) {
137+
$historyExport = $this->historyExportWithWorkflowClass($historyExport, $workflowType, $workflowClass);
138+
}
139+
140+
return (new WorkflowReplayer())->runFromHistoryExport($historyExport);
141+
}
142+
143+
/**
144+
* @param array<string, mixed> $historyExport
145+
* @param class-string<Workflow> $workflowClass
146+
* @return array<string, mixed>
147+
*/
148+
private function historyExportWithWorkflowClass(
149+
array $historyExport,
150+
string $workflowType,
151+
string $workflowClass,
152+
): array {
153+
$workflow = is_array($historyExport['workflow'] ?? null)
154+
? $historyExport['workflow']
155+
: [];
156+
$workflow['workflow_type'] = $workflowType;
157+
$workflow['workflow_class'] = $workflowClass;
158+
$historyExport['workflow'] = $workflow;
159+
160+
$contract = WorkflowDefinition::commandContract($workflowClass);
161+
$fingerprint = WorkflowDefinition::fingerprint($workflowClass);
162+
$events = is_array($historyExport['history_events'] ?? null)
163+
? $historyExport['history_events']
164+
: [];
165+
166+
foreach ($events as $index => $event) {
167+
if (! is_array($event) || $this->historyEventType($event) !== 'WorkflowStarted') {
168+
continue;
169+
}
170+
171+
$payload = is_array($event['payload'] ?? null) ? $event['payload'] : [];
172+
$payload['workflow_class'] = $workflowClass;
173+
$payload['workflow_type'] = $workflowType;
174+
175+
if ($fingerprint !== null) {
176+
$payload['workflow_definition_fingerprint'] = $fingerprint;
177+
}
178+
179+
if (! $this->hasStrictDeclaredContract($payload)) {
180+
$payload['declared_queries'] = $contract['queries'];
181+
$payload['declared_query_contracts'] = $contract['query_contracts'];
182+
$payload['declared_signals'] = $contract['signals'];
183+
$payload['declared_signal_contracts'] = $contract['signal_contracts'];
184+
$payload['declared_updates'] = $contract['updates'];
185+
$payload['declared_update_contracts'] = $contract['update_contracts'];
186+
$payload['declared_entry_method'] = $contract['entry_method'];
187+
$payload['declared_entry_mode'] = $contract['entry_mode'];
188+
$payload['declared_entry_declaring_class'] = $contract['entry_declaring_class'];
189+
}
190+
191+
$event['payload'] = $payload;
192+
$events[$index] = $event;
193+
}
194+
195+
$historyExport['history_events'] = $events;
196+
197+
return $historyExport;
198+
}
199+
200+
/**
201+
* @param array<string, class-string<Workflow>> $workflowClassesByType
202+
* @return array<string, class-string<Workflow>>
203+
*/
204+
private function normalizeWorkflowClasses(array $workflowClassesByType): array
205+
{
206+
$normalized = [];
207+
208+
foreach ($workflowClassesByType as $workflowType => $workflowClass) {
209+
if (! is_string($workflowType) || trim($workflowType) === '') {
210+
throw new LogicException(
211+
'Workflow query task executor registry keys must be non-empty workflow types.',
212+
);
213+
}
214+
215+
if (! is_string($workflowClass) || ! is_subclass_of($workflowClass, Workflow::class)) {
216+
throw new LogicException(sprintf(
217+
'Workflow query task executor registry entry [%s] must point to a loadable %s subclass.',
218+
trim($workflowType),
219+
Workflow::class,
220+
));
221+
}
222+
223+
/** @var class-string<Workflow> $workflowClass */
224+
$normalized[trim($workflowType)] = $workflowClass;
225+
}
226+
227+
ksort($normalized);
228+
229+
return $normalized;
230+
}
231+
232+
/**
233+
* @param array<string, mixed> $historyExport
234+
*/
235+
private function workflowTypeFromHistoryExport(array $historyExport): ?string
236+
{
237+
$workflow = is_array($historyExport['workflow'] ?? null)
238+
? $historyExport['workflow']
239+
: [];
240+
241+
return $this->stringValue($workflow['workflow_type'] ?? null);
242+
}
243+
244+
/**
245+
* @param array<string, mixed> $event
246+
*/
247+
private function historyEventType(array $event): ?string
248+
{
249+
return $this->stringValue($event['type'] ?? null)
250+
?? $this->stringValue($event['event_type'] ?? null);
251+
}
252+
253+
/**
254+
* @param array<string, mixed> $payload
255+
*/
256+
private function hasStrictDeclaredContract(array $payload): bool
257+
{
258+
foreach ([
259+
'declared_queries',
260+
'declared_query_contracts',
261+
'declared_signals',
262+
'declared_signal_contracts',
263+
'declared_updates',
264+
'declared_update_contracts',
265+
'declared_entry_method',
266+
'declared_entry_mode',
267+
'declared_entry_declaring_class',
268+
] as $key) {
269+
if (! array_key_exists($key, $payload)) {
270+
return false;
271+
}
272+
}
273+
274+
return true;
118275
}
119276

120277
/**

tests/Unit/V2/WorkflowQueryTaskExecutorTest.php

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,50 @@ public function testExecutorReplaysHistoryExportAndCompletesKnownQuery(): void
2929
);
3030
}
3131

32+
public function testExecutorUsesRegisteredExternalWorkflowClassForQueryDeclarations(): void
33+
{
34+
$result = (new WorkflowQueryTaskExecutor([
35+
'polyglot.php.signal-query' => TestQueryWorkflow::class,
36+
]))->execute($this->externalQueryTask());
37+
38+
$this->assertSame('completed', $result['outcome'] ?? null);
39+
$this->assertSame('waiting-for-name', $result['result'] ?? null);
40+
}
41+
42+
public function testExecutorReplaysSignalsForRegisteredExternalWorkflowClass(): void
43+
{
44+
$result = (new WorkflowQueryTaskExecutor([
45+
'polyglot.php.signal-query' => TestQueryWorkflow::class,
46+
]))->execute($this->externalQueryTask([
47+
'query_name' => 'events-starting-with',
48+
'query_arguments' => [
49+
'codec' => 'avro',
50+
'blob' => Serializer::serializeWithCodec('avro', [
51+
'prefix' => 'name:',
52+
]),
53+
],
54+
'history_export' => [
55+
'history_events' => [
56+
1 => [
57+
'id' => 'event-signal-applied',
58+
'sequence' => 2,
59+
'type' => HistoryEventType::SignalApplied->value,
60+
'payload' => [
61+
'sequence' => 1,
62+
'signal_name' => 'name-provided',
63+
'signal_wait_id' => 'external-name-provided',
64+
'value' => Serializer::serializeWithCodec('avro', 'Ada'),
65+
],
66+
'recorded_at' => '2026-05-17T00:01:00+00:00',
67+
],
68+
],
69+
],
70+
]));
71+
72+
$this->assertSame('completed', $result['outcome'] ?? null);
73+
$this->assertSame(1, $result['result'] ?? null);
74+
}
75+
3276
public function testExecutorFailsUnknownQueryWithoutThrowing(): void
3377
{
3478
$result = (new WorkflowQueryTaskExecutor())->execute($this->queryTask([
@@ -160,4 +204,35 @@ private function queryTask(array $overrides = []): array
160204
],
161205
], $overrides);
162206
}
207+
208+
/**
209+
* @param array<string, mixed> $overrides
210+
* @return array<string, mixed>
211+
*/
212+
private function externalQueryTask(array $overrides = []): array
213+
{
214+
return array_replace_recursive($this->queryTask([
215+
'workflow_type' => 'polyglot.php.signal-query',
216+
'workflow_class' => 'polyglot.php.signal-query',
217+
'history_export' => [
218+
'workflow' => [
219+
'workflow_type' => 'polyglot.php.signal-query',
220+
'workflow_class' => 'polyglot.php.signal-query',
221+
],
222+
'history_events' => [
223+
[
224+
'id' => 'event-started',
225+
'sequence' => 1,
226+
'type' => HistoryEventType::WorkflowStarted->value,
227+
'payload' => [
228+
'workflow_type' => 'polyglot.php.signal-query',
229+
'workflow_class' => 'polyglot.php.signal-query',
230+
'payload_codec' => 'avro',
231+
],
232+
'recorded_at' => '2026-05-17T00:00:00+00:00',
233+
],
234+
],
235+
],
236+
]), $overrides);
237+
}
163238
}

0 commit comments

Comments
 (0)