1818from sqlalchemy import and_ , select
1919from sqlalchemy .ext .asyncio import AsyncSession
2020
21- from aperag .db .models import DocumentIndex , DocumentIndexType , IndexActualState , IndexDesiredState , utc_now
21+ from aperag .db .models import DocumentIndex , DocumentIndexStatus , DocumentIndexType , utc_now
2222
2323logger = logging .getLogger (__name__ )
2424
2525
26- class FrontendIndexManager :
26+ class DocumentIndexManager :
2727 """Simple manager for document index specs (frontend chain)"""
2828
29- async def create_document_indexes (
30- self , session : AsyncSession , document_id : str , user : str , index_types : Optional [List [DocumentIndexType ]] = None
29+ async def create_or_update_document_indexes (
30+ self , session : AsyncSession , document_id : str , index_types : Optional [List [DocumentIndexType ]] = None
3131 ):
3232 """
33- Create index specs for a document (called when document is created)
33+ Create or update index records for a document (called when document is created or index isupdated )
3434
3535 Args:
3636 session: Database session
3737 document_id: Document ID
38- user: User creating the document
3938 index_types: List of index types to create (defaults to all)
4039 """
4140 if index_types is None :
@@ -50,38 +49,21 @@ async def create_document_indexes(
5049 existing_index = result .scalar_one_or_none ()
5150
5251 if existing_index :
53- # Update existing index
54- existing_index .update_spec (IndexDesiredState .PRESENT , user )
55- logger .debug (f"Updated index for { document_id } :{ index_type } to version { existing_index .version } " )
52+ # Update existing index to pending and increment version
53+ existing_index .status = DocumentIndexStatus .PENDING
54+ existing_index .update_version ()
55+ logger .debug (f"Updated index for { document_id } :{ index_type .value } to version { existing_index .version } " )
5656 else :
5757 # Create new index
5858 doc_index = DocumentIndex (
5959 document_id = document_id ,
6060 index_type = index_type ,
61- desired_state = IndexDesiredState . PRESENT ,
61+ status = DocumentIndexStatus . PENDING ,
6262 version = 1 ,
63- created_by = user ,
63+ observed_version = 0 ,
6464 )
6565 session .add (doc_index )
66-
67- async def update_document_indexes (self , session : AsyncSession , document_id : str ):
68- """
69- Update document indexes (called when document content is updated)
70-
71- This increments the version of all indexes to trigger reconciliation.
72-
73- Args:
74- session: Database session
75- document_id: Document ID
76- """
77- stmt = select (DocumentIndex ).where (DocumentIndex .document_id == document_id )
78- result = await session .execute (stmt )
79- indexes = result .scalars ().all ()
80-
81- for index in indexes :
82- if index .desired_state == IndexDesiredState .PRESENT :
83- index .version += 1 # Increment version to trigger re-indexing
84- index .gmt_updated = utc_now ()
66+ logger .debug (f"Created new index for { document_id } :{ index_type .value } " )
8567
8668 async def delete_document_indexes (
8769 self , session : AsyncSession , document_id : str , index_types : Optional [List [DocumentIndexType ]] = None
@@ -105,92 +87,11 @@ async def delete_document_indexes(
10587 doc_index = result .scalar_one_or_none ()
10688
10789 if doc_index :
108- doc_index .update_spec (IndexDesiredState .ABSENT )
109-
110- async def rebuild_document_indexes (
111- self , session : AsyncSession , document_id : str , index_types : List [DocumentIndexType ]
112- ):
113- """
114- Rebuild specified document indexes (called when user requests index rebuild)
115-
116- This increments the version of specified indexes to trigger reconciliation.
117-
118- Args:
119- session: Database session
120- document_id: Document ID
121- index_types: List of index types to rebuild
122- """
123- if len (set (index_types )) != len (index_types ):
124- raise Exception ("Duplicate index types are not allowed" )
125-
126- for index_type in index_types :
127- stmt = select (DocumentIndex ).where (
128- and_ (DocumentIndex .document_id == document_id , DocumentIndex .index_type == index_type )
129- )
130- result = await session .execute (stmt )
131- doc_index = result .scalar_one_or_none ()
132-
133- if doc_index :
134- # Only rebuild if the index is present or failed
135- if doc_index .desired_state == IndexDesiredState .PRESENT :
136- doc_index .version += 1 # Increment version to trigger re-indexing
137- doc_index .gmt_updated = utc_now ()
138- logger .info (f"Triggered rebuild for { index_type .value } index of document { document_id } " )
139- else :
140- logger .warning (
141- f"Cannot rebuild { index_type .value } index for document { document_id } : index not present"
142- )
143- else :
144- logger .warning (f"No { index_type .value } index found for document { document_id } " )
145-
146- async def get_document_index_status (self , session : AsyncSession , document_id : str ) -> dict :
147- """
148- Get current index status for a document
149-
150- Args:
151- session: Database session
152- document_id: Document ID
153-
154- Returns:
155- Dictionary with index status information
156- """
157- # Get all indexes for the document
158- stmt = select (DocumentIndex ).where (DocumentIndex .document_id == document_id )
159- result = await session .execute (stmt )
160- indexes = result .scalars ().all ()
161-
162- # Build result
163- result = {"document_id" : document_id , "indexes" : {}, "overall_status" : "complete" }
164-
165- has_creating = False
166- has_failed = False
167-
168- for index in indexes :
169- index_info = {
170- "type" : index .index_type ,
171- "desired_state" : index .desired_state ,
172- "actual_state" : index .actual_state ,
173- "in_sync" : index .is_in_sync (),
174- }
175-
176- if index .actual_state == IndexActualState .CREATING :
177- has_creating = True
178- elif index .actual_state == IndexActualState .FAILED :
179- has_failed = True
180- index_info ["error" ] = index .error_message
181-
182- result ["indexes" ][index .index_type ] = index_info
183-
184- # Determine overall status
185- if has_failed :
186- result ["overall_status" ] = "failed"
187- elif has_creating :
188- result ["overall_status" ] = "running"
189- else :
190- result ["overall_status" ] = "complete"
191-
192- return result
90+ # Mark for deletion
91+ doc_index .status = DocumentIndexStatus .DELETING
92+ doc_index .gmt_updated = utc_now ()
93+ logger .debug (f"Marked index { document_id } :{ index_type .value } for deletion" )
19394
19495
19596# Global instance
196- document_index_manager = FrontendIndexManager ()
97+ document_index_manager = DocumentIndexManager ()
0 commit comments