diff --git a/examples/terminal_bench/README.md b/examples/terminal_bench/README.md new file mode 100644 index 0000000000..9ff7b7b9b0 --- /dev/null +++ b/examples/terminal_bench/README.md @@ -0,0 +1,318 @@ +# Terminal Agent Training with Terminal Bench 1.0 + +## Overview + +This example demonstrates how to train terminal agents with AReaL's PPO/GRPO-style +training pipeline on Terminal Bench tasks. + +It is an AReaL adaptation of the training workflow originally developed in +[SETA](https://github.com/camel-ai/seta), with the environment management and rollout +loop refactored into an AReaL example. In this example, we focus on an easy subset of +Terminal Bench 1.0 derived from the SETA conversion of Terminal Bench tasks. + +[Terminal Bench](https://github.com/harbor-framework/terminal-bench) is a benchmark for +evaluating AI agents in real terminal environments. It provides a task dataset plus an +execution harness, where each task includes a natural language instruction, a runnable +environment, and outcome-based verification. This example targets the Terminal Bench 1.0 +style workflow used in SETA and trains on the easy subset prepared for that pipeline. + +## Relation to SETA + +This directory is not a copy of SETA. It is a conversion of the Terminal Bench training +path in SETA into AReaL's workflow abstraction and launcher model. + +Compared with SETA: + +- updated to work with the current AReaL stack (`v1.0.2`) +- supports single-controller mode through AReaL's `PPOTrainer` +- rollout logic is implemented as an AReaL `RolloutWorkflow` +- the CAMEL-based terminal agent is packaged as an example-local agent module +- Terminal Bench task environments are still created and verified through + `terminal_bench` + +## Code Architecture + +- `train.py`: Entry point that loads config, builds the dataset, and launches AReaL + training. 
+- `workflow/camel_rlvr_workflow.py`: Rollout workflow that builds task images, runs + trajectories, collects rewards, and exports interactions. +- `workflow/pre_build_tasks_utils.py`: Helper for pre-building Terminal Bench task + images before rollout. +- `agent/camel_terminal_agent.py`: CAMEL-based terminal agent wrapper used for each + trajectory. +- `agent/chat_agent_trace.py`: Traced `ChatAgent` variant used by the agent. +- `agent/prompts.py`: Developer-agent prompt construction. +- `agent_rl_config.py`: Example-specific config extensions on top of AReaL `GRPOConfig`. + +## Included Configurations + +Two example configs are currently included: + +| Config | Backend | Cluster Target | Use Case | +| ------------------------- | ------- | --------------------- | ----------------------------- | +| `config_tb_sglang.yaml` | SGLang | single-node GPU setup | local or small-scale training | +| `config_tb_vllm_npu.yaml` | vLLM | Ascend NPU setup | NPU training | + +## Running the Example + +### Prerequisites + +Please make sure AReaL itself is already installed and working. + +You will need: + +- Python `>=3.10` +- a working AReaL environment +- Docker CLI available inside the AReaL runtime +- Docker Compose and Buildx available as Docker CLI plugins +- the `terminal_bench` Python package + +For NPU usage, you will also need: + +- Ascend drivers and runtime +- access to the required `/dev/davinci*` devices +- `sglang[srt_npu]`, since this workflow currently depends on SGLang tool parsing even + when using the vLLM-based config + +### Recommended Runtime Model + +This example is intended to run inside the AReaL runtime, with host Docker mounted into +that runtime container. + +That structure is important: Terminal Bench task environments are launched via +`docker compose`, and the `docker compose` invocation needs to happen from the same +AReaL runtime that is performing rollout and evaluation. 
+ +The recommended setup is: + +- run AReaL inside a runtime container +- mount the host Docker socket into that container +- mount the Docker CLI and Docker CLI plugins into that container +- run this example from inside that AReaL runtime container + +Minimum mounts: + +```bash +-v /var/run/docker.sock:/var/run/docker.sock +-v /usr/bin/docker:/usr/bin/docker:ro +-v /usr/libexec/docker/cli-plugins:/usr/libexec/docker/cli-plugins:ro +``` + +### Install Example Dependencies + +From the AReaL repo root: + +```bash +cd examples/terminal_bench +pip install -e . +``` + +This installs the example-scoped dependencies declared in +[`pyproject.toml`](./pyproject.toml): + +- `ipython` +- `ruamel.yaml` +- `streamlit` +- `sqlalchemy` +- `docker` +- `camel_ai` +- `terminal_bench` + +If you are using the NPU / vLLM path, also install the optional extra: + +```bash +pip install -e ".[npu]" +``` + +If `terminal_bench` fails to install because of an upstream Python-version constraint +mismatch, which can happen on some NPU runtime images, install it from source and relax +its Python requirement to `>=3.11`: + +```bash +git clone https://github.com/harbor-framework/terminal-bench.git +cd terminal-bench +``` + +Edit `pyproject.toml`: + +```toml +requires-python = ">=3.11" +``` + +Then install it manually: + +```bash +pip install --no-deps -e . +``` + +If you use this fallback path, you can install the rest of the example dependencies +separately: + +```bash +cd ../AReaL/examples/terminal_bench +pip install --no-deps -e . +pip install ipython ruamel.yaml streamlit sqlalchemy docker +``` + +### Manual Dependency Path + +If you already manage some dependencies separately, you can use the same manual setup +pattern used in SETA. + +Install CAMEL and Terminal Bench from a SETA checkout: + +```bash +git clone https://github.com/camel-ai/seta.git +cd seta +git submodule update --init --recursive + +cd external/camel +pip install --no-deps -e . 
+ +cd ../terminal-bench +pip install --no-deps -e . +``` + +Then install the remaining example dependencies: + +```bash +pip install ipython ruamel.yaml streamlit sqlalchemy docker +``` + +### Install SGLang for NPU + +One working installation path from the original setup is: + +```bash +git clone -b v0.5.6.post2 https://github.com/sgl-project/sglang.git +cd sglang +mv python/pyproject_other.toml python/pyproject.toml +pip install -e python[srt_npu] --no-deps +``` + +### Configure `tiktoken` + +This example assumes `o200k_base.tiktoken` is cached locally. + +```bash +export TIKTOKEN_CACHE_DIR=/tmp/tiktoken-cache +mkdir -p "$TIKTOKEN_CACHE_DIR" +curl -k -o "$TIKTOKEN_CACHE_DIR/o200k_base.tiktoken" \ + https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken +``` + +If you need the hashed cache filename used by `tiktoken`, compute it with: + +```bash +python3 - <<'PY' +import hashlib +url = "https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken" +print(hashlib.sha1(url.encode()).hexdigest()) +PY +``` + +### Prepare the Dataset + +This example does not work with the parquet file alone. The parquet rows point to task +assets that must also exist under `AReaL/dataset/`. + +You should prepare the converted Terminal Bench dataset from either of these sources: + +- SETA: https://github.com/camel-ai/seta +- terminal-bench-seta: https://github.com/ActuallyEdward/terminal-bench-seta + +For this example, those two sources should be treated as equivalent dataset sources. + +The configs in this directory expect the easy-subset parquet to be available at: + +```bash +AReaL/dataset/tbench-tasks_convert/tbench-selected-tasks-easy.parquet +``` + +and they also expect the referenced task files and directories from the same converted +dataset to be present under `AReaL/dataset/`. 
+ +One workable setup is: + +```bash +cd AReaL/dataset +git clone https://github.com/ActuallyEdward/terminal-bench-seta.git +``` + +The `train_filtered_easy.parquet` file is also provided in +[`terminal-bench-seta`](https://github.com/ActuallyEdward/terminal-bench-seta). + +Then place or link the easy-subset parquet from that checkout at the path expected by +the configs: + +```bash +mkdir -p AReaL/dataset/tbench-tasks_convert +cp AReaL/dataset/terminal-bench-seta/train_filtered_easy.parquet \ + AReaL/dataset/tbench-tasks_convert/tbench-selected-tasks-easy.parquet +``` + +If you source the data from SETA instead, use the same converted dataset layout and +place the parquet and referenced task assets under `AReaL/dataset/` in the same way. + +### Docker Compose / Buildx + +Docker Compose and Buildx should be available inside the AReaL runtime at: + +```bash +/usr/libexec/docker/cli-plugins/ +``` + +If needed: + +```bash +chmod +x /usr/libexec/docker/cli-plugins/docker-compose +chmod +x /usr/libexec/docker/cli-plugins/docker-buildx +``` + +### Training Commands + +The following commands are intended to be executed from the AReaL repo root. + +#### SGLang + +```bash +python3 examples/terminal_bench/train.py \ + --config examples/terminal_bench/config_tb_sglang.yaml +``` + +#### vLLM on NPU + +```bash +python3 examples/terminal_bench/train.py \ + --config examples/terminal_bench/config_tb_vllm_npu.yaml +``` + +## Results + +The following figure shows a representative training reward curve on the easy subset +derived from SETA: + +

+ +

+ +On this setup, we observe reward-curve behavior qualitatively similar to the GRPO +training trends reported in +[terminal-bench-rl](https://github.com/Danau5tin/terminal-bench-rl). This is a +directional comparison of training dynamics rather than a claim of identical setup, +identical scale, or identical leaderboard numbers. + +## Notes + +1. This example currently targets the easy subset used in the SETA conversion, not the + full Terminal Bench task distribution. +1. `pyproject.toml` in this directory is intentionally example-scoped. It does not + replace installing AReaL itself. +1. Docker, proxy, model-mount, and NPU device details are environment-specific and + should be adapted locally. + +## References + +- SETA: https://github.com/camel-ai/seta +- Terminal Bench: https://github.com/harbor-framework/terminal-bench +- Terminal-Bench-RL: https://github.com/Danau5tin/terminal-bench-rl diff --git a/examples/terminal_bench/__init__.py b/examples/terminal_bench/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/terminal_bench/agent/__init__.py b/examples/terminal_bench/agent/__init__.py new file mode 100644 index 0000000000..fe6332c6d4 --- /dev/null +++ b/examples/terminal_bench/agent/__init__.py @@ -0,0 +1,9 @@ +from .camel_terminal_agent import CamelTerminalAgent +from .chat_agent_trace import ChatAgentTrace +from .prompts import get_developer_agent_prompt + +__all__ = [ + "CamelTerminalAgent", + "ChatAgentTrace", + "get_developer_agent_prompt", +] diff --git a/examples/terminal_bench/agent/camel_terminal_agent.py b/examples/terminal_bench/agent/camel_terminal_agent.py new file mode 100644 index 0000000000..5547aba768 --- /dev/null +++ b/examples/terminal_bench/agent/camel_terminal_agent.py @@ -0,0 +1,349 @@ +from __future__ import annotations + +import asyncio +import datetime +import json +import os +from concurrent.futures import ThreadPoolExecutor +from functools import partial +from pathlib import Path + +from 
agent_rl_config import TaskTimeouts +from camel.messages import BaseMessage +from camel.toolkits import FunctionTool, TerminalToolkit +from transformers import PreTrainedTokenizerFast + +from terminal_bench.handlers.trial_handler import TrialHandler +from terminal_bench.parsers.base_parser import UnitTestStatus +from terminal_bench.parsers.parser_factory import ParserFactory +from terminal_bench.terminal.docker_compose_manager import DockerComposeManager +from terminal_bench.terminal.terminal import Terminal + +from areal.experimental.camel.openai_model import AReaLOpenAICompatibleModel +from areal.utils.perf_tracer import ( + Category, + atrace_scope, + atrace_session_phase, + session_context, + trace_perf, + trace_scope, +) + +from .chat_agent_trace import ChatAgentTrace +from .prompts import get_developer_agent_prompt + +DATASET_ROOT = Path(__file__).resolve().parents[3] / "dataset" + + +class CamelTerminalAgent: + def __init__( + self, + tokenizer: PreTrainedTokenizerFast | None = None, + max_tokens_per_turn: int = 1024, + max_total_tokens: int = 40000, + output_path: str = "CamelTerminalAgent_Output", + max_iteration: int = 50, + executor: ThreadPoolExecutor | None = None, + task_timeouts: TaskTimeouts | None = None, + non_think_mode: bool = True, + encourage_completion_reward: bool = False, + ): + self.tokenizer = tokenizer + self.max_tokens_per_turn = max_tokens_per_turn + self.max_total_tokens = max_total_tokens + self.output_path = output_path + self.max_iteration = max_iteration + self.task_timeouts = task_timeouts or TaskTimeouts() + self.executor = executor + self.non_think_mode = non_think_mode + self.encourage_completion_reward = encourage_completion_reward + assert self.executor is not None, ( + "Executor must be provided to CamelTerminalAgent" + ) + + @session_context() + @trace_perf("CamelTerminalAgent.run_agent", category=Category.COMPUTE) + async def run_agent( + self, + data, + client, + uid: str | None = None, + traj_i: int = 0, + ) -> float | 
None: + """Execute a complete agent workflow: setup environment, run agent, cleanup.""" + task_name = data.get("task_name") + self.task_name = task_name + self.uid = uid + self.traj_i = traj_i + self.meta_info = {} + reward = None + + print(f"Running task {task_name}") + + try: + async with atrace_scope( + f"reset_env:{task_name}, traj:{traj_i}", + args={"uid": uid, "timeout": self.task_timeouts._reset_env}, + ): + prompt = await self.run_in_executor( + self._reset_env, + data, + uid, + timeout=self.task_timeouts._reset_env, + ) + print(f"env started: {task_name}") + + async with atrace_scope( + f"reset_agent:{task_name}, traj:{traj_i}", + args={"uid": uid, "timeout": self.task_timeouts._reset_agent}, + ): + await self.run_in_executor( + self._reset_agent, + client, + timeout=self.task_timeouts._reset_agent, + ) + + try: + async with atrace_scope( + f"agent_astep:{task_name}, traj:{traj_i}", + args={"uid": uid, "timeout": self.task_timeouts.agent_astep}, + ): + self.response = await self.agent.astep(prompt) + except TimeoutError as exc: + print(f"Agent step timeout for task {task_name}: {exc}") + print(f"Task {task_name}: agent responded") + + async with atrace_session_phase( + "reward", + start_payload={ + "task_name": task_name, + "traj_i": traj_i, + "uid": uid, + "timeout": self.task_timeouts._evaluate_completion_sync, + }, + ): + async with atrace_scope( + f"evaluate_completion_sync:{task_name}, traj:{traj_i}", + args={ + "uid": uid, + "timeout": self.task_timeouts._evaluate_completion_sync, + }, + ): + print("try to set rewards") + reward = await self.run_in_executor( + self._evaluate_completion_sync, + timeout=self.task_timeouts._evaluate_completion_sync, + ) + print(f"reward from run in executor is set as {reward}") + client.set_last_reward(reward) + + except TimeoutError as exc: + print(f"Timeout for task {task_name}: {exc}") + except Exception as exc: + print(f"Error in task {task_name}: {exc}") + import traceback + + traceback.print_exc() + finally: + 
try: + if hasattr(self, "terminal") and self.terminal is not None: + async with atrace_scope( + f"cleanup_env:{task_name}, traj:{traj_i}", + args={"uid": uid, "timeout": self.task_timeouts._cleanup}, + ): + await self.run_in_executor( + self._close_env, + timeout=self.task_timeouts._cleanup, + ) + print(f"Task {task_name}: cleaned up") + except Exception as exc: + print(f"Cleanup error for task {task_name}: {exc}") + finally: + return reward + + def _close_env(self): + if self.terminal: + self.terminal.stop(timeout=self.task_timeouts._cleanup) + + async def run_in_executor(self, fn, *args, timeout: float | None = None, **kwargs): + loop = asyncio.get_running_loop() + executor_task = loop.run_in_executor( + self.executor, + partial(fn, *args, **kwargs), + ) + if timeout is not None: + return await asyncio.wait_for(executor_task, timeout=timeout) + return await executor_task + + def _reset_env(self, task: dict, uid: str | None): + output_path = Path(self.output_path).resolve() + output_path.mkdir(parents=True, exist_ok=True) + + task_path = DATASET_ROOT / task.get("task_path") + print(f"Task path: {task_path}") + instruction = task.get("instruction") + task_id = task.get("task_name") + + self.trial_handler = TrialHandler( + trial_name=f"{task_id}.{uid}.areal-run", + input_path=task_path, + output_path=output_path, + ) + + task_config = self.trial_handler.task + self.parser = ParserFactory.get_parser(task_config.parser_name) + + self.client_container_name = f"{self.trial_handler.client_container_name}" + self.terminal = Terminal( + client_container_name=self.trial_handler.client_container_name, + client_image_name=self.trial_handler.client_image_name, + docker_compose_path=self.trial_handler.task_paths.docker_compose_path, + docker_image_name_prefix=self.trial_handler.docker_image_name_prefix, + sessions_logs_path=self.trial_handler.trial_paths.sessions_path, + agent_logs_path=self.trial_handler.trial_paths.agent_logging_dir, + no_rebuild=True, + cleanup=False, + ) + 
with trace_scope( + f"reset_env.start_terminal:{task_id}, traj:{self.traj_i}", + args={"uid": uid}, + ): + self.terminal.start(timeout=self.task_timeouts._reset_env) + + return f"Task name:{self.task_name}\nTask instruction: {instruction}" + + def _reset_agent(self, client): + session_logs_dir = ( + self.trial_handler.trial_paths.sessions_path + / "terminal_toolkit_session_logs" + ) + terminal_toolkit = TerminalToolkit( + timeout=20.0, + working_directory=None, + use_docker_backend=True, + docker_container_name=self.trial_handler.client_container_name, + session_logs_dir=session_logs_dir, + safe_mode=False, + ) + tools = [ + FunctionTool(terminal_toolkit.shell_exec), + FunctionTool(terminal_toolkit.shell_view), + FunctionTool(terminal_toolkit.shell_write_to_process), + FunctionTool(terminal_toolkit.shell_write_content_to_file), + ] + + system_message = get_developer_agent_prompt( + current_date=str(datetime.date.today()), + system="Linux (in Docker)", + machine="aarch64", + is_workforce=False, + non_think_mode=self.non_think_mode, + ) + print("starting chat agent") + os.environ["CAMEL_MODEL_LOG_ENABLED"] = "True" + os.environ["CAMEL_LOG_DIR"] = str( + self.trial_handler.trial_paths.sessions_path.parent / "CAMEL_LOG_DIR" + ) + model = AReaLOpenAICompatibleModel( + openai_client=client, + tokenizer=self.tokenizer, + model_type="areal", + model_config_dict={ + "max_completion_tokens": self.max_tokens_per_turn, + }, + ) + self.agent = ChatAgentTrace( + system_message=BaseMessage.make_assistant_message( + role_name="Developer Agent", + content=system_message, + ), + model=model, + tools=tools, + token_limit=self.max_total_tokens, + step_timeout=self.task_timeouts.agent_astep, + ) + self.agent.reset() + self.agent.max_iteration = self.max_iteration + print(f"{self.task_name}: agent started") + + def _evaluate_completion_sync(self) -> float: + assert self.trial_handler is not None and self.terminal is not None + + paths = [self.trial_handler.task_paths.run_tests_path] + 
if self.trial_handler.task_paths.test_dir.exists(): + paths.append(self.trial_handler.task_paths.test_dir) + with trace_scope( + f"evaluate_completion_sync.copy_tests:{self.task_name}, traj:{self.traj_i}" + ): + self.terminal.copy_to_container( + paths=paths, + container_dir=str(DockerComposeManager.CONTAINER_TEST_DIR), + ) + + print("running tests in a new shell") + with trace_scope( + f"evaluate_completion_sync.create_test_session:{self.task_name}, traj:{self.traj_i}" + ): + test_session = self.terminal.create_session( + "tests", + is_active_stream=False, + as_configured_user=False, + ) + + test_script_path = str(DockerComposeManager.CONTAINER_TEST_DIR / "run-tests.sh") + try: + with trace_scope( + f"evaluate_completion_sync.run_tests:{self.task_name}, traj:{self.traj_i}" + ): + test_session.send_keys( + [f"bash {test_script_path}", "Enter"], + block=True, + max_timeout_sec=min( + self.task_timeouts._evaluate_completion_sync, + 4 * self.trial_handler.task.max_test_timeout_sec, + ), + ) + test_output = test_session.capture_pane(capture_entire=True) + parser_results = self.parser.parse(test_output) + + all_passed = parser_results and all( + status == UnitTestStatus.PASSED for status in parser_results.values() + ) + pass_ratio = ( + sum( + 1 + for status in parser_results.values() + if status == UnitTestStatus.PASSED + ) + / len(parser_results) + if parser_results + else 0.0 + ) + results_path = str( + self.trial_handler.trial_paths.sessions_path.parent + / "test_results.json" + ) + result_dict = { + "test_results": { + k: (v == UnitTestStatus.PASSED) for k, v in parser_results.items() + }, + "all_passed": all_passed, + "pass_ratio": pass_ratio, + } + try: + result_dict["iteration"] = len(self.response.info["tool_calls"]) + result_dict.update(self.response.info["usage"]) + except Exception: + pass + with open(results_path, "w") as f: + json.dump(result_dict, f, indent=4) + + except Exception as exc: + print(exc) + all_passed = False + pass_ratio = 0.0 + + if 
self.encourage_completion_reward and pass_ratio == 1.0: + pass_ratio += 1.0 + + return pass_ratio diff --git a/examples/terminal_bench/agent/chat_agent_trace.py b/examples/terminal_bench/agent/chat_agent_trace.py new file mode 100644 index 0000000000..812ac3a377 --- /dev/null +++ b/examples/terminal_bench/agent/chat_agent_trace.py @@ -0,0 +1,381 @@ +import asyncio +import atexit +import json +import os +import re +import textwrap +import threading +import time +import uuid +from typing import ( + TYPE_CHECKING, +) + +from camel.agents import ChatAgent +from camel.agents._types import ToolCallRequest +from camel.logger import get_logger +from camel.messages import ( + BaseMessage, + FunctionCallingMessage, +) +from camel.prompts import TextPrompt +from camel.responses import ChatAgentResponse +from camel.types import ( + OpenAIBackendRole, +) +from camel.types.agents import ToolCallingRecord +from pydantic import BaseModel + +from areal.utils.perf_tracer import ( + Category, + atrace_scope, + atrace_session_phase, +) + +if TYPE_CHECKING: + pass + +logger = get_logger(__name__) + +# Cleanup temp files on exit +_temp_files: set[str] = set() +_temp_files_lock = threading.Lock() + + +def _cleanup_temp_files(): + with _temp_files_lock: + for path in _temp_files: + try: + os.unlink(path) + except Exception: + pass + + +atexit.register(_cleanup_temp_files) + +SIMPLE_FORMAT_PROMPT = TextPrompt( + textwrap.dedent( + """\ + Please format the following content: + + {content} + """ + ) +) + + +class ChatAgentTrace(ChatAgent): + """A ChatAgent with performance tracing capabilities.""" + + def __init__(self, *args, **kwargs): + """Initialize ChatAgentTrace with parse error tracking.""" + super().__init__(*args, **kwargs) + self.max_parse_errors = kwargs.get("max_parse_errors", 3) + self.parse_error_count = 0 + + async def adetect_tool_calls_parse_error(self, response): + r""" + Asynchronously detect tool calls in the response content using Qwen25Detector. 
+ if the model is Qwen 2.5 or Qwen 3. + if there's tool call tokens detected, but got json parse failure, format the information into a tool call record, + so that the agent can handle the error next step. + add a self.count_parse_error, so that we can limit the number of parse errors we handle in one step. if max reached, just + break the loop. + + Args: + response: The model response to check for parse errors + + Returns: + Optional[ToolCallingRecord]: A tool calling record with error information if parse error detected, None otherwise + """ + bot_token = "\n" + eot_token = "\n" + + # Check if we've reached max parse errors + if self.parse_error_count >= self.max_parse_errors: + logger.warning( + f"Max parse errors ({self.max_parse_errors}) reached, stopping error handling" + ) + return None + + # Extract content from response + if not response.output_messages: + return None + + content = response.output_messages[0].content + if not content or bot_token not in content: + return None + + # Find all potential tool call blocks + pattern = rf"{re.escape(bot_token)}(.*?){re.escape(eot_token)}" + matches = re.findall(pattern, content, re.DOTALL) + + if not matches: + return None + + # Check each match for JSON parse errors + for match_text in matches: + try: + # Try to parse the JSON + json.loads(match_text.strip()) + # If successful, no error for this match + continue + except json.JSONDecodeError as e: + # Found a parse error + self.parse_error_count += 1 + logger.warning( + f"Detected JSON parse error (count: {self.parse_error_count}/{self.max_parse_errors}): {str(e)}" + ) + logger.warning(f"Problematic content: {match_text[:200]}...") + + # Create an error tool calling record + error_message = ( + f"JSON Parse Error: {str(e)}\n" + f"The tool call format is incorrect. Please ensure:\n" + f"1. The JSON is valid and properly formatted\n" + f"2. All quotes are properly escaped\n" + f"3. 
The structure matches: {{'name': 'function_name', 'arguments': {{}}}}\n" + f"Problematic content (first 200 chars): {match_text[:200]}..." + ) + + # Generate a unique error tool call ID + error_tool_call_id = f"error_{uuid.uuid4().hex[:8]}" + + # Create the error record + error_record = ToolCallingRecord( + tool_name="json_parse_error", + args={"raw_content": match_text, "error": str(e)}, + result=error_message, + tool_call_id=error_tool_call_id, + ) + + # Record this in memory so the model can see the error + assist_msg = FunctionCallingMessage( + role_name=self.role_name, + role_type=self.role_type, + meta_dict=None, + content="", + func_name="json_parse_error", + args={"raw_content": match_text[:200], "error": str(e)}, + tool_call_id=error_tool_call_id, + ) + + func_msg = FunctionCallingMessage( + role_name=self.role_name, + role_type=self.role_type, + meta_dict=None, + content="", + func_name="json_parse_error", + result=error_message, + tool_call_id=error_tool_call_id, + ) + + # Use precise timestamps + current_time_ns = time.time_ns() + base_timestamp = current_time_ns / 1_000_000_000 + + self.update_memory( + assist_msg, OpenAIBackendRole.ASSISTANT, timestamp=base_timestamp + ) + self.update_memory( + func_msg, + OpenAIBackendRole.FUNCTION, + timestamp=base_timestamp + 1e-6, + ) + + return error_record + + return None + + async def _astep_non_streaming_task( + self, + input_message: BaseMessage | str, + response_format: type[BaseModel] | None = None, + ) -> ChatAgentResponse: + r"""Internal async method for non-streaming astep logic.""" + + # try to extract task name if exists in input_message + if isinstance(input_message, str): + task_name_match = re.search(r"Task name:(.*)\n", input_message) + if task_name_match: + task_name = task_name_match.group(1).strip() + else: + task_name = "default" + else: + task_name = "default" + + # Reset parse error counter at the start of each step + self.parse_error_count = 0 + + try: + from camel.utils.langfuse import 
set_current_agent_session_id + + set_current_agent_session_id(self.agent_id) + except ImportError: + pass # Langfuse not available + + # Check if this call is from a RegisteredAgentToolkit to prevent tool + # use + disable_tools = self._is_called_from_registered_toolkit() + + # Handle response format compatibility with non-strict tools + original_response_format = response_format + input_message, response_format, used_prompt_formatting = ( + self._handle_response_format_with_non_strict_tools( + input_message, response_format + ) + ) + + if isinstance(input_message, str): + input_message = BaseMessage.make_user_message( + role_name="User", content=input_message + ) + + self.update_memory(input_message, OpenAIBackendRole.USER) + + tool_call_records: list[ToolCallingRecord] = [] + external_tool_call_requests: list[ToolCallRequest] | None = None + accumulated_context_tokens = ( + 0 # This tracks cumulative context tokens, not API usage tokens + ) + + # Initialize token usage tracker + step_token_usage = self._create_token_usage_tracker() + iteration_count: int = 0 + prev_num_openai_messages: int = 0 + while True: + if self.pause_event is not None and not self.pause_event.is_set(): + if isinstance(self.pause_event, asyncio.Event): + await self.pause_event.wait() + elif isinstance(self.pause_event, threading.Event): + # For threading.Event in async context, run in executor + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.pause_event.wait) + try: + openai_messages, num_tokens = self.memory.get_context() + accumulated_context_tokens += num_tokens + except RuntimeError as e: + return self._step_terminate( + e.args[1], tool_call_records, "max_tokens_exceeded" + ) + + async with atrace_scope( + f"agent_astep._aget_model_response:{task_name}", + category=Category.COMM, + args={"agent_id": self.agent_id, "iteration": iteration_count}, + ): + async with atrace_session_phase("generate"): + response = await self._aget_model_response( + openai_messages, + # 
num_tokens=num_tokens, + current_iteration=iteration_count, + response_format=response_format, + tool_schemas=[] + if disable_tools + else self._get_full_tool_schemas(), + prev_num_openai_messages=prev_num_openai_messages, + ) + + prev_num_openai_messages = len(openai_messages) + iteration_count += 1 + + # Accumulate API token usage + self._update_token_usage_tracker(step_token_usage, response.usage_dict) + + # Terminate Agent if stop_event is set + if self.stop_event and self.stop_event.is_set(): + # Use the _step_terminate to terminate the agent with reason + logger.info(f"Termination triggered at iteration {iteration_count}") + return self._step_terminate( + accumulated_context_tokens, + tool_call_records, + "termination_triggered", + ) + + if tool_call_requests := response.tool_call_requests: + # Process all tool calls + for tool_call_request in tool_call_requests: + if tool_call_request.tool_name in self._external_tool_schemas: + if external_tool_call_requests is None: + external_tool_call_requests = [] + external_tool_call_requests.append(tool_call_request) + else: + if ( + self.pause_event is not None + and not self.pause_event.is_set() + ): + if isinstance(self.pause_event, asyncio.Event): + await self.pause_event.wait() + elif isinstance(self.pause_event, threading.Event): + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.pause_event.wait) + async with atrace_scope( + f"agent_astep._aexecute_tool:{task_name}", + category=Category.IO, + args={ + "agent_id": self.agent_id, + "iteration": iteration_count, + "tool_name": tool_call_request.tool_name, + }, + ): + async with atrace_session_phase("toolcall"): + tool_call_record = await self._aexecute_tool( + tool_call_request + ) + tool_call_records.append(tool_call_record) + + # If we found an external tool call, break the loop + if external_tool_call_requests: + break + + if ( + self.max_iteration is not None + and iteration_count >= self.max_iteration + ): + break + + # If we're still 
here, continue the loop + continue + + # Check for JSON parse errors in tool calls (Qwen 2.5/3 specific) + parse_error_record = await self.adetect_tool_calls_parse_error(response) + if parse_error_record: + print( + f"Task {task_name}: Detected tool call parse error, prompting model to correct." + ) + tool_call_records.append(parse_error_record) + + # Check if we've reached max parse errors + if self.parse_error_count >= self.max_parse_errors: + logger.error( + f"Max parse errors reached ({self.max_parse_errors}), " + "terminating step to prevent infinite loop" + ) + break + + # Continue to let the model try again with the error feedback + continue + + break + + await self._aformat_response_if_needed(response, response_format) + + # Apply manual parsing if we used prompt-based formatting + if used_prompt_formatting and original_response_format: + self._apply_prompt_based_parsing(response, original_response_format) + + self._record_final_output(response.output_messages) + + # Clean tool call messages from memory after response generation + if self.prune_tool_calls_from_memory and tool_call_records: + self.memory.clean_tool_calls() + + return self._convert_to_chatagent_response( + response, + tool_call_records, + accumulated_context_tokens, + external_tool_call_requests, + step_token_usage["prompt_tokens"], + step_token_usage["completion_tokens"], + step_token_usage["total_tokens"], + ) diff --git a/examples/terminal_bench/agent/prompts.py b/examples/terminal_bench/agent/prompts.py new file mode 100644 index 0000000000..59579a362f --- /dev/null +++ b/examples/terminal_bench/agent/prompts.py @@ -0,0 +1,161 @@ +def get_developer_agent_prompt( + current_date: str, + system: str, + machine: str, + is_workforce: bool, + non_think_mode: bool = True, +): + """ + Generate the prompt for the Lead Software Engineer agent. + Args: + current_date (str): The current date. + system (str): The operating system. (e.g., "Linux", "Darwin", "Windows", "Linux (in Docker)"...) 
+ machine (str): The machine type. (e.g., "x86_64", "arm64") + is_workforce (bool): Whether the agent is part of a workforce with other agents or standalone. + Returns: + str: The prompt for the Lead Software Engineer agent. + """ + LEAD_SDE_ROLE_PROMPT = """ + +You are a Lead Software Engineer, a master-level coding assistant with a +powerful and unrestricted terminal. Your primary role is to solve any +technical task by analyzing the problem, making plans, +writing and executing code, installing necessary libraries, +interacting with the operating system, and deploying applications. You are the +team's go-to expert for all technical implementation. + +""" + TEAM_STRUCTURE_PROMPT = "" + + OPERATING_ENVIRONMENT_PROMPT = ( + f""" + +- **System**: {system} ({machine}). +""" + + ( + """ +Note that the terminal commands and file system operations you perform will be +executed inside a Docker container. But note taking tools will operate on the host system. +""" + ) + if "Docker" in system + else "" + + f""" +- **Current Date**: {current_date}. + +""" + ) + + MANDATORY_INSTRUCTIONS_PROMPT = """ + +- You MUST use analyze, plan and review requirements and your work. +- When you complete your task, your final response must be a comprehensive +summary of your work and the outcome, presented in a clear, detailed, and +easy-to-read format. Avoid using markdown tables for presenting data; use +plain text formatting instead. +- You MUST use tools and follow tool schemas precisely for every response, +- You MUST be concise about your reasoning and planning, and limit within 600 tokens. +- You MUST try diverse tools available in toolkits. + +""" + CAPABILITIES_PROMPT = ( + """ + +Your capabilities are extensive and powerful: +- **Unrestricted Code Execution**: You can write and execute code in any +language to solve a task. 
+- For multi-line code, You MUST use tool (shell_write_content_to_file) to first save your code +to somewhere on the system (e.g.,`script.py`) and then run it from the terminal (e.g., +`python script.py`). Beware of the code that includes quotes\"\'; ensure proper +escaping when writing arguments for toolkit. Make sure it can be parsed by JSON. +- **Full Terminal Control**: You have root-level access to the terminal. You +can run any command-line tool, manage files, and interact with the OS. If +a tool is missing, you MUST install it with the appropriate package manager +(e.g., `pip3`, `uv`, or `apt-get`). Your capabilities include: + - **Text & Data Processing**: `awk`, `sed`, `grep`, `jq`. + - **File System & Execution**: `find`, `xargs`, `tar`, `zip`, `unzip`, + `chmod`. + - **Networking & Web**: `curl`, `wget` for web requests; `ssh` for + remote access. +- **IMPORTANT**: Always complete the full automation workflow—do not just +prepare or suggest actions. Execute them to completion. +- **Solution Verification**: You can immediately test and verify your +solutions by executing them in the terminal. +""" + + """ + +""" + ) + + PHILOSOPHY_PROMPT = """ + +- **Bias for Action**: Your purpose is to take action. Don't just suggest +solutions—implement them. Write code, run commands, and build things. +- **Complete the Full Task**: When automating GUI applications, always finish +what you start. If the task involves sending something, send it. If it +involves submitting data, submit it. Never stop at just preparing or +drafting—execute the complete workflow to achieve the desired outcome. +- **Embrace Challenges**: Never say "I can't." If you +encounter a limitation, find a way to overcome it. +- **Resourcefulness**: If a tool is missing, install it. If information is +lacking, find it. You have the full power of a terminal to acquire any +resource you need. +- **Think Like an Engineer**: Approach problems methodically. 
Analyze +requirements, execute it, and verify the results. Your +strength lies in your ability to engineer solutions. +- ** Use Absolute Paths**: You can access files from any place in the file +system. For all file system operations, you MUST use absolute paths to ensure +precision and avoid ambiguity. +- ** Check current directory**: Always check your current directory with `pwd` and list +files with `ls -la` before performing file operations. This helps you +understand your context and avoid mistakes. +- ** Search for Files**: If you need a file but cannot find it in the current directory, +use commands like `find / -name "filename"` or search in directories common for the System +to locate it anywhere in the file system. This ensures you can always access the resources you need. +- ** Adhere to the initial task instruction**: Always keep the original task instruction in mind, make sure to understand +all requirements and useful information. Make sure finish every subtask mentioned in the instruction. + +""" + + TERMINAL_TIPS_PROMPT = """ + +The terminal tools are session-based, identified by a unique `id`. Master +these tips to maximize your effectiveness: + +- **Command-Line Best Practices**: +- **Be Creative**: The terminal is your most powerful tool. Use it boldly. +- **Automate Confirmation**: Use `-y` or `-f` flags to avoid interactive +prompts. +- **Manage Output**: Redirect long outputs to a file (e.g., `> output.txt`). +- **Chain Commands**: Use `&&` to link several commands for sequential execution. +But also avoid chaining too many commands in one line +to avoid json parse errors due to complex escaping issues. +- **Piping**: Use `|` to pass output from one command to another. +- **Permissions**: Use `ls -F` to check file permissions. +- **Installation**: Use `pip3 install` or `apt-get install` for new +packages. +- **Time Management**: `shell_exec` commands come with block or non-block mode. 
The block mode + has a time limit, and only suitable for very quick commands. If you expect a command to take a long time, or + you have experienced a timeout for a command, you MUST use non-block mode by setting `block=False`. + The non-block mode allows commands to run in the background. You can check the status using `shell_view`, + send in further input using `shell_write_to_process`, and kill it using `shell_kill_process` if needed. + + +""" + COLLABORATION_AND_ASSISTANCE_PROMPT = """ + """ + + FINAL_INSTRUCTIONS_PROMPT = f""" + {LEAD_SDE_ROLE_PROMPT} + {TEAM_STRUCTURE_PROMPT} + {OPERATING_ENVIRONMENT_PROMPT} + {MANDATORY_INSTRUCTIONS_PROMPT} + {CAPABILITIES_PROMPT} + {PHILOSOPHY_PROMPT} + {TERMINAL_TIPS_PROMPT} + {COLLABORATION_AND_ASSISTANCE_PROMPT} + """ + if non_think_mode: + FINAL_INSTRUCTIONS_PROMPT = rf"{FINAL_INSTRUCTIONS_PROMPT} /no_think" + + return FINAL_INSTRUCTIONS_PROMPT diff --git a/examples/terminal_bench/agent_rl_config.py b/examples/terminal_bench/agent_rl_config.py new file mode 100644 index 0000000000..3fa2d44ed7 --- /dev/null +++ b/examples/terminal_bench/agent_rl_config.py @@ -0,0 +1,25 @@ +from dataclasses import dataclass, field + +from areal.api.cli_args import GRPOConfig + + +@dataclass +class TaskTimeouts: + _reset_env: float = 1800.0 + _reset_agent: float = 120.0 + agent_astep: float = 300.0 + _evaluate_completion_sync: float = 1200.0 + _cleanup: float | None = None + + +@dataclass +class AgentRLConfig(GRPOConfig): + n_trajs: int = field(default=1) + max_tokens_per_trajectory: int = field(default=32768) + max_iteration: int = field(default=3) + max_workers: int = field(default=25) + non_think_mode: bool = field(default=True) + async_training: bool = field(default=False) + task_timeouts: TaskTimeouts = field(default_factory=TaskTimeouts) + filter_uniform_reward: bool = field(default=False) + encourage_completion_reward: bool = field(default=False) diff --git a/examples/terminal_bench/config_tb_sglang.yaml 
b/examples/terminal_bench/config_tb_sglang.yaml new file mode 100644 index 0000000000..746ccb8c9b --- /dev/null +++ b/examples/terminal_bench/config_tb_sglang.yaml @@ -0,0 +1,187 @@ +experiment_name: terminal_bench_rl +trial_name: trial0 + +seed: 1 +total_train_epochs: 200 +tokenizer_path: ${actor.path} +async_training: true + +n_trajs: 4 +max_tokens_per_trajectory: 40000 + +dynamic_bs: false + +cluster: + n_nodes: 1 + n_gpus_per_node: 8 + fileroot: /tmp/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /tmp/areal/name_resolve + +allocation_mode: sglang:d2p1t2+d2p1t1c2 + +scheduler: + type: local + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 64 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 2 + enable_rollout_tracing: false + scheduling_spec: ${actor.scheduling_spec} + fileroot: ${cluster.fileroot} + tokenizer_path: ${tokenizer_path} + dump_to_file: true + check_trajectory_format: true + +gconfig: + n_samples: 2 + min_new_tokens: 0 + max_new_tokens: 1024 + greedy: false + temperature: 1.0 + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: Qwen/Qwen3-8B + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 30000 + optimizer: + type: adam + lr: 1.70e-5 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + eps_clip: 0.4 + temperature: ${gconfig.temperature} + reward_scaling: 10.0 + reward_bias: -0.5 + kl_ctl: 0.0 + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: true + behave_imp_weight_cap: 5.0 + adv_norm: + mean_level: batch + std_level: batch + weight_update_mode: xccl + max_new_tokens: ${gconfig.max_new_tokens} + scheduling_spec: + - task_type: worker + port_count: 2 + gpu: 1 + cpu: 4 + mem: 16 + cmd: python3 -m 
areal.infra.rpc.rpc_server + env_vars: {} + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 10240 + optimizer: + type: adam + lr: 1.70e-5 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + scheduling_strategy: + type: colocation + target: actor + scheduling_spec: ${actor.scheduling_spec} + +# SGLang +sglang: + model_path: ${actor.path} + random_seed: ${seed} + skip_tokenizer_init: true + dtype: ${actor.dtype} + max_running_requests: null + context_length: 40000 + mem_fraction_static: 0.8 + disable_radix_cache: true + + +# datasets +train_dataset: + batch_size: 16 + shuffle: true + pin_memory: true + num_workers: 4 + path: path/to/tbench-selected-tasks-easy.parquet + type: rl + max_length: 1024 + +valid_dataset: + batch_size: 4 + shuffle: true + pin_memory: true + num_workers: 4 + path: path/to/val.parquet + type: rl + +# Utilities +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: null + +recover: + mode: disabled + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: 3600 + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: null + freq_secs: null + +stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: disabled + + +perf_tracer: + enabled: true + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + save_interval: 1 + session_tracer: + enabled: true + flush_threshold: 100 diff --git 
a/examples/terminal_bench/config_tb_vllm_npu.yaml b/examples/terminal_bench/config_tb_vllm_npu.yaml new file mode 100644 index 0000000000..df6432e370 --- /dev/null +++ b/examples/terminal_bench/config_tb_vllm_npu.yaml @@ -0,0 +1,196 @@ +experiment_name: terminal_bench_npu_rl +trial_name: trial0 + +seed: 1 +total_train_epochs: 200 +tokenizer_path: ${actor.path} +async_training: true + +n_trajs: 4 +max_tokens_per_trajectory: 40000 + +dynamic_bs: false + +cluster: + n_nodes: 1 + n_gpus_per_node: 16 + fileroot: /tmp/areal/experiments + name_resolve: + type: nfs + nfs_record_root: /tmp/areal/name_resolve + +allocation_mode: vllm:d4p1t2+d4p1t1c2 + +scheduler: + type: local + +rollout: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + max_concurrent_rollouts: 64 + queue_size: null + consumer_batch_size: ${train_dataset.batch_size} + max_head_offpolicyness: 2 + enable_rollout_tracing: false + scheduling_spec: ${actor.scheduling_spec} + fileroot: ${cluster.fileroot} + tokenizer_path: ${tokenizer_path} + dump_to_file: true + check_trajectory_format: true + +gconfig: + n_samples: 2 + min_new_tokens: 0 + max_new_tokens: 1024 + greedy: false + temperature: 1.0 + +actor: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: Qwen/Qwen3-8B + init_from_scratch: false + disable_dropout: true + gradient_checkpointing: true + dtype: bfloat16 + mb_spec: + max_tokens_per_mb: 30000 + optimizer: + type: adam + lr: 1.70e-5 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + eps_clip: 0.4 + temperature: ${gconfig.temperature} + reward_scaling: 10.0 + reward_bias: -0.5 + kl_ctl: 0.0 + ppo_n_minibatches: 1 + recompute_logprob: true + use_decoupled_loss: true + behave_imp_weight_cap: 5.0 + adv_norm: + mean_level: batch + std_level: batch + weight_update_mode: xccl + max_new_tokens: ${gconfig.max_new_tokens} + scheduling_spec: + - task_type: worker + 
port_count: 2 + gpu: 1 + cpu: 4 + mem: 16 + cmd: python3 -m areal.infra.rpc.rpc_server + env_vars: + VLLM_USE_V1: "1" + VLLM_ALLOW_LONG_MAX_MODEL_LEN: "1" + TASK_QUEUE_ENABLE: "2" + HCCL_EXEC_TIMEOUT: "14400" + HCCL_OP_EXPANSION_MODE: "HOST" + ACL_DEVICE_SYNC_TIMEOUT: "14400" + HCCL_EVENT_TIMEOUT: "14500" + HCCL_ASYNC_ERROR_HANDLING: "0" + ACL_STREAM_TIMEOUT: "14500000" + HCCL_CONNECT_TIMEOUT: "7200" + PYTORCH_NPU_ALLOC_CONF: "expandable_segments:True" + +ref: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + path: ${actor.path} + init_from_scratch: false + disable_dropout: true + dtype: ${actor.dtype} + mb_spec: + max_tokens_per_mb: 10240 + optimizer: + type: adam + lr: 1.70e-5 + weight_decay: 0.017 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + warmup_steps_proportion: 0.001 + scheduling_strategy: + type: colocation + target: actor + scheduling_spec: ${actor.scheduling_spec} + +# adpated for NPU +vllm: + model: ${actor.path} + seed: ${seed} + skip_tokenizer_init: false + dtype: ${actor.dtype} + max_model_len: 40000 + gpu_memory_utilization: 0.8 + enforce_eager: true + + +# datasets +train_dataset: + batch_size: 16 + shuffle: true + pin_memory: true + num_workers: 4 + path: path/to/tbench-selected-tasks-easy.parquet + type: rl + max_length: 1024 + +valid_dataset: + batch_size: 4 + shuffle: true + pin_memory: true + num_workers: 4 + path: path/to/val.parquet + type: rl + +# Utilities +saver: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: null + +recover: + mode: disabled + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: 1 + freq_steps: null + freq_secs: 3600 + +evaluator: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + freq_epochs: null + freq_steps: null + freq_secs: null + 
+stats_logger: + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + wandb: + mode: disabled + +perf_tracer: + enabled: true + experiment_name: ${experiment_name} + trial_name: ${trial_name} + fileroot: ${cluster.fileroot} + save_interval: 1 + session_tracer: + enabled: true + flush_threshold: 100 diff --git a/examples/terminal_bench/pyproject.toml b/examples/terminal_bench/pyproject.toml new file mode 100644 index 0000000000..4fde56fa37 --- /dev/null +++ b/examples/terminal_bench/pyproject.toml @@ -0,0 +1,40 @@ +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + +[project] +name = "areal-terminal-bench-env" +version = "0.1.0" +description = "Minimal dependency spec for AReaL + Camel + Terminal Bench workflow with the premise of a working AReaL environment" +requires-python = ">=3.10" +dependencies = [ + "ipython", + "ruamel.yaml", + "streamlit", + "sqlalchemy", + "docker", + "camel-ai==0.2.85a0", + "terminal-bench==0.2.18", +] + +[project.optional-dependencies] +npu = [ + "sglang[srt_npu] @ git+https://github.com/sgl-project/sglang.git@v0.5.6.post2#subdirectory=python", +] + +[tool.notes] +tiktoken_cache_dir = "/tmp/tiktoken-cache" +tiktoken_cache_file = "o200k_base.tiktoken" + +[tool.install-notes] +no_deps = [ + "docker", + "camel-ai", + "terminal-bench", +] + +commands = [ + "export TIKTOKEN_CACHE_DIR=/tmp/tiktoken-cache", + "mkdir -p '$TIKTOKEN_CACHE_DIR'", + "curl -k -O https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken", +] diff --git a/examples/terminal_bench/reward.png b/examples/terminal_bench/reward.png new file mode 100644 index 0000000000..42a7ec3249 Binary files /dev/null and b/examples/terminal_bench/reward.png differ diff --git a/examples/terminal_bench/train.py b/examples/terminal_bench/train.py new file mode 100644 index 0000000000..043df0830f --- /dev/null +++ b/examples/terminal_bench/train.py @@ -0,0 +1,74 @@ +import os + 
import os

# Must be set before any tokenizer is created, or HF emits fork warnings.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import sys
from pathlib import Path

from agent_rl_config import AgentRLConfig
from datasets import load_dataset

from areal import PPOTrainer
from areal.api.alloc_mode import AllocationMode
from areal.api.cli_args import load_expr_config
from areal.utils import seeding
from areal.utils.hf_utils import load_hf_tokenizer
from areal.utils.stats_logger import StatsLogger

# Dotted path the trainer resolves to the rollout workflow class.
WORKFLOW_PATH = "workflow.camel_rlvr_workflow.CamelRLVRWorkflow"


def main(args):
    """Load the experiment config, build the dataset, and launch AReaL training.

    Args:
        args: Raw CLI arguments (typically ``sys.argv[1:]``), parsed by
            ``load_expr_config`` into an :class:`AgentRLConfig`.
    """
    config, _ = load_expr_config(args, AgentRLConfig)

    rank = int(os.getenv("RANK", "0"))
    tokenizer = load_hf_tokenizer(config.tokenizer_path)

    # Seed per-rank so trainer replicas do not draw identical randomness.
    seeding.set_random_seed(config.seed, key=f"trainer{rank}")
    allocation_mode = AllocationMode.from_str(config.allocation_mode)
    assert allocation_mode.train is not None

    # Parquet files live under <repo_root>/dataset; the configured path is
    # interpreted relative to that directory.
    dataset_file = (
        Path(__file__).parent.parent.parent / "dataset" / config.train_dataset.path
    )
    dataset = load_dataset(
        path="parquet",
        split="train",
        data_files=[str(dataset_file)],
    )

    workflow_kwargs = dict(
        gconfig=config.gconfig,
        tokenizer=tokenizer,
        n_trajs=config.n_trajs,
        max_tokens=config.max_tokens_per_trajectory,
        dump_dir=os.path.join(
            StatsLogger.get_log_path(config.stats_logger), "generated"
        ),
        max_iteration=config.max_iteration,
        max_workers=config.max_workers,
        non_think_mode=config.non_think_mode,
        task_timeouts=config.task_timeouts,
        filter_uniform_reward=config.filter_uniform_reward,
        encourage_completion_reward=config.encourage_completion_reward,
    )
    # Evaluation currently uses the same rollout settings as training.
    eval_workflow_kwargs = workflow_kwargs.copy()

    # NOTE(review): validation reuses the training dataset here; the
    # `valid_dataset` config section is not consulted — confirm intended.
    with PPOTrainer(
        config,
        train_dataset=dataset,
        valid_dataset=dataset,
    ) as trainer:
        trainer.train(
            workflow=WORKFLOW_PATH,
            workflow_kwargs=workflow_kwargs,
            eval_workflow=WORKFLOW_PATH,
            eval_workflow_kwargs=eval_workflow_kwargs,
        )


if __name__ == "__main__":
    main(sys.argv[1:])
new file mode 100644 index 0000000000..a66a78c750 --- /dev/null +++ b/examples/terminal_bench/workflow/__init__.py @@ -0,0 +1,4 @@ +from .camel_rlvr_workflow import CamelRLVRWorkflow +from .pre_build_tasks_utils import build_docker_image + +__all__ = ["CamelRLVRWorkflow", "build_docker_image"] diff --git a/examples/terminal_bench/workflow/camel_rlvr_workflow.py b/examples/terminal_bench/workflow/camel_rlvr_workflow.py new file mode 100644 index 0000000000..4b63724dfd --- /dev/null +++ b/examples/terminal_bench/workflow/camel_rlvr_workflow.py @@ -0,0 +1,175 @@ +from __future__ import annotations + +import asyncio +import os +import uuid +from concurrent.futures import ThreadPoolExecutor +from functools import partial + +os.environ["TOKENIZERS_PARALLELISM"] = "false" + +from agent.camel_terminal_agent import CamelTerminalAgent +from agent_rl_config import TaskTimeouts +from transformers import PreTrainedTokenizerFast + +from areal.api.cli_args import GenerationHyperparameters +from areal.api.workflow_api import RolloutWorkflow +from areal.experimental.openai import ArealOpenAI +from areal.utils import stats_tracker +from areal.utils.perf_tracer import atrace_scope + +from .pre_build_tasks_utils import build_docker_image + + +class CamelRLVRWorkflow(RolloutWorkflow): + def __init__( + self, + gconfig: GenerationHyperparameters, + tokenizer: PreTrainedTokenizerFast, + dump_dir: str | None = None, + rollout_stat_scope: str = "rollout", + n_trajs: int = 1, + max_tokens: int = 32768, + max_iteration: int = 50, + max_workers: int = 25, + non_think_mode: bool = True, + task_timeouts: TaskTimeouts | None = None, + filter_uniform_reward: bool = False, + encourage_completion_reward: bool = False, + ): + self.gconfig = gconfig + self.gconfig.n_samples = 1 + self.tokenizer = tokenizer + self.dump_dir = dump_dir + self.max_tokens = max_tokens + self.max_iteration = max_iteration + self.rollout_stat_scope = rollout_stat_scope + if self.dump_dir is not None and not 
os.path.exists(self.dump_dir): + os.makedirs(self.dump_dir, exist_ok=True) + + self.n_trajs = n_trajs + self.non_think_mode = non_think_mode + self.task_timeouts = task_timeouts or TaskTimeouts() + self.filter_uniform_reward = filter_uniform_reward + self.encourage_completion_reward = encourage_completion_reward + self.executor = ThreadPoolExecutor(max_workers=max_workers) + + async def arun_episode(self, engine, data): + clients = [ + ArealOpenAI( + engine=engine, + tokenizer=self.tokenizer, + tool_call_parser="qwen25", + ) + for _ in range(self.n_trajs) + ] + uids = [uuid.uuid4().hex[:8] for _ in range(self.n_trajs)] + + loop = asyncio.get_running_loop() + try: + async with atrace_scope( + f"build_docker_image:{data.get('task_name')}", + args={"timeout": self.task_timeouts._reset_env}, + ): + await asyncio.wait_for( + loop.run_in_executor( + self.executor, + partial( + build_docker_image, + task=data, + timeout=self.task_timeouts._reset_env, + ), + ), + timeout=self.task_timeouts._reset_env + 60.0, + ) + except TimeoutError: + print( + f"Timeout while building docker image for task {data.get('task_name')}" + ) + return None + + print(f"\n{'=' * 70}") + print(f"[EPISODE START] Task {data.get('task_name')}") + print(f"{'=' * 70}\n") + + rewards = await asyncio.gather( + *[ + CamelTerminalAgent( + max_tokens_per_turn=self.gconfig.max_new_tokens, + max_total_tokens=self.max_tokens, + max_iteration=self.max_iteration, + output_path=f"{self.dump_dir}/CamelTerminalAgent_Output", + executor=self.executor, + non_think_mode=self.non_think_mode, + task_timeouts=self.task_timeouts, + encourage_completion_reward=self.encourage_completion_reward, + ).run_agent( + data=data, + client=clients[i], + uid=uids[i], + traj_i=i, + ) + for i in range(self.n_trajs) + ] + ) + + print(f"\n{'=' * 70}") + print(f"[EPISODE END] Task {data.get('task_name')}") + print(f"{'=' * 70}\n") + + completions_with_reward = {} + if self.filter_uniform_reward: + valid_rewards = [reward for reward in 
rewards if reward is not None] + if valid_rewards and all( + reward == valid_rewards[0] for reward in valid_rewards + ): + print( + f"Rank {os.getenv('RANK', '0')} - Task {data.get('task_name')} " + "has uniform reward across trajectories. Discarding all." + ) + return completions_with_reward + if not valid_rewards: + print( + f"Rank {os.getenv('RANK', '0')} - Task {data.get('task_name')} " + "all trajectories failed." + ) + return completions_with_reward + + for i, (reward, client) in enumerate(zip(rewards, clients)): + if reward is None: + print( + f"Rank {os.getenv('RANK', '0')} - Task {data.get('task_name')}, " + f"Trajectory {i} failed." + ) + os.makedirs(f"{self.dump_dir}/failed_tasks", exist_ok=True) + with open( + f"{self.dump_dir}/failed_tasks/{data.get('task_name')}_traj_{i}.txt", + "w", + ) as f: + f.write(f"Task {data.get('task_name')} trajectory {i} failed.\n") + continue + + print( + f"Rank {os.getenv('RANK', '0')} - Task {data.get('task_name')}, " + f"Trajectory {i} reward: {reward}" + ) + stats_tracker.get(self.rollout_stat_scope).scalar(reward=reward) + client.apply_reward_discount(turn_discount=0.9) + completions = client.export_interactions(style="individual") + completions_with_reward.update(completions) + + if len(completions_with_reward) == 0: + print(f"All trajectories failed for task {data.get('task_name')}.") + completions_with_reward = None + + stats_tracker.get(self.rollout_stat_scope).scalar( + num_full_passes=sum(1 for reward in rewards if reward == 1.0) + ) + stats_tracker.get(self.rollout_stat_scope).scalar( + num_trajectories_failed=sum(1 for reward in rewards if reward is None) + ) + + print( + f"Rank {os.getenv('RANK', '0')} - Task {data.get('task_name')} completed." 
+ ) + return completions_with_reward diff --git a/examples/terminal_bench/workflow/pre_build_tasks_utils.py b/examples/terminal_bench/workflow/pre_build_tasks_utils.py new file mode 100644 index 0000000000..92a57428a2 --- /dev/null +++ b/examples/terminal_bench/workflow/pre_build_tasks_utils.py @@ -0,0 +1,28 @@ +from pathlib import Path + +from terminal_bench.handlers.trial_handler import TrialHandler +from terminal_bench.terminal.docker_compose_manager import DockerComposeManager + +DATASET_ROOT = Path(__file__).resolve().parents[3] / "dataset" + + +def build_docker_image(task: dict, timeout=1200.0): + task_path = DATASET_ROOT / task.get("task_path") + trial_handler = TrialHandler( + trial_name="build_run", + input_path=task_path, + output_path=Path("build_outputs"), + ) + print(f"Task path: {task_path}") + + compose_manager = DockerComposeManager( + client_container_name=trial_handler.client_container_name, + client_image_name=trial_handler.client_image_name, + docker_image_name_prefix=trial_handler.docker_image_name_prefix, + docker_compose_path=trial_handler.task_paths.docker_compose_path, + no_rebuild=True, + cleanup=True, + sessions_logs_path=trial_handler.trial_paths.sessions_path, + agent_logs_path=trial_handler.trial_paths.agent_logging_dir, + ) + compose_manager.build(timeout=timeout)