Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions aperag/api/components/schemas/document.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,34 @@ document:
- FAILED
- DELETING
- DELETED
- WARNING
vector_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- CREATING
- ACTIVE
- DELETING
- DELETION_IN_PROGRESS
- FAILED
- SKIPPED
fulltext_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- CREATING
- ACTIVE
- DELETING
- DELETION_IN_PROGRESS
- FAILED
- SKIPPED
graph_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- CREATING
- ACTIVE
- DELETING
- DELETION_IN_PROGRESS
- FAILED
- SKIPPED
vector_index_updated:
Expand Down Expand Up @@ -85,16 +90,6 @@ documentCreate:
collection_id:
type: string

documentUpdate:
type: object
properties:
title:
type: string
description:
type: string
source:
type: string

rebuildIndexesRequest:
type: object
properties:
Expand Down
41 changes: 0 additions & 41 deletions aperag/api/paths/collections.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -254,47 +254,6 @@ document:
application/json:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'
put:
summary: Update a document
description: Update a document
security:
- BearerAuth: []
parameters:
- name: collection_id
in: path
required: true
schema:
type: string
- name: document_id
in: path
required: true
schema:
type: string
requestBody:
required: true
content:
application/json:
schema:
$ref: '../components/schemas/document.yaml#/documentUpdate'
responses:
'200':
description: Document updated successfully
content:
application/json:
schema:
$ref: '../components/schemas/document.yaml#/document'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'
'404':
description: Document not found
content:
application/json:
schema:
$ref: '../components/schemas/common.yaml#/failResponse'

rebuild_indexes:
post:
Expand Down
71 changes: 24 additions & 47 deletions aperag/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ class DocumentIndexType(str, Enum):
GRAPH = "GRAPH"


class DocumentIndexStatus(str, Enum):
"""Document index lifecycle status"""

PENDING = "PENDING" # Awaiting processing (create/update)
CREATING = "CREATING" # Task claimed, creation/update in progress
ACTIVE = "ACTIVE" # Index is up-to-date and ready for use
DELETING = "DELETING" # Deletion has been requested
DELETION_IN_PROGRESS = "DELETION_IN_PROGRESS" # Task claimed, deletion in progress
FAILED = "FAILED" # The last operation failed


class BotStatus(str, Enum):
ACTIVE = "ACTIVE"
DELETED = "DELETED"
Expand Down Expand Up @@ -159,24 +170,6 @@ class LightRAGDocStatus(str, Enum):
FAILED = "failed"


# Add new enums for K8s-inspired design
class IndexDesiredState(str, Enum):
"""Desired state for index - what we want"""

PRESENT = "present"
ABSENT = "absent"


class IndexActualState(str, Enum):
"""Actual state for index - what currently exists"""

ABSENT = "absent"
CREATING = "creating"
PRESENT = "present"
DELETING = "deleting"
FAILED = "failed"


# Models
class Collection(Base):
__tablename__ = "collection"
Expand Down Expand Up @@ -243,13 +236,15 @@ def get_overall_index_status(self, session) -> "DocumentStatus":
if not document_indexes:
return DocumentStatus.PENDING

states = [idx.actual_state for idx in document_indexes]
statuses = [idx.status for idx in document_indexes]

if any(state == IndexActualState.FAILED for state in states):
if any(status == DocumentIndexStatus.FAILED for status in statuses):
return DocumentStatus.FAILED
elif any(state == IndexActualState.CREATING for state in states):
elif any(
status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS] for status in statuses
):
return DocumentStatus.RUNNING
elif all(state == IndexActualState.PRESENT for state in states):
elif all(status == DocumentIndexStatus.ACTIVE for status in statuses):
return DocumentStatus.COMPLETE
else:
return DocumentStatus.PENDING
Expand Down Expand Up @@ -700,7 +695,7 @@ class LightRAGLLMCacheModel(Base):


class DocumentIndex(Base):
"""Document index - combines spec and status into single table"""
"""Document index - single status model"""

__tablename__ = "document_index"
__table_args__ = (UniqueConstraint("document_id", "index_type", name="uq_document_index"),)
Expand All @@ -709,14 +704,11 @@ class DocumentIndex(Base):
document_id = Column(String(24), nullable=False, index=True)
index_type = Column(EnumColumn(DocumentIndexType), nullable=False, index=True)

# Desired state (spec) fields
desired_state = Column(EnumColumn(IndexDesiredState), nullable=False, default=IndexDesiredState.PRESENT, index=True)
status = Column(EnumColumn(DocumentIndexStatus), nullable=False, default=DocumentIndexStatus.PENDING, index=True)
version = Column(Integer, nullable=False, default=1) # Incremented on each spec change
created_by = Column(String(256), nullable=False) # User who created this spec

# Actual state (status) fields
actual_state = Column(EnumColumn(IndexActualState), nullable=False, default=IndexActualState.ABSENT, index=True)
observed_version = Column(Integer, nullable=False, default=0) # Last processed spec version

# Index data and task tracking
index_data = Column(Text, nullable=True) # JSON string for index-specific data
error_message = Column(Text, nullable=True)

Expand All @@ -726,25 +718,10 @@ class DocumentIndex(Base):
gmt_last_reconciled = Column(DateTime(timezone=True), nullable=True) # Last reconciliation attempt

def __repr__(self):
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, desired={self.desired_state}, actual={self.actual_state})>"

def is_in_sync(self) -> bool:
"""Check if desired and actual states are in sync"""
if self.observed_version < self.version:
return False

if self.desired_state == IndexDesiredState.PRESENT:
return self.actual_state == IndexActualState.PRESENT
elif self.desired_state == IndexDesiredState.ABSENT:
return self.actual_state == IndexActualState.ABSENT
return False
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, status={self.status}, version={self.version})>"

