1313# limitations under the License.
1414
1515import logging
16- import time
1716from typing import List , Optional
1817
19- from sqlalchemy import and_ , or_ , select , update
18+ from sqlalchemy import and_ , select , update
2019from sqlalchemy .orm import Session
2120
2221from aperag .config import get_sync_session
2322from aperag .db .models import (
2423 Document ,
2524 DocumentIndex ,
26- DocumentIndexType ,
2725 DocumentIndexStatus ,
26+ DocumentIndexType ,
2827 DocumentStatus ,
2928)
3029from aperag .tasks .scheduler import TaskScheduler , create_task_scheduler
31- from aperag .utils .utils import utc_now
3230from aperag .utils .constant import IndexAction
31+ from aperag .utils .utils import utc_now
3332
3433logger = logging .getLogger (__name__ )
3534
@@ -89,7 +88,7 @@ def _get_indexes_needing_reconciliation(self, session: Session) -> List[Document
8988 DocumentIndex .status == DocumentIndexStatus .DELETING ,
9089 ),
9190 }
92-
91+
9392 for action , condition in conditions .items ():
9493 stmt = select (DocumentIndex ).where (condition )
9594 result = session .execute (stmt )
@@ -128,7 +127,7 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
128127 Returns list of successfully claimed indexes with their details.
129128 """
130129 claimed_indexes = []
131-
130+
132131 try :
133132 for index_id , index_type , action in indexes_to_claim :
134133 if action in [IndexAction .CREATE , IndexAction .UPDATE ]:
@@ -142,7 +141,7 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
142141 stmt = select (DocumentIndex ).where (DocumentIndex .id == index_id )
143142 result = session .execute (stmt )
144143 current_index = result .scalar_one_or_none ()
145-
144+
146145 if not current_index :
147146 continue
148147
@@ -177,13 +176,17 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
177176 result = session .execute (update_stmt )
178177 if result .rowcount > 0 :
179178 # Successfully claimed this index
180- claimed_indexes .append ({
181- 'index_id' : index_id ,
182- 'document_id' : document_id ,
183- 'index_type' : index_type ,
184- 'action' : action ,
185- 'target_version' : current_index .version if action in [IndexAction .CREATE , IndexAction .UPDATE ] else None ,
186- })
179+ claimed_indexes .append (
180+ {
181+ "index_id" : index_id ,
182+ "document_id" : document_id ,
183+ "index_type" : index_type ,
184+ "action" : action ,
185+ "target_version" : current_index .version
186+ if action in [IndexAction .CREATE , IndexAction .UPDATE ]
187+ else None ,
188+ }
189+ )
187190 logger .debug (f"Claimed index { index_id } for document { document_id } ({ action } )" )
188191 else :
189192 logger .debug (f"Could not claim index { index_id } for document { document_id } " )
@@ -199,64 +202,57 @@ def _reconcile_document_operations(self, document_id: str, claimed_indexes: List
199202 Reconcile operations for a single document, batching same operation types together
200203 """
201204 from collections import defaultdict
202-
205+
203206 # Group by operation type to batch operations
204207 operations_by_type = defaultdict (list )
205208 for claimed_index in claimed_indexes :
206- action = claimed_index [' action' ]
209+ action = claimed_index [" action" ]
207210 operations_by_type [action ].append (claimed_index )
208211
209212 # Process create operations as a batch
210213 if IndexAction .CREATE in operations_by_type :
211214 create_indexes = operations_by_type [IndexAction .CREATE ]
212- create_types = [claimed_index [' index_type' ] for claimed_index in create_indexes ]
215+ create_types = [claimed_index [" index_type" ] for claimed_index in create_indexes ]
213216 context = {}
214-
217+
215218 for claimed_index in create_indexes :
216- index_type = claimed_index [' index_type' ]
217- target_version = claimed_index .get (' target_version' )
218-
219+ index_type = claimed_index [" index_type" ]
220+ target_version = claimed_index .get (" target_version" )
221+
219222 # Store version info in context
220223 if target_version is not None :
221224 context [f"{ index_type } _version" ] = target_version
222-
223- task_id = self .task_scheduler .schedule_create_index (
224- document_id = document_id ,
225- index_types = create_types ,
226- context = context
225+
226+ self .task_scheduler .schedule_create_index (
227+ document_id = document_id , index_types = create_types , context = context
227228 )
228229 logger .info (f"Scheduled create task for document { document_id } , types: { create_types } " )
229-
230+
230231 # Process update operations as a batch
231232 if IndexAction .UPDATE in operations_by_type :
232233 update_indexes = operations_by_type [IndexAction .UPDATE ]
233- update_types = [claimed_index [' index_type' ] for claimed_index in update_indexes ]
234+ update_types = [claimed_index [" index_type" ] for claimed_index in update_indexes ]
234235 context = {}
235236
236237 for claimed_index in update_indexes :
237- index_type = claimed_index [' index_type' ]
238- target_version = claimed_index .get (' target_version' )
239-
238+ index_type = claimed_index [" index_type" ]
239+ target_version = claimed_index .get (" target_version" )
240+
240241 # Store version info in context
241242 if target_version is not None :
242243 context [f"{ index_type } _version" ] = target_version
243-
244- task_id = self .task_scheduler .schedule_update_index (
245- document_id = document_id ,
246- index_types = update_types ,
247- context = context
244+
245+ self .task_scheduler .schedule_update_index (
246+ document_id = document_id , index_types = update_types , context = context
248247 )
249248 logger .info (f"Scheduled update task for document { document_id } , types: { update_types } " )
250249
251250 # Process delete operations as a batch
252251 if IndexAction .DELETE in operations_by_type :
253252 delete_indexes = operations_by_type [IndexAction .DELETE ]
254- delete_types = [claimed_index ['index_type' ] for claimed_index in delete_indexes ]
255-
256- task_id = self .task_scheduler .schedule_delete_index (
257- document_id = document_id ,
258- index_types = delete_types
259- )
253+ delete_types = [claimed_index ["index_type" ] for claimed_index in delete_indexes ]
254+
255+ self .task_scheduler .schedule_delete_index (document_id = document_id , index_types = delete_types )
260256 logger .info (f"Scheduled delete task for document { document_id } , types: { delete_types } " )
261257
262258
@@ -322,7 +318,9 @@ def on_index_failed(document_id: str, index_type: str, error_message: str):
322318 DocumentIndex .document_id == document_id ,
323319 DocumentIndex .index_type == DocumentIndexType (index_type ),
324320 # Allow transition from any in-progress state
325- DocumentIndex .status .in_ ([DocumentIndexStatus .CREATING , DocumentIndexStatus .DELETION_IN_PROGRESS ]),
321+ DocumentIndex .status .in_ (
322+ [DocumentIndexStatus .CREATING , DocumentIndexStatus .DELETION_IN_PROGRESS ]
323+ ),
326324 )
327325 )
328326 .values (
@@ -350,15 +348,12 @@ def on_index_deleted(document_id: str, index_type: str):
350348 for session in get_sync_session ():
351349 # Delete the record entirely
352350 from sqlalchemy import delete
353-
354- delete_stmt = (
355- delete (DocumentIndex )
356- .where (
357- and_ (
358- DocumentIndex .document_id == document_id ,
359- DocumentIndex .index_type == DocumentIndexType (index_type ),
360- DocumentIndex .status == DocumentIndexStatus .DELETION_IN_PROGRESS ,
361- )
351+
352+ delete_stmt = delete (DocumentIndex ).where (
353+ and_ (
354+ DocumentIndex .document_id == document_id ,
355+ DocumentIndex .index_type == DocumentIndexType (index_type ),
356+ DocumentIndex .status == DocumentIndexStatus .DELETION_IN_PROGRESS ,
362357 )
363358 )
364359
0 commit comments