Skip to content

Commit 3929609

Browse files
[cross-repo from server#244] Conformance: run signals/queries against current published artifacts (#601)
1 parent 8907560 commit 3929609

6 files changed

Lines changed: 95 additions & 33 deletions

File tree

docs/api-stability.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,10 +202,11 @@ registration, heartbeat, workflow-task polling/history/complete/fail,
202202
activity-task polling/heartbeat/complete/fail, and query-task
203203
polling/complete/fail all use `POST /api/worker/...` with the
204204
worker-protocol headers. Standalone `poll*` methods return leased tasks from
205-
the server's `task` envelope; the client caches the returned lease fields so
206-
follow-up history, heartbeat, complete, and fail calls can send the required
207-
`lease_owner`, `workflow_task_attempt`, `activity_attempt_id`, and
208-
`query_task_attempt` values.
205+
the server's `task` envelope; query polling uses a stable `poll_request_id`
206+
to recover a leased task after a local HTTP timeout. The client caches the
207+
returned lease fields so follow-up history, heartbeat, complete, and fail
208+
calls can send the required `lease_owner`, `workflow_task_attempt`,
209+
`activity_attempt_id`, and `query_task_attempt` values.
209210
`WorkflowQueryTaskExecutor` is the stable PHP worker shim for the
210211
`query_tasks` capability. It accepts the server-routed query task envelope,
211212
replays the supplied history export in query mode, validates query targets

docs/architecture/query-and-live-debug.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,13 @@ lease query work through:
180180
- `POST /api/worker/query-tasks/{query_task_id}/complete`
181181
- `POST /api/worker/query-tasks/{query_task_id}/fail`
182182

183-
The poll request names `worker_id` and `task_queue`. A successful
184-
poll leases at most one query task and returns `poll_status =
185-
'leased'`; an empty long-poll returns `poll_status = 'empty'` with
186-
no task. Query task long-poll timeout semantics are the same
187-
clamped `WorkerProtocolVersion::longPollSemantics()` used by
188-
workflow and activity task polling.
183+
The poll request names `worker_id` and `task_queue`, and may include
184+
`poll_request_id` so a worker can recover the same poll result after
185+
a local HTTP timeout. A successful poll leases at most one query task
186+
and returns `poll_status = 'leased'`; an empty long-poll returns
187+
`poll_status = 'empty'` with no task. Query task long-poll timeout
188+
semantics are the same clamped `WorkerProtocolVersion::longPollSemantics()`
189+
used by workflow and activity task polling.
189190

190191
Each leased query task carries `query_task_id`,
191192
`query_task_attempt`, `lease_owner`, `workflow_id`, `run_id`,

src/V2/Support/WorkerProtocolVersion.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ final class WorkerProtocolVersion
2929
* pagination semantics). Bump the minor for additive changes (new
3030
* optional fields, new non-terminal command types).
3131
*/
32-
public const VERSION = '1.6';
32+
public const VERSION = '1.7';
3333

