Skip to content

Commit cb6a92b

Browse files
committed
feat: update document index status model and related components
- Refactored DocumentIndex to use a simplified status model with new states: pending, creating, active, deleting, deletion_in_progress, and failed.
- Updated related schemas, services, and frontend components to reflect the new status model.
- Removed old desired_state and actual_state fields, streamlining the index lifecycle management.
- Adjusted API responses and documentation to align with the new status definitions.
1 parent 6b7cf4d commit cb6a92b

17 files changed

Lines changed: 487 additions & 334 deletions

File tree

aperag/api/components/schemas/document.yaml

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,33 @@ document:
1818
vector_index_status:
1919
type: string
2020
enum:
21-
- PENDING
22-
- RUNNING
23-
- COMPLETE
24-
- FAILED
25-
- SKIPPED
21+
- pending
22+
- creating
23+
- active
24+
- deleting
25+
- deletion_in_progress
26+
- failed
27+
- skipped
2628
fulltext_index_status:
2729
type: string
2830
enum:
29-
- PENDING
30-
- RUNNING
31-
- COMPLETE
32-
- FAILED
33-
- SKIPPED
31+
- pending
32+
- creating
33+
- active
34+
- deleting
35+
- deletion_in_progress
36+
- failed
37+
- skipped
3438
graph_index_status:
3539
type: string
3640
enum:
37-
- PENDING
38-
- RUNNING
39-
- COMPLETE
40-
- FAILED
41-
- SKIPPED
41+
- pending
42+
- creating
43+
- active
44+
- deleting
45+
- deletion_in_progress
46+
- failed
47+
- skipped
4248
vector_index_updated:
4349
type: string
4450
format: date-time

aperag/db/models.py

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -159,22 +159,19 @@ class LightRAGDocStatus(str, Enum):
159159
FAILED = "failed"
160160

161161

162-
# Add new enums for K8s-inspired design
163-
class IndexDesiredState(str, Enum):
164-
"""Desired state for index - what we want"""
162+
# Document index status for simplified single-status model
163+
class DocumentIndexStatus(str, Enum):
164+
"""Document index lifecycle status"""
165165

166-
PRESENT = "present"
167-
ABSENT = "absent"
166+
PENDING = "pending" # Waiting for processing (create/update)
167+
CREATING = "creating" # Index creation/update task in progress
168+
ACTIVE = "active" # Index is up-to-date and ready for use
169+
DELETING = "deleting" # Deletion has been requested
170+
DELETION_IN_PROGRESS = "deletion_in_progress" # Index deletion task in progress
171+
FAILED = "failed" # The last operation failed
168172

169173

170-
class IndexActualState(str, Enum):
171-
"""Actual state for index - what currently exists"""
172174

173-
ABSENT = "absent"
174-
CREATING = "creating"
175-
PRESENT = "present"
176-
DELETING = "deleting"
177-
FAILED = "failed"
178175

179176

180177
# Models
@@ -243,13 +240,13 @@ def get_overall_index_status(self, session) -> "DocumentStatus":
243240
if not document_indexes:
244241
return DocumentStatus.PENDING
245242

246-
states = [idx.actual_state for idx in document_indexes]
243+
statuses = [idx.status for idx in document_indexes]
247244

248-
if any(state == IndexActualState.FAILED for state in states):
245+
if any(status == DocumentIndexStatus.FAILED for status in statuses):
249246
return DocumentStatus.FAILED
250-
elif any(state == IndexActualState.CREATING for state in states):
247+
elif any(status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS] for status in statuses):
251248
return DocumentStatus.RUNNING
252-
elif all(state == IndexActualState.PRESENT for state in states):
249+
elif all(status == DocumentIndexStatus.ACTIVE for status in statuses):
253250
return DocumentStatus.COMPLETE
254251
else:
255252
return DocumentStatus.PENDING
@@ -700,7 +697,7 @@ class LightRAGLLMCacheModel(Base):
700697

701698

702699
class DocumentIndex(Base):
703-
"""Document index - combines spec and status into single table"""
700+
"""Document index - simplified single status model"""
704701

705702
__tablename__ = "document_index"
706703
__table_args__ = (UniqueConstraint("document_id", "index_type", name="uq_document_index"),)
@@ -709,14 +706,13 @@ class DocumentIndex(Base):
709706
document_id = Column(String(24), nullable=False, index=True)
710707
index_type = Column(EnumColumn(DocumentIndexType), nullable=False, index=True)
711708

