Skip to content

Commit 6f96309

Browse files
committed
Efficiency 2
1 parent 1114184 commit 6f96309

4 files changed

Lines changed: 53 additions & 16 deletions

File tree

offwork/worker/backends/ws.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ async def _request(
186186
backoff = _RECONNECT_BACKOFF_MIN
187187
last_exc: Exception | None = None
188188
for attempt in range(3):
189+
req_id: str | None = None
189190
try:
190191
ws = await self._ensure_connected()
191192
req_id = uuid.uuid4().hex
@@ -203,6 +204,10 @@ async def _request(
203204
return await future
204205
return await asyncio.wait_for(future, timeout=timeout)
205206
except (ConnectionError, OSError) as exc:
207+
# Drop any orphaned pending entry before reconnecting so a
208+
# late response can't resolve a future no one awaits.
209+
if req_id is not None:
210+
self._pending.pop(req_id, None)
206211
last_exc = exc
207212
if self._closed:
208213
raise
@@ -212,7 +217,8 @@ async def _request(
212217
except asyncio.TimeoutError:
213218
# Drop the pending entry — the response, if it ever
214219
# arrives, has no waiter.
215-
self._pending.pop(req_id, None)
220+
if req_id is not None:
221+
self._pending.pop(req_id, None)
216222
raise
217223
assert last_exc is not None
218224
raise last_exc

offwork/worker/remote.py

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ async def _handle_task(
540540
known_clients: KnownClients | None = None,
541541
nonce_lru: NonceLRU | None = None,
542542
clock_skew: float = DEFAULT_CLOCK_SKEW,
543+
sem: asyncio.Semaphore | None = None,
543544
) -> None:
544545
"""Process a single task: deserialize, execute with policy, send result.
545546
@@ -573,13 +574,32 @@ async def _handle_task(
573574

574575
logger.debug("Received task %s: %s", task.task_id, task.function_name)
575576

576-
# Wait for scheduled time
577+
# Wait for scheduled time *before* acquiring a concurrency slot, so a
578+
# task scheduled far in the future does not occupy one of the worker's
579+
# limited execution slots while it sleeps.
577580
if task.scheduled_at is not None:
578581
delay = task.scheduled_at - time.time()
579582
if delay > 0:
580583
logger.debug("Task %s scheduled in %.1fs", task.task_id, delay)
581584
await asyncio.sleep(delay)
582585

586+
# Acquire the concurrency slot only for the actual execution phase.
587+
slot = sem if sem is not None else contextlib.nullcontext()
588+
async with slot:
589+
await _execute_task(
590+
worker, backend, task,
591+
root_token=root_token,
592+
)
593+
594+
595+
async def _execute_task(
596+
worker: Worker,
597+
backend: Backend,
598+
task: Task,
599+
*,
600+
root_token: bytes | None = None,
601+
) -> None:
602+
"""Run a ready task (scheduled wait already elapsed) and send its result."""
583603
# Any failure in the backend checks below must still surface to the
584604
# client, otherwise it would hang forever polling for a result.
585605
try:
@@ -793,14 +813,17 @@ def _on_shutdown_signal() -> None:
793813
pass
794814

795815
async def bounded_handle(task_json: str) -> None:
796-
async with sem:
797-
await _handle_task(
798-
worker, backend, task_json,
799-
root_token=root_token,
800-
known_clients=known_clients,
801-
nonce_lru=nonce_lru,
802-
clock_skew=clock_skew,
803-
)
816+
# The semaphore is acquired inside _handle_task, *after* any
817+
# scheduled-time wait, so far-future tasks don't hold a slot while
818+
# sleeping.
819+
await _handle_task(
820+
worker, backend, task_json,
821+
root_token=root_token,
822+
known_clients=known_clients,
823+
nonce_lru=nonce_lru,
824+
clock_skew=clock_skew,
825+
sem=sem,
826+
)
804827

805828
async def _listen() -> None:
806829
async for task_json in backend.listen():

offwork/worker/result.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,18 +324,24 @@ async def _do_cancel(self, resolved: float | None) -> bool:
324324

325325
deadline = None if resolved is None else time.monotonic() + resolved
326326
while True:
327-
raw = await self._backend.try_get_result(self._task_id)
328-
if raw is not None:
329-
self._envelope = ResultEnvelope.from_json(raw)
330-
return True
331-
if deadline is not None and time.monotonic() >= deadline:
327+
remaining = None if deadline is None else deadline - time.monotonic()
328+
if remaining is not None and remaining <= 0:
332329
# Timed out waiting for worker confirmation — seed a cancelled
333330
# envelope so future reads don't hang forever.
334331
env = ResultEnvelope.cancelled(self._task_id)
335332
await self._backend.send_result(self._task_id, env.to_json())
336333
self._envelope = env
337334
return False
338-
await asyncio.sleep(0.5)
335+
# Long-poll for the worker's confirmation envelope instead of
336+
# busy-polling — wakes immediately on push-capable backends.
337+
wait_for = 5.0 if remaining is None else min(5.0, remaining)
338+
try:
339+
raw = await self._backend.get_result(self._task_id, timeout=wait_for)
340+
except TimeoutError:
341+
raw = None
342+
if raw is not None:
343+
self._envelope = ResultEnvelope.from_json(raw)
344+
return True
339345

340346
# -- awaiting the result ---------------------------------------------------
341347

tests/fixtures/_ws_broker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ async def _dispatch(
149149
return {"cancelled": True}
150150
if op == "is_cancelled":
151151
return {"cancelled": payload["task_id"] in state.cancelled}
152+
if op == "send_log_line":
153+
return None
152154
if op == "send_progress":
153155
state.progress[payload["task_id"]] = payload["progress_json"]
154156
return {"ok": True}

0 commit comments

Comments
 (0)