3434
/**
3535
* Worker registration capability for server-routed workflow query
@@ -335,7 +335,7 @@ public static function queryTaskSemantics(): array
335335
'poll' => [
336336
'method' => 'POST',
337337
'path' => '/api/worker/query-tasks/poll',
338-
'request_fields' => ['worker_id', 'task_queue'],
338+
'request_fields' => ['worker_id', 'task_queue', 'poll_request_id'],
339339
'response_fields' => ['task', 'poll_status'],
340340
],
341341
'complete' => [
@@ -357,6 +357,7 @@ public static function queryTaskSemantics(): array
357357
'poll' => [
358358
'leases_on_return' => true,
359359
'long_poll' => self::longPollSemantics(),
360+
'poll_request_idempotency' => true,
360361
'empty_response_poll_status' => 'empty',
361362
'requires_registered_worker' => true,
362363
'requires_worker_capability' => self::CAPABILITY_QUERY_TASKS,

src/V2/Worker/WorkerProtocolClient.php

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -363,38 +363,45 @@ public function pollQueryTasks(
363363
?string $queue = null,
364364
int $timeoutSeconds = WorkerProtocolVersion::DEFAULT_LONG_POLL_TIMEOUT,
365365
?string $workerId = null,
366+
?string $pollRequestId = null,
366367
): array {
367368
if ($this->embeddedBridgeMode) {
368369
return [];
369370
}
370371

372+
$pollRequestId = $this->stringValue($pollRequestId) ?? 'query-poll-'.bin2hex(random_bytes(16));
371373
$body = [
372374
'worker_id' => $this->resolveStandaloneWorkerId($workerId),
373375
'task_queue' => $this->resolveStandaloneTaskQueue($queue),
376+
'poll_request_id' => $pollRequestId,
374377
];
375378

376-
try {
377-
$response = $this->workerPost(
378-
$this->workerApiPath.'/query-tasks/poll',
379-
$body,
380-
$this->longPollRequestTimeoutSeconds($timeoutSeconds),
381-
);
382-
} catch (ConnectionException $exception) {
383-
if ($this->isHttpTimeout($exception)) {
379+
for ($attempt = 0; $attempt < 2; $attempt++) {
380+
try {
381+
$response = $this->workerPost(
382+
$this->workerApiPath.'/query-tasks/poll',
383+
$body,
384+
$this->longPollRequestTimeoutSeconds($timeoutSeconds),
385+
);
386+
} catch (ConnectionException $exception) {
387+
if ($this->isHttpTimeout($exception)) {
388+
continue;
389+
}
390+
391+
throw $exception;
392+
}
393+
394+
$task = $response['task'] ?? null;
395+
if (! is_array($task)) {
384396
return [];
385397
}
386398

387-
throw $exception;
388-
}
399+
$this->rememberQueryTaskLease($task);
389400

390-
$task = $response['task'] ?? null;
391-
if (! is_array($task)) {
392-
return [];
401+
return [$task];
393402
}
394403

395-
$this->rememberQueryTaskLease($task);
396-
397-
return [$task];
404+
return [];
398405
}
399406

400407
/**

tests/Unit/V2/WorkerProtocolClientTest.php

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,11 @@ public function testStandaloneQueryPollAndCompleteUseCachedLease(): void
9797
});
9898

9999
$client = new WorkerProtocolClient($http, 'http://server:8080', 'test-token', 'default');
100-
$tasks = $client->pollQueryTasks(queue: 'polyglot', workerId: 'php-worker');
100+
$tasks = $client->pollQueryTasks(
101+
queue: 'polyglot',
102+
workerId: 'php-worker',
103+
pollRequestId: 'query-poll-request-1',
104+
);
101105
$complete = $client->completeQueryTask('query-task-1', 'waiting', [
102106
'codec' => 'avro',
103107
'blob' => 'encoded-result',
@@ -110,7 +114,11 @@ public function testStandaloneQueryPollAndCompleteUseCachedLease(): void
110114
[
111115
'method' => 'POST',
112116
'url' => 'http://server:8080/api/worker/query-tasks/poll',
113-
'body' => ['worker_id' => 'php-worker', 'task_queue' => 'polyglot'],
117+
'body' => [
118+
'worker_id' => 'php-worker',
119+
'task_queue' => 'polyglot',
120+
'poll_request_id' => 'query-poll-request-1',
121+
],
114122
],
115123
[
116124
'method' => 'POST',
@@ -154,7 +162,11 @@ public function testStandaloneQueryFailureUsesCachedLease(): void
154162
});
155163

156164
$client = new WorkerProtocolClient($http, 'http://server:8080', 'test-token', 'default');
157-
$client->pollQueryTasks(queue: 'polyglot', workerId: 'php-worker');
165+
$client->pollQueryTasks(
166+
queue: 'polyglot',
167+
workerId: 'php-worker',
168+
pollRequestId: 'query-poll-request-2',
169+
);
158170
$failure = $client->failQueryTask(
159171
'query-task-1',
160172
'No query handler.',
@@ -170,7 +182,11 @@ public function testStandaloneQueryFailureUsesCachedLease(): void
170182
[
171183
'method' => 'POST',
172184
'url' => 'http://server:8080/api/worker/query-tasks/poll',
173-
'body' => ['worker_id' => 'php-worker', 'task_queue' => 'polyglot'],
185+
'body' => [
186+
'worker_id' => 'php-worker',
187+
'task_queue' => 'polyglot',
188+
'poll_request_id' => 'query-poll-request-2',
189+
],
174190
],
175191
[
176192
'method' => 'POST',
@@ -191,6 +207,40 @@ public function testStandaloneQueryFailureUsesCachedLease(): void
191207
], $requests);
192208
}
193209

210+
public function testStandaloneQueryPollRetriesTimeoutWithSamePollRequestId(): void
211+
{
212+
$http = new HttpFactory();
213+
$requests = [];
214+
215+
$http->fake(function (Request $request) use ($http, &$requests) {
216+
$requests[] = $request->data();
217+
218+
if (count($requests) === 1) {
219+
throw new ConnectionException('cURL error 28: Operation timed out');
220+
}
221+
222+
return $http->response([
223+
'task' => [
224+
'query_task_id' => 'query-task-retry',
225+
'query_task_attempt' => 1,
226+
'lease_owner' => 'php-worker',
227+
],
228+
]);
229+
});
230+
231+
$client = new WorkerProtocolClient($http, 'http://server:8080', 'test-token', 'default');
232+
$tasks = $client->pollQueryTasks(queue: 'polyglot', workerId: 'php-worker');
233+
234+
$this->assertCount(1, $tasks);
235+
$this->assertSame('query-task-retry', $tasks[0]['query_task_id']);
236+
$this->assertCount(2, $requests);
237+
$this->assertSame('php-worker', $requests[0]['worker_id']);
238+
$this->assertSame('polyglot', $requests[0]['task_queue']);
239+
$this->assertIsString($requests[0]['poll_request_id']);
240+
$this->assertNotSame('', $requests[0]['poll_request_id']);
241+
$this->assertSame($requests[0]['poll_request_id'], $requests[1]['poll_request_id']);
242+
}
243+
194244
public function testPollWorkflowTasksUsesStandaloneWorkerApiByDefault(): void
195245
{
196246
$http = new HttpFactory();

tests/Unit/V2/WorkerProtocolVersionTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public function testVersionIsNonEmptyString(): void
1717

1818
public function testVersionTracksQueryTaskWorkerProtocolShape(): void
1919
{
20-
$this->assertSame('1.6', WorkerProtocolVersion::VERSION);
20+
$this->assertSame('1.7', WorkerProtocolVersion::VERSION);
2121
}
2222

2323
public function testWorkflowTaskVerbsIncludesAllBridgeMethods(): void
@@ -129,6 +129,7 @@ public function testDescribeIncludesQueryTaskSemantics(): void
129129
$this->assertSame(WorkerProtocolVersion::queryTaskVerbs(), $queryTasks['verbs']);
130130
$this->assertSame('/api/worker/query-tasks', $queryTasks['path_prefix']);
131131
$this->assertSame('/api/worker/query-tasks/poll', $queryTasks['endpoints']['poll']['path']);
132+
$this->assertContains('poll_request_id', $queryTasks['endpoints']['poll']['request_fields']);
132133
$this->assertSame(
133134
'/api/worker/query-tasks/{query_task_id}/complete',
134135
$queryTasks['endpoints']['complete']['path'],
@@ -139,6 +140,7 @@ public function testDescribeIncludesQueryTaskSemantics(): void
139140
);
140141
$this->assertTrue($queryTasks['poll']['leases_on_return']);
141142
$this->assertSame(WorkerProtocolVersion::longPollSemantics(), $queryTasks['poll']['long_poll']);
143+
$this->assertTrue($queryTasks['poll']['poll_request_idempotency']);
142144
$this->assertSame('empty', $queryTasks['poll']['empty_response_poll_status']);
143145
$this->assertTrue($queryTasks['poll']['requires_registered_worker']);
144146
$this->assertSame(

0 commit comments

Comments
 (0)