Commit 545822c

zachbai and oz-agent committed
Add command (API-dispatch) backend to oz-agent-worker
Introduce a transport-agnostic `command` backend that delegates task execution to an operator-configured dispatch command (fire-and-forget). The worker renders a versioned JSON dispatch payload to the command's stdin; on success it suppresses its own terminal completion so the remote oz agent reports terminal state to warp-server itself, and routes cancellations to an optional cancel command. Includes config/CLI wiring, unit + integration tests, README docs, and an HTTP REST reference dispatch script. Co-Authored-By: Oz <oz-agent@warp.dev>
1 parent 6e63dce commit 545822c

20 files changed

Lines changed: 1532 additions & 11 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -31,3 +31,6 @@ vendor/
 
 # OS
 .DS_Store
+
+# Python bytecode
+__pycache__/

README.md

Lines changed: 35 additions & 0 deletions
@@ -16,6 +16,7 @@ Self-hosted worker for Oz cloud agents.
 - Docker daemon access for the Docker backend
 - Local `oz` CLI access plus a writable workspace root for the Direct backend
 - Kubernetes API access plus cluster credentials for the Kubernetes backend
+- An operator-provided dispatch command for the Command backend (dispatch to any runtime over any transport)
 
 ## Usage
 
@@ -64,6 +65,40 @@ backend:
     oz_path: "/usr/local/bin/oz"
 ```
 
+### Command
+
+The command backend hands task execution to an operator-owned runtime over **any transport**, without recompiling the worker. Instead of running the agent itself, the worker invokes an operator-configured `dispatch_command` and lets that command dispatch the task however it likes (HTTP, gRPC, a cloud SDK, a message queue, SSH, etc.). Execution is **fire-and-forget**: once the dispatch command reports success, the remote `oz` agent — launched by your runtime with the worker-provided arguments — reports task progress and terminal state directly to Warp. The worker does not finalize a dispatched task itself; it only reports a failure if the dispatch command itself fails.
+
+Example config:
+
+```yaml
+worker_id: "my-worker"
+backend:
+  command:
+    dispatch_command: "/opt/oz/dispatch.sh"
+    cancel_command: "/opt/oz/cancel.sh"
+    dispatch_timeout: "60s"
+    environment:
+      - name: MY_RUNTIME_TOKEN
+```
+
+Config keys:
+
+- `dispatch_command` (required): shell command (run via `/bin/sh -c`) invoked once per task to dispatch it.
+- `cancel_command` (optional): shell command invoked best-effort when a dispatched task is cancelled. If unset, the worker relies on agent-side cancellation.
+- `dispatch_timeout` (optional): how long the dispatch command may run before it is considered failed (humantime format, e.g. `60s`). Defaults to `60s`.
+- `environment`: extra environment variables exposed to the dispatch/cancel commands (same `name`/`value` semantics as the other backends; omit `value` to inherit from the host).
+
+The dispatch contract:
+
+- The dispatch command receives the task payload as JSON on **stdin**. This is the only place task environment variables and secrets appear — they are deliberately kept out of the subprocess environment and argv.
+- The following non-secret variables are also set in the command's environment for convenience: `OZ_TASK_ID`, `OZ_EXECUTION_ID`, `OZ_WORKER_BACKEND=command`, `OZ_SERVER_ROOT_URL`, `OZ_DOCKER_IMAGE`.
+- The JSON payload contains: `version`, `task_id`, `execution_id`, `server_root_url`, `worker_id`, `docker_image`, `base_args`, `env`, `sidecars`, and `task`. `base_args` is the `oz agent run …` argument vector your runtime should launch the agent with, inside an environment built from `docker_image` and `sidecars`.
+- Exit code `0` means the task was dispatched successfully; the worker will not finalize it (the remote agent reports terminal state to Warp itself). A non-zero exit or a dispatch that exceeds `dispatch_timeout` marks the task failed.
+- The cancel command (when configured) receives `OZ_TASK_ID`, `OZ_EXECUTION_ID`, and `OZ_WORKER_BACKEND=command` in its environment.
+
+Because dispatched tasks run independently of the worker process, the command backend does not consume a local concurrency slot for the lifetime of the remote task, and worker shutdown does not cancel already-dispatched tasks.
+
 ### Kubernetes
 
 The Kubernetes backend creates one Job per task. Cluster selection is controlled by the Kubernetes client config:
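
Reviewer note: the dispatch contract in the Command section of this README diff (payload on stdin, non-secret identifiers in the environment, exit code `0` or timeout deciding the outcome) can be illustrated with a small Python sketch. This is not the worker's implementation, which is not shown in this excerpt; the function name `run_dispatch` and its parameters are hypothetical, and only a subset of the documented `OZ_*` variables is set.

```python
import json
import os
import subprocess


def run_dispatch(command, payload, timeout_secs=60.0, extra_env=None):
    """Return True if `command` exits 0 within `timeout_secs`, else False."""
    env = dict(os.environ)
    env.update(extra_env or {})
    # Non-secret identifiers only; task env and secrets travel on stdin.
    env.update(
        {
            "OZ_TASK_ID": str(payload.get("task_id", "")),
            "OZ_EXECUTION_ID": str(payload.get("execution_id", "")),
            "OZ_WORKER_BACKEND": "command",
        }
    )
    try:
        result = subprocess.run(
            ["/bin/sh", "-c", command],  # README: run via `/bin/sh -c`
            input=json.dumps(payload).encode("utf-8"),
            env=env,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_secs,
        )
    except subprocess.TimeoutExpired:
        return False  # exceeded the timeout: treated as a failed dispatch
    return result.returncode == 0
```

A `True` result corresponds to the "dispatched, do not finalize" branch of the contract; `False` corresponds to marking the task failed.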

examples/command-backend/README.md

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+# Command backend — HTTP REST reference
+
+A templatable reference for the `oz-agent-worker` [`command` backend](../../README.md#command). The worker invokes a dispatch command per task and hands it the task payload as JSON on stdin; this reference **transforms** that payload into a runtime's API shape and forwards it to an HTTP REST endpoint so a self-hosted runtime can launch the agent on demand.
+
+It is written in Python (standard library only — no dependencies to install) so the transformation logic is easy to read and extend.
+
+## Files
+
+- `dispatch.py`: reads the JSON payload on stdin, transforms it (see `transform()`), and `POST`s it to `OZ_DISPATCH_URL`. Exit `0` means dispatched (fire-and-forget); non-zero means the worker fails the task.
+- `cancel.py`: `POST`s `{task_id, execution_id}` to `OZ_CANCEL_URL` when a dispatched task is cancelled (best-effort).
+
+Requires `python3` on the worker host.
+
+## The transformation
+
+Real runtimes rarely accept the worker's payload verbatim. `dispatch.py` keeps this simple transformation in one place, the `transform()` function, which you replace to match your API. The example renames and reshapes the worker payload into a `run` object:
+
+```python
+def transform(payload):
+    task = payload.get("task") or {}
+    definition = task.get("task_definition") or {}
+    return {
+        "run": {
+            "task_id": payload["task_id"],
+            "execution_id": payload.get("execution_id", ""),
+            "image": payload.get("docker_image", ""),
+            "command": payload.get("base_args", []),  # the `oz agent run ...` argv
+            "env": payload.get("env", {}),
+            "mounts": [  # mount_path -> path
+                {"image": s.get("image", ""), "path": s.get("mount_path", ""),
+                 "read_write": s.get("read_write", False)}
+                for s in (payload.get("sidecars") or [])
+            ],
+            "callback_url": payload.get("server_root_url", ""),
+            "metadata": {
+                "worker_id": payload.get("worker_id", ""),
+                "payload_version": payload.get("version"),
+                "title": task.get("title", ""),
+                "prompt": definition.get("prompt", ""),
+            },
+        }
+    }
+```
+
+## Wiring it into the worker
+
+Point the command backend at the scripts and template the endpoints via `environment` (or host env):
+
+```yaml
+worker_id: "my-worker"
+backend:
+  command:
+    dispatch_command: "python3 /opt/oz/dispatch.py"
+    cancel_command: "python3 /opt/oz/cancel.py"
+    dispatch_timeout: "60s"
+    environment:
+      - name: OZ_DISPATCH_URL
+        value: "https://my-runtime.internal/oz/dispatch"
+      - name: OZ_CANCEL_URL
+        value: "https://my-runtime.internal/oz/cancel"
+      # Omit `value` to inherit the secret from the worker's host environment.
+      - name: OZ_DISPATCH_AUTH_HEADER
+```
+
+(The scripts are executable, so `dispatch_command: "/opt/oz/dispatch.py"` also works.)
+
+## What the worker hands the script (stdin)
+
+```json
+{
+  "version": 1,
+  "task_id": "...",
+  "execution_id": "...",
+  "server_root_url": "https://app.warp.dev",
+  "worker_id": "my-worker",
+  "docker_image": "ubuntu:22.04",
+  "base_args": ["agent", "run", "--task-id", "...", "--server-root-url", "..."],
+  "env": { "GITHUB_ACCESS_TOKEN": "...", "...": "..." },
+  "sidecars": [ { "image": "...", "mount_path": "/agent", "read_write": false } ],
+  "task": { "id": "...", "title": "...", "task_definition": { "prompt": "..." } }
+}
+```
+
+The non-secret identifiers `OZ_TASK_ID`, `OZ_EXECUTION_ID`, `OZ_WORKER_BACKEND`, `OZ_SERVER_ROOT_URL`, and `OZ_DOCKER_IMAGE` are also set in the script's environment. Secrets appear only in the stdin payload.
+
+Your runtime should launch the agent with `base_args` inside an environment built from `docker_image` + `sidecars`, injecting `env`. Because `base_args` already includes `--task-id` and `--server-root-url`, the agent reports its own progress and terminal state to Warp — the worker does not. Keep the exit-code contract: exit `0` only when the task is durably accepted for execution.
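
Reviewer note: the final paragraph of this README describes what the receiving runtime should do with the payload. A minimal sketch of that step follows, under two assumptions not confirmed by the diff: that `base_args` omits the binary name (the sample payload starts with `"agent"`), and that the binary is reachable as `oz`. The function `launch_plan` and the shape of its result are hypothetical; a real runtime would map them onto its own container or VM API.

```python
def launch_plan(payload, oz_binary="oz"):
    """Return a plain dict describing how to start the agent for `payload`."""
    return {
        "image": payload.get("docker_image", ""),
        # Assumption: base_args omits the binary name, as in the sample payload.
        "argv": [oz_binary, *(payload.get("base_args") or [])],
        # Task env (including secrets) comes only from the stdin payload.
        "env": dict(payload.get("env") or {}),
        # One (image, mount_path, read_write) tuple per sidecar.
        "mounts": [
            (s.get("image", ""), s.get("mount_path", ""), bool(s.get("read_write", False)))
            for s in (payload.get("sidecars") or [])
        ],
    }
```

Keeping the plan as plain data makes it easy to log (minus `env`) and to unit-test before wiring it to whatever actually starts the container.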

examples/command-backend/cancel.py

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3
+"""Reference cancel command for the oz-agent-worker "command" backend.
+
+Invoked best-effort when a dispatched task is cancelled. The worker sets
+``OZ_TASK_ID`` and ``OZ_EXECUTION_ID`` in the environment; this script POSTs them
+to ``OZ_CANCEL_URL`` so your runtime can stop the corresponding agent. Adapt the
+request body to your runtime's API if needed.
+
+Required environment:
+  OZ_CANCEL_URL             REST endpoint to POST the cancellation to.
+
+Optional environment:
+  OZ_DISPATCH_AUTH_HEADER   Authorization header value (e.g. "Bearer ...").
+  OZ_DISPATCH_TIMEOUT_SECS  Request timeout in seconds (default 30).
+"""
+
+import json
+import os
+import sys
+import urllib.error
+import urllib.request
+
+
+def main():
+    url = os.environ.get("OZ_CANCEL_URL")
+    if not url:
+        sys.stderr.write("OZ_CANCEL_URL must be set\n")
+        return 2
+    timeout = float(os.environ.get("OZ_DISPATCH_TIMEOUT_SECS", "30"))
+
+    body = json.dumps(
+        {
+            "task_id": os.environ.get("OZ_TASK_ID", ""),
+            "execution_id": os.environ.get("OZ_EXECUTION_ID", ""),
+        }
+    ).encode("utf-8")
+
+    request = urllib.request.Request(url, data=body, method="POST")
+    request.add_header("Content-Type", "application/json")
+    auth = os.environ.get("OZ_DISPATCH_AUTH_HEADER")
+    if auth:
+        request.add_header("Authorization", auth)
+
+    try:
+        with urllib.request.urlopen(request, timeout=timeout) as response:
+            response.read()
+    except urllib.error.HTTPError as exc:
+        sys.stderr.write(f"cancel endpoint returned HTTP {exc.code}\n")
+        return 1
+    except urllib.error.URLError as exc:
+        sys.stderr.write(f"failed to reach cancel endpoint: {exc}\n")
+        return 1
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
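
Reviewer note: to exercise `cancel.py` (or `dispatch.py`) without a real runtime, a throwaway local endpoint is enough. The sketch below is not part of the commit; `StubRuntime`, `RECEIVED`, and `start_stub` are hypothetical names, and the stub simply records each POSTed JSON body and answers `202`, which the scripts treat as success like any other 2xx.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

RECEIVED = []  # (path, decoded JSON body) tuples, in arrival order


class StubRuntime(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", "0"))
        try:
            body = json.loads(self.rfile.read(length) or b"{}")
        except json.JSONDecodeError:
            self.send_response(400)  # malformed JSON: surfaces as HTTPError client-side
            self.end_headers()
            return
        RECEIVED.append((self.path, body))
        self.send_response(202)
        self.end_headers()

    def log_message(self, fmt, *args):
        pass  # keep output quiet


def start_stub(port=0):
    """Start the stub on 127.0.0.1 in a daemon thread; return the server."""
    server = HTTPServer(("127.0.0.1", port), StubRuntime)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

With `port=0` the OS picks a free port, available as `server.server_address[1]`; pointing `OZ_CANCEL_URL` at `http://127.0.0.1:<port>/oz/cancel` and running the script should then append one entry to `RECEIVED`.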

examples/command-backend/dispatch.py

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+#!/usr/bin/env python3
+"""Reference dispatch command for the oz-agent-worker "command" backend.
+
+It reads the task ``DispatchPayload`` (JSON) on stdin, transforms it into a
+hypothetical runtime's REST API shape, and POSTs it to ``OZ_DISPATCH_URL``.
+
+Use it as a template for delegating task execution to a self-hosted runtime that
+is already configured to run oz agents on demand. The part you customize is
+``transform()`` — adapt it to your runtime's request schema. Everything else
+(reading stdin, auth, timeouts, exit-code contract) can stay as-is.
+
+Only the Python standard library is used, so there are no dependencies to
+install on the worker host.
+
+Required environment:
+  OZ_DISPATCH_URL           REST endpoint to POST the transformed body to.
+
+Optional environment:
+  OZ_DISPATCH_AUTH_HEADER   Authorization header value (e.g. "Bearer ...").
+  OZ_DISPATCH_TIMEOUT_SECS  Request timeout in seconds (default 30).
+
+Also provided by the worker (no need to set these yourself):
+  OZ_TASK_ID, OZ_EXECUTION_ID, OZ_WORKER_BACKEND, OZ_SERVER_ROOT_URL, OZ_DOCKER_IMAGE
+
+Exit semantics (the contract the command backend relies on):
+  exit 0    => task accepted for dispatch; the remote runtime now owns it and
+               the agent reports terminal state to Warp itself.
+  exit != 0 => dispatch failed; the worker marks the task failed.
+"""
+
+import json
+import os
+import sys
+import urllib.error
+import urllib.request
+
+
+def transform(payload):
+    """Map the worker's DispatchPayload onto our runtime's API shape.
+
+    This is an example of an arbitrary but simple transformation: fields are
+    renamed and nested under a ``run`` object, sidecar mounts are restructured
+    (``mount_path`` -> ``path``), and a few values are lifted into a metadata
+    block. Replace the body of this function to match your own API.
+    """
+    task = payload.get("task") or {}
+    definition = task.get("task_definition") or {}
+
+    return {
+        "run": {
+            "task_id": payload["task_id"],
+            "execution_id": payload.get("execution_id", ""),
+            "image": payload.get("docker_image", ""),
+            # base_args is the `oz agent run ...` argv the runtime should exec.
+            "command": payload.get("base_args", []),
+            "env": payload.get("env", {}),
+            "mounts": [
+                {
+                    "image": sidecar.get("image", ""),
+                    "path": sidecar.get("mount_path", ""),
+                    "read_write": sidecar.get("read_write", False),
+                }
+                for sidecar in (payload.get("sidecars") or [])
+            ],
+            "callback_url": payload.get("server_root_url", ""),
+            "metadata": {
+                "worker_id": payload.get("worker_id", ""),
+                "payload_version": payload.get("version"),
+                "title": task.get("title", ""),
+                "prompt": definition.get("prompt", ""),
+            },
+        }
+    }
+
+
+def main():
+    url = os.environ.get("OZ_DISPATCH_URL")
+    if not url:
+        sys.stderr.write("OZ_DISPATCH_URL must be set\n")
+        return 2
+    timeout = float(os.environ.get("OZ_DISPATCH_TIMEOUT_SECS", "30"))
+
+    try:
+        payload = json.load(sys.stdin)
+    except json.JSONDecodeError as exc:
+        sys.stderr.write(f"invalid dispatch payload on stdin: {exc}\n")
+        return 1
+
+    body = json.dumps(transform(payload)).encode("utf-8")
+
+    request = urllib.request.Request(url, data=body, method="POST")
+    request.add_header("Content-Type", "application/json")
+    request.add_header("X-Oz-Task-Id", os.environ.get("OZ_TASK_ID", ""))
+    auth = os.environ.get("OZ_DISPATCH_AUTH_HEADER")
+    if auth:
+        request.add_header("Authorization", auth)
+
+    try:
+        # Any 2xx is success; urllib raises HTTPError for status >= 400.
+        with urllib.request.urlopen(request, timeout=timeout) as response:
+            response.read()
+    except urllib.error.HTTPError as exc:
+        detail = exc.read().decode("utf-8", errors="replace")
+        sys.stderr.write(f"dispatch endpoint returned HTTP {exc.code}: {detail}\n")
+        return 1
+    except urllib.error.URLError as exc:
+        sys.stderr.write(f"failed to reach dispatch endpoint: {exc}\n")
+        return 1
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
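
Reviewer note: both scripts catch `HTTPError` and `URLError`. In CPython's `urllib`, some transport failures (for example a timeout while waiting for or reading the response) can surface as another `OSError` subclass instead; the script would still exit non-zero via the traceback, so the exit-code contract holds, but the error message is less tidy. One possible tightening, offered as a sketch rather than a required change, is a helper that catches `OSError` after `HTTPError`. The name `post_json` and its return shape are hypothetical.

```python
import urllib.error
import urllib.request


def post_json(url, body, timeout=30.0, headers=None):
    """POST `body` (bytes); return (ok, detail) without raising on I/O errors."""
    request = urllib.request.Request(url, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    for name, value in (headers or {}).items():
        request.add_header(name, value)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            response.read()
            return True, f"HTTP {response.status}"
    except urllib.error.HTTPError as exc:  # must come first: it subclasses OSError
        return False, f"HTTP {exc.code}"
    except OSError as exc:  # URLError, timeouts, connection resets
        return False, f"transport error: {exc}"
```

`main()` could then map `ok` to exit code `0` and anything else to `1`, writing `detail` to stderr.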
