Skip to content

Commit 80847d9

Browse files
task scheduler
1 parent c0a59e3 commit 80847d9

15 files changed

Lines changed: 2289 additions & 767 deletions

examples/tasks.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from fastloop import ExecutorType, FastLoop, RetryPolicy
2+
3+
app = FastLoop(name="task-demo")
4+
5+
6+
@app.task(name="add", retry=RetryPolicy(max_attempts=3))
7+
async def add(a: int, b: int) -> int:
8+
return a + b
9+
10+
11+
@app.task(name="cpu_work", executor=ExecutorType.PROCESS)
12+
def cpu_work(n: int) -> int:
13+
total = 0
14+
for i in range(n):
15+
total += i * i
16+
return total
17+
18+
19+
@app.task(name="flaky", retry=RetryPolicy(max_attempts=5, initial_delay=0.5))
20+
async def flaky_task() -> str:
21+
import random
22+
23+
if random.random() < 0.5:
24+
raise Exception("Random failure")
25+
return "success"
26+
27+
28+
@app.schedule(name="cleanup", cron="0 * * * *")
29+
async def hourly_cleanup() -> dict:
30+
return {"cleaned": True}
31+
32+
33+
@app.schedule(name="heartbeat", interval=60)
34+
async def heartbeat() -> dict:
35+
return {"alive": True}
36+
37+
38+
async def demo():
39+
task_id = await app.invoke("add", a=2, b=3)
40+
result = await app.invoke("add", a=10, b=20, wait=True)
41+
schedule_id = await app.schedule_task(
42+
"add", cron="*/5 * * * *", args={"a": 1, "b": 2}
43+
)
44+
print(f"task_id={task_id}, result={result}, schedule_id={schedule_id}")
45+
46+
47+
if __name__ == "__main__":
48+
app.run(port=8112)

fastloop/__init__.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
1-
"""
2-
FastLoop - A framework for building event-driven loop applications.
3-
"""
4-
51
from . import integrations
62
from .context import LoopContext
73
from .fastloop import FastLoop
84
from .loop import Loop
95
from .models import LoopEvent, WorkflowBlock
10-
from .types import BlockPlan, RetryPolicy, ScheduleType
6+
from .scheduler import Schedule
7+
from .task import TaskResult
8+
from .types import BlockPlan, ExecutorType, RetryPolicy, ScheduleType, TaskStatus
119
from .workflow import Workflow
1210

