Skip to content

Latest commit

 

History

History
360 lines (275 loc) · 14.2 KB

File metadata and controls

360 lines (275 loc) · 14.2 KB

Conversation Service

为不同 Agent 开发框架提供统一的会话状态持久化能力,底层存储选用阿里云 TableStore(OTS,宽表模型)。

架构概览

采用 统一存储 + 中心 Service + 薄 Adapter 的三层设计:

ADK Agent ──→ OTSSessionService ──┐
                                  │    ┌─────────────┐    ┌─────────┐
LangChain ──→ OTSChatMessageHistory ──→│ SessionStore │───→│  OTS    │
                                  │    │ (业务逻辑层)  │───→│ Tables  │
LangGraph ──→ OTSCheckpointSaver ─┘    └─────────────┘    └─────────┘
                                              │
                                        OTSBackend
                                       (存储操作层)
  • SessionStore:核心业务层,理解 OTS 表结构,提供 Session / Event / State 的 CRUD、级联删除、三级状态合并等统一接口。
  • OTSBackend:存储操作层,封装 TableStore SDK 的底层调用。
  • Adapter:薄适配层,仅负责框架数据模型转换。

快速开始

前置条件

  • 阿里云账号,配置 AK/SK 环境变量
  • AgentRun 平台上已创建 MemoryCollection(包含 OTS 实例配置)

安装

pip install agentrun

初始化

方式一(推荐):通过 MemoryCollection 自动获取 OTS 连接信息

from agentrun.conversation_service import SessionStore

# 环境变量:AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET
store = SessionStore.from_memory_collection("your-memory-collection-name")

# 首次使用时创建表
store.init_tables()

from_memory_collection() 内部自动完成:

  1. 调用 AgentRun API 获取 MemoryCollection 配置
  2. 从中提取 OTS 的 endpoint 和 instance_name
  3. Config 读取 AK/SK 凭证
  4. 构建 OTSClient 和 OTSBackend

方式二:手动传入 OTSClient

import tablestore
from agentrun.conversation_service import SessionStore, OTSBackend

ots_client = tablestore.OTSClient(
    endpoint, access_key_id, access_key_secret, instance_name,
    retry_policy=tablestore.WriteRetryPolicy(),
)
backend = OTSBackend(ots_client)
store = SessionStore(backend)
store.init_tables()

表初始化策略

表和索引按用途分组创建,避免创建不必要的表:

方法 创建的资源 适用场景
init_core_tables() Conversation + Event + 二级索引 所有框架
init_state_tables() State + App_state + User_state ADK 三级 State
init_search_index() 多元索引(conversation_search_index) 需要搜索/过滤
init_checkpoint_tables() checkpoint + checkpoint_writes + checkpoint_blobs LangGraph
init_tables() 核心表 + State 表 + 多元索引(不含 checkpoint 表) 快速开发

多元索引创建耗时较长(数秒级),建议与核心表创建分离,不阻塞核心流程。 checkpoint 表仅在使用 LangGraph 时需要,需显式调用 init_checkpoint_tables()

使用示例

Google ADK 集成

import asyncio
from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSSessionService
from google.adk.agents import Agent
from google.adk.runners import Runner

# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_tables()
session_service = OTSSessionService(session_store=store)

# 创建 Agent + Runner
agent = Agent(name="assistant", model=my_model, instruction="...")
runner = Runner(agent=agent, app_name="my_app", session_service=session_service)

# 对话自动持久化到 OTS
async def chat():
    session = await session_service.create_session(
        app_name="my_app", user_id="user_1"
    )
    async for event in runner.run_async(
        user_id="user_1", session_id=session.id, new_message=content
    ):
        ...

asyncio.run(chat())

LangChain 集成

from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage

# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_core_tables()

# 创建消息历史(自动关联 Session)
history = OTSChatMessageHistory(
    session_store=store,
    agent_id="my_agent",
    user_id="user_1",
    session_id="session_1",
)

# 添加消息(自动持久化到 OTS)
history.add_message(HumanMessage(content="你好"))
history.add_message(AIMessage(content="你好!有什么可以帮你的?"))

# 读取历史消息
for msg in history.messages:
    print(f"{msg.type}: {msg.content}")

LangGraph 集成

import asyncio
from langgraph.graph import StateGraph, START, END
from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSCheckpointSaver

# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_core_tables()          # conversation 表(会话同步需要)
store.init_checkpoint_tables()    # checkpoint 相关表

# 创建 checkpointer(指定 agent_id 后自动同步 conversation 记录)
checkpointer = OTSCheckpointSaver(
    store, agent_id="my_agent", user_id="default_user"
)

# 构建 Graph
graph = StateGraph(MyState)
graph.add_node("step", my_node)
graph.add_edge(START, "step")
graph.add_edge("step", END)
app = graph.compile(checkpointer=checkpointer)

# 对话(自动持久化 checkpoint 到 OTS + 同步 conversation 记录)
async def chat():
    config = {
        "configurable": {"thread_id": "thread-1"},
        "metadata": {"user_id": "user_1"},  # 可选,覆盖默认 user_id
    }
    result = await app.ainvoke({"messages": [...]}, config=config)
    # 再次调用同一 thread_id 会自动恢复状态
    result2 = await app.ainvoke({"messages": [...]}, config=config)

asyncio.run(chat())

会话同步:指定 agent_id 后,每次 put() 会自动在 conversation 表中创建/更新会话记录(session_id = thread_idframework = "langgraph")。这使得外部服务可以通过 agent_id / user_id 查询到 LangGraph 的所有会话。

跨语言查询 LangGraph 状态

外部服务(如 Go 后端)可直接通过 OTS SDK 查询 LangGraph 会话状态:

  1. 列出会话:查询 conversation 表(按 agent_id/user_id,过滤 framework = "langgraph"
  2. 读取最新 checkpoint:用 session_id(即 thread_id)查询 checkpoint 表(GetRange BACKWARD limit=1)
  3. 解析数据checkpoint_datablob_database64(msgpack) 格式,Go 使用 msgpack 库(如 github.com/vmihailenco/msgpack/v5)解码
  4. 注意:对于包含 LangChain 对象(HumanMessage 等)的 blob,msgpack 中包含 ext type,需要自定义 decoder 提取 kwargs

详细序列化格式说明和 Go 伪代码见 conversation_design.md

直接使用 SessionStore

from agentrun.conversation_service import SessionStore

store = SessionStore.from_memory_collection("my-collection")
store.init_tables()

# Session CRUD
session = store.create_session("agent_1", "user_1", "sess_1", summary="测试会话")
sessions = store.list_sessions("agent_1", "user_1")

# Event CRUD
event = store.append_event("agent_1", "user_1", "sess_1", "message", {"text": "hello"})
events = store.get_events("agent_1", "user_1", "sess_1")
recent = store.get_recent_events("agent_1", "user_1", "sess_1", n=10)

# 三级 State 管理(ADK 概念)
store.update_app_state("agent_1", {"model": "qwen-max"})
store.update_user_state("agent_1", "user_1", {"language": "zh-CN"})
store.update_session_state("agent_1", "user_1", "sess_1", {"topic": "weather"})
merged = store.get_merged_state("agent_1", "user_1", "sess_1")
# merged = app_state <- user_state <- session_state(浅合并)

# 多元索引搜索
results, total = store.search_sessions(
    "agent_1",
    summary_keyword="天气",
    updated_after=1700000000000000,
    limit=20,
)

# 级联删除(Event → State → Session 行)
store.delete_session("agent_1", "user_1", "sess_1")

API 参考

SessionStore

核心业务层,所有方法同时提供同步和异步(_async 后缀)版本。

工厂方法

方法 说明
from_memory_collection(name, *, config, table_prefix) 通过 MemoryCollection 名称创建实例

初始化

方法 说明
init_tables() 创建所有表和索引(不含 checkpoint)
init_core_tables() 创建核心表 + 二级索引
init_state_tables() 创建三张 State 表
init_search_index() 创建多元索引
init_checkpoint_tables() 创建 LangGraph checkpoint 表

Session 管理

方法 说明
create_session(agent_id, user_id, session_id, ...) 创建新会话
get_session(agent_id, user_id, session_id) 获取单个会话
list_sessions(agent_id, user_id, limit) 列出用户会话(按 updated_at 倒序)
list_all_sessions(agent_id, limit) 列出 agent 下所有会话
search_sessions(agent_id, *, user_id, summary_keyword, ...) 多元索引搜索会话
update_session(agent_id, user_id, session_id, *, version, ...) 更新会话属性(乐观锁)
delete_session(agent_id, user_id, session_id) 级联删除会话

Event 管理

方法 说明
append_event(agent_id, user_id, session_id, event_type, content) 追加事件
get_events(agent_id, user_id, session_id) 获取全部事件(正序)
get_recent_events(agent_id, user_id, session_id, n) 获取最近 N 条事件
delete_events(agent_id, user_id, session_id) 删除会话下所有事件

State 管理

方法 说明
get_session_state / update_session_state 会话级状态读写
get_app_state / update_app_state 应用级状态读写
get_user_state / update_user_state 用户级状态读写
get_merged_state(agent_id, user_id, session_id) 三级状态浅合并

Checkpoint 管理(LangGraph)

方法 说明
put_checkpoint(thread_id, checkpoint_ns, checkpoint_id, ...) 写入 checkpoint
get_checkpoint(thread_id, checkpoint_ns, checkpoint_id) 读取 checkpoint
list_checkpoints(thread_id, checkpoint_ns, *, limit, before) 列出 checkpoint
put_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id, writes) 批量写入 writes
get_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id) 读取 writes
put_checkpoint_blob(thread_id, checkpoint_ns, channel, version, ...) 写入 blob
get_checkpoint_blobs(thread_id, checkpoint_ns, channel_versions) 批量读取 blobs
delete_thread_checkpoints(thread_id) 删除 thread 全部 checkpoint 数据

