Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 185 additions & 159 deletions evaluate/base/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,175 +115,201 @@ def multi_turn_evaluation_task(dataset_item):
return result


async def evaluation_threads_task(file_path, max_turn_count=10):
"""与ADK agent进行多轮对话测试"""
print('=' * 80)
print('🤖 与ADK Agent多轮对话测试')
print('=' * 80)
import asyncio
import json
import re
import time
from typing import Dict, Any, List

dataset_json = json.loads(load_dataset_json(file_path))
eval_results = []
for index, dataset_item in enumerate(dataset_json):
time.sleep(10) # 避免请求过于频繁
session_service = InMemorySessionService()
session = await session_service.create_session(
app_name='matmaster_agent',
user_id='human_simulator_test',

async def _run_conversation(dataset_item: Dict[str, Any], max_turn_count: int, save_mode: str = 'w') -> Dict[str, Any]:
"""
执行一次对话测试,并返回结果
:param dataset_item: 单条测试数据
:param max_turn_count: 最大对话轮次
:param save_mode: 写文件模式 ("w" 覆盖 / "a" 追加)
"""
session_service = InMemorySessionService()
session = await session_service.create_session(
app_name='matmaster_agent',
user_id='human_simulator_test',
)

logger.info(f"Test Session: {session.id}")

runner = Runner(
app_name='matmaster_agent',
agent=root_agent,
session_service=session_service
)

simulator = HumanSimulator(max_turn_count=max_turn_count)

# 场景初始化
scenario = {
'name': dataset_item['initial_question'],
'goal': ConversationGoal(
initial_question=dataset_item['initial_question'],
expected_outcomes=dataset_item['expected_outcomes'],
success_criteria=dataset_item['success_criteria']
)
}
print(f"\n{'=' * 20} 测试场景: {scenario['name']} {'=' * 20}")

logger.info(f"Test Session: {session.id}")
simulator.set_goal(scenario['goal'])
initial_question = simulator.get_initial_question()

runner = Runner(
app_name='matmaster_agent',
agent=root_agent,
session_service=session_service
)
print(f"🎯 对话目标: {initial_question}")
print(f"📋 期望结果: {', '.join(scenario['goal'].expected_outcomes)}")
print(f"✅ 成功标准: {', '.join(scenario['goal'].success_criteria)}")

# 初始化结果
eval_results = {
'initial_question': initial_question,
'expected_outcomes': scenario['goal'].expected_outcomes,
'success_criteria': scenario['goal'].success_criteria,
}
for i in range(1, max_turn_count + 1):
eval_results[f'agent_response_{i}'] = ''
eval_results[f'user_response_{i}'] = ''

# 对话循环
turn_count = 0
while turn_count < max_turn_count:
turn_count += 1
print(f"\n🔄 第 {turn_count} 轮对话:")

# 获取用户输入
user_input = initial_question if turn_count == 1 else simulator.get_last_user_response()
print(f"🧑 模拟用户: {user_input}")

# 调用 agent
try:
content = types.Content(role='user', parts=[types.Part(text=user_input)])
agent_response = ''

events = runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE)
)

async for event in events:
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
agent_response += part.text
except asyncio.CancelledError:
msg = '任务被取消,可能是超时或作用域取消导致'
logger.error(msg)
eval_results[f'agent_response_{turn_count}'] = msg
break
except Exception as e:
logger.error(f"获取agent响应失败: {e}")
eval_results[f'agent_response_{turn_count}'] = str(e)
break

# 创建人类模拟器
simulator = HumanSimulator(max_turn_count=max_turn_count)

# 数据预处理
scenario = {
'name': dataset_item['initial_question'],
'goal': ConversationGoal(
initial_question=dataset_item['initial_question'],
expected_outcomes=dataset_item['expected_outcomes'],
success_criteria=dataset_item['success_criteria']
)}

print(f"\n{'=' * 20} 测试场景: {scenario['name']} {'=' * 20}")

# 设置对话目标
simulator.set_goal(scenario['goal'])
initial_question = simulator.get_initial_question()

print(f"🎯 对话目标: {initial_question}")
print(f"📋 期望结果: {', '.join(scenario['goal'].expected_outcomes)}")
print(f"✅ 成功标准: {', '.join(scenario['goal'].success_criteria)}")

# 初始化记录
eval_results.append({})
eval_results[index]['initial_question'] = initial_question
eval_results[index]['expected_outcomes'] = scenario['goal'].expected_outcomes
eval_results[index]['success_criteria'] = scenario['goal'].success_criteria
for i in range(1, 6):
eval_results[index][f'agent_response_{i}'] = ''
eval_results[index][f'user_response_{i}'] = ''

