Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions aperag/api/components/schemas/document.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,19 @@ documentUpdate:
type: string
source:
type: string

# Request body schema for the document "rebuild_indexes" operation:
# a JSON object whose only (required) property lists the index kinds to rebuild.
rebuildIndexesRequest:
type: object
properties:
index_types:
type: array
items:
type: string
# Closed set of accepted index kinds.
enum:
- vector
- fulltext
- graph
description: Types of indexes to rebuild
# An empty list is invalid: at least one index type must be supplied.
# NOTE(review): the service layer also rejects duplicate entries, but this
# schema does not declare uniqueItems - confirm whether that is intentional.
minItems: 1
required:
- index_types
2 changes: 2 additions & 0 deletions aperag/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ paths:
$ref: './paths/collections.yaml#/documents'
/collections/{collection_id}/documents/{document_id}:
$ref: './paths/collections.yaml#/document'
/collections/{collection_id}/documents/{document_id}/rebuild_indexes:
$ref: './paths/collections.yaml#/rebuild_indexes'
/collections/{collection_id}/searches:
$ref: './paths/collections.yaml#/searches'
/collections/{collection_id}/searches/{search_id}:
Expand Down
39 changes: 39 additions & 0 deletions aperag/api/paths/collections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,45 @@ document:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'

# Path item for POST /collections/{collection_id}/documents/{document_id}/rebuild_indexes.
# Asks the server to rebuild the selected index types of a single document.
rebuild_indexes:
post:
summary: Rebuild document indexes
description: Rebuild specified types of indexes for a document
security:
- BearerAuth: []
parameters:
# Both identifiers are taken from the URL path and are mandatory.
- name: collection_id
in: path
required: true
schema:
type: string
- name: document_id
in: path
required: true
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
$ref: '../components/schemas/document.yaml#/rebuildIndexesRequest'
responses:
# NOTE(review): 204 declares an empty response body, while the service method
# behind this route returns a JSON object with "code" and "message" -
# confirm which of the two is the intended contract.
# NOTE(review): no response is declared for rejected input (for example
# duplicate index types) - presumably a 4xx; confirm and document it.
'204':
description: Index rebuild initiated successfully
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'
'404':
description: Document not found
content:
application/json:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'

searches:
get:
summary: Get search history
Expand Down
34 changes: 34 additions & 0 deletions aperag/index/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,40 @@ async def delete_document_indexes(
if doc_index:
doc_index.update_spec(IndexDesiredState.ABSENT)

async def rebuild_document_indexes(
    self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType]
):
    """
    Rebuild specified document indexes (called when user requests index rebuild)

    For each requested index type, the matching index row of the document is
    loaded. If the row exists and its desired state is PRESENT, its version is
    incremented (to trigger re-indexing) and gmt_updated is refreshed. Requested
    types with no row, or whose desired state is not PRESENT, are skipped with
    a warning. Nothing is committed here; the caller owns the session.

    Args:
        session: Database session
        document_id: Document ID
        index_types: List of index types to rebuild; must not contain duplicates

    Raises:
        ValueError: If index_types contains the same index type more than once
    """
    if len(set(index_types)) != len(index_types):
        # ValueError is the conventional type for a bad argument value; it is
        # still an Exception subclass, so existing broad handlers keep working.
        raise ValueError("Duplicate index types are not allowed")

    for index_type in index_types:
        stmt = select(DocumentIndex).where(
            and_(DocumentIndex.document_id == document_id, DocumentIndex.index_type == index_type)
        )
        result = await session.execute(stmt)
        doc_index = result.scalar_one_or_none()

        if not doc_index:
            logger.warning(f"No {index_type.value} index found for document {document_id}")
            continue

        # Only indexes whose desired state is PRESENT are eligible for a rebuild
        if doc_index.desired_state != IndexDesiredState.PRESENT:
            logger.warning(f"Cannot rebuild {index_type.value} index for document {document_id}: index not present")
            continue

        doc_index.version += 1  # Increment version to trigger re-indexing
        doc_index.gmt_updated = utc_now()
        logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")

