Skip to content

Commit 918ecc2

Browse files
pause
1 parent b836d0a commit 918ecc2

10 files changed

Lines changed: 189 additions & 12 deletions

File tree

fastloop/context.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
LoopStoppedError,
1313
WorkflowGotoError,
1414
WorkflowNextError,
15+
WorkflowPauseError,
1516
WorkflowRepeatError,
1617
)
1718
from .logging import setup_logger
@@ -61,6 +62,7 @@ def __init__(
6162
i.type(): i.events() for i in integrations
6263
}
6364
self._integration_clients: dict[str, Any] = {}
65+
self.resume_payload: dict[str, Any] | None = None
6466

6567
def _reset_cycle_tracking(self) -> None:
6668
self.event_this_cycle = False
@@ -296,3 +298,7 @@ def goto(
296298
) -> None:
297299
"""Jump to a specific block in a workflow, optionally after a delay."""
298300
raise WorkflowGotoError(block_index, delay_seconds, reason)
301+
302+
def pause_until_resumed(self, reason: str | None = None) -> None:
303+
"""Pause the workflow indefinitely until resumed via API."""
304+
raise WorkflowPauseError(reason)

fastloop/exceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ def __init__(
6262
self.reason = reason
6363

6464

65+
class WorkflowPauseError(Exception):
66+
def __init__(self, reason: str | None = None):
67+
self.reason = reason
68+
69+
6570
class WorkflowNotFoundError(Exception):
6671
pass
6772

fastloop/fastloop.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,35 @@ async def _cancel_handler(workflow_run_id: str):
681681
status_code=HTTPStatus.NOT_FOUND, detail=str(e)
682682
) from e
683683

684+
async def _resume_handler(
685+
workflow_run_id: str, request: dict[str, Any] = {}
686+
):
687+
try:
688+
workflow = await self.state_manager.get_workflow(workflow_run_id)
689+
if workflow.status != LoopStatus.PAUSED:
690+
raise HTTPException(
691+
status_code=HTTPStatus.BAD_REQUEST,
692+
detail=f"Workflow {workflow_run_id} is not paused",
693+
)
694+
payload = request.get("payload") if request else None
695+
if payload is not None:
696+
await self.state_manager.set_workflow_resume_payload(
697+
workflow_run_id, payload
698+
)
699+
await self.state_manager.mark_workflow_for_resume(workflow_run_id)
700+
restarted = await self.restart_workflow(workflow_run_id)
701+
if restarted:
702+
return JSONResponse(content={"message": "Workflow resumed"})
703+
else:
704+
raise HTTPException(
705+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
706+
detail="Failed to resume workflow",
707+
)
708+
except WorkflowNotFoundError as e:
709+
raise HTTPException(
710+
status_code=HTTPStatus.NOT_FOUND, detail=str(e)
711+
) from e
712+
684713
async def _event_handler(request: dict[str, Any]):
685714
workflow_run_id = request.get("workflow_run_id")
686715
event_type = request.get("type")
@@ -725,6 +754,9 @@ async def _event_handler(request: dict[str, Any]):
725754
self.add_api_route(
726755
f"/{name}/{{workflow_run_id}}/cancel", _cancel_handler, methods=["POST"]
727756
)
757+
self.add_api_route(
758+
f"/{name}/{{workflow_run_id}}/resume", _resume_handler, methods=["POST"]
759+
)
728760

729761
return func_or_class
730762

