Skip to content

Commit e76e31e

Browse files
committed
Performance improvements
1 parent fcb1ec1 commit e76e31e

5 files changed

Lines changed: 47 additions & 23 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,3 +14,4 @@ build/
1414
.claudeignore
1515
node_modules/
1616
cloud_poc/frontend/dist/
17+
examples/stress_task.py

offwork/worker/backends/base.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,17 @@ async def send_heartbeat(self, task_id: str) -> None:
4141
heartbeat-based stall detection should override this.
4242
"""
4343

44+
async def heartbeat_and_check_cancel(self, task_id: str) -> bool:
45+
"""Send a heartbeat and return whether the task is cancelled.
46+
47+
Backends with a single round-trip combining both (e.g. an
48+
HTTP POST whose response carries the cancel flag) should
49+
override this to halve worker→broker chatter while a task is
50+
running. The default implementation issues two calls.
51+
"""
52+
await self.send_heartbeat(task_id)
53+
return await self.is_cancelled(task_id)
54+
4455
async def get_heartbeat(self, task_id: str) -> float | None:
4556
"""Return the timestamp of the last heartbeat for *task_id*.
4657

offwork/worker/backends/http.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -171,6 +171,12 @@ async def try_get_result(self, task_id: str) -> str | None:
171171
async def send_heartbeat(self, task_id: str) -> None:
172172
await self._request("POST", f"/tasks/{task_id}/heartbeat")
173173

174+
async def heartbeat_and_check_cancel(self, task_id: str) -> bool:
175+
_status, body = await self._request(
176+
"POST", f"/tasks/{task_id}/heartbeat", allow_not_found=True,
177+
)
178+
return bool(body and body.get("cancelled"))
179+
174180
async def get_heartbeat(self, task_id: str) -> float | None:
175181
_status, body = await self._request(
176182
"GET", f"/tasks/{task_id}/heartbeat", allow_not_found=True,

offwork/worker/remote.py

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -347,7 +347,7 @@ def _build_detail_tags(worker: Worker) -> str:
347347
return ", ".join(parts)
348348

349349

350-
_HEARTBEAT_INTERVAL = 1.0
350+
_HEARTBEAT_INTERVAL = 5.0
351351

352352

353353
async def _heartbeat_loop(
@@ -358,23 +358,23 @@ async def _heartbeat_loop(
358358
) -> None:
359359
"""Send periodic heartbeats and check for cancellation.
360360
361-
When *exec_task* is provided and the backend reports the task as
362-
cancelled, the execution task is cancelled via
361+
Backends override ``heartbeat_and_check_cancel`` to combine both
362+
into a single round-trip (e.g. an HTTP POST whose response carries
363+
the cancel flag). When *exec_task* is provided and the backend
364+
reports the task as cancelled, the execution task is cancelled via
363365
:meth:`asyncio.Task.cancel`, which raises :class:`CancelledError`
364366
at the next ``await`` in async user functions.
365367
"""
366368
while not cancel_event.is_set():
367369
try:
368-
await backend.send_heartbeat(task_id)
369-
except Exception:
370-
logger.debug("Heartbeat send failed for task %s", task_id, exc_info=True)
371-
if exec_task is not None:
372-
try:
373-
if await backend.is_cancelled(task_id):
370+
if exec_task is not None:
371+
if await backend.heartbeat_and_check_cancel(task_id):
374372
exec_task.cancel()
375373
return
376-
except Exception:
377-
logger.debug("Cancellation check failed for task %s", task_id, exc_info=True)
374+
else:
375+
await backend.send_heartbeat(task_id)
376+
except Exception:
377+
logger.debug("Heartbeat send failed for task %s", task_id, exc_info=True)
378378
try:
379379
await asyncio.wait_for(cancel_event.wait(), timeout=_HEARTBEAT_INTERVAL)
380380
except asyncio.TimeoutError:

offwork/worker/result.py

Lines changed: 18 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -171,13 +171,29 @@ async def _wait_with_stall_detection(
171171
timeout: float | None,
172172
stall_timeout: float,
173173
) -> None:
174-
"""Poll for result with heartbeat-based stall detection."""
174+
"""Wait for the result with heartbeat-based stall detection.
175+
176+
Uses the backend's blocking ``get_result`` (long-poll on HTTP
177+
backends) in bounded slices, with a heartbeat check between
178+
slices. The slice length is derived from ``stall_timeout`` so
179+
we still detect a silent worker in time.
180+
"""
175181
deadline = None if timeout is None else time.monotonic() + timeout
182+
slice_seconds = max(1.0, min(stall_timeout / 2, 30.0))
176183
last_hb_value: float | None = None
177184
last_hb_change: float | None = None
178185

179186
while True:
180-
raw = await self._backend.try_get_result(self._task_id)
187+
remaining = None if deadline is None else deadline - time.monotonic()
188+
if remaining is not None and remaining <= 0:
189+
raise TimeoutError(
190+
f"Timed out waiting for result of task {self._task_id}"
191+
)
192+
wait_for = slice_seconds if remaining is None else min(slice_seconds, remaining)
193+
try:
194+
raw = await self._backend.get_result(self._task_id, timeout=wait_for)
195+
except TimeoutError:
196+
raw = None
181197
if raw is not None:
182198
self._envelope = ResultEnvelope.from_json(raw)
183199
logger.debug(
@@ -186,14 +202,6 @@ async def _wait_with_stall_detection(
186202
)
187203
return
188204

189-
if deadline is not None:
190-
remaining = deadline - time.monotonic()
191-
if remaining <= 0:
192-
raise TimeoutError(
193-
f"Timed out waiting for result of task {self._task_id}"
194-
)
195-
196-
logger.debug("Polling heartbeat for task %s", self._task_id[:8])
197205
hb = await self._backend.get_heartbeat(self._task_id)
198206
now = time.monotonic()
199207
if hb is not None and hb != last_hb_value:
@@ -206,8 +214,6 @@ async def _wait_with_stall_detection(
206214
f"{elapsed:.1f}s (threshold: {stall_timeout}s)"
207215
)
208216

209-
await asyncio.sleep(1.0)
210-
211217
def __await__(self) -> Generator[Any, None, Any]:
212218
"""Allow ``await result`` as shorthand for ``await result.result()``."""
213219
return self.result().__await__()

0 commit comments

Comments
 (0)