def update_spec(self, desired_state: IndexDesiredState = None, created_by: str = None):
"""Update the spec (desired state) part"""
if desired_state is not None:
self.desired_state = desired_state
if created_by is not None:
self.created_by = created_by
def update_version(self):
"""Update the version to trigger reconciliation"""
self.version += 1
self.gmt_updated = utc_now()

Expand Down
133 changes: 17 additions & 116 deletions aperag/index/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession

from aperag.db.models import DocumentIndex, DocumentIndexType, IndexActualState, IndexDesiredState, utc_now
from aperag.db.models import DocumentIndex, DocumentIndexStatus, DocumentIndexType, utc_now

logger = logging.getLogger(__name__)


class FrontendIndexManager:
class DocumentIndexManager:
"""Simple manager for document index specs (frontend chain)"""

async def create_document_indexes(
self, session: AsyncSession, document_id: str, user: str, index_types: Optional[List[DocumentIndexType]] = None
async def create_or_update_document_indexes(
self, session: AsyncSession, document_id: str, index_types: Optional[List[DocumentIndexType]] = None
):
"""
Create index specs for a document (called when document is created)
Create or update index records for a document (called when document is created or index isupdated)

Args:
session: Database session
document_id: Document ID
user: User creating the document
index_types: List of index types to create (defaults to all)
"""
if index_types is None:
Expand All @@ -50,38 +49,21 @@ async def create_document_indexes(
existing_index = result.scalar_one_or_none()

if existing_index:
# Update existing index
existing_index.update_spec(IndexDesiredState.PRESENT, user)
logger.debug(f"Updated index for {document_id}:{index_type} to version {existing_index.version}")
# Update existing index to pending and increment version
existing_index.status = DocumentIndexStatus.PENDING
existing_index.update_version()
logger.debug(f"Updated index for {document_id}:{index_type.value} to version {existing_index.version}")
else:
# Create new index
doc_index = DocumentIndex(
document_id=document_id,
index_type=index_type,
desired_state=IndexDesiredState.PRESENT,
status=DocumentIndexStatus.PENDING,
version=1,
created_by=user,
observed_version=0,
)
session.add(doc_index)

async def update_document_indexes(self, session: AsyncSession, document_id: str):
"""
Update document indexes (called when document content is updated)

This increments the version of all indexes to trigger reconciliation.

Args:
session: Database session
document_id: Document ID
"""
stmt = select(DocumentIndex).where(DocumentIndex.document_id == document_id)
result = await session.execute(stmt)
indexes = result.scalars().all()

for index in indexes:
if index.desired_state == IndexDesiredState.PRESENT:
index.version += 1 # Increment version to trigger re-indexing
index.gmt_updated = utc_now()
logger.debug(f"Created new index for {document_id}:{index_type.value}")

async def delete_document_indexes(
self, session: AsyncSession, document_id: str, index_types: Optional[List[DocumentIndexType]] = None
Expand All @@ -105,92 +87,11 @@ async def delete_document_indexes(
doc_index = result.scalar_one_or_none()

if doc_index:
doc_index.update_spec(IndexDesiredState.ABSENT)

async def rebuild_document_indexes(
self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType]
):
"""
Rebuild specified document indexes (called when user requests index rebuild)

This increments the version of specified indexes to trigger reconciliation.

Args:
session: Database session
document_id: Document ID
index_types: List of index types to rebuild
"""
if len(set(index_types)) != len(index_types):
raise Exception("Duplicate index types are not allowed")

for index_type in index_types:
stmt = select(DocumentIndex).where(
and_(DocumentIndex.document_id == document_id, DocumentIndex.index_type == index_type)
)
result = await session.execute(stmt)
doc_index = result.scalar_one_or_none()

if doc_index:
# Only rebuild if the index is present or failed
if doc_index.desired_state == IndexDesiredState.PRESENT:
doc_index.version += 1 # Increment version to trigger re-indexing
doc_index.gmt_updated = utc_now()
logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
else:
logger.warning(
f"Cannot rebuild {index_type.value} index for document {document_id}: index not present"
)
else:
logger.warning(f"No {index_type.value} index found for document {document_id}")

async def get_document_index_status(self, session: AsyncSession, document_id: str) -> dict:
"""
Get current index status for a document

Args:
session: Database session
document_id: Document ID

Returns:
Dictionary with index status information
"""
# Get all indexes for the document
stmt = select(DocumentIndex).where(DocumentIndex.document_id == document_id)
result = await session.execute(stmt)
indexes = result.scalars().all()

# Build result
result = {"document_id": document_id, "indexes": {}, "overall_status": "complete"}

has_creating = False
has_failed = False

for index in indexes:
index_info = {
"type": index.index_type,
"desired_state": index.desired_state,
"actual_state": index.actual_state,
"in_sync": index.is_in_sync(),
}

if index.actual_state == IndexActualState.CREATING:
has_creating = True
elif index.actual_state == IndexActualState.FAILED:
has_failed = True
index_info["error"] = index.error_message

result["indexes"][index.index_type] = index_info

# Determine overall status
if has_failed:
result["overall_status"] = "failed"
elif has_creating:
result["overall_status"] = "running"
else:
result["overall_status"] = "complete"

return result
# Mark for deletion
doc_index.status = DocumentIndexStatus.DELETING
doc_index.gmt_updated = utc_now()
logger.debug(f"Marked index {document_id}:{index_type.value} for deletion")


# Global instance
document_index_manager = FrontendIndexManager()
document_index_manager = DocumentIndexManager()
Loading
Loading