Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions aperag/api/components/schemas/document.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,33 @@ document:
vector_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- FAILED
- SKIPPED
- pending
- creating
- active
- deleting
- deletion_in_progress
- failed
- skipped
fulltext_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- FAILED
- SKIPPED
- pending
- creating
- active
- deleting
- deletion_in_progress
- failed
- skipped
graph_index_status:
type: string
enum:
- PENDING
- RUNNING
- COMPLETE
- FAILED
- SKIPPED
- pending
- creating
- active
- deleting
- deletion_in_progress
- failed
- skipped
vector_index_updated:
type: string
format: date-time
Expand Down
72 changes: 35 additions & 37 deletions aperag/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,19 @@ class LightRAGDocStatus(str, Enum):
FAILED = "failed"


# Add new enums for K8s-inspired design
class IndexDesiredState(str, Enum):
"""Desired state for index - what we want"""
# Document index status for simplified single-status model
class DocumentIndexStatus(str, Enum):
"""Document index lifecycle status

Single status value stored on each DocumentIndex row. Members are
string-valued, so they compare equal to their lowercase literals.

There is no SKIPPED member: the "skipped" value exposed by the API schema
is produced by the index manager for index types that have no
DocumentIndex row, not stored on a row.
"""

# NOTE(review): PRESENT/ABSENT look like the removed members of the old
# IndexDesiredState enum as rendered by this diff view - confirm they are
# not meant to be part of DocumentIndexStatus.
PRESENT = "present"
ABSENT = "absent"
PENDING = "pending" # Waiting for processing (create/update)
CREATING = "creating" # Index creation/update task in progress
ACTIVE = "active" # Index is up-to-date and ready for use
DELETING = "deleting" # Deletion has been requested
DELETION_IN_PROGRESS = "deletion_in_progress" # Index deletion task in progress
FAILED = "failed" # The last operation failed


class IndexActualState(str, Enum):
"""Actual state for index - what currently exists"""

ABSENT = "absent"
CREATING = "creating"
PRESENT = "present"
DELETING = "deleting"
FAILED = "failed"


# Models
Expand Down Expand Up @@ -243,13 +240,13 @@ def get_overall_index_status(self, session) -> "DocumentStatus":
if not document_indexes:
return DocumentStatus.PENDING

states = [idx.actual_state for idx in document_indexes]
statuses = [idx.status for idx in document_indexes]

if any(state == IndexActualState.FAILED for state in states):
if any(status == DocumentIndexStatus.FAILED for status in statuses):
return DocumentStatus.FAILED
elif any(state == IndexActualState.CREATING for state in states):
elif any(status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS] for status in statuses):
return DocumentStatus.RUNNING
elif all(state == IndexActualState.PRESENT for state in states):
elif all(status == DocumentIndexStatus.ACTIVE for status in statuses):
return DocumentStatus.COMPLETE
else:
return DocumentStatus.PENDING
Expand Down Expand Up @@ -700,7 +697,7 @@ class LightRAGLLMCacheModel(Base):


class DocumentIndex(Base):
"""Document index - combines spec and status into single table"""
"""Document index - simplified single status model"""

__tablename__ = "document_index"
__table_args__ = (UniqueConstraint("document_id", "index_type", name="uq_document_index"),)
Expand All @@ -709,14 +706,13 @@ class DocumentIndex(Base):
document_id = Column(String(24), nullable=False, index=True)
index_type = Column(EnumColumn(DocumentIndexType), nullable=False, index=True)

# Desired state (spec) fields
desired_state = Column(EnumColumn(IndexDesiredState), nullable=False, default=IndexDesiredState.PRESENT, index=True)
# Single status field for lifecycle management
status = Column(EnumColumn(DocumentIndexStatus), nullable=False, default=DocumentIndexStatus.PENDING, index=True)
version = Column(Integer, nullable=False, default=1) # Incremented on each spec change
created_by = Column(String(256), nullable=False) # User who created this spec

