Skip to content

Commit 1da5a3f

Browse files
authored
refactor(lease): centralized LeaseManager decouples heartbeats from poll loop (#400)
1 parent 2ce1a54 commit 1da5a3f

3 files changed

Lines changed: 548 additions & 65 deletions

File tree

src/conductor/client/automator/lease_tracker.py

Lines changed: 208 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,27 @@
1-
"""Shared lease extension (heartbeat) tracking for TaskRunner and AsyncTaskRunner."""
1+
"""Centralized lease extension (heartbeat) management for Conductor task runners.
22
3+
Architecture:
4+
LeaseManager runs a single background daemon thread that periodically checks
5+
for tasks needing lease extension heartbeats. Due heartbeats are dispatched
6+
to a small fixed ThreadPoolExecutor for parallel, non-blocking API calls.
7+
8+
This decouples heartbeat work entirely from worker poll loops, preventing
9+
heartbeat API calls (and their retries) from blocking task polling.
10+
11+
Thread-safe: track() and untrack() can be called from any thread or event loop.
12+
"""
13+
14+
import logging
15+
import os
16+
import threading
17+
import time
18+
from concurrent.futures import ThreadPoolExecutor
319
from dataclasses import dataclass
20+
from typing import Any, Dict, Optional
21+
22+
from conductor.client.http.models.task_result import TaskResult
23+
24+
logger = logging.getLogger(__name__)
425

526
# Lease extension constants (matches Java SDK)
627
LEASE_EXTEND_RETRY_COUNT = 3
@@ -15,3 +36,189 @@ class LeaseInfo:
1536
response_timeout_seconds: float
1637
last_heartbeat_time: float # time.monotonic() of last heartbeat (or task start)
1738
interval_seconds: float # 80% of responseTimeoutSeconds
39+
task_client: Any = None # Sync TaskResourceApi for sending heartbeats
40+
41+
42+
class LeaseManager:
    """Centralized lease extension manager for all workers in a process.

    One background daemon thread checks for due heartbeats at a fixed interval.
    A small ThreadPoolExecutor sends heartbeat API calls in parallel.
    Poll loops are never blocked by heartbeat work.

    Usage:
        manager = LeaseManager.get_instance()
        manager.track(task_id, workflow_id, timeout, task_client)
        # ... task completes ...
        manager.untrack(task_id)
    """

    # Process-wide singleton state. The PID is recorded so that a forked child
    # builds its own instance instead of reusing the parent's.
    _instance: Optional['LeaseManager'] = None
    _instance_lock = threading.Lock()
    _instance_pid: Optional[int] = None

    @classmethod
    def get_instance(cls, check_interval: float = 1.0,
                     max_heartbeat_workers: int = 4) -> 'LeaseManager':
        """Get or create the process-wide LeaseManager singleton.

        Fork-safe: a new instance is created after fork (threads don't survive fork).

        Args:
            check_interval: Seconds between scans for due heartbeats. Only used
                when this call actually creates the instance.
            max_heartbeat_workers: Size of the heartbeat thread pool. Only used
                when this call actually creates the instance.

        Returns:
            The LeaseManager bound to the current process ID.
        """
        current_pid = os.getpid()
        # Unlocked fast path first; the condition is re-evaluated under the
        # lock so that only one thread constructs the instance.
        if cls._instance is None or cls._instance_pid != current_pid:
            with cls._instance_lock:
                if cls._instance is None or cls._instance_pid != current_pid:
                    cls._instance = cls(
                        check_interval=check_interval,
                        max_heartbeat_workers=max_heartbeat_workers,
                    )
                    cls._instance_pid = current_pid
        return cls._instance

    @classmethod
    def _reset_instance(cls):
        """Reset the singleton. For testing only."""
        with cls._instance_lock:
            if cls._instance is not None:
                cls._instance.shutdown()
            cls._instance = None
            cls._instance_pid = None

    def __init__(self, check_interval: float = 1.0, max_heartbeat_workers: int = 4):
        """Create a manager; the background thread is started lazily by track().

        Args:
            check_interval: Seconds the background loop waits between scans.
            max_heartbeat_workers: Maximum number of pool threads used to send
                heartbeats concurrently.
        """
        self._tracked: Dict[str, LeaseInfo] = {}  # task_id -> lease record
        self._lock = threading.Lock()  # guards _tracked and the records' timestamps
        self._executor = ThreadPoolExecutor(
            max_workers=max_heartbeat_workers,
            thread_name_prefix="lease-heartbeat",
        )
        self._stop_event = threading.Event()
        self._check_interval = check_interval
        self._thread: Optional[threading.Thread] = None
        self._started = False
        self._start_lock = threading.Lock()

    def _ensure_started(self) -> None:
        """Lazily start the background thread on first track() call."""
        if self._started:
            return
        with self._start_lock:
            if not self._started:
                self._thread = threading.Thread(
                    target=self._run, daemon=True, name="lease-manager",
                )
                self._thread.start()
                self._started = True
                logger.debug(
                    "LeaseManager started (check_interval=%.1fs)", self._check_interval,
                )

    def track(self, task_id: str, workflow_instance_id: str,
              response_timeout_seconds: float, task_client: Any) -> None:
        """Start tracking a task for lease extension heartbeats.

        Thread-safe. Can be called from any worker thread or event loop.
        Tracking an already-tracked task_id replaces its record and restarts
        its heartbeat timer.

        Args:
            task_id: Conductor task ID.
            workflow_instance_id: Workflow instance this task belongs to.
            response_timeout_seconds: The task's server-side response timeout.
            task_client: A **sync** TaskResourceApi for sending heartbeat API calls.
        """
        interval = response_timeout_seconds * LEASE_EXTEND_DURATION_FACTOR
        # Sub-second intervals are not tracked at all.
        if interval < 1:
            logger.debug(
                "Skipping lease tracking for task %s (interval %.1fs too short)",
                task_id, interval,
            )
            return

        info = LeaseInfo(
            task_id=task_id,
            workflow_instance_id=workflow_instance_id,
            response_timeout_seconds=response_timeout_seconds,
            last_heartbeat_time=time.monotonic(),
            interval_seconds=interval,
            task_client=task_client,
        )
        with self._lock:
            self._tracked[task_id] = info
        self._ensure_started()
        logger.debug(
            "Tracking lease for task %s (timeout=%ss, heartbeat every %ss)",
            task_id, response_timeout_seconds, interval,
        )

    def untrack(self, task_id: str) -> None:
        """Stop tracking a task. Thread-safe; unknown task IDs are ignored."""
        with self._lock:
            removed = self._tracked.pop(task_id, None)
        if removed is not None:
            logger.debug("Untracked lease for task %s", task_id)

    @property
    def tracked_count(self) -> int:
        """Number of currently tracked tasks."""
        with self._lock:
            return len(self._tracked)

    # -- Background thread -----------------------------------------------------

    def _run(self) -> None:
        """Background loop — checks for due heartbeats at fixed intervals."""
        while not self._stop_event.is_set():
            try:
                self._check_and_send()
            except Exception as e:
                # Keep the loop alive on unexpected errors, but record the
                # traceback so the failure can actually be diagnosed.
                logger.error("LeaseManager error: %s", e, exc_info=True)
            # Event.wait doubles as an interruptible sleep: shutdown() wakes it.
            self._stop_event.wait(self._check_interval)

    def _check_and_send(self) -> None:
        """Find tasks with due heartbeats and dispatch to the thread pool."""
        now = time.monotonic()
        with self._lock:
            due = [
                info for info in self._tracked.values()
                if now - info.last_heartbeat_time >= info.interval_seconds
            ]
            # Advance the timestamps while still holding the lock: this prevents
            # double-dispatch on the next tick and keeps every write to the
            # shared records serialized with track()/untrack().
            for info in due:
                info.last_heartbeat_time = now

        for info in due:
            if self._stop_event.is_set():
                return
            try:
                self._executor.submit(self._heartbeat_if_tracked, info)
            except RuntimeError:
                # The executor refuses new work once shutdown() has run.
                logger.debug("LeaseManager executor is shut down; dropping heartbeats")
                return

    def _heartbeat_if_tracked(self, info: 'LeaseInfo') -> bool:
        """Send a heartbeat unless the task was untracked while it sat in the queue.

        A heartbeat can wait behind other pool work (including their retry
        sleeps); by the time it runs the task may have completed, or been
        re-tracked with a new record. In both cases this record is stale and
        no lease extension is sent for it.

        Returns:
            True if a heartbeat was attempted, False if the record was stale.
        """
        with self._lock:
            still_tracked = self._tracked.get(info.task_id) is info
        if not still_tracked:
            return False
        self._send_heartbeat(info)
        return True

    @staticmethod
    def _send_heartbeat(info: 'LeaseInfo') -> None:
        """Send a single lease extension heartbeat with retry.

        Runs in a pool thread — blocking retries only block the pool thread,
        never a poll loop. Up to LEASE_EXTEND_RETRY_COUNT attempts are made,
        sleeping 0.5 * (attempt + 2) seconds between them; the final failure is
        logged and swallowed rather than raised.
        """
        result = TaskResult(
            task_id=info.task_id,
            workflow_instance_id=info.workflow_instance_id,
            extend_lease=True,
        )
        for attempt in range(LEASE_EXTEND_RETRY_COUNT):
            try:
                info.task_client.update_task(body=result)
                logger.debug("Extended lease for task %s", info.task_id)
                return
            except Exception as e:
                if attempt < LEASE_EXTEND_RETRY_COUNT - 1:
                    time.sleep(0.5 * (attempt + 2))
                else:
                    logger.error(
                        "Failed to extend lease for task %s after %d attempts: %s",
                        info.task_id, LEASE_EXTEND_RETRY_COUNT, e,
                    )

    # -- Lifecycle -------------------------------------------------------------

    def shutdown(self) -> None:
        """Stop the background thread and thread pool, and forget all tasks."""
        self._stop_event.set()
        if self._started and self._thread is not None:
            # Bounded join so a stuck scan cannot hang the caller.
            self._thread.join(timeout=5)
        # wait=False: heartbeats that are mid-retry are not waited for.
        self._executor.shutdown(wait=False)
        with self._lock:
            self._tracked.clear()
        logger.debug("LeaseManager shut down")

src/conductor/client/automator/task_runner.py

Lines changed: 20 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@
3636
from conductor.client.worker.worker_config import resolve_worker_config, get_worker_config_oneline
3737
from conductor.client.worker.exception import NonRetryableException
3838
from conductor.client.automator.json_schema_generator import generate_json_schema_from_function
39-
from conductor.client.automator.lease_tracker import LeaseInfo, LEASE_EXTEND_RETRY_COUNT, LEASE_EXTEND_DURATION_FACTOR
39+
from conductor.client.automator.lease_tracker import LeaseManager
4040

4141
logger = logging.getLogger(
4242
Configuration.get_logging_formatted_name(
@@ -112,8 +112,9 @@ def __init__(
112112
self._consecutive_empty_polls = 0 # Track empty polls to implement backoff
113113
self._shutdown = False # Flag to indicate graceful shutdown
114114
self._use_update_v2 = True # Will be set to False if server doesn't support v2 endpoint
115-
self._lease_info = {} # task_id -> LeaseInfo for lease extension heartbeats
116-
self._lease_lock = threading.Lock() # Protects _lease_info for free-threaded Python
115+
self._lease_manager = LeaseManager.get_instance()
116+
self._tracked_task_ids = set() # Local set for cleanup on shutdown
117+
self._tracked_task_ids_lock = threading.Lock()
117118

118119
def run(self) -> None:
119120
if self.configuration is not None:
@@ -153,9 +154,12 @@ def _cleanup(self) -> None:
153154
"""Clean up resources - called on exit."""
154155
logger.debug("Cleaning up TaskRunner resources...")
155156

156-
# Stop all lease extension tracking
157-
with self._lease_lock:
158-
self._lease_info.clear()
157+
# Untrack all tasks this runner was tracking from the shared LeaseManager
158+
with self._tracked_task_ids_lock:
159+
task_ids = list(self._tracked_task_ids)
160+
self._tracked_task_ids.clear()
161+
for task_id in task_ids:
162+
self._lease_manager.untrack(task_id)
159163

160164
# Shutdown ThreadPoolExecutor (EAFP style - more Pythonic)
161165
try:
@@ -429,9 +433,6 @@ def __register_task_definition(self) -> None:
429433

430434
def run_once(self) -> None:
431435
try:
432-
# Send lease extension heartbeats for any tasks that are due
433-
self._send_due_heartbeats()
434-
435436
# Check completed async tasks first (non-blocking)
436437
self.__check_completed_async_tasks()
437438

@@ -1077,74 +1078,29 @@ def __update_task(self, task_result: TaskResult):
10771078

10781079
return None
10791080

1080-
# -- Lease extension (heartbeat) methods ----------------------------------
1081+
# -- Lease extension (heartbeat) delegation to LeaseManager ----------------
10811082

10821083
def _track_lease(self, task: Task) -> None:
1083-
"""Start tracking a task for lease extension heartbeat."""
1084-
lease_enabled = getattr(self.worker, 'lease_extend_enabled', False)
1085-
if not lease_enabled:
1084+
"""Start tracking a task for lease extension via the shared LeaseManager."""
1085+
if not getattr(self.worker, 'lease_extend_enabled', False):
10861086
return
10871087
timeout = getattr(task, 'response_timeout_seconds', None) or 0
10881088
if timeout <= 0:
10891089
return
1090-
interval = timeout * LEASE_EXTEND_DURATION_FACTOR
1091-
if interval < 1:
1092-
return
1093-
info = LeaseInfo(
1090+
self._lease_manager.track(
10941091
task_id=task.task_id,
10951092
workflow_instance_id=task.workflow_instance_id,
10961093
response_timeout_seconds=timeout,
1097-
last_heartbeat_time=time.monotonic(),
1098-
interval_seconds=interval,
1099-
)
1100-
with self._lease_lock:
1101-
self._lease_info[task.task_id] = info
1102-
logger.debug(
1103-
"Tracking lease for task %s (timeout=%ss, heartbeat every %ss)",
1104-
task.task_id, timeout, interval,
1094+
task_client=self.task_client,
11051095
)
1096+
with self._tracked_task_ids_lock:
1097+
self._tracked_task_ids.add(task.task_id)
11061098

11071099
def _untrack_lease(self, task_id: str) -> None:
11081100
"""Stop tracking a task for lease extension."""
1109-
with self._lease_lock:
1110-
removed = self._lease_info.pop(task_id, None)
1111-
if removed is not None:
1112-
logger.debug("Untracked lease for task %s", task_id)
1113-
1114-
def _send_due_heartbeats(self) -> None:
1115-
"""Check all tracked tasks and send heartbeats for any that are due."""
1116-
if not self._lease_info:
1117-
return
1118-
now = time.monotonic()
1119-
with self._lease_lock:
1120-
infos = list(self._lease_info.values())
1121-
for info in infos:
1122-
elapsed = now - info.last_heartbeat_time
1123-
if elapsed < info.interval_seconds:
1124-
continue
1125-
self._send_heartbeat(info)
1126-
info.last_heartbeat_time = time.monotonic()
1127-
1128-
def _send_heartbeat(self, info: LeaseInfo) -> None:
1129-
"""Send a single lease extension heartbeat with retry."""
1130-
result = TaskResult(
1131-
task_id=info.task_id,
1132-
workflow_instance_id=info.workflow_instance_id,
1133-
extend_lease=True,
1134-
)
1135-
for attempt in range(LEASE_EXTEND_RETRY_COUNT):
1136-
try:
1137-
self.task_client.update_task(body=result)
1138-
logger.debug("Extended lease for task %s", info.task_id)
1139-
return
1140-
except Exception as e:
1141-
if attempt < LEASE_EXTEND_RETRY_COUNT - 1:
1142-
time.sleep(0.5 * (attempt + 2))
1143-
else:
1144-
logger.error(
1145-
"Failed to extend lease for task %s after %d attempts: %s",
1146-
info.task_id, LEASE_EXTEND_RETRY_COUNT, e,
1147-
)
1101+
self._lease_manager.untrack(task_id)
1102+
with self._tracked_task_ids_lock:
1103+
self._tracked_task_ids.discard(task_id)
11481104

11491105
# --------------------------------------------------------------------------
11501106

0 commit comments

Comments
 (0)