Skip to content

Commit d0d1ce2

Browse files
committed
Add Runloop rollout processor integration
1 parent 1bd5447 commit d0d1ce2

12 files changed

Lines changed: 759 additions & 1 deletion

File tree

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,8 @@ FIREWORKS_API_KEY="your_fireworks_api_key_here"
1919
# E2B API Key (if working with E2B code execution features)
2020
# E2B_API_KEY="your_e2b_api_key_here"
2121

22+
# Runloop API Key (if hosting remote rollout servers in Runloop Devboxes)
23+
# RUNLOOP_API_KEY="your_runloop_api_key_here"
24+
2225
# Other environment variables your custom reward functions might need
2326
# MY_CUSTOM_SERVICE_API_KEY="some_other_key"
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Runloop Remote Rollouts
2+
3+
`RunloopRolloutProcessor` runs your remote rollout server inside a Runloop Devbox and then delegates rollout execution to Eval Protocol's existing `RemoteRolloutProcessor`.
4+
5+
This is useful when your rollout server needs an isolated, reproducible environment but you still want Eval Protocol to use the standard `/init` request and Fireworks tracing metadata flow.
6+
7+
## Install
8+
9+
```bash
10+
pip install "eval-protocol[runloop]"
11+
```
12+
13+
Set the API keys used by the local evaluator and remote server:
14+
15+
```bash
16+
export RUNLOOP_API_KEY=...
17+
export FIREWORKS_API_KEY=...
18+
```
19+
20+
## Usage
21+
22+
```python
23+
from eval_protocol.pytest import RunloopRolloutProcessor, evaluation_test
24+
25+
26+
@evaluation_test(
27+
rollout_processor=RunloopRolloutProcessor(
28+
blueprint_id="bpt_your_blueprint_id",
29+
server_command=(
30+
"python -m uvicorn examples.runloop_remote_rollout.server:app "
31+
"--host 0.0.0.0 --port 8000"
32+
),
33+
port=8000,
34+
),
35+
)
36+
async def test_my_eval(row):
37+
return row
38+
```
39+
40+
The server command must bind to `0.0.0.0` on the configured port so the Runloop tunnel can reach it. The server must expose `POST /init` and should use `FireworksTracingHttpHandler` plus `RolloutIdFilter` to publish rollout completion status.
41+
42+
## Existing Devboxes
43+
44+
You can attach to an existing Devbox instead of creating one from a blueprint:
45+
46+
```python
47+
RunloopRolloutProcessor(
48+
devbox_id="devbox_existing_id",
49+
server_command="python -m uvicorn server:app --host 0.0.0.0 --port 8000",
50+
port=8000,
51+
)
52+
```
53+
54+
Eval Protocol only shuts down Devboxes created by `RunloopRolloutProcessor` when `shutdown_on_cleanup=True`. Existing Devboxes are left running.
55+
56+
## Trace Flow
57+
58+
`RunloopRolloutProcessor` does not change default rollout behavior. After setup it calls `RemoteRolloutProcessor(remote_base_url=...)`; `RemoteRolloutProcessor` sends `/init`, polls Fireworks tracing status by rollout ID, and backfills the final row from trace data.

eval_protocol/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
"evaluation_test": (".pytest", "evaluation_test"),
8686
"SingleTurnRolloutProcessor": (".pytest", "SingleTurnRolloutProcessor"),
8787
"RemoteRolloutProcessor": (".pytest", "RemoteRolloutProcessor"),
88+
"RunloopRolloutProcessor": (".pytest", "RunloopRolloutProcessor"),
8889
"GithubActionRolloutProcessor": (".pytest", "GithubActionRolloutProcessor"),
8990
# From .pytest.parameterize
9091
"DefaultParameterIdGenerator": (".pytest.parameterize", "DefaultParameterIdGenerator"),
@@ -174,6 +175,7 @@ def __init__(self, *args, **kwargs):
174175
"DataLoaderConfig",
175176
"Status",
176177
"RemoteRolloutProcessor",
178+
"RunloopRolloutProcessor",
177179
"GithubActionRolloutProcessor",
178180
"InputMetadata",
179181
"EvaluationRow",
@@ -278,6 +280,7 @@ def _get_version():
278280
evaluation_test,
279281
SingleTurnRolloutProcessor,
280282
RemoteRolloutProcessor,
283+
RunloopRolloutProcessor,
281284
GithubActionRolloutProcessor,
282285
)
283286
from .pytest.parameterize import DefaultParameterIdGenerator

