Skip to content

Commit ed185f9

Browse files
committed
Persistent storage
1 parent 13aa729 commit ed185f9

4 files changed

Lines changed: 96 additions & 0 deletions

File tree

docs/FEATURES.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,28 @@ offwork worker --backend redis://localhost:6379 --sandbox # run with isolation
210210

211211
See [Sandbox](SANDBOX.md) for configuration and management.
212212

213+
## Persistent storage
214+
215+
When a worker provides a persistent mount (for example the hosted broker gives each user a private volume), tasks reach it through `offwork.storage_path()`:
216+
217+
```python
218+
import offwork
219+
220+
@offwork.task
221+
def cache_model(url: str) -> str:
222+
dest = offwork.storage_path("models", "weights.bin")
223+
dest.parent.mkdir(parents=True, exist_ok=True) # subdirectories are yours to create
224+
if not dest.exists():
225+
download(url, dest)
226+
return str(dest)
227+
```
228+
229+
`storage_path()` returns the storage root, creating it if needed. Passing path
230+
parts joins them onto the root without creating them. The location comes from
231+
the `OFFWORK_STORAGE` environment variable, falling back to `./offwork-storage`
232+
when unset, so the same task code works locally and on a worker with a mounted
233+
volume.
234+
213235
## Signing
214236

215237
Pre-shared token or PIN-based pairing + HMAC-SHA256 — workers reject untrusted or tampered tasks:

offwork/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
from offwork.core.version import _VERSION
6363
from offwork.core.progress import ProgressInfo
6464
from offwork.core.progress import progress as progress
65+
from offwork.core.storage import storage_path as storage_path
6566
from offwork.core._timeout import TimeoutIn, WaitForever, ReturnImmediately
6667
from offwork.worker.remote import serve, connect, disconnect, _ConnectionContext
6768
from offwork.worker.result import Result, ResultEnvelope
@@ -130,6 +131,7 @@ def pack(func: Callable[..., object], *args: Any, **kwargs: Any) -> Task:
130131
"install_package_as",
131132
"worker_only_import",
132133
"progress",
134+
"storage_path",
133135
# Timeout types
134136
"TimeoutIn",
135137
"WaitForever",

offwork/core/storage.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Persistent per-worker storage path.
2+
3+
A worker may expose a directory that survives between task executions
4+
(e.g. a per-user volume mounted into the worker container). Tasks read
5+
its location through :func:`storage_path` rather than hard-coding a path,
6+
so the same code runs locally and on a hosted broker.
7+
"""
8+
9+
import os
10+
from pathlib import Path
11+
12+
#: Environment variable a worker sets to advertise its persistent storage
13+
#: directory. Hosted brokers point this at a per-user volume.
14+
STORAGE_ENV = "OFFWORK_STORAGE"
15+
16+
17+
def storage_path(*parts: str) -> Path:
18+
"""Return the persistent storage directory, creating it if needed.
19+
20+
Files written under the returned directory survive between task
21+
executions on workers that provide persistent storage. When no
22+
storage is configured (``OFFWORK_STORAGE`` unset) this falls back to
23+
``./offwork-storage`` in the worker's working directory.
24+
25+
Optional *parts* are joined onto the base path::
26+
27+
offwork.storage_path() # the storage root
28+
offwork.storage_path("models") # a subdirectory
29+
offwork.storage_path("data", "in.csv") # a file path
30+
31+
The storage root is created on access. The returned path is absolute.
32+
Joined *parts* are not created — call ``.mkdir(parents=True)`` on the
33+
result yourself if you need a subdirectory.
34+
"""
35+
base = Path(os.environ.get(STORAGE_ENV, "offwork-storage")).expanduser()
36+
base.mkdir(parents=True, exist_ok=True)
37+
return base.joinpath(*parts).resolve()

tests/test_storage.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
"""Tests for offwork.storage_path()."""
2+
3+
from pathlib import Path
4+
5+
import offwork
6+
from offwork.core.storage import STORAGE_ENV
7+
8+
9+
def test_storage_path_uses_env(tmp_path: Path, monkeypatch) -> None:
10+
monkeypatch.setenv(STORAGE_ENV, str(tmp_path))
11+
p = offwork.storage_path()
12+
assert p == tmp_path.resolve()
13+
assert p.is_dir()
14+
15+
16+
def test_storage_path_joins_parts(tmp_path: Path, monkeypatch) -> None:
17+
monkeypatch.setenv(STORAGE_ENV, str(tmp_path))
18+
p = offwork.storage_path("models", "weights.bin")
19+
assert p == (tmp_path / "models" / "weights.bin").resolve()
20+
# Only the storage root is created, not joined subdirectories.
21+
assert tmp_path.is_dir()
22+
assert not p.parent.exists()
23+
24+
25+
def test_storage_path_default_when_unset(tmp_path: Path, monkeypatch) -> None:
26+
monkeypatch.delenv(STORAGE_ENV, raising=False)
27+
monkeypatch.chdir(tmp_path)
28+
p = offwork.storage_path()
29+
assert p == (tmp_path / "offwork-storage").resolve()
30+
assert p.is_dir()
31+
32+
33+
def test_storage_path_in_public_api() -> None:
34+
assert "storage_path" in offwork.__all__
35+
assert callable(offwork.storage_path)

0 commit comments

Comments
 (0)