Skip to content

Commit f4493a0

Browse files
Expose task-queue admission visibility through Python SDK client methods
Expose task queue admission in sync SDK Add blocking sync facade methods and coverage for task queue admission visibility.
1 parent e2efefe commit f4493a0

2 files changed

Lines changed: 186 additions & 87 deletions

File tree

src/durable_workflow/sync.py

Lines changed: 111 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Synchronous facade over the async Client, for scripts and Jupyter."""
2+
23
from __future__ import annotations
34

45
import asyncio
@@ -13,6 +14,8 @@
1314
ScheduleList,
1415
ScheduleSpec,
1516
ScheduleTriggerResult,
17+
TaskQueueDescription,
18+
TaskQueueList,
1619
WorkflowExecution,
1720
WorkflowHandle,
1821
WorkflowList,
@@ -72,12 +75,15 @@ def update(
7275
wait_timeout_seconds: int | None = None,
7376
request_id: str | None = None,
7477
) -> Any:
75-
return _run(self._handle.update(
76-
update_name, args=args,
77-
wait_for=wait_for,
78-
wait_timeout_seconds=wait_timeout_seconds,
79-
request_id=request_id,
80-
))
78+
return _run(
79+
self._handle.update(
80+
update_name,
81+
args=args,
82+
wait_for=wait_for,
83+
wait_timeout_seconds=wait_timeout_seconds,
84+
request_id=request_id,
85+
)
86+
)
8187

8288

8389
class SyncScheduleHandle:
@@ -103,11 +109,18 @@ def update(
103109
search_attributes: dict[str, Any] | None = None,
104110
note: str | None = None,
105111
) -> None:
106-
_run(self._handle.update(
107-
spec=spec, action=action, overlap_policy=overlap_policy,
108-
jitter_seconds=jitter_seconds, max_runs=max_runs,
109-
memo=memo, search_attributes=search_attributes, note=note,
110-
))
112+
_run(
113+
self._handle.update(
114+
spec=spec,
115+
action=action,
116+
overlap_policy=overlap_policy,
117+
jitter_seconds=jitter_seconds,
118+
max_runs=max_runs,
119+
memo=memo,
120+
search_attributes=search_attributes,
121+
note=note,
122+
)
123+
)
111124

112125
def pause(self, *, note: str | None = None) -> None:
113126
_run(self._handle.pause(note=note))
@@ -129,9 +142,13 @@ def backfill(
129142
end_time: str,
130143
overlap_policy: str | None = None,
131144
) -> ScheduleBackfillResult:
132-
result: ScheduleBackfillResult = _run(self._handle.backfill(
133-
start_time=start_time, end_time=end_time, overlap_policy=overlap_policy,
134-
))
145+
result: ScheduleBackfillResult = _run(
146+
self._handle.backfill(
147+
start_time=start_time,
148+
end_time=end_time,
149+
overlap_policy=overlap_policy,
150+
)
151+
)
135152
return result
136153

137154

@@ -192,17 +209,19 @@ def start_workflow(
192209
memo: dict[str, Any] | None = None,
193210
search_attributes: dict[str, Any] | None = None,
194211
) -> SyncWorkflowHandle:
195-
handle = _run(self._async.start_workflow(
196-
workflow_type=workflow_type,
197-
task_queue=task_queue,
198-
workflow_id=workflow_id,
199-
input=input,
200-
execution_timeout_seconds=execution_timeout_seconds,
201-
run_timeout_seconds=run_timeout_seconds,
202-
duplicate_policy=duplicate_policy,
203-
memo=memo,
204-
search_attributes=search_attributes,
205-
))
212+
handle = _run(
213+
self._async.start_workflow(
214+
workflow_type=workflow_type,
215+
task_queue=task_queue,
216+
workflow_id=workflow_id,
217+
input=input,
218+
execution_timeout_seconds=execution_timeout_seconds,
219+
run_timeout_seconds=run_timeout_seconds,
220+
duplicate_policy=duplicate_policy,
221+
memo=memo,
222+
search_attributes=search_attributes,
223+
)
224+
)
206225
return SyncWorkflowHandle(handle)
207226

208227
def describe_workflow(self, workflow_id: str) -> WorkflowExecution:
@@ -218,26 +237,32 @@ def list_workflows(
218237
page_size: int | None = None,
219238
next_page_token: str | None = None,
220239
) -> WorkflowList:
221-
result: WorkflowList = _run(self._async.list_workflows(
222-
workflow_type=workflow_type,
223-
status=status,
224-
query=query,
225-
page_size=page_size,
226-
next_page_token=next_page_token,
227-
))
240+
result: WorkflowList = _run(
241+
self._async.list_workflows(
242+
workflow_type=workflow_type,
243+
status=status,
244+
query=query,
245+
page_size=page_size,
246+
next_page_token=next_page_token,
247+
)
248+
)
249+
return result
250+
251+
def list_task_queues(self) -> TaskQueueList:
252+
result: TaskQueueList = _run(self._async.list_task_queues())
253+
return result
254+
255+
def describe_task_queue(self, name: str) -> TaskQueueDescription:
256+
result: TaskQueueDescription = _run(self._async.describe_task_queue(name))
228257
return result
229258

230259
def get_history(self, workflow_id: str, run_id: str) -> Any:
231260
return _run(self._async.get_history(workflow_id, run_id))
232261

233-
def signal_workflow(
234-
self, workflow_id: str, signal_name: str, *, args: list[Any] | None = None
235-
) -> None:
262+
def signal_workflow(self, workflow_id: str, signal_name: str, *, args: list[Any] | None = None) -> None:
236263
_run(self._async.signal_workflow(workflow_id, signal_name, args=args))
237264

238-
def query_workflow(
239-
self, workflow_id: str, query_name: str, *, args: list[Any] | None = None
240-
) -> Any:
265+
def query_workflow(self, workflow_id: str, query_name: str, *, args: list[Any] | None = None) -> Any:
241266
return _run(self._async.query_workflow(workflow_id, query_name, args=args))
242267

243268
def cancel_workflow(self, workflow_id: str, *, reason: str | None = None) -> None:
@@ -256,13 +281,16 @@ def update_workflow(
256281
wait_timeout_seconds: int | None = None,
257282
request_id: str | None = None,
258283
) -> Any:
259-
return _run(self._async.update_workflow(
260-
workflow_id, update_name,
261-
args=args,
262-
wait_for=wait_for,
263-
wait_timeout_seconds=wait_timeout_seconds,
264-
request_id=request_id,
265-
))
284+
return _run(
285+
self._async.update_workflow(
286+
workflow_id,
287+
update_name,
288+
args=args,
289+
wait_for=wait_for,
290+
wait_timeout_seconds=wait_timeout_seconds,
291+
request_id=request_id,
292+
)
293+
)
266294

267295
def get_result(
268296
self,
@@ -271,9 +299,7 @@ def get_result(
271299
poll_interval: float = 0.5,
272300
timeout: float = 30.0,
273301
) -> Any:
274-
return _run(self._async.get_result(
275-
handle._handle, poll_interval=poll_interval, timeout=timeout
276-
))
302+
return _run(self._async.get_result(handle._handle, poll_interval=poll_interval, timeout=timeout))
277303

278304
# ── Schedules ─────────────────────────────────────────────────────
279305
def get_schedule_handle(self, schedule_id: str) -> SyncScheduleHandle:
@@ -293,12 +319,20 @@ def create_schedule(
293319
paused: bool = False,
294320
note: str | None = None,
295321
) -> SyncScheduleHandle:
296-
handle = _run(self._async.create_schedule(
297-
schedule_id=schedule_id, spec=spec, action=action,
298-
overlap_policy=overlap_policy, jitter_seconds=jitter_seconds,
299-
max_runs=max_runs, memo=memo, search_attributes=search_attributes,
300-
paused=paused, note=note,
301-
))
322+
handle = _run(
323+
self._async.create_schedule(
324+
schedule_id=schedule_id,
325+
spec=spec,
326+
action=action,
327+
overlap_policy=overlap_policy,
328+
jitter_seconds=jitter_seconds,
329+
max_runs=max_runs,
330+
memo=memo,
331+
search_attributes=search_attributes,
332+
paused=paused,
333+
note=note,
334+
)
335+
)
302336
return SyncScheduleHandle(handle)
303337

304338
def list_schedules(self) -> ScheduleList:
@@ -322,25 +356,28 @@ def update_schedule(
322356
search_attributes: dict[str, Any] | None = None,
323357
note: str | None = None,
324358
) -> None:
325-
_run(self._async.update_schedule(
326-
schedule_id, spec=spec, action=action,
327-
overlap_policy=overlap_policy, jitter_seconds=jitter_seconds,
328-
max_runs=max_runs, memo=memo, search_attributes=search_attributes,
329-
note=note,
330-
))
359+
_run(
360+
self._async.update_schedule(
361+
schedule_id,
362+
spec=spec,
363+
action=action,
364+
overlap_policy=overlap_policy,
365+
jitter_seconds=jitter_seconds,
366+
max_runs=max_runs,
367+
memo=memo,
368+
search_attributes=search_attributes,
369+
note=note,
370+
)
371+
)
331372

332373
def pause_schedule(self, schedule_id: str, *, note: str | None = None) -> None:
333374
_run(self._async.pause_schedule(schedule_id, note=note))
334375

335376
def resume_schedule(self, schedule_id: str, *, note: str | None = None) -> None:
336377
_run(self._async.resume_schedule(schedule_id, note=note))
337378

338-
def trigger_schedule(
339-
self, schedule_id: str, *, overlap_policy: str | None = None
340-
) -> ScheduleTriggerResult:
341-
result: ScheduleTriggerResult = _run(
342-
self._async.trigger_schedule(schedule_id, overlap_policy=overlap_policy)
343-
)
379+
def trigger_schedule(self, schedule_id: str, *, overlap_policy: str | None = None) -> ScheduleTriggerResult:
380+
result: ScheduleTriggerResult = _run(self._async.trigger_schedule(schedule_id, overlap_policy=overlap_policy))
344381
return result
345382

346383
def delete_schedule(self, schedule_id: str) -> None:
@@ -354,8 +391,12 @@ def backfill_schedule(
354391
end_time: str,
355392
overlap_policy: str | None = None,
356393
) -> ScheduleBackfillResult:
357-
result: ScheduleBackfillResult = _run(self._async.backfill_schedule(
358-
schedule_id, start_time=start_time, end_time=end_time,
359-
overlap_policy=overlap_policy,
360-
))
394+
result: ScheduleBackfillResult = _run(
395+
self._async.backfill_schedule(
396+
schedule_id,
397+
start_time=start_time,
398+
end_time=end_time,
399+
overlap_policy=overlap_policy,
400+
)
401+
)
361402
return result

0 commit comments

Comments
 (0)