框架适配器

适配器 框架 基类
OTSSessionService Google ADK BaseSessionService
OTSChatMessageHistory LangChain BaseChatMessageHistory
OTSCheckpointSaver LangGraph BaseCheckpointSaver

领域模型

模型 说明
ConversationSession 会话对象(含 agent_id, user_id, session_id, summary, labels 等)
ConversationEvent 事件对象(含 seq_id 自增序号、type、content、raw_event)
StateData 状态数据对象(含 state 字典、version 乐观锁)
StateScope 状态作用域枚举:APP / USER / SESSION

OTS 表结构

共八张表 + 一个二级索引 + 两个多元索引:

表名 主键 用途
conversation agent_id, user_id, session_id 会话元信息
event agent_id, user_id, session_id, seq_id (自增) 事件/消息流
state agent_id, user_id, session_id 会话级状态
app_state agent_id 应用级状态
user_state agent_id, user_id 用户级状态
checkpoint thread_id, checkpoint_ns, checkpoint_id LangGraph checkpoint
checkpoint_writes thread_id, checkpoint_ns, checkpoint_id, task_idx LangGraph 中间写入
checkpoint_blobs thread_id, checkpoint_ns, channel, version LangGraph 通道值快照
conversation_secondary_index agent_id, user_id, updated_at, session_id 二级索引(list 热路径)
conversation_search_index 多元索引 全文搜索 / 标签过滤 / 组合查询

表名支持通过 table_prefix 参数添加前缀,实现多租户隔离。

示例代码

文件 说明
conversation_service_adk_agent.py ADK Agent 完整对话示例,自动持久化到 OTS
conversation_service_adk_example.py ADK 数据读写验证(Session / Event / State)
conversation_service_adk_data.py ADK 模拟数据填充 + 多元索引搜索验证
conversation_service_langchain_example.py LangChain 消息历史读写验证
conversation_service_langchain_data.py LangChain 模拟数据填充
conversation_service_langgraph_example.py LangGraph checkpoint 持久化示例
conversation_service_verify.py 端到端 CRUD 验证脚本

环境变量

变量 说明 必填
AGENTRUN_ACCESS_KEY_ID 阿里云 Access Key ID 是(使用 from_memory_collection 时)
AGENTRUN_ACCESS_KEY_SECRET 阿里云 Access Key Secret 是(使用 from_memory_collection 时)
ALIBABA_CLOUD_ACCESS_KEY_ID 备选 AK 环境变量 否(AK 候选)
ALIBABA_CLOUD_ACCESS_KEY_SECRET 备选 SK 环境变量 否(SK 候选)
MEMORY_COLLECTION_NAME MemoryCollection 名称(示例脚本使用)

设计文档

详细的表设计、访问模式分析和分层架构说明见 conversation_design.md