diff --git a/docs/docs/deploy/optimization.md b/docs/docs/deploy/optimization.md index 8f1f060b..5a72655f 100644 --- a/docs/docs/deploy/optimization.md +++ b/docs/docs/deploy/optimization.md @@ -117,51 +117,46 @@ veadk rl init --platform lightning --workspace veadk_rl_lightning_project ```bash cd veadk_rl_lightning_project -veadk rl run --platform lightning --client +python demo_calculate_agent.py ``` -Then run the following command in terminal 2 to start the server: +Then run the following commands in terminal 2: + +- First, restart the ray cluster: + +```bash +cd veadk_rl_lightning_project +bash restart_ray.sh +``` + +- Then start the server: ```bash cd veadk_rl_lightning_project -veadk rl run --platform lightning --server +bash train.sh ``` #### How it works The generated project structure is shown below; the core files are: -- agent_client: `examples/*/*_agent.py` defines the agent's rollout logic and reward rules -- training_server: `examples/*/train.py` defines the training parameters and launches the training server +- agent_client: `*_agent.py` defines the agent's rollout logic and reward rules +- training_server: `train.sh` defines the training parameters and launches the training server ```shell veadk_rl_lightning_project -├── agentligtning -    ├── runner  # Runner: task execution, scheduling, and main-flow management -    ├── tracer  # Tracing module: logging, tracing, and debug info -    ├── trainer  # Training module: model training, fine-tuning, and evaluation -    ├── verl  # VERL reinforcement learning components -    └── server.py  # Training server -└── examples  # Example projects -    ├── spider  # Example 1: Spider database QA task -        ├── sql_agent.py  # SQL agent rollout logic and reward definition -        ├── train.sh  # Training server launch script; sets training parameters -        └── data  # Datasets -            ├── train.parquet  # Training set, must be in parquet format -            └── eval.parquet  # Evaluation set, must be in parquet format -    ├── rag  # Example 2: RAG application -        ├── rag_agent.py  # RAG agent rollout logic and reward definition -        └── train.sh  # Training server launch script; sets training parameters -    └── calc_x  # Example 3: calculator agent application -        ├── calc_agent.py  # Calculator agent rollout logic and reward definition -        └── train.sh  # Training server launch script; sets training parameters - +├── data +│   ├── demo_train.parquet  # Training data, must be in parquet format +│   └── demo_test.parquet  # Test data, must be in parquet format +├── demo_calculate_agent.py  # Agent rollout logic and reward definition +├── train.sh  # Training server launch script; sets training parameters +└── restart_ray.sh  # Script to restart the ray cluster ``` #### Best practices -1. In the scaffold, run reinforcement learning optimization on a VeADK-based weather-query Agent -2. 
Start the client (veadk rl run --platform lightning --client) and the server (veadk rl run --platform lightning --server) by running the commands above in terminal 1 and terminal 2 respectively +1. In the scaffold, run reinforcement learning optimization on a VeADK-based arithmetic Agent +2. Start the client (`python demo_calculate_agent.py`) in terminal 1; in terminal 2, first restart the ray cluster (`bash restart_ray.sh`), then start the training server (`bash train.sh`) ![Start the client](../assets/images/optimization/lightning_client.png) diff --git a/veadk/cli/cli_rl.py b/veadk/cli/cli_rl.py index 02ff05ce..e2fd712d 100644 --- a/veadk/cli/cli_rl.py +++ b/veadk/cli/cli_rl.py @@ -38,8 +38,8 @@ def rl_group(): "--platform", "-p", required=True, -    type=click.Choice(["ark"], case_sensitive=False), -    help="Scaffold platform type (only support for now: ark)", +    type=click.Choice(["ark", "lightning"], case_sensitive=False), +    help="Scaffold platform type (supported: ark, lightning)", ) @click.option( "--workspace", "-w", required=True, type=str, help="Target workspace directory name" @@ -52,8 +52,9 @@ def rl_group(): ) def rl_init(platform: str, workspace: str, overwrite: bool): """ -    Initialize RL scaffold project for ark platform +    Initialize RL scaffold project for ark or lightning platform Example: veadk rl init --platform ark --workspace veadk_rl_ark_project +    Example: veadk rl init --platform lightning --workspace veadk_rl_lightning_project """ # Locate template directory rl_template_root = get_rl_template_root() diff --git a/veadk/cli/templates/rl/lightning/README.md b/veadk/cli/templates/rl/lightning/README.md new file mode 100644 index 00000000..8c1a0901 --- /dev/null +++ b/veadk/cli/templates/rl/lightning/README.md @@ -0,0 +1,58 @@ +# Agent Lightning + +Agent Lightning provides a flexible and extensible framework that fully decouples the agent (client) from training (server). +VeADK integrates with Agent Lightning: using the scaffold VeADK provides, you can develop a VeADK Agent and then run the client and server for reinforcement learning optimization. + +## Preparation + +Run the following command in your terminal to initialize an Agent Lightning project: + +```bash +veadk rl init --platform lightning --workspace veadk_rl_lightning_project +``` + +This command creates a folder named `veadk_rl_lightning_project` in the current directory, containing the basic structure of a reinforcement learning project based on VeADK and Agent Lightning. 
+Then run the following command in terminal 1 to start the client: + +```bash +cd veadk_rl_lightning_project +python demo_calculate_agent.py +``` + +Then run the following commands in terminal 2: + +- First, restart the ray cluster: + +```bash +cd veadk_rl_lightning_project +bash restart_ray.sh +``` + +- Then start the server: + +```bash +cd veadk_rl_lightning_project +bash train.sh +``` + +## How it works + +The generated project structure is shown below; the core files are: + +- agent_client: `*_agent.py` defines the agent's rollout logic and reward rules +- training_server: `train.sh` defines the training parameters and launches the training server + +```shell +veadk_rl_lightning_project +├── data +│   ├── demo_train.parquet  # Training data, must be in parquet format +│   └── demo_test.parquet  # Test data, must be in parquet format +├── demo_calculate_agent.py  # Agent rollout logic and reward definition +├── train.sh  # Training server launch script; sets training parameters +└── restart_ray.sh  # Script to restart the ray cluster +``` + +## Best practices + +1. In the scaffold, run reinforcement learning optimization on a VeADK-based arithmetic Agent +2. Start the client (`python demo_calculate_agent.py`) in terminal 1; in terminal 2, first restart the ray cluster (`bash restart_ray.sh`), then start the training server (`bash train.sh`) diff --git a/veadk/cli/templates/rl/lightning/data/demo_test.parquet b/veadk/cli/templates/rl/lightning/data/demo_test.parquet new file mode 100644 index 00000000..06ba8cb6 Binary files /dev/null and b/veadk/cli/templates/rl/lightning/data/demo_test.parquet differ diff --git a/veadk/cli/templates/rl/lightning/data/demo_train.parquet b/veadk/cli/templates/rl/lightning/data/demo_train.parquet new file mode 100644 index 00000000..e99f09aa Binary files /dev/null and b/veadk/cli/templates/rl/lightning/data/demo_train.parquet differ diff --git a/veadk/cli/templates/rl/lightning/demo_calculate_agent.py b/veadk/cli/templates/rl/lightning/demo_calculate_agent.py new file mode 100644 index 00000000..c6fb742e --- /dev/null +++ b/veadk/cli/templates/rl/lightning/demo_calculate_agent.py @@ -0,0 +1,148 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import math +import re +import string +import sympy +from typing import Any, cast +from veadk.agent import Agent +from veadk.runner import Runner +from veadk.memory.short_term_memory import ShortTermMemory +from agentlightning import ( + LLM, + LitAgent, + NamedResources, + Trainer, + reward, +) + + +def normalize_option(option: str) -> str: + """ + >>> normalize_option(" (A) \n") + 'A' + """ + return re.sub(r"(\s+|\(|\))", "", option) + + +def is_option_result(result: str) -> bool: + """ + >>> is_option_result(" A) \n") + True + >>> is_option_result(" 23/7 ") + False + """ + return normalize_option(result) in list(string.ascii_letters) + + +def float_eval(input_str: str) -> float: + if " = around " in input_str: + input_str = input_str.split(" = around ")[0] + expr = sympy.parse_expr(input_str, evaluate=True) + return float(expr.evalf()) + + +def scalar_are_results_same(pred_result: str, true_result: str, rel_tol: float) -> bool: + pred_result = str(pred_result) if pred_result is not None else "" # type: ignore + true_result = str(true_result) if true_result is not None else "" # type: ignore + + if pred_result.strip() == true_result.strip(): + return True + + if is_option_result(true_result): + # The task is to select correct option + true_result = normalize_option(true_result) + pred_result = normalize_option(pred_result) + return pred_result == true_result + + # The task is to calculate the result as a number + try: + pred_float = float_eval(pred_result) + true_float = float_eval(true_result) + return math.isclose(pred_float, true_float, 
rel_tol=rel_tol) + except Exception: + pass + + return False + + +@reward +async def eval(prediction: str, ground_truth: str) -> float: + return float(scalar_are_results_same(prediction, ground_truth, 1e-2)) + + +class CalcAgent(LitAgent[Any]): + async def training_rollout_async( + self, task: Any, rollout_id: str, resources: NamedResources + ) -> Any: # type: ignore + llm: LLM = cast(LLM, resources.get("main_llm")) + calc_agent = Agent( + name="CalcAgent", + description="An agent that can perform calculations to answer questions.", + instruction="You are a helpful assistant that can perform mathematical calculations to answer questions accurately.", + model_provider="openai", + model=llm.model, + api_base=llm.endpoint, + api_key="", + ) + runner = Runner( + agent=calc_agent, + short_term_memory=ShortTermMemory(), + app_name="calc_agent", + user_id="veadk_default_user", + ) + try: + output_format = "Output the answer when you are ready. The answer should be surrounded by three sharps (`###`), in the form of ### ANSWER: ###." 
+ prompt = task["question"] + " " + output_format + result = await runner.run( + session_id=rollout_id, + messages=prompt, + ) + # evaluate + answer = re.search( + r"###\s*ANSWER:\s*(.+?)(\s*###|$)", result.messages[-1].content + ) # type: ignore + if answer: + answer = answer.group(1) + else: + answer = result.messages[-1].content # type: ignore + except Exception as e: + print("Failure:", str(e)) + answer = "None" + reward = await eval( + answer, str(task["result"]) + ) # reward is tracked with the decorator # type: ignore + print( + "answer: {} ground_truth: {} reward: {}".format( + answer, task["result"], reward + ) + ) # type: ignore + + async def validation_rollout_async( + self, task: Any, rollout_id: str, resources: NamedResources + ) -> Any: # type: ignore + llm: LLM = cast(LLM, resources.get("main_llm")) + resources = { + "main_llm": LLM( + endpoint=llm.endpoint, + model=llm.model, + sampling_parameters={"temperature": 0}, + ) + } + return await self.training_rollout_async(task, rollout_id, resources) + + +if __name__ == "__main__": + Trainer(n_workers=10).fit(CalcAgent(), "http://localhost:9999/") diff --git a/veadk/cli/templates/rl/lightning/restart_ray.sh b/veadk/cli/templates/rl/lightning/restart_ray.sh new file mode 100755 index 00000000..7bec3d83 --- /dev/null +++ b/veadk/cli/templates/rl/lightning/restart_ray.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -ex + +ray stop -v --force --grace-period 60 +ps aux +env RAY_DEBUG=legacy HYDRA_FULL_ERROR=1 VLLM_USE_V1=1 ray start --head --dashboard-host=0.0.0.0 --port 6380 --dashboard-port 8266 \ No newline at end of file diff --git a/veadk/cli/templates/rl/lightning/train.sh b/veadk/cli/templates/rl/lightning/train.sh new file mode 100755 index 00000000..32e0742b --- /dev/null +++ b/veadk/cli/templates/rl/lightning/train.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +set -e + +export N_GPUS=1 +export BASE_MODEL=Qwen/Qwen2.5-1.5B-Instruct +export DATA_DIR=data +export ROLLOUT_TP_SIZE=1 +export EXPERIMENT_NAME=calc_x +export 
PROJECT_NAME=AgentLightning + +echo "Starting training script..." + +python -m agentlightning.verl \ +    algorithm.adv_estimator=grpo \ +    data.train_files=${DATA_DIR}/demo_train.parquet \ +    data.val_files=${DATA_DIR}/demo_test.parquet \ +    actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \ +    trainer.n_gpus_per_node=${N_GPUS} \ +    data.train_batch_size=32 \ +    actor_rollout_ref.rollout.n=4 \ +    actor_rollout_ref.actor.ppo_mini_batch_size=32 \ +    actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \ +    actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \ +    actor_rollout_ref.rollout.multi_turn.format=hermes \ +    actor_rollout_ref.model.path=${BASE_MODEL} \ +    data.max_prompt_length=4096 \ +    data.max_response_length=2048 \ +    data.truncation='error' \ +    trainer.val_before_train=True \ +    actor_rollout_ref.actor.optim.lr=1e-6 \ +    actor_rollout_ref.model.use_remove_padding=True \ +    actor_rollout_ref.actor.use_kl_loss=False \ +    actor_rollout_ref.actor.kl_loss_coef=0.000 \ +    actor_rollout_ref.actor.entropy_coeff=0 \ +    actor_rollout_ref.actor.clip_ratio_low=0.2 \ +    actor_rollout_ref.actor.clip_ratio_high=0.3 \ +    actor_rollout_ref.model.enable_gradient_checkpointing=True \ +    actor_rollout_ref.actor.fsdp_config.param_offload=True \ +    actor_rollout_ref.actor.fsdp_config.optimizer_offload=True \ +    actor_rollout_ref.rollout.name=vllm \ +    actor_rollout_ref.rollout.gpu_memory_utilization=0.8 \ +    actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=8 \ +    actor_rollout_ref.ref.fsdp_config.param_offload=True \ +    algorithm.use_kl_in_reward=False \ +    trainer.critic_warmup=0 \ +    trainer.logger=['console','wandb'] \ +    trainer.project_name=${PROJECT_NAME} \ +    trainer.experiment_name=${EXPERIMENT_NAME} \ +    trainer.nnodes=1 \ +    trainer.save_freq=256 \ +    trainer.test_freq=32 \ +    trainer.total_epochs=2 "$@"
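The reward path added in `demo_calculate_agent.py` boils down to two steps: extract the text between the `### ANSWER: ... ###` markers the agent is prompted to emit, then compare it to the ground truth within a relative tolerance of `1e-2`. A minimal standalone sketch of that flow (simplified: `float()` stands in for `sympy.parse_expr`, and the multiple-choice option branch is omitted):

```python
import math
import re


def extract_answer(text: str) -> str:
    """Pull the answer from the '### ANSWER: ... ###' markers, falling back to the raw text."""
    m = re.search(r"###\s*ANSWER:\s*(.+?)(\s*###|$)", text)
    return m.group(1) if m else text


def results_match(pred: str, truth: str, rel_tol: float = 1e-2) -> bool:
    """Exact string match first, then a numeric comparison within rel_tol."""
    if pred.strip() == truth.strip():
        return True
    try:
        return math.isclose(float(pred), float(truth), rel_tol=rel_tol)
    except ValueError:
        return False


print(extract_answer("The sum is ### ANSWER: 12.5 ###"))  # prints 12.5
print(results_match("12.50", "12.5"))                     # prints True
print(results_match("25/2", "12.5"))                      # prints False: float() cannot parse '25/2'
```

Unlike the template's `float_eval`, this sketch rejects symbolic answers such as `25/2`, which `sympy.parse_expr` would evaluate to `12.5`; that is why the template takes the sympy route.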