eval_protocol/pytest/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
"NoOpRolloutProcessor": (".default_no_op_rollout_processor", "NoOpRolloutProcessor"),
2020
"SingleTurnRolloutProcessor": (".default_single_turn_rollout_process", "SingleTurnRolloutProcessor"),
2121
"RemoteRolloutProcessor": (".remote_rollout_processor", "RemoteRolloutProcessor"),
22+
"RunloopRolloutProcessor": (".runloop_rollout_processor", "RunloopRolloutProcessor"),
2223
"GithubActionRolloutProcessor": (".github_action_rollout_processor", "GithubActionRolloutProcessor"),
2324
"RolloutProcessor": (".rollout_processor", "RolloutProcessor"),
2425
# Dataset adapter
@@ -103,6 +104,7 @@ def __dir__():
103104
"RolloutProcessor",
104105
"SingleTurnRolloutProcessor",
105106
"RemoteRolloutProcessor",
107+
"RunloopRolloutProcessor",
106108
"GithubActionRolloutProcessor",
107109
"NoOpRolloutProcessor",
108110
# Dataset
@@ -133,6 +135,7 @@ def __dir__():
133135
from .default_no_op_rollout_processor import NoOpRolloutProcessor as NoOpRolloutProcessor
134136
from .default_single_turn_rollout_process import SingleTurnRolloutProcessor as SingleTurnRolloutProcessor
135137
from .remote_rollout_processor import RemoteRolloutProcessor as RemoteRolloutProcessor
138+
from .runloop_rollout_processor import RunloopRolloutProcessor as RunloopRolloutProcessor
136139
from .github_action_rollout_processor import GithubActionRolloutProcessor as GithubActionRolloutProcessor
137140
from .evaluation_test import evaluation_test as evaluation_test
138141
from .exception_config import (
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
"""Runloop-backed remote rollout processor."""
2+
3+
from __future__ import annotations
4+
5+
import asyncio
6+
import os
7+
import time
8+
import urllib.error
9+
import urllib.request
10+
from typing import Any
11+
12+
from eval_protocol.models import EvaluationRow
13+
from eval_protocol.pytest.remote_rollout_processor import RemoteRolloutProcessor
14+
from eval_protocol.pytest.rollout_processor import RolloutProcessor
15+
from eval_protocol.pytest.types import RolloutProcessorConfig
16+
17+
18+
def _load_runloop_sdk() -> Any:
19+
try:
20+
from runloop_api_client import RunloopSDK
21+
except ImportError as exc:
22+
raise ImportError(
23+
"RunloopRolloutProcessor requires the optional Runloop dependency. "
24+
"Install it with `pip install 'eval-protocol[runloop]'`."
25+
) from exc
26+
return RunloopSDK
27+
28+
29+
class RunloopRolloutProcessor(RolloutProcessor):
30+
"""Host a remote rollout server in a Runloop Devbox.
31+
32+
This processor only orchestrates Runloop lifecycle. Row processing is delegated
33+
to :class:`RemoteRolloutProcessor`, so completion and trace collection continue
34+
to use Eval Protocol's existing remote rollout contract.
35+
"""
36+
37+
def __init__(
38+
self,
39+
*,
40+
blueprint_id: str | None = None,
41+
devbox_id: str | None = None,
42+
server_command: str,
43+
port: int = 8000,
44+
model_base_url: str = "https://tracing.fireworks.ai",
45+
poll_interval: float = 1.0,
46+
timeout_seconds: float = 120.0,
47+
startup_timeout_seconds: float = 60.0,
48+
include_payloads: bool = False,
49+
shutdown_on_cleanup: bool = True,
50+
runloop_api_key: str | None = None,
51+
) -> None:
52+
if not blueprint_id and not devbox_id:
53+
raise ValueError("Either blueprint_id or devbox_id is required for RunloopRolloutProcessor")
54+
if not server_command:
55+
raise ValueError("server_command is required for RunloopRolloutProcessor")
56+
57+
self._blueprint_id = blueprint_id
58+
self._devbox_id = devbox_id
59+
self._server_command = server_command
60+
self._port = port
61+
self._model_base_url = model_base_url
62+
self._poll_interval = poll_interval
63+
self._timeout_seconds = timeout_seconds
64+
self._startup_timeout_seconds = startup_timeout_seconds
65+
self._include_payloads = include_payloads
66+
self._shutdown_on_cleanup = shutdown_on_cleanup
67+
self._runloop_api_key = runloop_api_key
68+
69+
self._client: Any | None = None
70+
self._devbox: Any | None = None
71+
self._server_execution: Any | None = None
72+
self._remote_base_url: str | None = None
73+
self._remote_processor: RemoteRolloutProcessor | None = None
74+
self._owns_devbox = False
75+
self._shutdown_complete = False
76+
77+
@property
78+
def remote_base_url(self) -> str | None:
79+
"""The derived public URL for the Runloop-hosted rollout server."""
80+
return self._remote_base_url
81+
82+
@property
83+
def devbox_id(self) -> str | None:
84+
"""The Devbox ID used by this processor once setup has completed."""
85+
if self._devbox is not None and hasattr(self._devbox, "id"):
86+
return self._devbox.id
87+
return self._devbox_id
88+
89+
def setup(self) -> None:
90+
"""Create or attach to a Devbox, expose the server port, and start the server."""
91+
if self._remote_processor is not None:
92+
return
93+
94+
api_key = self._runloop_api_key or os.getenv("RUNLOOP_API_KEY")
95+
if not api_key:
96+
raise ValueError(
97+
"RUNLOOP_API_KEY is required for RunloopRolloutProcessor. "
98+
"Set the environment variable or pass runloop_api_key explicitly."
99+
)
100+
101+
RunloopSDK = _load_runloop_sdk()
102+
self._client = RunloopSDK(bearer_token=api_key)
103+
104+
if self._devbox_id:
105+
self._devbox = self._client.devbox.from_id(self._devbox_id)
106+
self._owns_devbox = False
107+
else:
108+
assert self._blueprint_id is not None
109+
self._devbox = self._client.devbox.create_from_blueprint_id(self._blueprint_id)
110+
self._owns_devbox = True
111+
112+
self._await_running()
113+
tunnel = self._create_tunnel()
114+
self._remote_base_url = self._derive_remote_base_url(tunnel)
115+
self._server_execution = self._devbox.cmd.exec_async(self._server_command)
116+
self._wait_for_server_startup()
117+
self._remote_processor = RemoteRolloutProcessor(
118+
remote_base_url=self._remote_base_url,
119+
model_base_url=self._model_base_url,
120+
poll_interval=self._poll_interval,
121+
timeout_seconds=self._timeout_seconds,
122+
include_payloads=self._include_payloads,
123+
)
124+
125+
def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> list[asyncio.Task[EvaluationRow]]:
126+
if self._remote_processor is None:
127+
self.setup()
128+
assert self._remote_processor is not None
129+
return self._remote_processor(rows, config)
130+
131+
async def acleanup(self) -> None:
132+
"""Async cleanup for the delegated processor and any owned Devbox."""
133+
if self._remote_processor is not None:
134+
await self._remote_processor.acleanup()
135+
if self._should_shutdown_devbox():
136+
await asyncio.to_thread(self._shutdown_devbox)
137+
138+
def cleanup(self) -> None:
139+
"""Best-effort synchronous cleanup."""
140+
if self._remote_processor is not None:
141+
self._remote_processor.cleanup()
142+
if self._should_shutdown_devbox():
143+
self._shutdown_devbox()
144+
145+
def _await_running(self) -> None:
146+
await_running = getattr(self._devbox, "await_running", None)
147+
if await_running is None:
148+
return
149+
await_running()
150+
151+
def _create_tunnel(self) -> Any:
152+
net = self._devbox.net
153+
create_tunnel = getattr(net, "create_tunnel", None)
154+
if create_tunnel is not None:
155+
return create_tunnel(port=self._port)
156+
157+
enable_tunnel = getattr(net, "enable_tunnel", None)
158+
if enable_tunnel is None:
159+
raise RuntimeError("Runloop Devbox networking API does not expose create_tunnel or enable_tunnel")
160+
return enable_tunnel(auth_mode="open")
161+
162+
def _derive_remote_base_url(self, tunnel: Any) -> str:
163+
get_tunnel_url = getattr(self._devbox, "get_tunnel_url", None)
164+
if get_tunnel_url is not None:
165+
url = get_tunnel_url(self._port)
166+
if url:
167+
return str(url).rstrip("/")
168+
169+
for attr in ("url", "base_url", "public_url"):
170+
value = getattr(tunnel, attr, None)
171+
if value:
172+
return str(value).rstrip("/")
173+
174+
tunnel_key = getattr(tunnel, "tunnel_key", None)
175+
if tunnel_key:
176+
return f"https://{self._port}-{tunnel_key}.tunnel.runloop.ai"
177+
178+
raise RuntimeError("Could not determine Runloop tunnel URL for the rollout server")
179+
180+
def _wait_for_server_startup(self) -> None:
181+
if self._startup_timeout_seconds <= 0:
182+
return
183+
assert self._remote_base_url is not None
184+
185+
deadline = time.monotonic() + self._startup_timeout_seconds
186+
last_error: Exception | None = None
187+
while time.monotonic() < deadline:
188+
try:
189+
request = urllib.request.Request(self._remote_base_url, method="GET")
190+
with urllib.request.urlopen(request, timeout=min(5.0, self._startup_timeout_seconds)) as response:
191+
response.read(1)
192+
return
193+
except urllib.error.HTTPError:
194+
return
195+
except Exception as exc:
196+
last_error = exc
197+
time.sleep(min(1.0, max(0.0, deadline - time.monotonic())))
198+
199+
message = f"Runloop rollout server did not become reachable within {self._startup_timeout_seconds} seconds"
200+
if last_error is not None:
201+
message = f"{message}: {last_error}"
202+
raise TimeoutError(message)
203+
204+
def _should_shutdown_devbox(self) -> bool:
205+
return (
206+
self._devbox is not None
207+
and self._owns_devbox
208+
and self._shutdown_on_cleanup
209+
and not self._shutdown_complete
210+
)
211+
212+
def _shutdown_devbox(self) -> None:
213+
if self._devbox is None or self._shutdown_complete:
214+
return
215+
self._devbox.shutdown()
216+
self._shutdown_complete = True
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Runloop Remote Rollout Example
2+
3+
This example hosts an Eval Protocol remote rollout server in a Runloop Devbox.
4+
5+
## Requirements
6+
7+
```bash
8+
pip install "eval-protocol[runloop]"
9+
export RUNLOOP_API_KEY=...
10+
export FIREWORKS_API_KEY=...
11+
```
12+
13+
Create a Runloop blueprint that contains this repository and its Python dependencies, then set `RUNLOOP_BLUEPRINT_ID`:
14+
15+
```bash
16+
export RUNLOOP_BLUEPRINT_ID=bpt_your_blueprint_id
17+
pytest examples/runloop_remote_rollout/test_eval.py
18+
```
19+
20+
The processor starts:
21+
22+
```bash
23+
python -m uvicorn examples.runloop_remote_rollout.server:app --host 0.0.0.0 --port 8000
24+
```
25+
26+
The server receives `POST /init`, performs a chat completion through the Fireworks tracing base URL provided by Eval Protocol, and logs rollout completion using Fireworks tracing metadata.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Runloop remote rollout example."""

0 commit comments

Comments
 (0)