diff --git a/openadapt_evals/adapters/rl_env.py b/openadapt_evals/adapters/rl_env.py index 12a8241..1ed02b0 100644 --- a/openadapt_evals/adapters/rl_env.py +++ b/openadapt_evals/adapters/rl_env.py @@ -10,6 +10,11 @@ at the final step. This matches GRPO where reward comes from an outcome verifier, not per-step shaping. + When ``evaluate_every_step=True``, the evaluator is called after each + step and the score is included in ``info["evaluation_score"]``. The + reward signal is NOT changed — training code decides how to use the + per-step evaluation data. + Example: from openadapt_evals.adapters.waa.live import WAALiveAdapter, WAALiveConfig from openadapt_evals.adapters.rl_env import RLEnvironment @@ -23,6 +28,12 @@ if obs_step.done: break score = env.evaluate() + + # With per-step evaluation (for RL training loops): + env = RLEnvironment(adapter, default_task_id="", evaluate_every_step=True) + obs = env.reset() + step = env.step(BenchmarkAction(type="click", x=0.5, y=0.3)) + print(step.info["evaluation_score"]) # 0.0 or 1.0 """ from __future__ import annotations @@ -100,9 +111,11 @@ def __init__( self, adapter: BenchmarkAdapter, default_task_id: str | None = None, + evaluate_every_step: bool = False, ): self._adapter = adapter self._default_task_id = default_task_id + self._evaluate_every_step = evaluate_every_step self._current_task: BenchmarkTask | None = None self._step_count = 0 self._done = False @@ -224,6 +237,16 @@ def step(self, action: BenchmarkAction) -> RolloutStep: self._done = done info["step"] = self._step_count + # Optional per-step evaluation for RL training loops + if self._evaluate_every_step and self._current_task is not None: + try: + result = self._adapter.evaluate(self._current_task) + info["evaluation_score"] = result.score + info["evaluation_success"] = result.success + except Exception as e: + logger.warning("Per-step evaluation failed at step %d: %s", self._step_count, e) + info["evaluation_error"] = str(e) + rollout_step = RolloutStep( 
observation=obs, action=action, diff --git a/openadapt_evals/agents/__init__.py b/openadapt_evals/agents/__init__.py index 408565b..bd57aff 100644 --- a/openadapt_evals/agents/__init__.py +++ b/openadapt_evals/agents/__init__.py @@ -10,6 +10,7 @@ - SmartMockAgent: Designed to pass mock adapter tests - ApiAgent: Uses Claude/GPT APIs directly (for WAA) - ClaudeComputerUseAgent: Uses Claude's native computer_use tool + - HttpAgent: Delegates to a remote HTTP agent endpoint - Qwen3VLAgent: Uses Qwen3-VL for local inference - SmolOperatorAgent: Uses SmolVLM2-2.2B for local inference - PolicyAgent: Uses local trained policy model @@ -18,16 +19,13 @@ Example: ```python - from openadapt_evals.agents import ApiAgent, ScriptedAgent, RetrievalAugmentedAgent + from openadapt_evals.agents import ApiAgent, ScriptedAgent, HttpAgent # Use API agent with Claude agent = ApiAgent(provider="anthropic") - # Use retrieval-augmented agent with automatic demo selection - agent = RetrievalAugmentedAgent( - demo_library_path="/path/to/demo_library", - provider="anthropic", - ) + # Use remote agent-as-a-service + agent = HttpAgent(endpoint_url="http://gpu-box:8080") # Use scripted agent for replay agent = ScriptedAgent([ @@ -58,6 +56,7 @@ ) from openadapt_evals.agents.api_agent import ApiAgent from openadapt_evals.agents.claude_computer_use_agent import ClaudeComputerUseAgent +from openadapt_evals.agents.http_agent import HttpAgent from openadapt_evals.agents.retrieval_agent import RetrievalAugmentedAgent # Lazy imports for agents requiring additional dependencies @@ -86,6 +85,7 @@ def __getattr__(name: str): "SmartMockAgent", "ApiAgent", "ClaudeComputerUseAgent", + "HttpAgent", "Qwen3VLAgent", "SmolOperatorAgent", "PolicyAgent", diff --git a/openadapt_evals/agents/http_agent.py b/openadapt_evals/agents/http_agent.py new file mode 100644 index 0000000..aa8fcc1 --- /dev/null +++ b/openadapt_evals/agents/http_agent.py @@ -0,0 +1,199 @@ +"""HTTP-backed agent for remote agent-as-a-service 
class HttpAgent(BenchmarkAgent):
    """Agent that delegates to a remote HTTP endpoint.

    Each call to :meth:`act` POSTs the current observation (base64-encoded
    screenshot, task instruction and id, viewport, accessibility tree and a
    zero-based step counter) to ``{endpoint_url}/act`` and converts the JSON
    object in the response into a ``BenchmarkAction``.

    Every failure to obtain a usable response is reported as a ``done``
    action whose ``raw_action["error"]`` describes the failure, so a broken
    endpoint ends the episode instead of raising into the evaluation loop.

    Args:
        endpoint_url: Base URL of the remote agent server. A trailing slash
            is stripped.
        timeout: Request timeout in seconds for ``/act``.
        headers: Optional extra HTTP headers (e.g. auth tokens) sent with
            every request.
    """

    def __init__(
        self,
        endpoint_url: str,
        timeout: int = 120,
        headers: dict[str, str] | None = None,
    ):
        self.endpoint_url = endpoint_url.rstrip("/")
        self.timeout = timeout
        self.headers = headers or {}
        self._step_count = 0

        logger.info("HttpAgent initialized: endpoint=%s", self.endpoint_url)

    def act(
        self,
        observation: BenchmarkObservation,
        task: BenchmarkTask,
        history: list[tuple[BenchmarkObservation, BenchmarkAction]] | None = None,
    ) -> BenchmarkAction:
        """Send observation to remote endpoint, parse response as BenchmarkAction.

        Args:
            observation: Current observation; its screenshot, viewport and
                accessibility tree are forwarded.
            task: Task whose instruction and id are forwarded.
            history: Accepted for interface compatibility; not sent to the
                remote endpoint.

        Returns:
            The action described by the response, or a ``done`` action with
            ``raw_action["error"]`` set when the request fails or the
            response cannot be interpreted.
        """
        self._step_count += 1

        # Encode screenshot
        screenshot_b64 = None
        if observation.screenshot:
            screenshot_b64 = base64.b64encode(observation.screenshot).decode("ascii")

        payload: dict[str, Any] = {
            "screenshot_b64": screenshot_b64,
            "instruction": task.instruction,
            "task_id": task.task_id,
            "viewport": list(observation.viewport) if observation.viewport else None,
            "accessibility_tree": observation.accessibility_tree,
            # The counter was already incremented, so this is zero-based.
            "step_count": self._step_count - 1,
        }

        try:
            resp = requests.post(
                f"{self.endpoint_url}/act",
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
            )
            resp.raise_for_status()
            data = resp.json()
        except requests.ConnectionError as e:
            return _error_action("Connection failed", "connection_failed", e)
        except requests.Timeout as e:
            return _error_action("Request timed out", "timeout", e)
        except requests.HTTPError as e:
            return _error_action("HTTP error", "http_error", e)
        except (ValueError, KeyError) as e:
            # Body was not valid JSON (requests' JSONDecodeError is a ValueError).
            return _error_action("Invalid response", "invalid_response", e)
        except requests.RequestException as e:
            # Remaining transport failures (too many redirects, broken
            # chunked encoding, ...) must not crash the evaluation loop.
            return _error_action("Request failed", "request_failed", e)

        # Valid JSON is not necessarily an object; a list or null has no .get().
        if not isinstance(data, dict):
            return _error_action(
                "Invalid response",
                "invalid_response",
                f"expected a JSON object, got {type(data).__name__}",
            )

        try:
            return _parse_action_response(data)
        except (TypeError, ValueError) as e:
            # e.g. a non-iterable "target_bbox" value.
            return _error_action("Invalid response", "invalid_response", e)

    def reset(self) -> None:
        """Reset agent state and optionally notify remote endpoint.

        The POST to ``{endpoint_url}/reset`` is best-effort: request failures
        are logged at debug level and otherwise ignored, and the response
        status is not inspected.
        """
        self._step_count = 0
        try:
            requests.post(
                f"{self.endpoint_url}/reset",
                headers=self.headers,
                timeout=10,
            )
        except requests.RequestException as e:
            # Remote endpoint may not support /reset — that's fine
            logger.debug("Remote /reset not available: %s", e)

    def health_check(self) -> bool:
        """Check if the remote endpoint is reachable.

        Returns:
            True if GET /health returns 200, False otherwise.
        """
        try:
            resp = requests.get(
                f"{self.endpoint_url}/health",
                headers=self.headers,
                timeout=10,
            )
            return resp.status_code == 200
        except requests.RequestException:
            return False


