feat(state): Implement LangGraph checkpoint management with OTS integ…#80

Open
TJKkking wants to merge 2 commits into main from feature/state-enhance


@TJKkking TJKkking commented Apr 9, 2026

…ration

  • Added checkpoint, checkpoint_writes, and checkpoint_blobs tables to support LangGraph functionality.
  • Introduced asynchronous methods for initializing and managing checkpoint tables in OTSBackend and SessionStore.
  • Enhanced SessionStore with methods for checkpoint CRUD operations.
  • Updated README and conversation_design.md to document new checkpoint features and usage examples.
  • Refactored utils to build OTS clients independently of code generation templates.

This update enables persistent storage of LangGraph checkpoints, enhancing the overall functionality of the conversation service.

Thank you for creating a pull request to contribute to Serverless Devs agentrun-sdk-python code! Before you open the request please answer the following questions to help it be more easily integrated. Please check the boxes "[ ]" with "[x]" when done too.
Please select one of the PR types below to complete


Fix bugs

Bug detail

The specific manifestation of the bug or the associated issue.

Pull request tasks

  • Add test cases for the changes
  • Passed the CI test

Update docs

Reason for update

Why do you need to update your documentation?

Pull request tasks

  • Update Chinese documentation
  • Update English documentation

Add contributor

Contributed content

  • Code
  • Document

Content detail

if content_type == 'code' || content_type == 'document':
    please tell us `PR url`,like: https://github.com/Serverless-Devs/agentrun-sdk-python/pull/1
else:
    please describe your contribution in detail

Others

Reason for update

Why do you need to update your documentation?

TJKkking commented Apr 9, 2026

@copilot codereview

Copilot AI commented Apr 9, 2026

Code Review — feat(state): LangGraph checkpoint management with OTS integration

Overview

The PR introduces LangGraph checkpoint persistence via OTS, with three new tables (checkpoint, checkpoint_writes, checkpoint_blobs), a new OTSCheckpointSaver adapter, and a refactored build_ots_clients utility. The layered architecture (adapter → SessionStore → OTSBackend) is clean and consistent with existing patterns.


Issues Found

🔴 Bug: Section comment / method ordering is swapped in two places

ots_backend.py (around line 2544–2564):

# -----------------------------------------------------------------------
# Checkpoint CRUD(LangGraph)(异步)   ← says "async"
# -----------------------------------------------------------------------

def delete_state_row(...)   ← this is a SYNC method, belongs above
    ...

# -----------------------------------------------------------------------
# Checkpoint CRUD(LangGraph)(同步)   ← says "sync"
# -----------------------------------------------------------------------

async def put_checkpoint_async(...)   ← this is an ASYNC method

The sync delete_state_row ended up inside the "async" section and the async checkpoint methods ended up inside the "sync" section.

session_store.py (around lines 136–153):

# Checkpoint 管理(LangGraph)(异步)   ← says "async"

def init_langgraph_tables(...)         ← SYNC method

# Checkpoint 管理(LangGraph)(同步)   ← says "sync"

async def put_checkpoint_async(...)    ← ASYNC method

Also at line 380: # Session 管理(同步)/ Session management (async) — Chinese says "同步" (sync) but English says "(async)".

🟠 Design concern: checkpoint_id ordering may be incorrect

list_checkpoints and get_checkpoint(checkpoint_id=None) rely on Direction.BACKWARD lexicographic ordering of checkpoint_id. LangGraph generates UUID v4 checkpoint IDs by default, which are random — lexicographic order does not reflect insertion order. This means "get latest checkpoint" and the list() ordering may silently return the wrong checkpoint. Consider adding a created_at timestamp attribute to the checkpoint row for reliable ordering, or confirming the LangGraph version in use generates time-ordered (v7) UUIDs.
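The ordering concern can be illustrated without OTS or LangGraph. The sketch below compares random UUID v4 keys with a time-prefixed key; `TimeOrderedIds` and `latest` are hypothetical helpers written for this illustration, not part of LangGraph or this PR, and `max()` only stands in for a backward range scan that returns the lexicographically largest key first.

```python
import time
import uuid


class TimeOrderedIds:
    """Hypothetical generator: zero-padded nanosecond timestamp plus random suffix."""

    def __init__(self) -> None:
        self._last = 0

    def next_id(self) -> str:
        # Force a strictly increasing prefix even if the clock stalls or steps back.
        self._last = max(self._last + 1, time.time_ns())
        return f"{self._last:020d}-{uuid.uuid4().hex[:12]}"


def latest(ids: list[str]) -> str:
    """Stand-in for a backward scan: the largest key in lexicographic order."""
    return max(ids)


gen = TimeOrderedIds()
ordered_ids = [gen.next_id() for _ in range(50)]
random_ids = [str(uuid.uuid4()) for _ in range(50)]

# Time-prefixed keys sort in insertion order, so "latest" is the last one written.
print(latest(ordered_ids) == ordered_ids[-1])
# Random v4 keys carry no time information, so this is almost always False.
print(latest(random_ids) == random_ids[-1])
```

Whether the concern applies in practice depends on which UUID version the installed LangGraph release generates, which is worth checking directly rather than assuming.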

🟠 Missing langgraph optional dependency in pyproject.toml

langgraph_adapter.py has top-level imports from langgraph and langchain_core:

from langgraph.checkpoint.base import BaseCheckpointSaver, ...
from langchain_core.runnables import RunnableConfig

Unlike langchain, google-adk, pydantic-ai, etc., langgraph is not listed as an optional extra in pyproject.toml. Users who install just the base package will get an ImportError rather than a graceful message. Add:

[project.optional-dependencies]
langgraph = [
    "langgraph>=0.2.0; python_version >= '3.10'",
    "langchain-core>=0.3.0",
]
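A graceful message for a missing extra can be produced with a small import guard. This is a minimal sketch using only the standard library; `require_optional` is a hypothetical helper, and `your-package` is a placeholder because the published package name is not stated here.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency or raise ImportError with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # 'your-package' is a placeholder; substitute the real distribution name.
        raise ImportError(
            f"'{module_name}' is required for this feature. "
            f"Install the '{extra}' extra, for example: "
            f"pip install 'your-package[{extra}]'"
        ) from exc


# Usage: resolve the dependency lazily, at the point where it is first needed.
json_module = require_optional("json", "langgraph")
print(json_module.__name__)
```

Calling the guard inside the adapter's constructor, rather than at module top level, would keep the base package importable when the extra is absent.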

🟡 Performance: Extra OTS read on every put()

_sync_session / _sync_session_async calls get_session(...) before every checkpoint write to decide between create/update. This doubles OTS latency per put() call. Consider using a conditional put_row with RowExistenceExpectation.EXPECT_NOT_EXIST for the create path and catching the conflict error, or caching session existence locally per thread_id.
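The conditional-write idea can be sketched against an in-memory stand-in. `FakeTable`, `RowAlreadyExists`, and `sync_session` below are hypothetical and only model the behavior of a put with an EXPECT_NOT_EXIST condition; they are not the tablestore SDK or this PR's code.

```python
class RowAlreadyExists(Exception):
    """Stand-in for the conflict error raised by a failed conditional put."""


class FakeTable:
    """In-memory stand-in that counts round trips."""

    def __init__(self) -> None:
        self.rows: dict[str, dict] = {}
        self.reads = 0
        self.writes = 0

    def get_row(self, key: str):
        self.reads += 1
        return self.rows.get(key)

    def put_row(self, key: str, attrs: dict, expect_not_exist: bool = False) -> None:
        self.writes += 1
        if expect_not_exist and key in self.rows:
            raise RowAlreadyExists(key)
        self.rows[key] = dict(attrs)

    def update_row(self, key: str, attrs: dict) -> None:
        self.writes += 1
        self.rows[key].update(attrs)


def sync_session(table: FakeTable, thread_id: str, attrs: dict) -> None:
    """Try to create the session row; fall back to update if it already exists."""
    try:
        table.put_row(thread_id, attrs, expect_not_exist=True)
    except RowAlreadyExists:
        table.update_row(thread_id, attrs)


table = FakeTable()
sync_session(table, "thread-1", {"step": 1})  # create path: one write
sync_session(table, "thread-1", {"step": 2})  # existing row: failed put, then update
print(table.reads, table.writes)
```

The read disappears, but an existing session still costs two round trips (the rejected put and the update), so the benefit depends on how often the create path is taken.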

🟡 get_next_version incorrect type hint

def get_next_version(self, current: str | None, channel: None) -> str:

channel: None is wrong — it should match the base class signature (the actual channel type, typically ChannelVersions | None). This may cause type checking failures.

🟡 No unit tests for new checkpoint functionality

There are no unit tests for OTSCheckpointSaver, init_checkpoint_tables, or any of the new put_checkpoint / get_checkpoint / list_checkpoints methods on OTSBackend. The PR description checklist also notes this. Given the complexity of the ordering logic and batch delete, even mocked unit tests would help prevent regressions.

🔵 Minor: build_ots_clients loses type information

def build_ots_clients(...) -> tuple[Any, Any]:

Consider returning tuple["OTSClient", "AsyncOTSClient"] (using string annotations or TYPE_CHECKING) to preserve type information for callers.
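One way to express that suggestion is shown below. The factory parameters are hypothetical (the real parameter list is elided above), and the `tablestore` import path is an assumption; because the import sits under `TYPE_CHECKING`, it is evaluated only by static type checkers, never at runtime.

```python
from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
    # Assumed import path; skipped at runtime, so no hard dependency is added.
    from tablestore import AsyncOTSClient, OTSClient


def build_ots_clients(
    sync_factory: Callable[[], Any],
    async_factory: Callable[[], Any],
) -> "tuple[OTSClient, AsyncOTSClient]":
    """Build both clients; the string annotation keeps the names unevaluated."""
    return sync_factory(), async_factory()


# Usage with placeholder factories standing in for real client construction.
sync_client, async_client = build_ots_clients(lambda: "sync", lambda: "async")
print(sync_client, async_client)
```

The string annotation avoids a NameError at import time while still letting callers' type checkers see the concrete client types.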


What's done well

  • The three-layer architecture (adapter → SessionStore → OTSBackend) is consistent.
  • Error handling in table creation correctly ignores OTSObjectAlreadyExist and now also handles the missing-table edge case in search index creation.
  • build_ots_clients correctly isolates AsyncOTSClient from codegen template substitution — good defensive engineering.
  • delete_thread_checkpoints correctly cleans all three tables atomically using _scan_and_delete.
  • The __init__.py gracefully handles missing langgraph with try/except.
  • The example file is clear and well-structured.

- Fix sync/async section comment headers in ots_backend.py and session_store.py
- Add langgraph optional dependency in pyproject.toml
- Improve build_ots_clients return type annotation with TYPE_CHECKING import
- Handle missing state table gracefully in init_search_index

Made-with: Cursor
@TJKkking

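  1. 🔴 Section comments: sync/async headers misplaced
    Verdict: accepted, fixed.

In ots_backend.py, delete_state_row (sync) was mistakenly placed under the "Checkpoint CRUD (async)" header, and the put_checkpoint_async (async) method that follows it was placed under the "Checkpoint CRUD (sync)" header. session_store.py had the same problem.

Fix:

  • ots_backend.py: the header above delete_state_row now reads "State CRUD (sync)", and the async and sync checkpoint methods each have the correct section header.
  • session_store.py: all sync/async section headers for Checkpoint and Session are corrected.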
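  2. 🟠 checkpoint_id ordering may be incorrect
    Verdict: rejected. The reviewer's assumption is wrong.

The reviewer assumed that LangGraph uses UUID v4 (random), so lexicographic order would not match chronological order. Verification shows that LangGraph actually uses UUID v6 (time-ordered): the high-order bits hold the timestamp, so lexicographic order strictly matches generation order:

>>> ids = [create_checkpoint(...)["id"] for _ in range(5)]
>>> ids == sorted(ids)
True
>>> uuid.UUID(ids[0]).version
6

Therefore Direction.BACKWARD combined with descending lexicographic order of checkpoint_id is equivalent to descending chronological order, and the logic is correct.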

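  3. 🟠 pyproject.toml is missing the langgraph optional dependency
    Verdict: accepted, fixed.

Added:

langgraph = [
    "langgraph>=0.2.0; python_version >= '3.10'",
    "langchain-core>=0.3.0; python_version >= '3.10'",
]

__init__.py already degrades gracefully with try/except ImportError; this change adds the missing declaration.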

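  4. 🟡 _sync_session performs an extra OTS read on every put
    Verdict: acknowledged, but not changed for now.

_sync_session calls get_session() to decide between create and update, which does add one point read. However:

  • get_session is an OTS point read (GetRow), with latency typically < 5 ms.
  • Compared with the checkpoint write itself (put_row + put_checkpoint_writes), the extra overhead is small.
  • A conditional write with EXPECT_NOT_EXIST plus an exception fallback would add code complexity, and the exception path (row already exists) would actually be the hot path.

We plan to optimize this in a later release with a thread_id-level cache (_synced_threads: set[str]): after the first put, the thread_id is cached and subsequent puts skip get_session.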
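The planned cache might look roughly like the sketch below. Only the `_synced_threads: set[str]` name comes from the reply above; `SessionSyncer`, `FakeBackend`, and their methods are hypothetical stand-ins, and the sketch assumes the update should still happen on a cache hit while the read is skipped.

```python
class FakeBackend:
    """In-memory stand-in that counts calls instead of talking to OTS."""

    def __init__(self) -> None:
        self.sessions: set[str] = set()
        self.reads = self.creates = self.updates = 0

    def get_session(self, thread_id: str):
        self.reads += 1
        return thread_id if thread_id in self.sessions else None

    def create_session(self, thread_id: str) -> None:
        self.creates += 1
        self.sessions.add(thread_id)

    def update_session(self, thread_id: str) -> None:
        self.updates += 1


class SessionSyncer:
    def __init__(self, backend: FakeBackend) -> None:
        self._backend = backend
        self._synced_threads: set[str] = set()

    def sync(self, thread_id: str) -> None:
        if thread_id in self._synced_threads:
            # Known to exist from an earlier put: skip the point read.
            self._backend.update_session(thread_id)
            return
        if self._backend.get_session(thread_id) is None:
            self._backend.create_session(thread_id)
        else:
            self._backend.update_session(thread_id)
        self._synced_threads.add(thread_id)


backend = FakeBackend()
syncer = SessionSyncer(backend)
for _ in range(3):
    syncer.sync("thread-1")
print(backend.reads, backend.creates, backend.updates)
```

A process-local cache like this can go stale if another process deletes the session, so a real implementation would need an invalidation or fallback strategy.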

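  5. 🟡 channel: None type hint on get_next_version
    Verdict: rejected. Our signature matches the base class.

Verifying the LangGraph base class signature:

inspect.signature(BaseCheckpointSaver.get_next_version)
(self, current: 'V | None', channel: 'None') -> 'V'

The base class annotates channel as None (meaning the argument is always None), so our channel: None matches exactly.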
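The same check can be automated so that a future base-class change is caught by a test. The sketch below uses hypothetical stand-in classes rather than the real BaseCheckpointSaver, and compares parameter names and annotations with `inspect.signature`.

```python
import inspect


class BaseSaver:
    """Stand-in for the base class; not the real LangGraph type."""

    def get_next_version(self, current: "str | None", channel: None) -> str:
        raise NotImplementedError


class MatchingSaver(BaseSaver):
    def get_next_version(self, current: "str | None", channel: None) -> str:
        return "1" if current is None else str(int(current) + 1)


class MismatchedSaver(BaseSaver):
    def get_next_version(self, current: "str | None", channel: "dict | None") -> str:
        return "1"


def same_annotations(base: type, sub: type, method_name: str) -> bool:
    """Return True when both methods declare the same parameter and return annotations."""
    b = inspect.signature(getattr(base, method_name))
    s = inspect.signature(getattr(sub, method_name))
    b_params = [(p.name, p.annotation) for p in b.parameters.values()]
    s_params = [(p.name, p.annotation) for p in s.parameters.values()]
    return b_params == s_params and b.return_annotation == s.return_annotation


print(same_annotations(BaseSaver, MatchingSaver, "get_next_version"))
print(same_annotations(BaseSaver, MismatchedSaver, "get_next_version"))
```

Against the real base class the annotations are strings (as the quoted output shows), so both sides would need the same string form, or resolution through `typing.get_type_hints`, before comparing.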

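  6. 🟡 Missing unit tests
    Verdict: acknowledged, to be added later.

LangGraph checkpoints involve real OTS reads and writes, so they need mocks or an integration test environment. End-to-end verification already exists (the example script). The next PR will add the following unit tests:

  • Mock tests for OTSCheckpointSaver.put/get/list
  • An idempotency test for init_checkpoint_tables
  • Coverage of the create/update branches of _sync_session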
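As one possible shape for the idempotency test, the sketch below mocks the client with `unittest.mock`. The `init_checkpoint_tables` function and `TableAlreadyExists` exception here are simplified stand-ins written for the example, not the PR's implementation; only the three table names come from the PR description.

```python
from unittest.mock import Mock


class TableAlreadyExists(Exception):
    """Stand-in for the 'object already exists' error returned by the store."""


CHECKPOINT_TABLES = ("checkpoint", "checkpoint_writes", "checkpoint_blobs")


def init_checkpoint_tables(client) -> None:
    """Create each table, treating 'already exists' as success."""
    for name in CHECKPOINT_TABLES:
        try:
            client.create_table(name)
        except TableAlreadyExists:
            pass


def test_init_is_idempotent() -> None:
    client = Mock()
    init_checkpoint_tables(client)  # first run: all tables created

    # Second run: every create now reports that the table exists.
    client.create_table.side_effect = TableAlreadyExists("exists")
    init_checkpoint_tables(client)  # must not raise

    assert client.create_table.call_count == 2 * len(CHECKPOINT_TABLES)
    created = [call.args[0] for call in client.create_table.call_args_list[:3]]
    assert created == list(CHECKPOINT_TABLES)


test_init_is_idempotent()
```

The same pattern (a Mock with `side_effect`) would extend to the create/update branches of _sync_session by asserting which backend method was called.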
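  7. 🔵 build_ots_clients return type loses information
    Verdict: accepted, fixed.

Added a TYPE_CHECKING import block and changed the return type to tuple[OTSClient, AsyncOTSClient], preserving static type information.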
