Skip to content

Commit 86c4eb3

Browse files
Fail closed on SDK protocol compatibility checks
1 parent c48f467 commit 86c4eb3

4 files changed

Lines changed: 150 additions & 48 deletions

File tree

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,16 @@ Full documentation is available at [durable-workflow.github.io/docs/2.0/sdks/pyt
7777

7878
## Compatibility
7979

80-
SDK version 0.2.x is compatible with Durable Workflow server 0.x prerelease images and the Server 2.x protocol line.
80+
SDK version 0.2.x is compatible with servers that advertise these protocol
81+
manifests from `GET /api/cluster/info`:
8182

82-
The worker automatically checks server version at startup and raises a clear error if incompatible.
83+
- `control_plane.version: "2"`
84+
- `control_plane.request_contract.schema: "durable-workflow.v2.control-plane-request.contract"` with `control_plane.request_contract.version: 1`
85+
- `worker_protocol.version: "1.0"`
86+
87+
The top-level server `version` is build identity only. The worker checks these
88+
protocol manifests at startup and fails closed when compatibility is missing,
89+
unknown, or undiscoverable.
8390

8491
## Development
8592

src/durable_workflow/client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
PROTOCOL_VERSION = "1.0"
2222
CONTROL_PLANE_VERSION = "2"
23+
CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA = "durable-workflow.v2.control-plane-request.contract"
24+
CONTROL_PLANE_REQUEST_CONTRACT_VERSION = 1
2325

2426

2527
def _default_sdk_version() -> str:
@@ -313,7 +315,7 @@ async def _do_request() -> httpx.Response:
313315
return resp.json()
314316

315317
async def get_cluster_info(self) -> dict[str, Any]:
316-
"""Fetch server version and capabilities from /api/cluster/info."""
318+
"""Fetch server build identity, capabilities, and protocol manifests."""
317319
result = await self._request("GET", "/cluster/info", worker=False, context="get_cluster_info")
318320
if not isinstance(result, dict):
319321
raise ServerError(

src/durable_workflow/worker.py

Lines changed: 81 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@
1010

1111
from . import serializer
1212
from .activity import ActivityContext, ActivityInfo, _set_context
13-
from .client import Client
13+
from .client import (
14+
CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA,
15+
CONTROL_PLANE_REQUEST_CONTRACT_VERSION,
16+
CONTROL_PLANE_VERSION,
17+
PROTOCOL_VERSION,
18+
Client,
19+
)
1420
from .errors import ActivityCancelled, AvroNotInstalledError, NonRetryableError
1521
from .workflow import replay
1622

@@ -25,6 +31,71 @@ def _workflow_name(cls: type) -> str:
2531
return getattr(cls, "__workflow_name__", cls.__name__)
2632

2733

34+
def _manifest_version(manifest: Any) -> str:
35+
if isinstance(manifest, dict):
36+
value = manifest.get("version")
37+
if isinstance(value, str | int):
38+
return str(value)
39+
return "missing"
40+
41+
42+
def _validate_server_compatibility(info: dict[str, Any]) -> None:
43+
control_plane = info.get("control_plane")
44+
if not isinstance(control_plane, dict):
45+
raise RuntimeError(
46+
"Server compatibility error: missing control_plane manifest; "
47+
f"sdk-python 0.2.x requires control_plane.version {CONTROL_PLANE_VERSION}."
48+
)
49+
50+
control_plane_version = _manifest_version(control_plane)
51+
if control_plane_version != CONTROL_PLANE_VERSION:
52+
raise RuntimeError(
53+
"Server compatibility error: unsupported control_plane.version "
54+
f"{control_plane_version!r}; sdk-python 0.2.x requires {CONTROL_PLANE_VERSION!r}."
55+
)
56+
57+
request_contract = control_plane.get("request_contract")
58+
if not isinstance(request_contract, dict):
59+
raise RuntimeError(
60+
"Server compatibility error: missing control_plane.request_contract; "
61+
f"expected {CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA} v{CONTROL_PLANE_REQUEST_CONTRACT_VERSION}."
62+
)
63+
64+
request_schema = request_contract.get("schema")
65+
request_version = request_contract.get("version")
66+
if (
67+
request_schema != CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA
68+
or not _contract_version_matches(request_version, CONTROL_PLANE_REQUEST_CONTRACT_VERSION)
69+
):
70+
raise RuntimeError(
71+
"Server compatibility error: unsupported control_plane.request_contract "
72+
f"{request_schema!r} v{request_version!r}; expected "
73+
f"{CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA} v{CONTROL_PLANE_REQUEST_CONTRACT_VERSION}."
74+
)
75+
76+
worker_protocol = info.get("worker_protocol")
77+
if not isinstance(worker_protocol, dict):
78+
raise RuntimeError(
79+
"Server compatibility error: missing worker_protocol manifest; "
80+
f"sdk-python 0.2.x requires worker_protocol.version {PROTOCOL_VERSION}."
81+
)
82+
83+
worker_protocol_version = _manifest_version(worker_protocol)
84+
if worker_protocol_version != PROTOCOL_VERSION:
85+
raise RuntimeError(
86+
"Server compatibility error: unsupported worker_protocol.version "
87+
f"{worker_protocol_version!r}; sdk-python 0.2.x requires {PROTOCOL_VERSION!r}."
88+
)
89+
90+
91+
def _contract_version_matches(value: Any, expected: int) -> bool:
92+
if isinstance(value, int):
93+
return value == expected
94+
if isinstance(value, str) and value.isdigit():
95+
return int(value) == expected
96+
return False
97+
98+
2899
class Worker:
29100
def __init__(
30101
self,
@@ -52,32 +123,18 @@ def __init__(
52123
self._in_flight: set[asyncio.Task[Any]] = set()
53124

54125
async def _register(self) -> None:
55-
# Check server version compatibility
56126
try:
57127
info = await self.client.get_cluster_info()
58-
server_version = info.get("version", "unknown")
59-
60-
# Parse major version (e.g., "0.1.9" -> 0, "2.0.0" -> 2)
61-
try:
62-
major = int(server_version.split(".")[0])
63-
except (ValueError, IndexError):
64-
log.warning("unable to parse server version %r; skipping compatibility check", server_version)
65-
major = None
66-
67-
# Require server major version 0 or 2 (0.x is pre-release, 2.x is stable).
68-
# SDK 0.2.x is compatible with server 0.x prereleases and 2.x protocol releases.
69-
if major is not None and major not in (0, 2):
70-
raise RuntimeError(
71-
f"Server version {server_version} is incompatible with sdk-python 0.2.x "
72-
f"(requires server 0.x or 2.x). "
73-
f"Upgrade the server or use a compatible SDK version."
74-
)
75-
76-
log.debug("server version %s is compatible", server_version)
77128
except Exception as e:
78-
if isinstance(e, RuntimeError) and "incompatible" in str(e):
79-
raise
80-
log.warning("failed to check server version compatibility: %s", e)
129+
raise RuntimeError(f"Server compatibility error: unable to read /api/cluster/info: {e}") from e
130+
131+
_validate_server_compatibility(info)
132+
log.debug(
133+
"server compatibility accepted: app_version=%s control_plane=%s worker_protocol=%s",
134+
info.get("version", "unknown"),
135+
_manifest_version(info.get("control_plane")),
136+
_manifest_version(info.get("worker_protocol")),
137+
)
81138

82139
await self.client.register_worker(
83140
worker_id=self.worker_id,

tests/test_worker.py

Lines changed: 57 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@
77
import pytest
88

99
from durable_workflow import activity, workflow
10-
from durable_workflow.client import Client
10+
from durable_workflow.client import (
11+
CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA,
12+
CONTROL_PLANE_REQUEST_CONTRACT_VERSION,
13+
CONTROL_PLANE_VERSION,
14+
PROTOCOL_VERSION,
15+
Client,
16+
)
1117
from durable_workflow.worker import Worker
1218

1319

@@ -38,10 +44,29 @@ def mock_client() -> AsyncMock:
3844
client.complete_activity_task = AsyncMock(return_value={"outcome": "completed"})
3945
client.fail_workflow_task = AsyncMock(return_value={"outcome": "failed"})
4046
client.fail_activity_task = AsyncMock(return_value={"outcome": "failed"})
41-
client.get_cluster_info = AsyncMock(return_value={"version": "2.0.0"})
47+
client.get_cluster_info = AsyncMock(return_value=compatible_cluster_info())
4248
return client
4349

4450

51+
def compatible_cluster_info(**overrides: object) -> dict[str, object]:
    """Build a cluster-info payload using the SDK's protocol constants.

    Args:
        **overrides: Top-level keys to add or replace in the payload. The
            replacement is shallow: an overridden key's whole value is
            swapped, nested dicts are not merged.

    Returns:
        A new dict with ``version``, ``control_plane`` and
        ``worker_protocol`` entries, with ``overrides`` applied on top.
    """
    request_contract: dict[str, object] = {
        "schema": CONTROL_PLANE_REQUEST_CONTRACT_SCHEMA,
        "version": CONTROL_PLANE_REQUEST_CONTRACT_VERSION,
        "operations": {},
    }
    defaults: dict[str, object] = {
        "version": "not-authoritative",
        "control_plane": {
            "version": CONTROL_PLANE_VERSION,
            "request_contract": request_contract,
        },
        "worker_protocol": {"version": PROTOCOL_VERSION},
    }
    return {**defaults, **overrides}
68+
69+
4570
class TestWorkerRegistration:
4671
@pytest.mark.asyncio
4772
async def test_register(self, mock_client: AsyncMock) -> None:
@@ -66,48 +91,59 @@ async def test_register_calls_cluster_info(self, mock_client: AsyncMock) -> None
6691
mock_client.get_cluster_info.assert_awaited_once()
6792

6893
@pytest.mark.asyncio
69-
async def test_register_accepts_server_major_0(self, mock_client: AsyncMock) -> None:
70-
mock_client.get_cluster_info = AsyncMock(return_value={"version": "0.1.9"})
94+
async def test_register_uses_protocol_manifests_not_top_level_app_version(
95+
self, mock_client: AsyncMock
96+
) -> None:
97+
mock_client.get_cluster_info = AsyncMock(return_value=compatible_cluster_info(version="3.0.0"))
7198
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
7299
await worker._register()
73100
mock_client.register_worker.assert_awaited_once()
74101

75102
@pytest.mark.asyncio
76-
async def test_register_accepts_server_major_2(self, mock_client: AsyncMock) -> None:
77-
mock_client.get_cluster_info = AsyncMock(return_value={"version": "2.3.4"})
103+
async def test_register_rejects_missing_control_plane_manifest(self, mock_client: AsyncMock) -> None:
104+
mock_client.get_cluster_info = AsyncMock(return_value=compatible_cluster_info(control_plane=None))
78105
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
79-
await worker._register()
80-
mock_client.register_worker.assert_awaited_once()
106+
with pytest.raises(RuntimeError, match="missing control_plane manifest"):
107+
await worker._register()
108+
mock_client.register_worker.assert_not_called()
81109

82110
@pytest.mark.asyncio
83-
async def test_register_rejects_incompatible_major_1(self, mock_client: AsyncMock) -> None:
84-
mock_client.get_cluster_info = AsyncMock(return_value={"version": "1.4.0"})
111+
async def test_register_rejects_unsupported_control_plane_version(self, mock_client: AsyncMock) -> None:
112+
mock_client.get_cluster_info = AsyncMock(
113+
return_value=compatible_cluster_info(control_plane={"version": "3", "request_contract": {}})
114+
)
85115
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
86-
with pytest.raises(RuntimeError, match="incompatible"):
116+
with pytest.raises(RuntimeError, match="unsupported control_plane.version"):
87117
await worker._register()
88118
mock_client.register_worker.assert_not_called()
89119

90120
@pytest.mark.asyncio
91-
async def test_register_rejects_incompatible_major_3(self, mock_client: AsyncMock) -> None:
92-
mock_client.get_cluster_info = AsyncMock(return_value={"version": "3.0.0"})
121+
async def test_register_rejects_missing_request_contract(self, mock_client: AsyncMock) -> None:
122+
mock_client.get_cluster_info = AsyncMock(
123+
return_value=compatible_cluster_info(control_plane={"version": CONTROL_PLANE_VERSION})
124+
)
93125
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
94-
with pytest.raises(RuntimeError, match="incompatible"):
126+
with pytest.raises(RuntimeError, match="missing control_plane.request_contract"):
95127
await worker._register()
96128
mock_client.register_worker.assert_not_called()
97129

98130
@pytest.mark.asyncio
99-
async def test_register_skips_check_on_unparseable_version(self, mock_client: AsyncMock) -> None:
100-
mock_client.get_cluster_info = AsyncMock(return_value={"version": "unknown"})
131+
async def test_register_rejects_unsupported_worker_protocol_version(self, mock_client: AsyncMock) -> None:
132+
mock_client.get_cluster_info = AsyncMock(
133+
return_value=compatible_cluster_info(worker_protocol={"version": "2.0"})
134+
)
101135
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
102-
await worker._register()
103-
mock_client.register_worker.assert_awaited_once()
136+
with pytest.raises(RuntimeError, match="unsupported worker_protocol.version"):
137+
await worker._register()
138+
mock_client.register_worker.assert_not_called()
104139

105140
@pytest.mark.asyncio
106-
async def test_register_continues_when_cluster_info_fails(self, mock_client: AsyncMock) -> None:
141+
async def test_register_fails_closed_when_cluster_info_fails(self, mock_client: AsyncMock) -> None:
107142
mock_client.get_cluster_info = AsyncMock(side_effect=RuntimeError("network down"))
108143
worker = Worker(mock_client, task_queue="q1", workflows=[TestWorkflow], activities=[])
109-
await worker._register()
110-
mock_client.register_worker.assert_awaited_once()
144+
with pytest.raises(RuntimeError, match="unable to read /api/cluster/info"):
145+
await worker._register()
146+
mock_client.register_worker.assert_not_called()
111147

112148

113149
class TestWorkflowTaskExecution:

0 commit comments

Comments
 (0)