Skip to content

Commit 9e2f7de

Browse files
[cross-repo from server#313] Conformance blocker: expand signals and queries coverage beyond current smoke (#634)
1 parent 77e6eac commit 9e2f7de

7 files changed

Lines changed: 618 additions & 1 deletion

File tree

docs/api-stability.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,29 @@ host identity. The server uses the identifier to detect runs that have no
171171
PHP workflow code behind them, so terminal activity outcome and timeout
172172
paths close the host run instead of scheduling a workflow-task resume row.
173173

174+
## Control-plane SDK client
175+
176+
The PHP SDK exposes a stable HTTP client for application and operator
177+
processes that need to drive the standalone server control plane from PHP:
178+
179+
- `Workflow\V2\Client\ControlPlaneClient`
180+
- `Workflow\V2\Exceptions\ControlPlaneRequestException`
181+
182+
`ControlPlaneClient` covers workflow start, workflow describe,
183+
run describe, signal delivery, query execution, and cluster-info reads
184+
against `POST /api/workflows`, `GET /api/workflows/{workflowId}`,
185+
`GET /api/workflows/{workflowId}/runs/{runId}`,
186+
`POST /api/workflows/{workflowId}/signal/{signalName}`,
187+
`POST /api/workflows/{workflowId}/query/{queryName}`, and their current-run
188+
targeted `/runs/{runId}` variants. It sends the
189+
`X-Durable-Workflow-Control-Plane-Version` and `X-Namespace` headers on
190+
every request, accepts raw PHP argument arrays that the server resolves
191+
through the normal payload-envelope boundary, and returns the raw server
192+
JSON envelope so conformance harnesses can deep-equal CLI, Python SDK,
193+
and PHP SDK results. Non-success HTTP responses raise
194+
`ControlPlaneRequestException` with the HTTP status, decoded response body,
195+
and stable `reason` helper.
196+
174197
## Worker protocol SDK shims
175198

176199
The PHP SDK exposes a small stable worker-protocol surface for processes

docs/architecture/platform-conformance-suite.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ them from the declared locations.
7272
| --- | --- | --- | --- |
7373
| `control_plane_request_response` | `cli`, `sdk-python` | `tests/fixtures/control-plane/` | Frozen request bodies and response shapes for `workflow.start`, `signal`, `query`, `update`, `cancel`, `task-history`, namespace storage. |
7474
| `worker_task_lifecycle` | `cli`, `sdk-python`, `server` | `tests/fixtures/external-task-input/`, `tests/fixtures/external-task-result/` | Task input envelopes (poll → claim → run) and task result envelopes (complete, fail, cancel, heartbeat) used by every conforming worker. |
75-
| `signal_query_runtime_contract` | `workflow`, `server`, `cli`, `sdk-python`, `waterline` | `docs/architecture/platform-conformance-suite.md`, `docs/architecture/query-and-live-debug.md`, `tests/Feature/SignalReplayTest.php`, `tests/Feature/V2/V2QueryWorkflowTest.php`, `tests/Feature/WorkflowControlPlaneTest.php`, `tests/Feature/WorkflowQueryTaskBrokerTest.php`, `tests/Commands/`, `tests/test_signals.py`, `tests/test_queries.py`, `tests/test_worker.py`, `CONFORMANCE.md` | Live published-artifact scenarios for signal delivery and query consistency across PHP and Python workers, CLI and SDK clients, replay timing, terminal runs, malformed payloads, and operator visibility. |
75+
| `signal_query_runtime_contract` | `workflow`, `server`, `cli`, `sdk-python`, `waterline` | `docs/architecture/platform-conformance-suite.md`, `docs/architecture/query-and-live-debug.md`, `src/V2/Client/ControlPlaneClient.php`, `tests/Unit/V2/ControlPlaneClientTest.php`, `tests/Feature/SignalReplayTest.php`, `tests/Feature/V2/V2QueryWorkflowTest.php`, `tests/Feature/WorkflowControlPlaneTest.php`, `tests/Feature/WorkflowQueryTaskBrokerTest.php`, `tests/Commands/`, `tests/test_signals.py`, `tests/test_queries.py`, `tests/test_worker.py`, `CONFORMANCE.md` | Live published-artifact scenarios for signal delivery and query consistency across PHP and Python workers, CLI and SDK clients, replay timing, terminal runs, malformed payloads, and operator visibility. |
7676
| `history_replay_bundles` | `workflow`, `sdk-python` | `tests/Fixtures/V2/GoldenHistory/`, `tests/fixtures/golden_history/` | Frozen history event bundles. A conforming SDK must replay each bundle and reproduce the documented final command sequence. |
7777
| `failure_repair_actionability` | `server`, `workflow` | `docs/contracts/external-task-result.md`, `docs/contracts/replay-verification.md`, fixture pointers therein | Failure objects and repair / actionability shapes for stuck tasks, deterministic failure, and replay-mismatch surfaces. |
7878
| `cli_json_envelopes` | `cli` | `tests/fixtures/control-plane/`, `schemas/` | The `--output=json` and `--output=jsonl` envelopes that automation depends on. Diagnostic-only fields are listed and excluded from the contract diff. |
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Workflow\V2\Client;
6+
7+
use Illuminate\Http\Client\Factory as HttpFactory;
8+
use Illuminate\Http\Client\Response;
9+
use InvalidArgumentException;
10+
use RuntimeException;
11+
use Workflow\V2\Exceptions\ControlPlaneRequestException;
12+
13+
/**
14+
* HTTP client for Durable Workflow control-plane operations.
15+
*
16+
* The worker protocol client covers PHP worker processes. This client covers
17+
* PHP-facing operator/application calls to the standalone server so published
18+
* PHP artifacts can participate in cross-language signal/query conformance
19+
* without shelling out to the CLI.
20+
*
21+
* @api Stable v2 control-plane client API.
22+
*/
23+
final class ControlPlaneClient
24+
{
25+
public const CONTROL_PLANE_VERSION = '2';
26+
27+
public const CONTROL_PLANE_HEADER = 'X-Durable-Workflow-Control-Plane-Version';
28+
29+
private readonly string $baseUrl;
30+
31+
private readonly string $apiPath;
32+
33+
private readonly string $controlPlaneVersion;
34+
35+
public function __construct(
36+
private readonly HttpFactory $http,
37+
string $baseUrl,
38+
private readonly ?string $token = null,
39+
private readonly string $namespace = 'default',
40+
?string $controlPlaneVersion = null,
41+
private readonly int $defaultRequestTimeoutSeconds = 30,
42+
string $apiPath = '/api',
43+
) {
44+
$this->baseUrl = rtrim($baseUrl, '/');
45+
$this->apiPath = self::normalizePath($apiPath);
46+
$this->controlPlaneVersion = $controlPlaneVersion ?? self::CONTROL_PLANE_VERSION;
47+
48+
if ($this->baseUrl === '') {
49+
throw new InvalidArgumentException('Base URL must not be empty.');
50+
}
51+
52+
if ($this->defaultRequestTimeoutSeconds < 1) {
53+
throw new InvalidArgumentException('Default request timeout must be at least 1 second.');
54+
}
55+
}
56+
57+
/**
58+
* @param array<int|string, mixed> $arguments
59+
* @param array<string, mixed> $options
60+
* @return array<string, mixed>
61+
*/
62+
public function startWorkflow(
63+
string $workflowType,
64+
?string $workflowId = null,
65+
array $arguments = [],
66+
array $options = [],
67+
): array {
68+
$body = $this->withoutNulls([
69+
'workflow_id' => $workflowId,
70+
'workflow_type' => $workflowType,
71+
'task_queue' => $this->stringOption($options, 'task_queue'),
72+
'input' => $arguments === [] ? null : $arguments,
73+
'business_key' => $this->stringOption($options, 'business_key'),
74+
'memo' => $this->arrayOption($options, 'memo'),
75+
'search_attributes' => $this->arrayOption($options, 'search_attributes'),
76+
'duplicate_policy' => $this->stringOption($options, 'duplicate_policy'),
77+
'execution_timeout_seconds' => $this->intOption($options, 'execution_timeout_seconds'),
78+
'run_timeout_seconds' => $this->intOption($options, 'run_timeout_seconds'),
79+
'priority' => $this->intOption($options, 'priority'),
80+
'fairness_key' => $this->stringOption($options, 'fairness_key'),
81+
'fairness_weight' => $this->intOption($options, 'fairness_weight'),
82+
]);
83+
84+
return $this->post('/workflows', $body, [200, 201, 202]);
85+
}
86+
87+
/**
88+
* @param array<int|string, mixed> $arguments
89+
* @param array<string, mixed> $options
90+
* @return array<string, mixed>
91+
*/
92+
public function signalWorkflow(
93+
string $workflowId,
94+
string $signalName,
95+
array $arguments = [],
96+
array $options = [],
97+
): array {
98+
$runId = $this->stringOption($options, 'run_id');
99+
$path = $runId !== null
100+
? sprintf(
101+
'/workflows/%s/runs/%s/signal/%s',
102+
$this->pathSegment($workflowId),
103+
$this->pathSegment($runId),
104+
$this->pathSegment($signalName),
105+
)
106+
: sprintf(
107+
'/workflows/%s/signal/%s',
108+
$this->pathSegment($workflowId),
109+
$this->pathSegment($signalName),
110+
);
111+
112+
$body = $this->withoutNulls([
113+
'input' => $arguments === [] ? null : $arguments,
114+
'request_id' => $this->stringOption($options, 'request_id'),
115+
]);
116+
117+
return $this->post($path, $body, [200, 202]);
118+
}
119+
120+
/**
121+
* @param array<int|string, mixed> $arguments
122+
* @param array<string, mixed> $options
123+
* @return array<string, mixed>
124+
*/
125+
public function queryWorkflow(
126+
string $workflowId,
127+
string $queryName,
128+
array $arguments = [],
129+
array $options = [],
130+
): array {
131+
$runId = $this->stringOption($options, 'run_id');
132+
$path = $runId !== null
133+
? sprintf(
134+
'/workflows/%s/runs/%s/query/%s',
135+
$this->pathSegment($workflowId),
136+
$this->pathSegment($runId),
137+
$this->pathSegment($queryName),
138+
)
139+
: sprintf(
140+
'/workflows/%s/query/%s',
141+
$this->pathSegment($workflowId),
142+
$this->pathSegment($queryName),
143+
);
144+
145+
$body = $arguments === [] ? [] : ['input' => $arguments];
146+
147+
return $this->post($path, $body, [200]);
148+
}
149+
150+
/**
151+
* @return array<string, mixed>
152+
*/
153+
public function describeWorkflow(string $workflowId): array
154+
{
155+
return $this->get(sprintf('/workflows/%s', $this->pathSegment($workflowId)));
156+
}
157+
158+
/**
159+
* @return array<string, mixed>
160+
*/
161+
public function describeWorkflowRun(string $workflowId, string $runId): array
162+
{
163+
return $this->get(sprintf(
164+
'/workflows/%s/runs/%s',
165+
$this->pathSegment($workflowId),
166+
$this->pathSegment($runId),
167+
));
168+
}
169+
170+
/**
171+
* @return array<string, mixed>
172+
*/
173+
public function clusterInfo(): array
174+
{
175+
return $this->get('/cluster/info', enforceControlPlaneHeader: false);
176+
}
177+
178+
/**
179+
* @return array<string, mixed>
180+
*/
181+
private function get(
182+
string $path,
183+
?int $requestTimeoutSeconds = null,
184+
bool $enforceControlPlaneHeader = true,
185+
): array {
186+
$response = $this->http
187+
->withHeaders($this->headers())
188+
->timeout($requestTimeoutSeconds ?? $this->defaultRequestTimeoutSeconds)
189+
->get($this->url($path));
190+
191+
return $this->decode($response, $path, $enforceControlPlaneHeader);
192+
}
193+
194+
/**
195+
* @param array<string, mixed> $body
196+
* @param list<int> $successStatuses
197+
* @return array<string, mixed>
198+
*/
199+
private function post(
200+
string $path,
201+
array $body,
202+
array $successStatuses,
203+
?int $requestTimeoutSeconds = null,
204+
): array {
205+
$response = $this->http
206+
->withHeaders($this->headers())
207+
->timeout($requestTimeoutSeconds ?? $this->defaultRequestTimeoutSeconds)
208+
->post($this->url($path), $body);
209+
210+
return $this->decode($response, $path, true, $successStatuses);
211+
}
212+
213+
/**
214+
* @param list<int> $successStatuses
215+
* @return array<string, mixed>
216+
*/
217+
private function decode(
218+
Response $response,
219+
string $path,
220+
bool $enforceControlPlaneHeader,
221+
array $successStatuses = [200],
222+
): array {
223+
$json = $response->json();
224+
$body = is_array($json) ? $json : [];
225+
$status = $response->status();
226+
227+
if (! in_array($status, $successStatuses, true)) {
228+
$message = $this->errorMessage($path, $status, $body);
229+
230+
throw new ControlPlaneRequestException($message, $status, $body === [] ? null : $body);
231+
}
232+
233+
if ($enforceControlPlaneHeader) {
234+
$version = $response->header(self::CONTROL_PLANE_HEADER);
235+
236+
if (! is_string($version) || trim($version) !== $this->controlPlaneVersion) {
237+
throw new RuntimeException(sprintf(
238+
'Durable Workflow server response for [%s] used control-plane version [%s]; expected [%s].',
239+
$path,
240+
is_string($version) && $version !== '' ? $version : 'missing',
241+
$this->controlPlaneVersion,
242+
));
243+
}
244+
}
245+
246+
return $body;
247+
}
248+
249+
/**
250+
* @param array<string, mixed> $body
251+
*/
252+
private function errorMessage(string $path, int $status, array $body): string
253+
{
254+
foreach (['message', 'error'] as $field) {
255+
$value = $body[$field] ?? null;
256+
if (is_string($value) && $value !== '') {
257+
return $value;
258+
}
259+
}
260+
261+
$reason = $body['reason'] ?? null;
262+
if (is_string($reason) && $reason !== '') {
263+
return sprintf('Durable Workflow request to [%s] failed with [%s] (HTTP %d).', $path, $reason, $status);
264+
}
265+
266+
return sprintf('Durable Workflow request to [%s] failed with HTTP %d.', $path, $status);
267+
}
268+
269+
/**
270+
* @return array<string, string>
271+
*/
272+
private function headers(): array
273+
{
274+
$headers = [
275+
'Accept' => 'application/json',
276+
'Content-Type' => 'application/json',
277+
'X-Namespace' => $this->namespace,
278+
self::CONTROL_PLANE_HEADER => $this->controlPlaneVersion,
279+
];
280+
281+
if ($this->token !== null && $this->token !== '') {
282+
$headers['Authorization'] = 'Bearer ' . $this->token;
283+
}
284+
285+
return $headers;
286+
}
287+
288+
private function url(string $path): string
289+
{
290+
return $this->baseUrl . $this->apiPath . self::normalizePath($path);
291+
}
292+
293+
/**
294+
* @param array<string, mixed> $options
295+
*/
296+
private function stringOption(array $options, string $key): ?string
297+
{
298+
$value = $options[$key] ?? null;
299+
300+
return is_string($value) && $value !== '' ? $value : null;
301+
}
302+
303+
/**
304+
* @param array<string, mixed> $options
305+
*/
306+
private function intOption(array $options, string $key): ?int
307+
{
308+
$value = $options[$key] ?? null;
309+
310+
return is_int($value) ? $value : null;
311+
}
312+
313+
/**
314+
* @param array<string, mixed> $options
315+
* @return array<string, mixed>|null
316+
*/
317+
private function arrayOption(array $options, string $key): ?array
318+
{
319+
$value = $options[$key] ?? null;
320+
321+
return is_array($value) ? $value : null;
322+
}
323+
324+
/**
325+
* @param array<string, mixed> $values
326+
* @return array<string, mixed>
327+
*/
328+
private function withoutNulls(array $values): array
329+
{
330+
return array_filter($values, static fn (mixed $value): bool => $value !== null);
331+
}
332+
333+
private function pathSegment(string $value): string
334+
{
335+
return rawurlencode($value);
336+
}
337+
338+
private static function normalizePath(string $path): string
339+
{
340+
$path = trim($path);
341+
342+
return $path === '' || $path === '/'
343+
? ''
344+
: '/'.trim($path, '/');
345+
}
346+
}

0 commit comments

Comments
 (0)