# 开始对话
conversation_ended = False
turn_count = 0

while not conversation_ended and turn_count < 10:
turn_count += 1
print(f"\n🔄 第 {turn_count} 轮对话:")

# 获取用户输入(从模拟器)
if turn_count == 1:
user_input = initial_question
else:
# 从模拟器获取响应
user_input = simulator.get_last_user_response()

print(f"🧑 模拟用户: {user_input}")

# 调用ADK agent
eval_results[f'agent_response_{turn_count}'] = agent_response
print(f"🤖 ADK Agent: {agent_response}")

# 提取 job_id
job_jsons = re.findall(r'<bohrium-chat-msg>(.*?)</bohrium-chat-msg>', agent_response)
job_ids: List[str] = []
for job_json in job_jsons:
try:
content = types.Content(role='user', parts=[types.Part(text=user_input)])

agent_response = ''

events = runner.run_async(
user_id=session.user_id,
session_id=session.id,
new_message=content,
run_config=RunConfig(streaming_mode=StreamingMode.SSE)
)

# 收集agent响应
async for event in events:
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
agent_response += part.text
except asyncio.CancelledError:
logger.error('任务被取消,可能是超时或作用域取消导致')
print(f"✅ 对话在第{turn_count}轮结束")
eval_results[index][f'agent_response_{turn_count}'] = '任务被取消,可能是超时或作用域取消导致'
break
job_json = json.loads(job_json)
if 'eventData' in job_json and 'content' in job_json['eventData']:
content = job_json['eventData']['content']
if 'job_list' in content and 'job_id' in content['job_list']:
job_ids.append(content['job_list']['job_id'])
except Exception as e:
logger.error(f"获取agent响应失败: {e}")
print(f"✅ 对话在第{turn_count}轮结束")
eval_results[index][f'agent_response_{turn_count}'] = str(e)
break

eval_results[index][f'agent_response_{turn_count}'] = agent_response
print(f"🤖 ADK Agent: {agent_response}")

job_jsons = re.findall(r'<bohrium-chat-msg>(.*?)</bohrium-chat-msg>', agent_response)
job_ids = []
if job_jsons:
for job_json in job_jsons:
try:
job_json = json.loads(job_json)
if 'eventData' in job_json and 'content' in job_json['eventData']:
content = job_json['eventData']['content']
if 'job_list' in content and 'job_id' in content['job_list']:
job_id = content['job_list']['job_id']
job_ids.append(job_id)
except Exception as e:
logger.error(f"提取job_id失败: {e}")

# 查询job状态
if job_ids:
job_ids = list(set(job_ids))
while True:
time.sleep(10)
all_finished = True
for job_id in job_ids:
bohrium_client = Bohrium()
job_info = bohrium_client.job.detail(job_id)
logger.info(f"查询到job状态: {job_id} - 状态: {job_info["status"]}")
if job_info['status'] not in [-1, 2]:
all_finished = False
if all_finished:
break

# 使用模拟器生成用户响应
user_response, should_continue = simulator.get_bohr_results(agent_response, job_ids)
eval_results[index][f'user_response_{turn_count}'] = user_response
print(f"🧑 模拟用户: {user_response}")
else:
# 使用模拟器生成用户响应
user_response, should_continue = simulator.generate_response(agent_response)
eval_results[index][f'user_response_{turn_count}'] = user_response
print(f"🧑 模拟用户: {user_response}")

if not should_continue:
print(f"✅ 对话在第{turn_count}轮结束")
break

# 获取对话摘要
summary = simulator.get_conversation_summary()
eval_results[index]['total_turns'] = summary['total_turns']
eval_results[index]['final_state'] = summary['final_state']
eval_results[index]['duration_minutes'] = summary['duration_minutes']
print(f"\n📊 对话摘要:")
print(f" - 总轮次: {summary['total_turns']}")
print(f" - 最终状态: {summary['final_state']}")
print(f" - 耗时: {summary['duration_minutes']:.1f} 分钟")

with open('evaluation_results.json', 'w') as f:
json.dump(eval_results, f, indent=4, ensure_ascii=False)

# 简单的成功判断
if summary['final_state'] == 'satisfied':
print('✅ 测试通过: 对话成功完成')
logger.error(f"提取job_id失败: {e}")