712-
# Desired state (spec) fields
713-
desired_state = Column(EnumColumn(IndexDesiredState), nullable=False, default=IndexDesiredState.PRESENT, index=True)
709+
# Single status field for lifecycle management
710+
status = Column(EnumColumn(DocumentIndexStatus), nullable=False, default=DocumentIndexStatus.PENDING, index=True)
714711
version = Column(Integer, nullable=False, default=1) # Incremented on each spec change
715-
created_by = Column(String(256), nullable=False) # User who created this spec
716-
717-
# Actual state (status) fields
718-
actual_state = Column(EnumColumn(IndexActualState), nullable=False, default=IndexActualState.ABSENT, index=True)
719712
observed_version = Column(Integer, nullable=False, default=0) # Last processed spec version
713+
created_by = Column(String(256), nullable=False) # User who created this spec
714+
715+
# Index-specific data and error information
720716
index_data = Column(Text, nullable=True) # JSON string for index-specific data
721717
error_message = Column(Text, nullable=True)
722718

@@ -726,26 +722,28 @@ class DocumentIndex(Base):
726722
gmt_last_reconciled = Column(DateTime(timezone=True), nullable=True) # Last reconciliation attempt
727723

728724
def __repr__(self):
729-
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, desired={self.desired_state}, actual={self.actual_state})>"
725+
return f"<DocumentIndex(id={self.id}, document_id={self.document_id}, type={self.index_type}, status={self.status})>"
730726

731-
def is_in_sync(self) -> bool:
732-
"""Check if desired and actual states are in sync"""
733-
if self.observed_version < self.version:
734-
return False
727+
def is_pending_processing(self) -> bool:
728+
"""Check if index needs processing (create or update)"""
729+
return (self.status == DocumentIndexStatus.PENDING and
730+
self.observed_version < self.version)
735731

736-
if self.desired_state == IndexDesiredState.PRESENT:
737-
return self.actual_state == IndexActualState.PRESENT
738-
elif self.desired_state == IndexDesiredState.ABSENT:
739-
return self.actual_state == IndexActualState.ABSENT
740-
return False
732+
def is_ready_for_deletion(self) -> bool:
733+
"""Check if index is ready for deletion"""
734+
return self.status == DocumentIndexStatus.DELETING
741735

742-
def update_spec(self, desired_state: IndexDesiredState = None, created_by: str = None):
743-
"""Update the spec (desired state) part"""
744-
if desired_state is not None:
745-
self.desired_state = desired_state
736+
def update_spec(self, created_by: str = None):
737+
"""Update the spec to trigger re-processing"""
746738
if created_by is not None:
747739
self.created_by = created_by
748740
self.version += 1
741+
self.status = DocumentIndexStatus.PENDING # Reset to pending for reprocessing
742+
self.gmt_updated = utc_now()
743+
744+
def mark_for_deletion(self):
745+
"""Mark index for deletion"""
746+
self.status = DocumentIndexStatus.DELETING
749747
self.gmt_updated = utc_now()
750748

751749

aperag/index/base.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class IndexStatus(Enum):
3333
RUNNING = "running"
3434
COMPLETE = "complete"
3535
FAILED = "failed"
36-
SKIPPED = "skipped"
3736

3837

3938
@dataclass

aperag/index/manager.py

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from sqlalchemy import and_, select
1919
from sqlalchemy.ext.asyncio import AsyncSession
2020

21-
from aperag.db.models import DocumentIndex, DocumentIndexType, IndexActualState, IndexDesiredState, utc_now
21+
from aperag.db.models import DocumentIndex, DocumentIndexType, DocumentIndexStatus, utc_now
2222

2323
logger = logging.getLogger(__name__)
2424