def _error_action(log_message: str, error_kind: str, detail: object) -> BenchmarkAction:
    """Log a failure and build the ``done`` action that reports it."""
    logger.error("%s: %s", log_message, detail)
    return BenchmarkAction(
        type="done",
        raw_action={"error": f"{error_kind}: {detail}"},
    )


def _parse_action_response(data: dict[str, Any]) -> BenchmarkAction:
    """Convert a response dict into a BenchmarkAction.

    The response dict should have at minimum a ``type`` field; a missing or
    empty ``type`` is treated as ``"done"``. All other fields map directly
    to BenchmarkAction attributes, and the untouched dict is kept as
    ``raw_action``.

    Args:
        data: Response dict from the remote agent.

    Returns:
        Parsed BenchmarkAction.

    Raises:
        TypeError: If ``target_bbox`` is present but not iterable.
    """
    # `or` also covers an explicit `"type": null` in the response.
    action_type = data.get("type") or "done"
    bbox = data.get("target_bbox")

    return BenchmarkAction(
        type=action_type,
        x=data.get("x"),
        y=data.get("y"),
        target_node_id=data.get("target_node_id"),
        target_bbox=tuple(bbox) if bbox else None,
        target_role=data.get("target_role"),
        target_name=data.get("target_name"),
        text=data.get("text"),
        key=data.get("key"),
        modifiers=data.get("modifiers"),
        scroll_direction=data.get("scroll_direction"),
        scroll_amount=data.get("scroll_amount"),
        end_x=data.get("end_x"),
        end_y=data.get("end_y"),
        answer=data.get("answer"),
        raw_action=data,
    )
