Skip to content

Commit 4ae0014

Browse files
authored
Graceful fallback from update-task-v2 to v1 when server doesn't support it (#385)
1 parent 3cef362 commit 4ae0014

3 files changed

Lines changed: 577 additions & 3 deletions

File tree

tests/integration/test_update_task_v2_perf.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,17 @@
7373
# ---------------------------------------------------------------------------
7474

7575
@worker_task(task_definition_name="perf_type_a", thread_count=WORKER_THREADS, register_task_def=True)
76-
def perf_worker_a(task_index: int = 0) -> dict:
76+
async def perf_worker_a(task_index: int = 0) -> dict:
7777
return {"worker": "perf_type_a", "task_index": task_index}
7878

7979

8080
@worker_task(task_definition_name="perf_type_b", thread_count=WORKER_THREADS, register_task_def=True)
81-
def perf_worker_b(task_index: int = 0) -> dict:
81+
async def perf_worker_b(task_index: int = 0) -> dict:
8282
return {"worker": "perf_type_b", "task_index": task_index}
8383

8484

8585
@worker_task(task_definition_name="perf_type_c", thread_count=WORKER_THREADS, register_task_def=True)
86-
def perf_worker_c(task_index: int = 0) -> dict:
86+
async def perf_worker_c(task_index: int = 0) -> dict:
8787
return {"worker": "perf_type_c", "task_index": task_index}
8888

