Skip to content

Commit 03bf81d

Browse files
authored
Merge pull request #85 from Serverless-Devs/state-schema
State schema
2 parents c4650e6 + 14340d1 commit 03bf81d

6 files changed

Lines changed: 328 additions & 63 deletions

File tree

agentrun/conversation_service/__ots_backend_async_template.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@
3535
)
3636

3737
from agentrun.conversation_service.model import (
38+
CHECKPOINT_BLOBS_SCHEMA_VERSION,
39+
CHECKPOINT_SCHEMA_VERSION,
40+
CHECKPOINT_WRITES_SCHEMA_VERSION,
41+
CONVERSATION_SCHEMA_VERSION,
3842
ConversationEvent,
3943
ConversationSession,
4044
DEFAULT_APP_STATE_TABLE,
@@ -48,6 +52,9 @@
4852
DEFAULT_STATE_SEARCH_INDEX,
4953
DEFAULT_STATE_TABLE,
5054
DEFAULT_USER_STATE_TABLE,
55+
EVENT_SCHEMA_VERSION,
56+
SCHEMA_VERSION_COLUMN,
57+
STATE_SCHEMA_VERSION,
5158
StateData,
5259
StateScope,
5360
)
@@ -113,7 +120,7 @@ def __init__(
113120
)
114121

115122
# -----------------------------------------------------------------------
116-
# 建表(异步)/ Table creation (async)
123+
# 建表 / Table creation
117124
# -----------------------------------------------------------------------
118125

119126
async def init_tables_async(self) -> None:
@@ -174,6 +181,13 @@ async def init_search_index_async(self) -> None:
174181
await self._create_conversation_search_index_async()
175182
await self._create_state_search_index_async()
176183

184+
async def init_conversation_search_index_async(self) -> None:
185+
"""仅创建 Conversation 多元索引(异步)。
186+
187+
索引已存在时跳过,可重复调用。
188+
"""
189+
await self._create_conversation_search_index_async()
190+
177191
async def init_checkpoint_tables_async(self) -> None:
178192
"""创建 LangGraph checkpoint 相关的 3 张表(异步)。
179193
@@ -595,7 +609,7 @@ async def _create_state_search_index_async(self) -> None:
595609
raise
596610

597611
# -----------------------------------------------------------------------
598-
# Session CRUD(异步)/ Session CRUD (async)
612+
# Session CRUD
599613
# -----------------------------------------------------------------------
600614

601615
async def put_session_async(self, session: ConversationSession) -> None:
@@ -607,6 +621,7 @@ async def put_session_async(self, session: ConversationSession) -> None:
607621
]
608622

609623
attribute_columns = [
624+
(SCHEMA_VERSION_COLUMN, CONVERSATION_SCHEMA_VERSION),
610625
("created_at", session.created_at),
611626
("updated_at", session.updated_at),
612627
("is_pinned", session.is_pinned),
@@ -946,7 +961,7 @@ async def search_sessions_async(
946961
return sessions, search_response.total_count or 0
947962

948963
# -----------------------------------------------------------------------
949-
# Event CRUD(异步)/ Event CRUD (async)
964+
# Event CRUD
950965
# -----------------------------------------------------------------------
951966

952967
async def put_event_async(
@@ -991,6 +1006,7 @@ async def put_event_async(
9911006

9921007
content_json = json.dumps(content, ensure_ascii=False)
9931008
attribute_columns = [
1009+
(SCHEMA_VERSION_COLUMN, EVENT_SCHEMA_VERSION),
9941010
("type", event_type),
9951011
("content", content_json),
9961012
("created_at", created_at),
@@ -1171,7 +1187,7 @@ async def delete_events_by_session_async(
11711187
return deleted
11721188

11731189
# -----------------------------------------------------------------------
1174-
# State CRUD(JSON 字符串存储 + 列分片)(异步)
1190+
# State CRUD(JSON 字符串存储 + 列分片)
11751191
# -----------------------------------------------------------------------
11761192

11771193
async def put_state_async(
@@ -1204,6 +1220,7 @@ async def put_state_async(
12041220
state_json = serialize_state(state)
12051221

12061222
put_cols: list[tuple[str, Any]] = [
1223+
(SCHEMA_VERSION_COLUMN, STATE_SCHEMA_VERSION),
12071224
("updated_at", now),
12081225
("version", version + 1),
12091226
]
@@ -1328,7 +1345,7 @@ async def delete_state_row_async(
13281345
await self._async_client.delete_row(table_name, row, condition)
13291346

13301347
# -----------------------------------------------------------------------
1331-
# Checkpoint CRUD(LangGraph)(异步)
1348+
# Checkpoint CRUD(LangGraph)
13321349
# -----------------------------------------------------------------------
13331350

13341351
async def put_checkpoint_async(
@@ -1349,6 +1366,7 @@ async def put_checkpoint_async(
13491366
("checkpoint_id", checkpoint_id),
13501367
]
13511368
attribute_columns = [
1369+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_SCHEMA_VERSION),
13521370
("checkpoint_type", checkpoint_type),
13531371
("checkpoint_data", checkpoint_data),
13541372
("metadata", metadata_json),
@@ -1502,6 +1520,7 @@ async def put_checkpoint_writes_async(
15021520
("task_idx", w["task_idx"]),
15031521
]
15041522
attrs = [
1523+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_WRITES_SCHEMA_VERSION),
15051524
("task_id", w["task_id"]),
15061525
("task_path", w.get("task_path", "")),
15071526
("channel", w["channel"]),
@@ -1580,6 +1599,7 @@ async def put_checkpoint_blob_async(
15801599
("version", version),
15811600
]
15821601
attribute_columns = [
1602+
(SCHEMA_VERSION_COLUMN, CHECKPOINT_BLOBS_SCHEMA_VERSION),
15831603
("blob_type", blob_type),
15841604
("blob_data", blob_data),
15851605
]
@@ -1747,7 +1767,7 @@ async def _scan_and_delete_async(
17471767
await self._async_client.batch_write_row(request)
17481768

17491769
# -----------------------------------------------------------------------
1750-
# 内部辅助方法(I/O 相关,异步
1770+
# 内部辅助方法(I/O 相关)
17511771
# -----------------------------------------------------------------------
17521772

17531773
async def _get_chunk_count_async(

agentrun/conversation_service/__session_store_async_template.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ async def init_langchain_tables_async(self) -> None:
7171
表或索引已存在时跳过,可重复调用。
7272
"""
7373
await self._backend.init_core_tables_async()
74-
await self._backend.init_search_index_async()
74+
await self._backend.init_conversation_search_index_async()
7575

