Skip to content

Commit 74df727

Browse files
improve workflows
1 parent 647965e commit 74df727

11 files changed

Lines changed: 215 additions & 134 deletions

File tree

examples/workflows.py

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
"""
2-
Workflow Example - Durable multi-step execution.
3-
4-
Control flow:
5-
ctx.next(payload) → advance to next block
6-
ctx.repeat() → re-run current block
7-
normal return → workflow completes
2+
Workflow examples - function-based and class-based styles.
83
"""
94

10-
from fastloop import FastLoop, LoopContext, LoopEvent, WorkflowBlock
5+
from fastloop import FastLoop, LoopContext, LoopEvent, Workflow, WorkflowBlock
116

127
app = FastLoop(name="workflow-demo")
138

149

15-
# --- Events ---
16-
17-
1810
@app.event("start_workflow")
1911
class StartWorkflow(LoopEvent):
2012
pass
@@ -25,30 +17,25 @@ class UserInput(LoopEvent):
2517
value: str
2618

2719

28-
# --- Callbacks (optional) ---
20+
@app.event("progress")
21+
class Progress(LoopEvent):
22+
message: str
23+
step: int
2924

3025

3126
def on_block_done(ctx: LoopContext, block: WorkflowBlock, payload: dict | None):
3227
print(f" ✓ Block complete: {block.type}")
3328

3429

3530
def on_error(ctx: LoopContext, block: WorkflowBlock, error: Exception):
36-
"""
37-
Called when a block raises an exception.
38-
- Normal return → workflow stops
39-
- Raise ctx.repeat() → retry the block
40-
"""
4131
retries = getattr(ctx, "_retries", 0)
4232
if retries < 3:
4333
ctx._retries = retries + 1
4434
print(f" ⟳ Retrying block {block.type} (attempt {retries + 1})")
45-
ctx.repeat() # Raises WorkflowRepeatError → retry
35+
ctx.repeat()
4636
print(f" ✗ Block failed: {block.type} - {error}")
4737

4838