8989

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
"""
2+
Integration test for update-task-v2 graceful degradation.
3+
4+
Verifies that when update-task-v2 is unavailable (or available), the SDK
5+
correctly auto-detects and falls back to v1 while still completing workflows.
6+
7+
Run:
8+
python -m pytest tests/integration/test_v2_fallback_intg.py -v -s
9+
"""
10+
11+
import logging
12+
import os
13+
import sys
14+
import time
15+
import threading
16+
import unittest
17+
18+
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
19+
20+
from conductor.client.automator.task_handler import TaskHandler
21+
from conductor.client.configuration.configuration import Configuration
22+
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
23+
from conductor.client.http.models.workflow_def import WorkflowDef
24+
from conductor.client.http.models.workflow_task import WorkflowTask
25+
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
26+
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
27+
from conductor.client.worker.worker_task import worker_task
28+
29+
logger = logging.getLogger(__name__)
30+
31+
WORKFLOW_NAME = "test_v2_fallback_workflow"
32+
WORKFLOW_VERSION = 1
33+
34+
35+
# ---------------------------------------------------------------------------
36+
# Workers
37+
# ---------------------------------------------------------------------------
38+
39+
@worker_task(task_definition_name="v2_fallback_task_a", thread_count=3, register_task_def=True)
40+
async def fallback_worker_a(task_index: int = 0) -> dict:
41+
return {"worker": "v2_fallback_task_a", "task_index": task_index}
42+
43+
44+
@worker_task(task_definition_name="v2_fallback_task_b", thread_count=3, register_task_def=True)
45+
async def fallback_worker_b(task_index: int = 0) -> dict:
46+
return {"worker": "v2_fallback_task_b", "task_index": task_index}
47+
48+
49+
# ---------------------------------------------------------------------------
50+
# Test
51+
# ---------------------------------------------------------------------------
52+
53+
class TestV2FallbackIntegration(unittest.TestCase):
54+
55+
@classmethod
56+
def setUpClass(cls):
57+
from tests.integration.conftest import skip_if_server_unavailable
58+
skip_if_server_unavailable()
59+
60+
logging.basicConfig(
61+
level=logging.INFO,
62+
format="%(asctime)s [%(process)d] %(name)-45s %(levelname)-10s %(message)s",
63+
)
64+
logging.getLogger("conductor.client").setLevel(logging.WARNING)
65+
66+
cls.config = Configuration()
67+
cls.workflow_client = OrkesWorkflowClient(cls.config)
68+
cls.metadata_client = OrkesMetadataClient(cls.config)
69+
70+
def test_0_register_workflow(self):
71+
"""Register workflow with 2 task types (3 tasks each)."""
72+
tasks = []
73+
idx = 0
74+
for task_type, count in [("v2_fallback_task_a", 3), ("v2_fallback_task_b", 3)]:
75+
for i in range(count):
76+
idx += 1
77+
tasks.append(
78+
WorkflowTask(
79+
name=task_type,
80+
task_reference_name=f"{task_type}_{i + 1}",
81+
input_parameters={"task_index": idx},
82+
)
83+
)
84+
85+
workflow = WorkflowDef(name=WORKFLOW_NAME, version=WORKFLOW_VERSION)
86+
workflow._tasks = tasks
87+
try:
88+
self.metadata_client.update_workflow_def(workflow, overwrite=True)
89+
except Exception:
90+
self.metadata_client.register_workflow_def(workflow, overwrite=True)
91+
print(f"\n Registered workflow '{WORKFLOW_NAME}' with {len(tasks)} tasks")
92+
93+
def test_1_workflows_complete_with_v2_or_fallback(self):
94+
"""Start workers and verify workflows complete regardless of v2 support.
95+
96+
This test doesn't force a 404 — it runs against the real server.
97+
If v2 is available, it uses v2. If not, it auto-detects and falls back.
98+
Either way, all workflows should complete successfully.
99+
"""
100+
workflow_count = 5
101+
102+
handler_ready = threading.Event()
103+
handler_ref = {}
104+
105+
def _run_workers():
106+
handler = TaskHandler(
107+
configuration=self.config,
108+
scan_for_annotated_workers=True,
109+
)
110+
handler_ref["h"] = handler
111+
handler.start_processes()
112+
handler_ready.set()
113+
handler_ref["stop"] = threading.Event()
114+
handler_ref["stop"].wait()
115+
handler.stop_processes()
116+
117+
worker_thread = threading.Thread(target=_run_workers, daemon=True)
118+
worker_thread.start()
119+
handler_ready.wait(timeout=30)
120+
self.assertTrue(handler_ready.is_set(), "Workers failed to start within 30s")
121+
time.sleep(3) # Warm up
122+
123+
try:
124+
# Submit workflows
125+
workflow_ids = []
126+
for i in range(workflow_count):
127+
req = StartWorkflowRequest()
128+
req.name = WORKFLOW_NAME
129+
req.version = WORKFLOW_VERSION
130+
req.input = {"run_index": i}
131+
wf_id = self.workflow_client.start_workflow(start_workflow_request=req)
132+
workflow_ids.append(wf_id)
133+
134+
print(f"\n Submitted {len(workflow_ids)} workflows")
135+
136+
# Wait for completion
137+
deadline = time.time() + 60 # 60s timeout
138+
pending = set(workflow_ids)
139+
completed = 0
140+
failed = 0
141+
142+
while pending and time.time() < deadline:
143+
still_pending = set()
144+
for wf_id in pending:
145+
try:
146+
wf = self.workflow_client.get_workflow(wf_id, include_tasks=False)
147+
except Exception:
148+
still_pending.add(wf_id)
149+
continue
150+
151+
if wf.status == "COMPLETED":
152+
completed += 1
153+
elif wf.status in ("FAILED", "TERMINATED", "TIMED_OUT"):
154+
failed += 1
155+
logger.warning("Workflow %s ended with status %s", wf_id, wf.status)
156+
else:
157+
still_pending.add(wf_id)
158+
159+
pending = still_pending
160+
if pending:
161+
time.sleep(1)
162+
163+
print(f" Results: {completed} completed, {failed} failed, {len(pending)} pending")
164+
165+
self.assertEqual(len(pending), 0, f"{len(pending)} workflows did not complete in time")
166+
self.assertEqual(completed, workflow_count, f"Expected {workflow_count} completed, got {completed}")
167+
168+
finally:
169+
handler_ref.get("stop", threading.Event()).set()
170+
worker_thread.join(timeout=15)
171+
172+
173+
if __name__ == "__main__":
174+
unittest.main(verbosity=2)

0 commit comments

Comments
 (0)