This module provides functionality to filter and export successful WAA benchmark -traces in a format suitable for VLM fine-tuning. It converts benchmark execution -traces to the openadapt-ml Episode format. +traces in a format suitable for VLM fine-tuning. + +Two exporters are available: + +- ``TraceExporter``: Converts traces to the openadapt-ml Episode format. + Requires openadapt-ml as a dependency. +- ``LightweightTraceExporter``: Exports plain JSON + screenshots with no + external ML dependencies. Useful for teams with their own training pipelines. Usage: # Via CLI uv run python -m openadapt_evals.benchmarks.vm_cli export-traces --status passed --output training_data/ - # Via Python + # Via Python (openadapt-ml format) from openadapt_evals.benchmarks.trace_export import export_traces, TraceExporter - # Export all passing traces exporter = TraceExporter( benchmark_dir=Path("benchmark_results/waa_eval_20241214"), output_dir=Path("training_data"), @@ -19,26 +24,24 @@ ) episodes = exporter.export() - # Or use convenience function - episodes = export_traces( + # Via Python (lightweight JSON, no openadapt-ml dependency) + from openadapt_evals.benchmarks.trace_export import export_traces_lightweight + + results = export_traces_lightweight( benchmark_dir="benchmark_results/waa_eval_20241214", - output_dir="training_data", + output_dir="training_data_json", status_filter="passed", ) -Directory structure created: - training_data/ +Directory structure created (lightweight): + training_data_json/ |-- episodes/ - | |-- episode_001.json # Episode schema format - | |-- episode_002.json - | |-- ... 
@dataclass
class LightweightTraceExporter:
    """Export benchmark traces as plain JSON with no ML framework dependencies.

    Produces simple JSON files with screenshots, actions, and metadata that
    any training pipeline can consume. Does not import from openadapt-ml.

    Output layout under ``output_dir``: ``episodes/<episode_id>.json``,
    ``screenshots/<episode_id>/step_NNN.png`` (when copying is enabled),
    ``manifest.json`` and ``training_samples.jsonl``.

    Args:
        benchmark_dir: Path to benchmark results directory.
        output_dir: Output directory for exported data.
        status_filter: Filter by task status ("passed", "failed", "all").
        copy_screenshots: Whether to copy screenshots to output directory.
        viewport_size: Default viewport size for coordinate normalization.
    """

    benchmark_dir: Path
    output_dir: Path
    status_filter: StatusFilter = "passed"
    copy_screenshots: bool = True
    viewport_size: tuple[int, int] = (1920, 1200)

    def __post_init__(self):
        # Accept plain strings as well as Path objects.
        self.benchmark_dir = Path(self.benchmark_dir)
        self.output_dir = Path(self.output_dir)

    def export(self) -> list[dict[str, Any]]:
        """Export traces as plain JSON dicts.

        A task that fails to convert, write, or copy is logged, counted as
        skipped, and left out of the returned list, the manifest and the
        JSONL file.

        Returns:
            List of episode dicts (same structure as the JSON files written).
            Empty when no task matches ``status_filter``; nothing is written
            in that case.
        """
        metadata = load_benchmark_metadata(self.benchmark_dir)
        # NOTE(review): the summary's return value is unused; the call is kept
        # in case the loader is relied on to validate the run — confirm.
        load_benchmark_summary(self.benchmark_dir)
        tasks = load_task_results(self.benchmark_dir)

        logger.info(
            "LightweightTraceExporter: %d tasks from %s",
            len(tasks),
            self.benchmark_dir.name,
        )

        filtered = self._filter_tasks(tasks)
        logger.info("Filtered to %d tasks (status=%s)", len(filtered), self.status_filter)

        if not filtered:
            logger.warning("No tasks match the filter criteria")
            return []

        self.output_dir.mkdir(parents=True, exist_ok=True)
        (self.output_dir / "episodes").mkdir(exist_ok=True)
        if self.copy_screenshots:
            (self.output_dir / "screenshots").mkdir(exist_ok=True)

        episodes: list[dict[str, Any]] = []
        stats = ExportStats(total_tasks=len(tasks))

        for i, task in enumerate(filtered):
            try:
                episode = self._convert_task(task, i, metadata)

                episode_path = self.output_dir / "episodes" / f"{episode['episode_id']}.json"
                with open(episode_path, "w", encoding="utf-8") as f:
                    json.dump(episode, f, indent=2)

                if self.copy_screenshots:
                    self._copy_screenshots(task, episode["episode_id"])
            except Exception as e:
                # Per-task boundary: one bad task must not abort the export.
                error_msg = f"Failed to export task {task.get('task_id', 'unknown')}: {e}"
                logger.error(error_msg)
                stats.errors.append(error_msg)
                stats.skipped_tasks += 1
                continue

            # Record the episode only once everything for it succeeded, so the
            # manifest and JSONL never list an episode counted as skipped.
            episodes.append(episode)
            stats.exported_tasks += 1
            stats.total_steps += len(episode.get("steps", []))

        self._write_manifest(episodes, metadata, stats)
        self._write_jsonl(episodes)

        logger.info(
            "Lightweight export complete: %d/%d tasks, %d steps",
            stats.exported_tasks,
            stats.total_tasks,
            stats.total_steps,
        )
        return episodes

    def _filter_tasks(self, tasks: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Return the tasks whose ``execution.success`` matches ``status_filter``.

        ``"all"`` returns the input list unchanged; a task without a
        ``success`` flag counts as failed.
        """
        if self.status_filter == "all":
            return tasks
        filtered = []
        for task in tasks:
            success = task.get("execution", {}).get("success", False)
            if self.status_filter == "passed" and success:
                filtered.append(task)
            elif self.status_filter == "failed" and not success:
                filtered.append(task)
        return filtered

    def _normalize_point(self, x: Any, y: Any) -> tuple[Any, Any]:
        """Scale a point to [0, 1] when it looks like pixel coordinates.

        A point is left untouched when either coordinate is missing or when
        both already lie in [0, 1]; otherwise both are divided by the
        corresponding ``viewport_size`` dimension.
        """
        if x is None or y is None:
            return x, y
        if 0 <= x <= 1 and 0 <= y <= 1:
            return x, y
        width, height = self.viewport_size
        return x / width, y / height

    def _convert_task(
        self,
        task: dict[str, Any],
        index: int,
        metadata: dict[str, Any],
    ) -> dict[str, Any]:
        """Build the plain-dict episode for one task result.

        Args:
            task: Task result with ``definition``, ``execution`` and
                ``screenshots`` entries (all optional).
            index: Position in the filtered list; used for a fallback task id.
            metadata: Benchmark metadata (model id, benchmark and run name).

        Returns:
            Episode dict with one entry per execution step. Each step's action
            carries the start point, plus ``end_x``/``end_y`` (normalized the
            same way) and ``scroll_amount`` so drags and scrolls stay usable.
        """
        definition = task.get("definition", {})
        execution = task.get("execution", {})
        screenshots = task.get("screenshots", [])
        execution_steps = execution.get("steps", [])

        task_id = task.get("task_id", f"task_{index:03d}")
        episode_id = f"waa_{task_id}"

        steps = []
        for step_idx, step_data in enumerate(execution_steps):
            action_data = step_data.get("action", {})

            # Prefer the exported screenshot layout; fall back to a path
            # recorded on the step itself.
            screenshot_rel = None
            if step_idx < len(screenshots):
                screenshot_rel = f"screenshots/{episode_id}/step_{step_idx:03d}.png"
            elif step_data.get("screenshot_path"):
                screenshot_rel = step_data["screenshot_path"]

            # Normalize coordinates to [0,1] if they look like pixel values
            x, y = self._normalize_point(action_data.get("x"), action_data.get("y"))
            end_x, end_y = self._normalize_point(
                action_data.get("end_x"), action_data.get("end_y")
            )

            steps.append({
                "step_index": step_idx,
                "screenshot": screenshot_rel,
                "action": {
                    "type": action_data.get("type", "click"),
                    "x": x,
                    "y": y,
                    "end_x": end_x,
                    "end_y": end_y,
                    "text": action_data.get("text"),
                    "key": action_data.get("key"),
                    "modifiers": action_data.get("modifiers"),
                    "scroll_direction": action_data.get("scroll_direction"),
                    "scroll_amount": action_data.get("scroll_amount"),
                    "target_node_id": action_data.get("target_node_id"),
                },
                "reasoning": step_data.get("reasoning"),
                "timestamp": step_data.get("timestamp"),
            })

        return {
            "episode_id": episode_id,
            "task_id": task_id,
            "instruction": definition.get("instruction", ""),
            "success": execution.get("success", False),
            "score": execution.get("score", 0.0),
            "domain": definition.get("domain"),
            "agent_model": metadata.get("model_id"),
            "num_steps": len(steps),
            "steps": steps,
            "metadata": {
                "benchmark_name": metadata.get("benchmark_name", "waa"),
                "run_name": metadata.get("run_name"),
                "total_time_seconds": execution.get("total_time_seconds"),
            },
        }

    def _copy_screenshots(self, task: dict[str, Any], episode_id: str) -> None:
        """Copy a task's screenshots to ``screenshots/<episode_id>/step_NNN.png``.

        Paths in ``task["screenshots"]`` are resolved against
        ``benchmark_dir``. Missing source files are skipped with a warning.
        """
        screenshots = task.get("screenshots", [])
        if not screenshots:
            return
        dest_dir = self.output_dir / "screenshots" / episode_id
        dest_dir.mkdir(parents=True, exist_ok=True)
        for i, rel_path in enumerate(screenshots):
            src = self.benchmark_dir / rel_path
            if src.exists():
                shutil.copy2(src, dest_dir / f"step_{i:03d}.png")
            else:
                logger.warning("Screenshot not found, skipping: %s", src)

    def _write_manifest(
        self,
        episodes: list[dict[str, Any]],
        metadata: dict[str, Any],
        stats: ExportStats,
    ) -> None:
        """Write ``manifest.json``: export metadata, statistics and an episode index."""
        manifest = {
            "export_timestamp": datetime.now(timezone.utc).isoformat(),
            "format": "lightweight-v1",
            "source_benchmark": metadata.get("benchmark_name", "waa"),
            "source_run": metadata.get("run_name"),
            "source_model": metadata.get("model_id"),
            "status_filter": self.status_filter,
            "statistics": {
                "total_tasks": stats.total_tasks,
                "exported_tasks": stats.exported_tasks,
                "skipped_tasks": stats.skipped_tasks,
                "total_steps": stats.total_steps,
                "errors": len(stats.errors),
            },
            "episodes": [
                {
                    "episode_id": ep["episode_id"],
                    "task_id": ep["task_id"],
                    "instruction": ep["instruction"],
                    "num_steps": ep["num_steps"],
                    "success": ep["success"],
                    "domain": ep.get("domain"),
                }
                for ep in episodes
            ],
        }
        with open(self.output_dir / "manifest.json", "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)

    def _write_jsonl(self, episodes: list[dict[str, Any]]) -> None:
        """Write ``training_samples.jsonl`` with one JSON object per step."""
        jsonl_path = self.output_dir / "training_samples.jsonl"
        with open(jsonl_path, "w", encoding="utf-8") as f:
            for ep in episodes:
                for step in ep.get("steps", []):
                    sample = {
                        "episode_id": ep["episode_id"],
                        "task_id": ep["task_id"],
                        "instruction": ep["instruction"],
                        "step_index": step["step_index"],
                        "screenshot": step["screenshot"],
                        "action": step["action"],
                        "reasoning": step.get("reasoning"),
                        "success": ep["success"],
                        "domain": ep.get("domain"),
                    }
                    f.write(json.dumps(sample) + "\n")


def export_traces_lightweight(
    benchmark_dir: str | Path,
    output_dir: str | Path,
    status_filter: StatusFilter = "passed",
    copy_screenshots: bool = True,
    viewport_size: tuple[int, int] = (1920, 1200),
) -> list[dict[str, Any]]:
    """Export benchmark traces as plain JSON (no openadapt-ml dependency).

    Convenience wrapper that builds a ``LightweightTraceExporter`` and runs
    its ``export()``.

    Args:
        benchmark_dir: Path to benchmark results directory.
        output_dir: Output directory for exported data.
        status_filter: Filter by task status ("passed", "failed", "all").
        copy_screenshots: Whether to copy screenshots to output directory.
        viewport_size: Default viewport size for coordinate normalization.

    Returns:
        List of episode dicts.
    """
    exporter = LightweightTraceExporter(
        benchmark_dir=Path(benchmark_dir),
        output_dir=Path(output_dir),
        status_filter=status_filter,
        copy_screenshots=copy_screenshots,
        viewport_size=viewport_size,
    )
    return exporter.export()
mock_post, agent, observation, task): + """act() sends observation and parses a click response.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"type": "click", "x": 0.5, "y": 0.3} + mock_post.return_value = mock_resp + + action = agent.act(observation, task) + + assert action.type == "click" + assert action.x == 0.5 + assert action.y == 0.3 + + # Verify the request payload + call_args = mock_post.call_args + payload = call_args.kwargs["json"] + assert payload["instruction"] == "Click the Submit button" + assert payload["task_id"] == "test_1" + assert payload["viewport"] == [1920, 1200] + assert payload["screenshot_b64"] is not None + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_type(self, mock_post, agent, observation, task): + """act() parses a type response.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"type": "type", "text": "hello world"} + mock_post.return_value = mock_resp + + action = agent.act(observation, task) + + assert action.type == "type" + assert action.text == "hello world" + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_done(self, mock_post, agent, observation, task): + """act() parses a done response.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"type": "done"} + mock_post.return_value = mock_resp + + action = agent.act(observation, task) + + assert action.type == "done" + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_connection_error(self, mock_post, agent, observation, task): + """act() returns done with error info on connection failure.""" + import requests + + mock_post.side_effect = requests.ConnectionError("refused") + + action = agent.act(observation, task) + + assert action.type == "done" + assert "connection_failed" in action.raw_action["error"] + + @patch("openadapt_evals.agents.http_agent.requests.post") + def 
test_act_timeout(self, mock_post, agent, observation, task): + """act() returns done with error info on timeout.""" + import requests + + mock_post.side_effect = requests.Timeout("timed out") + + action = agent.act(observation, task) + + assert action.type == "done" + assert "timeout" in action.raw_action["error"] + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_http_error(self, mock_post, agent, observation, task): + """act() returns done with error info on HTTP error.""" + import requests + + mock_resp = MagicMock() + mock_resp.raise_for_status.side_effect = requests.HTTPError("500") + mock_post.return_value = mock_resp + + action = agent.act(observation, task) + + assert action.type == "done" + assert "http_error" in action.raw_action["error"] + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_no_screenshot(self, mock_post, agent, task): + """act() works when observation has no screenshot.""" + obs = BenchmarkObservation(screenshot=None) + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"type": "done"} + mock_post.return_value = mock_resp + + action = agent.act(obs, task) + + payload = mock_post.call_args.kwargs["json"] + assert payload["screenshot_b64"] is None + assert action.type == "done" + + @patch("openadapt_evals.agents.http_agent.requests.post") + def test_act_step_count_increments(self, mock_post, agent, observation, task): + """act() increments step_count on each call.""" + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = {"type": "click", "x": 0.1, "y": 0.1} + mock_post.return_value = mock_resp + + agent.act(observation, task) + payload1 = mock_post.call_args.kwargs["json"] + assert payload1["step_count"] == 0 + + agent.act(observation, task) + payload2 = mock_post.call_args.kwargs["json"] + assert payload2["step_count"] == 1 + + +class TestReset: + @patch("openadapt_evals.agents.http_agent.requests.post") + def 
class TestHealthCheck:
    """health_check() turns the outcome of GET /health into a boolean."""

    def test_health_check_success(self, agent):
        """A 200 response means the endpoint is healthy."""
        ok_response = MagicMock()
        ok_response.status_code = 200
        with patch(
            "openadapt_evals.agents.http_agent.requests.get",
            return_value=ok_response,
        ):
            healthy = agent.health_check()

        assert healthy is True

    def test_health_check_failure(self, agent):
        """A connection error is reported as unhealthy instead of raised."""
        import requests

        with patch(
            "openadapt_evals.agents.http_agent.requests.get",
            side_effect=requests.ConnectionError(),
        ):
            healthy = agent.health_check()

        assert healthy is False

    def test_health_check_non_200(self, agent):
        """Any status other than 200 is reported as unhealthy."""
        unavailable = MagicMock()
        unavailable.status_code = 503
        with patch(
            "openadapt_evals.agents.http_agent.requests.get",
            return_value=unavailable,
        ):
            healthy = agent.health_check()

        assert healthy is False