async def get_document_index_status(self, session: AsyncSession, document_id: str) -> dict:
"""
Get current index status for a document
Expand Down
8 changes: 7 additions & 1 deletion aperag/schema/view_models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: openapi.merged.yaml
# timestamp: 2025-06-23T03:26:47+00:00
# timestamp: 2025-06-23T09:37:51+00:00

from __future__ import annotations

Expand Down Expand Up @@ -575,6 +575,12 @@ class DocumentUpdate(BaseModel):
source: Optional[str] = None


# Request body model for the document "rebuild_indexes" endpoint.
# NOTE(review): this module's header says it is generated by datamodel-codegen,
# so hand edits here are presumably overwritten on regeneration - change the
# OpenAPI schema instead.
class RebuildIndexesRequest(BaseModel):
# Required field (no default): the index kinds to rebuild. Each entry must be
# one of 'vector', 'fulltext' or 'graph', and at least one entry is required.
index_types: list[Literal['vector', 'fulltext', 'graph']] = Field(
..., description='Types of indexes to rebuild', min_items=1
)


class VectorSearchParams(BaseModel):
topk: Optional[int] = Field(None, description='Top K results')
similarity: Optional[confloat(ge=0.0, le=1.0)] = Field(
Expand Down
65 changes: 65 additions & 0 deletions aperag/service/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,71 @@ async def _delete_documents_atomically(session):

return result

async def rebuild_document_indexes(
    self, user_id: str, collection_id: str, document_id: str, index_types: List[str]
) -> dict:
    """
    Request a rebuild of selected indexes of one document.

    The requested type names are validated and translated to enum members
    first. Then, inside one transaction, the document and its collection are
    checked and the index manager is asked to trigger the rebuild. Index
    reconciliation is kicked off once the transaction has completed.

    Args:
        user_id: ID of the requesting user
        collection_id: ID of the collection the document belongs to
        document_id: ID of the document whose indexes are rebuilt
        index_types: Index type names ('vector', 'fulltext', 'graph'), no duplicates

    Returns:
        dict: Response with "code" and "message" keys
    """
    has_duplicates = len(index_types) != len(set(index_types))
    if has_duplicates:
        raise invalid_param("index_types", "duplicate index types are not allowed")

    logger.info(f"Rebuilding indexes for document {document_id} with types: {index_types}")

    # Map the API-level names to enum members before the transaction starts
    from aperag.db.models import DocumentIndexType

    enum_by_name = {
        'vector': DocumentIndexType.VECTOR,
        'fulltext': DocumentIndexType.FULLTEXT,
        'graph': DocumentIndexType.GRAPH,
    }
    index_type_enums = []
    for requested in index_types:
        if requested not in enum_by_name:
            raise invalid_param("index_type", f"Invalid index type: {requested}")
        index_type_enums.append(enum_by_name[requested])

    async def _rebuild_document_indexes_atomically(session):
        # The document must exist and be visible to this user
        document = await self.db_ops.query_document(user_id, collection_id, document_id)
        if not document:
            raise DocumentNotFoundException(f"Document {document_id} not found")
        if document.collection_id != collection_id:
            raise ResourceNotFoundException(f"Document {document_id} not found in collection {collection_id}")

        # The collection must exist and belong to this user
        collection = await self.db_ops.query_collection(user_id, collection_id)
        if not collection or collection.user != user_id:
            raise ResourceNotFoundException(f"Collection {collection_id} not found or access denied")

        # Delegate the version bump of the selected index types to the index manager
        await document_index_manager.rebuild_document_indexes(session, document_id, index_type_enums)

        logger.info(f"Successfully triggered rebuild for document {document_id} indexes: {index_types}")

        joined_types = ', '.join(index_types)
        return {"code": "200", "message": f"Index rebuild initiated for types: {joined_types}"}

    # Run every check and the rebuild request in one transaction
    outcome = await self.db_ops.execute_with_transaction(_rebuild_document_indexes_atomically)

    # Reconciliation is triggered only after the transaction call has returned
    _trigger_index_reconciliation()

    return outcome


# Create a global service instance for easy access
# This uses the global db_ops instance and doesn't require session management in views
Expand Down
15 changes: 15 additions & 0 deletions aperag/views/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,21 @@ async def delete_documents_view(
return await document_service.delete_documents(str(user.id), collection_id, document_ids)


@router.post("/collections/{collection_id}/documents/{document_id}/rebuild_indexes")
@audit(resource_type="document", api_name="RebuildDocumentIndexes")
async def rebuild_document_indexes_view(
    request: Request,
    collection_id: str,
    document_id: str,
    rebuild_request: view_models.RebuildIndexesRequest,
    user: User = Depends(current_user),
):
    """
    HTTP entry point for rebuilding selected indexes of one document.

    Forwards the authenticated user's ID, the path identifiers and the
    requested index types to the document service and returns its result.
    """
    requester_id = str(user.id)
    requested_types = rebuild_request.index_types
    return await document_service.rebuild_document_indexes(
        requester_id, collection_id, document_id, requested_types
    )


@router.post("/bots/{bot_id}/chats")
@audit(resource_type="chat", api_name="CreateChat")
async def create_chat_view(request: Request, bot_id: str, user: User = Depends(current_user)) -> view_models.Chat:
Expand Down
Loading
Loading