Skip to content

Commit 3436d2e

Browse files
committed
docs: add LEASE_EXTENSION.md covering heartbeat, config, constants, and pullWorkflowMessages
1 parent ae5d6c2 commit 3436d2e

1 file changed

Lines changed: 224 additions & 0 deletions

File tree

LEASE_EXTENSION.md

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
# Lease Extension (Heartbeat)
2+
3+
Long-running workers need to keep their task **lease** alive on the Conductor server. When a task is polled, the server starts a `responseTimeoutSeconds` timer. If no update arrives before the timer expires, the server re-queues the task — potentially causing **duplicate execution** by a second worker.
4+
5+
Lease extension sends a periodic heartbeat (`extendLease: true`) to the server that resets this timer, allowing the worker to safely run for minutes or hours.
6+
7+
---
8+
9+
## Quick Start
10+
11+
Enable via the `@worker` decorator:
12+
13+
```typescript
14+
import { worker } from "@io-orkes/conductor-javascript";
15+
16+
@worker({
17+
taskDefName: "process_video",
18+
leaseExtendEnabled: true, // ← heartbeat at 80% of responseTimeoutSeconds
19+
})
20+
async function processVideo(task: Task): Promise<TaskResult> {
21+
// Takes 5 minutes — server lease stays alive automatically
22+
await encodeVideo(task.inputData.videoUrl);
23+
return { status: "COMPLETED", outputData: { done: true } };
24+
}
25+
```
26+
27+
Or on a manually constructed `ConductorWorker`:
28+
29+
```typescript
30+
const runner = new TaskRunner({
31+
worker: {
32+
taskDefName: "process_video",
33+
execute: processVideo,
34+
leaseExtendEnabled: true,
35+
},
36+
client,
37+
options: { workerID: "worker-1", domain: undefined },
38+
});
39+
runner.startPolling();
40+
```
41+
42+
---
43+
44+
## How It Works
45+
46+
When `leaseExtendEnabled: true` is set and a task is polled:
47+
48+
1. The `LeaseTracker` records the task and computes a heartbeat interval:
49+
```
50+
intervalMs = responseTimeoutSeconds × 0.8 × 1000
51+
```
52+
2. A `setInterval` (100 ms tick) runs **independently of the polling loop** — it fires even when all concurrency slots are occupied with executing tasks.
53+
3. When `intervalMs` elapses since the last heartbeat (or task start), a `extendLease: true` update is sent to the server via the v1 endpoint, resetting the `responseTimeoutSeconds` timer.
54+
4. The task is untracked as soon as `worker.execute()` resolves (before the final result is submitted).
55+
56+
```
57+
t=0s task polled → lease tracked
58+
t=8s heartbeat #1 → server timer reset to 10s
59+
t=16s heartbeat #2 → server timer reset to 10s
60+
t=20s execute() → COMPLETED, lease untracked
61+
```
62+
*(Example: `responseTimeoutSeconds=10s`, execution takes 20s)*
63+
64+
### Why independent of the poll loop?
65+
66+
The heartbeat timer is a `setInterval` that runs on the Node.js event loop separate from the `Poller`. If all concurrency slots are full (workers are busy), no new tasks are polled — but heartbeats still fire for the tasks currently executing.
67+
68+
---
69+
70+
## Configuration
71+
72+
### `@worker` decorator option
73+
74+
```typescript
75+
@worker({
76+
taskDefName: "my_task",
77+
leaseExtendEnabled: true, // default: false
78+
})
79+
```
80+
81+
### Environment variable override
82+
83+
Per-worker or global overrides follow the same hierarchy as all other worker config:
84+
85+
```bash
86+
# Worker-specific (highest priority)
87+
CONDUCTOR_WORKER_MY_TASK_LEASE_EXTEND_ENABLED=true
88+
89+
# Global (applies to all workers)
90+
CONDUCTOR_WORKER_ALL_LEASE_EXTEND_ENABLED=true
91+
```
92+
93+
### Task definition requirement
94+
95+
The task must have `responseTimeoutSeconds >= 1.25` for lease extension to activate. Tasks with shorter response timeouts produce a computed interval < 1000 ms, which is skipped (matches Python SDK behaviour).
96+
97+
```typescript
98+
await metadataClient.registerTask({
99+
name: "process_video",
100+
responseTimeoutSeconds: 60, // heartbeat fires every 48s
101+
timeoutSeconds: 3600, // hard ceiling (unchanged by heartbeats)
102+
retryCount: 0,
103+
});
104+
```
105+
106+
> **Note:** `leaseExtendEnabled` resets the `responseTimeoutSeconds` window on each heartbeat. It does **not** extend the task's `timeoutSeconds` (total execution ceiling).
107+
108+
---
109+
110+
## Constants
111+
112+
| Constant | Value | Description |
113+
|----------|-------|-------------|
114+
| `LEASE_EXTEND_DURATION_FACTOR` | `0.8` | Heartbeat fires at 80% of `responseTimeoutSeconds` |
115+
| `LEASE_EXTEND_RETRY_COUNT` | `3` | Retry attempts per heartbeat on failure |
116+
| `HEARTBEAT_CHECK_INTERVAL_MS` | `100` | How often to check if a heartbeat is due |
117+
| `HEARTBEAT_RETRY_DELAY_MS` | `500` | Delay between heartbeat retry attempts |
118+
119+
```typescript
120+
import { LEASE_EXTEND_DURATION_FACTOR, LEASE_EXTEND_RETRY_COUNT } from "@io-orkes/conductor-javascript";
121+
```
122+
123+
---
124+
125+
## Retry Behaviour
126+
127+
If a heartbeat fails, it is retried up to `LEASE_EXTEND_RETRY_COUNT` (3) times with a 500 ms delay between attempts. If all retries fail:
128+
129+
- The error is logged.
130+
- The task remains tracked — the next check interval will attempt another heartbeat.
131+
- The task itself is **not failed** due to heartbeat errors.
132+
133+
---
134+
135+
## Direct `LeaseTracker` Usage
136+
137+
For custom worker implementations that bypass `TaskRunner`, `LeaseTracker` is exported from the public API:
138+
139+
```typescript
140+
import { LeaseTracker, TaskResource, orkesConductorClient } from "@io-orkes/conductor-javascript";
141+
import type { LeaseInfo } from "@io-orkes/conductor-javascript";
142+
143+
const client = await orkesConductorClient();
144+
145+
const tracker = new LeaseTracker(
146+
// sendHeartbeatFn — called by LeaseTracker on each heartbeat
147+
async (taskId, workflowInstanceId) => {
148+
await TaskResource.updateTask({
149+
client,
150+
body: { taskId, workflowInstanceId, status: "IN_PROGRESS", extendLease: true },
151+
throwOnError: true,
152+
});
153+
},
154+
logger
155+
);
156+
157+
tracker.start(); // start the 100ms check interval
158+
tracker.track(task); // track a polled task
159+
// ... worker executes task ...
160+
tracker.untrack(task.taskId!); // untrack immediately after execute() resolves
161+
tracker.stop(); // stop the interval (on worker shutdown)
162+
```
163+
164+
`LeaseInfo` describes the tracked state for a single task:
165+
166+
```typescript
167+
interface LeaseInfo {
168+
readonly taskId: string;
169+
readonly workflowInstanceId: string;
170+
readonly responseTimeoutSeconds: number;
171+
readonly lastHeartbeatTime: number; // Date.now() of last successful heartbeat
172+
readonly intervalMs: number; // responseTimeoutSeconds × 0.8 × 1000
173+
readonly isHeartbeating: boolean; // true while a heartbeat chain is in-flight
174+
}
175+
```
176+
177+
---
178+
179+
## Python SDK Parity
180+
181+
| Behaviour | Python SDK | JS SDK |
182+
|-----------|-----------|--------|
183+
| Heartbeat interval | `responseTimeoutSeconds × 0.8` | ✓ same |
184+
| Minimum interval | `< 1s → skip` |`< 1000ms → skip` |
185+
| Retry count | 3 | ✓ same |
186+
| Retry delay | ~500ms | ✓ same |
187+
| Heartbeat endpoint | v1 `updateTask` | ✓ same |
188+
| Independent of poll loop | ✓ (Python `run_once()` pre-poll) | ✓ (JS `setInterval`) |
189+
| `leaseExtendEnabled` flag |||
190+
191+
---
192+
193+
## Related
194+
195+
- [`pullWorkflowMessages`](#pullworkflowmessages) — task builder for consuming workflow message queue messages, which uses the `IN_PROGRESS` / re-queue pattern that lease extension is designed to protect.
196+
- [METRICS.md](./METRICS.md) — monitoring worker health, poll latency, and execution duration.
197+
198+
---
199+
200+
## `pullWorkflowMessages` Task Builder
201+
202+
A task that consumes messages from the workflow's message queue (WMQ). When messages are available the task completes; when the queue is empty it returns `IN_PROGRESS` and is re-evaluated after ~1 second.
203+
204+
```typescript
205+
import { pullWorkflowMessages } from "@io-orkes/conductor-javascript";
206+
207+
const wf = new ConductorWorkflow(executor, "order_processor")
208+
.add(pullWorkflowMessages("read_messages", /* batchSize */ 5))
209+
.add(simpleTask("process_ref", "process_order", { messages: "${read_messages.output.messages}" }));
210+
```
211+
212+
| Parameter | Type | Default | Description |
213+
|-----------|------|---------|-------------|
214+
| `taskReferenceName` | `string` || Unique reference name within the workflow |
215+
| `batchSize` | `number` | `1` | Max messages to dequeue per execution (server cap ~100) |
216+
| `optional` | `boolean` | `undefined` | Whether the task is optional |
217+
218+
**Output shape:**
219+
```json
220+
{
221+
"messages": [ /* WorkflowMessage objects */ ],
222+
"count": 3
223+
}
224+
```

0 commit comments

Comments
 (0)