fastloop/state/state.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,22 @@ async def set_workflow_block_output(
247247
async def get_workflow_block_output(self, workflow_run_id: str) -> Any:
248248
pass
249249

250+
@abstractmethod
251+
async def set_workflow_resume_payload(
252+
self, workflow_run_id: str, payload: dict[str, Any] | None
253+
) -> None:
254+
pass
255+
256+
@abstractmethod
257+
async def get_workflow_resume_payload(
258+
self, workflow_run_id: str
259+
) -> dict[str, Any] | None:
260+
pass
261+
262+
@abstractmethod
263+
async def mark_workflow_for_resume(self, workflow_run_id: str) -> None:
264+
pass
265+
250266

251267
def create_state_manager(
252268
*,

fastloop/state/state_redis.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class RedisKeys:
6767
WORKFLOW_BLOCK_OUTPUT = (
6868
f"{KEY_PREFIX}:{{app_name}}:workflow_block_output:{{workflow_run_id}}"
6969
)
70+
WORKFLOW_RESUME_PAYLOAD = (
71+
f"{KEY_PREFIX}:{{app_name}}:workflow_resume_payload:{{workflow_run_id}}"
72+
)
7073
TASK_INDEX = f"{KEY_PREFIX}:{{app_name}}:task_index"
7174
TASK_STATE = f"{KEY_PREFIX}:{{app_name}}:task:{{task_id}}"
7275
TASK_CLAIM = f"{KEY_PREFIX}:{{app_name}}:task_claim:{{task_id}}"
@@ -1070,6 +1073,36 @@ async def get_workflow_block_output(self, workflow_run_id: str) -> Any:
10701073
return cloudpickle.loads(output_bytes)
10711074
return None
10721075

1076+
async def set_workflow_resume_payload(
1077+
self, workflow_run_id: str, payload: dict[str, Any] | None
1078+
) -> None:
1079+
payload_key = RedisKeys.WORKFLOW_RESUME_PAYLOAD.format(
1080+
app_name=self.app_name, workflow_run_id=workflow_run_id
1081+
)
1082+
if payload is None:
1083+
await self.rdb.delete(payload_key)
1084+
else:
1085+
await self.rdb.set(payload_key, json.dumps(payload))
1086+
1087+
async def get_workflow_resume_payload(
1088+
self, workflow_run_id: str
1089+
) -> dict[str, Any] | None:
1090+
payload_key = RedisKeys.WORKFLOW_RESUME_PAYLOAD.format(
1091+
app_name=self.app_name, workflow_run_id=workflow_run_id
1092+
)
1093+
payload_bytes = await self.rdb.get(payload_key)
1094+
if payload_bytes:
1095+
return json.loads(payload_bytes.decode("utf-8"))
1096+
return None
1097+
1098+
async def mark_workflow_for_resume(self, workflow_run_id: str) -> None:
1099+
workflow = await self.get_workflow(workflow_run_id)
1100+
if workflow.status != LoopStatus.PAUSED:
1101+
raise ValueError(f"Workflow {workflow_run_id} is not paused")
1102+
workflow.status = LoopStatus.IDLE
1103+
workflow.scheduled_wake_time = time.time()
1104+
await self.update_workflow(workflow_run_id, workflow)
1105+
10731106
async def create_task(self, task: TaskState) -> TaskState:
10741107
state_key = RedisKeys.TASK_STATE.format(
10751108
app_name=self.app_name, task_id=task.task_id

fastloop/state/state_s3.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,3 +741,39 @@ async def get_workflow_block_output(self, workflow_id: str) -> Any:
741741
return cloudpickle.loads(response["Body"].read())
742742
except ClientError:
743743
return None
744+
745+
async def set_workflow_resume_payload(
746+
self, workflow_id: str, payload: dict[str, Any] | None
747+
) -> None:
748+
key = (
749+
f"{self.prefix}/{self.app_name}/workflow_resume_payload/{workflow_id}.json"
750+
)
751+
if payload is None:
752+
try:
753+
self.s3.delete_object(Bucket=self.bucket, Key=key)
754+
except ClientError:
755+
pass
756+
else:
757+
self.s3.put_object(
758+
Bucket=self.bucket, Key=key, Body=json.dumps(payload).encode("utf-8")
759+
)
760+
761+
async def get_workflow_resume_payload(
762+
self, workflow_id: str
763+
) -> dict[str, Any] | None:
764+
key = (
765+
f"{self.prefix}/{self.app_name}/workflow_resume_payload/{workflow_id}.json"
766+
)
767+
try:
768+
response = self.s3.get_object(Bucket=self.bucket, Key=key)
769+
return json.loads(response["Body"].read().decode("utf-8"))
770+
except ClientError:
771+
return None
772+
773+
async def mark_workflow_for_resume(self, workflow_id: str) -> None:
774+
workflow = await self.get_workflow(workflow_id)
775+
if workflow.status != LoopStatus.PAUSED:
776+
raise ValueError(f"Workflow {workflow_id} is not paused")
777+
workflow.status = LoopStatus.IDLE
778+
workflow.scheduled_wake_time = time.time()
779+
await self.update_workflow(workflow_id, workflow)

fastloop/types.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ class LoopStatus(StrEnum):
1414
PENDING = "pending"
1515
RUNNING = "running"
1616
IDLE = "idle"
17+
PAUSED = "paused"
1718
STOPPED = "stopped"
1819
FAILED = "failed"
1920

2021

2122
class ScheduleType(StrEnum):
2223
IMMEDIATE = "immediate"
2324
DELAY = "delay"
25+
PAUSE = "pause"
2426
STOP = "stop"
2527

2628

fastloop/workflow.py

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
WorkflowGotoError,
2222
WorkflowMaxRetriesError,
2323
WorkflowNextError,
24+
WorkflowPauseError,
2425
WorkflowRepeatError,
2526
)
2627
from .logging import setup_logger
@@ -160,12 +161,19 @@ async def _apply_plan(
160161
workflow_run_id: str,
161162
idx: int,
162163
blocks: list[WorkflowBlock],
163-
) -> tuple[int | None, float | None]:
164+
block_output: Any = None,
165+
) -> tuple[int | None, float | None, bool]:
164166
if plan_result is None:
165-
return idx + 1, None
167+
return idx + 1, None, False
166168

167169
if plan_result.schedule_type == ScheduleType.STOP:
168-
return None, None
170+
return None, None, False
171+
172+
if plan_result.schedule_type == ScheduleType.PAUSE:
173+
await self._schedule_pause(
174+
workflow_run_id, block_output, plan_result.reason
175+
)
176+
return idx + 1, None, True
169177

170178
next_idx = plan_result.next_block_index
171179
if next_idx is None:
@@ -186,7 +194,7 @@ async def _apply_plan(
186194
if plan_result.schedule_type == ScheduleType.DELAY:
187195
delay = plan_result.delay_seconds
188196

189-
return next_idx, delay
197+
return next_idx, delay, False
190198

191199
async def _schedule_delay(
192200
self,
@@ -211,6 +219,28 @@ async def _schedule_delay(
211219
)
212220
raise LoopPausedError()
213221

222+
async def _schedule_pause(
223+
self,
224+
workflow_run_id: str,
225+
block_output: Any,
226+
reason: str | None = None,
227+
) -> None:
228+
"""Pause a workflow indefinitely until resumed via API."""
229+
await self.state_manager.set_workflow_block_output(
230+
workflow_run_id, block_output
231+
)
232+
await self.state_manager.update_workflow_status(
233+
workflow_run_id, LoopStatus.PAUSED
234+
)
235+
logger.info(
236+
"Workflow paused until resumed",
237+
extra={
238+
"workflow_run_id": workflow_run_id,
239+
"reason": reason,
240+
},
241+
)
242+
raise LoopPausedError()
243+
214244
async def _run(
215245
self,
216246
func: Callable[..., Any],
@@ -267,6 +297,14 @@ async def _run(
267297
workflow_run_id
268298
)
269299
)
300+
context.resume_payload = (
301+
await self.state_manager.get_workflow_resume_payload(
302+
workflow_run_id
303+
)
304+
)
305+
await self.state_manager.set_workflow_resume_payload(
306+
workflow_run_id, None
307+
)
270308

271309
try:
272310
block_output = await _call_with_result(
@@ -293,10 +331,13 @@ async def _run(
293331
},
294332
)
295333

296-
next_idx, delay = await self._apply_plan(
297-
plan_result, workflow_run_id, idx, blocks
334+
next_idx, delay, paused = await self._apply_plan(
335+
plan_result, workflow_run_id, idx, blocks, block_output
298336
)
299337

338+
if paused:
339+
continue
340+
300341
if next_idx is None:
301342
await _call(on_block_complete, context, current_block, None)
302343
raise LoopStoppedError()
@@ -358,6 +399,11 @@ async def _run(
358399
workflow_run_id, e.delay_seconds, None, e.reason
359400
)
360401

402+
except WorkflowPauseError as e:
403+
await self._schedule_pause(
404+
workflow_run_id, context.block_output, e.reason
405+
)
406+
361407
except (asyncio.CancelledError, LoopPausedError, LoopStoppedError):
362408
raise
363409

@@ -431,10 +477,11 @@ async def _run(
431477
)
432478
await _call(on_stop, context)
433479
except LoopPausedError:
434-
await self.state_manager.update_workflow_status(
435-
workflow_run_id, LoopStatus.IDLE
436-
)
437-
# Don't call on_stop - workflow is just paused, not finished
480+
workflow = await self.state_manager.get_workflow(workflow_run_id)
481+
if workflow.status != LoopStatus.PAUSED:
482+
await self.state_manager.update_workflow_status(
483+
workflow_run_id, LoopStatus.IDLE
484+
)
438485
finally:
439486
self.tasks.pop(workflow_run_id, None)
440487

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.113"
3+
version = "0.1.114"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)