Skip to content

Commit b4eaa7a

Browse files
committed
feat(state): Implement LangGraph checkpoint management with OTS integration
- Added checkpoint, checkpoint_writes, and checkpoint_blobs tables to support LangGraph functionality. - Introduced asynchronous methods for initializing and managing checkpoint tables in OTSBackend and SessionStore. - Enhanced SessionStore with methods for checkpoint CRUD operations. - Updated README and conversation_design.md to document new checkpoint features and usage examples. - Refactored utils to build OTS clients independently of code generation templates. This update enables persistent storage of LangGraph checkpoints, enhancing the overall functionality of the conversation service.
1 parent 2f71b02 commit b4eaa7a

18 files changed

+4266
-155
lines changed

.gitignore

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,4 @@ uv.lock
106106
coverage.json
107107
coverage.json
108108

109-
# examples
110-
examples/conversation_service_adk_example.py
111-
examples/conversation_service_adk_data.py
112-
examples/conversation_service_langchain_example.py
113-
examples/conversation_service_langchain_data.py
114-
examples/conversation_service_verify.py
115-
examples/Langchain_His_example.py
116-
examples/agent-quickstart-langchain/
109+
local

agentrun/conversation_service/README.md

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ ADK Agent ──→ OTSSessionService ──┐
1111
│ ┌─────────────┐ ┌─────────┐
1212
LangChain ──→ OTSChatMessageHistory ──→│ SessionStore │───→│ OTS │
1313
│ │ (业务逻辑层) │───→│ Tables │
14-
LangGraph ──→ (LG Adapter) ─────┘ └─────────────┘ └─────────┘
14+
LangGraph ──→ OTSCheckpointSaver ─┘ └─────────────┘ └─────────┘
1515
1616
OTSBackend
1717
(存储操作层)
@@ -78,9 +78,11 @@ store.init_tables()
7878
| `init_core_tables()` | Conversation + Event + 二级索引 | 所有框架 |
7979
| `init_state_tables()` | State + App_state + User_state | ADK 三级 State |
8080
| `init_search_index()` | 多元索引(conversation_search_index) | 需要搜索/过滤 |
81-
| `init_tables()` | 以上全部 | 快速开发 |
81+
| `init_checkpoint_tables()` | checkpoint + checkpoint_writes + checkpoint_blobs | LangGraph |
82+
| `init_tables()` | 核心表 + State 表 + 多元索引(不含 checkpoint 表) | 快速开发 |
8283

8384
> 多元索引创建耗时较长(数秒级),建议与核心表创建分离,不阻塞核心流程。
85+
> checkpoint 表仅在使用 LangGraph 时需要,需显式调用 `init_checkpoint_tables()`
8486
8587
## 使用示例
8688

@@ -143,6 +145,57 @@ for msg in history.messages:
143145
print(f"{msg.type}: {msg.content}")
144146
```
145147

148+
### LangGraph 集成
149+
150+
```python
151+
import asyncio
152+
from langgraph.graph import StateGraph, START, END
153+
from agentrun.conversation_service import SessionStore
154+
from agentrun.conversation_service.adapters import OTSCheckpointSaver
155+
156+
# 初始化
157+
store = SessionStore.from_memory_collection("my-collection")
158+
store.init_core_tables() # conversation 表(会话同步需要)
159+
store.init_checkpoint_tables() # checkpoint 相关表
160+
161+
# 创建 checkpointer(指定 agent_id 后自动同步 conversation 记录)
162+
checkpointer = OTSCheckpointSaver(
163+
store, agent_id="my_agent", user_id="default_user"
164+
)
165+
166+
# 构建 Graph
167+
graph = StateGraph(MyState)
168+
graph.add_node("step", my_node)
169+
graph.add_edge(START, "step")
170+
graph.add_edge("step", END)
171+
app = graph.compile(checkpointer=checkpointer)
172+
173+
# 对话(自动持久化 checkpoint 到 OTS + 同步 conversation 记录)
174+
async def chat():
175+
config = {
176+
"configurable": {"thread_id": "thread-1"},
177+
"metadata": {"user_id": "user_1"}, # 可选,覆盖默认 user_id
178+
}
179+
result = await app.ainvoke({"messages": [...]}, config=config)
180+
# 再次调用同一 thread_id 会自动恢复状态
181+
result2 = await app.ainvoke({"messages": [...]}, config=config)
182+
183+
asyncio.run(chat())
184+
```
185+
186+
> **会话同步**:指定 `agent_id` 后,每次 `put()` 会自动在 conversation 表中创建/更新会话记录(`session_id = thread_id``framework = "langgraph"`)。这使得外部服务可以通过 `agent_id / user_id` 查询到 LangGraph 的所有会话。
187+
188+
### 跨语言查询 LangGraph 状态
189+
190+
外部服务(如 Go 后端)可直接通过 OTS SDK 查询 LangGraph 会话状态:
191+
192+
1. **列出会话**:查询 conversation 表(按 `agent_id/user_id`,过滤 `framework = "langgraph"`
193+
2. **读取最新 checkpoint**:用 `session_id`(即 `thread_id`)查询 checkpoint 表(GetRange BACKWARD limit=1)
194+
3. **解析数据**`checkpoint_data``blob_data``base64(msgpack)` 格式,Go 使用 msgpack 库(如 `github.com/vmihailenco/msgpack/v5`)解码
195+
4. **注意**:对于包含 LangChain 对象(HumanMessage 等)的 blob,msgpack 中包含 ext type,需要自定义 decoder 提取 kwargs
196+
197+
详细序列化格式说明和 Go 伪代码见 [conversation_design.md](./conversation_design.md#跨语言查询-checkpoint-状态)
198+
146199
### 直接使用 SessionStore
147200

148201
```python
@@ -195,10 +248,11 @@ store.delete_session("agent_1", "user_1", "sess_1")
195248