class TestParseActionResponse:
    """_parse_action_response() copies response fields onto the action."""

    def test_click_action(self):
        parsed = _parse_action_response({"type": "click", "x": 0.5, "y": 0.3})
        assert (parsed.type, parsed.x, parsed.y) == ("click", 0.5, 0.3)

    def test_type_action(self):
        parsed = _parse_action_response({"type": "type", "text": "hello"})
        assert parsed.type == "type"
        assert parsed.text == "hello"

    def test_key_action(self):
        response = {"type": "key", "key": "Enter", "modifiers": ["ctrl"]}
        parsed = _parse_action_response(response)
        assert parsed.type == "key"
        assert parsed.key == "Enter"
        assert parsed.modifiers == ["ctrl"]

    def test_scroll_action(self):
        response = {"type": "scroll", "scroll_direction": "down"}
        parsed = _parse_action_response(response)
        assert parsed.type == "scroll"
        assert parsed.scroll_direction == "down"

    def test_drag_action(self):
        response = {"type": "drag", "x": 0.1, "y": 0.2, "end_x": 0.8, "end_y": 0.9}
        parsed = _parse_action_response(response)
        assert parsed.type == "drag"
        assert parsed.x == 0.1
        assert parsed.end_x == 0.8

    def test_element_grounding(self):
        response = {"type": "click", "target_node_id": "btn_42", "target_role": "button"}
        parsed = _parse_action_response(response)
        assert parsed.target_node_id == "btn_42"
        assert parsed.target_role == "button"

    def test_missing_type_defaults_to_done(self):
        parsed = _parse_action_response({})
        assert parsed.type == "done"

    def test_raw_action_preserved(self):
        response = {"type": "click", "x": 0.5, "y": 0.5, "extra_field": "kept"}
        parsed = _parse_action_response(response)
        assert parsed.raw_action == response
