diff --git a/aperag/api/components/schemas/document.yaml b/aperag/api/components/schemas/document.yaml index fdeefc025..6095ed88f 100644 --- a/aperag/api/components/schemas/document.yaml +++ b/aperag/api/components/schemas/document.yaml @@ -18,27 +18,33 @@ document: vector_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped fulltext_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped graph_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped vector_index_updated: type: string format: date-time diff --git a/aperag/db/models.py b/aperag/db/models.py index 030d2e23d..dcdf46e94 100644 --- a/aperag/db/models.py +++ b/aperag/db/models.py @@ -159,22 +159,19 @@ class LightRAGDocStatus(str, Enum): FAILED = "failed" -# Add new enums for K8s-inspired design -class IndexDesiredState(str, Enum): - """Desired state for index - what we want""" +# Document index status for simplified single-status model +class DocumentIndexStatus(str, Enum): + """Document index lifecycle status""" - PRESENT = "present" - ABSENT = "absent" + PENDING = "pending" # Waiting for processing (create/update) + CREATING = "creating" # Index creation/update task in progress + ACTIVE = "active" # Index is up-to-date and ready for use + DELETING = "deleting" # Deletion has been requested + DELETION_IN_PROGRESS = "deletion_in_progress" # Index deletion task in progress + FAILED = "failed" # The last operation failed -class IndexActualState(str, Enum): - """Actual state for index - what currently exists""" - ABSENT = "absent" - CREATING = "creating" - PRESENT = "present" - DELETING = "deleting" - FAILED = "failed" # Models @@ -243,13 +240,13 @@ def get_overall_index_status(self, session) -> "DocumentStatus": if not document_indexes: return DocumentStatus.PENDING - states = [idx.actual_state for idx in document_indexes] + statuses = [idx.status for idx in document_indexes] - if any(state == IndexActualState.FAILED for state in states): + if any(status == DocumentIndexStatus.FAILED for status in statuses): return DocumentStatus.FAILED - elif any(state == IndexActualState.CREATING for state in states): + elif any(status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS] for status in statuses): return DocumentStatus.RUNNING - elif all(state == IndexActualState.PRESENT for state in states): + elif all(status == DocumentIndexStatus.ACTIVE for status in statuses): return DocumentStatus.COMPLETE else: return DocumentStatus.PENDING @@ -700,7 +697,7 @@ class LightRAGLLMCacheModel(Base): class DocumentIndex(Base): - """Document index - combines spec and status into single table""" + """Document index - simplified single status model""" __tablename__ = "document_index" __table_args__ = (UniqueConstraint("document_id", "index_type", name="uq_document_index"),) @@ -709,14 +706,13 @@ class DocumentIndex(Base): document_id = Column(String(24), nullable=False, index=True) index_type = Column(EnumColumn(DocumentIndexType), nullable=False, index=True) - # Desired state (spec) fields - desired_state = Column(EnumColumn(IndexDesiredState), nullable=False, default=IndexDesiredState.PRESENT, index=True) + # Single status field for lifecycle management + status = Column(EnumColumn(DocumentIndexStatus), nullable=False, default=DocumentIndexStatus.PENDING, index=True) version = Column(Integer, nullable=False, default=1) # Incremented on each spec change - created_by = Column(String(256), nullable=False) # User who created this spec - - # Actual state (status) fields - actual_state = Column(EnumColumn(IndexActualState), nullable=False, default=IndexActualState.ABSENT, index=True) observed_version = Column(Integer, nullable=False, default=0) # Last processed spec version + created_by = Column(String(256), nullable=False) # User who created this spec + + # Index-specific data and error information index_data = Column(Text, nullable=True) # JSON string for index-specific data error_message = Column(Text, nullable=True) @@ -726,26 +722,28 @@ class DocumentIndex(Base): gmt_last_reconciled = Column(DateTime(timezone=True), nullable=True) # Last reconciliation attempt def __repr__(self): - return f"" + return f"" - def is_in_sync(self) -> bool: - """Check if desired and actual states are in sync""" - if self.observed_version < self.version: - return False + def is_pending_processing(self) -> bool: + """Check if index needs processing (create or update)""" + return (self.status == DocumentIndexStatus.PENDING and + self.observed_version < self.version) - if self.desired_state == IndexDesiredState.PRESENT: - return self.actual_state == IndexActualState.PRESENT - elif self.desired_state == IndexDesiredState.ABSENT: - return self.actual_state == IndexActualState.ABSENT - return False + def is_ready_for_deletion(self) -> bool: + """Check if index is ready for deletion""" + return self.status == DocumentIndexStatus.DELETING - def update_spec(self, desired_state: IndexDesiredState = None, created_by: str = None): - """Update the spec (desired state) part""" - if desired_state is not None: - self.desired_state = desired_state + def update_spec(self, created_by: str = None): + """Update the spec to trigger re-processing""" if created_by is not None: self.created_by = created_by self.version += 1 + self.status = DocumentIndexStatus.PENDING # Reset to pending for reprocessing + self.gmt_updated = utc_now() + + def mark_for_deletion(self): + """Mark index for deletion""" + self.status = DocumentIndexStatus.DELETING self.gmt_updated = utc_now() diff --git a/aperag/index/base.py b/aperag/index/base.py index 82b8e3f8e..08d75ef58 100644 --- a/aperag/index/base.py +++ b/aperag/index/base.py @@ -33,7 +33,6 @@ class IndexStatus(Enum): RUNNING = "running" COMPLETE = "complete" FAILED = "failed" - SKIPPED = "skipped" @dataclass diff --git a/aperag/index/manager.py b/aperag/index/manager.py index 7bcf8cbdd..9fa522ebb 100644 --- a/aperag/index/manager.py +++ b/aperag/index/manager.py @@ -18,7 +18,7 @@ from sqlalchemy import and_, select from sqlalchemy.ext.asyncio import AsyncSession -from aperag.db.models import DocumentIndex, DocumentIndexType, IndexActualState, IndexDesiredState, utc_now +from aperag.db.models import DocumentIndex, DocumentIndexType, DocumentIndexStatus, utc_now logger = logging.getLogger(__name__) @@ -51,14 +51,14 @@ async def create_document_indexes( if existing_index: # Update existing index - existing_index.update_spec(IndexDesiredState.PRESENT, user) + existing_index.update_spec(user) logger.debug(f"Updated index for {document_id}:{index_type} to version {existing_index.version}") else: - # Create new index + # Create new index with PENDING status doc_index = DocumentIndex( document_id=document_id, index_type=index_type, - desired_state=IndexDesiredState.PRESENT, + status=DocumentIndexStatus.PENDING, version=1, created_by=user, ) @@ -79,9 +79,8 @@ async def update_document_indexes(self, session: AsyncSession, document_id: str) indexes = result.scalars().all() for index in indexes: - if index.desired_state == IndexDesiredState.PRESENT: - index.version += 1 # Increment version to trigger re-indexing - index.gmt_updated = utc_now() + # Reset to PENDING to trigger re-indexing for existing indexes + index.update_spec() async def delete_document_indexes( self, session: AsyncSession, document_id: str, index_types: Optional[List[DocumentIndexType]] = None @@ -105,7 +104,7 @@ async def delete_document_indexes( doc_index = result.scalar_one_or_none() if doc_index: - doc_index.update_spec(IndexDesiredState.ABSENT) + doc_index.mark_for_deletion() async def rebuild_document_indexes( self, session: AsyncSession, document_id: str, index_types: List[DocumentIndexType] @@ -131,13 +130,9 @@ async def rebuild_document_indexes( doc_index = result.scalar_one_or_none() if doc_index: - # Only rebuild if the index is present or failed - if doc_index.desired_state == IndexDesiredState.PRESENT: - doc_index.version += 1 # Increment version to trigger re-indexing - doc_index.gmt_updated = utc_now() - logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}") - else: - logger.warning(f"Cannot rebuild {index_type.value} index for document {document_id}: index not present") + # Reset to PENDING to trigger re-indexing + doc_index.update_spec() + logger.info(f"Triggered rebuild for {index_type.value} index of document {document_id}") else: logger.warning(f"No {index_type.value} index found for document {document_id}") @@ -150,39 +145,54 @@ async def get_document_index_status(self, session: AsyncSession, document_id: st document_id: Document ID Returns: - Dictionary with index status information + Dictionary with index status information including update times """ # Get all indexes for the document stmt = select(DocumentIndex).where(DocumentIndex.document_id == document_id) - result = await session.execute(stmt) - indexes = result.scalars().all() + query_result = await session.execute(stmt) + indexes = query_result.scalars().all() - # Build result + # Build result with all possible index types result = {"document_id": document_id, "indexes": {}, "overall_status": "complete"} - has_creating = False + has_active_processing = False has_failed = False - for index in indexes: - index_info = { - "type": index.index_type, - "desired_state": index.desired_state, - "actual_state": index.actual_state, - "in_sync": index.is_in_sync(), - } - - if index.actual_state == IndexActualState.CREATING: - has_creating = True - elif index.actual_state == IndexActualState.FAILED: - has_failed = True - index_info["error"] = index.error_message - - result["indexes"][index.index_type] = index_info + # Create a map of existing indexes + existing_indexes = {index.index_type: index for index in indexes} + + # Process all possible index types + for index_type in [DocumentIndexType.VECTOR, DocumentIndexType.FULLTEXT, DocumentIndexType.GRAPH]: + if index_type in existing_indexes: + # Index exists in database + index = existing_indexes[index_type] + index_info = { + "type": index.index_type, + "status": index.status, + "updated_time": index.gmt_updated, + "needs_processing": index.is_pending_processing(), + } + + if index.status in [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS]: + has_active_processing = True + elif index.status == DocumentIndexStatus.FAILED: + has_failed = True + index_info["error"] = index.error_message + + result["indexes"][index.index_type] = index_info + else: + # Index doesn't exist in database - show as skipped + result["indexes"][index_type] = { + "type": index_type, + "status": "skipped", + "updated_time": None, + "needs_processing": False, + } # Determine overall status if has_failed: result["overall_status"] = "failed" - elif has_creating: + elif has_active_processing: result["overall_status"] = "running" else: result["overall_status"] = "complete" diff --git a/aperag/index/reconciler.py b/aperag/index/reconciler.py index 9a27438b7..f88350a73 100644 --- a/aperag/index/reconciler.py +++ b/aperag/index/reconciler.py @@ -24,9 +24,8 @@ Document, DocumentIndex, DocumentIndexType, + DocumentIndexStatus, DocumentStatus, - IndexActualState, - IndexDesiredState, ) from aperag.tasks.scheduler import TaskScheduler, create_task_scheduler from aperag.utils.utils import utc_now @@ -47,31 +46,22 @@ def _get_reconciliation_conditions(operation_type: str, document_ids: List[str] This is the authoritative source for determining which indexes can be processed. Args: - operation_type: 'create', 'update', or 'delete' + operation_type: 'create_update' or 'delete' document_ids: Optional list of document IDs to filter by Returns: List of SQLAlchemy conditions """ - if operation_type in ["create", "update"]: + if operation_type == "create_update": conditions = [ - DocumentIndex.desired_state == IndexDesiredState.PRESENT, - # Need reconciliation: either version mismatch or state mismatch - or_( - DocumentIndex.observed_version < DocumentIndex.version, - DocumentIndex.actual_state.in_([IndexActualState.ABSENT, IndexActualState.FAILED]), - ), - # For create/update operations, exclude both CREATING and DELETING states - DocumentIndex.actual_state.notin_([IndexActualState.CREATING, IndexActualState.DELETING]), + # Indexes that are pending and need processing + DocumentIndex.status == DocumentIndexStatus.PENDING, + DocumentIndex.observed_version < DocumentIndex.version, ] elif operation_type == "delete": conditions = [ - DocumentIndex.desired_state == IndexDesiredState.ABSENT, - # Only delete indexes that actually exist or are being created - DocumentIndex.actual_state.in_([IndexActualState.CREATING, IndexActualState.PRESENT]), - # For delete operations, allow claiming indexes in CREATING state to enable deletion override - # Only exclude DELETING state to prevent concurrent deletions - DocumentIndex.actual_state != IndexActualState.DELETING, + # Indexes that are marked for deletion + DocumentIndex.status == DocumentIndexStatus.DELETING, ] else: raise ValueError(f"Unknown operation_type: {operation_type}") @@ -102,20 +92,13 @@ def reconcile_all(self, document_ids: List[str] = None): # Group by document ID and operation type for batch processing from collections import defaultdict - doc_operations = defaultdict(lambda: {"create": [], "update": [], "delete": []}) + doc_operations = defaultdict(lambda: {"create_update": [], "delete": []}) for doc_index in all_indexes_needing_reconciliation: # Group operations by document and type - if doc_index.desired_state == IndexDesiredState.PRESENT: - # Check if this is an update (version mismatch with existing index data) or creation - if doc_index.index_data and doc_index.observed_version > 0: - # Index has data and was observed before - this is an update - operation_type = "update" - else: - # No existing data or never observed - this is a creation - operation_type = "create" - doc_operations[doc_index.document_id][operation_type].append(doc_index) - elif doc_index.desired_state == IndexDesiredState.ABSENT: + if doc_index.status == DocumentIndexStatus.PENDING: + doc_operations[doc_index.document_id]["create_update"].append(doc_index) + elif doc_index.status == DocumentIndexStatus.DELETING: doc_operations[doc_index.document_id]["delete"].append(doc_index) logger.info(f"Found {len(doc_operations)} documents need to be reconciled") @@ -142,20 +125,20 @@ def _get_indexes_needing_reconciliation( State modifications will happen in individual document transactions. """ # Use shared reconciliation conditions - create_conditions = self._get_reconciliation_conditions("create", document_ids) + create_update_conditions = self._get_reconciliation_conditions("create_update", document_ids) delete_conditions = self._get_reconciliation_conditions("delete", document_ids) - # Query for indexes that need creating - create_stmt = select(DocumentIndex).where(and_(*create_conditions)) - create_result = session.execute(create_stmt) - create_indexes = create_result.scalars().all() + # Query for indexes that need creating/updating + create_update_stmt = select(DocumentIndex).where(and_(*create_update_conditions)) + create_update_result = session.execute(create_update_stmt) + create_update_indexes = create_update_result.scalars().all() # Query for indexes that need deleting delete_stmt = select(DocumentIndex).where(and_(*delete_conditions)) delete_result = session.execute(delete_stmt) delete_indexes = delete_result.scalars().all() - all_indexes = list(create_indexes) + list(delete_indexes) + all_indexes = list(create_update_indexes) + list(delete_indexes) logger.debug(f"Found {len(all_indexes)} indexes needing reconciliation") return all_indexes @@ -190,10 +173,10 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to """ try: for index_id, operation_type in indexes_to_claim: - if operation_type in ["create", "update"]: - target_state = IndexActualState.CREATING + if operation_type == "create_update": + target_status = DocumentIndexStatus.CREATING elif operation_type == "delete": - target_state = IndexActualState.DELETING + target_status = DocumentIndexStatus.DELETION_IN_PROGRESS else: continue @@ -209,7 +192,7 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to update_stmt = ( update(DocumentIndex) .where(and_(*where_conditions)) - .values(actual_state=target_state, gmt_updated=utc_now(), gmt_last_reconciled=utc_now()) + .values(status=target_status, gmt_updated=utc_now(), gmt_last_reconciled=utc_now()) ) result = session.execute(update_stmt) @@ -227,32 +210,33 @@ def _claim_document_indexes(self, session: Session, document_id: str, indexes_to def _reconcile_document_operations(self, document_id: str, operations: dict): """ Reconcile operations for a single document, using batch processing when possible - States are already updated to CREATING/DELETING before calling this method + States are already updated to CREATING/DELETION_IN_PROGRESS before calling this method """ - create_index_types = [] - for doc_index in operations["create"]: - create_index_types.append(doc_index.index_type) - if create_index_types: - # Add document_id to task for better idempotency checking - task_id = f"create_index_{document_id}_{int(time.time())}" + create_update_index_types = [] + for doc_index in operations["create_update"]: + create_update_index_types.append(doc_index.index_type) + if create_update_index_types: + # All indexes in a single document update batch should have the same version + version_to_process = operations["create_update"][0].version + # Add document_id and version to task for better idempotency checking and logging + task_id = f"create_update_index_{document_id}_{version_to_process}_{int(time.time())}" + + # Create task context with all metadata + task_context = { + "version": version_to_process, + "operation": "create_update", + "created_at": time.time(), + } + self.task_scheduler.schedule_create_index( - index_types=create_index_types, document_id=document_id, task_id=task_id - ) - logger.info( - f"Scheduled create index task {task_id} for document {document_id} with types {create_index_types}" - ) - - update_index_types = [] - for doc_index in operations["update"]: - update_index_types.append(doc_index.index_type) - if update_index_types: - task_id = f"update_index_{document_id}_{int(time.time())}" - self.task_scheduler.schedule_update_index( - index_types=update_index_types, document_id=document_id, task_id=task_id + index_types=create_update_index_types, + document_id=document_id, + task_context=task_context, + task_id=task_id, ) logger.info( - f"Scheduled update index task {task_id} for document {document_id} with types {update_index_types}" + f"Scheduled create/update index task {task_id} for document {document_id} (version {version_to_process}) with types {create_update_index_types}" ) delete_index_types = [] @@ -288,8 +272,13 @@ def _update_document_status(document_id: str, session: Session): session.add(document) @staticmethod - def on_index_created(document_id: str, index_type: str, index_data: str = None): + def on_index_created(document_id: str, index_type: str, task_context: dict, index_data: str = None): """Called when index creation succeeds""" + version_processed = task_context.get('version') + if version_processed is None: + logger.error(f"Missing version in task_context for index creation callback: {task_context}") + return + for session in get_sync_session(): # Use atomic update with state validation update_stmt = ( @@ -298,12 +287,13 @@ def on_index_created(document_id: str, index_type: str, index_data: str = None): and_( DocumentIndex.document_id == document_id, DocumentIndex.index_type == DocumentIndexType(index_type), - DocumentIndex.actual_state == IndexActualState.CREATING, # Only allow transition from CREATING + DocumentIndex.status == DocumentIndexStatus.CREATING, # Only allow transition from CREATING + DocumentIndex.version == version_processed, # Crucial check for the specific version ) ) .values( - actual_state=IndexActualState.PRESENT, - observed_version=DocumentIndex.version, # Mark as processed + status=DocumentIndexStatus.ACTIVE, + observed_version=version_processed, # Mark this specific version as processed index_data=index_data, error_message=None, gmt_updated=utc_now(), @@ -314,17 +304,24 @@ def on_index_created(document_id: str, index_type: str, index_data: str = None): result = session.execute(update_stmt) if result.rowcount > 0: IndexTaskCallbacks._update_document_status(document_id, session) - logger.info(f"{index_type} index creation completed for document {document_id}") + logger.info( + f"V{version_processed} {index_type} index creation completed for document {document_id}" + ) session.commit() else: logger.warning( - f"Index creation callback ignored for document {document_id} type {index_type} - not in CREATING state" + f"Index creation callback ignored for document {document_id} (version {version_processed}, type {index_type}) - not in CREATING state or version mismatch" ) session.rollback() @staticmethod - def on_index_failed(document_id: str, index_type: str, error_message: str): + def on_index_failed(document_id: str, index_type: str, task_context: dict, error_message: str): """Called when index operation fails""" + version_processed = task_context.get('version') + if version_processed is None: + logger.error(f"Missing version in task_context for index failure callback: {task_context}") + return + for session in get_sync_session(): # Use atomic update with state validation update_stmt = ( @@ -333,12 +330,15 @@ def on_index_failed(document_id: str, index_type: str, error_message: str): and_( DocumentIndex.document_id == document_id, DocumentIndex.index_type == DocumentIndexType(index_type), - # Only allow transition from CREATING or DELETING states - DocumentIndex.actual_state.in_([IndexActualState.CREATING, IndexActualState.DELETING]), + # Only allow transition from CREATING or DELETION_IN_PROGRESS states + DocumentIndex.status.in_( + [DocumentIndexStatus.CREATING, DocumentIndexStatus.DELETION_IN_PROGRESS] + ), + DocumentIndex.version == version_processed, # Crucial check for the specific version ) ) .values( - actual_state=IndexActualState.FAILED, + status=DocumentIndexStatus.FAILED, error_message=error_message, gmt_updated=utc_now(), gmt_last_reconciled=utc_now(), @@ -348,46 +348,43 @@ def on_index_failed(document_id: str, index_type: str, error_message: str): result = session.execute(update_stmt) if result.rowcount > 0: IndexTaskCallbacks._update_document_status(document_id, session) - logger.error(f"{index_type} index operation failed for document {document_id}: {error_message}") + logger.error( + f"V{version_processed} {index_type} index operation failed for document {document_id}: {error_message}" + ) session.commit() else: logger.warning( - f"Index failure callback ignored for document {document_id} type {index_type} - not in CREATING or DELETING state" + f"Index failure callback ignored for document {document_id} (version {version_processed}, type {index_type}) - not in correct state or version mismatch" ) session.rollback() @staticmethod def on_index_deleted(document_id: str, index_type: str): - """Called when index deletion succeeds""" + """Called when index deletion succeeds - hard delete the record""" for session in get_sync_session(): - # Use atomic update with state validation - update_stmt = ( - update(DocumentIndex) + # Hard delete the record from database + delete_stmt = ( + select(DocumentIndex) .where( and_( DocumentIndex.document_id == document_id, DocumentIndex.index_type == DocumentIndexType(index_type), - DocumentIndex.actual_state == IndexActualState.DELETING, # Only allow transition from DELETING + DocumentIndex.status == DocumentIndexStatus.DELETION_IN_PROGRESS, # Only allow deletion from DELETION_IN_PROGRESS ) ) - .values( - actual_state=IndexActualState.ABSENT, - observed_version=DocumentIndex.version, # Mark as processed - index_data=None, - error_message=None, - gmt_updated=utc_now(), - gmt_last_reconciled=utc_now(), - ) ) - result = session.execute(update_stmt) - if result.rowcount > 0: + result = session.execute(delete_stmt) + index_to_delete = result.scalar_one_or_none() + + if index_to_delete: + session.delete(index_to_delete) IndexTaskCallbacks._update_document_status(document_id, session) logger.info(f"{index_type} index deletion completed for document {document_id}") session.commit() else: logger.warning( - f"Index deletion callback ignored for document {document_id} type {index_type} - not in DELETING state" + f"Index deletion callback ignored for document {document_id} type {index_type} - not in DELETION_IN_PROGRESS state" ) session.rollback() diff --git a/aperag/migration/versions/20250624002818-63d90cdaba57.py b/aperag/migration/versions/20250624002818-63d90cdaba57.py new file mode 100644 index 000000000..49cb18e7f --- /dev/null +++ b/aperag/migration/versions/20250624002818-63d90cdaba57.py @@ -0,0 +1,62 @@ +"""empty message + +Revision ID: 63d90cdaba57 +Revises: 23c0533b6b63 +Create Date: 2025-06-24 00:28:18.326732 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision: str = '63d90cdaba57' +down_revision: Union[str, None] = '23c0533b6b63' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('document_index', sa.Column('status', sa.Enum('pending', 'creating', 'active', 'deleting', 'deletion_in_progress', 'failed', name='documentindexstatus'), nullable=False)) + op.alter_column('document_index', 'observed_version', + existing_type=sa.INTEGER(), + nullable=False) + op.drop_index(op.f('ix_document_indexes_actual_state'), table_name='document_index') + op.drop_index(op.f('ix_document_indexes_desired_state'), table_name='document_index') + op.drop_index(op.f('ix_document_indexes_document_id'), table_name='document_index') + op.drop_index(op.f('ix_document_indexes_id'), table_name='document_index') + op.drop_index(op.f('ix_document_indexes_index_type'), table_name='document_index') + op.create_index(op.f('ix_document_index_document_id'), 'document_index', ['document_id'], unique=False) + op.create_index(op.f('ix_document_index_id'), 'document_index', ['id'], unique=False) + op.create_index(op.f('ix_document_index_index_type'), 'document_index', ['index_type'], unique=False) + op.create_index(op.f('ix_document_index_status'), 'document_index', ['status'], unique=False) + op.drop_column('document_index', 'desired_state') + op.drop_column('document_index', 'actual_state') + op.create_unique_constraint('uq_lightrag_doc_status_workspace_id', 'lightrag_doc_status', ['workspace', 'id']) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint('uq_lightrag_doc_status_workspace_id', 'lightrag_doc_status', type_='unique') + op.add_column('document_index', sa.Column('actual_state', postgresql.ENUM('absent', 'creating', 'present', 'deleting', 'failed', name='indexactualstate'), autoincrement=False, nullable=True)) + op.add_column('document_index', sa.Column('desired_state', postgresql.ENUM('present', 'absent', name='indexdesiredstate'), autoincrement=False, nullable=False)) + op.drop_index(op.f('ix_document_index_status'), table_name='document_index') + op.drop_index(op.f('ix_document_index_index_type'), table_name='document_index') + op.drop_index(op.f('ix_document_index_id'), table_name='document_index') + op.drop_index(op.f('ix_document_index_document_id'), table_name='document_index') + op.create_index(op.f('ix_document_indexes_index_type'), 'document_index', ['index_type'], unique=False) + op.create_index(op.f('ix_document_indexes_id'), 'document_index', ['id'], unique=False) + op.create_index(op.f('ix_document_indexes_document_id'), 'document_index', ['document_id'], unique=False) + op.create_index(op.f('ix_document_indexes_desired_state'), 'document_index', ['desired_state'], unique=False) + op.create_index(op.f('ix_document_indexes_actual_state'), 'document_index', ['actual_state'], unique=False) + op.alter_column('document_index', 'observed_version', + existing_type=sa.INTEGER(), + nullable=True) + op.drop_column('document_index', 'status') + # ### end Alembic commands ### diff --git a/aperag/schema/view_models.py b/aperag/schema/view_models.py index d34cbdcbf..afe5d2d73 100644 --- a/aperag/schema/view_models.py +++ b/aperag/schema/view_models.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: openapi.merged.yaml -# timestamp: 2025-06-23T10:33:05+00:00 +# timestamp: 2025-06-23T16:25:43+00:00 from __future__ import annotations @@ -539,13 +539,37 @@ class Document(BaseModel): ] ] = None vector_index_status: Optional[ - Literal['PENDING', 'RUNNING', 'COMPLETE', 'FAILED', 'SKIPPED'] + Literal[ + 'pending', + 'creating', + 'active', + 'deleting', + 'deletion_in_progress', + 'failed', + 'skipped', + ] ] = None fulltext_index_status: Optional[ - Literal['PENDING', 'RUNNING', 'COMPLETE', 'FAILED', 'SKIPPED'] + Literal[ + 'pending', + 'creating', + 'active', + 'deleting', + 'deletion_in_progress', + 'failed', + 'skipped', + ] ] = None graph_index_status: Optional[ - Literal['PENDING', 'RUNNING', 'COMPLETE', 'FAILED', 'SKIPPED'] + Literal[ + 'pending', + 'creating', + 'active', + 'deleting', + 'deletion_in_progress', + 'failed', + 'skipped', + ] ] = None vector_index_updated: Optional[datetime] = Field( None, description='Vector index last updated time' diff --git a/aperag/service/document_service.py b/aperag/service/document_service.py index b1809e699..01a4bc0ca 100644 --- a/aperag/service/document_service.py +++ b/aperag/service/document_service.py @@ -84,60 +84,18 @@ async def build_document_response( # Get index status from new tables index_status_info = await document_index_manager.get_document_index_status(session, document.id) - # Convert new format to old API format for backward compatibility indexes = index_status_info.get("indexes", {}) - # Map new states to old enum values for API compatibility - def map_state_to_old_enum(actual_state: str): - if actual_state == "absent": - return "SKIPPED" - elif actual_state == "creating": - return "RUNNING" - elif actual_state == "present": - return "COMPLETE" - elif actual_state == "failed": - return "FAILED" - else: - return "PENDING" - - # Get individual index update times from DocumentIndex table - from sqlalchemy import select - from aperag.db.models import DocumentIndex, DocumentIndexType - - vector_updated = None - fulltext_updated = None - graph_updated = None - - # Query for each index type's update time - for index_type, var_name in [ - (DocumentIndexType.VECTOR, 'vector_updated'), - (DocumentIndexType.FULLTEXT, 'fulltext_updated'), - (DocumentIndexType.GRAPH, 'graph_updated') - ]: - stmt = select(DocumentIndex).where( - DocumentIndex.document_id == document.id, - DocumentIndex.index_type == index_type - ) - result = await session.execute(stmt) - index_record = result.scalar_one_or_none() - if index_record: - if var_name == 'vector_updated': - vector_updated = index_record.gmt_updated - elif var_name == 'fulltext_updated': - fulltext_updated = index_record.gmt_updated - elif var_name == 'graph_updated': - graph_updated = index_record.gmt_updated - return Document( id=document.id, name=document.name, status=document.status, - vector_index_status=map_state_to_old_enum(indexes.get("vector", {}).get("actual_state", "absent")), - fulltext_index_status=map_state_to_old_enum(indexes.get("fulltext", {}).get("actual_state", "absent")), - graph_index_status=map_state_to_old_enum(indexes.get("graph", {}).get("actual_state", "absent")), - vector_index_updated=vector_updated, - fulltext_index_updated=fulltext_updated, - graph_index_updated=graph_updated, + vector_index_status=indexes.get("vector", {}).get("status"), + fulltext_index_status=indexes.get("fulltext", {}).get("status"), + graph_index_status=indexes.get("graph", {}).get("status"), + vector_index_updated=indexes.get("vector", {}).get("updated_time"), + fulltext_index_updated=indexes.get("fulltext", {}).get("updated_time"), + graph_index_updated=indexes.get("graph", {}).get("updated_time"), size=document.size, created=document.gmt_created, updated=document.gmt_updated, diff --git a/aperag/tasks/scheduler.py b/aperag/tasks/scheduler.py index 9319b044f..f248d84f9 100644 --- a/aperag/tasks/scheduler.py +++ b/aperag/tasks/scheduler.py @@ -16,6 +16,7 @@ from abc import ABC, abstractmethod from typing import Any, List, Optional +from aperag.db.models import DocumentIndexType from aperag.tasks.utils import cleanup_local_document, parse_document_content logger = logging.getLogger(__name__) @@ -35,14 +36,17 @@ class TaskScheduler(ABC): """Abstract base class for task schedulers""" @abstractmethod - def schedule_create_index(self, document_id: str, index_types: List[str], **kwargs) -> str: + def schedule_create_index( + self, index_types: List[DocumentIndexType], document_id: str, task_context: dict, task_id: str = None + ): """ - Schedule single index creation task (legacy support) + Schedule single index creation task Args: document_id: Document ID to process index_types: List of index types (vector, fulltext, graph) - **kwargs: Additional arguments + task_context: Task metadata context (version, etc.) + task_id: Task ID for tracking (optional) Returns: Task ID for tracking @@ -236,20 +240,32 @@ def get_task_status(self, task_id: str) -> Optional[TaskResult]: class CeleryTaskScheduler(TaskScheduler): """Celery implementation of TaskScheduler - Direct workflow execution""" - def schedule_create_index(self, document_id: str, index_types: List[str], **kwargs) -> str: - """Schedule index creation workflow""" - from config.celery_tasks import create_document_indexes_workflow + def schedule_create_index( + self, index_types: List["DocumentIndexType"], document_id: str, task_context: dict, task_id: str = None + ) -> str: + """Schedule index creation tasks - one task per index type""" + from config.celery_tasks import create_index_task + from celery import group try: - # Execute workflow and return AsyncResult ID (not calling .get()) - workflow_result = create_document_indexes_workflow(document_id, index_types) - workflow_id = workflow_result.id # Use .id instead of .get('workflow_id') + version = task_context.get('version', 'unknown') + + # Create individual tasks for each index type + task_signatures = [] + for index_type in index_types: + task_sig = create_index_task.s(document_id, index_type.value, task_context) + task_signatures.append(task_sig) + + # Execute tasks in parallel using group + group_result = group(task_signatures).apply_async() + + index_types_str = [it.value for it in index_types] logger.debug( - f"Scheduled create indexes workflow {workflow_id} for document {document_id} with types {index_types}" + f"Scheduled create index group {group_result.id} for document {document_id} version {version} with types {index_types_str}" ) - return workflow_id + return group_result.id except Exception as e: - logger.error(f"Failed to schedule create indexes workflow for document {document_id}: {str(e)}") + logger.error(f"Failed to schedule create index tasks for document {document_id}: {str(e)}") raise def schedule_update_index(self, document_id: str, index_types: List[str], **kwargs) -> str: diff --git a/config/celery_tasks.py b/config/celery_tasks.py index 059f682aa..341480ebc 100644 --- a/config/celery_tasks.py +++ b/config/celery_tasks.py @@ -133,6 +133,17 @@ def _handle_index_success(self, document_id: str, index_type: str, index_data: d except Exception as e: logger.warning(f"Failed to execute index success callback for {index_type} of {document_id}: {e}", exc_info=True) + def _handle_index_success_with_context(self, document_id: str, index_type: str, task_context: dict, index_data: dict = None): + try: + from aperag.index.reconciler import index_task_callbacks + index_data_json = json.dumps(index_data) if index_data else None + index_task_callbacks.on_index_created(document_id, index_type, task_context, index_data_json) + version = task_context.get('version', 'unknown') + logger.info(f"Index success callback executed for {index_type} index of document {document_id} version {version}") + except Exception as e: + version = task_context.get('version', 'unknown') + logger.warning(f"Failed to execute index success callback for {index_type} of {document_id} version {version}: {e}", exc_info=True) + def _handle_index_deletion_success(self, document_id: str, index_type: str): try: from aperag.index.reconciler import index_task_callbacks @@ -148,6 +159,16 @@ def _handle_index_failure(self, document_id: str, index_types: List[str], error_ logger.info(f"Index failure callback executed for {index_types} indexes of document {document_id}") except Exception as e: logger.warning(f"Failed to execute index failure callback for {document_id}: {e}", exc_info=True) + + def _handle_index_failure_with_context(self, document_id: str, index_type: str, task_context: dict, error_msg: str): + try: + from aperag.index.reconciler import index_task_callbacks + index_task_callbacks.on_index_failed(document_id, index_type, task_context, error_msg) + version = task_context.get('version', 'unknown') + logger.info(f"Index failure callback executed for {index_type} index of document {document_id} version {version}") + except Exception as e: + version = task_context.get('version', 'unknown') + logger.warning(f"Failed to execute index failure callback for {index_type} of {document_id} version {version}: {e}", exc_info=True) # ========== Core Document Processing Tasks ========== @@ -174,46 +195,52 @@ def parse_document_task(self, document_id: str) -> dict: @current_app.task(bind=True, base=BaseIndexTask, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60}) -def create_index_task(self, document_id: str, index_type: str, parsed_data_dict: dict) -> dict: +def create_index_task(self, document_id: str, index_type: str, task_context: dict, parsed_data_dict: dict = None) -> dict: """ Create a single index for a document Args: document_id: Document ID to process index_type: Type of index to create ('vector', 'fulltext', 'graph') - parsed_data_dict: Serialized ParsedDocumentData from parse_document_task + task_context: Task metadata context (version, etc.) + parsed_data_dict: Serialized ParsedDocumentData (optional, will parse if not provided) Returns: Serialized IndexTaskResult """ try: - logger.info(f"Starting to create {index_type} index for document {document_id}") + version = task_context.get('version', 'unknown') + logger.info(f"Starting to create {index_type} index for document {document_id} version {version}") - # Convert dict back to structured data - parsed_data = ParsedDocumentData.from_dict(parsed_data_dict) + # Parse document content if not provided + if parsed_data_dict: + parsed_data = ParsedDocumentData.from_dict(parsed_data_dict) + else: + parsed_data = document_index_task.parse_document(document_id) # Execute index creation result = document_index_task.create_index(document_id, index_type, parsed_data) - # Handle success/failure callbacks + # Handle success/failure callbacks with task context if result.success: - logger.info(f"Successfully created {index_type} index for document {document_id}") - self._handle_index_success(document_id, index_type, result.data) + logger.info(f"Successfully created {index_type} index for document {document_id} version {version}") + self._handle_index_success_with_context(document_id, index_type, task_context, result.data) else: - logger.error(f"Failed to create {index_type} index for document {document_id}: {result.error}") + logger.error(f"Failed to create {index_type} index for document {document_id} version {version}: {result.error}") # Only mark as failed if all retries are exhausted if self.request.retries >= self.max_retries: - self._handle_index_failure(document_id, [index_type], result.error) + self._handle_index_failure_with_context(document_id, index_type, task_context, result.error) return result.to_dict() except Exception as e: - error_msg = f"Failed to create {index_type} index for document {document_id}: {str(e)}" + version = task_context.get('version', 'unknown') + error_msg = f"Failed to create {index_type} index for document {document_id} version {version}: {str(e)}" logger.error(error_msg, exc_info=True) # Only mark as failed if all retries are exhausted if self.request.retries >= self.max_retries: - self._handle_index_failure(document_id, [index_type], error_msg) + self._handle_index_failure_with_context(document_id, index_type, task_context, error_msg) raise diff --git a/docs/indexing_architecture.md b/docs/indexing_architecture.md index ce4578ba3..fd2ba6387 100644 --- a/docs/indexing_architecture.md +++ b/docs/indexing_architecture.md @@ -11,12 +11,12 @@ graph TB subgraph "Frontend Chain (Synchronous Fast Response)" A[API Request] --> B[FrontendIndexManager] B --> C[Write to DocumentIndex Table] - C --> D[Set desired_state=PRESENT] + C --> D[Set status=PENDING] end subgraph "Backend Chain (Asynchronous Task Processing)" E[Periodic Task reconcile_indexes_task] --> F[BackendIndexReconciler.reconcile_all] - F --> G[Detect desired_state != actual_state] + F --> G[Detect status=pending or deleting] G --> H[TaskScheduler schedules async tasks] end @@ -36,7 +36,7 @@ graph TB subgraph "State Feedback" Q --> R[IndexTaskCallbacks] - R --> S[Update actual_state=PRESENT] + R --> S[Update status=ACTIVE] S --> T[Next reconciliation check] end @@ -60,18 +60,37 @@ graph TB ### 2. State-Driven Reconciliation -Records desired and actual states for each document index through the `DocumentIndex` database table: +Records the lifecycle status for each document index through the `DocumentIndex` database table: ```python +class DocumentIndexStatus(str, Enum): + PENDING = "pending" # Waiting for processing (create/update) + CREATING = "creating" # Index creation/update task in progress + ACTIVE = "active" # Index is up-to-date and ready for use + DELETING = "deleting" # Deletion has been requested + DELETION_IN_PROGRESS = "deletion_in_progress" # Index deletion task in progress + FAILED = "failed" # The last operation failed + class DocumentIndex(BaseModel): document_id: str - index_type: DocumentIndexType # vector/fulltext/graph - desired_state: IndexDesiredState # PRESENT/ABSENT - actual_state: IndexActualState # ABSENT/CREATING/PRESENT/DELETING/FAILED - version: int # Version number, increment to trigger rebuild + index_type: DocumentIndexType # vector/fulltext/graph + status: DocumentIndexStatus # Index lifecycle status + version: int # Version number, increment to trigger rebuild + observed_version: int # Last processed version number +``` + +**State Transition Flow:** +``` +[Non-existent] → PENDING → CREATING → ACTIVE + ↑ ↓ + PENDING ← FAILED + +ACTIVE → DELETING → DELETION_IN_PROGRESS → [Hard Delete] + ↓ ↓ +DELETING ← FAILED ``` -The reconciler periodically scans all records and triggers corresponding operations when `desired_state != actual_state`. +The reconciler periodically scans all records and triggers corresponding operations when indexes are in `pending` or `deleting` states with version mismatches. ### 3. TaskScheduler Abstraction Layer Design @@ -196,9 +215,9 @@ Write DocumentIndex table records: { document_id: "doc123", index_type: "vector", - desired_state: "PRESENT", - actual_state: "ABSENT", - version: 1 + status: "pending", + version: 1, + observed_version: 0 } ↓ API returns 200 immediately @@ -208,7 +227,7 @@ Periodic task reconcile_indexes_task (executes every 30 seconds) ↓ BackendIndexReconciler.reconcile_all() ↓ -Detects desired_state=PRESENT, actual_state=ABSENT +Detects status=pending, observed_version < version ↓ CeleryTaskScheduler.schedule_create_index(doc123, ["vector", "fulltext", "graph"]) ↓ @@ -219,7 +238,7 @@ parse_document_task("doc123") ├── Download document file to local temp directory ├── Call docparser to parse document content ├── Return ParsedDocumentData.to_dict() -└── Update actual_state="CREATING" +└── Update status="creating" ↓ trigger_create_indexes_workflow(parsed_data, "doc123", ["vector", "fulltext", "graph"]) ├── Create group parallel tasks @@ -261,7 +280,7 @@ API returns immediately # 2. Backend Chain reconcile_indexes_task detects version mismatch ↓ -actual_state=PRESENT but version outdated, determined as needing update +status=active but observed_version < version, determined as needing update ↓ schedule_update_index() -> update_document_indexes_workflow() @@ -277,12 +296,12 @@ User deletes document triggering index deletion: # 1. Frontend Chain API Call -> FrontendIndexManager.delete_document_indexes() ↓ -Set desired_state="ABSENT" +Set status="deleting" ↓ API returns immediately # 2. Backend Chain -Detects desired_state=ABSENT, actual_state=PRESENT +Detects status=deleting ↓ schedule_delete_index() -> delete_document_indexes_workflow() @@ -354,7 +373,7 @@ class IndexTaskCallbacks: def on_index_failed(document_id: str, index_type: str, error_message: str): """Task failure callback""" # Update database state - doc_index.actual_state = IndexActualState.FAILED + doc_index.status = DocumentIndexStatus.FAILED doc_index.error_message = error_message # Will retry on next reconcile diff --git a/docs/indexing_architecture_zh.md b/docs/indexing_architecture_zh.md index fbdcfdf46..f277cebca 100644 --- a/docs/indexing_architecture_zh.md +++ b/docs/indexing_architecture_zh.md @@ -11,12 +11,12 @@ graph TB subgraph "Frontend Chain (同步快速响应)" A[API请求] --> B[FrontendIndexManager] B --> C[写入DocumentIndex表] - C --> D[设置desired_state=PRESENT] + C --> D[设置status=PENDING] end subgraph "Backend Chain (异步任务处理)" E[定时任务reconcile_indexes_task] --> F[BackendIndexReconciler.reconcile_all] - F --> G[检测desired_state != actual_state] + F --> G[检测status=pending或deleting] G --> H[TaskScheduler调度异步任务] end @@ -36,7 +36,7 @@ graph TB subgraph "状态反馈" Q --> R[IndexTaskCallbacks] - R --> S[更新actual_state=PRESENT] + R --> S[更新status=ACTIVE] S --> T[下次调谐检查] end @@ -60,18 +60,37 @@ graph TB ### 2. 状态驱动调谐 -通过数据库表`DocumentIndex`记录每个文档索引的期望状态和实际状态: +通过数据库表`DocumentIndex`记录每个文档索引的生命周期状态: ```python +class DocumentIndexStatus(str, Enum): + PENDING = "pending" # 等待处理(创建或更新) + CREATING = "creating" # 索引创建/更新任务正在进行 + ACTIVE = "active" # 索引处于最新状态,可用 + DELETING = "deleting" # 已请求删除 + DELETION_IN_PROGRESS = "deletion_in_progress" # 索引删除任务正在进行 + FAILED = "failed" # 上次操作失败 + class DocumentIndex(BaseModel): document_id: str - index_type: DocumentIndexType # vector/fulltext/graph - desired_state: IndexDesiredState # PRESENT/ABSENT - actual_state: IndexActualState # ABSENT/CREATING/PRESENT/DELETING/FAILED - version: int # 版本号,递增触发重建 + index_type: DocumentIndexType # vector/fulltext/graph + status: DocumentIndexStatus # 索引生命周期状态 + version: int # 版本号,递增触发重建 + observed_version: int # 最后处理的版本号 +``` + +**状态流转图:** +``` +[不存在] → PENDING → CREATING → ACTIVE + ↑ ↓ + PENDING ← FAILED + +ACTIVE → DELETING → DELETION_IN_PROGRESS → [硬删除] + ↓ ↓ +DELETING ← FAILED ``` -调谐器定时扫描所有记录,当`desired_state != actual_state`时触发相应操作。 +调谐器定时扫描所有记录,当索引处于`pending`或`deleting`状态且版本不匹配时触发相应操作。 ### 3. TaskScheduler抽象层设计 @@ -196,9 +215,9 @@ API调用 -> FrontendIndexManager.create_document_indexes() { document_id: "doc123", index_type: "vector", - desired_state: "PRESENT", - actual_state: "ABSENT", - version: 1 + status: "pending", + version: 1, + observed_version: 0 } ↓ API立即返回200 @@ -208,7 +227,7 @@ API立即返回200 ↓ BackendIndexReconciler.reconcile_all() ↓ -检测到desired_state=PRESENT, actual_state=ABSENT +检测到status=pending, observed_version < version ↓ CeleryTaskScheduler.schedule_create_index(doc123, ["vector", "fulltext", "graph"]) ↓ @@ -219,7 +238,7 @@ parse_document_task("doc123") ├── 下载文档文件到本地临时目录 ├── 调用docparser解析文档内容 ├── 返回ParsedDocumentData.to_dict() -└── 更新actual_state="CREATING" +└── 更新status="creating" ↓ trigger_create_indexes_workflow(parsed_data, "doc123", ["vector", "fulltext", "graph"]) ├── 创建group并行任务 @@ -261,7 +280,7 @@ API立即返回 # 2. 后端链路 reconcile_indexes_task检测到version不匹配 ↓ -actual_state=PRESENT但version过期,判定为需要更新 +status=active但observed_version < version,判定为需要更新 ↓ schedule_update_index() -> update_document_indexes_workflow() @@ -277,12 +296,12 @@ parse_document_task -> trigger_update_indexes_workflow -> 并行update_index_tas # 1. 前端链路 API调用 -> FrontendIndexManager.delete_document_indexes() ↓ -设置desired_state="ABSENT" +设置status="deleting" ↓ API立即返回 # 2. 后端链路 -检测到desired_state=ABSENT, actual_state=PRESENT +检测到status=deleting ↓ schedule_delete_index() -> delete_document_indexes_workflow() @@ -354,7 +373,7 @@ class IndexTaskCallbacks: def on_index_failed(document_id: str, index_type: str, error_message: str): """任务失败回调""" # 更新数据库状态 - doc_index.actual_state = IndexActualState.FAILED + doc_index.status = DocumentIndexStatus.FAILED doc_index.error_message = error_message # 下次reconcile时会重新尝试 diff --git a/frontend/src/api/models/document.ts b/frontend/src/api/models/document.ts index 24cb7774e..1d33f13b2 100644 --- a/frontend/src/api/models/document.ts +++ b/frontend/src/api/models/document.ts @@ -112,29 +112,35 @@ export const DocumentStatusEnum = { export type DocumentStatusEnum = typeof DocumentStatusEnum[keyof typeof DocumentStatusEnum]; export const DocumentVectorIndexStatusEnum = { - PENDING: 'PENDING', - RUNNING: 'RUNNING', - COMPLETE: 'COMPLETE', - FAILED: 'FAILED', - SKIPPED: 'SKIPPED' + pending: 'pending', + creating: 'creating', + active: 'active', + deleting: 'deleting', + deletion_in_progress: 'deletion_in_progress', + failed: 'failed', + skipped: 'skipped' } as const; export type DocumentVectorIndexStatusEnum = typeof DocumentVectorIndexStatusEnum[keyof typeof DocumentVectorIndexStatusEnum]; export const DocumentFulltextIndexStatusEnum = { - PENDING: 'PENDING', - RUNNING: 'RUNNING', - COMPLETE: 'COMPLETE', - FAILED: 'FAILED', - SKIPPED: 'SKIPPED' + pending: 'pending', + creating: 'creating', + active: 'active', + deleting: 'deleting', + deletion_in_progress: 'deletion_in_progress', + failed: 'failed', + skipped: 'skipped' } as const; export type DocumentFulltextIndexStatusEnum = typeof DocumentFulltextIndexStatusEnum[keyof typeof DocumentFulltextIndexStatusEnum]; export const DocumentGraphIndexStatusEnum = { - PENDING: 'PENDING', - RUNNING: 'RUNNING', - COMPLETE: 'COMPLETE', - FAILED: 'FAILED', - SKIPPED: 'SKIPPED' + pending: 'pending', + creating: 'creating', + active: 'active', + deleting: 'deleting', + deletion_in_progress: 'deletion_in_progress', + failed: 'failed', + skipped: 'skipped' } as const; export type DocumentGraphIndexStatusEnum = typeof DocumentGraphIndexStatusEnum[keyof typeof DocumentGraphIndexStatusEnum]; diff --git a/frontend/src/api/openapi.merged.yaml b/frontend/src/api/openapi.merged.yaml index dc843b41b..a1a3362f4 100644 --- a/frontend/src/api/openapi.merged.yaml +++ b/frontend/src/api/openapi.merged.yaml @@ -2772,27 +2772,33 @@ components: vector_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped fulltext_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped graph_index_status: type: string enum: - - PENDING - - RUNNING - - COMPLETE - - FAILED - - SKIPPED + - pending + - creating + - active + - deleting + - deletion_in_progress + - failed + - skipped vector_index_updated: type: string format: date-time diff --git a/frontend/src/constants/index.ts b/frontend/src/constants/index.ts index 66dae34f3..f55992807 100644 --- a/frontend/src/constants/index.ts +++ b/frontend/src/constants/index.ts @@ -215,11 +215,13 @@ export const UI_INDEX_STATUS: { | 'default' | 'warning'; } = { - PENDING: 'warning', - RUNNING: 'processing', - FAILED: 'error', - COMPLETE: 'success', - SKIPPED: 'default', + pending: 'warning', + creating: 'processing', + active: 'success', + deleting: 'warning', + deletion_in_progress: 'processing', + failed: 'error', + skipped: 'default', }; export const SUPPORTED_DOC_EXTENSIONS = [ diff --git a/frontend/src/locales/en-US.ts b/frontend/src/locales/en-US.ts index 3eb0089db..00c887c38 100644 --- a/frontend/src/locales/en-US.ts +++ b/frontend/src/locales/en-US.ts @@ -222,11 +222,13 @@ export default { 'document.status.DELETED': 'Deleted', 'document.status.DELETING': 'Deleting', 'document.index.status': 'Index Status', - 'document.index.status.PENDING': 'Pending', - 'document.index.status.RUNNING': 'Running', - 'document.index.status.COMPLETE': 'Completed', - 'document.index.status.FAILED': 'Failed', - 'document.index.status.SKIPPED': 'Skipped', + 'document.index.status.pending': 'Pending', + 'document.index.status.creating': 'Creating', + 'document.index.status.active': 'Active', + 'document.index.status.deleting': 'Deleting', + 'document.index.status.deletion_in_progress': 'Deleting', + 'document.index.status.failed': 'Failed', + 'document.index.status.skipped': 'Skipped', 'document.index.type.vector': 'Vector Index', 'document.index.type.fulltext': 'Fulltext Index', 'document.index.type.graph': 'Graph Index', diff --git a/frontend/src/locales/zh-CN.ts b/frontend/src/locales/zh-CN.ts index fbe8b8667..ed876656c 100644 --- a/frontend/src/locales/zh-CN.ts +++ b/frontend/src/locales/zh-CN.ts @@ -220,11 +220,13 @@ export default { 'document.status.DELETED': '已删除', 'document.status.DELETING': '删除中', 'document.index.status': '索引状态', - 'document.index.status.PENDING': '待处理', - 'document.index.status.RUNNING': '运行中', - 'document.index.status.COMPLETE': '已完成', - 'document.index.status.FAILED': '失败', - 'document.index.status.SKIPPED': '已跳过', + 'document.index.status.pending': '待处理', + 'document.index.status.creating': '创建中', + 'document.index.status.active': '活跃', + 'document.index.status.deleting': '删除中', + 'document.index.status.deletion_in_progress': '删除中', + 'document.index.status.failed': '失败', + 'document.index.status.skipped': '已跳过', 'document.index.type.vector': '向量索引', 'document.index.type.fulltext': '全文索引', 'document.index.type.graph': '图索引',