Skip to content

Commit 85ba5b8

Browse files
#168: prove avro polyglot worker protocol
1 parent 8c5f7ec commit 85ba5b8

1 file changed

Lines changed: 117 additions & 61 deletions

File tree

tests/integration/test_polyglot.py

Lines changed: 117 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import pytest
1515

16-
from durable_workflow import Client
16+
from durable_workflow import Client, serializer
1717
from durable_workflow.serializer import decode_envelope
1818
from durable_workflow.workflow import replay
1919

@@ -36,7 +36,8 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
3636
"""
3737
task_queue = f"polyglot-{uuid.uuid4().hex[:8]}"
3838
wf_id = f"poly-py-wf-{uuid.uuid4().hex[:8]}"
39-
worker_id = f"py-worker-{uuid.uuid4().hex[:8]}"
39+
py_worker_id = f"py-worker-{uuid.uuid4().hex[:8]}"
40+
php_worker_id = f"php-worker-{uuid.uuid4().hex[:8]}"
4041

4142
# Test data with various JSON-serializable types
4243
test_input = {
@@ -52,12 +53,20 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
5253
}
5354

5455
async with Client(server_url, token=server_token, namespace="default") as client:
55-
# 1. Register Python worker supporting Python workflow and PHP activity
56+
# 1. Register a Python workflow worker and PHP-runtime activity worker.
5657
await client.register_worker(
57-
worker_id=worker_id,
58+
worker_id=py_worker_id,
5859
task_queue=task_queue,
5960
supported_workflow_types=["tests.polyglot.python-workflow"],
61+
supported_activity_types=[],
62+
)
63+
await client.register_worker(
64+
worker_id=php_worker_id,
65+
task_queue=task_queue,
66+
supported_workflow_types=[],
6067
supported_activity_types=["tests.polyglot.php-activity"],
68+
runtime="php",
69+
sdk_version="durable-workflow-php/test",
6170
)
6271

6372
# 2. Start Python workflow
@@ -72,7 +81,7 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
7281

7382
# 3. Poll for workflow task
7483
wf_task = await client.poll_workflow_task(
75-
worker_id=worker_id, task_queue=task_queue, timeout=10.0,
84+
worker_id=py_worker_id, task_queue=task_queue, timeout=10.0,
7685
)
7786
assert wf_task is not None, "expected workflow task after start"
7887
task_id = wf_task["task_id"]
@@ -101,7 +110,7 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
101110
# 5. Complete workflow task
102111
await client.complete_workflow_task(
103112
task_id=task_id,
104-
lease_owner=worker_id,
113+
lease_owner=py_worker_id,
105114
workflow_task_attempt=attempt,
106115
commands=[server_cmd],
107116
)
@@ -111,16 +120,18 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
111120
# For this test, we simulate the PHP activity execution by manually
112121
# constructing what the PHP activity would return.
113122
act_task = await client.poll_activity_task(
114-
worker_id=worker_id, task_queue=task_queue, timeout=10.0,
123+
worker_id=php_worker_id, task_queue=task_queue, timeout=10.0,
115124
)
116125
assert act_task is not None, "expected activity task after schedule_activity"
117126
assert act_task["activity_type"] == "tests.polyglot.php-activity"
127+
assert act_task.get("payload_codec") == "avro"
118128

119129
act_task_id = act_task["task_id"]
120130
act_attempt_id = act_task.get("activity_attempt_id") or act_task.get("attempt_id", "")
121131
act_args = decode_envelope(act_task.get("arguments"), codec=act_task.get("payload_codec")) or []
122132
if not isinstance(act_args, list):
123133
act_args = [act_args]
134+
assert act_args == [test_input]
124135

125136
# 7. Simulate PHP activity execution
126137
# The PHP activity would receive the test_input and return structured data
@@ -146,13 +157,14 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
146157
await client.complete_activity_task(
147158
task_id=act_task_id,
148159
activity_attempt_id=act_attempt_id,
149-
lease_owner=worker_id,
160+
lease_owner=php_worker_id,
150161
result=php_activity_result,
162+
codec="avro",
151163
)
152164

153165
# 9. Poll for next workflow task (activity completed)
154166
wf_task2 = await client.poll_workflow_task(
155-
worker_id=worker_id, task_queue=task_queue, timeout=10.0,
167+
worker_id=py_worker_id, task_queue=task_queue, timeout=10.0,
156168
)
157169
assert wf_task2 is not None, "expected workflow task after activity completion"
158170
task_id2 = wf_task2["task_id"]
@@ -169,19 +181,6 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
169181
codec2 = wf_task2.get("payload_codec")
170182
server_cmd2 = cmd2.to_server_command(task_queue, payload_codec=codec2)
171183

172-
# Debug: inspect command before asserting
173-
print("\n=== Replay outcome ===")
174-
print(f"Command type: {server_cmd2.get('type')}")
175-
print(f"Full command: {server_cmd2}")
176-
if server_cmd2.get("type") == "fail_workflow":
177-
print(f"Failure message: {server_cmd2.get('message', 'N/A')}")
178-
print(f"Failure details: {server_cmd2.get('details', 'N/A')}")
179-
import json
180-
print(f"History events count: {len(history2)}")
181-
print("Last few history events:")
182-
for evt in history2[-3:]:
183-
print(f" - {evt.get('event_type')}: {json.dumps(evt, indent=2)[:200]}")
184-
185184
assert server_cmd2["type"] == "complete_workflow", (
186185
f"Expected complete_workflow but got {server_cmd2['type']}: "
187186
f"{server_cmd2.get('message', 'no error message')}"
@@ -204,7 +203,7 @@ async def test_python_workflow_calls_php_activity(server_url: str, server_token:
204203
# 11. Complete workflow task
205204
await client.complete_workflow_task(
206205
task_id=task_id2,
207-
lease_owner=worker_id,
206+
lease_owner=py_worker_id,
208207
workflow_task_attempt=attempt2,
209208
commands=[server_cmd2],
210209
)
@@ -226,12 +225,14 @@ async def test_python_activity_called_from_php_workflow(server_url: str, server_
226225
- Python activity results serialize correctly back to PHP
227226
- Codec envelopes round-trip across runtimes
228227
229-
Note: This test requires the PHP workflow to be started and executed server-side,
230-
then the Python worker picks up and executes the activity.
228+
This drives the PHP side through the same HTTP worker protocol a PHP worker
229+
uses in service mode, keeping the contract test independent from server-side
230+
fixture autoloading.
231231
"""
232232
task_queue = f"polyglot-{uuid.uuid4().hex[:8]}"
233233
wf_id = f"poly-php-wf-{uuid.uuid4().hex[:8]}"
234-
worker_id = f"py-worker-{uuid.uuid4().hex[:8]}"
234+
php_worker_id = f"php-worker-{uuid.uuid4().hex[:8]}"
235+
py_worker_id = f"py-worker-{uuid.uuid4().hex[:8]}"
235236