49-
# --- Workflow ---
50-
51-
5239
@app.workflow(
5340
name="onboarding",
5441
start_event=StartWorkflow,
@@ -60,14 +47,7 @@ async def onboarding_workflow(
6047
blocks: list[WorkflowBlock],
6148
current_block: WorkflowBlock,
6249
):
63-
"""
64-
Multi-step onboarding. State survives restarts.
65-
66-
ctx.block_index - current position (0-indexed)
67-
ctx.block_count - total blocks
68-
ctx.previous_payload - data from previous ctx.next(payload)
69-
"""
70-
print(f"[{ctx.block_index + 1}/{ctx.block_count}] {current_block.type}")
50+
await ctx.emit(Progress(message=current_block.text, step=ctx.block_index + 1))
7151

7252
match current_block.type:
7353
case "collect_name":
@@ -93,7 +73,58 @@ async def onboarding_workflow(
9373
case "confirm":
9474
name = await ctx.get("user_name")
9575
email = await ctx.get("user_email")
96-
print(f"Done: {name} <{email}>")
76+
await ctx.emit(
77+
Progress(message=f"Complete: {name} <{email}>", step=ctx.block_count)
78+
)
79+
80+
81+
@app.event("start_survey")
82+
class StartSurvey(LoopEvent):
83+
pass
84+
85+
86+
@app.workflow(name="survey", start_event=StartSurvey)
87+
class SurveyWorkflow(Workflow):
88+
async def on_start(self, ctx: LoopContext) -> None:
89+
print(f"[{ctx.loop_id}] Survey started")
90+
91+
async def on_stop(self, ctx: LoopContext) -> None:
92+
print(f"[{ctx.loop_id}] Survey completed")
93+
94+
async def on_block_complete(
95+
self, ctx: LoopContext, block: WorkflowBlock, payload: dict | None
96+
) -> None:
97+
print(f" ✓ Survey step complete: {block.type}")
98+
99+
async def on_error(
100+
self, ctx: LoopContext, block: WorkflowBlock, error: Exception
101+
) -> None:
102+
print(f" ✗ Survey error: {error}")
103+
104+
async def execute(
105+
self,
106+
ctx: LoopContext,
107+
blocks: list[WorkflowBlock],
108+
current_block: WorkflowBlock,
109+
) -> None:
110+
print(f"[{ctx.block_index + 1}/{ctx.block_count}] {current_block.type}")
111+
112+
match current_block.type:
113+
case "question":
114+
response = await ctx.wait_for(
115+
UserInput, timeout=60.0, raise_on_timeout=False
116+
)
117+
if response:
118+
await ctx.set(f"answer_{ctx.block_index}", response.value)
119+
ctx.next()
120+
else:
121+
ctx.abort()
122+
123+
case "summary":
124+
answers = [
125+
await ctx.get(f"answer_{i}") for i in range(ctx.block_count - 1)
126+
]
127+
print(f"Survey results: {answers}")
97128

98129

99130
if __name__ == "__main__":
@@ -113,12 +144,15 @@ async def onboarding_workflow(
113144
# ]
114145
# }'
115146
#
116-
# 2. Send event to workflow (use workflow_id from response):
147+
# 2. Subscribe to SSE events (use workflow_id from step 1):
148+
# curl -N http://localhost:8111/events/<workflow_id>/sse
149+
#
150+
# 3. Send event to workflow:
117151
# curl -X POST http://localhost:8111/onboarding/<workflow_id>/event \
118152
# -H "Content-Type: application/json" \
119153
# -d '{"type": "user_input", "value": "John Doe", "workflow_id": "<id>"}'
120154
#
121-
# 3. Check workflow status:
155+
# 4. Check workflow status:
122156
# curl http://localhost:8111/onboarding/<workflow_id>
123157
#
124-
# 4. If service restarts, workflow resumes from current block automatically
158+
# 5. If service restarts, workflow resumes from current block automatically

fastloop/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
from . import integrations
22
from .context import LoopContext
33
from .fastloop import FastLoop
4-
from .loop import Loop, LoopEvent, WorkflowBlock
4+
from .loop import Loop, LoopEvent, Workflow, WorkflowBlock
55

66
__all__ = [
77
"FastLoop",
88
"Loop",
99
"LoopContext",
1010
"LoopEvent",
11+
"Workflow",
1112
"WorkflowBlock",
1213
"integrations",
1314
]

fastloop/context.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,16 @@ def __init__(
6060
}
6161

6262
def stop(self):
63-
"""Request the loop to stop on the next iteration."""
6463
self._stop_requested = True
6564
raise LoopStoppedError()
6665

6766
def pause(self):
68-
"""Request the loop to pause on the next iteration."""
6967
self._pause_requested = True
7068
raise LoopPausedError()
7169

70+
def abort(self):
71+
raise LoopStoppedError()
72+
7273
def switch_to(self: T, func: Callable[[T], Awaitable[None]]):
7374
logger.info(
7475
f"Switching context to function: {get_func_import_path(func)}",

fastloop/fastloop.py

Lines changed: 49 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,25 @@
2525
)
2626
from .integrations import Integration
2727
from .logging import configure_logging, setup_logger
28-
from .loop import Loop, LoopEvent, LoopManager, WorkflowBlock, WorkflowManager
28+
from .loop import Loop, LoopEvent, LoopManager, Workflow, WorkflowBlock, WorkflowManager
2929
from .state.state import StateManager, create_state_manager
3030
from .types import BaseConfig, LoopStatus
3131
from .utils import get_func_import_path, import_func_from_path, infer_application_path
3232

3333
logger = setup_logger()
3434

3535

36+
def _resolve_event_key(event: str | Enum | type[LoopEvent] | None) -> str | None:
37+
"""Convert event type/enum/string to string key."""
38+
if not event:
39+
return None
40+
if isinstance(event, type) and issubclass(event, LoopEvent):
41+
return event.type
42+
if hasattr(event, "value"):
43+
return event.value # type: ignore
44+
return event # type: ignore
45+
46+
3647
class FastLoop(FastAPI):
3748
def __init__(
3849
self,
@@ -206,14 +217,7 @@ def _decorator(
206217
)
207218
integration.register(self, name)
208219

209-
start_event_key = None
210-
if start_event:
211-
if isinstance(start_event, type) and issubclass(start_event, LoopEvent):
212-
start_event_key = start_event.type
213-
elif hasattr(start_event, "value"):
214-
start_event_key = start_event.value # type: ignore
215-
else:
216-
start_event_key = start_event
220+
start_event_key = _resolve_event_key(start_event)
217221

218222
if name not in self._loop_metadata:
219223
self._loop_metadata[name] = {
@@ -456,62 +460,45 @@ def workflow(
456460
on_stop: Callable[..., Any] | None = None,
457461
on_block_complete: Callable[..., Any] | None = None,
458462
on_error: Callable[..., Any] | None = None,
459-
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
460-
"""
461-
Decorator to define a durable workflow.
462-
463-
A workflow processes a list of blocks sequentially. State is persisted
464-
after each block transition, so workflows survive service restarts.
465-
466-
Args:
467-
name: URL path for the workflow (e.g., "onboarding" → POST /onboarding)
468-
start_event: Required event type to start the workflow
469-
on_start: Called when workflow starts
470-
on_stop: Called when workflow stops (complete or error)
471-
on_block_complete: Called after each block completes
472-
on_error: Called when a block raises an exception
473-
474-
The decorated function receives:
475-
ctx: LoopContext with block_index, block_count, previous_payload
476-
blocks: List of all WorkflowBlock objects
477-
current_block: The block being executed
478-
479-
Control flow in the function:
480-
ctx.next(payload) → advance to next block
481-
ctx.repeat() → re-execute current block
482-
normal return → workflow completes
483-
484-
HTTP routes created:
485-
POST /{name} → start workflow
486-
GET /{name}/{id} → get workflow status
487-
POST /{name}/{id}/event → send event to workflow
488-
POST /{name}/{id}/stop → stop workflow
489-
"""
490-
491-
def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
492-
# Resolve start_event to a string key
493-
start_event_key = None
494-
if start_event:
495-
if isinstance(start_event, type) and issubclass(start_event, LoopEvent):
496-
start_event_key = start_event.type
497-
elif hasattr(start_event, "value"):
498-
start_event_key = start_event.value
499-
else:
500-
start_event_key = start_event
463+
) -> Callable[
464+
[Callable[..., Any] | type[Workflow]], Callable[..., Any] | type[Workflow]
465+
]:
466+
def _decorator(
467+
func_or_class: Callable[..., Any] | type[Workflow],
468+
) -> Callable[..., Any] | type[Workflow]:
469+
is_class_based = isinstance(func_or_class, type) and issubclass(
470+
func_or_class, Workflow
471+
)
472+
473+
if is_class_based:
474+
workflow_instance: Workflow = func_or_class()
475+
func = workflow_instance.execute
476+
workflow_on_start = workflow_instance.on_start
477+
workflow_on_stop = workflow_instance.on_stop
478+
workflow_on_block_complete = workflow_instance.on_block_complete
479+
workflow_on_error = workflow_instance.on_error
480+
else:
481+
workflow_instance = None # type: ignore
482+
func = func_or_class # type: ignore
483+
workflow_on_start = on_start
484+
workflow_on_stop = on_stop
485+
workflow_on_block_complete = on_block_complete
486+
workflow_on_error = on_error
487+
488+
start_event_key = _resolve_event_key(start_event)
501489

502490
if name in self._workflow_metadata:
503491
raise LoopAlreadyDefinedError(f"Workflow {name} already registered")
504492

505493
self._workflow_metadata[name] = {
506494
"func": func,
507-
"on_start": on_start,
508-
"on_stop": on_stop,
509-
"on_block_complete": on_block_complete,
510-
"on_error": on_error,
495+
"on_start": workflow_on_start,
496+
"on_stop": workflow_on_stop,
497+
"on_block_complete": workflow_on_block_complete,
498+
"on_error": workflow_on_error,
499+
"workflow_instance": workflow_instance,
511500
}
512501

513-
# --- HTTP Handlers ---
514-
515502
class WorkflowStartRequest(BaseModel):
516503
type: str
517504
blocks: list[WorkflowBlock]
@@ -557,10 +544,10 @@ async def _start_handler(request: WorkflowStartRequest):
557544
func,
558545
context,
559546
workflow,
560-
on_start=on_start,
561-
on_stop=on_stop,
562-
on_block_complete=on_block_complete,
563-
on_error=on_error,
547+
on_start=workflow_on_start,
548+
on_stop=workflow_on_stop,
549+
on_block_complete=workflow_on_block_complete,
550+
on_error=workflow_on_error,
564551
)
565552
return (
566553
await self.state_manager.get_workflow(workflow.workflow_id)
@@ -621,7 +608,6 @@ async def _event_handler(request: dict[str, Any]):
621608
await self.state_manager.push_event(workflow_id, event)
622609
return workflow.to_dict()
623610

624-
# --- Register routes ---
625611
self.add_api_route(f"/{name}", _start_handler, methods=["POST"])
626612
self.add_api_route(
627613
f"/{name}/{{workflow_id}}", _get_handler, methods=["GET"]
@@ -633,7 +619,7 @@ async def _event_handler(request: dict[str, Any]):
633619
f"/{name}/{{workflow_id}}/stop", _stop_handler, methods=["POST"]
634620
)
635621

636-
return func
622+
return func_or_class
637623

638624
return _decorator
639625

0 commit comments

Comments
 (0)