55from pathlib import Path
66
77from sqlalchemy import Column , Text , bindparam
8+ from sqlalchemy .dialects import sqlite
89from sqlalchemy .ext .asyncio import AsyncEngine , AsyncSession , create_async_engine
910from sqlalchemy .orm import sessionmaker
11+ from sqlalchemy .pool import NullPool
12+ from sqlalchemy .schema import CreateTable
1013from sqlmodel import Field , MetaData , SQLModel , col , func , select , text
1114
1215from astrbot .core import logger
@@ -34,7 +37,7 @@ class Document(BaseDocModel, table=True):
3437 primary_key = True ,
3538 sa_column_kwargs = {"autoincrement" : True },
3639 )
37- doc_id : str = Field (nullable = False )
40+ doc_id : str = Field (nullable = False , unique = True )
3841 text : str = Field (nullable = False )
3942 metadata_ : str | None = Field (default = None , sa_column = Column ("metadata" , Text ))
4043 created_at : datetime | None = Field (default = None )
@@ -60,8 +63,7 @@ async def initialize(self) -> None:
6063 """Initialize the SQLite database and create the documents table if it doesn't exist."""
6164 await self .connect ()
6265 async with self .engine .begin () as conn : # type: ignore
63- # Create tables using SQLModel
64- await conn .run_sync (BaseDocModel .metadata .create_all )
66+ await self ._ensure_documents_table (conn )
6567
6668 try :
6769 await conn .execute (
@@ -94,6 +96,56 @@ async def initialize(self) -> None:
9496 await self ._initialize_fts5 (conn )
9597 await conn .commit ()
9698
99+ async def _ensure_documents_table (self , executor ) -> None :
100+ """Create the document table from the SQLModel definition."""
101+ result = await executor .execute (
102+ text (
103+ """
104+ SELECT 1
105+ FROM sqlite_master
106+ WHERE type='table' AND name=:table_name
107+ LIMIT 1
108+ """ ,
109+ ),
110+ {"table_name" : Document .__tablename__ },
111+ )
112+ if result .scalar_one_or_none () is not None :
113+ await self ._ensure_doc_id_unique_index (executor )
114+ return
115+
116+ create_table = CreateTable (Document .__table__ , if_not_exists = True ) # type: ignore[attr-defined]
117+
118+ await executor .execute (
119+ text (str (create_table .compile (dialect = sqlite .dialect ())))
120+ )
121+ await self ._ensure_doc_id_unique_index (executor )
122+
123+ async def _ensure_doc_id_unique_index (self , executor ) -> None :
124+ duplicate_result = await executor .execute (
125+ text (
126+ """
127+ SELECT doc_id
128+ FROM documents
129+ GROUP BY doc_id
130+ HAVING COUNT(*) > 1
131+ LIMIT 1
132+ """ ,
133+ ),
134+ )
135+ if duplicate_result .scalar_one_or_none () is not None :
136+ logger .warning (
137+ "Skipping documents.doc_id unique index migration because duplicate "
138+ f"doc_id values already exist in { self .db_path } ." ,
139+ )
140+ return
141+
142+ await executor .execute (
143+ text (
144+ "CREATE UNIQUE INDEX IF NOT EXISTS "
145+ "idx_documents_doc_id_unique ON documents(doc_id)" ,
146+ ),
147+ )
148+
97149 async def _initialize_fts5 (self , executor ) -> None :
98150 try :
99151 await self ._create_fts5_table (executor , if_not_exists = True )
@@ -197,6 +249,7 @@ async def connect(self) -> None:
197249 self .DATABASE_URL ,
198250 echo = False ,
199251 future = True ,
252+ poolclass = NullPool ,
200253 )
201254 self .async_session_maker = sessionmaker (
202255 self .engine , # type: ignore
0 commit comments