Skip to content

Commit 68dd5e8

Browse files
committed
feat: refactor document index reconciliation and deletion logic
- Introduced a new IndexAction class to standardize index operation types (CREATE, UPDATE, DELETE). - Updated DocumentIndexReconciler to utilize the new IndexAction constants for improved clarity and maintainability. - Refactored document deletion logic in DocumentService to streamline the deletion process and ensure proper index reconciliation. - Enhanced error handling and logging during document deletion and index management operations. - Updated documentation to reflect changes in the indexing architecture and operational flow.
1 parent 59e181a commit 68dd5e8

6 files changed

Lines changed: 477 additions & 402 deletions

File tree

aperag/index/reconciler.py

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
)
3030
from aperag.tasks.scheduler import TaskScheduler, create_task_scheduler
3131
from aperag.utils.utils import utc_now
32+
from aperag.utils.constant import IndexAction
3233

3334
logger = logging.getLogger(__name__)
3435

@@ -71,30 +72,30 @@ def _get_indexes_needing_reconciliation(self, session: Session) -> List[Document
7172
"""
7273
from collections import defaultdict
7374

74-
operations = defaultdict(lambda: {"create": [], "update": [], "delete": []})
75+
operations = defaultdict(lambda: {IndexAction.CREATE: [], IndexAction.UPDATE: [], IndexAction.DELETE: []})
7576

7677
conditions = {
77-
"create": and_(
78+
IndexAction.CREATE: and_(
7879
DocumentIndex.status == DocumentIndexStatus.PENDING,
7980
DocumentIndex.observed_version < DocumentIndex.version,
8081
DocumentIndex.version == 1,
8182
),
82-
"update": and_(
83+
IndexAction.UPDATE: and_(
8384
DocumentIndex.status == DocumentIndexStatus.PENDING,
8485
DocumentIndex.observed_version < DocumentIndex.version,
8586
DocumentIndex.version > 1,
8687
),
87-
"delete": and_(
88+
IndexAction.DELETE: and_(
8889
DocumentIndex.status == DocumentIndexStatus.DELETING,
8990
),
9091
}
9192

92-
for operation_type, condition in conditions.items():
93+
for action, condition in conditions.items():
9394
stmt = select(DocumentIndex).where(condition)
9495
result = session.execute(stmt)
9596
indexes = result.scalars().all()
9697
for index in indexes:
97-
operations[index.document_id][operation_type].append(index)
98+
operations[index.document_id][action].append(index)
9899

99100
return operations
100101

@@ -106,9 +107,9 @@ def _reconcile_single_document(self, document_id: str, operations: dict):
106107
# Collect indexes for this document that need claiming
107108
indexes_to_claim = []
108109

109-
for operation_type, doc_indexes in operations.items():
110+
for action, doc_indexes in operations.items():
110111
for doc_index in doc_indexes:
111-
indexes_to_claim.append((doc_index.id, doc_index.index_type, operation_type))
112+
indexes_to_claim.append((doc_index.id, doc_index.index_type, action))
112113

113114
# Atomically claim the indexes for this document
114115
claimed_indexes = self._claim_document_indexes(session, document_id, indexes_to_claim)
@@ -129,10 +130,10 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
129130
claimed_indexes = []
130131

131132
try:
132-
for index_id, index_type, operation_type in indexes_to_claim:
133-
if operation_type in ["create", "update"]:
133+
for index_id, index_type, action in indexes_to_claim:
134+
if action in [IndexAction.CREATE, IndexAction.UPDATE]:
134135
target_state = DocumentIndexStatus.CREATING
135-
elif operation_type == "delete":
136+
elif action == IndexAction.DELETE:
136137
target_state = DocumentIndexStatus.DELETION_IN_PROGRESS
137138
else:
138139
continue
@@ -146,21 +147,21 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
146147
continue
147148

148149
# Build appropriate claiming conditions based on operation type
149-
if operation_type == "create":
150+
if action == IndexAction.CREATE:
150151
claiming_conditions = [
151152
DocumentIndex.id == index_id,
152153
DocumentIndex.status == DocumentIndexStatus.PENDING,
153154
DocumentIndex.observed_version < DocumentIndex.version,
154155
DocumentIndex.version == 1,
155156
]
156-
elif operation_type == "update":
157+
elif action == IndexAction.UPDATE:
157158
claiming_conditions = [
158159
DocumentIndex.id == index_id,
159160
DocumentIndex.status == DocumentIndexStatus.PENDING,
160161
DocumentIndex.observed_version < DocumentIndex.version,
161162
DocumentIndex.version > 1,
162163
]
163-
elif operation_type == "delete":
164+
elif action == IndexAction.DELETE:
164165
claiming_conditions = [
165166
DocumentIndex.id == index_id,
166167
DocumentIndex.status == DocumentIndexStatus.DELETING,
@@ -180,10 +181,10 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to
180181
'index_id': index_id,
181182
'document_id': document_id,
182183
'index_type': index_type,
183-
'operation_type': operation_type,
184-
'target_version': current_index.version if operation_type in ["create", "update"] else None,
184+
'action': action,
185+
'target_version': current_index.version if action in [IndexAction.CREATE, IndexAction.UPDATE] else None,
185186
})
186-
logger.debug(f"Claimed index {index_id} for document {document_id} ({operation_type})")
187+
logger.debug(f"Claimed index {index_id} for document {document_id} ({action})")
187188
else:
188189
logger.debug(f"Could not claim index {index_id} for document {document_id}")
189190

@@ -202,12 +203,12 @@ def _reconcile_document_operations(self, document_id: str, claimed_indexes: List
202203
# Group by operation type to batch operations
203204
operations_by_type = defaultdict(list)
204205
for claimed_index in claimed_indexes:
205-
operation_type = claimed_index['operation_type']
206-
operations_by_type[operation_type].append(claimed_index)
206+
action = claimed_index['action']
207+
operations_by_type[action].append(claimed_index)
207208

208209
# Process create operations as a batch
209-
if "create" in operations_by_type:
210-
create_indexes = operations_by_type["create"]
210+
if IndexAction.CREATE in operations_by_type:
211+
create_indexes = operations_by_type[IndexAction.CREATE]
211212
create_types = [claimed_index['index_type'] for claimed_index in create_indexes]
212213
context = {}
213214

@@ -227,8 +228,8 @@ def _reconcile_document_operations(self, document_id: str, claimed_indexes: List
227228
logger.info(f"Scheduled create task for document {document_id}, types: {create_types}")
228229

229230
# Process update operations as a batch
230-
if "update" in operations_by_type:
231-
update_indexes = operations_by_type["update"]
231+
if IndexAction.UPDATE in operations_by_type:
232+
update_indexes = operations_by_type[IndexAction.UPDATE]
232233
update_types = [claimed_index['index_type'] for claimed_index in update_indexes]
233234
context = {}
234235

@@ -248,8 +249,8 @@ def _reconcile_document_operations(self, document_id: str, claimed_indexes: List
248249
logger.info(f"Scheduled update task for document {document_id}, types: {update_types}")
249250

250251
# Process delete operations as a batch
251-
if "delete" in operations_by_type:
252-
delete_indexes = operations_by_type["delete"]
252+
if IndexAction.DELETE in operations_by_type:
253+
delete_indexes = operations_by_type[IndexAction.DELETE]
253254
delete_types = [claimed_index['index_type'] for claimed_index in delete_indexes]
254255

255256
task_id = self.task_scheduler.schedule_delete_index(

aperag/service/document_service.py

Lines changed: 44 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -250,111 +250,67 @@ async def get_document(self, user: str, collection_id: str, document_id: str) ->
250250
async for session in get_async_session():
251251
return await self.build_document_response(document, session)
252252

253-
async def delete_document(self, user: str, collection_id: str, document_id: str) -> Optional[view_models.Document]:
254-
"""Delete document by ID (idempotent operation)
255-
256-
Returns the deleted document or None if already deleted/not found
253+
async def _delete_document(self, session: AsyncSession, user: str, collection_id: str, document_id: str):
254+
"""
255+
Core logic to delete a single document and its associated resources.
256+
This method is designed to be called within a transaction.
257257
"""
258+
# Validate document existence and ownership
258259
document = await self.db_ops.query_document(user, collection_id, document_id)
259260
if document is None:
260-
# Document already deleted or never existed - idempotent operation
261-
return None
262-
263-
# Delete document and indexes atomically in a single transaction
264-
async def _delete_document_atomically(session):
265-
from sqlalchemy import select
266-
267-
from aperag.db.models import Document, DocumentStatus, utc_now
268-
269-
# Get and delete document
270-
stmt = select(Document).where(
271-
Document.id == document_id,
272-
Document.collection_id == collection_id,
273-
Document.user == user,
274-
Document.status != DocumentStatus.DELETED,
275-
)
276-
result = await session.execute(stmt)
277-
doc_to_delete = result.scalars().first()
261+
# Silently ignore if document not found, as it might have been deleted by another process
262+
logger.warning(f"Document {document_id} not found for deletion, skipping.")
263+
return
278264

279-
if not doc_to_delete:
280-
return None
281-
282-
# Soft delete document
283-
doc_to_delete.status = DocumentStatus.DELETED
284-
doc_to_delete.gmt_deleted = utc_now()
285-
session.add(doc_to_delete)
286-
await session.flush()
287-
await session.refresh(doc_to_delete)
288-
289-
# Mark index specs for deletion
290-
await document_index_manager.delete_document_indexes(session, document_id)
291-
292-
# Build response object
293-
return await self.build_document_response(doc_to_delete, session)
294-
295-
result = await self.db_ops.execute_with_transaction(_delete_document_atomically)
265+
# Use index manager to mark all related indexes for deletion
266+
await document_index_manager.delete_document_indexes(
267+
document_id=document.id, index_types=None, session=session
268+
)
296269

297-
if result:
298-
# Delete object storage files after successful database transaction
299-
obj_store = get_object_store()
270+
# Delete from object store
271+
obj_store = get_object_store()
272+
metadata = json.loads(document.doc_metadata) if document.doc_metadata else {}
273+
if object_path := metadata.get("object_path"):
300274
try:
301-
await sync_to_async(obj_store.delete_objects_by_prefix)(f"{document.object_store_base_path()}/")
275+
# Use delete_objects_by_prefix to remove all related files (original, chunks, etc.)
276+
await sync_to_async(obj_store.delete_objects_by_prefix)(document.object_store_base_path())
277+
logger.info(f"Deleted objects from object store with prefix: {document.object_store_base_path()}")
302278
except Exception as e:
303-
logger.warning(f"Failed to delete object storage files for document {document_id}: {e}")
304-
305-
# Trigger index reconciliation after successful document deletion
306-
_trigger_index_reconciliation()
307-
308-
return result
279+
logger.warning(f"Failed to delete objects for document {document.id} from object store: {e}")
309280

310-
return None
311-
312-
async def delete_documents(self, user: str, collection_id: str, document_ids: List[str]) -> dict:
313-
# Delete documents and indexes atomically in a single transaction
314-
async def _delete_documents_atomically(session):
315-
from sqlalchemy import select
316-
317-
from aperag.db.models import Document, DocumentStatus, utc_now
318-
319-
# Get documents to delete
320-
stmt = select(Document).where(
321-
Document.id.in_(document_ids),
322-
Document.collection_id == collection_id,
323-
Document.user == user,
324-
Document.status != DocumentStatus.DELETED,
325-
)
326-
result = await session.execute(stmt)
327-
documents_to_delete = result.scalars().all()
281+
# Delete the document record from the database
282+
await session.delete(document)
283+
await session.flush()
284+
logger.info(f"Successfully marked document {document.id} and its indexes for deletion.")
328285

329-
if not documents_to_delete:
330-
return [], list(document_ids)
286+
return document
331287

332-
# Soft delete documents
333-
success_ids = []
334-
for doc in documents_to_delete:
335-
doc.status = DocumentStatus.DELETED
336-
doc.gmt_deleted = utc_now()
337-
session.add(doc)
338-
success_ids.append(doc.id)
288+
async def delete_document(self, user: str, collection_id: str, document_id: str) -> dict:
289+
"""Delete a single document and trigger index reconciliation."""
339290

340-
await session.flush()
291+
async def _delete_document_atomically(session: AsyncSession):
292+
return await self._delete_document(session, user, collection_id, document_id)
341293

342-
# Delete indexes for all successful deletions
343-
for doc_id in success_ids:
344-
await document_index_manager.delete_document_indexes(session, doc_id)
294+
result = await self.db_ops.execute_with_transaction(_delete_document_atomically)
345295

346-
# Calculate failed IDs
347-
failed_ids = list(set(document_ids) - set(success_ids))
348-
return success_ids, failed_ids
296+
# Trigger reconciliation to process the deletion
297+
_trigger_index_reconciliation()
298+
return result
349299

350-
success_ids, failed_ids = await self.db_ops.execute_with_transaction(_delete_documents_atomically)
300+
async def delete_documents(self, user: str, collection_id: str, document_ids: List[str]) -> dict:
301+
"""Delete multiple documents and trigger index reconciliation."""
351302

352-
result = {"success": success_ids, "failed": failed_ids}
303+
async def _delete_documents_atomically(session: AsyncSession):
304+
deleted_ids = []
305+
for doc_id in document_ids:
306+
await self._delete_document(session, user, collection_id, doc_id)
307+
deleted_ids.append(doc_id)
308+
return {"deleted_ids": deleted_ids, "status": "success"}
353309

354-
# Trigger index reconciliation after successful batch document deletion
355-
if result.get("success"): # Only trigger if at least one document was deleted successfully
356-
_trigger_index_reconciliation()
310+
result = await self.db_ops.execute_with_transaction(_delete_documents_atomically)
357311

312+
# Trigger reconciliation to process deletions
313+
_trigger_index_reconciliation()
358314
return result
359315

360316
async def rebuild_document_indexes(

aperag/utils/constant.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,9 @@ class QuotaType:
3939
MAX_COLLECTION_COUNT = "max_collection_count"
4040
MAX_DOCUMENT_COUNT = "max_document_count"
4141
MAX_CONVERSATION_COUNT = "max_conversation_count"
42+
43+
44+
class IndexAction:
45+
CREATE = "create"
46+
UPDATE = "update"
47+
DELETE = "delete"

0 commit comments

Comments
 (0)