Skip to content

Commit 2d7f664

Browse files
[cross-repo from server#288] Conformance finding: replay workflow remains waiting on current artifacts (#102)
1 parent 4019683 commit 2d7f664

4 files changed

Lines changed: 51 additions & 2 deletions

File tree

src/durable_workflow/client.py

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3009,6 +3009,9 @@ async def register_worker(
30093009
sdk_version: str | None = None,
30103010
build_id: str | None = None,
30113011
capabilities: list[str] | None = None,
3012+
task_slots: dict[str, int] | None = None,
3013+
process_metrics: dict[str, Any] | None = None,
3014+
heartbeat_interval_seconds: int | None = None,
30123015
) -> Any:
30133016
"""Register this process with the server as a worker for ``task_queue``.
30143017
@@ -3041,6 +3044,12 @@ async def register_worker(
30413044
body["max_concurrent_workflow_tasks"] = max_concurrent_workflow_tasks
30423045
if max_concurrent_activity_tasks is not None:
30433046
body["max_concurrent_activity_tasks"] = max_concurrent_activity_tasks
3047+
if task_slots is not None:
3048+
body["task_slots"] = task_slots
3049+
if process_metrics is not None:
3050+
body["process_metrics"] = process_metrics
3051+
if heartbeat_interval_seconds is not None:
3052+
body["heartbeat_interval_seconds"] = heartbeat_interval_seconds
30443053
return await self._request("POST", "/worker/register", worker=True, json=body)
30453054

30463055
async def heartbeat_worker(
@@ -3067,8 +3076,9 @@ async def heartbeat_worker(
30673076
30683077
``process_metrics`` is an optional dict with any subset of
30693078
``cpu_percent``, ``memory_bytes``, ``process_uptime_seconds``,
3070-
``process_id``, and ``host`` — the SDK reports only what it has
3071-
cheap access to, and the server records exactly what was reported.
3079+
``process_id``, ``process_started_at``, and ``host`` — the SDK
3080+
reports only what it has cheap access to, and the server records
3081+
exactly what was reported.
30723082
30733083
Returns the server acknowledgement, which includes the advertised
30743084
``heartbeat_interval_seconds`` and ``stale_after_seconds`` so the

src/durable_workflow/worker.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import traceback
2727
import uuid
2828
from collections.abc import Awaitable, Callable, Iterable, Mapping
29+
from datetime import datetime, timezone
2930
from types import FunctionType
3031
from typing import Any
3132

@@ -526,6 +527,11 @@ def __init__(
526527
self._activity_inflight = 0
527528
self._heartbeat_interval = float(heartbeat_interval)
528529
self._process_started_at = time.time()
530+
self._process_started_at_iso = (
531+
datetime.fromtimestamp(self._process_started_at, timezone.utc)
532+
.isoformat(timespec="milliseconds")
533+
.replace("+00:00", "Z")
534+
)
529535
# CPU sampling baseline. The heartbeat reports an *instantaneous*
530536
# cpu_percent — CPU time burned in the interval since the previous
531537
# heartbeat, divided by that interval — rather than the lifetime
@@ -643,6 +649,8 @@ async def _register(self) -> None:
643649
max_concurrent_activity_tasks=self.max_concurrent_activity_tasks,
644650
build_id=self.build_id,
645651
capabilities=[QUERY_TASKS_CAPABILITY] if self._query_tasks_supported else None,
652+
task_slots=self._current_task_slots(),
653+
process_metrics=self._current_process_metrics(),
646654
)
647655
# Adapt to the server-advertised cadence when present so a cluster
648656
# can pin the worker fleet's heartbeat beat without each worker
@@ -1635,6 +1643,7 @@ def _current_process_metrics(self) -> dict[str, Any]:
16351643
metrics: dict[str, Any] = {
16361644
"process_uptime_seconds": int(now - self._process_started_at),
16371645
"process_id": os.getpid(),
1646+
"process_started_at": self._process_started_at_iso,
16381647
}
16391648

16401649
# ``memory_bytes`` is the *current* resident set size, not the

tests/test_client.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,6 +2539,28 @@ async def test_register_sends_worker_capacity_when_configured(self, client: Clie
25392539
assert body["max_concurrent_workflow_tasks"] == 3
25402540
assert body["max_concurrent_activity_tasks"] == 7
25412541

2542+
@pytest.mark.asyncio
2543+
async def test_register_sends_worker_process_state_when_configured(self, client: Client) -> None:
2544+
resp = _mock_response(201, {"worker_id": "w1", "registered": True})
2545+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
2546+
await client.register_worker(
2547+
worker_id="w1",
2548+
task_queue="q1",
2549+
task_slots={"workflow_available": 2, "activity_available": 5},
2550+
process_metrics={
2551+
"host": "worker-host",
2552+
"process_id": 1234,
2553+
"process_started_at": "2026-05-18T21:00:00Z",
2554+
},
2555+
)
2556+
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
2557+
assert body["task_slots"] == {"workflow_available": 2, "activity_available": 5}
2558+
assert body["process_metrics"] == {
2559+
"host": "worker-host",
2560+
"process_id": 1234,
2561+
"process_started_at": "2026-05-18T21:00:00Z",
2562+
}
2563+
25422564
@pytest.mark.asyncio
25432565
async def test_register_rejects_non_positive_worker_capacity(self, client: Client) -> None:
25442566
with pytest.raises(ValueError, match="max_concurrent_workflow_tasks"):

tests/test_worker.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,13 @@ async def test_register(self, mock_client: AsyncMock) -> None:
227227
assert call_kwargs["max_concurrent_workflow_tasks"] == 10
228228
assert call_kwargs["max_concurrent_activity_tasks"] == 10
229229
assert call_kwargs["capabilities"] == ["query_tasks"]
230+
assert call_kwargs["task_slots"] == {
231+
"workflow_available": 10,
232+
"activity_available": 10,
233+
}
234+
process_metrics = call_kwargs["process_metrics"]
235+
assert process_metrics["process_id"] > 0
236+
assert "process_started_at" in process_metrics
230237

231238
@pytest.mark.asyncio
232239
async def test_register_omits_query_task_capability_when_server_does_not_support_it(
@@ -1959,6 +1966,7 @@ async def test_run_drives_periodic_heartbeats_with_slot_state(
19591966
assert "process_id" in process_metrics
19601967
assert process_metrics["process_id"] > 0
19611968
assert "process_uptime_seconds" in process_metrics
1969+
assert "process_started_at" in process_metrics
19621970

19631971
@pytest.mark.asyncio
19641972
async def test_register_adopts_server_advertised_heartbeat_cadence(

0 commit comments

Comments
 (0)