The fluid.scheduler module is a simple yet powerful distributed task producer ([TaskScheduler][fluid.scheduler.TaskScheduler]) and consumer ([TaskConsumer][fluid.scheduler.TaskConsumer]) system for executing tasks.
The middleware for distributing tasks can be configured via the [TaskBroker][fluid.scheduler.TaskBroker] interface.
A redis task broker is provided for convenience.
Create a task consumer, register tasks from modules, and run the consumer.
import asyncio
from typing import Any
from fluid.scheduler import TaskConsumer
import task_module_a, task_module_b
def task_consumer(**kwargs: Any) -> TaskConsumer:
consumer = TaskConsumer(**kwargs)
consumer.register_from_module(task_module_a)
consumer.register_from_module(task_module_b)
return consumer
if __name__ == "__main__":
consumer = task_consumer()
asyncio.run(consumer.run())A [TaskManager][fluid.scheduler.TaskManager] can be integrated with FastAPI so that tasks can be queued via HTTP requests.
To setup the FastAPI app, use the [task_manager_fastapi][fluid.scheduler.task_manager_fastapi] function:
import uvicorn
from fluid.scheduler import task_manager_fastapi
if __name__ == "__main__":
consumer = task_consumer()
app = task_manager_fastapi(consumer)
uvicorn.run(app)You can test via the example provided
$ python -m examples.simple_fastapiand check the openapi UI at http://127.0.0.1:8000/docs.
The GET /tasks endpoint lists registered tasks and accepts a repeatable tags
query parameter to only return tasks that have at least one of the given tags:
GET /tasks
GET /tasks?tags=fast&tags=slow
The [TaskConsumer][fluid.scheduler.TaskConsumer] or [TaskScheduler][fluid.scheduler.TaskScheduler] can be run with the command line tool to allow for an even richer API.
from fluid.scheduler.cli import TaskManagerCLI
from fluid.scheduler import task_manager_fastapi
if __name__ == "__main__":
consumer = task_consumer()
TaskManagerCLI(task_manager_fastapi(consumer))()This features requires to install the package with the cli extra.
$ pip install aio-fluid[cli]$ python -m examples.simple_cli
Usage: python -m examples.simple_cli [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
enable Enable or disable a task
exec Execute a registered task
ls List all tasks with their schedules
serve Start app server.The command line tool provides a powerful interface to execute tasks, parameters are passed as optional arguments using the standard click interface.
Plugins extend the task manager with additional behaviour by hooking into task lifecycle events. A plugin implements the [TaskManagerPlugin][fluid.scheduler.TaskManagerPlugin] interface and is registered via [TaskManager.with_plugin][fluid.scheduler.TaskManager.with_plugin].
The [TaskDbPlugin][fluid.scheduler.db.TaskDbPlugin] stores every task run in a database table so you can query task history, audit outcomes, and build dashboards on top of the data.
It requires a [CrudDB][fluid.db.CrudDB] instance and the db extra:
pip install aio-fluid[db]Register the plugin when building your task manager:
from fluid.scheduler import TaskScheduler, task_manager_fastapi
from fluid.scheduler.db import TaskDbPlugin, with_task_history_router
from fluid.db import CrudDB
db = CrudDB.from_env()
task_manager = TaskScheduler(...)
task_manager.with_plugin(TaskDbPlugin(db))
app = task_manager_fastapi(task_manager)
with_task_history_router(app)The plugin creates a fluid_tasks table (configurable via table_name) and
persists a row for each task run as it moves through its lifecycle states.
Tasks tagged with skip_db are excluded from persistence.
with_task_history_router mounts a /history router on the app with two endpoints:
| Method | Path | Description |
|---|---|---|
GET |
/history |
List task run history with optional filters |
GET |
/history/{run_id} |
Fetch a single task run by ID |
The list endpoint accepts the following query parameters:
| Parameter | Type | Description |
|---|---|---|
name |
string |
Filter by task name |
state |
TaskState |
Filter by task state (e.g. success, failure) |
start |
datetime |
Only runs queued at or after this time |
end |
datetime |
Only runs queued at or before this time |
Example requests:
# All history, most recent first
GET /history
# Only successful runs of the "add" task
GET /history?name=add&state=success
# Runs queued in a specific time window
GET /history?start=2024-01-01T00:00:00Z&end=2024-01-02T00:00:00Z
# Fetch a specific run by ID
GET /history/abc123To create your own plugin, subclass [TaskManagerPlugin][fluid.scheduler.TaskManagerPlugin]
and implement the register method. Use
[TaskManager.register_async_handler][fluid.scheduler.TaskManager.register_async_handler]
to subscribe to task lifecycle events:
from fluid.scheduler import TaskManagerPlugin, TaskManager, TaskState
from fluid.utils.dispatcher import Event
class MyPlugin(TaskManagerPlugin):
def register(self, task_manager: TaskManager) -> None:
task_manager.register_async_handler(
Event(TaskState.success, "my_plugin"),
self._on_success,
)
async def _on_success(self, task_run) -> None:
print(f"Task {task_run.name} succeeded")