2 changes: 2 additions & 0 deletions docs/reference/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ For retry configuration (`retry`, `rate_limit_retry`) see [Task Retry](task_retr

::: fluid.scheduler.K8sConfig

::: fluid.scheduler.K8sResourceRequirements

::: fluid.scheduler.is_in_cpu_process
107 changes: 79 additions & 28 deletions docs/tutorials/task_k8s.md
Expand Up @@ -7,22 +7,36 @@ This offloads heavy computation to dedicated pods and keeps the consumer event l

The switch is automatic. When `KUBERNETES_SERVICE_HOST` is set (which Kubernetes injects into every pod) and the `k8s` extra is installed, any task declared with `cpu_bound=True` will spawn a Kubernetes Job instead of a subprocess. No code change is required in the task itself.
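
The decision described above can be sketched as a small predicate. This is an illustration only, not the library's code: `should_use_k8s_job` is a hypothetical name, and it assumes that the `k8s` extra installs the `kubernetes` Python package.

```python
import importlib.util
import os
from typing import Mapping, Optional


def should_use_k8s_job(
    cpu_bound: bool,
    environ: Optional[Mapping[str, str]] = None,
    k8s_installed: Optional[bool] = None,
) -> bool:
    """Return True when a CPU bound task would be dispatched as a Kubernetes Job."""
    env = os.environ if environ is None else environ
    if k8s_installed is None:
        # Assumption: the `k8s` extra provides the `kubernetes` package.
        k8s_installed = importlib.util.find_spec("kubernetes") is not None
    # Kubernetes injects KUBERNETES_SERVICE_HOST into every pod.
    in_cluster = bool(env.get("KUBERNETES_SERVICE_HOST"))
    return cpu_bound and in_cluster and k8s_installed
```

Outside a cluster the variable is absent, so the same task falls back to a local subprocess.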

The Job reuses the **targeted container's spec** from the task consumer deployment (image, resource limits, volume mounts, security context, image pull policy, and everything else). Only that container is included in the Job pod — sidecars and other containers from the deployment are dropped. The **pod-level init containers** and **volumes** are preserved as-is, so any setup performed by init containers (e.g. installing TLS certificates) is reproduced in the Job pod. Only these fields on the main container are overridden:
The Job pod template is derived from the **task consumer deployment**. The implementation reads the deployment, locates the target container, and builds a Job spec from it. This means the Job inherits most of the container's configuration from the deployment — image, image pull policy, volume mounts, security context, and everything else — while only overriding the fields necessary to run the task.

| Field | Value |

**Inherited from the deployment container (unchanged):**

- Container image and image pull policy
- Volume mounts (and pod-level volumes)
- Environment variables (the task's env vars are appended, never replaced)
- Security context
- Everything else not listed below

**Overridden or cleared:**

| Field | Value in the Job |
|---|---|
| `command` | same as the deployment container, with any trailing `serve` token removed |
| `command` | Same as the deployment, but any trailing `serve` token is removed |
| `args` | `exec <task-name> --log --run-id <id> --params <json>` |
| `env` | same as the deployment container, with `TASK_MANAGER_SPAWN=true` appended |
| `liveness_probe` | cleared — probes are not meaningful for Job pods and could prematurely kill a long-running task |
| `readiness_probe` | cleared |
| `env` | Inherited from the deployment, then `TASK_MANAGER_SPAWN=true` appended, then any task-level `env` vars appended (see [Injecting environment variables](#injecting-environment-variables)) |
| `resources` | Inherited from the deployment unless overridden via [`K8sConfig.resources`](#configuration) |
| `liveness_probe` | Cleared — probes are not meaningful for Job pods and would prematurely kill long-running tasks |
| `readiness_probe` | Cleared |

The `TASK_MANAGER_SPAWN=true` environment variable signals to the process running inside the Job that it is executing as a CPU-bound worker rather than a long-lived consumer.
**Other containers** (sidecars) from the deployment are dropped — only the target container runs in the Job pod. **Pod-level init containers** and **volumes** are preserved, so any setup performed at pod startup (e.g. installing TLS certificates) is reproduced in the Job pod.

`TASK_MANAGER_SPAWN=true` signals to the process inside the Job that it is a CPU-bound worker rather than a long-lived consumer.
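
As a rough illustration of how a process might detect that flag, the sketch below reads the variable from the environment. The library exposes `is_in_cpu_process` for this purpose; its implementation is not shown in this document, so `is_spawned_worker` is a hypothetical stand-in and the exact comparison is an assumption.

```python
import os
from typing import Mapping, Optional


def is_spawned_worker(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical check: True when TASK_MANAGER_SPAWN is set to "true"."""
    env = os.environ if environ is None else environ
    return env.get("TASK_MANAGER_SPAWN") == "true"
```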

The Job is created in the same namespace as the consumer with:

- `backoff_limit: 0`: a failed pod is never retried; the error is propagated back to the task consumer instead
- `ttlSecondsAfterFinished`: set from `K8sConfig.job_ttl`, the Job and its pods are cleaned up automatically after completion (default 300 s)
- `backoff_limit: 0`: a failed pod is never retried; the error is propagated back to the task consumer instead
- `ttlSecondsAfterFinished`: set from [`K8sConfig.job_ttl`](#configuration), so the Job and its pods are cleaned up automatically after completion (default 300 s)
- `restartPolicy: Never` on the pod template

The job name is derived from the task name and the first 7 characters of the run ID, slugified and capped at 63 characters to comply with Kubernetes DNS label requirements:
Expand All @@ -31,7 +45,7 @@ The job name is derived from the task name and the first 7 characters of the run
task-<slugified-task-name>-<short-run-id>
```
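
A minimal sketch of that naming rule, assuming a simple regex-based slugifier and a run ID made of lowercase alphanumerics. `job_name` is a hypothetical helper, and trimming the slug (rather than the whole string) to keep the run ID suffix intact is a choice made for this example, not something the text above specifies.

```python
import re


def job_name(task_name: str, run_id: str, max_length: int = 63) -> str:
    """Build task-<slugified-task-name>-<short-run-id>, capped at max_length."""
    # Replace every run of characters outside [a-z0-9] with a single dash.
    slug = re.sub(r"[^a-z0-9]+", "-", task_name.lower()).strip("-")
    prefix = "task-"
    suffix = f"-{run_id[:7]}"
    # Trim the slug so that prefix + slug + suffix fits in a DNS label.
    room = max_length - len(prefix) - len(suffix)
    return f"{prefix}{slug[:room].rstrip('-')}{suffix}"
```

For example, `job_name("heavy_calculation", "3f9c2a7b1d4e")` yields `task-heavy-calculation-3f9c2a7`.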

Once the Job is created, the consumer polls its status every `sleep` seconds until it either succeeds or fails.
Once the Job is created, the consumer polls its status every [`K8sConfig.sleep`](#configuration) seconds until it either succeeds or fails.
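
The polling loop can be pictured roughly as follows. This is a sketch under assumptions: `get_status` is a hypothetical coroutine standing in for a call to the Kubernetes API, and the status strings are placeholders rather than the values the library uses.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_job(
    get_status: Callable[[], Awaitable[str]],
    sleep: float = 2.0,
) -> str:
    """Poll until the Job reports a terminal state, then return that state."""
    while True:
        status = await get_status()
        if status in ("succeeded", "failed"):
            return status
        # Not finished yet: wait `sleep` seconds before asking again.
        await asyncio.sleep(sleep)
```

A shorter `sleep` notices completion sooner at the cost of more API calls.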

## Installation

Expand All @@ -48,27 +62,27 @@ from fluid.scheduler import task, TaskRun

@task(cpu_bound=True)
async def heavy_calculation(ctx: TaskRun) -> None:
    # heavy CPU work here, runs in a k8s Job when inside a cluster
    # heavy CPU work here: runs in a k8s Job when inside a cluster,
    # or in a local subprocess when running outside one
    ...
```

## Configuration

K8s behaviour can be tuned per-task via the `k8s_config` argument which
accepts a [K8sConfig][fluid.scheduler.K8sConfig] object:
K8s behaviour can be tuned per-task via the `k8s_config` argument, which accepts a [K8sConfig][fluid.scheduler.K8sConfig] object:

```python
from fluid.scheduler import task, TaskRun, K8sConfig

@task(
    cpu_bound=True,
    k8s_config=K8sConfig(
        namespace="workers",  # namespace where the Job is created
        deployment="fluid-task",  # deployment to copy the container spec from
        container="main",  # container name inside the deployment
        job_ttl=600,  # seconds to keep the Job after completion
        sleep=2.0,  # polling interval while waiting for the Job
        resources={  # optional Kubernetes resource limits and requests for the container
        namespace="workers",  # namespace where the Job is created
        deployment="fluid-task",  # deployment to copy the container spec from
        container="main",  # container name inside the deployment
        job_ttl=600,  # seconds to keep the Job after completion (default 300)
        sleep=2.0,  # polling interval while waiting for the Job (default 2.0)
        resources={  # override the container's resource spec (default: inherited from deployment)
            "limits": {"cpu": "2", "memory": "4Gi"},
            "requests": {"cpu": "1", "memory": "2Gi"},
        },
Expand All @@ -78,17 +92,54 @@ async def heavy_calculation(ctx: TaskRun) -> None:
    ...
```

If `k8s_config` is omitted, or any of the optional fields are not provided, the following environment variables are used:
### K8sConfig fields

| Field | Type | Default | Description |
|---|---|---|---|
| `namespace` | `str` | `FLUID_TASK_CONSUMER_K8S_NAMESPACE` or `"default"` | Kubernetes namespace where the Job is created |
| `deployment` | `str` | `FLUID_TASK_CONSUMER_K8S_DEPLOYMENT` or `"fluid-task"` | Deployment to read the container spec from |
| `container` | `str` | `FLUID_TASK_CONSUMER_K8S_CONTAINER` or `"main"` | Container name within the deployment |
| `resources` | `K8sResourceRequirements \| None` | `None` | Resource limits/requests for the Job container. If `None`, the deployment's existing resource spec is used unchanged |
| `job_ttl` | `int` | `FLUID_TASK_CONSUMER_K8S_JOB_TTL` or `300` | Seconds to retain the Job after completion before automatic cleanup |
| `sleep` | `float` | `FLUID_TASK_CONSUMER_K8S_SLEEP` or `2.0` | Polling interval in seconds while waiting for the Job to finish |

All `K8sConfig` fields have defaults drawn from environment variables, so a minimal deployment only needs to set those variables rather than hard-coding values per task.

If `k8s_config` is omitted entirely, a [K8sConfig][fluid.scheduler.K8sConfig] instance with all defaults is used.
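
To make the "defaults from environment variables" behaviour concrete, here is a sketch using a plain dataclass. `K8sConfigSketch` is a hypothetical stand-in, not the library's `K8sConfig`; in particular, reading the variables at instantiation time (rather than at import time) is an assumption.

```python
import os
from dataclasses import dataclass, field


def env_str(name: str, default: str) -> str:
    """Read an environment variable, falling back to a default."""
    return os.environ.get(name, default)


@dataclass
class K8sConfigSketch:
    """Hypothetical stand-in for K8sConfig; not the library's implementation."""

    namespace: str = field(
        default_factory=lambda: env_str("FLUID_TASK_CONSUMER_K8S_NAMESPACE", "default")
    )
    deployment: str = field(
        default_factory=lambda: env_str("FLUID_TASK_CONSUMER_K8S_DEPLOYMENT", "fluid-task")
    )
    container: str = field(
        default_factory=lambda: env_str("FLUID_TASK_CONSUMER_K8S_CONTAINER", "main")
    )
    job_ttl: int = field(
        default_factory=lambda: int(env_str("FLUID_TASK_CONSUMER_K8S_JOB_TTL", "300"))
    )
    sleep: float = field(
        default_factory=lambda: float(env_str("FLUID_TASK_CONSUMER_K8S_SLEEP", "2.0"))
    )
```

An explicit argument always wins over the environment, and the environment wins over the hard-coded fallback.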

### Resource overrides

The `resources` field accepts a [K8sResourceRequirements][fluid.scheduler.K8sResourceRequirements] dict with optional `limits` and `requests` keys:

```python
resources={
    "limits": {"cpu": "4", "memory": "8Gi"},
    "requests": {"cpu": "500m", "memory": "1Gi"},
}
```

When not provided (the default), the Job container inherits the resource spec from the deployment container unchanged. This is useful for tasks that need more CPU or memory than the consumer pod is allocated.

## Injecting environment variables

Extra environment variables can be injected into the Job (or subprocess, when running outside a cluster) using the `env` argument on the [`@task`][fluid.scheduler.task] decorator:

```python
from fluid.scheduler import task, TaskRun

@task(
    cpu_bound=True,
    env={"MODEL_PATH": "/mnt/models/v2", "LOG_LEVEL": "DEBUG"},
)
async def heavy_calculation(ctx: TaskRun) -> None:
    import os
    model_path = os.environ["MODEL_PATH"]
    ...
```

| Variable | Default | Description |
|---|---|---|
| `FLUID_TASK_CONSUMER_K8S_NAMESPACE` | `default` | Kubernetes namespace |
| `FLUID_TASK_CONSUMER_K8S_DEPLOYMENT` | `fluid-task` | Deployment name |
| `FLUID_TASK_CONSUMER_K8S_CONTAINER` | `main` | Container name |
| `FLUID_TASK_CONSUMER_K8S_JOB_TTL` | `300` | Job TTL in seconds |
| `FLUID_TASK_CONSUMER_K8S_SLEEP` | `2.0` | Polling interval in seconds |
These variables are appended to the environment after the deployment's existing env vars and `TASK_MANAGER_SPAWN=true`, so they can override anything set in the deployment if needed.

If `resources` is not provided, the container's resource spec from the deployment is used as-is. If provided, it must be a [K8sResourceRequirements][fluid.scheduler.K8sResourceRequirements] with optional `limits` and `requests` keys, each mapping resource names (e.g. `"cpu"`, `"memory"`) to their string values.
For subprocess execution (outside a cluster), they are merged into the spawned process's environment the same way, making task definitions portable across both runtimes without any conditional logic.
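
The ordering described above can be sketched with plain tuples in place of Kubernetes `V1EnvVar` objects. `build_job_env` and `effective_env` are hypothetical helpers; the "last entry wins" resolution for a repeated name is the behaviour this section relies on, modelled here with `dict()`.

```python
from typing import Dict, List, Mapping, Tuple

EnvList = List[Tuple[str, str]]


def build_job_env(deployment_env: EnvList, task_env: Mapping[str, str]) -> EnvList:
    """Deployment env first, then the spawn flag, then the task-level env."""
    env = list(deployment_env)
    env.append(("TASK_MANAGER_SPAWN", "true"))
    env.extend(task_env.items())
    return env


def effective_env(env: EnvList) -> Dict[str, str]:
    """Resolve duplicates: dict() keeps the last value seen for a repeated name."""
    return dict(env)
```

With a deployment that sets `LOG_LEVEL=INFO` and a task declared with `env={"LOG_LEVEL": "DEBUG"}`, the list contains both entries and the resolved value is `DEBUG`.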

## Required RBAC permissions

Expand Down
68 changes: 42 additions & 26 deletions docs/tutorials/task_queue.md
Expand Up @@ -94,30 +94,6 @@ if is_in_cpu_process():

Stdout and stderr from the subprocess are streamed back to the consumer in real time, so logs produced by the task appear in the consumer's output.

#### Timeout

CPU bound tasks respect the `timeout_seconds` parameter. If the subprocess has not finished within the timeout, it is killed and the task run transitions to the `failure` state.

```python
@task(cpu_bound=True, timeout_seconds=300)
async def slow_calculation(ctx: TaskRun) -> None:
...
```

The default timeout is **60 seconds**. For long-running tasks make sure to raise this to an appropriate value.

#### Concurrency control

Use `max_concurrency` to limit how many instances of a CPU bound task can run simultaneously. This is useful to prevent exhausting system resources when many tasks are queued at the same time.

```python
@task(cpu_bound=True, max_concurrency=2)
async def heavy_calculation(ctx: TaskRun) -> None:
...
```

A value of `0` (the default) means no limit.

#### Kubernetes

When the consumer is running inside a Kubernetes cluster, CPU bound tasks can be dispatched as Kubernetes Jobs instead of local subprocesses. See [K8s Jobs](task_k8s.md) for more details.
Expand Down Expand Up @@ -150,6 +126,46 @@ async def scheduled(ctx: TaskRun) -> None:
    await asyncio.sleep(0.1)
```

## Timeout

All tasks, both IO and CPU bound, respect the `timeout_seconds` parameter (default **60 seconds**). The timeout is measured from when the task starts executing.

For IO bound tasks, `asyncio` raises a `TimeoutError` if the coroutine has not completed within the timeout, and the task run transitions to the `failure` state. For CPU bound tasks, the subprocess (or Kubernetes Job) is killed and the run likewise transitions to `failure`.

```python
from fluid.scheduler import task, TaskRun

@task(timeout_seconds=300)
async def slow_io_task(ctx: TaskRun) -> None:
    ...

@task(cpu_bound=True, timeout_seconds=300)
async def slow_cpu_task(ctx: TaskRun) -> None:
    ...
```

For long-running tasks make sure to raise `timeout_seconds` to an appropriate value.
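
For the IO bound case, the mechanism resembles wrapping the coroutine in `asyncio.wait_for`. The sketch below is an assumption about the general shape, not the library's code: `run_with_timeout` is a hypothetical helper, and whether the scheduler uses `wait_for` or another asyncio timeout primitive is not stated here.

```python
import asyncio
from typing import Any, Coroutine


async def run_with_timeout(
    coro: Coroutine[Any, Any, Any],
    timeout_seconds: float = 60.0,
) -> str:
    """Return "success" if coro finishes in time, "failure" if it times out."""
    try:
        await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # wait_for cancels the coroutine before raising the timeout error.
        return "failure"
    return "success"
```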

## Concurrency control

Use `max_concurrency` to limit how many instances of a task can run simultaneously. This applies to both IO and CPU bound tasks, and is useful to avoid overwhelming downstream services or exhausting system resources when many tasks are queued at once.

```python
from fluid.scheduler import task, TaskRun

@task(max_concurrency=5)
async def fetch_data(ctx: TaskRun) -> None:
    ...

@task(cpu_bound=True, max_concurrency=2)
async def heavy_calculation(ctx: TaskRun) -> None:
    ...
```

A value of `0` (the default) means no limit.

When the limit is reached the task run transitions to the `rate_limited` state. To automatically retry rate-limited tasks, combine `max_concurrency` with `rate_limit_retry`. See [Task Retry](task_retry.md) for details.
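
The admission rule can be illustrated with an in-memory counter. This is only a sketch: `ConcurrencyLimiter` is a hypothetical class, and a real consumer would need to track running instances in shared state rather than in a single process.

```python
from collections import Counter


class ConcurrencyLimiter:
    """Hypothetical in-memory limiter; 0 means no limit."""

    def __init__(self) -> None:
        self._running: Counter = Counter()

    def try_start(self, task_name: str, max_concurrency: int = 0) -> bool:
        """Return False when the run would have to be rate limited."""
        if max_concurrency > 0 and self._running[task_name] >= max_concurrency:
            return False
        self._running[task_name] += 1
        return True

    def finish(self, task_name: str) -> None:
        """Release one slot once a run has completed."""
        if self._running[task_name] > 0:
            self._running[task_name] -= 1
```

A `False` result corresponds to the `rate_limited` transition described above; the slot becomes available again once a running instance finishes.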

## Aborting a task

Any task — IO or CPU bound — can signal a deliberate, non-error cancellation by calling [ctx.abort()][fluid.scheduler.TaskRun.abort]:
Expand All @@ -174,8 +190,8 @@ When this happens the task run transitions to the `aborted` [TaskState][fluid.sc

For CPU-bound tasks (subprocess or Kubernetes Job) the task function runs in a **separate process**, so the abort signal must be relayed back to the consumer. The mechanism works as follows:

1. The inner process calls `ctx.abort()`, which raises [TaskAbortedError][fluid.scheduler.TaskAbortedError].
1. The inner process calls `ctx.abort()`, which raises [TaskAbortedError][fluid.scheduler.errors.TaskAbortedError].
2. The consumer running *inside* that process catches the error and writes the reason to a short-lived Redis key (60-second TTL).
3. After the subprocess or k8s Job exits, the outer consumer reads the Redis key. If an abort reason is found it re-raises [TaskAbortedError][fluid.scheduler.TaskAbortedError], marking the run as `aborted` instead of `success`.
3. After the subprocess or k8s Job exits, the outer consumer reads the Redis key. If an abort reason is found it re-raises [TaskAbortedError][fluid.scheduler.errors.TaskAbortedError], marking the run as `aborted` instead of `success`.

This means a CPU-bound task that aborts itself is always correctly reflected as `aborted` in the task run state, regardless of whether it ran locally or as a Kubernetes Job.
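
The three steps above can be sketched with a dictionary standing in for the short-lived Redis key. Everything here is illustrative: the local `TaskAbortedError`, `abort_store`, `inner_process` and `outer_consumer` are hypothetical stand-ins, and the inner call runs in the same process rather than in a subprocess or Job.

```python
from typing import Callable, Dict


class TaskAbortedError(Exception):
    """Local stand-in for fluid.scheduler.errors.TaskAbortedError."""


# Stand-in for the Redis key with a 60-second TTL.
abort_store: Dict[str, str] = {}


def inner_process(run_id: str, task: Callable[[], None]) -> None:
    """Steps 1 and 2: catch the abort inside the worker and record the reason."""
    try:
        task()
    except TaskAbortedError as exc:
        abort_store[run_id] = str(exc)


def outer_consumer(run_id: str, task: Callable[[], None]) -> str:
    """Step 3: after the worker exits, look for a recorded abort reason."""
    inner_process(run_id, task)  # in practice: a subprocess or k8s Job
    try:
        reason = abort_store.pop(run_id, None)
        if reason is not None:
            raise TaskAbortedError(reason)
    except TaskAbortedError:
        return "aborted"
    return "success"
```

Because the worker exits normally after recording the reason, the outer consumer needs the stored key to tell an abort apart from a successful run.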
17 changes: 17 additions & 0 deletions docs/tutorials/workers.md
Expand Up @@ -23,3 +23,20 @@ To shut down a worker there are few possibilities.

* Direct call to the async [Worker.shutdown][fluid.utils.worker.Worker.shutdown] method which will trigger the graceful shutdown and wait for the worker to finish its work.
* Call the [Worker.gracefully_stop][fluid.utils.worker.Worker.gracefully_stop] method, which triggers the graceful shutdown. Importantly, this method does not wait for the worker to finish its work; it simply transitions the worker from the [WorkerState.RUNNING][fluid.utils.worker.WorkerState.RUNNING] state to the [WorkerState.STOPPING][fluid.utils.worker.WorkerState.STOPPING] state. To wait for the worker to exit, call the async [Worker.wait_for_shutdown][fluid.utils.worker.Worker.wait_for_shutdown] method (as in the example above).

## Async Context Manager

[Worker][fluid.utils.worker.Worker] implements the async context manager protocol. Entering the context calls [Worker.startup][fluid.utils.worker.Worker.startup] and exiting it calls [Worker.shutdown][fluid.utils.worker.Worker.shutdown], so the `async with` pattern is the most concise way to manage the full lifecycle:

```python
async with MyWorker() as worker:
    # worker is running here
    ...
# worker is fully shut down here
```
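
For readers unfamiliar with the protocol, the following self-contained sketch shows the general shape of a class whose `__aenter__` and `__aexit__` delegate to `startup` and `shutdown`. `LifecycleSketch` is a hypothetical class written for this illustration; it is not the library's `Worker`.

```python
import asyncio
from typing import List


class LifecycleSketch:
    """Hypothetical class; records the order of lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def startup(self) -> None:
        self.events.append("startup")

    async def shutdown(self) -> None:
        self.events.append("shutdown")

    async def __aenter__(self) -> "LifecycleSketch":
        await self.startup()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises.
        await self.shutdown()


async def demo() -> List[str]:
    async with LifecycleSketch() as worker:
        worker.events.append("running")
    return worker.events
```

Because `__aexit__` also runs when the body raises, shutdown is not skipped on error paths.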

Resources that the worker needs for its entire lifetime can be opened and closed inside [Worker.run][fluid.utils.worker.Worker.run] using normal `async with` statements — no subclassing of lifecycle hooks is required. The example below subclasses [QueueConsumer][fluid.utils.worker.QueueConsumer] to build a worker that accepts text items via [send][fluid.utils.worker.QueueConsumer.send], opens an [AsyncAnthropic](https://github.com/anthropics/anthropic-sdk-python) client for the duration of `run`, and streams a one-sentence summary from Claude for each item:

```python
--8<-- "./docs_src/worker_context_manager.py"
```
65 changes: 65 additions & 0 deletions docs_src/worker_context_manager.py
@@ -0,0 +1,65 @@
import asyncio

import anthropic

from fluid.utils.worker import QueueConsumer

ARTICLES = [
    (
        "Async programming in Python allows multiple tasks to run concurrently "
        "within a single thread by yielding control at await points, which is "
        "particularly efficient for I/O-bound workloads such as HTTP requests "
        "and database queries."
    ),
    (
        "Large language models are trained on vast corpora of text using "
        "self-supervised objectives, enabling them to learn grammar, facts, and "
        "reasoning patterns that can be adapted to a wide range of downstream "
        "tasks through prompting or fine-tuning."
    ),
    (
        "The observer pattern decouples event producers from consumers by "
        "introducing an intermediary that maintains a list of subscribers and "
        "notifies them when state changes, making it straightforward to add new "
        "listeners without modifying the source."
    ),
]


class AnthropicWorker(QueueConsumer[str]):

    def __init__(self):
        super().__init__()
        self.results = []

    async def run(self) -> None:
        async with anthropic.AsyncAnthropic() as client:
            while not self.is_stopping():
                item = await self.get_message()
                if item is None:
                    continue
                if item == "":
                    break
                async with client.messages.stream(
                    model="claude-opus-4-7",
                    max_tokens=128,
                    messages=[
                        {
                            "role": "user",
                            "content": f"Summarise in one sentence: {item}",
                        }
                    ],
                ) as stream:
                    self.results.append(await stream.get_final_text())


async def main() -> None:
    async with AnthropicWorker() as worker:
        for article in ARTICLES:
            worker.send(article)
        worker.send("")  # signal end of input
    print(worker.results)


if __name__ == "__main__":
    asyncio.run(main())
7 changes: 7 additions & 0 deletions examples/tasks/__init__.py
Expand Up @@ -105,6 +105,13 @@ async def datetime_task(context: TaskRun[DatetimeParams]) -> None:
    context.logger.info(f"Received datetime: {context.params.dt}")


@task(cpu_bound=True, env={"FLUID_TEST_ENV": "hello_from_env"})
async def cpu_bound_env(context: TaskRun) -> None:
    """A CPU bound task that reads an env variable injected via the env argument"""
    broker = cast(RedisTaskBroker, context.task_manager.broker)
    await broker.redis_cli.setex(context.id, 10, os.environ.get("FLUID_TEST_ENV", ""))


@task(cpu_bound=True)
async def cpu_bound(context: TaskRun[Sleep]) -> None:
    """A CPU bound task running on subprocess
Expand Down
2 changes: 2 additions & 0 deletions fluid/scheduler/k8s_job.py
Expand Up @@ -104,6 +104,8 @@ def k8s_job_pod_template(
    env = list(container.env or [])
    for name, value in cpu_env().items():
        env.append(client.V1EnvVar(name=name, value=value))
    for name, value in ctx.task.env.items():
        env.append(client.V1EnvVar(name=name, value=value))
    container.env = env
    container.liveness_probe = None  # type: ignore[assignment]
    container.readiness_probe = None  # type: ignore[assignment]
Expand Down