Skip to content

Commit 1dab95b

Browse files
more advanced planning features
1 parent 34f4484 commit 1dab95b

12 files changed

Lines changed: 508 additions & 30 deletions

File tree

examples/workflows.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,17 @@
22
Workflow examples - function-based and class-based styles.
33
"""
44

5-
from fastloop import FastLoop, LoopContext, LoopEvent, Workflow, WorkflowBlock
5+
from typing import Any
6+
7+
from fastloop import (
8+
BlockPlan,
9+
FastLoop,
10+
LoopContext,
11+
LoopEvent,
12+
ScheduleType,
13+
Workflow,
14+
WorkflowBlock,
15+
)
616

717
app = FastLoop(name="workflow-demo")
818

@@ -127,6 +137,95 @@ async def execute(
127137
print(f"Survey results: {answers}")
128138

129139

140+
@app.event("start_email_monitor")
141+
class StartEmailMonitor(LoopEvent):
142+
pass
143+
144+
145+
async def email_monitor_plan(
146+
_ctx: LoopContext,
147+
_blocks: list[WorkflowBlock],
148+
current_block: WorkflowBlock,
149+
block_output: Any,
150+
) -> BlockPlan:
151+
"""Plan function decides next block and scheduling based on block output."""
152+
match current_block.type:
153+
case "search_emails":
154+
if block_output and block_output.get("found_emails"):
155+
return BlockPlan(schedule_type=ScheduleType.IMMEDIATE)
156+
return BlockPlan(
157+
next_block_index=0,
158+
schedule_type=ScheduleType.DELAY,
159+
delay_seconds=600,
160+
reason="No emails found, retry in 10 minutes",
161+
)
162+
case "notify":
163+
return BlockPlan(schedule_type=ScheduleType.STOP)
164+
case _:
165+
return BlockPlan(schedule_type=ScheduleType.IMMEDIATE)
166+
167+
168+
@app.workflow(
169+
name="email_monitor",
170+
start_event=StartEmailMonitor,
171+
plan=email_monitor_plan,
172+
)
173+
async def email_monitor_workflow(
174+
ctx: LoopContext,
175+
_blocks: list[WorkflowBlock],
176+
current_block: WorkflowBlock,
177+
) -> dict[str, Any]:
178+
"""Workflow that returns output for the plan function to decide next steps."""
179+
print(f"[{ctx.block_index + 1}/{ctx.block_count}] {current_block.type}")
180+
181+
match current_block.type:
182+
case "search_emails":
183+
emails = [] # simulate search
184+
return {"found_emails": len(emails) > 0, "emails": emails}
185+
case "summarize":
186+
return {"summary": "Email summary"}
187+
case "notify":
188+
return {"notified": True}
189+
case _:
190+
return {}
191+
192+
193+
@app.event("start_retry_demo")
194+
class StartRetryDemo(LoopEvent):
195+
pass
196+
197+
198+
@app.workflow(name="retry_demo", start_event=StartRetryDemo)
199+
class RetryDemoWorkflow(Workflow):
200+
"""Class-based workflow with plan method for rate limit handling."""
201+
202+
async def plan(
203+
self,
204+
ctx: LoopContext,
205+
_blocks: list[WorkflowBlock],
206+
_current_block: WorkflowBlock,
207+
block_output: Any,
208+
) -> BlockPlan | None:
209+
if block_output and block_output.get("rate_limited"):
210+
return BlockPlan(
211+
next_block_index=ctx.block_index,
212+
schedule_type=ScheduleType.DELAY,
213+
delay_seconds=block_output.get("retry_after", 60),
214+
)
215+
if ctx.block_index >= ctx.block_count - 1:
216+
return BlockPlan(schedule_type=ScheduleType.STOP)
217+
return None
218+
219+
async def execute(
220+
self,
221+
ctx: LoopContext,
222+
_blocks: list[WorkflowBlock],
223+
current_block: WorkflowBlock,
224+
) -> dict[str, Any]:
225+
print(f"[{ctx.block_index + 1}/{ctx.block_count}] {current_block.type}")
226+
return {"success": True}
227+
228+
130229
if __name__ == "__main__":
131230
app.run(port=8111)
132231

@@ -156,3 +255,21 @@ async def execute(
156255
# curl http://localhost:8111/onboarding/<workflow_id>
157256
#
158257
# 5. If service restarts, workflow resumes from current block automatically
258+
#
259+
# 6. Start email monitor workflow with plan function:
260+
# curl -X POST http://localhost:8111/email_monitor \
261+
# -H "Content-Type: application/json" \
262+
# -d '{
263+
# "type": "start_email_monitor",
264+
# "blocks": [
265+
# {"type": "search_emails", "text": "Search for customer emails"},
266+
# {"type": "summarize", "text": "Summarize found emails"},
267+
# {"type": "notify", "text": "Send notification"}
268+
# ]
269+
# }'
270+
#
271+
# The plan function will:
272+
# - Retry the search block with a 10-minute delay if no emails found
273+
# - Proceed immediately when emails are found
274+
# - Stop the workflow after notification
275+
# - Delays use Redis scheduling, so the workflow survives restarts

fastloop/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,16 @@
22
from .context import LoopContext
33
from .fastloop import FastLoop
44
from .loop import Loop, LoopEvent, Workflow, WorkflowBlock
5-
from .types import RetryPolicy
5+
from .types import BlockPlan, RetryPolicy, ScheduleType
66

77
__all__ = [
8+
"BlockPlan",
89
"FastLoop",
910
"Loop",
1011
"LoopContext",
1112
"LoopEvent",
1213
"RetryPolicy",
14+
"ScheduleType",
1315
"Workflow",
1416
"WorkflowBlock",
1517
"integrations",

fastloop/context.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
LoopContextSwitchError,
1111
LoopPausedError,
1212
LoopStoppedError,
13+
WorkflowGotoError,
1314
WorkflowNextError,
1415
WorkflowRepeatError,
1516
)
@@ -254,3 +255,12 @@ def next(self, payload: dict | None = None) -> None:
254255
def repeat(self) -> None:
255256
"""Re-execute the current block in a workflow."""
256257
raise WorkflowRepeatError()
258+
259+
def goto(
260+
self,
261+
block_index: int,
262+
delay_seconds: float | None = None,
263+
reason: str | None = None,
264+
) -> None:
265+
"""Jump to a specific block in a workflow, optionally after a delay."""
266+
raise WorkflowGotoError(block_index, delay_seconds, reason)

fastloop/exceptions.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ class WorkflowRepeatError(Exception):
5050
pass
5151

5252

53+
class WorkflowGotoError(Exception):
54+
def __init__(
55+
self,
56+
block_index: int,
57+
delay_seconds: float | None = None,
58+
reason: str | None = None,
59+
):
60+
self.block_index = block_index
61+
self.delay_seconds = delay_seconds
62+
self.reason = reason
63+
64+
5365
class WorkflowNotFoundError(Exception):
5466
pass
5567

fastloop/fastloop.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,7 @@ def workflow(
460460
on_stop: Callable[..., Any] | None = None,
461461
on_block_complete: Callable[..., Any] | None = None,
462462
on_error: Callable[..., Any] | None = None,
463+
plan: Callable[..., Any] | None = None,
463464
retry: RetryPolicy | None = None,
464465
) -> Callable[
465466
[Callable[..., Any] | type[Workflow]], Callable[..., Any] | type[Workflow]
@@ -478,13 +479,15 @@ def _decorator(
478479
workflow_on_stop = workflow_instance.on_stop
479480
workflow_on_block_complete = workflow_instance.on_block_complete
480481
workflow_on_error = workflow_instance.on_error
482+
workflow_plan = getattr(workflow_instance, "plan", None) or plan
481483
else:
482484
workflow_instance = None # type: ignore
483485
func = func_or_class # type: ignore
484486
workflow_on_start = on_start
485487
workflow_on_stop = on_stop
486488
workflow_on_block_complete = on_block_complete
487489
workflow_on_error = on_error
490+
workflow_plan = plan
488491

489492
start_event_key = _resolve_event_key(start_event)
490493

@@ -497,6 +500,7 @@ def _decorator(
497500
"on_stop": workflow_on_stop,
498501
"on_block_complete": workflow_on_block_complete,
499502
"on_error": workflow_on_error,
503+
"plan": workflow_plan,
500504
"workflow_instance": workflow_instance,
501505
"retry_policy": retry,
502506
}
@@ -561,6 +565,7 @@ async def _start_handler(request: dict[str, Any]):
561565
on_stop=workflow_on_stop,
562566
on_block_complete=workflow_on_block_complete,
563567
on_error=workflow_on_error,
568+
plan=workflow_plan,
564569
retry_policy=retry,
565570
)
566571
return (
@@ -745,6 +750,7 @@ async def restart_workflow(self, workflow_id: str) -> bool:
745750
on_stop=metadata.get("on_stop"),
746751
on_block_complete=metadata.get("on_block_complete"),
747752
on_error=metadata.get("on_error"),
753+
plan=metadata.get("plan"),
748754
retry_policy=metadata.get("retry_policy"),
749755
)
750756

@@ -789,12 +795,26 @@ def __init__(
789795
def stop(self) -> None:
790796
self._stop_event.set()
791797

792-
async def _process_wake(self, loop_id: str) -> None:
793-
if await self.state_manager.has_claim(loop_id):
794-
return
795-
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
796-
if not await self.restart_callback(loop_id):
797-
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
798+
async def _process_wake(self, wake_id: str) -> None:
799+
if wake_id.startswith("workflow:"):
800+
workflow_id = wake_id[9:]
801+
if await self.state_manager.workflow_has_claim(workflow_id):
802+
return
803+
logger.info(
804+
"Workflow woke up, restarting", extra={"workflow_id": workflow_id}
805+
)
806+
await self.state_manager.clear_workflow_wake_time(workflow_id)
807+
if not await self.fastloop_instance.restart_workflow(workflow_id):
808+
await self.state_manager.update_workflow_status(
809+
workflow_id, LoopStatus.STOPPED
810+
)
811+
else:
812+
loop_id = wake_id
813+
if await self.state_manager.has_claim(loop_id):
814+
return
815+
logger.info("Loop woke up, restarting", extra={"loop_id": loop_id})
816+
if not await self.restart_callback(loop_id):
817+
await self.state_manager.update_loop_status(loop_id, LoopStatus.STOPPED)
798818

799819
async def _check_orphaned_loops(self) -> None:
800820
running_loops = await self.state_manager.get_all_loops(

0 commit comments

Comments
 (0)