Skip to content

Commit c500ec7

Browse files
authored
feat: implement rebuild indexes functionality for documents (#979)
1 parent f50a84a commit c500ec7

15 files changed

Lines changed: 630 additions & 4 deletions

File tree

aperag/api/components/schemas/document.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,19 @@ documentUpdate:
8282
type: string
8383
source:
8484
type: string
85+
86+
# Request body for POST .../documents/{document_id}/rebuild_indexes.
rebuildIndexesRequest:
  type: object
  required: [index_types]
  properties:
    index_types:
      description: Types of indexes to rebuild
      type: array
      # At least one index type must be supplied.
      minItems: 1
      items:
        type: string
        enum: [vector, fulltext, graph]

aperag/api/openapi.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ paths:
4747
$ref: './paths/collections.yaml#/documents'
4848
/collections/{collection_id}/documents/{document_id}:
4949
$ref: './paths/collections.yaml#/document'
50+
/collections/{collection_id}/documents/{document_id}/rebuild_indexes:
51+
$ref: './paths/collections.yaml#/rebuild_indexes'
5052
/collections/{collection_id}/searches:
5153
$ref: './paths/collections.yaml#/searches'
5254
/collections/{collection_id}/searches/{search_id}:

aperag/api/paths/collections.yaml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,45 @@ document:
296296
schema:
297297
$ref: '../components/schemas/common.yaml#/failResponse'
298298

299+
# Path item for /collections/{collection_id}/documents/{document_id}/rebuild_indexes.
rebuild_indexes:
  post:
    summary: Rebuild document indexes
    description: Rebuild specified types of indexes for a document
    security:
      - BearerAuth: []
    parameters:
      - name: collection_id
        in: path
        required: true
        schema: {type: string}
      - name: document_id
        in: path
        required: true
        schema: {type: string}
    requestBody:
      required: true
      content:
        application/json:
          schema:
            $ref: '../components/schemas/document.yaml#/rebuildIndexesRequest'
    responses:
      # NOTE(review): the service method added in this change returns a
      # code/message dict; confirm the handler really answers 204 with no body.
      '204':
        description: Index rebuild initiated successfully
      '401':
        description: Unauthorized
        content:
          application/json:
            schema:
              $ref: '../components/schemas/common.yaml#/failResponse'
      '404':
        description: Document not found
        content:
          application/json:
            schema:
              $ref: '../components/schemas/common.yaml#/failResponse'
337+
299338
searches:
300339
get:
301340
summary: Get search history

aperag/index/manager.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,40 @@ async def delete_document_indexes(
107107
if doc_index:
108108
doc_index.update_spec(IndexDesiredState.ABSENT)
109109

110+
async def rebuild_document_indexes(
    self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType]
):
    """
    Rebuild specified document indexes (called when user requests index rebuild)

    For each requested index type the matching DocumentIndex row is looked up.
    When the row exists and its desired state is PRESENT, its version is
    incremented and ``gmt_updated`` is refreshed, which is what triggers
    reconciliation. Missing rows and rows whose desired state is not PRESENT
    are skipped with a warning. This method does not commit; the caller owns
    the transaction on ``session``.

    Args:
        session: Database session
        document_id: Document ID
        index_types: List of index types to rebuild (must not contain duplicates)

    Raises:
        ValueError: If ``index_types`` contains the same type more than once.
    """
    if len(set(index_types)) != len(index_types):
        # ValueError is still an Exception subclass, so callers that caught the
        # previous bare Exception keep working.
        raise ValueError("Duplicate index types are not allowed")

    for index_type in index_types:
        stmt = select(DocumentIndex).where(
            and_(DocumentIndex.document_id == document_id, DocumentIndex.index_type == index_type)
        )
        result = await session.execute(stmt)
        doc_index = result.scalar_one_or_none()

        if not doc_index:
            logger.warning(f"No {index_type.value} index found for document {document_id}")
            continue

        # Only indexes whose desired state is PRESENT are rebuilt; anything
        # else (e.g. an index scheduled for removal) is left untouched.
        if doc_index.desired_state != IndexDesiredState.PRESENT:
            logger.warning(f"Cannot rebuild {index_type.value} index for document {document_id}: index not present")
            continue

        doc_index.version += 1  # Increment version to trigger re-indexing
        doc_index.gmt_updated = utc_now()
        logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
143+
110144
async def get_document_index_status(self, session: AsyncSession, document_id: str) -> dict:
111145
"""
112146
Get current index status for a document

aperag/schema/view_models.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# generated by datamodel-codegen:
22
# filename: openapi.merged.yaml
3-
# timestamp: 2025-06-23T03:26:47+00:00
3+
# timestamp: 2025-06-23T09:37:51+00:00
44

55
from __future__ import annotations
66

@@ -575,6 +575,12 @@ class DocumentUpdate(BaseModel):
575575
source: Optional[str] = None
576576

577577

578+
class RebuildIndexesRequest(BaseModel):
    # Request body listing which index types of a document should be rebuilt.
    # The field is required and must contain at least one entry.
    index_types: list[Literal['vector', 'fulltext', 'graph']] = Field(
        default=...,
        min_items=1,
        description='Types of indexes to rebuild',
    )
582+
583+
578584
class VectorSearchParams(BaseModel):
579585
topk: Optional[int] = Field(None, description='Top K results')
580586
similarity: Optional[confloat(ge=0.0, le=1.0)] = Field(

aperag/service/document_service.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,71 @@ async def _delete_documents_atomically(session):
402402

403403
return result
404404

405+
async def rebuild_document_indexes(
    self, user_id: str, collection_id: str, document_id: str, index_types: List[str]
) -> dict:
    """
    Rebuild specified indexes for a document

    Validates the requested index types, checks that the document and its
    collection are visible to the user, asks the index manager to trigger a
    rebuild for the selected types, and finally triggers index reconciliation
    once the transactional callback has completed.

    Args:
        user_id: User ID
        collection_id: Collection ID
        document_id: Document ID
        index_types: List of index types to rebuild ('vector', 'fulltext', 'graph')

    Returns:
        dict: Success response with "code" and "message" keys

    Raises:
        The exception built by ``invalid_param`` when ``index_types`` contains
            duplicates or an unknown type.
        DocumentNotFoundException: If the document lookup returns nothing.
        ResourceNotFoundException: If the document belongs to another
            collection, or the collection is missing / not owned by the user.
    """
    if len(set(index_types)) != len(index_types):
        raise invalid_param("index_types", "duplicate index types are not allowed")

    logger.info(f"Rebuilding indexes for document {document_id} with types: {index_types}")

    # Convert index types to enum values outside transaction
    from aperag.db.models import DocumentIndexType

    # Lookup table instead of an if/elif chain: adding a new index type is a
    # one-line change and unknown names are rejected in a single place.
    enum_by_name = {
        "vector": DocumentIndexType.VECTOR,
        "fulltext": DocumentIndexType.FULLTEXT,
        "graph": DocumentIndexType.GRAPH,
    }
    index_type_enums = []
    for index_type in index_types:
        if index_type not in enum_by_name:
            raise invalid_param("index_type", f"Invalid index type: {index_type}")
        index_type_enums.append(enum_by_name[index_type])

    # Execute all operations atomically in a single transaction
    async def _rebuild_document_indexes_atomically(session):
        # NOTE(review): the lookups below go through self.db_ops rather than the
        # `session` argument - confirm they participate in the same transaction.

        # Verify document exists and user has access
        document = await self.db_ops.query_document(user_id, collection_id, document_id)
        if not document:
            raise DocumentNotFoundException(f"Document {document_id} not found")

        if document.collection_id != collection_id:
            raise ResourceNotFoundException(f"Document {document_id} not found in collection {collection_id}")

        # Verify user has access to the collection
        collection = await self.db_ops.query_collection(user_id, collection_id)
        if not collection or collection.user != user_id:
            raise ResourceNotFoundException(f"Collection {collection_id} not found or access denied")

        # Trigger index rebuild by incrementing version for selected index types
        await document_index_manager.rebuild_document_indexes(session, document_id, index_type_enums)

        logger.info(f"Successfully triggered rebuild for document {document_id} indexes: {index_types}")

        return {
            "code": "200",
            "message": f"Index rebuild initiated for types: {', '.join(index_types)}"
        }

    result = await self.db_ops.execute_with_transaction(_rebuild_document_indexes_atomically)

    # Trigger index reconciliation after successful rebuild initiation
    _trigger_index_reconciliation()

    return result
469+
405470

406471
# Create a global service instance for easy access
407472
# This uses the global db_ops instance and doesn't require session management in views

aperag/views/main.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,21 @@ async def delete_documents_view(
158158
return await document_service.delete_documents(str(user.id), collection_id, document_ids)
159159

160160

161+
@router.post("/collections/{collection_id}/documents/{document_id}/rebuild_indexes")
@audit(resource_type="document", api_name="RebuildDocumentIndexes")
async def rebuild_document_indexes_view(
    request: Request,
    collection_id: str,
    document_id: str,
    rebuild_request: view_models.RebuildIndexesRequest,
    user: User = Depends(current_user),
):
    """Trigger a rebuild of the requested index types for one document.

    Delegates to ``document_service.rebuild_document_indexes`` with the
    authenticated user's id (as a string), the path parameters, and the index
    types taken from the request body, and returns the service result as-is.
    """
    requester_id = str(user.id)
    requested_types = rebuild_request.index_types
    outcome = await document_service.rebuild_document_indexes(
        requester_id, collection_id, document_id, requested_types
    )
    return outcome
174+
175+
161176
@router.post("/bots/{bot_id}/chats")
162177
@audit(resource_type="chat", api_name="CreateChat")
163178
async def create_chat_view(request: Request, bot_id: str, user: User = Depends(current_user)) -> view_models.Chat:

0 commit comments

Comments
 (0)