Skip to content

Commit 3209378

Browse files
committed
test(lease): add async worker E2E tests for LeaseManager
Three tests against a live Conductor server: 1. Async task with heartbeat → COMPLETED (50s sleep, 10s timeout) 2. Async task without heartbeat → TIMED_OUT 3. Performance: heartbeat tracking adds 0ms overhead on fast tasks
1 parent 634aace commit 3209378

1 file changed

Lines changed: 344 additions & 0 deletions

File tree

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
"""
2+
E2E test for lease extension with async workers (AsyncTaskRunner).
3+
4+
Proves that the centralized LeaseManager works correctly with async workers:
5+
6+
1. WITH lease extension: async long-running task COMPLETES even when execution
7+
time exceeds responseTimeoutSeconds — heartbeats keep the lease alive.
8+
9+
2. WITHOUT lease extension: same async task TIMES OUT after responseTimeoutSeconds.
10+
11+
3. PERFORMANCE: async worker with heartbeat enabled but short task (no heartbeat
12+
actually needed) has no meaningful overhead vs. one without heartbeat tracking.
13+
14+
Run:
15+
export CONDUCTOR_SERVER_URL="http://localhost:8000/api"
16+
python3 -m pytest tests/integration/test_async_lease_extension.py -v -s
17+
18+
Prerequisites:
19+
- Conductor server running (e.g. http://localhost:8000/api)
20+
"""
21+
22+
import asyncio
23+
import logging
24+
import os
25+
import sys
26+
import time
27+
import threading
28+
import unittest
29+
30+
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
31+
32+
from conductor.client.automator.task_handler import TaskHandler
33+
from conductor.client.configuration.configuration import Configuration
34+
from conductor.client.worker.worker_task import worker_task
35+
from conductor.client.http.models.workflow_def import WorkflowDef
36+
from conductor.client.http.models.task_def import TaskDef
37+
from conductor.client.http.models.workflow_task import WorkflowTask
38+
from conductor.client.http.models.start_workflow_request import StartWorkflowRequest
39+
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
40+
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
41+
42+
logging.basicConfig(level=logging.INFO)
43+
logger = logging.getLogger(__name__)
44+
45+
# Short response timeout — task must heartbeat to stay alive
46+
RESPONSE_TIMEOUT_SECONDS = 10
47+
48+
# Task sleeps longer than the response timeout to prove heartbeat works.
49+
# Must be long enough that the server's workflow sweeper catches the expired
50+
# task BEFORE the worker completes.
51+
TASK_SLEEP_SECONDS = 50
52+
53+
# Short task duration for performance test — well within timeout
54+
FAST_TASK_SLEEP_SECONDS = 2
55+
56+
# Number of fast tasks for performance comparison
57+
PERF_TASK_COUNT = 5
58+
59+
60+
# -- Async Workers -----------------------------------------------------------
61+
62+
@worker_task(
63+
task_definition_name='async_lease_heartbeat_task',
64+
lease_extend_enabled=True,
65+
register_task_def=True,
66+
task_def=TaskDef(
67+
name='async_lease_heartbeat_task',
68+
response_timeout_seconds=RESPONSE_TIMEOUT_SECONDS,
69+
timeout_seconds=180,
70+
retry_count=0,
71+
),
72+
overwrite_task_def=True,
73+
)
74+
async def async_lease_heartbeat_task(job_id: str) -> dict:
75+
"""Async long-running task with heartbeat — should complete."""
76+
logger.info("[async_heartbeat] Starting job %s, sleeping %ss (timeout=%ss)",
77+
job_id, TASK_SLEEP_SECONDS, RESPONSE_TIMEOUT_SECONDS)
78+
await asyncio.sleep(TASK_SLEEP_SECONDS)
79+
logger.info("[async_heartbeat] Completed job %s", job_id)
80+
return {'job_id': job_id, 'status': 'completed', 'slept': TASK_SLEEP_SECONDS}
81+
82+
83+
@worker_task(
84+
task_definition_name='async_lease_no_heartbeat_task',
85+
lease_extend_enabled=False,
86+
register_task_def=True,
87+
task_def=TaskDef(
88+
name='async_lease_no_heartbeat_task',
89+
response_timeout_seconds=RESPONSE_TIMEOUT_SECONDS,
90+
timeout_seconds=120,
91+
retry_count=0,
92+
),
93+
overwrite_task_def=True,
94+
)
95+
async def async_lease_no_heartbeat_task(job_id: str) -> dict:
96+
"""Async long-running task without heartbeat — should time out."""
97+
logger.info("[async_no_heartbeat] Starting job %s, sleeping %ss (timeout=%ss)",
98+
job_id, TASK_SLEEP_SECONDS, RESPONSE_TIMEOUT_SECONDS)
99+
await asyncio.sleep(TASK_SLEEP_SECONDS)
100+
logger.info("[async_no_heartbeat] Completed job %s", job_id)
101+
return {'job_id': job_id, 'status': 'completed', 'slept': TASK_SLEEP_SECONDS}
102+
103+
104+
@worker_task(
105+
task_definition_name='async_lease_fast_with_hb',
106+
lease_extend_enabled=True,
107+
register_task_def=True,
108+
task_def=TaskDef(
109+
name='async_lease_fast_with_hb',
110+
response_timeout_seconds=60,
111+
timeout_seconds=120,
112+
retry_count=0,
113+
),
114+
overwrite_task_def=True,
115+
)
116+
async def async_lease_fast_with_hb(job_id: str) -> dict:
117+
"""Fast async task with heartbeat enabled — for overhead measurement."""
118+
await asyncio.sleep(FAST_TASK_SLEEP_SECONDS)
119+
return {'job_id': job_id, 'status': 'completed'}
120+
121+
122+
@worker_task(
123+
task_definition_name='async_lease_fast_no_hb',
124+
lease_extend_enabled=False,
125+
register_task_def=True,
126+
task_def=TaskDef(
127+
name='async_lease_fast_no_hb',
128+
response_timeout_seconds=60,
129+
timeout_seconds=120,
130+
retry_count=0,
131+
),
132+
overwrite_task_def=True,
133+
)
134+
async def async_lease_fast_no_hb(job_id: str) -> dict:
135+
"""Fast async task without heartbeat — baseline for comparison."""
136+
await asyncio.sleep(FAST_TASK_SLEEP_SECONDS)
137+
return {'job_id': job_id, 'status': 'completed'}
138+
139+
140+
# -- Test class --------------------------------------------------------------
141+
142+
class TestAsyncLeaseExtension(unittest.TestCase):
143+
144+
@classmethod
145+
def setUpClass(cls):
146+
from tests.integration.conftest import skip_if_server_unavailable
147+
skip_if_server_unavailable()
148+
149+
cls.config = Configuration()
150+
cls.metadata_client = OrkesMetadataClient(cls.config)
151+
cls.workflow_client = OrkesWorkflowClient(cls.config)
152+
153+
def _register_workflow(self, wf_name, task_names):
154+
"""Register a workflow with one or more tasks in sequence."""
155+
workflow = WorkflowDef(name=wf_name, version=1)
156+
tasks = []
157+
for task_name in (task_names if isinstance(task_names, list) else [task_names]):
158+
tasks.append(WorkflowTask(
159+
name=task_name,
160+
task_reference_name=f'{task_name}_ref',
161+
input_parameters={'job_id': '${workflow.input.job_id}'},
162+
))
163+
workflow._tasks = tasks
164+
try:
165+
self.metadata_client.update_workflow_def(workflow, overwrite=True)
166+
except Exception:
167+
self.metadata_client.register_workflow_def(workflow, overwrite=True)
168+
logger.info("Registered workflow: %s", wf_name)
169+
170+
def _start_workflow(self, wf_name, job_id):
171+
"""Start a workflow and return the execution ID."""
172+
req = StartWorkflowRequest()
173+
req.name = wf_name
174+
req.version = 1
175+
req.input = {'job_id': job_id}
176+
wf_id = self.workflow_client.start_workflow(start_workflow_request=req)
177+
logger.info("Started workflow %s: %s", wf_name, wf_id)
178+
return wf_id
179+
180+
def _wait_for_workflow(self, wf_id, timeout_seconds=90):
181+
"""Poll until workflow reaches a terminal state."""
182+
for _ in range(timeout_seconds):
183+
wf = self.workflow_client.get_workflow(wf_id, include_tasks=True)
184+
if wf.status in ('COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED'):
185+
return wf
186+
time.sleep(1)
187+
return self.workflow_client.get_workflow(wf_id, include_tasks=True)
188+
189+
def _run_workers_in_background(self, duration_seconds=90):
190+
"""Start workers in a background thread, return stop function."""
191+
handler = TaskHandler(
192+
configuration=self.config,
193+
scan_for_annotated_workers=True,
194+
)
195+
handler.start_processes()
196+
197+
def stop():
198+
handler.stop_processes()
199+
200+
timer = threading.Timer(duration_seconds, stop)
201+
timer.daemon = True
202+
timer.start()
203+
204+
return stop
205+
206+
# -- Tests ----------------------------------------------------------------
207+
208+
def test_01_async_with_heartbeat_completes(self):
209+
"""Async task WITH lease_extend_enabled=True completes when sleep > responseTimeout."""
210+
print("\n" + "=" * 80)
211+
print("TEST: Async with heartbeat — task should COMPLETE")
212+
print(f" responseTimeoutSeconds={RESPONSE_TIMEOUT_SECONDS}s, task sleeps {TASK_SLEEP_SECONDS}s")
213+
print("=" * 80)
214+
215+
wf_name = 'test_async_lease_heartbeat'
216+
self._register_workflow(wf_name, 'async_lease_heartbeat_task')
217+
218+
stop_workers = self._run_workers_in_background(duration_seconds=90)
219+
time.sleep(3) # let workers start
220+
221+
try:
222+
wf_id = self._start_workflow(wf_name, 'ASYNC-HB-001')
223+
wf = self._wait_for_workflow(wf_id, timeout_seconds=80)
224+
225+
print(f"\n Workflow ID: {wf_id}")
226+
print(f" Final status: {wf.status}")
227+
for task in (wf.tasks or []):
228+
print(f" Task {task.task_def_name}: {task.status}")
229+
230+
self.assertEqual(wf.status, 'COMPLETED',
231+
f"Workflow should COMPLETE with heartbeat, got {wf.status}")
232+
233+
tasks_by_ref = {t.reference_task_name: t for t in wf.tasks}
234+
task = tasks_by_ref.get('async_lease_heartbeat_task_ref')
235+
self.assertIsNotNone(task)
236+
self.assertEqual(task.status, 'COMPLETED')
237+
self.assertEqual(task.output_data.get('job_id'), 'ASYNC-HB-001')
238+
self.assertEqual(task.output_data.get('slept'), TASK_SLEEP_SECONDS)
239+
print("\n PASS: Async task completed with heartbeat keeping lease alive")
240+
finally:
241+
stop_workers()
242+
243+
def test_02_async_without_heartbeat_times_out(self):
244+
"""Async task WITHOUT lease_extend_enabled times out when sleep > responseTimeout."""
245+
print("\n" + "=" * 80)
246+
print("TEST: Async without heartbeat — task should TIME OUT")
247+
print(f" responseTimeoutSeconds={RESPONSE_TIMEOUT_SECONDS}s, task sleeps {TASK_SLEEP_SECONDS}s")
248+
print("=" * 80)
249+
250+
wf_name = 'test_async_lease_no_heartbeat'
251+
self._register_workflow(wf_name, 'async_lease_no_heartbeat_task')
252+
253+
stop_workers = self._run_workers_in_background(duration_seconds=90)
254+
time.sleep(3)
255+
256+
try:
257+
wf_id = self._start_workflow(wf_name, 'ASYNC-NOHB-001')
258+
wf = self._wait_for_workflow(wf_id, timeout_seconds=80)
259+
260+
print(f"\n Workflow ID: {wf_id}")
261+
print(f" Final status: {wf.status}")
262+
for task in (wf.tasks or []):
263+
print(f" Task {task.task_def_name}: {task.status}")
264+
265+
self.assertIn(wf.status, ('FAILED', 'TIMED_OUT'),
266+
f"Workflow should FAIL/TIMEOUT without heartbeat, got {wf.status}")
267+
268+
tasks_by_ref = {t.reference_task_name: t for t in wf.tasks}
269+
task = tasks_by_ref.get('async_lease_no_heartbeat_task_ref')
270+
self.assertIsNotNone(task)
271+
self.assertIn(task.status, ('TIMED_OUT', 'FAILED', 'CANCELED'),
272+
f"Task should be TIMED_OUT/FAILED, got {task.status}")
273+
print("\n PASS: Async task timed out as expected without heartbeat")
274+
finally:
275+
stop_workers()
276+
277+
def test_03_no_performance_overhead(self):
278+
"""Heartbeat tracking adds no meaningful overhead to fast async tasks."""
279+
print("\n" + "=" * 80)
280+
print("TEST: Performance — heartbeat enabled vs disabled on fast tasks")
281+
print(f" Running {PERF_TASK_COUNT} tasks each, sleep={FAST_TASK_SLEEP_SECONDS}s")
282+
print("=" * 80)
283+
284+
wf_with_hb = 'test_async_perf_with_hb'
285+
wf_no_hb = 'test_async_perf_no_hb'
286+
self._register_workflow(wf_with_hb, 'async_lease_fast_with_hb')
287+
self._register_workflow(wf_no_hb, 'async_lease_fast_no_hb')
288+
289+
stop_workers = self._run_workers_in_background(duration_seconds=120)
290+
time.sleep(3)
291+
292+
try:
293+
# Run tasks WITH heartbeat tracking
294+
hb_workflow_ids = []
295+
for i in range(PERF_TASK_COUNT):
296+
wf_id = self._start_workflow(wf_with_hb, f'PERF-HB-{i:03d}')
297+
hb_workflow_ids.append(wf_id)
298+
299+
# Run tasks WITHOUT heartbeat tracking
300+
no_hb_workflow_ids = []
301+
for i in range(PERF_TASK_COUNT):
302+
wf_id = self._start_workflow(wf_no_hb, f'PERF-NOHB-{i:03d}')
303+
no_hb_workflow_ids.append(wf_id)
304+
305+
# Wait for all to complete
306+
hb_times = []
307+
for wf_id in hb_workflow_ids:
308+
wf = self._wait_for_workflow(wf_id, timeout_seconds=30)
309+
self.assertEqual(wf.status, 'COMPLETED',
310+
f"Fast HB task should complete, got {wf.status}")
311+
task = wf.tasks[0]
312+
duration_ms = task.end_time - task.start_time
313+
hb_times.append(duration_ms)
314+
315+
no_hb_times = []
316+
for wf_id in no_hb_workflow_ids:
317+
wf = self._wait_for_workflow(wf_id, timeout_seconds=30)
318+
self.assertEqual(wf.status, 'COMPLETED',
319+
f"Fast no-HB task should complete, got {wf.status}")
320+
task = wf.tasks[0]
321+
duration_ms = task.end_time - task.start_time
322+
no_hb_times.append(duration_ms)
323+
324+
avg_hb = sum(hb_times) / len(hb_times)
325+
avg_no_hb = sum(no_hb_times) / len(no_hb_times)
326+
overhead_ms = avg_hb - avg_no_hb
327+
overhead_pct = (overhead_ms / avg_no_hb * 100) if avg_no_hb > 0 else 0
328+
329+
print(f"\n With heartbeat: avg {avg_hb:.0f}ms {hb_times}")
330+
print(f" Without heartbeat: avg {avg_no_hb:.0f}ms {no_hb_times}")
331+
print(f" Overhead: {overhead_ms:+.0f}ms ({overhead_pct:+.1f}%)")
332+
333+
# Heartbeat tracking should add < 500ms overhead per task
334+
# (LeaseManager.track is just a dict insert + set add)
335+
self.assertLess(overhead_ms, 500,
336+
f"Heartbeat overhead too high: {overhead_ms:.0f}ms")
337+
338+
print("\n PASS: No meaningful performance overhead from heartbeat tracking")
339+
finally:
340+
stop_workers()
341+
342+
343+
if __name__ == '__main__':
344+
unittest.main()

0 commit comments

Comments
 (0)