Merged
49 changes: 22 additions & 27 deletions docs/docs/deploy/optimization.md
@@ -117,51 +117,46 @@ veadk rl init --platform lightning --workspace veadk_rl_lightning_project

```bash
cd veadk_rl_lightning_project
veadk rl run --platform lightning --client
python veadk_agent.py
```

Then run the following command in terminal 2 to start the server:
Then run the following commands in terminal 2:

- First, restart the ray cluster:

```bash
cd veadk_rl_lightning_project
bash restart_ray.sh
```

- Start the server:

```bash
cd veadk_rl_lightning_project
veadk rl run --platform lightning --server
bash train.sh
```

#### How It Works

The generated project structure is shown below; the core files are:

- agent_client: `examples/*/*_agent.py` defines the agent's rollout logic and reward rules
- training_server: `examples/*/train.py` defines the training parameters and is used to start the training server
- agent_client: `*_agent.py` defines the agent's rollout logic and reward rules
- training_server: `train.sh` defines the training parameters and is used to start the training server

```shell
veadk_rl_lightning_project
├── agentligtning
│   ├── runner        # runner: task execution, scheduling, main-flow management
│   ├── tracer        # tracing module: logging, trace links, debug info
│   ├── trainer       # training module: model training, fine-tuning, and evaluation logic
│   ├── verl          # VERL reinforcement learning components
│   └── server.py     # training server
└── examples          # example projects
    ├── spider        # example 1: Spider database QA task
    │   ├── sql_agent.py       # rollout logic and reward rules for the SQL agent
    │   ├── train.sh           # training server startup script, sets training parameters
    │   └── data               # dataset
    │       ├── train.parquet  # training set, must be parquet format
    │       └── eval.parquet   # evaluation set, must be parquet format
    ├── rag           # example 2: RAG application
    │   ├── rag_agent.py       # rollout logic and reward rules for the RAG agent
    │   └── train.sh           # training server startup script, sets training parameters
    └── calc_x        # example 3: calculator agent application
        ├── calc_agent.py      # rollout logic and reward rules for the calculator agent
        └── train.sh           # training server startup script, sets training parameters

├── data
│   ├── demo_train.parquet  # training data, must be parquet format
│   └── demo_test.parquet   # test data, must be parquet format
├── demo_calculate_agent.py # agent rollout logic and reward rules
├── train.sh                # training server startup script, sets training parameters
└── restart_ray.sh          # script to restart the ray cluster
```

#### Best Practices

1. The scaffold performs reinforcement learning optimization on a weather-query Agent built with VeADK
2. Start the client (veadk rl run --platform lightning --client) and the server (veadk rl run --platform lightning --server) by running these commands in terminal 1 and terminal 2 respectively
1. The scaffold performs reinforcement learning optimization on an arithmetic Agent built with VeADK
2. Start the client (python demo_calculate_agent.py), restart the ray cluster (bash restart_ray.sh), then start the training server (bash train.sh), running these commands in terminal 1 and terminal 2 respectively

![Start the client](../assets/images/optimization/lightning_client.png)
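The rollout in `demo_calculate_agent.py` asks the model to wrap its answer as `### ANSWER: <answer> ###`, extracts it with a regex, and scores it against the ground truth. A simplified, self-contained sketch of that extraction and scoring follows; the real code additionally handles multiple-choice options and evaluates expressions with sympy, which this sketch omits.

```python
import math
import re

def extract_answer(text: str) -> str:
    # Same pattern as demo_calculate_agent.py; falls back to the raw text.
    m = re.search(r"###\s*ANSWER:\s*(.+?)(\s*###|$)", text)
    return m.group(1) if m else text

def score(prediction: str, ground_truth: str, rel_tol: float = 1e-2) -> float:
    # Exact string match first, then numeric comparison within rel_tol.
    if prediction.strip() == ground_truth.strip():
        return 1.0
    try:
        return float(math.isclose(float(prediction), float(ground_truth), rel_tol=rel_tol))
    except ValueError:
        return 0.0

print(score(extract_answer("The total is ### ANSWER: 12.5 ###"), "12.5"))  # 1.0
```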

7 changes: 4 additions & 3 deletions veadk/cli/cli_rl.py
@@ -38,8 +38,8 @@ def rl_group():
"--platform",
"-p",
required=True,
type=click.Choice(["ark"], case_sensitive=False),
help="Scaffold platform type (only support for now: ark)",
type=click.Choice(["ark", "lightning"], case_sensitive=False),
help="Scaffold platform type (supported: ark, lightning)",
)
@click.option(
"--workspace", "-w", required=True, type=str, help="Target workspace directory name"
@@ -52,8 +52,9 @@ def rl_group():
)
def rl_init(platform: str, workspace: str, overwrite: bool):
"""
Initialize RL scaffold project for ark platform
Initialize RL scaffold project for ark or lightning platform
Example: veadk rl init --platform ark --workspace veadk_rl_ark_project
Example: veadk rl init --platform lightning --workspace veadk_rl_lightning_project
"""
# Locate template directory
rl_template_root = get_rl_template_root()
58 changes: 58 additions & 0 deletions veadk/cli/templates/rl/lightning/README.md
@@ -0,0 +1,58 @@
# Agent Lightning

Agent Lightning provides a flexible, extensible framework that fully decouples the agent (client) from training (server).
VeADK integrates with Agent Lightning: using the scaffold provided by VeADK, you can develop a VeADK Agent and then run the client and server to perform reinforcement learning optimization.

## Preparation

Run the following command in your terminal to initialize an Agent Lightning project:

```bash
veadk rl init --platform lightning --workspace veadk_rl_lightning_project
```

This command creates a folder named `veadk_rl_lightning_project` in the current directory, containing a basic reinforcement learning project structure built on VeADK and Agent Lightning.
Then run the following command in terminal 1 to start the client:

```bash
cd veadk_rl_lightning_project
python veadk_agent.py
```

Then run the following commands in terminal 2:

- First, restart the ray cluster:

```bash
cd veadk_rl_lightning_project
bash restart_ray.sh
```

- Start the server:

```bash
cd veadk_rl_lightning_project
bash train.sh
```

## How It Works

The generated project structure is shown below; the core files are:

- agent_client: `*_agent.py` defines the agent's rollout logic and reward rules
- training_server: `train.sh` defines the training parameters and is used to start the training server

```shell
veadk_rl_lightning_project
├── data
│   ├── demo_train.parquet  # training data, must be parquet format
│   └── demo_test.parquet   # test data, must be parquet format
├── demo_calculate_agent.py # agent rollout logic and reward rules
├── train.sh                # training server startup script, sets training parameters
└── restart_ray.sh          # script to restart the ray cluster
```

## Best Practices

1. The scaffold performs reinforcement learning optimization on an arithmetic Agent built with VeADK
2. Start the client (python demo_calculate_agent.py), restart the ray cluster (bash restart_ray.sh), then start the training server (bash train.sh), running these commands in terminal 1 and terminal 2 respectively
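The client's reward follows from how the rollout extracts and scores answers: the model is asked to wrap its answer as `### ANSWER: <answer> ###`, the regex pulls it out, and the result is compared to the ground truth with a 1% relative tolerance. A simplified, self-contained sketch (the real `demo_calculate_agent.py` also handles multiple-choice options and evaluates expressions with sympy, which is omitted here):

```python
import math
import re

def extract_answer(text: str) -> str:
    # Same extraction pattern as demo_calculate_agent.py, with raw-text fallback.
    m = re.search(r"###\s*ANSWER:\s*(.+?)(\s*###|$)", text)
    return m.group(1) if m else text

def score(prediction: str, ground_truth: str, rel_tol: float = 1e-2) -> float:
    # Exact string match first, then numeric comparison within rel_tol.
    if prediction.strip() == ground_truth.strip():
        return 1.0
    try:
        return float(math.isclose(float(prediction), float(ground_truth), rel_tol=rel_tol))
    except ValueError:
        return 0.0

print(score(extract_answer("The total is ### ANSWER: 12.5 ###"), "12.5"))  # 1.0
```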
Binary file not shown.
Binary file not shown.
148 changes: 148 additions & 0 deletions veadk/cli/templates/rl/lightning/demo_calculate_agent.py
@@ -0,0 +1,148 @@
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import re
import string
import sympy
from typing import Any, cast
from veadk.agent import Agent
from veadk.runner import Runner
from veadk.memory.short_term_memory import ShortTermMemory
from agentlightning import (
LLM,
LitAgent,
NamedResources,
Trainer,
reward,
)


def normalize_option(option: str) -> str:
"""
>>> normalize_option(" (A) \n")
'A'
"""
return re.sub(r"(\s+|\(|\))", "", option)


def is_option_result(result: str) -> bool:
"""
>>> is_option_result(" A) \n")
True
>>> is_option_result(" 23/7 ")
False
"""
return normalize_option(result) in list(string.ascii_letters)


def float_eval(input_str: str) -> float:
if " = around " in input_str:
input_str = input_str.split(" = around ")[0]
expr = sympy.parse_expr(input_str, evaluate=True)
return float(expr.evalf())


def scalar_are_results_same(pred_result: str, true_result: str, rel_tol: float) -> bool:
pred_result = str(pred_result) if pred_result is not None else "" # type: ignore
true_result = str(true_result) if true_result is not None else "" # type: ignore

if pred_result.strip() == true_result.strip():
return True

if is_option_result(true_result):
# The task is to select correct option
true_result = normalize_option(true_result)
pred_result = normalize_option(pred_result)
return pred_result == true_result

# The task is to calculate the result as a number
try:
pred_float = float_eval(pred_result)
true_float = float_eval(true_result)
return math.isclose(pred_float, true_float, rel_tol=rel_tol)
except Exception:
pass

return False


@reward
async def eval(prediction: str, ground_truth: str) -> float:
return float(scalar_are_results_same(prediction, ground_truth, 1e-2))


class CalcAgent(LitAgent[Any]):
async def training_rollout_async(
self, task: Any, rollout_id: str, resources: NamedResources
) -> Any: # type: ignore
llm: LLM = cast(LLM, resources.get("main_llm"))
calc_agent = Agent(
name="CalcAgent",
description="An agent that can perform calculations to answer questions.",
instruction="You are a helpful assistant that can perform mathematical calculations to answer questions accurately.",
model_provider="openai",
model=llm.model,
api_base=llm.endpoint,
api_key="",
)
runner = Runner(
agent=calc_agent,
short_term_memory=ShortTermMemory(),
app_name="calc_agent",
user_id="veadk_default_user",
)
try:
output_format = "Output the answer when you are ready. The answer should be surrounded by three sharps (`###`), in the form of ### ANSWER: <answer> ###."
prompt = task["question"] + " " + output_format
result = await runner.run(
session_id=rollout_id,
messages=prompt,
)
# evaluate
answer = re.search(
r"###\s*ANSWER:\s*(.+?)(\s*###|$)", result.messages[-1].content
) # type: ignore
if answer:
answer = answer.group(1)
else:
answer = result.messages[-1].content # type: ignore
except Exception as e:
print("Failure:", str(e))
answer = "None"
reward = await eval(
answer, str(task["result"])
) # reward is tracked with the decorator # type: ignore
print(
"answer: {} ground_truth: {} reward: {}".format(
answer, task["result"], reward
)
) # type: ignore

async def validation_rollout_async(
self, task: Any, rollout_id: str, resources: NamedResources
) -> Any: # type: ignore
llm: LLM = cast(LLM, resources.get("main_llm"))
resources = {
"main_llm": LLM(
endpoint=llm.endpoint,
model=llm.model,
sampling_parameters={"temperature": 0},
)
}
return await self.training_rollout_async(task, rollout_id, resources)


if __name__ == "__main__":
Trainer(n_workers=10).fit(CalcAgent(), "http://localhost:9999/")
7 changes: 7 additions & 0 deletions veadk/cli/templates/rl/lightning/restart_ray.sh
@@ -0,0 +1,7 @@
#!/bin/bash

set -ex

ray stop -v --force --grace-period 60
ps aux
env RAY_DEBUG=legacy HYDRA_FULL_ERROR=1 VLLM_USE_V1=1 ray start --head --dashboard-host=0.0.0.0 --port 6380 --dashboard-port 8266
53 changes: 53 additions & 0 deletions veadk/cli/templates/rl/lightning/train.sh
@@ -0,0 +1,53 @@
#!/bin/bash

set -e

export N_GPUS=1
export BASE_MODEL=Qwen/Qwen2.5-1.5B-Instruct
export DATA_DIR=data
export ROLLOUT_TP_SIZE=1
export EXPERIMENT_NAME=calc_x
export PROJECT_NAME=AgentLightning

echo "Starting training script..."

python -m agentlightning.verl \
algorithm.adv_estimator=grpo \
data.train_files=${DATA_DIR}/train.parquet \
data.val_files=${DATA_DIR}/test.parquet \
actor_rollout_ref.rollout.tensor_model_parallel_size=$ROLLOUT_TP_SIZE \
trainer.n_gpus_per_node=${N_GPUS} \
data.train_batch_size=32 \
actor_rollout_ref.rollout.n=4 \
actor_rollout_ref.actor.ppo_mini_batch_size=32 \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=4 \
actor_rollout_ref.rollout.multi_turn.format=hermes \
actor_rollout_ref.model.path=${BASE_MODEL} \
data.max_prompt_length=4096 \
data.max_response_length=2048 \
data.truncation='error' \
trainer.val_before_train=True \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.actor.use_kl_loss=False \
actor_rollout_ref.actor.kl_loss_coef=0.000 \
actor_rollout_ref.actor.entropy_coeff=0 \
actor_rollout_ref.actor.clip_ratio_low=0.2 \
actor_rollout_ref.actor.clip_ratio_high=0.3 \
actor_rollout_ref.model.enable_gradient_checkpointing=True \
actor_rollout_ref.actor.fsdp_config.param_offload=True \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=True \
actor_rollout_ref.rollout.name=vllm \
actor_rollout_ref.rollout.gpu_memory_utilization=0.8 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=8 \
actor_rollout_ref.ref.fsdp_config.param_offload=True \
algorithm.use_kl_in_reward=False \
trainer.critic_warmup=0 \
trainer.logger=['console','wandb'] \
trainer.project_name=${PROJECT_NAME} \
trainer.experiment_name=${EXPERIMENT_NAME} \
trainer.nnodes=1 \
trainer.save_freq=256 \
trainer.test_freq=32 \
    trainer.total_epochs=2 "$@"