@@ -51,14 +51,14 @@ async def create_document_indexes(
5151

5252
if existing_index:
5353
# Update existing index
54-
existing_index.update_spec(IndexDesiredState.PRESENT, user)
54+
existing_index.update_spec(user)
5555
logger.debug(f"Updated index for {document_id}:{index_type} to version {existing_index.version}")
5656
else:
57-
# Create new index
57+
# Create new index with PENDING status
5858
doc_index = DocumentIndex(
5959
document_id=document_id,
6060
index_type=index_type,
61-
desired_state=IndexDesiredState.PRESENT,
61+
status=DocumentIndexStatus.PENDING,
6262
version=1,
6363
created_by=user,
6464
)
@@ -79,9 +79,8 @@ async def update_document_indexes(self, session: AsyncSession, document_id: str)
7979
indexes = result.scalars().all()
8080

8181
for index in indexes:
82-
if index.desired_state == IndexDesiredState.PRESENT:
83-
index.version += 1 # Increment version to trigger re-indexing
84-
index.gmt_updated = utc_now()
82+
# Reset to PENDING to trigger re-indexing for existing indexes
83+
index.update_spec()
8584

8685
async def delete_document_indexes(
8786
self, session: AsyncSession, document_id: str, index_types: Optional[List[DocumentIndexType]] = None
@@ -105,7 +104,7 @@ async def delete_document_indexes(
105104
doc_index = result.scalar_one_or_none()
106105

107106
if doc_index:
108-
doc_index.update_spec(IndexDesiredState.ABSENT)
107+
doc_index.mark_for_deletion()
109108

110109
async def rebuild_document_indexes(
111110
self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType]
@@ -131,13 +130,9 @@ async def rebuild_document_indexes(
131130
doc_index = result.scalar_one_or_none()
132131

133132
if doc_index:
134-
# Only rebuild if the index is present or failed
135-
if doc_index.desired_state == IndexDesiredState.PRESENT:
136-
doc_index.version += 1 # Increment version to trigger re-indexing
137-
doc_index.gmt_updated = utc_now()
138-
logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
139-
else:
140-
logger.warning(f"Cannot rebuild {index_type.value} index for document {document_id}: index not present")
133+
# Reset to PENDING to trigger re-indexing
134+
doc_index.update_spec()
135+
logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}")
141136
else:
142137
logger.warning(f"No {index_type.value} index found for document {document_id}")
143138

@@ -150,39 +145,54 @@ async def get_document_index_status(self, session: AsyncSession, document_id: st
150145
document_id: Document ID
151146
152147
Returns:
153-
Dictionary with index status information
148+
Dictionary with index status information including update times
154149
"""
155150
# Get all indexes for the document
156151
stmt = select(DocumentIndex).where(DocumentIndex.document_id == document_id)
157-
result = await session.execute(stmt)
158-
indexes = result.scalars().all()
152+
query_result = await session.execute(stmt)
153+
indexes = query_result.scalars().all()
159154

160-
# Build result
155+
# Build result with all possible index types
161156
result = {"document_id": document_id, "indexes": {}, "overall_status": "complete"}
162157

163-
has_creating = False
158+
has_active_processing = False
164159
has_failed = False
165160

166-
for index in indexes:
167-
index_info = {
168-
"type": index.index_type,
169-
"desired_state": index.desired_state,
170-
"actual_state": index.actual_state,
171-
"in_sync": index.is_in_sync(),
172-
}
173-
174-
if index.actual_state == IndexActualState.CREATING:
175-
has_creating = True
176-
elif index.actual_state == IndexActualState.FAILED:
177-
has_failed = True
178-
index_info["error"] = index.error_message
179-
180-
result["indexes"][index.index_type] = index_info
161+
# Create a map of existing indexes
162+
existing_indexes = {index.index_type: index for index in indexes}
163+
164+
# Process all possible index types
165+
for index_type in [DocumentIndexType.VECTOR, DocumentIndexType.FULLTEXT, DocumentIndexType.GRAPH]:
166+
if index_type in existing_indexes:
167+
# Index exists in database
168+
index = existing_indexes[index_type]
169+
index_info = {
170+
"type": index.index_type,
171+
"status": index.status,
172+
"updated_time": index.gmt_updated,
173+
"needs_processing": index.is_pending_processing(),
174+
}
175+
176+
if index.status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS]:
177+
has_active_processing = True
178+
elif index.status == DocumentIndexStatus.FAILED:
179+
has_failed = True
180+
index_info["error"] = index.error_message
181+
182+
result["indexes"][index.index_type] = index_info
183+
else:
184+
# Index doesn't exist in database - show as skipped
185+
result["indexes"][index_type] = {
186+
"type": index_type,
187+
"status": "skipped",
188+
"updated_time": None,
189+
"needs_processing": False,
190+
}
181191

182192
# Determine overall status
183193
if has_failed:
184194
result["overall_status"] = "failed"
185-
elif has_creating:
195+
elif has_active_processing:
186196
result["overall_status"] = "running"
187197
else:
188198
result["overall_status"] = "complete"

0 commit comments

Comments
 (0)