Skip to content

Commit 22c6bdc

Browse files
committed
Add func to run workflows returning immediately
1 parent a40b196 commit 22c6bdc

1 file changed

Lines changed: 25 additions & 5 deletions

File tree

metafold/workflows.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class Workflow:
3131
_jobs: dict[str, str] = field(factory=dict, init=False)
3232

3333
id: str
34+
link: Optional[str] = None
3435
jobs: list[str] = field(factory=list)
3536
state: str
3637
created: datetime = field(converter=asdatetime)
@@ -159,18 +160,37 @@ def run(
159160
Returns:
160161
Completed workflow resource.
161162
"""
162-
project_id = self._client.project_id(project_id)
163-
payload = asdict(definition=definition, parameters=parameters, assets=assets)
164-
r: Response = self._client.post(f"/projects/{project_id}/workflows", json=payload)
165-
url = r.json()["link"]
163+
w = self.run_async(definition, parameters, assets, project_id)
166164
try:
167-
r = self._client.poll(url, timeout)
165+
r = self._client.poll(w.link, timeout)
168166
except PollTimeout as e:
169167
raise RuntimeError(
170168
f"Workflow failed to complete within {timeout} seconds"
171169
) from e
172170
return Workflow(client=cast("MetafoldClient", self._client), **r.json())
173171

172+
def run_async(
173+
self, definition: str,
174+
parameters: Optional[dict[str, str]] = None,
175+
assets: Optional[dict[str, str]] = None,
176+
project_id: Optional[str] = None,
177+
) -> Workflow:
178+
"""Dispatch a new workflow and return immediately without waiting for result.
179+
180+
Args:
181+
definition: Workflow definition YAML.
182+
parameters: Parameter mapping for jobs in the definition.
183+
assets: Asset mapping for jobs in the definition.
184+
project_id: Workflow project ID.
185+
186+
Returns:
187+
Incomplete workflow resource.
188+
"""
189+
project_id = self._client.project_id(project_id)
190+
payload = asdict(definition=definition, parameters=parameters, assets=assets)
191+
r: Response = self._client.post(f"/projects/{project_id}/workflows", json=payload)
192+
return Workflow(client=cast("MetafoldClient", self._client), **r.json())
193+
174194
def cancel(self, workflow_id: str, project_id: Optional[str] = None) -> Workflow:
175195
"""Cancel a running workflow.
176196

0 commit comments

Comments
 (0)