|
1 | 1 | import asyncio |
| 2 | +from types import SimpleNamespace |
2 | 3 | from unittest.mock import AsyncMock, MagicMock |
3 | 4 |
|
4 | 5 | import pytest |
5 | 6 | import pytest_asyncio |
6 | 7 | from quart import Quart |
7 | 8 |
|
| 9 | +import astrbot.dashboard.routes.knowledge_base as knowledge_base_route_module |
8 | 10 | from astrbot.core import LogBroker |
9 | 11 | from astrbot.core.core_lifecycle import AstrBotCoreLifecycle |
10 | 12 | from astrbot.core.db.sqlite import SQLiteDatabase |
@@ -117,6 +119,86 @@ async def authenticated_header(app: Quart, core_lifecycle_td: AstrBotCoreLifecyc |
117 | 119 | return {"Authorization": f"Bearer {token}"} |
118 | 120 |
|
119 | 121 |
|
| 122 | +@pytest.mark.asyncio |
| 123 | +async def test_remove_deleted_kb_from_session_configs(monkeypatch: pytest.MonkeyPatch): |
| 124 | + updates = [] |
| 125 | + configs = { |
| 126 | + "platform:GroupMessage:group!alice": { |
| 127 | + "kb_ids": ["kb-old", "kb-keep"], |
| 128 | + "top_k": 3, |
| 129 | + }, |
| 130 | + "platform:GroupMessage:group!bob": {"kb_ids": ["kb-old"]}, |
| 131 | + "platform:FriendMessage:charlie": {"kb_ids": ["kb-keep"]}, |
| 132 | + "platform:FriendMessage:broken": {"kb_ids": "kb-old"}, |
| 133 | + } |
| 134 | + |
| 135 | + class FakeSharedPreferences: |
| 136 | + async def session_get(self, umo, key): |
| 137 | + assert key == "kb_config" |
| 138 | + if umo is not None: |
| 139 | + return configs.get(umo) |
| 140 | + |
| 141 | + return [ |
| 142 | + SimpleNamespace( |
| 143 | + scope_id="platform:GroupMessage:group!alice", |
| 144 | + value={"val": configs["platform:GroupMessage:group!alice"]}, |
| 145 | + ), |
| 146 | + SimpleNamespace( |
| 147 | + scope_id="platform:GroupMessage:group!bob", |
| 148 | + value={"val": configs["platform:GroupMessage:group!bob"]}, |
| 149 | + ), |
| 150 | + SimpleNamespace( |
| 151 | + scope_id="platform:FriendMessage:charlie", |
| 152 | + value={"val": configs["platform:FriendMessage:charlie"]}, |
| 153 | + ), |
| 154 | + SimpleNamespace( |
| 155 | + scope_id="platform:FriendMessage:broken", |
| 156 | + value={"val": configs["platform:FriendMessage:broken"]}, |
| 157 | + ), |
| 158 | + ] |
| 159 | + |
| 160 | + async def session_put(self, umo, key, value): |
| 161 | + updates.append((umo, key, value)) |
| 162 | + |
| 163 | + monkeypatch.setattr(knowledge_base_route_module, "sp", FakeSharedPreferences()) |
| 164 | + |
| 165 | + updated = await KnowledgeBaseRoute._remove_kb_from_session_configs("kb-old") |
| 166 | + |
| 167 | + assert updated == 2 |
| 168 | + assert updates == [ |
| 169 | + ( |
| 170 | + "platform:GroupMessage:group!alice", |
| 171 | + "kb_config", |
| 172 | + {"kb_ids": ["kb-keep"], "top_k": 3}, |
| 173 | + ), |
| 174 | + ( |
| 175 | + "platform:GroupMessage:group!bob", |
| 176 | + "kb_config", |
| 177 | + {"kb_ids": []}, |
| 178 | + ), |
| 179 | + ] |
| 180 | + |
| 181 | + |
| 182 | +@pytest.mark.asyncio |
| 183 | +async def test_remove_deleted_kb_ignores_missing_session_list( |
| 184 | + monkeypatch: pytest.MonkeyPatch, |
| 185 | +): |
| 186 | + class FakeSharedPreferences: |
| 187 | + async def session_get(self, umo, key): |
| 188 | + assert umo is None |
| 189 | + assert key == "kb_config" |
| 190 | + return None |
| 191 | + |
| 192 | + async def session_put(self, umo, key, value): |
| 193 | + raise AssertionError("session_put should not be called") |
| 194 | + |
| 195 | + monkeypatch.setattr(knowledge_base_route_module, "sp", FakeSharedPreferences()) |
| 196 | + |
| 197 | + updated = await KnowledgeBaseRoute._remove_kb_from_session_configs("kb-old") |
| 198 | + |
| 199 | + assert updated == 0 |
| 200 | + |
| 201 | + |
120 | 202 | @pytest.mark.asyncio |
121 | 203 | async def test_import_documents( |
122 | 204 | app: Quart, authenticated_header: dict, core_lifecycle_td: AstrBotCoreLifecycle |
|
0 commit comments