diff --git a/backend/app/config.py b/backend/app/config.py index c8763b39..47ef0109 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -54,6 +54,7 @@ class Settings(BaseSettings): GOOGLE_SERVICE_ACCOUNT_FILE: str = "" # Celery / Redis background processing + CELERY_ENABLED: bool = True CELERY_BROKER_URL: str = "redis://localhost:6379/0" CELERY_RESULT_BACKEND: str = "redis://localhost:6379/1" CELERY_TASK_TRACK_STARTED: bool = True diff --git a/backend/app/rag/agent.py b/backend/app/rag/agent.py index b7e91d5a..b763a512 100644 --- a/backend/app/rag/agent.py +++ b/backend/app/rag/agent.py @@ -81,6 +81,7 @@ def get_agent_executor( llm = HuggingFaceEndpoint( repo_id=settings.LLM_MODEL, + model=settings.LLM_MODEL, huggingfacehub_api_token=token, max_new_tokens=settings.LLM_MAX_NEW_TOKENS, temperature=settings.LLM_TEMPERATURE, @@ -147,7 +148,8 @@ def generate_answer( model=settings.LLM_MODEL, max_tokens=256, ) - answer = response.choices[0].message.content.strip() if response.choices else "Hello! How can I help you today?" + content = response.choices[0].message.content if response.choices else None + answer = content.strip() if content else "Hello! How can I help you today?" except Exception: answer = "Hello! I'm Document AI Analyst. How can I help you with your documents?" 
return {"answer": answer, "sources": []} diff --git a/backend/app/rag/summarizer.py b/backend/app/rag/summarizer.py index f380a5e4..c4aa4c1e 100644 --- a/backend/app/rag/summarizer.py +++ b/backend/app/rag/summarizer.py @@ -66,7 +66,8 @@ def generate_document_summary( max_tokens=settings.SUMMARY_MAX_TOKENS, temperature=settings.LLM_TEMPERATURE, ) - summary = response.choices[0].message.content.strip() if response.choices else None + content = response.choices[0].message.content if response.choices else None + summary = content.strip() if content else None return summary or None diff --git a/backend/app/routes/documents.py b/backend/app/routes/documents.py index 50ca8059..0934cf87 100644 --- a/backend/app/routes/documents.py +++ b/backend/app/routes/documents.py @@ -16,10 +16,9 @@ import ipaddress import tempfile from urllib.parse import urlparse -from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, status, Query, BackgroundTasks, Request, Form +from fastapi import APIRouter, Depends, UploadFile, File, status, Query, BackgroundTasks from fastapi.responses import FileResponse from sqlalchemy.orm import Session -from sqlalchemy import select, func from app.database import get_db from app.exceptions import ( @@ -42,7 +41,6 @@ from app.config import get_settings from app.tasks import process_document from app.services.document_ingestion import ingest_document -from app.services.layout_parser import AdvancedPDFParser try: from crawl4ai import AsyncWebCrawler @@ -71,48 +69,12 @@ def _deserialize_doc(doc: Document) -> DocumentResponse: if doc.extracted_urls: try: response = response.model_copy( - update={"extracted_urls": _json.loads(doc.extracted_urls)} + update={"extracted_urls": _json.loads(doc.extracted_urls)} # type: ignore ) except Exception: response = response.model_copy(update={"extracted_urls": []}) return response -def _get_documents_query( - db: Session, - user_id: str, - q: Optional[str] = None, -): - """ - Build a filtered SQLAlchemy 
select query for documents belonging to a user. - - Applies an optional case-insensitive substring filter on ``original_name``. - Does NOT apply pagination – callers are responsible for ``.limit()`` / - ``.offset()`` so this helper stays reusable. - - Args: - db: Active database session. - user_id: ID of the authenticated user whose documents to query. - q: Optional keyword to filter document names (case-insensitive). - - Returns: - A SQLAlchemy ``Select`` statement ready for count or paginated execution. - """ - base_query = ( - select(Document) - .where( - Document.user_id == user_id, - Document.is_deleted.is_(False), - ) - ) - - if q and q.strip(): - pattern = f"%{q.strip()}%" - base_query = base_query.where( - Document.original_name.ilike(pattern) - ) - - return base_query - async def validate_upload(file: UploadFile): """Validate an uploaded file and save it to a temporary file. @@ -159,15 +121,24 @@ async def validate_upload(file: UploadFile): Path(temp_path).unlink(missing_ok=True) raise ValidationException("File too large") - # libmagic may not be installed in all environments — import lazily + # libmagic may not be installed or might fail on Windows — fallback gracefully try: import magic - # make sure you have installed libmagic in your system, otherwise it will not work - except Exception: - Path(temp_path).unlink(missing_ok=True) - raise ExternalServiceException("dependency", "Server missing 'python-magic' dependency") - - mime = magic.from_file(temp_path, mime=True) + mime = magic.from_file(temp_path, mime=True) + except Exception as e: + logger.warning(f"python-magic check failed: {e}. 
Falling back to content type from file upload or extension.") + mime = file.content_type + if not mime: + if ext == ".pdf": + mime = "application/pdf" + elif ext == ".docx": + mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + elif ext == ".txt": + mime = "text/plain" + elif ext == ".md": + mime = "text/markdown" + else: + mime = "application/octet-stream" if mime not in ALLOWED_MIME_TYPES.get(ext, []): Path(temp_path).unlink(missing_ok=True) @@ -207,8 +178,8 @@ def _crawl_in_new_loop(url: str) -> str: asyncio.set_event_loop(loop) try: async def _crawl(): - browser_config = BrowserConfig() - run_config = CrawlerRunConfig( + browser_config = BrowserConfig() # type: ignore + run_config = CrawlerRunConfig( # type: ignore excluded_tags=['form', 'header'], # Content processing @@ -218,7 +189,7 @@ async def _crawl(): # Cache control # cache_mode=CacheMode.ENABLED ) - async with AsyncWebCrawler(config=browser_config) as crawler: + async with AsyncWebCrawler(config=browser_config) as crawler: # type: ignore result = await crawler.arun(url=url, config=run_config) return result.markdown or "" return loop.run_until_complete(_crawl()) @@ -227,24 +198,46 @@ async def _crawl(): @router.post("/upload", response_model=DocumentResponse, status_code=status.HTTP_202_ACCEPTED) async def upload_document( - request: Request = None, + background_tasks: BackgroundTasks, file: UploadFile = File(...), - chunk_size: int = Form(1000), - chunk_overlap: int = Form(200), - background_tasks: BackgroundTasks = None, + chunk_size: Optional[int] = Query(None, ge=100, le=2000), + chunk_overlap: Optional[int] = Query(None, ge=0), user: User = Depends(get_current_user), db: Session = Depends(get_db), ): + """ + Upload a document and enqueue RAG processing. 
+ + Validates the uploaded file (extension, size, MIME type, integrity), + saves it to the user's directory, creates a database record with status + 'pending', queues chunking and embedding (a Celery task, or an in-process background task when Celery is disabled or unavailable), and returns + 202 Accepted immediately so large documents do not block the API request + while embeddings are generated. + + Args: + file: The uploaded file, provided as a multipart/form-data field in the request. + background_tasks: FastAPI BackgroundTasks instance for in-process fallback execution. + user: The currently authenticated user, injected by the `get_current_user` dependency. + db: Database session, injected by the `get_db` dependency. + chunk_size: Optional custom chunk size for splitting the document (query parameter, 100-2000). + chunk_overlap: Optional custom chunk overlap for splitting the document (query parameter, >= 0). + + Returns: + DocumentResponse: The created document record, validated against the + response model (includes id, filename, original_name, file_size, status, etc.). + + Raises: + HTTPException: With status code 400 if: + - No filename is provided. + - The file extension is not allowed (it must be listed in `settings.ALLOWED_EXTENSIONS`). + - The file fails validation checks (size, MIME type, integrity). + Note: A missing or failing 'python-magic' does not raise an error: + - MIME detection falls back to the client-supplied content type, then to a type derived from the file extension. 
+ """ # ── Validate file type ─────────────────────────── if not file.filename: raise ValidationException("No filename provided") - # ── Validate chunking params ───────────────────── - if chunk_size < 100 or chunk_size > 2000: - raise ValidationException("Chunk size must be between 100 and 2000") - if chunk_overlap < 0 or chunk_overlap >= chunk_size: - raise ValidationException("Chunk overlap must be non-negative and less than chunk_size") - ext = file.filename.rsplit(".", 1)[-1].lower() if ext not in settings.ALLOWED_EXTENSIONS: raise ValidationException( @@ -254,13 +247,15 @@ async def upload_document( # ── Validate and save file to disk ─────────────── temp_path = await validate_upload(file) - user_dir = os.path.join(settings.UPLOAD_DIR, user.id) + user_dir = os.path.join(settings.UPLOAD_DIR, str(user.id)) os.makedirs(user_dir, exist_ok=True) stored_filename = f"{uuid.uuid4().hex}.{ext}" filepath = os.path.join(user_dir, stored_filename) + # Move temp file to final destination shutil.move(temp_path, filepath) + file_size = Path(filepath).stat().st_size # ── Create database record ─────────────────────── @@ -271,7 +266,7 @@ async def upload_document( file_size=file_size, status="pending", chunk_size=chunk_size, - chunk_overlap=chunk_overlap + chunk_overlap=chunk_overlap, ) db.add(document) db.commit() @@ -279,23 +274,27 @@ async def upload_document( # ── Queue background ingestion ───────────────── task_id = None - try: - task = process_document.delay( - document_id=document.id, - filepath=filepath, - original_name=file.filename, - user_id=user.id, - ) - task_id = task.id - except Exception as e: - logger.warning(f"Celery queue failed, falling back to background task: {e}") + if settings.CELERY_ENABLED: + try: + task = process_document.delay( + document_id=document.id, + filepath=filepath, + original_name=file.filename, + user_id=user.id, + ) + task_id = task.id + except Exception as e: + logger.warning(f"Celery queue failed, falling back to background task: 
{e}") + settings.CELERY_ENABLED = False + + if not settings.CELERY_ENABLED: if background_tasks: background_tasks.add_task( ingest_document, - document_id=document.id, + document_id=str(document.id), filepath=filepath, original_name=file.filename, - user_id=user.id, + user_id=str(user.id), ) task_id = f"local_{uuid.uuid4().hex}" @@ -303,9 +302,8 @@ async def upload_document( @router.post("/urlupload", status_code=status.HTTP_202_ACCEPTED) async def upload_document_url( + background_tasks: BackgroundTasks, payload: UploadUrl, - request: Request = None, - background_tasks: BackgroundTasks = None, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): @@ -363,7 +361,7 @@ async def upload_document_url( # ── Move temp file to permanent user upload directory ── ext = "txt" - user_dir = os.path.join(settings.UPLOAD_DIR, user.id) + user_dir = os.path.join(settings.UPLOAD_DIR, str(user.id)) os.makedirs(user_dir, exist_ok=True) stored_filename = f"{uuid.uuid4().hex}.{ext}" @@ -377,15 +375,6 @@ async def upload_document_url( url_path = parsed.path.rstrip("/") original_name = f"{parsed.netloc}{url_path or ''}.txt" - # Bind URL crawl metadata to request state and context variables - if request is not None: - request.state.filename = original_name - request.state.filesize = file_size - from app.observability import upload_filename_var, upload_filesize_var - upload_filename_var.set(original_name) - upload_filesize_var.set(file_size) - logger.info(f"URL crawler crawl completed, starting ingestion: {original_name} ({file_size} bytes)") - # ── Create database record ───────────────────────────── document = Document( user_id=user.id, @@ -400,23 +389,27 @@ async def upload_document_url( # ── Queue background ingestion ─────────────────────── task_id = None - try: - task = process_document.delay( - document_id=document.id, - filepath=filepath, - original_name=original_name, - user_id=user.id, - ) - task_id = task.id - except Exception as e: - logger.warning(f"Celery 
queue failed, falling back to background task: {e}") + if settings.CELERY_ENABLED: + try: + task = process_document.delay( + document_id=document.id, + filepath=filepath, + original_name=original_name, + user_id=user.id, + ) + task_id = task.id + except Exception as e: + logger.warning(f"Celery queue failed, falling back to background task: {e}") + settings.CELERY_ENABLED = False + + if not settings.CELERY_ENABLED: if background_tasks: background_tasks.add_task( ingest_document, - document_id=document.id, + document_id=str(document.id), filepath=filepath, original_name=original_name, - user_id=user.id, + user_id=str(user.id), ) task_id = f"local_{uuid.uuid4().hex}" @@ -464,66 +457,59 @@ def get_document_status( @router.get("/", response_model=DocumentListResponse) def list_documents( - page: int = Query(1, ge=1, description="Page number (1-indexed)"), - per_page: int = Query(20, ge=1, le=100, description="Results per page"), - limit: int = Query(None, ge=1, le=100, description="Alias for per_page"), - q: Optional[str] = Query(None, description="Filter by document name (case-insensitive)"), - query: Optional[str] = Query(None, description="Alias for q – filter by document name"), + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1), user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ - List documents for the authenticated user with offset pagination and - optional keyword search on document names. + List all documents for the authenticated user with pagination. - Pagination is controlled by ``page`` and ``per_page`` (or its alias - ``limit``). Search is applied via ``query`` (or its short alias ``q``), - which performs a case-insensitive substring match on ``original_name``. + Returns a paginated list of documents belonging to the current user, + ordered by upload date (newest first). Args: - page: Page number to retrieve (1-indexed). Defaults to 1. - per_page: Number of documents per page. Defaults to 20, max 100. 
- limit: Alias for per_page – whichever is supplied takes effect. - q: Case-insensitive substring filter on original_name. - query: Alias for q – whichever is supplied takes effect. - user: Authenticated user injected by get_current_user. - db: Database session injected by get_db. - + page: The page number to retrieve (1-indexed). Defaults to 1. + per_page: The number of documents to return per page. Defaults to 20. + user: The currently authenticated user, injected by the `get_current_user` dependency. + db: Database session, injected by the `get_db` dependency. + Returns: - DocumentListResponse with items, total, page, pages, total_pages, - limit, and query fields. + DocumentListResponse: A response model containing: + - items: A list of DocumentResponse objects for the current page. + - total: The total number of documents for the user. + - page: The current page number. + - pages: The total number of pages available. """ - # Allow `limit` as alias for `per_page`; `query` as alias for `q` - effective_limit = limit if limit is not None else per_page - effective_query = query if query is not None else q - skip = (page - 1) * effective_limit - - # ── Build filtered query via helper ─────────────────────────────────────── - base_query = _get_documents_query(db, user.id, effective_query) - # ── Total count (before pagination) ────────────────────────────────────── - total = db.execute( - select(func.count()).select_from(base_query.subquery()) - ).scalar_one() + """Number of rows to skip""" + skip: int = (page - 1) * per_page - # ── Paginated results ───────────────────────────────────────────────────── - docs = db.execute( - base_query - .order_by(Document.uploaded_at.desc()) - .limit(effective_limit) - .offset(skip) - ).scalars().all() - - total_pages = max(1, (total + effective_limit - 1) // effective_limit) + """Total documents""" + totalDocuments = ( + db.query(Document) + .filter(Document.user_id == user.id, Document.is_deleted.is_(False)) + .count() + ) + """Total 
Pages""" + pages = (totalDocuments + per_page - 1) // per_page + + """List all documents for the authenticated user in Paginated form""" + docs = (( + db.execute(select(Document) + .where(Document.user_id == user.id, Document.is_deleted.is_(False)) + .order_by(Document.uploaded_at.desc()) + .limit(per_page).offset(skip)) + ) + .scalars().all()) return DocumentListResponse( items=[_deserialize_doc(d) for d in docs], - total=total, + total=totalDocuments, page=page, - pages=total_pages, - total_pages=total_pages, - limit=effective_limit, - query=effective_query, + pages=pages, + total_pages=pages, + limit=per_page, ) @@ -554,10 +540,10 @@ def update_document( raise ForbiddenException("You do not have permission to update this document") if update.name is not None: - doc.original_name = update.name + doc.original_name = update.name # type: ignore if update.summary is not None: stripped = update.summary.strip() - doc.summary = stripped if stripped else None + doc.summary = stripped if stripped else None # type: ignore db.commit() db.refresh(doc) @@ -635,7 +621,7 @@ def serve_pdf( if not doc: raise NotFoundException("Document") - filepath = os.path.join(settings.UPLOAD_DIR, user.id, doc.filename) + filepath = os.path.join(settings.UPLOAD_DIR, str(user.id), str(doc.filename)) if not os.path.exists(filepath): raise NotFoundException("File") @@ -643,7 +629,7 @@ def serve_pdf( return FileResponse( filepath, media_type="application/pdf", - filename=doc.original_name, + filename=str(doc.original_name), ) @@ -685,8 +671,8 @@ def delete_document( if not doc: raise NotFoundException("Document") - doc.is_deleted = True - doc.deleted_at = datetime.now(timezone.utc) + doc.is_deleted = True # type: ignore + doc.deleted_at = datetime.now(timezone.utc) # type: ignore db.commit() return {"message": f"Document '{doc.original_name}' deleted successfully"} @@ -696,7 +682,7 @@ def delete_document( def update_chunk_settings( document_id: str, settings_update: ChunkSettings, - 
background_tasks: BackgroundTasks = None, + background_tasks: BackgroundTasks, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): @@ -731,44 +717,48 @@ def update_chunk_settings( if settings_update.chunk_size is not None: if settings_update.chunk_size < 100: raise ValidationException("Chunk size must be at least 100") - doc.chunk_size = settings_update.chunk_size + doc.chunk_size = settings_update.chunk_size # type: ignore if settings_update.chunk_overlap is not None: chunk_size_val = settings_update.chunk_size if settings_update.chunk_size is not None else (doc.chunk_size or settings.CHUNK_SIZE) if settings_update.chunk_overlap >= chunk_size_val: raise ValidationException("Chunk overlap cannot be greater than or equal to chunk size") - doc.chunk_overlap = settings_update.chunk_overlap + doc.chunk_overlap = settings_update.chunk_overlap # type: ignore # Refresh the document record to update the chunk settings before re-ingestion db.commit() db.refresh(doc) # Reset document status, chunk/page counts, summary to trigger re-ingestion with new chunk settings. - doc.status = "pending" - doc.chunk_count = 0 - doc.page_count = 0 - doc.summary = None + doc.status = "pending" # type: ignore + doc.chunk_count = 0 # type: ignore + doc.page_count = 0 # type: ignore + doc.summary = None # type: ignore db.commit() # Queue ingestion with updated chunk settings. The worker reads the new # settings from the document record before re-chunking. 
task_id = None - try: - task = process_document.delay( - document_id=doc.id, - filepath=os.path.join(settings.UPLOAD_DIR, user.id, doc.filename), - original_name=doc.original_name, - user_id=user.id, - ) - task_id = task.id - except Exception as e: - logger.warning(f"Celery queue failed, falling back to background task: {e}") + if settings.CELERY_ENABLED: + try: + task = process_document.delay( + document_id=str(doc.id), + filepath=os.path.join(settings.UPLOAD_DIR, str(user.id), str(doc.filename)), + original_name=str(doc.original_name), + user_id=str(user.id), + ) + task_id = task.id + except Exception as e: + logger.warning(f"Celery queue failed, falling back to background task: {e}") + settings.CELERY_ENABLED = False + + if not settings.CELERY_ENABLED: if background_tasks: background_tasks.add_task( ingest_document, - document_id=doc.id, - filepath=os.path.join(settings.UPLOAD_DIR, user.id, doc.filename), - original_name=doc.original_name, - user_id=user.id, + document_id=str(doc.id), + filepath=os.path.join(settings.UPLOAD_DIR, str(user.id), str(doc.filename)), + original_name=str(doc.original_name), + user_id=str(user.id), ) task_id = f"local_{uuid.uuid4().hex}" @@ -779,7 +769,7 @@ def update_chunk_settings( @router.post("/{document_id}/retry", response_model=DocumentResponse) def retry_document_processing( document_id: str, - background_tasks: BackgroundTasks = None, + background_tasks: BackgroundTasks, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): @@ -797,39 +787,43 @@ def retry_document_processing( if not doc: raise NotFoundException("Document") - if doc.status != "failed": + if str(doc.status) != "failed": raise ValidationException("Only failed documents can be retried") - doc.status = "pending" - doc.processing_progress = 0 - doc.processing_stage = "queued" - doc.error_message = None - doc.last_error_traceback = None - doc.completed_at = None - doc.chunk_count = 0 - doc.page_count = 0 + doc.status = "pending" # type: ignore + 
doc.processing_progress = 0 # type: ignore + doc.processing_stage = "queued" # type: ignore + doc.error_message = None # type: ignore + doc.last_error_traceback = None # type: ignore + doc.completed_at = None # type: ignore + doc.chunk_count = 0 # type: ignore + doc.page_count = 0 # type: ignore db.commit() # Re-queue ingestion - filepath = os.path.join(settings.UPLOAD_DIR, user.id, doc.filename) + filepath = os.path.join(settings.UPLOAD_DIR, str(user.id), str(doc.filename)) task_id = None - try: - task = process_document.delay( - document_id=doc.id, - filepath=filepath, - original_name=doc.original_name, - user_id=user.id, - ) - task_id = task.id - except Exception as e: - logger.warning(f"Celery queue failed for retry, falling back to background task: {e}") + if settings.CELERY_ENABLED: + try: + task = process_document.delay( + document_id=str(doc.id), + filepath=filepath, + original_name=str(doc.original_name), + user_id=str(user.id), + ) + task_id = task.id + except Exception as e: + logger.warning(f"Celery queue failed for retry, falling back to background task: {e}") + settings.CELERY_ENABLED = False + + if not settings.CELERY_ENABLED: if background_tasks: background_tasks.add_task( ingest_document, - document_id=doc.id, + document_id=str(doc.id), filepath=filepath, - original_name=doc.original_name, - user_id=user.id, + original_name=str(doc.original_name), + user_id=str(user.id), ) task_id = f"local_{uuid.uuid4().hex}" diff --git a/backend/app/services/document_ingestion.py b/backend/app/services/document_ingestion.py index 6e76d79d..307316d2 100644 --- a/backend/app/services/document_ingestion.py +++ b/backend/app/services/document_ingestion.py @@ -13,7 +13,7 @@ settings = get_settings() -def _update_progress(document_id: str, progress: int, stage: str, error: str = None): +def _update_progress(document_id: str, progress: int, stage: str, error: str | None = None): """Update document progress fields in the database.""" from app.database import SessionLocal 
@@ -21,10 +21,10 @@ def _update_progress(document_id: str, progress: int, stage: str, error: str = N try: doc = db.query(Document).filter(Document.id == document_id).first() if doc: - doc.processing_progress = progress - doc.processing_stage = stage + doc.processing_progress = progress # type: ignore + doc.processing_stage = stage # type: ignore if error: - doc.error_message = error + doc.error_message = error # type: ignore db.commit() except Exception as e: logger.warning("Failed to update progress for %s: %s", document_id, e) @@ -49,16 +49,16 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id logger.error("Document %s not found for ingestion", document_id) return - doc.status = "processing" - doc.processing_stage = "extracting" - doc.processing_progress = 10 - doc.error_message = None - doc.last_error_traceback = None + doc.status = "processing" # type: ignore + doc.processing_stage = "extracting" # type: ignore + doc.processing_progress = 10 # type: ignore + doc.error_message = None # type: ignore + doc.last_error_traceback = None # type: ignore db.commit() page_count = get_page_count(filepath) - doc.page_count = page_count - doc.processing_progress = 20 + doc.page_count = page_count # type: ignore + doc.processing_progress = 20 # type: ignore db.commit() try: @@ -67,8 +67,8 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id chunk_kwargs["chunk_size"] = doc.chunk_size if doc.chunk_overlap is not None: chunk_kwargs["chunk_overlap"] = doc.chunk_overlap - doc.processing_stage = "chunking" - doc.processing_progress = 30 + doc.processing_stage = "chunking" # type: ignore + doc.processing_progress = 30 # type: ignore db.commit() chunks = chunk_document(filepath, **chunk_kwargs) except TypeError: @@ -107,14 +107,14 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id # ── End proximity caption pass ──────────────────────────────────────── if not chunks: - doc.status = 
"failed" - doc.processing_progress = 0 - doc.error_message = "No text could be extracted from the document" + doc.status = "failed" # type: ignore + doc.processing_progress = 0 # type: ignore + doc.error_message = "No text could be extracted from the document" # type: ignore db.commit() return - doc.processing_progress = 50 - doc.processing_stage = "indexing" + doc.processing_progress = 50 # type: ignore + doc.processing_stage = "indexing" # type: ignore db.commit() try: @@ -125,8 +125,8 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id except Exception as e: logger.warning("Could not build knowledge graph for document %s: %s", document_id, e) - doc.processing_progress = 70 - doc.processing_stage = "embedding" + doc.processing_progress = 70 # type: ignore + doc.processing_stage = "embedding" # type: ignore db.commit() chunk_count = store_chunks( @@ -138,7 +138,7 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id persist_document_keywords(doc, chunks, db) - doc.processing_progress = 85 + doc.processing_progress = 85 # type: ignore db.commit() try: @@ -146,11 +146,11 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id summary = generate_document_summary(filepath, max_sentences=2) if summary: - doc.summary = summary + doc.summary = summary # type: ignore db.commit() except Exception as e: logger.warning("Could not generate summary for document %s: %s", document_id, e) - doc.summary = None + doc.summary = None # type: ignore # ── URL extraction pass (PDF only) ──────────────────────────────── ext = filepath.rsplit(".", 1)[-1].lower() @@ -160,7 +160,7 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id import json urls = extract_urls_from_pdf(filepath) - doc.extracted_urls = json.dumps(urls) if urls else None + doc.extracted_urls = json.dumps(urls) if urls else None # type: ignore db.commit() logger.info( "Extracted %s URLs from document 
%s", @@ -175,12 +175,12 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id ) # ── End URL extraction pass ─────────────────────────────────────── - doc.chunk_count = chunk_count - doc.status = "ready" - doc.processing_progress = 100 - doc.processing_stage = "completed" - doc.completed_at = datetime.now(timezone.utc) - doc.error_message = None + doc.chunk_count = chunk_count # type: ignore + doc.status = "ready" # type: ignore + doc.processing_progress = 100 # type: ignore + doc.processing_stage = "completed" # type: ignore + doc.completed_at = datetime.now(timezone.utc) # type: ignore + doc.error_message = None # type: ignore db.commit() logger.info( @@ -199,10 +199,10 @@ def ingest_document(document_id: str, filepath: str, original_name: str, user_id Document.is_deleted.is_(False), ).first() if doc: - doc.status = "failed" - doc.processing_progress = 0 - doc.error_message = str(e)[:500] - doc.last_error_traceback = traceback.format_exc()[:2000] + doc.status = "failed" # type: ignore + doc.processing_progress = 0 # type: ignore + doc.error_message = str(e)[:500] # type: ignore + doc.last_error_traceback = traceback.format_exc()[:2000] # type: ignore db.commit() except Exception: logger.exception("Failed to mark document %s as failed", document_id) diff --git a/frontend/src/components/chat/ChatPanel.tsx b/frontend/src/components/chat/ChatPanel.tsx index 927bcb4a..2576ee9f 100644 --- a/frontend/src/components/chat/ChatPanel.tsx +++ b/frontend/src/components/chat/ChatPanel.tsx @@ -1,6 +1,5 @@ "use client"; -import { toast } from "sonner"; import { useState, useRef, useEffect } from "react"; import { useTranslation } from "react-i18next"; import type { DocInfo } from "@/app/dashboard/page"; @@ -8,17 +7,12 @@ import { api, API_BASE } from "@/lib/api"; import { useChatStore, type ChatMsg, - type SourceBoundingBox, type SourceChunk, + type SourceBoundingBox, } from "@/store/chat-store"; -import { buttonVariants } from 
"@/components/ui/button"; +import { Button } from "@/components/ui/button"; import { Skeleton } from "@/components/ui/skeleton"; import { Textarea } from "@/components/ui/textarea"; -import { - Tooltip, - TooltipContent, - TooltipTrigger, -} from "@/components/ui/tooltip"; import MessageBubble from "./MessageBubble"; import SourceCard from "./SourceCard"; import { @@ -29,10 +23,16 @@ import { Download, Mic, MicOff, + Settings2, HelpCircle, - ChevronDown, } from "lucide-react"; import { cn } from "@/lib/utils"; + +// Custom toast notification helper (change to your project's toast library if different) +const toast = { + info: (msg: string) => console.log("Toast info:", msg), +}; + interface ISpeechRecognitionEvent { resultIndex: number; results: { @@ -67,14 +67,12 @@ interface WindowWithSpeech extends Window { webkitSpeechRecognition?: new () => ISpeechRecognition; } -interface CitationTarget { - page: number; - highlightRects?: SourceBoundingBox[]; -} - interface Props { activeDoc: DocInfo | null; - onCitationClick: (target: CitationTarget) => void; + onCitationClick: (payload: { + page: number; + highlightRects?: SourceBoundingBox[]; + }) => void; } export default function ChatPanel({ activeDoc, onCitationClick }: Props) { @@ -94,21 +92,26 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { (state) => state.fetchSessionHistory, ); + // local state for topK since it is not defined in useChatStore + const [topK, setTopK] = useState(5); + const [showExportMenu, setShowExportMenu] = useState(false); - const MAX_CHARACTERS = 2000; + const [showSettingsMenu, setShowSettingsMenu] = useState(false); const [isRecording, setIsRecording] = useState(false); const [speechError, setSpeechError] = useState(null); - const [showScrollButton, setShowScrollButton] = useState(false); + // New State for Keyboard Shortcuts Help Modal + const [showHelpModal, setShowHelpModal] = useState(false); const recognitionRef = useRef(null); const initialInputRef = 
useRef(""); const textareaRef = useRef(null); const bottomRef = useRef(null); - const containerRef = useRef(null); const prevDocId = useRef(null); const exportMenuRef = useRef(null); + const settingsMenuRef = useRef(null); const abortControllerRef = useRef(null); + const showEmptyState = messages.length === 0 && !isTyping && !historyLoading; useEffect(() => { @@ -130,31 +133,9 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { // Auto-scroll to bottom whenever messages change useEffect(() => { - if (containerRef.current) { - const { scrollHeight, scrollTop, clientHeight } = containerRef.current; - if (scrollHeight - scrollTop - clientHeight < 150) { - bottomRef.current?.scrollIntoView({ behavior: "smooth" }); - } - } else { - bottomRef.current?.scrollIntoView({ behavior: "smooth" }); - } + bottomRef.current?.scrollIntoView({ behavior: "smooth" }); }, [messages]); - const handleScroll = () => { - if (!containerRef.current) return; - const { scrollTop, scrollHeight, clientHeight } = containerRef.current; - setShowScrollButton(scrollTop < scrollHeight - clientHeight - 100); - }; - - const scrollToBottom = () => { - if (containerRef.current) { - containerRef.current.scrollTo({ - top: containerRef.current.scrollHeight, - behavior: "smooth", - }); - } - }; - useEffect(() => { return () => { resetChat(); @@ -188,6 +169,8 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { role: string; content: string; sources?: SourceChunk[]; + feedback?: "up" | "down" | null; + created_at?: string; }>; }>(`/api/v1/chat/history/${documentId}`) .then((data) => { @@ -199,6 +182,8 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { role: m.role as "user" | "assistant", content: m.content, sources: m.sources || [], + feedback: m.feedback, + created_at: m.created_at, })), ); }) @@ -212,12 +197,6 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { }; }, [activeSessionId, activeDoc, 
fetchSessionHistory, setMessages]); - const handleStop = () => { - abortControllerRef.current?.abort(); - setStreaming(false); - setIsTyping(false); - }; - const handleSend = async () => { if (!input.trim() || streaming) return; @@ -226,12 +205,14 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { const abortController = new AbortController(); abortControllerRef.current = abortController; + // Add user message const userMsg: ChatMsg = { id: `user-${Date.now()}`, role: "user", content: question, sources: [], + created_at: new Date().toISOString(), }; setMessages((prev) => [...prev, userMsg]); @@ -242,247 +223,79 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { setIsTyping(true); try { - // Try WebSocket first for real-time agentic thought streaming - const token = - typeof window !== "undefined" ? localStorage.getItem("token") : null; - const base = API_BASE || window.location.origin; - const wsScheme = base.startsWith("https") - ? "wss" - : base.startsWith("http") - ? "ws" - : "wss"; - const host = base.replace(/^https?:/, ""); - const wsUrl = `${wsScheme}:${host}/api/v1/chat/ws${token ? 
`?token=${encodeURIComponent(token)}` : ""}`; - - const ws = new WebSocket(wsUrl); - - let onAbort: (() => void) | null = null; - - const wsDone = new Promise((resolve, reject) => { - onAbort = () => { - try { - ws.close(); - } catch { - // ignore - } - reject(new DOMException("The user aborted a request.", "AbortError")); - }; - abortController.signal.addEventListener("abort", onAbort); - - ws.onopen = () => { - // Send initial payload - ws.send( - JSON.stringify({ - question, - document_id: activeDoc?.id || null, - session_id: activeSessionId, - }), - ); - }; - - // If WS doesn't open within 800ms, treat as failure and fallback - const connectTimeout = setTimeout(() => { - try { - ws.close(); - } catch { - // ignore - } - reject(new Error("WebSocket connection timeout")); - }, 800); - - ws.onmessage = (ev) => { - clearTimeout(connectTimeout); - try { - const event = JSON.parse(ev.data); - if (event.type === "token") { - if (!assistantCreated) { - assistantCreated = true; - setIsTyping(false); - - const assistantMsg: ChatMsg = { - id: assistantId, - role: "assistant", - content: event.data as string, - sources: [], - isStreaming: true, - }; - - setMessages((prev) => [...prev, assistantMsg]); - } else { - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? { ...m, content: m.content + (event.data as string) } - : m, - ), - ); - } - } else if (event.type === "sources") { - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? { ...m, sources: event.data as SourceChunk[] } - : m, - ), - ); - } else if (event.type === "thought") { - // Append thoughts as a temporary assistant note (optional UI handling) - // For simplicity, add to assistant message content in brackets - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? { ...m, content: m.content + `\n[thought] ${event.data}` } - : m, - ), - ); - } else if (event.type === "error") { - setIsTyping(false); - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? 
{ - ...m, - content: `Error: ${event.data}`, - isStreaming: false, - } - : m, - ), - ); - ws.close(); - reject(new Error(String(event.data))); - } else if (event.type === "done") { - setMessages((prev) => - prev.map((m) => - m.id === assistantId ? { ...m, isStreaming: false } : m, - ), - ); - ws.close(); - resolve(); - } - } catch { - // ignore malformed messages - } - }; - - ws.onerror = () => { - clearTimeout(connectTimeout); - reject(new Error("WebSocket error")); - }; - - ws.onclose = () => { - resolve(); - }; + const stream = api.streamPost("/api/v1/chat/ask/stream", { + question, + document_id: activeDoc?.id || null, + session_id: activeSessionId, + ...(topK ? { top_k: topK } : {}), }); - try { - await wsDone; - } finally { - if (onAbort) { - abortController.signal.removeEventListener("abort", onAbort); - } - } - } catch (err) { - if ( - err instanceof Error && - (err.name === "AbortError" || - err.message === "The user aborted a request.") - ) { - return; - } - // Fallback to existing SSE stream if WebSocket fails - try { - const stream = api.streamPost( - "/api/v1/chat/ask/stream", - { - question, - document_id: activeDoc?.id || null, - session_id: activeSessionId, - }, - abortController.signal, - ); - - for await (const event of stream) { - if (event.type === "token") { - if (!assistantCreated) { - assistantCreated = true; - setIsTyping(false); - - const assistantMsg: ChatMsg = { - id: assistantId, - role: "assistant", - content: event.data as string, - sources: [], - isStreaming: true, - }; - - setMessages((prev) => [...prev, assistantMsg]); - } else { - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? { ...m, content: m.content + (event.data as string) } - : m, - ), - ); - } - } else if (event.type === "sources") { - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? 
{ ...m, sources: event.data as SourceChunk[] } - : m, - ), - ); - } else if (event.type === "error") { + for await (const event of stream) { + if (event.type === "token") { + // Create assistant message only when first token arrives + if (!assistantCreated) { + assistantCreated = true; setIsTyping(false); + + const assistantMsg: ChatMsg = { + id: assistantId, + role: "assistant", + content: event.data as string, + sources: [], + isStreaming: true, + created_at: new Date().toISOString(), + }; + + setMessages((prev) => [...prev, assistantMsg]); + } else { setMessages((prev) => prev.map((m) => m.id === assistantId - ? { - ...m, - content: `Error: ${event.data}`, - isStreaming: false, - } + ? { ...m, content: m.content + (event.data as string) } : m, ), ); - } else if (event.type === "done") { - setMessages((prev) => - prev.map((m) => - m.id === assistantId ? { ...m, isStreaming: false } : m, - ), - ); } - } - } catch (err2) { - setIsTyping(false); - if ( - err2 instanceof Error && - (err2.name === "AbortError" || - err2.message === "The user aborted a request.") - ) { + } else if (event.type === "sources") { + setMessages((prev) => + prev.map((m) => + m.id === assistantId + ? { ...m, sources: event.data as SourceChunk[] } + : m, + ), + ); + } else if (event.type === "error") { + setIsTyping(false); + setMessages((prev) => + prev.map((m) => + m.id === assistantId + ? { ...m, content: `Error: ${event.data}`, isStreaming: false } + : m, + ), + ); + } else if (event.type === "done") { setMessages((prev) => prev.map((m) => m.id === assistantId ? { ...m, isStreaming: false } : m, ), ); - return; } - setMessages((prev) => - prev.map((m) => - m.id === assistantId - ? { - ...m, - content: t("chat.fallbackError", { - message: - err2 instanceof Error ? err2.message : "Unknown error", - }), - isStreaming: false, - } - : m, - ), - ); } + } catch (err) { + setIsTyping(false); + setMessages((prev) => + prev.map((m) => + m.id === assistantId + ? 
{ + ...m, + content: t("chat.fallbackError", { + message: err instanceof Error ? err.message : "Unknown error", + }), + isStreaming: false, + } + : m, + ), + ); } finally { setStreaming(false); setIsTyping(false); @@ -494,9 +307,8 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { try { await api.delete(`/api/v1/chat/history/${activeDoc.id}`); setMessages([]); - toast.info("Chat history cleared"); } catch { - // silent fail preserved; no additional toast for this scenario + //silent fail } }; @@ -514,9 +326,8 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { document.body.removeChild(a); }; - // Close export dropdown on outside click + // Close menus on outside click useEffect(() => { - if (!showExportMenu) return; const handleClickOutside = (e: MouseEvent) => { if ( exportMenuRef.current && @@ -524,10 +335,17 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { ) { setShowExportMenu(false); } + if ( + showSettingsMenu && + settingsMenuRef.current && + !settingsMenuRef.current.contains(e.target as Node) + ) { + setShowSettingsMenu(false); + } }; document.addEventListener("mousedown", handleClickOutside); return () => document.removeEventListener("mousedown", handleClickOutside); - }, [showExportMenu]); + }, [showExportMenu, showSettingsMenu]); // Cleanup speech recognition on unmount useEffect(() => { @@ -650,13 +468,16 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { } }; - const handleExportMenuKeyDown = (e: React.KeyboardEvent) => { + const handleMenuKeyDown = ( + e: React.KeyboardEvent, + menuType: "export" | "settings", + ) => { if (e.key === "Escape") { - setShowExportMenu(false); + if (menuType === "export") setShowExportMenu(false); + if (menuType === "settings") setShowSettingsMenu(false); } }; - // ── NEW KEYBOARD SHORTCUTS ENGINE EFFECT ────────────────────────── // ── KEYBOARD SHORTCUTS ENGINE EFFECT ────────────────────────── useEffect(() => { 
const handleGlobalKeyDown = (e: KeyboardEvent) => { @@ -676,13 +497,17 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { // Shortcut 2: Escape → Abort SSE stream OR clear input OR close modal if (e.key === "Escape") { - if (streaming) { + if (streaming && abortControllerRef.current) { e.preventDefault(); - handleStop(); + abortControllerRef.current.abort(); + setStreaming(false); + setIsTyping(false); toast.info("Response cancelled"); } else if (document.activeElement === textareaRef.current) { e.preventDefault(); setInput(""); + } else if (showHelpModal) { + setShowHelpModal(false); } else if (showExportMenu) { setShowExportMenu(false); } @@ -694,6 +519,12 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { textareaRef.current?.focus(); } + // Shortcut 4: Ctrl/Cmd + / → Toggle shortcuts help modal + if (isCmdOrCtrl && e.key === "/") { + e.preventDefault(); + setShowHelpModal((prev) => !prev); + } + // Shortcut 5: Ctrl/Cmd + Shift + C → Clear chat history if (isCmdOrCtrl && e.shiftKey && (e.key === "c" || e.key === "C")) { e.preventDefault(); @@ -721,15 +552,12 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) { return () => { window.removeEventListener("keydown", handleGlobalKeyDown); }; - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [input, streaming, showExportMenu, messages]); // Dependencies updated to capture fresh state data + }, [input, streaming, showHelpModal, showExportMenu, messages]); // Dependencies updated to capture fresh state data return ( -
+
{/* ── Chat Messages ──────────────────────────── */}
@@ -805,23 +633,8 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) {
- {/* Scroll to bottom button */} - - {/* ── Input Area ─────────────────────────────── */} -
+
{/* Status / Error Message Area */} {(isRecording || speechError) && ( @@ -869,7 +682,6 @@ export default function ChatPanel({ activeDoc, onCitationClick }: Props) {