# 查询 job 状态
if job_ids:
job_ids = list(set(job_ids))
while True:
time.sleep(10)
all_finished = True
for job_id in job_ids:
bohrium_client = Bohrium()
job_info = bohrium_client.job.detail(job_id)
logger.info(f"查询到job状态: {job_id} - 状态: {job_info['status']}")
if job_info['status'] not in [-1, 2]:
all_finished = False
if all_finished:
break

user_response, should_continue = simulator.get_bohr_results(agent_response, job_ids)
else:
print('❌ 测试失败: 对话未成功完成')
user_response, should_continue = simulator.generate_response(agent_response)

eval_results[f'user_response_{turn_count}'] = user_response
print(f"🧑 模拟用户: {user_response}")

await runner.close()
if not should_continue:
print(f"✅ 对话在第{turn_count}轮结束")
break

# 对话总结
summary = simulator.get_conversation_summary()
eval_results.update({
'total_turns': summary['total_turns'],
'final_state': summary['final_state'],
'duration_minutes': summary['duration_minutes'],
})

print(f"\n📊 对话摘要:")
print(f" - 总轮次: {summary['total_turns']}")
print(f" - 最终状态: {summary['final_state']}")
print(f" - 耗时: {summary['duration_minutes']:.1f} 分钟")

# 保存结果
with open('evaluation_results.json', save_mode) as f:
json.dump(eval_results, f, indent=4, ensure_ascii=False)

if summary['final_state'] == 'satisfied':
print('✅ 测试通过: 对话成功完成')
else:
print('❌ 测试失败: 对话未成功完成')

await runner.close()
return eval_results


async def evaluation_threads_task(file_path: str, max_turn_count: int = 10):
"""批量测试所有数据"""
print('=' * 80)
print('🤖 与ADK Agent多轮对话测试')
print('=' * 80)

dataset_json = json.loads(load_dataset_json(file_path))
results = []
for i, dataset_item in enumerate(dataset_json):
time.sleep(10) # 避免请求过于频繁
result = await _run_conversation(dataset_item, max_turn_count, save_mode='w' if i == 0 else 'a')
results.append(result)

print('\n' + '=' * 80)
print('🎉 多轮对话测试完成!')
print('=' * 80)
return results


async def evaluation_threads_single_task(file_path: str, item_id: int, max_turn_count: int = 10):
"""测试单个数据"""
print('=' * 80)
print('🤖 与ADK Agent多轮对话测试')
print('=' * 80)

dataset_json = json.loads(load_dataset_json(file_path))
dataset_item = dataset_json[item_id]
time.sleep(10) # 避免请求过于频繁

result = await _run_conversation(dataset_item, max_turn_count, save_mode='a')

print('\n' + '=' * 80)
print('🎉 单条多轮对话测试完成!')
print('=' * 80)
return result
18 changes: 10 additions & 8 deletions evaluate/base/human_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,20 +142,22 @@ def _build_response_prompt(self, agent_message: str) -> str:
Agent最新回复:
{agent_message}

请分析agent的回复是否满足任务需求,并生成合适的响应
请分析agent的回复是否满足 初始问题 需求。如果回复大致符合初始任务要求,请结束对话。如果不符合,请分析不符合的点在哪儿,并生成简洁的用户回复,继续引导agent完成任务

重要限制:
- 对话最多{self.max_turn_count}轮,当前是第{self.turn_count}轮
- 除首轮对话外,其他轮次尽可能简短地回答agent的问题,回复内容紧扣初始问题,禁止发散
- 如果agent在询问具体参数或设置,提供简洁明确的回答
- 如果agent已经提供了初始任务所需的信息或完成了任务,请立刻结束对话
- 禁止回复可能导致agent产生误解或偏离目标的内容

请以JSON格式回复:
- 对话最多{self.max_turn_count}轮,当前是第{self.turn_count}轮;
- 尽可能简短地回答agent的问题,回复内容紧扣初始问题,禁止发散,避免执行轮数超出限制;
- 如果agent在询问具体参数或设置,提供简洁明确的回答;
- 如果agent明确指出当前任务无法完成,请礼貌地结束对话;
- 如果agent已经提供了初始任务所需的信息或完成了任务,请立刻结束对话;
- agent仅能以文本形式回复,禁止要求agent提供可视化结果;

请以如下JSON格式回复:
{{
"response": "你的回复内容",
"continue": true/false // 是否继续对话
}}

"""

def get_bohr_results(self, agent_message: str, job_id: List[str]) -> Tuple[str, bool]:
Expand Down
Loading
Loading