Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions LEASE_EXTENSION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Lease Extension (Automatic Heartbeat)

When a worker picks up a task, the Conductor server starts a `responseTimeoutSeconds` timer. If the worker doesn't send an update before the timer expires, the server marks the task as timed out and re-queues it for retry.

For long-running tasks (agent tool calls, LLM inference, data processing, batch jobs), the worker is actively executing but the server thinks it's dead. **Lease extension** solves this by automatically sending heartbeats that reset the timeout timer.

## How It Works

When `lease_extend_enabled=True`:

1. Worker picks up a task with `responseTimeoutSeconds > 0`
2. SDK starts tracking the task for heartbeats
3. At **80% of `responseTimeoutSeconds`**, SDK sends a heartbeat (`TaskResult.extend_lease=True`)
4. Server resets the task's `updateTime` to now, giving a fresh `responseTimeoutSeconds` window
5. Heartbeats continue until the task completes, fails, or the worker shuts down

```
Timeline (responseTimeoutSeconds=120s):
0s 96s 192s 288s
|-----------|-----------|-----------|--→ task completes
poll heartbeat heartbeat heartbeat
(80%) (80%) (80%)
```

The heartbeat fires at 80% of `responseTimeoutSeconds` (matching the Java SDK). This gives a 20% safety margin — if a heartbeat is slightly delayed, the task still has time before the server times it out.

## Quick Start

```python
from conductor.client.worker.worker_task import worker_task

@worker_task(
task_definition_name='long_running_analysis',
lease_extend_enabled=True, # Enable automatic heartbeat
)
def analyze_dataset(dataset_id: str) -> dict:
"""This task takes 5 minutes but responseTimeoutSeconds is 60s.
Heartbeats keep it alive automatically."""
results = run_expensive_analysis(dataset_id)
return {'results': results}
```

That's it. The SDK handles heartbeats automatically in the background.

## Enabling Lease Extension

Lease extension is **disabled by default** (matching the Java SDK). Enable it per-worker or globally:

### Per-Worker (Decorator)

```python
@worker_task(
task_definition_name='my_task',
lease_extend_enabled=True,
)
def my_task(data: str) -> dict:
...
```

### Per-Worker (Class)

```python
from conductor.client.worker.worker import Worker

worker = Worker(
task_definition_name='my_task',
execute_function=my_function,
lease_extend_enabled=True,
)
```

### Per-Worker (Environment Variable)

```shell
export conductor_worker_my_task_lease_extend_enabled=true
```

### Global (All Workers)

```shell
export conductor_worker_all_lease_extend_enabled=true
```

### Precedence

Environment variables override decorator/constructor arguments:

1. Task-specific env var (`conductor_worker_<task>_lease_extend_enabled`)
2. Global env var (`conductor_worker_all_lease_extend_enabled`)
3. Worker constructor / decorator argument

## When to Use

**Enable lease extension when:**
- Task execution time may exceed `responseTimeoutSeconds`
- Tasks involve external calls with unpredictable latency (LLM APIs, data pipelines)
- You want the worker to hold the task continuously (not yield and re-poll)

**You don't need lease extension when:**
- Tasks always complete within `responseTimeoutSeconds`
- You're using `TaskInProgress` with `callbackAfterSeconds` (the task is yielded back to the queue)
- `responseTimeoutSeconds` is 0 (no timeout configured)

## Lease Extension vs TaskInProgress

These are two different strategies for long-running tasks:

| | Lease Extension | TaskInProgress |
|---|---|---|
| **How it works** | Worker holds the task, heartbeats keep it alive | Worker yields the task, re-polls later |
| **Task state** | IN_PROGRESS the whole time | Returned to queue between polls |
| **When to use** | Continuous execution (LLM calls, streaming) | Incremental processing (batch chunks, polling external status) |
| **Enable with** | `lease_extend_enabled=True` | Return `TaskInProgress(callback_after_seconds=N)` |
| **Worker memory** | Task stays in worker memory | Task is released, re-polled with fresh context |

You can combine both — enable `lease_extend_enabled` for safety while also using `TaskInProgress` for incremental polling.

## Important Constraints