# Actual state (status) fields
actual_state = Column(EnumColumn(IndexActualState), nullable=False, default=IndexActualState.ABSENT, index=True)
observed_version = Column(Integer, nullable=False, default=0) # Last processed spec version
created_by = Column(String(256), nullable=False) # User who created this spec

# Index-specific data and error information
index_data = Column(Text, nullable=True) # JSON string for index-specific data
error_message = Column(Text, nullable=True)

Expand All @@ -726,26 +722,28 @@ class DocumentIndex(Base):
gmt_last_reconciled = Column(DateTime(timezone=True), nullable=True) # Last reconciliation attempt

def __repr__(self):
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, desired={self.desired_state}, actual={self.actual_state})>"
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, status={self.status})>"

def is_in_sync(self) -> bool:
"""Check if desired and actual states are in sync"""
if self.observed_version < self.version:
return False
def is_pending_processing(self) -> bool:
    """Report whether this index is waiting for a create/update run.

    Returns:
        True only when the status is PENDING and the last processed
        version (``observed_version``) is behind the current ``version``.
    """
    if self.status == DocumentIndexStatus.PENDING:
        # A PENDING row whose versions have already caught up is not reported.
        return self.observed_version < self.version
    return False

if self.desired_state == IndexDesiredState.PRESENT:
return self.actual_state == IndexActualState.PRESENT
elif self.desired_state == IndexDesiredState.ABSENT:
return self.actual_state == IndexActualState.ABSENT
return False
def is_ready_for_deletion(self) -> bool:
    """Report whether deletion has been requested for this index.

    Returns:
        True when the status is DELETING; every other status, including
        DELETION_IN_PROGRESS, yields False.
    """
    deletion_requested = self.status == DocumentIndexStatus.DELETING
    return deletion_requested

def update_spec(self, desired_state: IndexDesiredState = None, created_by: str = None):
"""Update the spec (desired state) part"""
if desired_state is not None:
self.desired_state = desired_state
def update_spec(self, created_by: str = None):
    """Bump the spec version and queue the index for re-processing.

    The status is reset to PENDING unconditionally, whatever it was before
    (including DELETING / DELETION_IN_PROGRESS); callers that must not revive
    an index that is being deleted have to check the status themselves.
    ``error_message`` is not touched here.

    Args:
        created_by: User recorded as the author of this spec; when None the
            existing ``created_by`` value is kept.
    """
    author_supplied = created_by is not None
    if author_supplied:
        self.created_by = created_by

    next_version = self.version + 1
    self.version = next_version
    self.status = DocumentIndexStatus.PENDING  # Reset to pending for reprocessing
    self.gmt_updated = utc_now()

def mark_for_deletion(self):
    """Request deletion of this index.

    Sets the status to DELETING and refreshes ``gmt_updated``; ``version``
    is left unchanged.
    """
    self.status = DocumentIndexStatus.DELETING
    touched_at = utc_now()
    self.gmt_updated = touched_at


Expand Down
1 change: 0 additions & 1 deletion aperag/index/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class IndexStatus(Enum):
RUNNING = "running"
COMPLETE = "complete"
FAILED = "failed"
SKIPPED = "skipped"


@dataclass
Expand Down
82 changes: 46 additions & 36 deletions aperag/index/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from sqlalchemy import and_, select
from sqlalchemy.ext.asyncio import AsyncSession