def _mock_load_metadata(bm_dir):
    """Parse ``metadata.json`` from the benchmark directory."""
    metadata_file = bm_dir / "metadata.json"
    return json.loads(metadata_file.read_text())


def _mock_load_summary(bm_dir):
    """Parse ``summary.json`` from the benchmark directory."""
    summary_file = bm_dir / "summary.json"
    return json.loads(summary_file.read_text())


def _mock_load_tasks(bm_dir):
    """Parse every ``tasks/<name>/result.json``, ordered by entry path.

    Returns an empty list when there is no ``tasks`` directory; entries
    without a ``result.json`` are ignored.
    """
    root = bm_dir / "tasks"
    if not root.exists():
        return []
    candidates = (entry / "result.json" for entry in sorted(root.iterdir()))
    return [json.loads(path.read_text()) for path in candidates if path.exists()]
side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_export_passed_only(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Exports only passed tasks when status_filter='passed'.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="passed", + copy_screenshots=False, + ) + episodes = exporter.export() + + assert len(episodes) == 1 + assert episodes[0]["success"] is True + assert episodes[0]["task_id"] == "task_000" + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_export_all(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Exports all tasks when status_filter='all'.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + ) + episodes = exporter.export() + + assert len(episodes) == 2 + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_episode_schema(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Exported episodes have the expected schema.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + ) + 
episodes = exporter.export() + + ep = episodes[0] + assert "episode_id" in ep + assert "task_id" in ep + assert "instruction" in ep + assert "success" in ep + assert "score" in ep + assert "domain" in ep + assert "agent_model" in ep + assert "num_steps" in ep + assert "steps" in ep + assert "metadata" in ep + + step = ep["steps"][0] + assert "step_index" in step + assert "action" in step + assert "type" in step["action"] + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_coordinate_normalization(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Pixel coordinates are normalized to [0,1].""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + viewport_size=(1920, 1200), + ) + episodes = exporter.export() + + click_step = episodes[0]["steps"][0] + assert click_step["action"]["type"] == "click" + assert click_step["action"]["x"] == pytest.approx(960 / 1920) + assert click_step["action"]["y"] == pytest.approx(600 / 1200) + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_json_files_written(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Episode JSON files and manifest are written to disk.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + ) + exporter.export() + + # Episode files + 
episode_files = list((output_dir / "episodes").glob("*.json")) + assert len(episode_files) == 2 + + # Manifest + manifest_path = output_dir / "manifest.json" + assert manifest_path.exists() + manifest = json.loads(manifest_path.read_text()) + assert manifest["format"] == "lightweight-v1" + assert len(manifest["episodes"]) == 2 + + # JSONL + jsonl_path = output_dir / "training_samples.jsonl" + assert jsonl_path.exists() + lines = jsonl_path.read_text().strip().split("\n") + assert len(lines) == 4 # 2 tasks x 2 steps each + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_screenshot_copy(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Screenshots are copied when copy_screenshots=True.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="passed", + copy_screenshots=True, + ) + exporter.export() + + screenshots_dir = output_dir / "screenshots" / "waa_task_000" + assert screenshots_dir.exists() + assert (screenshots_dir / "step_000.png").exists() + assert (screenshots_dir / "step_001.png").exists() + + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_jsonl_format(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """JSONL lines are valid JSON with expected fields.""" + exporter = LightweightTraceExporter( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + ) + 
exporter.export() + + jsonl_path = output_dir / "training_samples.jsonl" + for line in jsonl_path.read_text().strip().split("\n"): + sample = json.loads(line) + assert "episode_id" in sample + assert "task_id" in sample + assert "instruction" in sample + assert "step_index" in sample + assert "action" in sample + assert "success" in sample + + +class TestConvenienceFunction: + @patch("openadapt_evals.benchmarks.trace_export.load_task_results", side_effect=_mock_load_tasks) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_summary", side_effect=_mock_load_summary) + @patch("openadapt_evals.benchmarks.trace_export.load_benchmark_metadata", side_effect=_mock_load_metadata) + def test_export_traces_lightweight(self, mock_meta, mock_summary, mock_tasks, benchmark_dir, output_dir): + """Convenience function produces same results.""" + results = export_traces_lightweight( + benchmark_dir=benchmark_dir, + output_dir=output_dir, + status_filter="all", + copy_screenshots=False, + ) + assert len(results) == 2 diff --git a/tests/test_rl_env_per_step_eval.py b/tests/test_rl_env_per_step_eval.py new file mode 100644 index 0000000..7ec75f0 --- /dev/null +++ b/tests/test_rl_env_per_step_eval.py @@ -0,0 +1,116 @@ +"""Tests for RLEnvironment per-step evaluation feature.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest + +from openadapt_evals.adapters import ( + BenchmarkAction, + BenchmarkObservation, + WAAMockAdapter, +) +from openadapt_evals.adapters.base import BenchmarkResult +from openadapt_evals.adapters.rl_env import RLEnvironment, RolloutStep + + +@pytest.fixture +def mock_env_with_eval() -> RLEnvironment: + """RLEnvironment with evaluate_every_step=True.""" + adapter = WAAMockAdapter(num_tasks=5, domains=["notepad"]) + task_id = adapter.list_tasks()[0].task_id + return RLEnvironment( + adapter=adapter, + default_task_id=task_id, + evaluate_every_step=True, + ) + + +@pytest.fixture +def mock_env_default() 
@pytest.fixture
def mock_env_default() -> RLEnvironment:
    """Build an RLEnvironment with evaluate_every_step=False (default).

    Backed by a ``WAAMockAdapter``; the first task it lists becomes the
    environment's default task.
    """
    adapter = WAAMockAdapter(num_tasks=5, domains=["notepad"])
    task_id = adapter.list_tasks()[0].task_id
    return RLEnvironment(adapter=adapter, default_task_id=task_id)