- **`responseTimeoutSeconds`** is the time between updates. This is what heartbeats reset.
- **`timeoutSeconds`** is the overall SLA wall-clock ceiling. **Cannot be extended by heartbeat.** Once exceeded, the task is TIMED_OUT regardless of heartbeats.
- Heartbeats only fire when `responseTimeoutSeconds > 0` and `lease_extend_enabled = True`.
- If the heartbeat interval would be less than 1 second (i.e., `responseTimeoutSeconds < 1.25`), heartbeats are skipped.

## Retry on Failure

If a heartbeat API call fails, the SDK retries up to 3 times with backoff (`1s`, `1.5s`, `2s`). If all retries fail, the error is logged and the SDK tries again on the next poll loop iteration. If the network is truly partitioned, the server will eventually time out the task — this is correct behavior.

## Example

See [examples/lease_extension_example.py](examples/lease_extension_example.py) for a complete runnable example that:
- Defines a long-running worker with `lease_extend_enabled=True`
- Creates a workflow with a short `responseTimeoutSeconds`
- Runs the workflow and proves the task completes despite sleeping longer than the timeout
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ If you find [Conductor](https://github.com/conductor-oss/conductor) useful, plea
* [Workers: Sync and Async](#workers-sync-and-async)
* [Workflows with HTTP Calls and Waits](#workflows-with-http-calls-and-waits)
* [Long-Running Tasks with TaskContext](#long-running-tasks-with-taskcontext)
* [Lease Extension for Long-Running Tasks](#lease-extension-for-long-running-tasks)
* [Monitoring with Metrics](#monitoring-with-metrics)
* [Managing Workflow Executions](#managing-workflow-executions)
* [AI & LLM Workflows](#ai--llm-workflows)
Expand Down Expand Up @@ -275,6 +276,26 @@ def batch_job(batch_id: str) -> Union[dict, TaskInProgress]:

See [examples/task_context_example.py](examples/task_context_example.py) for all patterns (polling, retry-aware logic, async context, input access).

### Lease Extension for Long-Running Tasks

For tasks that run longer than `responseTimeoutSeconds` (e.g., LLM inference, data pipelines, batch jobs), enable automatic lease extension. The SDK sends heartbeats at 80% of `responseTimeoutSeconds`, resetting the server's timeout timer so the task stays alive:

```python
from conductor.client.worker.worker_task import worker_task

@worker_task(
task_definition_name='train_model',
lease_extend_enabled=True, # Automatic heartbeat — keeps task alive
)
def train_model(dataset_id: str) -> dict:
"""Runs for 10 minutes, but responseTimeoutSeconds is only 60s.
Heartbeats at 48s intervals keep the lease alive."""
model = train(dataset_id)
return {'model_id': model.id, 'accuracy': model.accuracy}
```

Disabled by default. Enable per-worker via decorator, constructor, or environment variable (`conductor_worker_<task>_lease_extend_enabled=true`). See [LEASE_EXTENSION.md](LEASE_EXTENSION.md) for the full guide.

### Monitoring with Metrics

Enable Prometheus metrics with a single setting — the SDK exposes poll counts, execution times, error rates, and HTTP latency:
Expand Down Expand Up @@ -407,6 +428,7 @@ See the [Examples Guide](examples/README.md) for the full catalog. Key examples:
| [kitchensink.py](examples/kitchensink.py) | All task types (HTTP, JS, JQ, Switch) | `python examples/kitchensink.py` |
| [workflow_ops.py](examples/workflow_ops.py) | Pause, resume, terminate, retry, restart, rerun, signal | `python examples/workflow_ops.py` |
| [task_context_example.py](examples/task_context_example.py) | Long-running tasks with TaskInProgress | `python examples/task_context_example.py` |
| [lease_extension_example.py](examples/lease_extension_example.py) | Automatic heartbeat for long-running tasks | `python examples/lease_extension_example.py` |
| [metrics_example.py](examples/metrics_example.py) | Prometheus metrics collection | `python examples/metrics_example.py` |
| [fastapi_worker_service.py](examples/fastapi_worker_service.py) | FastAPI: expose a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
| [helloworld.py](examples/helloworld/helloworld.py) | Minimal hello world | `python examples/helloworld/helloworld.py` |
Expand All @@ -431,6 +453,7 @@ End-to-end examples covering all APIs for each domain:
| [Worker Design](docs/design/WORKER_DESIGN.md) | Architecture: AsyncTaskRunner vs TaskRunner, discovery, lifecycle |
| [Worker Guide](docs/WORKER.md) | All worker patterns (function, class, annotation, async) |
| [Worker Configuration](WORKER_CONFIGURATION.md) | Hierarchical environment variable configuration |
| [Lease Extension](LEASE_EXTENSION.md) | Automatic heartbeat for long-running tasks |
| [Workflow Management](docs/WORKFLOW.md) | Start, pause, resume, terminate, retry, search |
| [Workflow Testing](docs/WORKFLOW_TESTING.md) | Unit testing with mock outputs |
| [Task Management](docs/TASK_MANAGEMENT.md) | Task operations |
Expand Down
6 changes: 3 additions & 3 deletions WORKER_CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ The following properties can be configured via environment variables:
| `overwrite_task_def` | bool | Overwrite existing task definitions when registering (default: true) | `false` | ✅ Yes |
| `strict_schema` | bool | Enforce strict schema validation - additionalProperties=false (default: false) | `true` | ✅ Yes |
| `poll_timeout` | int | Poll request timeout in milliseconds | `100` | ✅ Yes |
| `lease_extend_enabled` | bool | ⚠️ **Not implemented** - reserved for future use | `false` | ✅ Yes |
| `lease_extend_enabled` | bool | Auto-extend task lease via heartbeat (see below) | `false` | ✅ Yes |
| `paused` | bool | Pause worker from polling/executing tasks | `true` | ❌ **Environment-only** |

**Notes**:
- The `paused` property is intentionally **not available** in the `@worker_task` decorator. It can only be controlled via environment variables, allowing operators to pause/resume workers at runtime without code changes or redeployment.
- The `lease_extend_enabled` parameter is accepted but **not currently implemented**. For lease extension, use manual `TaskInProgress` returns (see below).
- When `lease_extend_enabled=True`, the SDK automatically sends heartbeats at 80% of `responseTimeoutSeconds` to keep long-running tasks alive. Without it, tasks that exceed `responseTimeoutSeconds` are timed out and retried by the server.
- The `register_task_def` parameter automatically registers task definitions with JSON Schema (draft-07) generated from Python type hints.
- The `overwrite_task_def` parameter controls whether to overwrite existing task definitions (default: true).
- The `strict_schema` parameter controls JSON schema validation strictness (default: false for lenient validation).
Expand Down Expand Up @@ -97,7 +97,7 @@ def long_task(job_id: str) -> Union[dict, TaskInProgress]:
return {'status': 'completed', 'result': processed}
```

**⚠️ Note**: The `lease_extend_enabled=True` configuration parameter does **not** provide automatic lease extension. You must explicitly return `TaskInProgress` to extend the lease.
**Automatic lease extension**: When `lease_extend_enabled=True`, the SDK sends a heartbeat to the server at 80% of `responseTimeoutSeconds`, resetting the timeout clock. This keeps the task alive without requiring manual `TaskInProgress` returns. The `TaskInProgress` pattern is still useful for chunked/checkpoint-based execution where the worker yields the task back to the queue.

**For detailed patterns**, see [Long-Running Tasks & Lease Extension](docs/design/WORKER_DESIGN.md#long-running-tasks--lease-extension).

Expand Down
2 changes: 1 addition & 1 deletion docs/WORKER.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ class SimpleCppWorker(WorkerInterface):

## Long-Running Tasks and Lease Extension

For tasks that take longer than the configured `responseTimeoutSeconds`, the SDK provides automatic lease extension to prevent timeouts. See the comprehensive [Lease Extension Guide](../../LEASE_EXTENSION.md) for:
For tasks that take longer than the configured `responseTimeoutSeconds`, the SDK provides automatic lease extension to prevent timeouts. See the comprehensive [Lease Extension Guide](../LEASE_EXTENSION.md) for:

- How lease extension works
- Automatic vs manual control
Expand Down
Loading
Loading