7676
async def init_langgraph_tables_async(self) -> None:
7777
"""创建 LangGraph 所需的全部表和索引(异步)。
@@ -81,7 +81,7 @@ async def init_langgraph_tables_async(self) -> None:
8181
表或索引已存在时跳过,可重复调用。
8282
"""
8383
await self._backend.init_core_tables_async()
84-
await self._backend.init_search_index_async()
84+
await self._backend.init_conversation_search_index_async()
8585
await self._backend.init_checkpoint_tables_async()
8686

8787
async def init_adk_tables_async(self) -> None:
@@ -96,7 +96,7 @@ async def init_adk_tables_async(self) -> None:
9696
await self._backend.init_search_index_async()
9797

9898
# -------------------------------------------------------------------
99-
# Checkpoint 管理(LangGraph)(异步)
99+
# Checkpoint 管理(LangGraph)
100100
# -------------------------------------------------------------------
101101

102102
async def put_checkpoint_async(
@@ -210,7 +210,7 @@ async def delete_thread_checkpoints_async(
210210
await self._backend.delete_thread_checkpoints_async(thread_id)
211211

212212
# -------------------------------------------------------------------
213-
# Session 管理(异步)/ Session management (async)
213+
# Session 管理 / Session management
214214
# -------------------------------------------------------------------
215215

216216
async def create_session_async(
@@ -496,7 +496,7 @@ async def update_session_async(
496496
)
497497

498498
# -------------------------------------------------------------------
499-
# Event 管理(异步)/ Event management (async)
499+
# Event 管理 / Event management
500500
# -------------------------------------------------------------------
501501

502502
async def append_event_async(
@@ -631,7 +631,7 @@ async def get_recent_events_async(
631631
return events
632632

633633
# -------------------------------------------------------------------
634-
# State 管理(异步)/ State management (async)
634+
# State 管理 / State management
635635
# -------------------------------------------------------------------
636636

637637
async def get_session_state_async(
@@ -746,7 +746,7 @@ async def get_merged_state_async(
746746
return merged
747747

748748
# -------------------------------------------------------------------
749-
# 内部辅助方法(异步)
749+
# 内部辅助方法
750750
# -------------------------------------------------------------------
751751

752752
async def _apply_delta_async(
@@ -802,7 +802,7 @@ async def _apply_delta_async(
802802
)
803803

804804
# -------------------------------------------------------------------
805-
# 工厂方法(异步)/ Factory methods (async)
805+
# 工厂方法 / Factory methods
806806
# -------------------------------------------------------------------
807807

808808
@classmethod

agentrun/conversation_service/model.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,41 @@
2828
DEFAULT_CHECKPOINT_WRITES_TABLE = "checkpoint_writes"
2929
DEFAULT_CHECKPOINT_BLOBS_TABLE = "checkpoint_blobs"
3030

31+
# ---------------------------------------------------------------------------
32+
# OTS Schema 版本管理
33+
#
34+
# 用于 SDK 写入端与 Core 读取端(funagent-core)的兼容性协调。
35+
# 每次写入行(PutRow / UpdateRow / BatchWriteRow)时在
36+
# attribute_columns 中携带 _schema_version 字段。
37+
# Core 端读取时检查该字段,版本不匹配时打 WARN 日志并尽力解析。
38+
# 历史数据(无此字段)视为 v0。
39+
#
40+
# 版本计数规则:
41+
# - 大部分表独立计数
42+
# - state / app_state / user_state 三张表共享 STATE_SCHEMA_VERSION
43+
#
44+
# 升级流程:
45+
# 1. 递增对应表的 *_SCHEMA_VERSION 常量
46+
# 2. 在 PR 描述中记录变更的列名/类型/语义
47+
# 3. 通知 funagent-core 侧同步更新解析逻辑和版本常量
48+
# 4. 如涉及 breaking change,提供数据迁移指引
49+
#
50+
# 兼容性规则:
51+
# - 只加不删:新增列允许,删除/重命名列视为 breaking change
52+
# - PK 不可变:主键结构永不改变
53+
# - 索引名不可变:Search Index 名称一旦确定不再修改
54+
# - 语义不可变:已有列的类型和含义不改变
55+
# ---------------------------------------------------------------------------
56+
57+
SCHEMA_VERSION_COLUMN = "_schema_version"
58+
59+
CONVERSATION_SCHEMA_VERSION = 1
60+
EVENT_SCHEMA_VERSION = 1
61+
STATE_SCHEMA_VERSION = 1 # state / app_state / user_state 共享
62+
CHECKPOINT_SCHEMA_VERSION = 1
63+
CHECKPOINT_WRITES_SCHEMA_VERSION = 1
64+
CHECKPOINT_BLOBS_SCHEMA_VERSION = 1
65+
3166

3267
# ---------------------------------------------------------------------------
3368
# 枚举

0 commit comments

Comments
 (0)