from aperag.db.models import DocumentIndex, DocumentIndexType, IndexActualState, IndexDesiredState, utc_now
from aperag.db.models import DocumentIndex, DocumentIndexType, DocumentIndexStatus, utc_now

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -51,14 +51,14 @@ async def create_document_indexes(

if existing_index:
# Update existing index
existing_index.update_spec(IndexDesiredState.PRESENT, user)
existing_index.update_spec(user)
logger.debug(f"Updated index for {document_id}:{index_type} to version {existing_index.version}")
else:
# Create new index
# Create new index with PENDING status
doc_index = DocumentIndex(
document_id=document_id,
index_type=index_type,
desired_state=IndexDesiredState.PRESENT,
status=DocumentIndexStatus.PENDING,
version=1,
created_by=user,
)
Expand All @@ -79,9 +79,8 @@ async def update_document_indexes(self, session: AsyncSession, document_id: str)
indexes = result.scalars().all()

for index in indexes:
if index.desired_state == IndexDesiredState.PRESENT:
index.version += 1 # Increment version to trigger re-indexing
index.gmt_updated = utc_now()
# Reset to PENDING to trigger re-indexing for existing indexes
index.update_spec()

async def delete_document_indexes(
self, session: AsyncSession, document_id: str, index_types: Optional[List[DocumentIndexType]] = None
Expand All @@ -105,7 +104,7 @@ async def delete_document_indexes(
doc_index = result.scalar_one_or_none()

if doc_index:
doc_index.update_spec(IndexDesiredState.ABSENT)
doc_index.mark_for_deletion()

async def rebuild_document_indexes(
self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType]
Expand All @@ -131,13 +130,9 @@ async def rebuild_document_indexes(
doc_index = result.scalar_one_or_none()

if doc_index:
# Only rebuild if the index is present or failed
if doc_index.desired_state == IndexDesiredState.PRESENT:
doc_index.version += 1 # Increment version to trigger re-indexing
doc_index.gmt_updated = utc_now()
logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
else:
logger.warning(f"Cannot rebuild {index_type.value} index for document {document_id}: index not present")
# Reset to PENDING to trigger re-indexing
doc_index.update_spec()
logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
else:
logger.warning(f"No {index_type.value} index found for document {document_id}")

Expand All @@ -150,39 +145,54 @@ async def get_document_index_status(self, session: AsyncSession, document_id: st
document_id: Document ID

Returns:
Dictionary with index status information
Dictionary with index status information including update times
"""
# Get all indexes for the document
stmt = select(DocumentIndex).where(DocumentIndex.document_id == document_id)
result = await session.execute(stmt)
indexes = result.scalars().all()
query_result = await session.execute(stmt)
indexes = query_result.scalars().all()

# Build result
# Build result with all possible index types
result = {"document_id": document_id, "indexes": {}, "overall_status": "complete"}

has_creating = False
has_active_processing = False
has_failed = False

for index in indexes:
index_info = {
"type": index.index_type,
"desired_state": index.desired_state,
"actual_state": index.actual_state,
"in_sync": index.is_in_sync(),
}

if index.actual_state == IndexActualState.CREATING:
has_creating = True
elif index.actual_state == IndexActualState.FAILED:
has_failed = True
index_info["error"] = index.error_message

result["indexes"][index.index_type] = index_info
# Create a map of existing indexes
existing_indexes = {index.index_type: index for index in indexes}

# Process all possible index types
for index_type in [DocumentIndexType.VECTOR, DocumentIndexType.FULLTEXT, DocumentIndexType.GRAPH]:
if index_type in existing_indexes:
# Index exists in database
index = existing_indexes[index_type]
index_info = {
"type": index.index_type,
"status": index.status,
"updated_time": index.gmt_updated,
"needs_processing": index.is_pending_processing(),
}

if index.status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS]:
has_active_processing = True
elif index.status == DocumentIndexStatus.FAILED:
has_failed = True
index_info["error"] = index.error_message

result["indexes"][index.index_type] = index_info
else:
# Index doesn't exist in database - show as skipped
result["indexes"][index_type] = {
"type": index_type,
"status": "skipped",
"updated_time": None,
"needs_processing": False,
}

# Determine overall status
if has_failed:
result["overall_status"] = "failed"
elif has_creating:
elif has_active_processing:
result["overall_status"] = "running"
else:
result["overall_status"] = "complete"
Expand Down
Loading
Loading