1311
__all__ = [
1412
"BlockPlan",
13+
"ExecutorType",
1514
"FastLoop",
1615
"Loop",
1716
"LoopContext",
1817
"LoopEvent",
1918
"RetryPolicy",
19+
"Schedule",
2020
"ScheduleType",
21+
"TaskResult",
22+
"TaskStatus",
2123
"Workflow",
2224
"WorkflowBlock",
2325
"integrations",

fastloop/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
DEFAULT_SYSTEM_CONFIG_DIR = "/etc/fastloop.d/"
88
LEASE_TTL_S = 30
99
LEASE_HEARTBEAT_INTERVAL_S = 10
10+
MAX_EVENT_HISTORY = 1000

fastloop/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class WorkflowNotFoundError(Exception):
6666
pass
6767

6868

69+
class TaskNotFoundError(Exception):
70+
pass
71+
72+
6973
class WorkflowMaxRetriesError(Exception):
7074
def __init__(
7175
self,

fastloop/executor.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import asyncio
2+
from collections.abc import Callable
3+
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
4+
from functools import partial
5+
from typing import Any
6+
7+
from .types import ExecutorType
8+
9+
_thread_pool: ThreadPoolExecutor | None = None
10+
_process_pool: ProcessPoolExecutor | None = None
11+
12+
13+
def _get_pool(executor_type: ExecutorType) -> ThreadPoolExecutor | ProcessPoolExecutor:
14+
global _thread_pool, _process_pool
15+
if executor_type == ExecutorType.THREAD:
16+
if _thread_pool is None:
17+
_thread_pool = ThreadPoolExecutor(max_workers=4)
18+
return _thread_pool
19+
if _process_pool is None:
20+
_process_pool = ProcessPoolExecutor(max_workers=4)
21+
return _process_pool
22+
23+
24+
async def run_in_executor(
25+
executor_type: ExecutorType,
26+
func: Callable[..., Any],
27+
*args: Any,
28+
**kwargs: Any,
29+
) -> Any:
30+
if executor_type == ExecutorType.ASYNC:
31+
if asyncio.iscoroutinefunction(func):
32+
return await func(*args, **kwargs)
33+
return func(*args, **kwargs)
34+
35+
loop = asyncio.get_event_loop()
36+
return await loop.run_in_executor(
37+
_get_pool(executor_type), partial(func, *args, **kwargs)
38+
)
39+
40+
41+
def shutdown_executors() -> None:
42+
global _thread_pool, _process_pool
43+
if _thread_pool:
44+
_thread_pool.shutdown(wait=False)
45+
_thread_pool = None
46+
if _process_pool:
47+
_process_pool.shutdown(wait=False)
48+
_process_pool = None

fastloop/fastloop.py

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@
2929
from .exceptions import (
3030
LoopAlreadyDefinedError,
3131
LoopNotFoundError,
32+
TaskNotFoundError,
3233
WorkflowNotFoundError,
3334
)
3435
from .integrations import Integration
3536
from .logging import configure_logging, setup_logger
3637
from .loop import Loop, LoopManager
3738
from .models import LoopEvent
3839
from .monitor import LoopMonitor
40+
from .scheduler import Schedule, validate_cron
3941
from .state.state import StateManager, create_state_manager
40-
from .types import BaseConfig, LoopStatus, RetryPolicy
42+
from .task import TaskManager, TaskResult
43+
from .types import BaseConfig, ExecutorType, LoopStatus, RetryPolicy
4144
from .utils import get_func_import_path, import_func_from_path, infer_application_path
4245
from .workflow import Workflow, WorkflowManager
4346

@@ -83,6 +86,7 @@ async def lifespan(_: FastAPI):
8386
self._monitor_task.cancel()
8487
await self.loop_manager.stop_all()
8588
await self.workflow_manager.stop_all()
89+
await self.task_manager.stop_all()
8690

8791
super().__init__(*args, **kwargs, lifespan=lifespan)
8892

@@ -102,10 +106,12 @@ async def lifespan(_: FastAPI):
102106
)
103107
self.loop_manager: LoopManager = LoopManager(self.config, self.state_manager)
104108
self.workflow_manager: WorkflowManager = WorkflowManager(self.state_manager)
109+
self.task_manager: TaskManager = TaskManager(self.state_manager)
105110
self._monitor_task: asyncio.Task[None] | None = None
106111
self._loop_start_func: Callable[[LoopContext], None] | None = None
107112
self._loop_metadata: dict[str, dict[str, Any]] = {}
108113
self._workflow_metadata: dict[str, dict[str, Any]] = {}
114+
self._task_metadata: dict[str, dict[str, Any]] = {}
109115

110116
configure_logging(
111117
pretty_print=self.config_manager.get("prettyPrintLogs", False)
@@ -802,3 +808,139 @@ async def restart_workflow(self, workflow_run_id: str) -> bool:
802808
extra={"workflow_run_id": workflow_run_id, "error": str(e)},
803809
)
804810
return False
811+
812+
def task(
813+
self,
814+
name: str,
815+
retry: RetryPolicy | None = None,
816+
executor: ExecutorType = ExecutorType.ASYNC,
817+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
818+
"""Register a task. Creates POST /{name} and GET /{name}/{task_id} endpoints."""
819+
820+
def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
821+
if name in self._task_metadata:
822+
raise LoopAlreadyDefinedError(f"Task {name} already registered")
823+
824+
self._task_metadata[name] = {
825+
"func": func,
826+
"retry": retry,
827+
"executor": executor,
828+
}
829+
830+
async def _invoke_handler(request: dict[str, Any]):
831+
result = await self.task_manager.submit(
832+
func=func,
833+
args=request,
834+
task_name=name,
835+
retry_policy=retry,
836+
executor_type=executor,
837+
)
838+
return JSONResponse(
839+
content={"task_id": result.task_id, "status": "pending"},
840+
status_code=HTTPStatus.ACCEPTED,
841+
)
842+
843+
async def _status_handler(task_id: str):
844+
try:
845+
task = await self.state_manager.get_task(task_id)
846+
return JSONResponse(content=task.to_dict())
847+
except TaskNotFoundError as e:
848+
raise HTTPException(
849+
status_code=HTTPStatus.NOT_FOUND, detail=str(e)
850+
) from e
851+
852+
self.add_api_route(f"/{name}", _invoke_handler, methods=["POST"])
853+
self.add_api_route(f"/{name}/{{task_id}}", _status_handler, methods=["GET"])
854+
855+
return func
856+
857+
return _decorator
858+
859+
async def invoke(
860+
self,
861+
task_name: str,
862+
wait: bool = False,
863+
timeout: float | None = None,
864+
**kwargs: Any,
865+
) -> str | TaskResult | Any:
866+
"""Invoke a task. Returns task_id if wait=False, else waits and returns result."""
867+
if task_name not in self._task_metadata:
868+
raise ValueError(f"Unknown task: {task_name}")
869+
870+
metadata = self._task_metadata[task_name]
871+
handle = await self.task_manager.submit(
872+
func=metadata["func"],
873+
args=kwargs,
874+
task_name=task_name,
875+
retry_policy=metadata.get("retry"),
876+
executor_type=metadata.get("executor", ExecutorType.ASYNC),
877+
)
878+
879+
if wait:
880+
return await handle.result(timeout=timeout)
881+
882+
return handle.task_id
883+
884+
def schedule(
885+
self,
886+
name: str,
887+
cron: str | None = None,
888+
interval: float | None = None,
889+
retry: RetryPolicy | None = None,
890+
executor: ExecutorType = ExecutorType.ASYNC,
891+
args: dict[str, Any] | None = None,
892+
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
893+
"""Register a scheduled task. Use cron="*/5 * * * *" or interval=60."""
894+
if not cron and not interval:
895+
raise ValueError("Must specify either cron or interval")
896+
897+
if cron and not validate_cron(cron):
898+
raise ValueError(f"Invalid cron expression: {cron}")
899+
900+
def _decorator(func: Callable[..., Any]) -> Callable[..., Any]:
901+
self.task(name=name, retry=retry, executor=executor)(func)
902+
903+
schedule = Schedule(
904+
task_name=name,
905+
cron=cron,
906+
interval_seconds=interval,
907+
args=args or {},
908+
)
909+
910+
self._task_metadata[name]["schedule"] = schedule
911+
912+
return func
913+
914+
return _decorator
915+
916+
async def schedule_task(
917+
self,
918+
task_name: str,
919+
cron: str | None = None,
920+
interval: float | None = None,
921+
args: dict[str, Any] | None = None,
922+
schedule_id: str | None = None,
923+
) -> str:
924+
"""Programmatically schedule a registered task. Returns the schedule_id."""
925+
if task_name not in self._task_metadata:
926+
raise ValueError(f"Unknown task: {task_name}")
927+
928+
if not cron and not interval:
929+
raise ValueError("Must specify either cron or interval")
930+
931+
if cron and not validate_cron(cron):
932+
raise ValueError(f"Invalid cron expression: {cron}")
933+
934+
sid = schedule_id or task_name
935+
schedule = Schedule(
936+
task_name=task_name,
937+
cron=cron,
938+
interval_seconds=interval,
939+
args=args or {},
940+
)
941+
await self.state_manager.save_schedule(sid, schedule)
942+
return sid
943+
944+
async def unschedule_task(self, schedule_id: str) -> None:
945+
"""Remove a scheduled task by its schedule_id."""
946+
await self.state_manager.delete_schedule(schedule_id)

0 commit comments

Comments
 (0)