Skip to content

Commit 647965e

Browse files
wip
1 parent b64f01f commit 647965e

5 files changed

Lines changed: 54 additions & 4 deletions

File tree

examples/workflows.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,17 @@ def on_block_done(ctx: LoopContext, block: WorkflowBlock, payload: dict | None):
3333

3434

3535
def on_error(ctx: LoopContext, block: WorkflowBlock, error: Exception):
36-
print(f" ✗ Block error: {block.type} - {error}")
36+
"""
37+
Called when a block raises an exception.
38+
- Normal return → workflow stops
39+
- Raise ctx.repeat() → retry the block
40+
"""
41+
retries = getattr(ctx, "_retries", 0)
42+
if retries < 3:
43+
ctx._retries = retries + 1
44+
print(f" ⟳ Retrying block {block.type} (attempt {retries + 1})")
45+
ctx.repeat() # Raises WorkflowRepeatError → retry
46+
print(f" ✗ Block failed: {block.type} - {error}")
3747

3848

3949
# --- Workflow ---

fastloop/loop.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,11 @@ async def _run(
467467
"traceback": traceback.format_exc(),
468468
},
469469
)
470-
await _call(on_error, context, current_block, e)
470+
if on_error:
471+
try:
472+
await _call(on_error, context, current_block, e)
473+
except WorkflowRepeatError:
474+
continue # Retry the block
471475
raise LoopStoppedError() from e
472476

473477
except asyncio.CancelledError:

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "fastloop"
3-
version = "0.1.76"
3+
version = "0.1.77"
44
description = "A Python package for deploying stateful loops"
55
readme = "README.md"
66
requires-python = ">=3.12"

tests/test_workflow_integration.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,42 @@ async def func(ctx, blocks, block):
321321
assert errors[0][0] == "bad"
322322
assert "boom" in errors[0][1]
323323

324+
async def test_on_error_can_retry(self, mock_state):
325+
"""on_error can raise ctx.repeat() to retry the block."""
326+
attempts = [0]
327+
328+
mock_state.get_workflow = AsyncMock(
329+
return_value=WorkflowState(
330+
workflow_id="test",
331+
blocks=[{"type": "flaky", "text": ""}],
332+
status=LoopStatus.RUNNING,
333+
)
334+
)
335+
336+
def on_error(ctx, block, error):
337+
if attempts[0] < 3:
338+
ctx.repeat() # Retry
339+
340+
async def func(ctx, blocks, block):
341+
attempts[0] += 1
342+
if attempts[0] < 3:
343+
raise ValueError("transient error")
344+
# Succeeds on 3rd attempt
345+
346+
ctx = MagicMock()
347+
ctx.repeat = LoopContext.repeat.__get__(ctx, LoopContext)
348+
349+
wm = WorkflowManager(mock_state)
350+
await wm.start(
351+
func,
352+
ctx,
353+
WorkflowState(workflow_id="test", blocks=[], status=LoopStatus.RUNNING),
354+
on_error=on_error,
355+
)
356+
await asyncio.sleep(0.1)
357+
358+
assert attempts[0] == 3 # Tried 3 times, succeeded on 3rd
359+
324360

325361
# --- Context Attributes ---
326362

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)