Commit 974056a

v1r3n and claude committed
fix: resolve merge conflicts with main and update v2 fallback tests
- Take main's version for task_runner.py and async_task_runner.py (includes transient error handling, 404/405 detection)
- Update test_v2_fallback.py: _v2_available -> _use_update_v2, 501 -> 405, fix e2e test infinite loop (mock batch_poll to return empty on second call)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2 parents 59a6846 + 3cef362 commit 974056a

56 files changed

Lines changed: 4619 additions & 259 deletions

Lines changed: 97 additions & 0 deletions
@@ -0,0 +1,97 @@
+name: Harness Worker Image
+
+on:
+  push:
+    branches: [main]
+    paths:
+      - "Dockerfile"
+      - "harness/**"
+      - "src/**"
+      - "pyproject.toml"
+      - "poetry.lock"
+      - ".github/workflows/harness-image.yml"
+  release:
+    types: [published]
+  workflow_dispatch:
+    inputs:
+      deploy:
+        description: "Dispatch downstream deploy after the image is built"
+        type: boolean
+        default: true
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  build-and-push:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+      packages: write
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v3
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v3
+
+      - name: Log in to GHCR
+        uses: docker/login-action@v3
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Calculate branch tag
+        id: vars
+        shell: bash
+        run: |
+          BRANCH="${{ github.ref_name }}"
+          CLEANED_BRANCH_NAME=$(echo "$BRANCH" | tr '/' '-' | tr '[:upper:]' '[:lower:]')
+          echo "cleaned-branch-name=$CLEANED_BRANCH_NAME" >> "$GITHUB_OUTPUT"
+
+      - name: Docker metadata
+        id: meta
+        uses: docker/metadata-action@v5
+        with:
+          images: ghcr.io/conductor-oss/python-sdk/harness-worker
+          tags: |
+            type=raw,value=${{ steps.vars.outputs.cleaned-branch-name }}-latest,enable=${{ github.event_name != 'release' }}
+            type=raw,value=${{ github.event.release.tag_name }},enable=${{ github.event_name == 'release' }}
+
+      - name: Build and push
+        uses: docker/build-push-action@v6
+        with:
+          context: .
+          file: ./Dockerfile
+          target: harness
+          platforms: linux/amd64,linux/arm64
+          push: true
+          tags: ${{ steps.meta.outputs.tags }}
+          # Registry-backed BuildKit cache. Unchanged layers are reused across
+          # runs so rebuilding the same commit (or one with only minor diffs)
+          # is near-instant. The `:buildcache` tag lives alongside the image
+          # but only stores layer blobs, not a runnable image.
+          cache-from: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache
+          cache-to: type=registry,ref=ghcr.io/conductor-oss/python-sdk/harness-worker:buildcache,mode=max
+
+  dispatch-deploy:
+    if: |
+      github.event_name == 'release' ||
+      (github.event_name == 'workflow_dispatch' && inputs.deploy)
+    needs: build-and-push
+    runs-on: ubuntu-latest
+    permissions:
+      contents: read
+    steps:
+      - uses: peter-evans/repository-dispatch@v3
+        with:
+          token: ${{ secrets.CI_UTIL_DISPATCH_TOKEN }}
+          repository: conductor-oss/oss-ci-util
+          event-type: sdk_release
+          client-payload: |-
+            {"tag": "${{ github.event.release.tag_name || 'latest' }}", "repo": "${{ github.repository }}"}
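
The tag selection in this workflow is easy to misread, so here is a minimal Python sketch of what the "Calculate branch tag" and "Docker metadata" steps appear to compute together. The function names `cleaned_branch_name` and `image_tag` are hypothetical and exist only for illustration; the sketch assumes ASCII branch names, where `str.lower()` behaves like `tr '[:upper:]' '[:lower:]'`.

```python
def cleaned_branch_name(ref_name: str) -> str:
    """Mirror the shell pipeline: replace '/' with '-' and lowercase the result."""
    return ref_name.replace("/", "-").lower()


def image_tag(event_name: str, ref_name: str, release_tag: str = "") -> str:
    """Return the one tag the metadata step enables for a given event.

    Release events use the release tag name; every other event uses
    "<cleaned-branch>-latest".
    """
    if event_name == "release":
        return release_tag
    return f"{cleaned_branch_name(ref_name)}-latest"


if __name__ == "__main__":
    print(image_tag("push", "main"))
    print(image_tag("workflow_dispatch", "Feature/Lease-Ext"))
    print(image_tag("release", "main", release_tag="v1.2.3"))
```

Under these assumptions a push to `main` publishes `main-latest`, while a published release publishes only the release tag.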

.github/workflows/pull_request.yml

Lines changed: 6 additions & 9 deletions
@@ -10,7 +10,6 @@ jobs:
   unit-test:
     runs-on: ubuntu-latest
     env:
-      COVERAGE_FILE: coverage.xml
       COVERAGE_DIR: .coverage-reports
     steps:
       - name: Checkout code
@@ -67,31 +66,29 @@ jobs:
 
       - name: Generate coverage report
         id: coverage_report
-        continue-on-error: true
         run: |
           coverage combine ${{ env.COVERAGE_DIR }}/.coverage.*
           coverage report
-          coverage xml
+          coverage xml -o coverage.xml
 
       - name: Verify coverage file
         id: verify_coverage
         if: always()
-        continue-on-error: true
         run: |
-          if [ ! -s "${{ env.COVERAGE_FILE }}" ]; then
-            echo "Coverage file is empty or does not exist"
-            ls -la ${{ env.COVERAGE_FILE }} ${{ env.COVERAGE_DIR }}
+          if [ ! -s coverage.xml ]; then
+            echo "coverage.xml is empty or does not exist"
+            ls -la coverage.xml ${{ env.COVERAGE_DIR }} || true
             exit 1
           fi
-          echo "Coverage file exists and is not empty"
+          echo "coverage.xml exists and is not empty"
 
       - name: Upload coverage to Codecov
         if: always() && steps.verify_coverage.outcome == 'success'
         continue-on-error: true
         uses: codecov/codecov-action@v3
         with:
           token: ${{ secrets.CODECOV_TOKEN }}
-          file: ${{ env.COVERAGE_FILE }}
+          file: coverage.xml
 
       - name: Check test results
         if: steps.unit_tests.outcome == 'failure' || steps.bc_tests.outcome == 'failure' || steps.serdeser_tests.outcome == 'failure'

Dockerfile

Lines changed: 13 additions & 0 deletions
@@ -41,6 +41,19 @@ ENV CONDUCTOR_AUTH_SECRET ${CONDUCTOR_AUTH_SECRET}
 ENV CONDUCTOR_SERVER_URL ${CONDUCTOR_SERVER_URL}
 RUN python3 ./tests/integration/main.py
 
+FROM python_test_base AS harness-build
+COPY /harness /package/harness
+
+FROM python:3.12-alpine AS harness
+RUN adduser -D -u 65532 nonroot
+WORKDIR /app
+COPY --from=harness-build /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=harness-build /package/src /app/src
+COPY --from=harness-build /package/harness /app/harness
+ENV PYTHONPATH=/app/src
+USER nonroot
+ENTRYPOINT ["python", "-u", "harness/main.py"]
+
 FROM python:3.12-alpine AS publish
 RUN apk add --no-cache tk curl
 WORKDIR /package

LEASE_EXTENSION.md

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+# Lease Extension (Automatic Heartbeat)
+
+When a worker picks up a task, the Conductor server starts a `responseTimeoutSeconds` timer. If the worker doesn't send an update before the timer expires, the server marks the task as timed out and re-queues it for retry.
+
+For long-running tasks (agent tool calls, LLM inference, data processing, batch jobs), the worker is actively executing but the server thinks it's dead. **Lease extension** solves this by automatically sending heartbeats that reset the timeout timer.
+
+## How It Works
+
+When `lease_extend_enabled=True`:
+
+1. Worker picks up a task with `responseTimeoutSeconds > 0`
+2. SDK starts tracking the task for heartbeats
+3. At **80% of `responseTimeoutSeconds`**, SDK sends a heartbeat (`TaskResult.extend_lease=True`)
+4. Server resets the task's `updateTime` to now, giving a fresh `responseTimeoutSeconds` window
+5. Heartbeats continue until the task completes, fails, or the worker shuts down
+
+```
+Timeline (responseTimeoutSeconds=120s):
+0s          96s         192s        288s
+|-----------|-----------|-----------|--→ task completes
+poll        heartbeat   heartbeat   heartbeat
+            (80%)       (80%)       (80%)
+```
+
+The heartbeat fires at 80% of `responseTimeoutSeconds` (matching the Java SDK). This gives a 20% safety margin — if a heartbeat is slightly delayed, the task still has time before the server times it out.
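
The timeline arithmetic can be reproduced with a short Python sketch. It is only a model of the schedule described above, not the SDK's scheduler; `HEARTBEAT_FRACTION` and `heartbeat_times` are hypothetical names, and the sketch assumes every heartbeat succeeds and is sent exactly on time.

```python
from typing import List

# From the text: a heartbeat is sent at 80% of responseTimeoutSeconds.
HEARTBEAT_FRACTION = 0.8


def heartbeat_times(response_timeout_seconds: float,
                    task_duration_seconds: float) -> List[float]:
    """Return the offsets (seconds after poll) at which heartbeats would fire.

    Each heartbeat opens a fresh responseTimeoutSeconds window, so the next
    one is due another 80% of that window later. Heartbeats stop once the
    task has finished.
    """
    if response_timeout_seconds <= 0:
        return []  # no response timeout configured, nothing to extend
    interval = response_timeout_seconds * HEARTBEAT_FRACTION
    times = []
    next_beat = interval
    while next_beat < task_duration_seconds:
        times.append(next_beat)
        next_beat += interval
    return times


if __name__ == "__main__":
    # responseTimeoutSeconds=120 and a task that runs for 300 seconds
    print(heartbeat_times(120, 300))
```

With a 120-second response timeout and a 300-second task, the offsets are 96, 192 and 288 seconds, matching the timeline.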
+
+## Quick Start
+
+```python
+from conductor.client.worker.worker_task import worker_task
+
+@worker_task(
+    task_definition_name='long_running_analysis',
+    lease_extend_enabled=True,  # Enable automatic heartbeat
+)
+def analyze_dataset(dataset_id: str) -> dict:
+    """This task takes 5 minutes but responseTimeoutSeconds is 60s.
+    Heartbeats keep it alive automatically."""
+    results = run_expensive_analysis(dataset_id)
+    return {'results': results}
+```
+
+That's it. The SDK handles heartbeats automatically in the background.
+
+## Enabling Lease Extension
+
+Lease extension is **disabled by default** (matching the Java SDK). Enable it per-worker or globally:
+
+### Per-Worker (Decorator)
+
+```python
+@worker_task(
+    task_definition_name='my_task',
+    lease_extend_enabled=True,
+)
+def my_task(data: str) -> dict:
+    ...
+```
+
+### Per-Worker (Class)
+
+```python
+from conductor.client.worker.worker import Worker
+
+worker = Worker(
+    task_definition_name='my_task',
+    execute_function=my_function,
+    lease_extend_enabled=True,
+)
+```
+
+### Per-Worker (Environment Variable)
+
+```shell
+export conductor_worker_my_task_lease_extend_enabled=true
+```
+
+### Global (All Workers)
+
+```shell
+export conductor_worker_all_lease_extend_enabled=true
+```
+
+### Precedence
+
+Environment variables override decorator/constructor arguments:
+
+1. Task-specific env var (`conductor_worker_<task>_lease_extend_enabled`)
+2. Global env var (`conductor_worker_all_lease_extend_enabled`)
+3. Worker constructor / decorator argument
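
The precedence order above can be sketched as a small lookup. This illustrates the documented order only and is not the SDK's own resolution code; `resolve_lease_extend_enabled` is a hypothetical helper, and treating only the string `true` (case-insensitive) as enabled is an assumption.

```python
import os
from typing import Mapping, Optional


def resolve_lease_extend_enabled(task_name: str,
                                 code_value: bool = False,
                                 env: Optional[Mapping[str, str]] = None) -> bool:
    """Resolve the setting in the documented order of precedence.

    1. conductor_worker_<task>_lease_extend_enabled
    2. conductor_worker_all_lease_extend_enabled
    3. the decorator / constructor argument (code_value)
    """
    env = os.environ if env is None else env
    candidates = (
        f"conductor_worker_{task_name}_lease_extend_enabled",
        "conductor_worker_all_lease_extend_enabled",
    )
    for key in candidates:
        raw = env.get(key)
        if raw is not None:
            # Assumption: only "true" (any case) enables the feature.
            return raw.strip().lower() == "true"
    return code_value


if __name__ == "__main__":
    env = {
        "conductor_worker_all_lease_extend_enabled": "true",
        "conductor_worker_my_task_lease_extend_enabled": "false",
    }
    print(resolve_lease_extend_enabled("my_task", code_value=True, env=env))
    print(resolve_lease_extend_enabled("other_task", code_value=False, env=env))
```

In this model a task-specific `false` wins over both the global `true` and the decorator argument, so an operator can switch heartbeats off for a single worker without a code change.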
+
+## When to Use
+
+**Enable lease extension when:**
+- Task execution time may exceed `responseTimeoutSeconds`
+- Tasks involve external calls with unpredictable latency (LLM APIs, data pipelines)
+- You want the worker to hold the task continuously (not yield and re-poll)
+
+**You don't need lease extension when:**
+- Tasks always complete within `responseTimeoutSeconds`
+- You're using `TaskInProgress` with `callbackAfterSeconds` (the task is yielded back to the queue)
+- `responseTimeoutSeconds` is 0 (no timeout configured)
+
+## Lease Extension vs TaskInProgress
+
+These are two different strategies for long-running tasks:
+
+| | Lease Extension | TaskInProgress |
+|---|---|---|
+| **How it works** | Worker holds the task, heartbeats keep it alive | Worker yields the task, re-polls later |
+| **Task state** | IN_PROGRESS the whole time | Returned to queue between polls |
+| **When to use** | Continuous execution (LLM calls, streaming) | Incremental processing (batch chunks, polling external status) |
+| **Enable with** | `lease_extend_enabled=True` | Return `TaskInProgress(callback_after_seconds=N)` |
+| **Worker memory** | Task stays in worker memory | Task is released, re-polled with fresh context |
+
+You can combine both — enable `lease_extend_enabled` for safety while also using `TaskInProgress` for incremental polling.
+
+## Important Constraints
+
+- **`responseTimeoutSeconds`** is the time between updates. This is what heartbeats reset.
+- **`timeoutSeconds`** is the overall SLA wall-clock ceiling. **Cannot be extended by heartbeat.** Once exceeded, the task is TIMED_OUT regardless of heartbeats.
+- Heartbeats only fire when `responseTimeoutSeconds > 0` and `lease_extend_enabled = True`.
+- If the heartbeat interval would be less than 1 second (i.e., `responseTimeoutSeconds < 1.25`), heartbeats are skipped.
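
These constraints can be restated as two small Python functions. This is a simplified model under stated assumptions, not server or SDK code: `heartbeats_enabled` and `next_timeout_at` are hypothetical names, `timeoutSeconds` is assumed to be measured from the moment the task started, and a value of 0 is assumed to mean that timeout is not configured.

```python
from typing import Optional


def heartbeats_enabled(response_timeout_seconds: float,
                       lease_extend_enabled: bool) -> bool:
    """Heartbeats need the flag, a positive response timeout, and an
    interval (80% of the timeout) of at least one second."""
    if not lease_extend_enabled or response_timeout_seconds <= 0:
        return False
    return response_timeout_seconds * 0.8 >= 1


def next_timeout_at(started_at: float,
                    last_update_at: float,
                    response_timeout_seconds: float,
                    timeout_seconds: float) -> Optional[float]:
    """Return the earliest moment the task would time out, or None.

    A heartbeat moves last_update_at forward, which pushes the response
    deadline out. The overall deadline depends only on started_at, so no
    heartbeat can move it.
    """
    deadlines = []
    if response_timeout_seconds > 0:
        deadlines.append(last_update_at + response_timeout_seconds)
    if timeout_seconds > 0:
        deadlines.append(started_at + timeout_seconds)
    return min(deadlines) if deadlines else None


if __name__ == "__main__":
    # Heartbeat at t=288 with responseTimeoutSeconds=60 and timeoutSeconds=300
    print(next_timeout_at(0, 288, 60, 300))
```

In the example the heartbeat at 288 seconds would allow the task to run until 348 seconds, but the overall ceiling still ends it at 300 seconds.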
+
+## Retry on Failure
+
+If a heartbeat API call fails, the SDK retries up to 3 times with backoff (`1s`, `1.5s`, `2s`). If all retries fail, the error is logged and the SDK tries again on the next poll loop iteration. If the network is truly partitioned, the server will eventually time out the task — this is correct behavior.
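
A retry loop of this shape could look like the following Python sketch. It is not the SDK's implementation: `send_with_retries` is a hypothetical helper, and the sketch assumes one initial attempt followed by up to three retries, sleeping 1s, 1.5s and 2s before the first, second and third retry respectively.

```python
import logging
import time
from typing import Callable, Sequence

logger = logging.getLogger(__name__)

# Delays from the text, applied before each successive retry.
BACKOFF_SECONDS = (1.0, 1.5, 2.0)


def send_with_retries(send: Callable[[], None],
                      delays: Sequence[float] = BACKOFF_SECONDS,
                      sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call send(); on failure, retry once per entry in delays.

    Returns True as soon as a call succeeds. Returns False after the last
    retry fails, leaving the caller to try again on its next loop iteration.
    """
    for attempt in range(len(delays) + 1):
        try:
            send()
            return True
        except Exception as exc:  # a real client would catch its API error type
            if attempt == len(delays):
                logger.warning("heartbeat failed after %d retries: %s",
                               len(delays), exc)
                return False
            sleep(delays[attempt])
    return False


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky() -> None:
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("simulated network error")

    print(send_with_retries(flaky, sleep=lambda seconds: None))
```

Passing `sleep` as a parameter keeps the sketch testable without real delays.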
+
+## Example
+
+See [examples/lease_extension_example.py](examples/lease_extension_example.py) for a complete runnable example that:
+- Defines a long-running worker with `lease_extend_enabled=True`
+- Creates a workflow with a short `responseTimeoutSeconds`
+- Runs the workflow and proves the task completes despite sleeping longer than the timeout

README.md

Lines changed: 31 additions & 2 deletions
@@ -19,6 +19,7 @@ If you find [Conductor](https://github.com/conductor-oss/conductor) useful, plea
 * [Workers: Sync and Async](#workers-sync-and-async)
 * [Workflows with HTTP Calls and Waits](#workflows-with-http-calls-and-waits)
 * [Long-Running Tasks with TaskContext](#long-running-tasks-with-taskcontext)
+* [Lease Extension for Long-Running Tasks](#lease-extension-for-long-running-tasks)
 * [Monitoring with Metrics](#monitoring-with-metrics)
 * [Managing Workflow Executions](#managing-workflow-executions)
 * [AI & LLM Workflows](#ai--llm-workflows)
@@ -59,9 +60,13 @@ conductor server start
 ## Install the SDK
 
 ```shell
+python3 -m venv conductor-env
+source conductor-env/bin/activate  # Windows: conductor-env\Scripts\activate
 pip install conductor-python
 ```
 
+> **Already in a virtual environment?** Skip the `venv` step and run `pip install conductor-python` directly. On macOS, Windows, or in containers where system Python is not locked down, you can also install globally.
+
 ## 60-Second Quickstart
 
 **Step 1: Create a workflow**
@@ -101,7 +106,7 @@ Create a `quickstart.py` with the following:
 ```python
 from conductor.client.automator.task_handler import TaskHandler
 from conductor.client.configuration.configuration import Configuration
-from conductor.client.orkes_clients import OrkesClients
+from conductor.client.orkes_clients import OrkesClients  # works with OSS Conductor and Orkes Conductor
 from conductor.client.workflow.conductor_workflow import ConductorWorkflow
 from conductor.client.worker.worker_task import worker_task
 
@@ -127,6 +132,8 @@ def main():
     workflow.register(overwrite=True)
 
     # Start polling for tasks (one worker subprocess per worker function).
+    # Note: scan_for_annotated_workers=True only discovers @worker_task functions that have
+    # already been imported. If workers are in a separate module, import it first.
     with TaskHandler(configuration=config, scan_for_annotated_workers=True) as task_handler:
         task_handler.start_processes()
 
@@ -162,7 +169,7 @@ python quickstart.py
 > See [Configuration](#configuration) for details.
 
 That's it — you just defined a worker, built a workflow, and executed it. Open the Conductor UI (default:
-[http://localhost:8127](http://localhost:8127)) to see the execution.
+[http://localhost:8080](http://localhost:8080)) to see the execution.
 
 ---
 
@@ -271,6 +278,26 @@ def batch_job(batch_id: str) -> Union[dict, TaskInProgress]:
 
 See [examples/task_context_example.py](examples/task_context_example.py) for all patterns (polling, retry-aware logic, async context, input access).
 
+### Lease Extension for Long-Running Tasks
+
+For tasks that run longer than `responseTimeoutSeconds` (e.g., LLM inference, data pipelines, batch jobs), enable automatic lease extension. The SDK sends heartbeats at 80% of `responseTimeoutSeconds`, resetting the server's timeout timer so the task stays alive:
+
+```python
+from conductor.client.worker.worker_task import worker_task
+
+@worker_task(
+    task_definition_name='train_model',
+    lease_extend_enabled=True,  # Automatic heartbeat — keeps task alive
+)
+def train_model(dataset_id: str) -> dict:
+    """Runs for 10 minutes, but responseTimeoutSeconds is only 60s.
+    Heartbeats at 48s intervals keep the lease alive."""
+    model = train(dataset_id)
+    return {'model_id': model.id, 'accuracy': model.accuracy}
+```
+
+Disabled by default. Enable per-worker via decorator, constructor, or environment variable (`conductor_worker_<task>_lease_extend_enabled=true`). See [LEASE_EXTENSION.md](LEASE_EXTENSION.md) for the full guide.
+
 ### Monitoring with Metrics
 
 Enable Prometheus metrics with a single setting — the SDK exposes poll counts, execution times, error rates, and HTTP latency:
@@ -403,6 +430,7 @@ See the [Examples Guide](examples/README.md) for the full catalog. Key examples:
 | [kitchensink.py](examples/kitchensink.py) | All task types (HTTP, JS, JQ, Switch) | `python examples/kitchensink.py` |
 | [workflow_ops.py](examples/workflow_ops.py) | Pause, resume, terminate, retry, restart, rerun, signal | `python examples/workflow_ops.py` |
 | [task_context_example.py](examples/task_context_example.py) | Long-running tasks with TaskInProgress | `python examples/task_context_example.py` |
+| [lease_extension_example.py](examples/lease_extension_example.py) | Automatic heartbeat for long-running tasks | `python examples/lease_extension_example.py` |
 | [metrics_example.py](examples/metrics_example.py) | Prometheus metrics collection | `python examples/metrics_example.py` |
 | [fastapi_worker_service.py](examples/fastapi_worker_service.py) | FastAPI: expose a workflow as an API (+ workers) | `uvicorn examples.fastapi_worker_service:app --port 8081 --workers 1` |
 | [helloworld.py](examples/helloworld/helloworld.py) | Minimal hello world | `python examples/helloworld/helloworld.py` |
@@ -427,6 +455,7 @@ End-to-end examples covering all APIs for each domain:
 | [Worker Design](docs/design/WORKER_DESIGN.md) | Architecture: AsyncTaskRunner vs TaskRunner, discovery, lifecycle |
 | [Worker Guide](docs/WORKER.md) | All worker patterns (function, class, annotation, async) |
 | [Worker Configuration](WORKER_CONFIGURATION.md) | Hierarchical environment variable configuration |
+| [Lease Extension](LEASE_EXTENSION.md) | Automatic heartbeat for long-running tasks |
 | [Workflow Management](docs/WORKFLOW.md) | Start, pause, resume, terminate, retry, search |
 | [Workflow Testing](docs/WORKFLOW_TESTING.md) | Unit testing with mock outputs |
 | [Task Management](docs/TASK_MANAGEMENT.md) | Task operations |
