Skip to content

Commit 3cef362

Browse files
authored
refactor(lease): centralized LeaseManager decouples heartbeats (#401)
1 parent 1da5a3f commit 3cef362

2 files changed

Lines changed: 377 additions & 55 deletions

File tree

src/conductor/client/automator/async_task_runner.py

Lines changed: 33 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from conductor.client.worker.worker_config import resolve_worker_config, get_worker_config_oneline
3333
from conductor.client.worker.exception import NonRetryableException
3434
from conductor.client.automator.json_schema_generator import generate_json_schema_from_function
35-
from conductor.client.automator.lease_tracker import LeaseInfo, LEASE_EXTEND_RETRY_COUNT, LEASE_EXTEND_DURATION_FACTOR
35+
from conductor.client.automator.lease_tracker import LeaseManager
3636

3737
logger = logging.getLogger(
3838
Configuration.get_logging_formatted_name(
@@ -113,7 +113,9 @@ def __init__(
113113
self._semaphore = None
114114
self._shutdown = False # Flag to indicate graceful shutdown
115115
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint
116-
self._lease_info = {} # task_id -> LeaseInfo for lease extension heartbeats
116+
self._lease_manager = LeaseManager.get_instance()
117+
self._tracked_task_ids = set() # Local set for cleanup on shutdown
118+
self._sync_task_client = None # Created after fork for LeaseManager heartbeats
117119

118120
async def run(self) -> None:
119121
"""Main async loop - runs continuously in single event loop."""
@@ -133,6 +135,17 @@ async def run(self) -> None:
133135
api_client=self.async_api_client
134136
)
135137

138+
# Create a sync TaskResourceApi for LeaseManager heartbeats
139+
# (LeaseManager sends heartbeats from its own ThreadPoolExecutor)
140+
from conductor.client.http.api.task_resource_api import TaskResourceApi
141+
from conductor.client.http.api_client import ApiClient
142+
self._sync_task_client = TaskResourceApi(
143+
ApiClient(
144+
configuration=self.configuration,
145+
metrics_collector=self.metrics_collector
146+
)
147+
)
148+
136149
# Create semaphore in the event loop (must be created within the loop)
137150
self._semaphore = asyncio.Semaphore(self._max_workers)
138151

@@ -168,8 +181,10 @@ async def _cleanup(self) -> None:
168181
"""Clean up async resources."""
169182
logger.debug("Cleaning up AsyncTaskRunner resources...")
170183

171-
# Stop all lease extension tracking
172-
self._lease_info.clear()
184+
# Untrack all tasks this runner was tracking from the shared LeaseManager
185+
for task_id in list(self._tracked_task_ids):
186+
self._lease_manager.untrack(task_id)
187+
self._tracked_task_ids.clear()
173188

174189
# Cancel any running tasks (EAFP style)
175190
try:
@@ -187,6 +202,13 @@ async def _cleanup(self) -> None:
187202
except (IOError, OSError) as e:
188203
logger.warning(f"Error closing async client: {e}")
189204

205+
# Close sync HTTP client used for lease heartbeats
206+
if self._sync_task_client:
207+
try:
208+
self._sync_task_client.api_client.rest_client.connection.close()
209+
except Exception:
210+
pass
211+
190212
# Clear event listeners
191213
self.event_dispatcher = None
192214

@@ -441,9 +463,6 @@ async def __async_register_task_definition(self) -> None:
441463
async def run_once(self) -> None:
442464
"""Execute one iteration of the polling loop (async version)."""
443465
try:
444-
# Send lease extension heartbeats for any tasks that are due
445-
await self._send_due_heartbeats()
446-
447466
# No need for manual cleanup - tasks remove themselves via add_done_callback
448467
# Just check capacity directly
449468
current_capacity = len(self._running_tasks)
@@ -932,68 +951,27 @@ async def __async_update_task(self, task_result: TaskResult):
932951

933952
return None
934953

935-
# -- Lease extension (heartbeat) methods ----------------------------------
954+
# -- Lease extension (heartbeat) delegation to LeaseManager ----------------
936955

937956
def _track_lease(self, task) -> None:
938-
"""Start tracking a task for lease extension heartbeat."""
957+
"""Start tracking a task for lease extension via the shared LeaseManager."""
939958
if not getattr(self.worker, 'lease_extend_enabled', False):
940959
return
941960
timeout = getattr(task, 'response_timeout_seconds', None) or 0
942961
if timeout <= 0:
943962
return
944-
interval = timeout * LEASE_EXTEND_DURATION_FACTOR
945-
if interval < 1:
946-
return
947-
self._lease_info[task.task_id] = LeaseInfo(
963+
self._lease_manager.track(
948964
task_id=task.task_id,
949965
workflow_instance_id=task.workflow_instance_id,
950966
response_timeout_seconds=timeout,
951-
last_heartbeat_time=time.monotonic(),
952-
interval_seconds=interval,
953-
)
954-
logger.debug(
955-
"Tracking lease for task %s (timeout=%ss, heartbeat every %ss)",
956-
task.task_id, timeout, interval,
967+
task_client=self._sync_task_client,
957968
)
969+
self._tracked_task_ids.add(task.task_id)
958970

959971
def _untrack_lease(self, task_id: str) -> None:
960972
"""Stop tracking a task for lease extension."""
961-
removed = self._lease_info.pop(task_id, None)
962-
if removed is not None:
963-
logger.debug("Untracked lease for task %s", task_id)
964-
965-
async def _send_due_heartbeats(self) -> None:
966-
"""Check all tracked tasks and send heartbeats for any that are due."""
967-
if not self._lease_info:
968-
return
969-
now = time.monotonic()
970-
for info in list(self._lease_info.values()):
971-
elapsed = now - info.last_heartbeat_time
972-
if elapsed < info.interval_seconds:
973-
continue
974-
await self._send_heartbeat(info)
975-
info.last_heartbeat_time = time.monotonic()
976-
977-
async def _send_heartbeat(self, info: LeaseInfo) -> None:
978-
"""Send a single lease extension heartbeat with retry (async)."""
979-
result = TaskResult(
980-
task_id=info.task_id,
981-
workflow_instance_id=info.workflow_instance_id,
982-
extend_lease=True,
983-
)
984-
for attempt in range(LEASE_EXTEND_RETRY_COUNT):
985-
try:
986-
await self.async_task_client.update_task(body=result)
987-
logger.debug("Extended lease for task %s", info.task_id)
988-
return
989-
except Exception as e:
990-
if attempt < LEASE_EXTEND_RETRY_COUNT - 1:
991-
await asyncio.sleep(0.5 * (attempt + 2))
992-
else:
993-
logger.error(
994-
"Failed to extend lease for task %s after %d attempts: %s",
995-
info.task_id, LEASE_EXTEND_RETRY_COUNT, e,
996-
)
973+
self._lease_manager.untrack(task_id)
974+
self._tracked_task_ids.discard(task_id)
997975

998976
# --------------------------------------------------------------------------
999977

0 commit comments

Comments
 (0)