196249
| 方法 | 说明 |
197250
|------|------|
198-
| `init_tables()` | 创建所有表和索引 |
251+
| `init_tables()` | 创建所有表和索引(不含 checkpoint) |
199252
| `init_core_tables()` | 创建核心表 + 二级索引 |
200253
| `init_state_tables()` | 创建三张 State 表 |
201254
| `init_search_index()` | 创建多元索引 |
255+
| `init_checkpoint_tables()` | 创建 LangGraph checkpoint 表 |
202256

203257
**Session 管理**
204258

@@ -230,12 +284,26 @@ store.delete_session("agent_1", "user_1", "sess_1")
230284
| `get_user_state / update_user_state` | 用户级状态读写 |
231285
| `get_merged_state(agent_id, user_id, session_id)` | 三级状态浅合并 |
232286

287+
**Checkpoint 管理(LangGraph)**
288+
289+
| 方法 | 说明 |
290+
|------|------|
291+
| `put_checkpoint(thread_id, checkpoint_ns, checkpoint_id, ...)` | 写入 checkpoint |
292+
| `get_checkpoint(thread_id, checkpoint_ns, checkpoint_id)` | 读取 checkpoint |
293+
| `list_checkpoints(thread_id, checkpoint_ns, *, limit, before)` | 列出 checkpoint |
294+
| `put_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id, writes)` | 批量写入 writes |
295+
| `get_checkpoint_writes(thread_id, checkpoint_ns, checkpoint_id)` | 读取 writes |
296+
| `put_checkpoint_blob(thread_id, checkpoint_ns, channel, version, ...)` | 写入 blob |
297+
| `get_checkpoint_blobs(thread_id, checkpoint_ns, channel_versions)` | 批量读取 blobs |
298+
| `delete_thread_checkpoints(thread_id)` | 删除 thread 全部 checkpoint 数据 |
299+
233300
### 框架适配器
234301

235302
| 适配器 | 框架 | 基类 |
236303
|--------|------|------|
237304
| `OTSSessionService` | Google ADK | `BaseSessionService` |
238305
| `OTSChatMessageHistory` | LangChain | `BaseChatMessageHistory` |
306+
| `OTSCheckpointSaver` | LangGraph | `BaseCheckpointSaver` |
239307

240308
### 领域模型
241309

@@ -248,7 +316,7 @@ store.delete_session("agent_1", "user_1", "sess_1")
248316

249317
## OTS 表结构
250318

251-
共五张表 + 一个二级索引 + 一个多元索引
319+
共八张表 + 一个二级索引 + 两个多元索引
252320

253321
| 表名 | 主键 | 用途 |
254322
|------|------|------|
@@ -257,6 +325,9 @@ store.delete_session("agent_1", "user_1", "sess_1")
257325
| `state` | agent_id, user_id, session_id | 会话级状态 |
258326
| `app_state` | agent_id | 应用级状态 |
259327
| `user_state` | agent_id, user_id | 用户级状态 |
328+
| `checkpoint` | thread_id, checkpoint_ns, checkpoint_id | LangGraph checkpoint |
329+
| `checkpoint_writes` | thread_id, checkpoint_ns, checkpoint_id, task_idx | LangGraph 中间写入 |
330+
| `checkpoint_blobs` | thread_id, checkpoint_ns, channel, version | LangGraph 通道值快照 |
260331
| `conversation_secondary_index` | agent_id, user_id, updated_at, session_id | 二级索引(list 热路径) |
261332
| `conversation_search_index` | 多元索引 | 全文搜索 / 标签过滤 / 组合查询 |
262333

@@ -271,6 +342,7 @@ store.delete_session("agent_1", "user_1", "sess_1")
271342
| [`conversation_service_adk_data.py`](../../examples/conversation_service_adk_data.py) | ADK 模拟数据填充 + 多元索引搜索验证 |
272343
| [`conversation_service_langchain_example.py`](../../examples/conversation_service_langchain_example.py) | LangChain 消息历史读写验证 |
273344
| [`conversation_service_langchain_data.py`](../../examples/conversation_service_langchain_data.py) | LangChain 模拟数据填充 |
345+
| [`conversation_service_langgraph_example.py`](../../examples/conversation_service_langgraph_example.py) | LangGraph checkpoint 持久化示例 |
274346
| [`conversation_service_verify.py`](../../examples/conversation_service_verify.py) | 端到端 CRUD 验证脚本 |
275347

276348
## 环境变量

0 commit comments

Comments
 (0)