
Commit 387d6db

authored
Lease extend (#398) and Fix memory leak
1 parent e555f78 commit 387d6db

18 files changed

Lines changed: 1309 additions & 101 deletions

LEASE_EXTENSION.md

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
1+
# Lease Extension (Automatic Heartbeat)
2+
3+
When a worker picks up a task, the Conductor server starts a `responseTimeoutSeconds` timer. If the worker doesn't send an update before the timer expires, the server marks the task as timed out and re-queues it for retry.
4+
5+
For long-running tasks (agent tool calls, LLM inference, data processing, batch jobs), the worker can still be actively executing when that timer expires, so the server wrongly treats it as dead. **Lease extension** solves this by automatically sending heartbeats that reset the timeout timer.
6+
7+
## How It Works
8+
9+
When `lease_extend_enabled=True`:
10+
11+
1. Worker picks up a task with `responseTimeoutSeconds > 0`
12+
2. SDK starts tracking the task for heartbeats
13+
3. At **80% of `responseTimeoutSeconds`**, SDK sends a heartbeat (`TaskResult.extend_lease=True`)
14+
4. Server resets the task's `updateTime` to now, giving a fresh `responseTimeoutSeconds` window
15+
5. Heartbeats continue until the task completes, fails, or the worker shuts down
16+
17+
```
18+
Timeline (responseTimeoutSeconds=120s):
19+
0s          96s         192s        288s
20+
|-----------|-----------|-----------|--→ task completes
21+
poll        heartbeat   heartbeat   heartbeat
22+
            (80%)       (80%)       (80%)
23+
```
24+
25+
The heartbeat fires at 80% of `responseTimeoutSeconds` (matching the Java SDK). This gives a 20% safety margin — if a heartbeat is slightly delayed, the task still has time before the server times it out.
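
The 80% rule and the timeline above come down to simple arithmetic. The following is a minimal sketch of that arithmetic, not the SDK's implementation: `heartbeat_interval` and `heartbeat_times` are hypothetical helper names, and the one-second floor follows the constraint documented later in this guide.

```python
from typing import List, Optional

HEARTBEAT_FRACTION = 0.8  # heartbeat fires at 80% of responseTimeoutSeconds


def heartbeat_interval(response_timeout_seconds: float) -> Optional[float]:
    """Return seconds between heartbeats, or None when heartbeats are skipped."""
    if response_timeout_seconds <= 0:
        return None  # no response timeout configured, nothing to extend
    interval = response_timeout_seconds * HEARTBEAT_FRACTION
    if interval < 1:
        return None  # interval under one second: heartbeats are skipped
    return interval


def heartbeat_times(response_timeout_seconds: float,
                    task_duration_seconds: float) -> List[float]:
    """List the offsets (seconds after poll) at which heartbeats would fire."""
    interval = heartbeat_interval(response_timeout_seconds)
    if interval is None:
        return []
    times = []
    t = interval
    while t < task_duration_seconds:  # stop once the task has completed
        times.append(t)
        t += interval
    return times
```

For example, `heartbeat_times(120, 300)` yields heartbeats at 96 s, 192 s, and 288 s, matching the timeline.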
26+
27+
## Quick Start
28+
29+
```python
30+
from conductor.client.worker.worker_task import worker_task
31+
32+
@worker_task(
33+
task_definition_name='long_running_analysis',
34+
lease_extend_enabled=True, # Enable automatic heartbeat
35+
)
36+
def analyze_dataset(dataset_id: str) -> dict:
37+
"""This task takes 5 minutes but responseTimeoutSeconds is 60s.
38+
Heartbeats keep it alive automatically."""
39+
results = run_expensive_analysis(dataset_id)
40+
return {'results': results}
41+
```
42+
43+
That's it. The SDK handles heartbeats automatically in the background.
44+
45+
## Enabling Lease Extension
46+
47+
Lease extension is **disabled by default** (matching the Java SDK). Enable it per-worker or globally:
48+
49+
### Per-Worker (Decorator)
50+
51+
```python
52+
@worker_task(
53+
task_definition_name='my_task',
54+
lease_extend_enabled=True,
55+
)
56+
def my_task(data: str) -> dict:
57+
...
58+
```
59+
60+
### Per-Worker (Class)
61+
62+
```python
63+
from conductor.client.worker.worker import Worker
64+
65+
worker = Worker(
66+
task_definition_name='my_task',
67+
execute_function=my_function,
68+
lease_extend_enabled=True,
69+
)
70+
```
71+
72+
### Per-Worker (Environment Variable)
73+
74+
```shell
75+
export conductor_worker_my_task_lease_extend_enabled=true
76+
```
77+
78+
### Global (All Workers)
79+
80+
```shell
81+
export conductor_worker_all_lease_extend_enabled=true
82+
```
83+
84+
### Precedence
85+
86+
Environment variables override decorator/constructor arguments:
87+
88+
1. Task-specific env var (`conductor_worker_<task>_lease_extend_enabled`)
89+
2. Global env var (`conductor_worker_all_lease_extend_enabled`)
90+
3. Worker constructor / decorator argument
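
The precedence order above can be sketched as a small lookup. This is an illustration only, not the SDK's resolution code: `resolve_lease_extend_enabled` is a hypothetical helper, and treating only the string `true` (case-insensitive) as enabled is an assumption about how the value is parsed.

```python
import os
from typing import Mapping, Optional


def resolve_lease_extend_enabled(
    task_name: str,
    code_value: bool,
    env: Optional[Mapping[str, str]] = None,
) -> bool:
    """Resolve the effective setting: task env var, then global env var, then code."""
    env = os.environ if env is None else env
    keys = (
        f"conductor_worker_{task_name}_lease_extend_enabled",  # 1. task-specific
        "conductor_worker_all_lease_extend_enabled",           # 2. global
    )
    for key in keys:
        raw = env.get(key)
        if raw is not None:
            # Assumption: only "true" (any case) enables the feature.
            return raw.strip().lower() == "true"
    return code_value  # 3. decorator / constructor argument
```

Under this reading, a task-specific `false` disables lease extension for that task even when the global variable and the decorator both say `true`.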
91+
92+
## When to Use
93+
94+
**Enable lease extension when:**
95+
- Task execution time may exceed `responseTimeoutSeconds`
96+
- Tasks involve external calls with unpredictable latency (LLM APIs, data pipelines)
97+
- You want the worker to hold the task continuously (not yield and re-poll)
98+
99+
**You don't need lease extension when:**
100+
- Tasks always complete within `responseTimeoutSeconds`
101+
- You're using `TaskInProgress` with `callbackAfterSeconds` (the task is yielded back to the queue)
102+
- `responseTimeoutSeconds` is 0 (no timeout configured)
103+
104+
## Lease Extension vs TaskInProgress
105+
106+
These are two different strategies for long-running tasks:
107+
108+
| | Lease Extension | TaskInProgress |
109+
|---|---|---|
110+
| **How it works** | Worker holds the task, heartbeats keep it alive | Worker yields the task, re-polls later |
111+
| **Task state** | IN_PROGRESS the whole time | Returned to queue between polls |
112+
| **When to use** | Continuous execution (LLM calls, streaming) | Incremental processing (batch chunks, polling external status) |
113+
| **Enable with** | `lease_extend_enabled=True` | Return `TaskInProgress(callback_after_seconds=N)` |
114+
| **Worker memory** | Task stays in worker memory | Task is released, re-polled with fresh context |
115+
116+
You can combine both — enable `lease_extend_enabled` for safety while also using `TaskInProgress` for incremental polling.
117+
118+
## Important Constraints
119+
120+
- **`responseTimeoutSeconds`** is the maximum time allowed between worker updates. This is what heartbeats reset.
121+
- **`timeoutSeconds`** is the overall SLA wall-clock ceiling. **Cannot be extended by heartbeat.** Once exceeded, the task is TIMED_OUT regardless of heartbeats.
122+
- Heartbeats only fire when `responseTimeoutSeconds > 0` and `lease_extend_enabled = True`.
123+
- If the heartbeat interval would be less than 1 second (i.e., `responseTimeoutSeconds < 1.25`), heartbeats are skipped.
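
To see how these constraints interact, here is a simplified model of which deadline, if any, a task hits. It is a sketch under stated assumptions, not server or SDK logic: `task_outcome` and its return labels are hypothetical, every heartbeat is assumed to succeed, and `timeout_seconds=0` is assumed to mean no overall ceiling.

```python
def task_outcome(
    duration_seconds: float,
    response_timeout_seconds: float,
    timeout_seconds: float,
    lease_extend_enabled: bool,
) -> str:
    """Return 'completed', 'response_timeout', or 'overall_timeout' (illustrative labels)."""
    # Heartbeats fire only with a positive response timeout, the feature
    # enabled, and an 80% interval of at least one second.
    heartbeats_active = (
        lease_extend_enabled
        and response_timeout_seconds > 0
        and response_timeout_seconds * 0.8 >= 1
    )

    deadlines = []
    if response_timeout_seconds > 0 and not heartbeats_active:
        deadlines.append((response_timeout_seconds, "response_timeout"))
    if timeout_seconds > 0:
        # The overall ceiling applies whether or not heartbeats are sent.
        deadlines.append((timeout_seconds, "overall_timeout"))

    # Keep only deadlines the task overruns; the earliest one wins.
    exceeded = [d for d in deadlines if duration_seconds > d[0]]
    if not exceeded:
        return "completed"
    return min(exceeded)[1]
```

In this model, a 300-second task with `responseTimeoutSeconds=60` completes when lease extension is enabled, but still times out if `timeoutSeconds` is 200.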
124+
125+
## Retry on Failure
126+
127+
If a heartbeat API call fails, the SDK retries up to 3 times with backoff (`1s`, `1.5s`, `2s`). If all retries fail, the error is logged and the SDK tries again on the next poll loop iteration. If the network is truly partitioned, the server will eventually time out the task — this is correct behavior.
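
The retry behavior can be sketched as follows. This is one reading of the description (an initial attempt followed by up to three retries, waiting 1 s, 1.5 s, and 2 s before each), not the SDK's code; `send_heartbeat_with_retry` and its `send` and `sleep` parameters are hypothetical names introduced for illustration.

```python
import logging
import time
from typing import Callable, Sequence

logger = logging.getLogger(__name__)


def send_heartbeat_with_retry(
    send: Callable[[], None],
    retry_delays: Sequence[float] = (1.0, 1.5, 2.0),
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call send(); on failure retry after each delay. Return True on success."""
    last_error = None
    for attempt in range(len(retry_delays) + 1):
        if attempt > 0:
            sleep(retry_delays[attempt - 1])  # back off before each retry
        try:
            send()
            return True
        except Exception as exc:  # any failure counts as a failed attempt
            last_error = exc
    # All retries exhausted: log and let the caller try again later.
    logger.error("Heartbeat failed after %d retries: %s",
                 len(retry_delays), last_error)
    return False
```

Returning `False` instead of raising keeps the caller's loop running, so a later iteration can attempt the heartbeat again.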
128+
129+
## Example
130+
131+
See [examples/lease_extension_example.py](examples/lease_extension_example.py) for a complete runnable example that:
132+
- Defines a long-running worker with `lease_extend_enabled=True`
133+
- Creates a workflow with a short `responseTimeoutSeconds`
134+
- Runs the workflow and shows that the task completes despite sleeping longer than the timeout

README.md

Lines changed: 23 additions & 0 deletions
@@ -19,6 +19,7 @@ If you find [Conductor](https://github.com/conductor-oss/conductor) useful, plea
1919
* [Workers: Sync and Async](#workers-sync-and-async)
2020
* [Workflows with HTTP Calls and Waits](#workflows-with-http-calls-and-waits)
2121
* [Long-Running Tasks with TaskContext](#long-running-tasks-with-taskcontext)
22+
* [Lease Extension for Long-Running Tasks](#lease-extension-for-long-running-tasks)
2223
* [Monitoring with Metrics](#monitoring-with-metrics)
2324
* [Managing Workflow Executions](#managing-workflow-executions)
2425
* [AI & LLM Workflows](#ai--llm-workflows)
@@ -277,6 +278,26 @@ def batch_job(batch_id: str) -> Union[dict, TaskInProgress]:
277278
278279
See [examples/task_context_example.py](examples/task_context_example.py) for all patterns (polling, retry-aware logic, async context, input access).
279280
281+
### Lease Extension for Long-Running Tasks
282+
283+
For tasks that run longer than `responseTimeoutSeconds` (e.g., LLM inference, data pipelines, batch jobs), enable automatic lease extension. The SDK sends heartbeats at 80% of `responseTimeoutSeconds`, resetting the server's timeout timer so the task stays alive:
284+
285+
```python
286+
from conductor.client.worker.worker_task import worker_task
287+
288+
@worker_task(
289+
task_definition_name='train_model',
290+
lease_extend_enabled=True, # Automatic heartbeat — keeps task alive
291+
)
292+
def train_model(dataset_id: str) -> dict:
293+
"""Runs for 10 minutes, but responseTimeoutSeconds is only 60s.
294+
Heartbeats at 48s intervals keep the lease alive."""
295+
model = train(dataset_id)
296+
return {'model_id': model.id, 'accuracy': model.accuracy}
297+
```
298+
299+
Disabled by default. Enable per-worker via decorator, constructor, or environment variable (`conductor_worker_<task>_lease_extend_enabled=true`). See [LEASE_EXTENSION.md](LEASE_EXTENSION.md) for the full guide.
300+
280301
### Monitoring with Metrics
281302
282303
Enable Prometheus metrics with a single setting — the SDK exposes poll counts, execution times, error rates, and HTTP latency:
@@ -409,6 +430,7 @@ See the [Examples Guide](examples/README.md) for the full catalog. Key examples:
409430
| [kitchensink.py](examples/kitchensink.py) | All task types (HTTP, JS, JQ, Switch) | `python examples/kitchensink.py` |
410431
| [workflow_ops.py](examples/workflow_ops.py) | Pause, resume, terminate, retry, restart, rerun, signal | `python examples/workflow_ops.py` |
411432
| [task_context_example.py](examples/task_context_example.py) | Long-running tasks with TaskInProgress | `python examples/task_context_example.py` |
433+
| [lease_extension_example.py](examples/lease_extension_example.py) | Automatic heartbeat for long-running tasks | `python examples/lease_extension_example.py` |
412434
| [metrics_example.py](examples/metrics_example.py) | Prometheus metrics collection | `python examples/metrics_example.py` |
413435
| [fastapi_worker_service.py](examples/fastapi_worker_service.py) | FastAPI: expose a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
414436
| [helloworld.py](examples/helloworld/helloworld.py) | Minimal hello world | `python examples/helloworld/helloworld.py` |
@@ -433,6 +455,7 @@ End-to-end examples covering all APIs for each domain:
433455
| [Worker Design](docs/design/WORKER_DESIGN.md) | Architecture: AsyncTaskRunner vs TaskRunner, discovery, lifecycle |
434456
| [Worker Guide](docs/WORKER.md) | All worker patterns (function, class, annotation, async) |
435457
| [Worker Configuration](WORKER_CONFIGURATION.md) | Hierarchical environment variable configuration |
458+
| [Lease Extension](LEASE_EXTENSION.md) | Automatic heartbeat for long-running tasks |
436459
| [Workflow Management](docs/WORKFLOW.md) | Start, pause, resume, terminate, retry, search |
437460
| [Workflow Testing](docs/WORKFLOW_TESTING.md) | Unit testing with mock outputs |
438461
| [Task Management](docs/TASK_MANAGEMENT.md) | Task operations |

WORKER_CONFIGURATION.md

Lines changed: 3 additions & 3 deletions
@@ -29,12 +29,12 @@ The following properties can be configured via environment variables:
2929
| `overwrite_task_def` | bool | Overwrite existing task definitions when registering (default: true) | `false` | ✅ Yes |
3030
| `strict_schema` | bool | Enforce strict schema validation - additionalProperties=false (default: false) | `true` | ✅ Yes |
3131
| `poll_timeout` | int | Poll request timeout in milliseconds | `100` | ✅ Yes |
32-
| `lease_extend_enabled` | bool | ⚠️ **Not implemented** - reserved for future use | `false` | ✅ Yes |
32+
| `lease_extend_enabled` | bool | Auto-extend task lease via heartbeat (see below) | `false` | ✅ Yes |
3333
| `paused` | bool | Pause worker from polling/executing tasks | `true` |**Environment-only** |
3434

3535
**Notes**:
3636
- The `paused` property is intentionally **not available** in the `@worker_task` decorator. It can only be controlled via environment variables, allowing operators to pause/resume workers at runtime without code changes or redeployment.
37-
- The `lease_extend_enabled` parameter is accepted but **not currently implemented**. For lease extension, use manual `TaskInProgress` returns (see below).
37+
- When `lease_extend_enabled=True`, the SDK automatically sends heartbeats at 80% of `responseTimeoutSeconds` to keep long-running tasks alive. Without it, tasks that exceed `responseTimeoutSeconds` are timed out and retried by the server.
3838
- The `register_task_def` parameter automatically registers task definitions with JSON Schema (draft-07) generated from Python type hints.
3939
- The `overwrite_task_def` parameter controls whether to overwrite existing task definitions (default: true).
4040
- The `strict_schema` parameter controls JSON schema validation strictness (default: false for lenient validation).
@@ -97,7 +97,7 @@ def long_task(job_id: str) -> Union[dict, TaskInProgress]:
9797
return {'status': 'completed', 'result': processed}
9898
```
9999

100-
**⚠️ Note**: The `lease_extend_enabled=True` configuration parameter does **not** provide automatic lease extension. You must explicitly return `TaskInProgress` to extend the lease.
100+
**Automatic lease extension**: When `lease_extend_enabled=True`, the SDK sends a heartbeat to the server at 80% of `responseTimeoutSeconds`, resetting the timeout clock. This keeps the task alive without requiring manual `TaskInProgress` returns. The `TaskInProgress` pattern is still useful for chunked/checkpoint-based execution where the worker yields the task back to the queue.
101101

102102
**For detailed patterns**, see [Long-Running Tasks & Lease Extension](docs/design/WORKER_DESIGN.md#long-running-tasks--lease-extension).
103103

docs/WORKER.md

Lines changed: 1 addition & 1 deletion
@@ -593,7 +593,7 @@ class SimpleCppWorker(WorkerInterface):
593593

594594
## Long-Running Tasks and Lease Extension
595595

596-
For tasks that take longer than the configured `responseTimeoutSeconds`, the SDK provides automatic lease extension to prevent timeouts. See the comprehensive [Lease Extension Guide](../../LEASE_EXTENSION.md) for:
596+
For tasks that take longer than the configured `responseTimeoutSeconds`, the SDK provides automatic lease extension to prevent timeouts. See the comprehensive [Lease Extension Guide](../LEASE_EXTENSION.md) for:
597597

598598
- How lease extension works
599599
- Automatic vs manual control
