Skip to content

Commit b64f01f

Browse files
add workflows and class based
1 parent 3ec2d17 commit b64f01f

15 files changed

Lines changed: 2409 additions & 45 deletions

examples/class_based.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""
2+
Example of class-based loop definition.
3+
4+
This demonstrates the class-based alternative to the function decorator approach.
5+
Both styles use the same @app.loop decorator and have identical functionality.
6+
"""
7+
8+
from fastloop import FastLoop, Loop, LoopContext, LoopEvent
9+
10+
app = FastLoop(name="class-based-demo")
11+
12+
13+
@app.event("chat.message")
14+
class ChatMessage(LoopEvent):
15+
content: str
16+
17+
18+
@app.event("chat.response")
19+
class ChatResponse(LoopEvent):
20+
content: str
21+
22+
23+
@app.loop("chat", start_event=ChatMessage)
24+
class ChatLoop(Loop):
25+
"""A chat loop implemented as a class."""
26+
27+
messages_processed: int = 0
28+
29+
async def on_start(self, ctx: LoopContext) -> None:
30+
print(f"[{ctx.loop_id}] Chat loop started")
31+
self.messages_processed = 0
32+
33+
async def on_stop(self, ctx: LoopContext) -> None:
34+
print(
35+
f"[{ctx.loop_id}] Chat loop stopped, processed {self.messages_processed} messages"
36+
)
37+
38+
async def on_app_start(self, ctx: LoopContext) -> bool:
39+
print(f"[{ctx.loop_id}] App started, checking if loop should resume...")
40+
pending = await ctx.get("pending_work", default=False)
41+
return pending
42+
43+
async def on_event(self, ctx: LoopContext, event: LoopEvent) -> None:
44+
print(f"[{ctx.loop_id}] Received event: {event.type}")
45+
46+
async def loop(self, ctx: LoopContext) -> None:
47+
message = await ctx.wait_for(ChatMessage, timeout=5, raise_on_timeout=False)
48+
49+
if message:
50+
self.messages_processed += 1
51+
response = f"Echo: {message.content}"
52+
await ctx.emit(ChatResponse(content=response))
53+
print(f"[{ctx.loop_id}] Processed message #{self.messages_processed}")
54+
55+
56+
if __name__ == "__main__":
57+
app.run(port=8112)

examples/workflows.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
"""
2+
Workflow Example - Durable multi-step execution.
3+
4+
Control flow:
5+
ctx.next(payload) → advance to next block
6+
ctx.repeat() → re-run current block
7+
normal return → workflow completes
8+
"""
9+
10+
from fastloop import FastLoop, LoopContext, LoopEvent, WorkflowBlock
11+
12+
app = FastLoop(name="workflow-demo")
13+
14+
15+
# --- Events ---
16+
17+
18+
@app.event("start_workflow")
19+
class StartWorkflow(LoopEvent):
20+
pass
21+
22+
23+
@app.event("user_input")
24+
class UserInput(LoopEvent):
25+
value: str
26+
27+
28+
# --- Callbacks (optional) ---
29+
30+
31+
def on_block_done(ctx: LoopContext, block: WorkflowBlock, payload: dict | None):
32+
print(f" ✓ Block complete: {block.type}")
33+
34+
35+
def on_error(ctx: LoopContext, block: WorkflowBlock, error: Exception):
36+
print(f" ✗ Block error: {block.type} - {error}")
37+
38+
39+
# --- Workflow ---
40+
41+
42+
@app.workflow(
43+
name="onboarding",
44+
start_event=StartWorkflow,
45+
on_block_complete=on_block_done,
46+
on_error=on_error,
47+
)
48+
async def onboarding_workflow(
49+
ctx: LoopContext,
50+
blocks: list[WorkflowBlock],
51+
current_block: WorkflowBlock,
52+
):
53+
"""
54+
Multi-step onboarding. State survives restarts.
55+
56+
ctx.block_index - current position (0-indexed)
57+
ctx.block_count - total blocks
58+
ctx.previous_payload - data from previous ctx.next(payload)
59+
"""
60+
print(f"[{ctx.block_index + 1}/{ctx.block_count}] {current_block.type}")
61+
62+
match current_block.type:
63+
case "collect_name":
64+
response = await ctx.wait_for(
65+
UserInput, timeout=300.0, raise_on_timeout=False
66+
)
67+
if response:
68+
await ctx.set("user_name", response.value)
69+
ctx.next()
70+
else:
71+
ctx.repeat()
72+
73+
case "collect_email":
74+
response = await ctx.wait_for(
75+
UserInput, timeout=300.0, raise_on_timeout=False
76+
)
77+
if response:
78+
await ctx.set("user_email", response.value)
79+
ctx.next()
80+
else:
81+
ctx.repeat()
82+
83+
case "confirm":
84+
name = await ctx.get("user_name")
85+
email = await ctx.get("user_email")
86+
print(f"Done: {name} <{email}>")
87+
88+
89+
if __name__ == "__main__":
90+
app.run(port=8111)
91+
92+
# Example usage:
93+
#
94+
# 1. Start workflow:
95+
# curl -X POST http://localhost:8111/onboarding \
96+
# -H "Content-Type: application/json" \
97+
# -d '{
98+
# "type": "start_workflow",
99+
# "blocks": [
100+
# {"type": "collect_name", "text": "Please enter your name"},
101+
# {"type": "collect_email", "text": "Please enter your email"},
102+
# {"type": "confirm", "text": "Confirm your details"}
103+
# ]
104+
# }'
105+
#
106+
# 2. Send event to workflow (use workflow_id from response):
107+
# curl -X POST http://localhost:8111/onboarding/<workflow_id>/event \
108+
# -H "Content-Type: application/json" \
109+
# -d '{"type": "user_input", "value": "John Doe", "workflow_id": "<id>"}'
110+
#
111+
# 3. Check workflow status:
112+
# curl http://localhost:8111/onboarding/<workflow_id>
113+
#
114+
# 4. If service restarts, workflow resumes from current block automatically

fastloop/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
from . import integrations
22
from .context import LoopContext
33
from .fastloop import FastLoop
4-
from .loop import LoopEvent
4+
from .loop import Loop, LoopEvent, WorkflowBlock
55

66
__all__ = [
77
"FastLoop",
8+
"Loop",
89
"LoopContext",
910
"LoopEvent",
11+
"WorkflowBlock",
1012
"integrations",
1113
]

fastloop/context.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
LoopContextSwitchError,
1111
LoopPausedError,
1212
LoopStoppedError,
13+
WorkflowNextError,
14+
WorkflowRepeatError,
1315
)
1416
from .logging import setup_logger
1517
from .loop import LoopEvent
@@ -51,19 +53,12 @@ def __init__(
5153
self.state_manager: StateManager = state_manager
5254
self.event_this_cycle: bool = False
5355

54-
if integrations is None:
55-
integrations = []
56-
57-
self.integrations: dict[str, Integration] = {
58-
integration.type(): integration for integration in integrations
56+
integrations = integrations or []
57+
self.integrations: dict[str, Integration] = {i.type(): i for i in integrations}
58+
self.integration_events: dict[str, list[Any]] = {
59+
i.type(): i.events() for i in integrations
5960
}
6061

61-
self.integration_events: dict[str, list[Any]] = {}
62-
63-
for integration in integrations:
64-
self.integrations[integration.type()] = integration
65-
self.integration_events[integration.type()] = integration.events()
66-
6762
def stop(self):
6863
"""Request the loop to stop on the next iteration."""
6964
self._stop_requested = True
@@ -250,3 +245,11 @@ def _parse_duration(self, duration_str: str) -> float:
250245
if unit.startswith(prefix):
251246
return value * mult
252247
return value
248+
249+
def next(self, payload: dict | None = None) -> None:
250+
"""Advance to the next block in a workflow."""
251+
raise WorkflowNextError(payload)
252+
253+
def repeat(self) -> None:
254+
"""Re-execute the current block in a workflow."""
255+
raise WorkflowRepeatError()

fastloop/exceptions.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,16 @@ class LoopContextSwitchError(Exception):
3939
def __init__(self, func: Callable[[T], Awaitable[None]], context: "LoopContext"):
4040
self.func = func
4141
self.context = context
42+
43+
44+
class WorkflowNextError(Exception):
45+
def __init__(self, payload: dict | None = None):
46+
self.payload = payload
47+
48+
49+
class WorkflowRepeatError(Exception):
50+
pass
51+
52+
53+
class WorkflowNotFoundError(Exception):
54+
pass

0 commit comments

Comments
 (0)