Skip to content

Commit ed6f2e0

Browse files
[cross-repo from server#138] Task Queue Priority + Fairness: dispatch by priority and fair across workload classes (Temporal-parity) (#44)
1 parent 370bbb3 commit ed6f2e0

2 files changed

Lines changed: 62 additions & 0 deletions

File tree

src/durable_workflow/client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1877,6 +1877,9 @@ async def start_workflow(
18771877
duplicate_policy: str | None = None,
18781878
memo: dict[str, Any] | None = None,
18791879
search_attributes: dict[str, Any] | None = None,
1880+
priority: int | None = None,
1881+
fairness_key: str | None = None,
1882+
fairness_weight: int | None = None,
18801883
) -> WorkflowHandle:
18811884
"""Start a new workflow instance and return a handle bound to it.
18821885
@@ -1895,6 +1898,16 @@ async def start_workflow(
18951898
18961899
``memo`` and ``search_attributes`` attach operator-facing metadata to
18971900
the instance; see the main docs site for the key/value rules.
1901+
1902+
``priority`` is an integer in the range ``0..9`` (lower numbers run
1903+
first when workers on a shared task queue are saturated; default
1904+
``5``). ``fairness_key`` tags the workload class — typically a
1905+
tenant id, team name, or workflow type — so dispatch on a shared
1906+
task queue can be rebalanced across declared classes under
1907+
contention; tasks without a key share one class. ``fairness_weight``
1908+
(``1..1000``, default ``1``) lets a class take a proportionally
1909+
larger share of dispatch slots versus other classes on the same
1910+
queue.
18981911
"""
18991912
body: dict[str, Any] = {
19001913
"workflow_id": workflow_id,
@@ -1921,6 +1934,12 @@ async def start_workflow(
19211934
task_queue=task_queue,
19221935
)
19231936
body["search_attributes"] = search_attributes
1937+
if priority is not None:
1938+
body["priority"] = priority
1939+
if fairness_key is not None:
1940+
body["fairness_key"] = fairness_key
1941+
if fairness_weight is not None:
1942+
body["fairness_weight"] = fairness_weight
19241943
data = await self._request("POST", "/workflows", json=body, context=workflow_id)
19251944
return WorkflowHandle(
19261945
self,

tests/test_client.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,49 @@ async def test_duplicate_raises(self, client: Client) -> None:
230230
workflow_type="greeter", task_queue="q1", workflow_id="wf-1"
231231
)
232232

233+
@pytest.mark.asyncio
234+
async def test_priority_and_fairness_are_forwarded_on_start(self, client: Client) -> None:
235+
resp = _mock_response(201, {
236+
"workflow_id": "wf-prio",
237+
"run_id": "run-prio",
238+
"workflow_type": "greeter",
239+
})
240+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
241+
await client.start_workflow(
242+
workflow_type="greeter",
243+
task_queue="shared",
244+
workflow_id="wf-prio",
245+
priority=1,
246+
fairness_key="tenant-a",
247+
fairness_weight=3,
248+
)
249+
250+
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
251+
assert body["priority"] == 1
252+
assert body["fairness_key"] == "tenant-a"
253+
assert body["fairness_weight"] == 3
254+
255+
@pytest.mark.asyncio
256+
async def test_priority_and_fairness_are_omitted_when_unset(self, client: Client) -> None:
257+
resp = _mock_response(201, {
258+
"workflow_id": "wf-nopri",
259+
"run_id": "run-nopri",
260+
"workflow_type": "greeter",
261+
})
262+
with patch.object(client._http, "request", new_callable=AsyncMock, return_value=resp) as mock:
263+
await client.start_workflow(
264+
workflow_type="greeter",
265+
task_queue="shared",
266+
workflow_id="wf-nopri",
267+
)
268+
269+
body = mock.call_args.kwargs.get("json") or mock.call_args[1].get("json")
270+
# Server-version skew: callers that don't opt in must not send the fields
271+
# so the server defaults (priority=5, no fairness key) take effect.
272+
assert "priority" not in body
273+
assert "fairness_key" not in body
274+
assert "fairness_weight" not in body
275+
233276

234277
class TestDescribeWorkflow:
235278
@pytest.mark.asyncio

0 commit comments

Comments
 (0)