Skip to content

Commit b7c95e4

Browse files
authored
Allow to set env vars (#90)
1 parent 1b9413f commit b7c95e4

12 files changed

Lines changed: 376 additions & 55 deletions

File tree

docs/reference/task.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ For retry configuration (`retry`, `rate_limit_retry`) see [Task Retry](task_retr
2828

2929
::: fluid.scheduler.K8sConfig
3030

31+
::: fluid.scheduler.K8sResourceRequirements
32+
3133
::: fluid.scheduler.is_in_cpu_process

docs/tutorials/task_k8s.md

Lines changed: 79 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,36 @@ This offloads heavy computation to dedicated pods and keeps the consumer event l
77

88
The switch is automatic. When `KUBERNETES_SERVICE_HOST` is set (which Kubernetes injects into every pod) and the `k8s` extra is installed, any task declared with `cpu_bound=True` will spawn a Kubernetes Job instead of a subprocess. No code change is required in the task itself.
99

10-
The Job reuses the **targeted container's spec** from the task consumer deployment (image, resource limits, volume mounts, security context, image pull policy, and everything else). Only that container is included in the Job pod — sidecars and other containers from the deployment are dropped. The **pod-level init containers** and **volumes** are preserved as-is, so any setup performed by init containers (e.g. installing TLS certificates) is reproduced in the Job pod. Only these fields on the main container are overridden:
10+
The Job pod template is derived from the **task consumer deployment**. The implementation reads the deployment, locates the target container, and builds a Job spec from it. This means the Job inherits most of the container's configuration from the deployment — image, image pull policy, volume mounts, security context, and everything else — while only overriding the fields necessary to run the task.
1111

12-
| Field | Value |
12+
13+
**Inherited from the deployment container (unchanged):**
14+
15+
- Container image and image pull policy
16+
- Volume mounts (and pod-level volumes)
17+
- Environment variables (the task's env vars are appended, never replaced)
18+
- Security context
19+
- Everything else not listed below
20+
21+
**Overridden or cleared:**
22+
23+
| Field | Value in the Job |
1324
|---|---|
14-
| `command` | same as the deployment container, with any trailing `serve` token removed |
25+
| `command` | Same as the deployment, but any trailing `serve` token is removed |
1526
| `args` | `exec <task-name> --log --run-id <id> --params <json>` |
16-
| `env` | same as the deployment container, with `TASK_MANAGER_SPAWN=true` appended |
17-
| `liveness_probe` | cleared — probes are not meaningful for Job pods and could prematurely kill a long-running task |
18-
| `readiness_probe` | cleared |
27+
| `env` | Inherited from the deployment, then `TASK_MANAGER_SPAWN=true` appended, then any task-level `env` vars appended (see [Injecting environment variables](#injecting-environment-variables)) |
28+
| `resources` | Inherited from the deployment unless overridden via [`K8sConfig.resources`](#configuration) |
29+
| `liveness_probe` | Cleared — probes are not meaningful for Job pods and would prematurely kill long-running tasks |
30+
| `readiness_probe` | Cleared |
1931

20-
The `TASK_MANAGER_SPAWN=true` environment variable signals to the process running inside the Job that it is executing as a CPU-bound worker rather than a long-lived consumer.
32+
**Other containers** (sidecars) from the deployment are dropped — only the target container runs in the Job pod. **Pod-level init containers** and **volumes** are preserved, so any setup performed at pod startup (e.g. installing TLS certificates) is reproduced in the Job pod.
33+
34+
`TASK_MANAGER_SPAWN=true` signals to the process inside the Job that it is a CPU-bound worker rather than a long-lived consumer.
2135

2236
The Job is created in the same namespace as the consumer with:
2337

24-
- `backoff_limit: 0`: a failed pod is never retried; the error is propagated back to the task consumer instead
25-
- `ttlSecondsAfterFinished`: set from `K8sConfig.job_ttl`, the Job and its pods are cleaned up automatically after completion (default 300 s)
38+
- `backoff_limit: 0` a failed pod is never retried; the error is propagated back to the task consumer instead
39+
- `ttlSecondsAfterFinished`set from [`K8sConfig.job_ttl`](#configuration), the Job and its pods are cleaned up automatically after completion (default 300 s)
2640
- `restartPolicy: Never` on the pod template
2741

2842
The job name is derived from the task name and the first 7 characters of the run ID, slugified and capped at 63 characters to comply with Kubernetes DNS label requirements:
@@ -31,7 +45,7 @@ The job name is derived from the task name and the first 7 characters of the run
3145
task-<slugified-task-name>-<short-run-id>
3246
```
3347

34-
Once the Job is created, the consumer polls its status every `sleep` seconds until it either succeeds or fails.
48+
Once the Job is created, the consumer polls its status every [`K8sConfig.sleep`](#configuration) seconds until it either succeeds or fails.
3549

3650
## Installation
3751

@@ -48,27 +62,27 @@ from fluid.scheduler import task, TaskRun
4862

4963
@task(cpu_bound=True)
5064
async def heavy_calculation(ctx: TaskRun) -> None:
51-
# heavy CPU work here, runs in a k8s Job when inside a cluster
65+
# heavy CPU work here — runs in a k8s Job when inside a cluster,
66+
# or in a local subprocess when running outside one
5267
...
5368
```
5469

5570
## Configuration
5671

57-
K8s behaviour can be tuned per-task via the `k8s_config` argument which
58-
accepts a [K8sConfig][fluid.scheduler.K8sConfig] object:
72+
K8s behaviour can be tuned per-task via the `k8s_config` argument, which accepts a [K8sConfig][fluid.scheduler.K8sConfig] object:
5973

6074
```python
6175
from fluid.scheduler import task, TaskRun, K8sConfig
6276

6377
@task(
6478
cpu_bound=True,
6579
k8s_config=K8sConfig(
66-
namespace="workers", # namespace where the Job is created
67-
deployment="fluid-task", # deployment to copy the container spec from
68-
container="main", # container name inside the deployment
69-
job_ttl=600, # seconds to keep the Job after completion
70-
sleep=2.0, # polling interval while waiting for the Job
71-
resources={ # optional Kubernetes resource limits and requests for the container
80+
namespace="workers", # namespace where the Job is created
81+
deployment="fluid-task", # deployment to copy the container spec from
82+
container="main", # container name inside the deployment
83+
job_ttl=600, # seconds to keep the Job after completion (default 300)
84+
sleep=2.0, # polling interval while waiting for the Job (default 2.0)
85+
resources={ # override the container's resource spec (default: inherited from deployment)
7286
"limits": {"cpu": "2", "memory": "4Gi"},
7387
"requests": {"cpu": "1", "memory": "2Gi"},
7488
},
@@ -78,17 +92,54 @@ async def heavy_calculation(ctx: TaskRun) -> None:
7892
...
7993
```
8094

81-
If `k8s_config` is omitted, or any of the optional fields are not provided, the following environment variables are used:
95+
### K8sConfig fields
96+
97+
| Field | Type | Default | Description |
98+
|---|---|---|---|
99+
| `namespace` | `str` | `FLUID_TASK_CONSUMER_K8S_NAMESPACE` or `"default"` | Kubernetes namespace where the Job is created |
100+
| `deployment` | `str` | `FLUID_TASK_CONSUMER_K8S_DEPLOYMENT` or `"fluid-task"` | Deployment to read the container spec from |
101+
| `container` | `str` | `FLUID_TASK_CONSUMER_K8S_CONTAINER` or `"main"` | Container name within the deployment |
102+
| `resources` | `K8sResourceRequirements \| None` | `None` | Resource limits/requests for the Job container. If `None`, the deployment's existing resource spec is used unchanged |
103+
| `job_ttl` | `int` | `FLUID_TASK_CONSUMER_K8S_JOB_TTL` or `300` | Seconds to retain the Job after completion before automatic cleanup |
104+
| `sleep` | `float` | `FLUID_TASK_CONSUMER_K8S_SLEEP` or `2.0` | Polling interval in seconds while waiting for the Job to finish |
105+
106+
All `K8sConfig` fields have defaults drawn from environment variables, so a minimal deployment only needs to set those variables rather than hard-coding values per task.
107+
108+
If `k8s_config` is omitted entirely, a [K8sConfig][fluid.scheduler.K8sConfig] instance with all defaults is used.
109+
110+
### Resource overrides
111+
112+
The `resources` field accepts a [K8sResourceRequirements][fluid.scheduler.K8sResourceRequirements] dict with optional `limits` and `requests` keys:
113+
114+
```python
115+
resources={
116+
"limits": {"cpu": "4", "memory": "8Gi"},
117+
"requests": {"cpu": "500m", "memory": "1Gi"},
118+
}
119+
```
120+
121+
When not provided (the default), the Job container inherits the resource spec from the deployment container unchanged. This is useful for tasks that need more CPU or memory than the consumer pod is allocated.
122+
123+
## Injecting environment variables
124+
125+
Extra environment variables can be injected into the Job (or subprocess, when running outside a cluster) using the `env` argument on the [`@task`][fluid.scheduler.task] decorator:
126+
127+
```python
128+
from fluid.scheduler import task, TaskRun
129+
130+
@task(
131+
cpu_bound=True,
132+
env={"MODEL_PATH": "/mnt/models/v2", "LOG_LEVEL": "DEBUG"},
133+
)
134+
async def heavy_calculation(ctx: TaskRun) -> None:
135+
import os
136+
model_path = os.environ["MODEL_PATH"]
137+
...
138+
```
82139

83-
| Variable | Default | Description |
84-
|---|---|---|
85-
| `FLUID_TASK_CONSUMER_K8S_NAMESPACE` | `default` | Kubernetes namespace |
86-
| `FLUID_TASK_CONSUMER_K8S_DEPLOYMENT` | `fluid-task` | Deployment name |
87-
| `FLUID_TASK_CONSUMER_K8S_CONTAINER` | `main` | Container name |
88-
| `FLUID_TASK_CONSUMER_K8S_JOB_TTL` | `300` | Job TTL in seconds |
89-
| `FLUID_TASK_CONSUMER_K8S_SLEEP` | `2.0` | Polling interval in seconds |
140+
These variables are appended to the environment after the deployment's existing env vars and `TASK_MANAGER_SPAWN=true`, so they can override anything set in the deployment if needed.
90141

91-
If `resources` is not provided, the container's resource spec from the deployment is used as-is. If provided, it must be a [K8sResourceRequirements][fluid.scheduler.K8sResourceRequirements] with optional `limits` and `requests` keys, each mapping resource names (e.g. `"cpu"`, `"memory"`) to their string values.
142+
For subprocess execution (outside a cluster), they are merged into the spawned process's environment the same way, making task definitions portable across both runtimes without any conditional logic.
92143

93144
## Required RBAC permissions
94145

docs/tutorials/task_queue.md

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -94,30 +94,6 @@ if is_in_cpu_process():
9494

9595
Stdout and stderr from the subprocess are streamed back to the consumer in real time, so logs produced by the task appear in the consumer's output.
9696

97-
#### Timeout
98-
99-
CPU bound tasks respect the `timeout_seconds` parameter. If the subprocess has not finished within the timeout, it is killed and the task run transitions to the `failure` state.
100-
101-
```python
102-
@task(cpu_bound=True, timeout_seconds=300)
103-
async def slow_calculation(ctx: TaskRun) -> None:
104-
...
105-
```
106-
107-
The default timeout is **60 seconds**. For long-running tasks make sure to raise this to an appropriate value.
108-
109-
#### Concurrency control
110-
111-
Use `max_concurrency` to limit how many instances of a CPU bound task can run simultaneously. This is useful to prevent exhausting system resources when many tasks are queued at the same time.
112-
113-
```python
114-
@task(cpu_bound=True, max_concurrency=2)
115-
async def heavy_calculation(ctx: TaskRun) -> None:
116-
...
117-
```
118-
119-
A value of `0` (the default) means no limit.
120-
12197
#### Kubernetes
12298

12399
When the consumer is running inside a Kubernetes cluster, CPU bound tasks can be dispatched as Kubernetes Jobs instead of local subprocesses. See [K8s Jobs](task_k8s.md) for more details.
@@ -150,6 +126,46 @@ async def scheduled(ctx: TaskRun) -> None:
150126
await asyncio.sleep(0.1)
151127
```
152128

129+
## Timeout
130+
131+
All tasks, both IO and CPU bound, respect the `timeout_seconds` parameter (default **60 seconds**). The timeout is measured from when the task starts executing.
132+
133+
For IO bound tasks, `asyncio` raises a `TimeoutError` if the coroutine has not completed within the timeout, and the task run transitions to the `failure` state. For CPU bound tasks, the subprocess (or Kubernetes Job) is killed and the run likewise transitions to `failure`.
134+
135+
```python
136+
from fluid.scheduler import task, TaskRun
137+
138+
@task(timeout_seconds=300)
139+
async def slow_io_task(ctx: TaskRun) -> None:
140+
...
141+
142+
@task(cpu_bound=True, timeout_seconds=300)
143+
async def slow_cpu_task(ctx: TaskRun) -> None:
144+
...
145+
```
146+
147+
For long-running tasks make sure to raise `timeout_seconds` to an appropriate value.
148+
149+
## Concurrency control
150+
151+
Use `max_concurrency` to limit how many instances of a task can run simultaneously. This applies to both IO and CPU bound tasks, and is useful to avoid overwhelming downstream services or exhausting system resources when many tasks are queued at once.
152+
153+
```python
154+
from fluid.scheduler import task, TaskRun
155+
156+
@task(max_concurrency=5)
157+
async def fetch_data(ctx: TaskRun) -> None:
158+
...
159+
160+
@task(cpu_bound=True, max_concurrency=2)
161+
async def heavy_calculation(ctx: TaskRun) -> None:
162+
...
163+
```
164+
165+
A value of `0` (the default) means no limit.
166+
167+
When the limit is reached the task run transitions to the `rate_limited` state. To automatically retry rate-limited tasks, combine `max_concurrency` with `rate_limit_retry`. See [Task Retry](task_retry.md) for details.
168+
153169
## Aborting a task
154170

155171
Any task — IO or CPU bound — can signal a deliberate, non-error cancellation by calling [ctx.abort()][fluid.scheduler.TaskRun.abort]:
@@ -174,8 +190,8 @@ When this happens the task run transitions to the `aborted` [TaskState][fluid.sc
174190

175191
For CPU-bound tasks (subprocess or Kubernetes Job) the task function runs in a **separate process**, so the abort signal must be relayed back to the consumer. The mechanism works as follows:
176192

177-
1. The inner process calls `ctx.abort()`, which raises [TaskAbortedError][fluid.scheduler.TaskAbortedError].
193+
1. The inner process calls `ctx.abort()`, which raises [TaskAbortedError][fluid.scheduler.errors.TaskAbortedError].
178194
2. The consumer running *inside* that process catches the error and writes the reason to a short-lived Redis key (60-second TTL).
179-
3. After the subprocess or k8s Job exits, the outer consumer reads the Redis key. If an abort reason is found it re-raises [TaskAbortedError][fluid.scheduler.TaskAbortedError], marking the run as `aborted` instead of `success`.
195+
3. After the subprocess or k8s Job exits, the outer consumer reads the Redis key. If an abort reason is found it re-raises [TaskAbortedError][fluid.scheduler.errors.TaskAbortedError], marking the run as `aborted` instead of `success`.
180196

181197
This means a CPU-bound task that aborts itself is always correctly reflected as `aborted` in the task run state, regardless of whether it ran locally or as a Kubernetes Job.

docs/tutorials/workers.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,20 @@ To shut down a worker there are few possibilities.
2323

2424
* Direct call to the async [Worker.shutdown][fluid.utils.worker.Worker.shutdown] method which will trigger the graceful shutdown and wait for the worker to finish its work.
2525
* Call the [Worker.gracefully_stop][fluid.utils.worker.Worker.gracefully_stop] method which will trigger the graceful shutdown. Importantly, this method does not wait for the worker to finish its work, ti simply transition from the [WorkerState.RUNNING][fluid.utils.worker.WorkerState.RUNNING] to [WorkerState.STOPPING][fluid.utils.worker.WorkerState.STOPPING] state. To wait for the worker exit one should call the async [Worker.wait_for_shutdown][fluid.utils.worker.Worker.wait_for_shutdown] method (as in the example above)
26+
27+
## Async Context Manager
28+
29+
[Worker][fluid.utils.worker.Worker] implements the async context manager protocol. Entering the context calls [Worker.startup][fluid.utils.worker.Worker.startup] and exiting it calls [Worker.shutdown][fluid.utils.worker.Worker.shutdown], so the `async with` pattern is the most concise way to manage the full lifecycle:
30+
31+
```python
32+
async with MyWorker() as worker:
33+
# worker is running here
34+
...
35+
# worker is fully shut down here
36+
```
37+
38+
Resources that the worker needs for its entire lifetime can be opened and closed inside [Worker.run][fluid.utils.worker.Worker.run] using normal `async with` statements — no subclassing of lifecycle hooks is required. The example below subclasses [QueueConsumer][fluid.utils.worker.QueueConsumer] to build a worker that accepts text items via [send][fluid.utils.worker.QueueConsumer.send], opens an [AsyncAnthropic](https://github.com/anthropics/anthropic-sdk-python) client for the duration of `run`, and streams a one-sentence summary from Claude for each item:
39+
40+
```python
41+
--8<-- "./docs_src/worker_context_manager.py"
42+
```

docs_src/worker_context_manager.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import asyncio
2+
3+
import anthropic
4+
5+
from fluid.utils.worker import QueueConsumer
6+
7+
ARTICLES = [
8+
(
9+
"Async programming in Python allows multiple tasks to run concurrently "
10+
"within a single thread by yielding control at await points, which is "
11+
"particularly efficient for I/O-bound workloads such as HTTP requests "
12+
"and database queries."
13+
),
14+
(
15+
"Large language models are trained on vast corpora of text using "
16+
"self-supervised objectives, enabling them to learn grammar, facts, and "
17+
"reasoning patterns that can be adapted to a wide range of downstream "
18+
"tasks through prompting or fine-tuning."
19+
),
20+
(
21+
"The observer pattern decouples event producers from consumers by "
22+
"introducing an intermediary that maintains a list of subscribers and "
23+
"notifies them when state changes, making it straightforward to add new "
24+
"listeners without modifying the source."
25+
),
26+
]
27+
28+
29+
class AnthropicWorker(QueueConsumer[str]):
30+
31+
def __init__(self):
32+
super().__init__()
33+
self.results = []
34+
35+
async def run(self) -> None:
36+
async with anthropic.AsyncAnthropic() as client:
37+
while not self.is_stopping():
38+
item = await self.get_message()
39+
if item is None:
40+
continue
41+
if item == "":
42+
break
43+
async with client.messages.stream(
44+
model="claude-opus-4-7",
45+
max_tokens=128,
46+
messages=[
47+
{
48+
"role": "user",
49+
"content": f"Summarise in one sentence: {item}",
50+
}
51+
],
52+
) as stream:
53+
self.results.append(await stream.get_final_text())
54+
55+
56+
async def main() -> None:
57+
async with AnthropicWorker() as worker:
58+
for article in ARTICLES:
59+
worker.send(article)
60+
worker.send("") # signal end of input
61+
print(worker.results)
62+
63+
64+
if __name__ == "__main__":
65+
asyncio.run(main())

examples/tasks/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ async def datetime_task(context: TaskRun[DatetimeParams]) -> None:
105105
context.logger.info(f"Received datetime: {context.params.dt}")
106106

107107

108+
@task(cpu_bound=True, env={"FLUID_TEST_ENV": "hello_from_env"})
109+
async def cpu_bound_env(context: TaskRun) -> None:
110+
"""A CPU bound task that reads an env variable injected via the env argument"""
111+
broker = cast(RedisTaskBroker, context.task_manager.broker)
112+
await broker.redis_cli.setex(context.id, 10, os.environ.get("FLUID_TEST_ENV", ""))
113+
114+
108115
@task(cpu_bound=True)
109116
async def cpu_bound(context: TaskRun[Sleep]) -> None:
110117
"""A CPU bound task running on subprocess

fluid/scheduler/k8s_job.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ def k8s_job_pod_template(
104104
env = list(container.env or [])
105105
for name, value in cpu_env().items():
106106
env.append(client.V1EnvVar(name=name, value=value))
107+
for name, value in ctx.task.env.items():
108+
env.append(client.V1EnvVar(name=name, value=value))
107109
container.env = env
108110
container.liveness_probe = None # type: ignore[assignment]
109111
container.readiness_probe = None # type: ignore[assignment]

0 commit comments

Comments
 (0)