236237
# Test data with various JSON-serializable types
237238
test_input = {
@@ -246,37 +247,64 @@ async def test_python_activity_called_from_php_workflow(server_url: str, server_
246247
}
247248

248249
async with Client(server_url, token=server_token, namespace="default") as client:
249-
# 1. Register Python worker supporting Python activity
250-
# (PHP workflow will execute server-side)
250+
# 1. Register a PHP-runtime workflow worker and Python activity worker.
251+
await client.register_worker(
252+
worker_id=php_worker_id,
253+
task_queue=task_queue,
254+
supported_workflow_types=["tests.polyglot.php-workflow"],
255+
supported_activity_types=[],
256+
runtime="php",
257+
sdk_version="durable-workflow-php/test",
258+
)
251259
await client.register_worker(
252-
worker_id=worker_id,
260+
worker_id=py_worker_id,
253261
task_queue=task_queue,
254-
supported_workflow_types=[], # Not handling PHP workflows
262+
supported_workflow_types=[],
255263
supported_activity_types=["tests.polyglot.python-activity"],
256264
)
257265

258-
# 2. Start PHP workflow through control plane
259-
# The PHP workflow will execute and schedule a Python activity
266+
# 2. Start PHP workflow through control plane.
260267
handle = await client.start_workflow(
261268
workflow_type="tests.polyglot.php-workflow",
262269
task_queue=task_queue,
263270
workflow_id=wf_id,
264271
input=[test_input],
265272
)
266273
assert handle.workflow_id == wf_id
274+
assert handle.run_id is not None
267275

268-
# 3. Poll for Python activity task
269-
# The PHP workflow executes server-side and schedules the Python activity
270-
act_task = await client.poll_activity_task(
271-
worker_id=worker_id, task_queue=task_queue, timeout=15.0,
276+
# 3. Poll and complete the PHP workflow task with a schedule_activity
277+
# command that targets the Python activity.
278+
wf_task = await client.poll_workflow_task(
279+
worker_id=php_worker_id, task_queue=task_queue, timeout=10.0,
280+
)
281+
assert wf_task is not None, "expected PHP workflow task after start"
282+
assert wf_task["workflow_type"] == "tests.polyglot.php-workflow"
283+
assert wf_task.get("payload_codec") == "avro"
284+
285+
decoded_start = decode_envelope(wf_task.get("arguments"), codec=wf_task.get("payload_codec"))
286+
assert decoded_start == [test_input]
287+
288+
schedule_python_activity = {
289+
"type": "schedule_activity",
290+
"activity_type": "tests.polyglot.python-activity",
291+
"queue": task_queue,
292+
"arguments": serializer.envelope([test_input], codec="avro"),
293+
}
294+
await client.complete_workflow_task(
295+
task_id=wf_task["task_id"],
296+
lease_owner=php_worker_id,
297+
workflow_task_attempt=wf_task.get("workflow_task_attempt", 1),
298+
commands=[schedule_python_activity],
272299
)
273300

274-
# If no task is available, the PHP workflow may not be registered server-side
275-
# Skip this test in that case (it requires PHP fixtures to be loadable by server)
276-
if act_task is None:
277-
pytest.skip("PHP workflow not available server-side — requires server with PHP fixtures")
278-
301+
# 4. Poll for and execute the Python activity task.
302+
act_task = await client.poll_activity_task(
303+
worker_id=py_worker_id, task_queue=task_queue, timeout=15.0,
304+
)
305+
assert act_task is not None, "expected Python activity task after PHP workflow command"
279306
assert act_task["activity_type"] == "tests.polyglot.python-activity"
307+
assert act_task.get("payload_codec") == "avro"
280308

281309
act_task_id = act_task["task_id"]
282310
act_attempt_id = act_task.get("activity_attempt_id") or act_task.get("attempt_id", "")
@@ -291,7 +319,7 @@ async def test_python_activity_called_from_php_workflow(server_url: str, server_
291319
assert activity_input.get("name") == "php-to-python"
292320
assert activity_input.get("count") == 100
293321

294-
# 4. Execute Python activity
322+
# 5. Execute Python activity.
295323
result = await polyglot_python_activity(activity_input)
296324

297325
# Verify Python activity produced expected output
@@ -300,30 +328,58 @@ async def test_python_activity_called_from_php_workflow(server_url: str, server_
300328
assert result["type_checks"]["has_int"] is True
301329
assert result["computed"]["count_doubled"] == 200
302330

303-
# 5. Complete activity task
331+
# 6. Complete activity task with the Python result encoded as Avro.
304332
await client.complete_activity_task(
305333
task_id=act_task_id,
306334
activity_attempt_id=act_attempt_id,
307-
lease_owner=worker_id,
335+
lease_owner=py_worker_id,
308336
result=result,
337+
codec="avro",
309338
)
310339

311-
# 6. Wait for PHP workflow to complete
312-
# The server-side PHP workflow should receive the activity result and complete
313-
import asyncio
314-
await asyncio.sleep(2) # Give server time to process
340+
# 7. Poll the follow-up PHP workflow task and verify the activity result
341+
# reached the workflow history with an Avro codec tag.
342+
wf_task2 = await client.poll_workflow_task(
343+
worker_id=php_worker_id, task_queue=task_queue, timeout=10.0,
344+
)
345+
assert wf_task2 is not None, "expected PHP workflow task after Python activity completion"
346+
assert wf_task2.get("payload_codec") == "avro"
347+
348+
activity_completed = [
349+
event for event in wf_task2.get("history_events", [])
350+
if event.get("event_type") in ("ActivityCompleted", "activity_completed")
351+
]
352+
assert activity_completed, "expected ActivityCompleted in PHP workflow history"
353+
completed_payload = activity_completed[-1].get("payload", {})
354+
assert completed_payload.get("payload_codec") == "avro"
355+
assert decode_envelope(completed_payload.get("result"), codec="avro") == result
356+
357+
workflow_result = {
358+
"workflow_runtime": "php",
359+
"python_activity_result": result,
360+
"validation": {
361+
"called_python_activity": True,
362+
"result_is_array": isinstance(result, dict),
363+
"result_has_runtime": result.get("runtime") == "python",
364+
},
365+
}
366+
await client.complete_workflow_task(
367+
task_id=wf_task2["task_id"],
368+
lease_owner=php_worker_id,
369+
workflow_task_attempt=wf_task2.get("workflow_task_attempt", 1),
370+
commands=[
371+
{
372+
"type": "complete_workflow",
373+
"result": serializer.envelope(workflow_result, codec="avro"),
374+
},
375+
],
376+
)
315377

316-
# 7. Verify final workflow state
378+
# 8. Verify final workflow state.
317379
desc = await handle.describe()
318-
319-
# The workflow should be completed with PHP activity result embedded
320-
assert desc.status in ("completed", "Completed", "waiting", "pending", "running")
321-
322-
# If completed, verify the output structure
323-
if desc.status in ("completed", "Completed") and desc.output:
324-
assert isinstance(desc.output, dict)
325-
assert desc.output.get("workflow_runtime") == "php"
326-
if "python_activity_result" in desc.output:
327-
py_result = desc.output["python_activity_result"]
328-
assert py_result.get("runtime") == "python"
329-
assert py_result.get("computed", {}).get("count_doubled") == 200
380+
assert desc.status in ("completed", "Completed")
381+
assert isinstance(desc.output, dict)
382+
assert desc.output.get("workflow_runtime") == "php"
383+
py_result = desc.output["python_activity_result"]
384+
assert py_result.get("runtime") == "python"
385+
assert py_result.get("computed", {}).get("count_doubled") == 200

0 commit comments

Comments
 (0)