Skip to content

Commit 48c27d9

Browse files
committed
feat(conversation_service): enhance session management with state search index
This commit introduces the creation of a new state search index for the session management system, allowing for independent and precise querying by session ID. The changes include: - Added `DEFAULT_STATE_SEARCH_INDEX` to the model for consistent index naming. - Updated `OTSBackend` and `SessionStore` to initialize both conversation and state search indices during table setup. - Implemented asynchronous methods for creating the state search index, ensuring it can be created without conflicts if it already exists. - Enhanced documentation for table and index creation methods to reflect the new functionality. These improvements aim to optimize session state retrieval and management within the conversation service.
1 parent 6cbeb27 commit 48c27d9

File tree

6 files changed

+560
-15
lines changed

6 files changed

+560
-15
lines changed

agentrun/conversation_service/__ots_backend_async_template.py

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
DEFAULT_CONVERSATION_SECONDARY_INDEX,
4343
DEFAULT_CONVERSATION_TABLE,
4444
DEFAULT_EVENT_TABLE,
45+
DEFAULT_STATE_SEARCH_INDEX,
4546
DEFAULT_STATE_TABLE,
4647
DEFAULT_USER_STATE_TABLE,
4748
StateData,
@@ -97,15 +98,17 @@ def __init__(
9798
self._conversation_search_index = (
9899
f"{table_prefix}{DEFAULT_CONVERSATION_SEARCH_INDEX}"
99100
)
101+
self._state_search_index = f"{table_prefix}{DEFAULT_STATE_SEARCH_INDEX}"
100102

101103
# -----------------------------------------------------------------------
102104
# 建表(异步)/ Table creation (async)
103105
# -----------------------------------------------------------------------
104106

105107
async def init_tables_async(self) -> None:
106-
"""创建五张表和 Conversation 二级索引(异步)。
108+
"""创建五张表、二级索引和多元索引(异步)。
107109
108-
表已存在时跳过(catch OTSServiceError 并 log warning)。
110+
包括 Conversation 二级索引、Conversation 多元索引和 State 多元索引。
111+
表或索引已存在时跳过(catch OTSServiceError 并 log warning)。
109112
"""
110113
await self._create_conversation_table_async()
111114
await self._create_event_table_async()
@@ -125,6 +128,7 @@ async def init_tables_async(self) -> None:
125128
self._user_state_table,
126129
[("agent_id", "STRING"), ("user_id", "STRING")],
127130
)
131+
await self.init_search_index_async()
128132

129133
async def init_core_tables_async(self) -> None:
130134
"""创建核心表(Conversation + Event)和二级索引(异步)。"""
@@ -151,8 +155,12 @@ async def init_state_tables_async(self) -> None:
151155
)
152156

153157
async def init_search_index_async(self) -> None:
154-
"""创建 Conversation 多元索引(异步)。按需调用。"""
158+
"""创建 Conversation 和 State 多元索引(异步)。
159+
160+
索引已存在时跳过,可重复调用。
161+
"""
155162
await self._create_conversation_search_index_async()
163+
await self._create_state_search_index_async()
156164

157165
async def _create_conversation_table_async(self) -> None:
158166
"""创建 Conversation 表 + 二级索引(异步)。"""
@@ -383,6 +391,87 @@ async def _create_conversation_search_index_async(self) -> None:
383391
else:
384392
raise
385393

394+
async def _create_state_search_index_async(self) -> None:
395+
"""创建 State 表的多元索引(异步)。
396+
397+
支持按 session_id 独立精确匹配查询,不受主键前缀限制。
398+
索引已存在时跳过。
399+
"""
400+
from tablestore import FieldType # type: ignore[import-untyped]
401+
from tablestore import IndexSetting # type: ignore[import-untyped]
402+
from tablestore import SortOrder # type: ignore[import-untyped]
403+
from tablestore import FieldSchema
404+
from tablestore import (
405+
FieldSort as OTSFieldSort,
406+
) # type: ignore[import-untyped]
407+
from tablestore import SearchIndexMeta
408+
from tablestore import Sort as OTSSort # type: ignore[import-untyped]
409+
410+
fields = [
411+
FieldSchema(
412+
"agent_id",
413+
FieldType.KEYWORD,
414+
index=True,
415+
enable_sort_and_agg=True,
416+
),
417+
FieldSchema(
418+
"user_id",
419+
FieldType.KEYWORD,
420+
index=True,
421+
enable_sort_and_agg=True,
422+
),
423+
FieldSchema(
424+
"session_id",
425+
FieldType.KEYWORD,
426+
index=True,
427+
enable_sort_and_agg=True,
428+
),
429+
FieldSchema(
430+
"created_at",
431+
FieldType.LONG,
432+
index=True,
433+
enable_sort_and_agg=True,
434+
),
435+
FieldSchema(
436+
"updated_at",
437+
FieldType.LONG,
438+
index=True,
439+
enable_sort_and_agg=True,
440+
),
441+
]
442+
443+
index_setting = IndexSetting(routing_fields=["agent_id"])
444+
index_sort = OTSSort(
445+
sorters=[OTSFieldSort("updated_at", sort_order=SortOrder.DESC)]
446+
)
447+
index_meta = SearchIndexMeta(
448+
fields,
449+
index_setting=index_setting,
450+
index_sort=index_sort,
451+
)
452+
453+
try:
454+
await self._async_client.create_search_index(
455+
self._state_table,
456+
self._state_search_index,
457+
index_meta,
458+
)
459+
logger.info(
460+
"Created search index: %s on table: %s",
461+
self._state_search_index,
462+
self._state_table,
463+
)
464+
except OTSServiceError as e:
465+
if "already exist" in str(e).lower() or (
466+
hasattr(e, "code") and e.code == "OTSObjectAlreadyExist"
467+
):
468+
logger.warning(
469+
"Search index %s already exists, skipping.",
470+
self._state_search_index,
471+
)
472+
else:
473+
raise
474+
386475
# -----------------------------------------------------------------------
387476
# Session CRUD(异步)/ Session CRUD (async)
388477
# -----------------------------------------------------------------------

agentrun/conversation_service/__session_store_async_template.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ def __init__(self, ots_backend: OTSBackend) -> None:
3535
self._backend = ots_backend
3636

3737
async def init_tables_async(self) -> None:
38-
"""创建所有 OTS 表和索引(异步)。代理到 OTSBackend.init_tables_async()。"""
38+
"""创建所有 OTS 表、二级索引和多元索引(异步)。
39+
40+
包括建表和创建搜索索引,无需再单独调用 init_search_index_async()。
41+
"""
3942
await self._backend.init_tables_async()
4043

4144
async def init_core_tables_async(self) -> None:
@@ -47,7 +50,10 @@ async def init_state_tables_async(self) -> None:
4750
await self._backend.init_state_tables_async()
4851

4952
async def init_search_index_async(self) -> None:
50-
"""创建 Conversation 多元索引(异步)。按需调用。"""
53+
"""创建 Conversation 和 State 多元索引(异步)。
54+
55+
索引已存在时跳过,可重复调用。
56+
"""
5157
await self._backend.init_search_index_async()
5258

5359
# -------------------------------------------------------------------

agentrun/conversation_service/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DEFAULT_USER_STATE_TABLE = "user_state"
2222
DEFAULT_CONVERSATION_SECONDARY_INDEX = "conversation_secondary_index"
2323
DEFAULT_CONVERSATION_SEARCH_INDEX = "conversation_search_index"
24+
DEFAULT_STATE_SEARCH_INDEX = "state_search_index"
2425

2526

2627
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)