为不同 Agent 开发框架提供统一的会话状态持久化能力,底层存储选用阿里云 TableStore(OTS,宽表模型)。
采用 统一存储 + 中心 Service + 薄 Adapter 的三层设计:
ADK Agent ──→ OTSSessionService ──┐
│ ┌─────────────┐ ┌─────────┐
LangChain ──→ OTSChatMessageHistory ──→│ SessionStore │───→│ OTS │
│ │ (业务逻辑层) │───→│ Tables │
LangGraph ──→ OTSCheckpointSaver ─┘ └─────────────┘ └─────────┘
│
OTSBackend
(存储操作层)
- SessionStore:核心业务层,理解 OTS 表结构,提供 Session / Event / State 的 CRUD、级联删除、三级状态合并等统一接口。
- OTSBackend:存储操作层,封装 TableStore SDK 的底层调用。
- Adapter:薄适配层,仅负责框架数据模型转换。
- 阿里云账号,配置 AK/SK 环境变量
- AgentRun 平台上已创建 MemoryCollection(包含 OTS 实例配置)
pip install agentrun方式一(推荐):通过 MemoryCollection 自动获取 OTS 连接信息
from agentrun.conversation_service import SessionStore
# 环境变量:AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET
store = SessionStore.from_memory_collection("your-memory-collection-name")
# 首次使用时创建表
store.init_tables()from_memory_collection() 内部自动完成:
- 调用 AgentRun API 获取 MemoryCollection 配置
- 从中提取 OTS 的 endpoint 和 instance_name
- 从
Config读取 AK/SK 凭证 - 构建 OTSClient 和 OTSBackend
方式二:手动传入 OTSClient
import tablestore
from agentrun.conversation_service import SessionStore, OTSBackend
ots_client = tablestore.OTSClient(
endpoint, access_key_id, access_key_secret, instance_name,
retry_policy=tablestore.WriteRetryPolicy(),
)
backend = OTSBackend(ots_client)
store = SessionStore(backend)
store.init_tables()表和索引按用途分组创建,避免创建不必要的表:
| 方法 | 创建的资源 | 适用场景 |
|---|---|---|
init_core_tables() |
Conversation + Event + 二级索引 | 所有框架 |
init_state_tables() |
State + App_state + User_state | ADK 三级 State |
init_search_index() |
多元索引(conversation_search_index) | 需要搜索/过滤 |
init_checkpoint_tables() |
checkpoint + checkpoint_writes + checkpoint_blobs | LangGraph |
init_tables() |
核心表 + State 表 + 多元索引(不含 checkpoint 表) | 快速开发 |
多元索引创建耗时较长(数秒级),建议与核心表创建分离,不阻塞核心流程。 checkpoint 表仅在使用 LangGraph 时需要,需显式调用
init_checkpoint_tables()。
import asyncio
from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSSessionService
from google.adk.agents import Agent
from google.adk.runners import Runner
# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_tables()
session_service = OTSSessionService(session_store=store)
# 创建 Agent + Runner
agent = Agent(name="assistant", model=my_model, instruction="...")
runner = Runner(agent=agent, app_name="my_app", session_service=session_service)
# 对话自动持久化到 OTS
async def chat():
session = await session_service.create_session(
app_name="my_app", user_id="user_1"
)
async for event in runner.run_async(
user_id="user_1", session_id=session.id, new_message=content
):
...
asyncio.run(chat())from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSChatMessageHistory
from langchain_core.messages import HumanMessage, AIMessage
# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_core_tables()
# 创建消息历史(自动关联 Session)
history = OTSChatMessageHistory(
session_store=store,
agent_id="my_agent",
user_id="user_1",
session_id="session_1",
)
# 添加消息(自动持久化到 OTS)
history.add_message(HumanMessage(content="你好"))
history.add_message(AIMessage(content="你好!有什么可以帮你的?"))
# 读取历史消息
for msg in history.messages:
print(f"{msg.type}: {msg.content}")import asyncio
from langgraph.graph import StateGraph, START, END
from agentrun.conversation_service import SessionStore
from agentrun.conversation_service.adapters import OTSCheckpointSaver
# 初始化
store = SessionStore.from_memory_collection("my-collection")
store.init_core_tables() # conversation 表(会话同步需要)
store.init_checkpoint_tables() # checkpoint 相关表
# 创建 checkpointer(指定 agent_id 后自动同步 conversation 记录)
checkpointer = OTSCheckpointSaver(
store, agent_id="my_agent", user_id="default_user"
)
# 构建 Graph
graph = StateGraph(MyState)
graph.add_node("step", my_node)
graph.add_edge(START, "step")
graph.add_edge("step", END)
app = graph.compile(checkpointer=checkpointer)
# 对话(自动持久化 checkpoint 到 OTS + 同步 conversation 记录)
async def chat():
config = {
"configurable": {"thread_id": "thread-1"},
"metadata": {"user_id": "user_1"}, # 可选,覆盖默认 user_id
}
result = await app.ainvoke({"messages": [...]}, config=config)
# 再次调用同一 thread_id 会自动恢复状态
result2 = await app.ainvoke({"messages": [...]}, config=config)
asyncio.run(chat())会话同步:指定
agent_id后,每次put()会自动在 conversation 表中创建/更新会话记录(session_id = thread_id,framework = "langgraph")。这使得外部服务可以通过agent_id / user_id查询到 LangGraph 的所有会话。
外部服务(如 Go 后端)可直接通过 OTS SDK 查询 LangGraph 会话状态:
- 列出会话:查询 conversation 表(按
agent_id/user_id,过滤framework = "langgraph") - 读取最新 checkpoint:用
session_id(即thread_id)查询 checkpoint 表(GetRange BACKWARD limit=1) - 解析数据:
checkpoint_data和blob_data为base64(msgpack)格式,Go 使用 msgpack 库(如github.com/vmihailenco/msgpack/v5)解码 - 注意:对于包含 LangChain 对象(HumanMessage 等)的 blob,msgpack 中包含 ext type,需要自定义 decoder 提取 kwargs
详细序列化格式说明和 Go 伪代码见 conversation_design.md。
from agentrun.conversation_service import SessionStore
store = SessionStore.from_memory_collection("my-collection")
store.init_tables()
# Session CRUD
session = store.create_session("agent_1", "user_1", "sess_1", summary="测试会话")
sessions = store.list_sessions("agent_1", "user_1")
# Event CRUD
event = store.append_event("agent_1", "user_1", "sess_1", "message", {"text": "hello"})
events = store.get_events("agent_1", "user_1", "sess_1")
recent = store.get_recent_events("agent_1", "user_1", "sess_1", n=10)
# 三级 State 管理(ADK 概念)
store.update_app_state("agent_1", {"model": "qwen-max"})
store.update_user_state("agent_1", "user_1", {"language": "zh-CN"})
store.update_session_state("agent_1", "user_1", "sess_1", {"topic": "weather"})
merged = store.get_merged_state("agent_1", "user_1", "sess_1")
# merged = app_state <- user_state <- session_state(浅合并)
# 多元索引搜索
results, total = store.search_sessions(
"agent_1",
summary_keyword="天气",
updated_after=1700000000000000,
limit=20,
)
# 级联删除(Event → State → Session 行)
store.delete_session("agent_1", "user_1", "sess_1")核心业务层,所有方法同时提供同步和异步(_async 后缀)版本。
工厂方法
| 方法 | 说明 |
|---|---|
from_memory_collection(name, *, config, table_prefix) |
通过 MemoryCollection 名称创建实例 |
初始化
| 方法 | 说明 |
|---|---|
init_tables() |
创建所有表和索引(不含 checkpoint) |
init_core_tables() |
创建核心表 + 二级索引 |
init_state_tables() |
创建三张 State 表 |
init_search_index() |
创建多元索引 |
init_checkpoint_tables() |
创建 LangGraph checkpoint 表 |
Session 管理
| 方法 | 说明 |
|---|---|
create_session(agent_id, user_id, session_id, ...) |
创建新会话 |
get_session(agent_id, user_id, session_id) |
获取单个会话 |
list_sessions(agent_id, user_id, limit) |
列出用户会话(按 updated_at 倒序) |
list_all_sessions(agent_id, limit) |
列出 agent 下所有会话 |
search_sessions(agent_id, *, user_id, summary_keyword, ...) |
多元索引搜索会话 |
update_session(agent_id, user_id, session_id, *, version, ...) |
更新会话属性(乐观锁) |
delete_session(agent_id, user_id, session_id) |
级联删除会话 |
Event 管理
| 方法 | 说明 |
|---|---|
append_event(agent_id, user_id, session_id, event_type, content) |
追加事件 |
get_events(agent_id, user_id, session_id) |
获取全部事件(正序) |
get_recent_events(agent_id, user_id, session_id, n) |
获取最近 N 条事件 |
delete_events(agent_id, user_id, session_id) |
删除会话下所有事件 |
State 管理
| 方法 | 说明 |
|---|---|
get_session_state / update_session_state |
会话级状态读写 |
get_app_state / update_app_state |
应用级状态读写 |
get_user_state / update_user_state |
用户级状态读写 |
get_merged_state(agent_id, user_id, session_id) |
三级状态浅合并 |
Checkpoint 管理(LangGraph)
| 方法 | 说明 |
|---|---|
put_checkpoint(thread_id, checkpoint_ns, checkpoint_id, ...) |
写入 checkpoint |
get_checkpoint(thread_id, checkpoint_ns, checkpoint_id) |
读取 checkpoint |
list_checkpoints(thread_id, checkpoint_ns, *, limit, before) |
列出 checkpoint |
put_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id, writes) |
批量写入 writes |
get_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id) |
读取 writes |
put_checkpoint_blob(thread_id, checkpoint_ns, channel, version, ...) |
写入 blob |
get_checkpoint_blobs(thread_id, checkpoint_ns, channel_versions) |
批量读取 blobs |
delete_thread_checkpoints(thread_id) |
删除 thread 全部 checkpoint 数据 |
| 适配器 | 框架 | 基类 |
|---|---|---|
OTSSessionService |
Google ADK | BaseSessionService |
OTSChatMessageHistory |
LangChain | BaseChatMessageHistory |
OTSCheckpointSaver |
LangGraph | BaseCheckpointSaver |
| 模型 | 说明 |
|---|---|
ConversationSession |
会话对象(含 agent_id, user_id, session_id, summary, labels 等) |
ConversationEvent |
事件对象(含 seq_id 自增序号、type、content、raw_event) |
StateData |
状态数据对象(含 state 字典、version 乐观锁) |
StateScope |
状态作用域枚举:APP / USER / SESSION |
共八张表 + 一个二级索引 + 两个多元索引:
| 表名 | 主键 | 用途 |
|---|---|---|
conversation |
agent_id, user_id, session_id | 会话元信息 |
event |
agent_id, user_id, session_id, seq_id (自增) | 事件/消息流 |
state |
agent_id, user_id, session_id | 会话级状态 |
app_state |
agent_id | 应用级状态 |
user_state |
agent_id, user_id | 用户级状态 |
checkpoint |
thread_id, checkpoint_ns, checkpoint_id | LangGraph checkpoint |
checkpoint_writes |
thread_id, checkpoint_ns, checkpoint_id, task_idx | LangGraph 中间写入 |
checkpoint_blobs |
thread_id, checkpoint_ns, channel, version | LangGraph 通道值快照 |
conversation_secondary_index |
agent_id, user_id, updated_at, session_id | 二级索引(list 热路径) |
conversation_search_index |
多元索引 | 全文搜索 / 标签过滤 / 组合查询 |
表名支持通过
table_prefix参数添加前缀,实现多租户隔离。
| 文件 | 说明 |
|---|---|
conversation_service_adk_agent.py |
ADK Agent 完整对话示例,自动持久化到 OTS |
conversation_service_adk_example.py |
ADK 数据读写验证(Session / Event / State) |
conversation_service_adk_data.py |
ADK 模拟数据填充 + 多元索引搜索验证 |
conversation_service_langchain_example.py |
LangChain 消息历史读写验证 |
conversation_service_langchain_data.py |
LangChain 模拟数据填充 |
conversation_service_langgraph_example.py |
LangGraph checkpoint 持久化示例 |
conversation_service_verify.py |
端到端 CRUD 验证脚本 |
| 变量 | 说明 | 必填 |
|---|---|---|
AGENTRUN_ACCESS_KEY_ID |
阿里云 Access Key ID | 是(使用 from_memory_collection 时) |
AGENTRUN_ACCESS_KEY_SECRET |
阿里云 Access Key Secret | 是(使用 from_memory_collection 时) |
ALIBABA_CLOUD_ACCESS_KEY_ID |
备选 AK 环境变量 | 否(AK 候选) |
ALIBABA_CLOUD_ACCESS_KEY_SECRET |
备选 SK 环境变量 | 否(SK 候选) |
MEMORY_COLLECTION_NAME |
MemoryCollection 名称(示例脚本使用) | 否 |
详细的表设计、访问模式分析和分层架构说明见 conversation_design.md。