class TestPerStepEvalDisabled:
    """Behaviour of ``step()`` when per-step evaluation is left off."""

    def test_no_evaluation_score_by_default(self, mock_env_default: RLEnvironment):
        """With evaluate_every_step=False, info does NOT contain evaluation keys."""
        mock_env_default.reset()
        result = mock_env_default.step(BenchmarkAction(type="click", x=100.0, y=100.0))

        assert "evaluation_score" not in result.info
        assert "evaluation_success" not in result.info
        assert "evaluation_error" not in result.info


class TestPerStepEvalEnabled:
    """Behaviour of ``step()`` when ``evaluate_every_step=True``."""

    def test_evaluation_score_present(self, mock_env_with_eval: RLEnvironment):
        """With evaluate_every_step=True, info contains evaluation_score."""
        mock_env_with_eval.reset()
        result = mock_env_with_eval.step(
            BenchmarkAction(type="click", x=100.0, y=100.0)
        )

        assert "evaluation_score" in result.info
        assert isinstance(result.info["evaluation_score"], float)
        assert 0.0 <= result.info["evaluation_score"] <= 1.0
        assert "evaluation_success" in result.info
        assert isinstance(result.info["evaluation_success"], bool)

    def test_reward_still_zero(self, mock_env_with_eval: RLEnvironment):
        """Per-step evaluation does NOT change the reward signal."""
        mock_env_with_eval.reset()
        result = mock_env_with_eval.step(
            BenchmarkAction(type="click", x=100.0, y=100.0)
        )

        assert result.reward == 0.0

    def test_evaluation_error_handled_gracefully(self):
        """If evaluate() raises, info contains evaluation_error and step succeeds."""
        adapter = WAAMockAdapter(num_tasks=5, domains=["notepad"])
        task_id = adapter.list_tasks()[0].task_id

        # Replace evaluate with a mock that always raises; the adapter is
        # local to this test, so the original method never needs restoring.
        adapter.evaluate = MagicMock(side_effect=RuntimeError("eval server down"))

        env = RLEnvironment(
            adapter=adapter,
            default_task_id=task_id,
            evaluate_every_step=True,
        )
        env.reset()
        result = env.step(BenchmarkAction(type="click", x=100.0, y=100.0))

        # Step should succeed despite eval failure
        assert isinstance(result, RolloutStep)
        assert result.reward == 0.0
        assert "evaluation_error" in result.info
        assert "eval server down" in result.info["evaluation_error"]
        assert "evaluation_score" not in result.info

    def test_evaluation_called_each_step(self, mock_env_with_eval: RLEnvironment):
        """evaluate() is called on every step when enabled."""
        mock_env_with_eval.reset()

        # Spy on evaluate: calls pass through to the real method (so step()
        # still receives a genuine result) while the mock records each call.
        adapter = mock_env_with_eval._adapter
        evaluate_spy = MagicMock(wraps=adapter.evaluate)
        adapter.evaluate = evaluate_spy

        for _ in range(3):
            mock_env_with_eval.step(
                BenchmarkAction(type="click", x=100.0, y=100.0)
            )

        assert evaluate_spy.call_count == 3