diff --git a/client/marker_api_client/__init__.py b/client/marker_api_client/__init__.py index 6cb2e5c..4f194ac 100644 --- a/client/marker_api_client/__init__.py +++ b/client/marker_api_client/__init__.py @@ -65,7 +65,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def check_health(self): logger.info("Checking server health...") - response = self.session.get(f"{self.base_url}/health") + response = self.session.get(f"{self.base_url}/healthz") response.raise_for_status() health_data = HealthResponse(**response.json()) self.server_type = health_data.type @@ -74,7 +74,7 @@ def check_health(self): async def acheck_health(self): logger.info("Checking server health asynchronously...") - async with self.async_session.get(f"{self.base_url}/health") as response: + async with self.async_session.get(f"{self.base_url}/healthz") as response: response.raise_for_status() health_data = HealthResponse(**(await response.json())) self.server_type = health_data.type diff --git a/distributed_server.py b/distributed_server.py index 07ff93b..90bd63d 100644 --- a/distributed_server.py +++ b/distributed_server.py @@ -1,7 +1,7 @@ import argparse import uvicorn import logging -from fastapi import FastAPI, UploadFile, File +from fastapi import FastAPI, UploadFile, File, Form, HTTPException from celery.exceptions import TimeoutError from fastapi.middleware.cors import CORSMiddleware from marker_api.celery_worker import celery_app @@ -14,8 +14,8 @@ celery_batch_convert, celery_batch_result, ) -import gradio as gr -from marker_api.demo import demo_ui +# import gradio as gr +# from marker_api.demo import demo_ui from marker_api.model.schema import ( BatchConversionResponse, BatchResultResponse, @@ -24,15 +24,21 @@ ConversionResponse, HealthResponse, ServerType, + PDFConversionResult, ) from typing import List +import os # Initialize logging configure_logging() logger = logging.getLogger(__name__) # Global variable to hold model list -app = FastAPI() +app = FastAPI( + title="Marker API Distributed Server", + description="Distributed API for converting documents (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to Markdown using marker", + version="1.7.5" +) logger.info("Configuring CORS middleware") app.add_middleware( @@ -44,7 +50,7 @@ ) -@app.get("/health", response_model=HealthResponse) +@app.get("/healthz", response_model=HealthResponse) def server(): """ Root endpoint to check server status. 
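The health endpoint moves from /health to /healthz on both the simple and distributed servers, and the bundled client is updated to match. A minimal smoke test against the renamed endpoint (a sketch assuming a server listening on localhost:8080; adjust the host/port for your deployment):

```python
# Sketch: probe the renamed health endpoint. Assumes a Marker API server
# (simple or distributed) is listening on localhost:8080.
import requests

resp = requests.get("http://localhost:8080/healthz", timeout=5)
resp.raise_for_status()
health = resp.json()
# The distributed server reports its type and worker count, e.g.
# {"message": "Welcome to Marker-api", "type": "distributed", "workers": 1, ...}
print(health)
```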
@@ -58,19 +64,48 @@ def server(): message="Welcome to Marker-api", type=server_type, workers=worker_count if server_type == ServerType.distributed else None, + supported_formats=["PDF", "DOCX", "PPTX", "XLSX", "HTML", "EPUB"] ) def is_celery_alive() -> bool: logger.debug("Checking if Celery is alive") - try: - result = celery_app.send_task("celery.ping") - result.get(timeout=3) - logger.info("Celery is alive") - return True - except (TimeoutError, Exception) as e: - logger.warning(f"Celery is not responding: {str(e)}") - return False + max_retries = 3 + timeout = 10 # seconds + + for attempt in range(max_retries): + try: + result = celery_app.send_task("celery.ping") + result.get(timeout=timeout) + logger.info("Celery is alive") + return True + except (TimeoutError, Exception) as e: + logger.warning(f"Celery connection attempt {attempt + 1}/{max_retries} failed: {str(e)}") + if attempt < max_retries - 1: + import time + time.sleep(2) # Wait 2 seconds before retrying + + logger.error("All Celery connection attempts failed") + return False + + +def validate_file(file: UploadFile) -> str: + """Validate uploaded file and return file extension""" + # Check file size (limit to 50MB) + if file.size and file.size > 50 * 1024 * 1024: + raise HTTPException(status_code=413, detail="File too large. Maximum size is 50MB.") + + # Check file type + allowed_extensions = {'.pdf', '.docx', '.pptx', '.xlsx', '.html', '.epub'} + file_extension = os.path.splitext(file.filename)[1].lower() if file.filename else '' + + if file_extension not in allowed_extensions: + raise HTTPException( + status_code=400, + detail=f"Unsupported file type: {file_extension}. Supported types: {', '.join(allowed_extensions)}" + ) + + return file_extension def setup_routes(app: FastAPI, celery_live: bool): @@ -79,20 +114,39 @@ def setup_routes(app: FastAPI, celery_live: bool): logger.info("Adding Celery routes") @app.post("/convert", response_model=ConversionResponse) - async def convert_pdf(pdf_file: UploadFile = File(...)): - return await celery_convert_pdf_concurrent_await(pdf_file) + async def convert_document( + file: UploadFile = File(...), + extract_images: bool = Form(True), + paginate: bool = Form(True) + ): + """Convert a document (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to markdown""" + validate_file(file) + return await celery_convert_pdf_concurrent_await(file, extract_images, paginate) @app.post("/celery/convert", response_model=CeleryTaskResponse) - async def celery_convert(pdf_file: UploadFile = File(...)): - return await celery_convert_pdf(pdf_file) + async def celery_convert_document( + file: UploadFile = File(...), + extract_images: bool = Form(False), + paginate: bool = Form(True) + ): + """Submit a document conversion task to Celery queue""" + validate_file(file) + return await celery_convert_pdf(file, extract_images, paginate) @app.get("/celery/result/{task_id}", response_model=CeleryResultResponse) async def get_celery_result(task_id: str): return await celery_result(task_id) @app.post("/batch_convert", response_model=BatchConversionResponse) - async def batch_convert(pdf_files: List[UploadFile] = File(...)): - return await celery_batch_convert(pdf_files) + async def batch_convert( + files: List[UploadFile] = File(...), + extract_images: bool = Form(False), + paginate: bool = Form(True) + ): + """Convert multiple documents to markdown""" + for file in files: + validate_file(file) + return await celery_batch_convert(files, extract_images, paginate) @app.get("/batch_convert/result/{task_id}", 
response_model=BatchResultResponse) async def get_batch_result(task_id: str): @@ -101,7 +155,7 @@ async def get_batch_result(task_id: str): logger.info("Adding real-time conversion route") else: logger.warning("Celery routes not added as Celery is not alive") - app = gr.mount_gradio_app(app, demo_ui, path="") + # app = gr.mount_gradio_app(app, demo_ui, path="") def parse_args(): diff --git a/docker-compose.cpu.distributed.yml b/docker-compose.cpu.distributed.yml new file mode 100644 index 0000000..ad87510 --- /dev/null +++ b/docker-compose.cpu.distributed.yml @@ -0,0 +1,71 @@ +version: "3.8" + +services: + base: + image: marker-api-cpu-base:latest + build: + context: . + dockerfile: docker/Dockerfile.cpu.base + + celery_worker: + build: + context: . # Keep the build context as the root directory + dockerfile: docker/Dockerfile.cpu.distributed-server + image: marker-api-distributed-cpu + command: celery -A marker_api.celery_worker.celery_app worker --pool=solo -n worker@%h --loglevel=info + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + environment: + - REDIS_HOST=${REDIS_HOST} + links: + - redis + depends_on: + - redis + - base + + app: + container_name: marker-api-cpu-distributed + image: marker-api-distributed-cpu + command: python distributed_server.py --host 0.0.0.0 --port 8080 + environment: + - ENV=production + ports: + - "8086:8080" + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + depends_on: + - redis + - base + + redis: + container_name: redis + image: redis:7.2.4-alpine + ports: + - "6379:6379" + + + flower: + container_name: flower_cpu + image: marker-api-distributed-cpu + command: celery -A marker_api.celery_worker.celery_app flower --port=5555 + ports: + - 5556:5555 + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + environment: + - REDIS_HOST=${REDIS_HOST} + depends_on: + - app + - redis + - celery_worker + - base + +volumes: + model_cache: + +networks: + default: + name: marker-api-network \ No newline at end of file diff --git a/docker-compose.gpu.distributed.yml b/docker-compose.gpu.distributed.yml new file mode 100644 index 0000000..6fdb2dd --- /dev/null +++ b/docker-compose.gpu.distributed.yml @@ -0,0 +1,90 @@ +version: "3.8" + +services: + base: + image: marker-api-gpu-base:latest + build: + context: . + dockerfile: docker/Dockerfile.gpu.base + deploy: + resources: + reservations: + devices: + - capabilities: [gpu] + + celery_worker: + build: + context: . 
# Keep the build context as the root directory + dockerfile: docker/Dockerfile.gpu.distributed-server # Specify the new path to the GPU Dockerfile + command: celery -A marker_api.celery_worker.celery_app worker --pool=solo -n worker@%h --loglevel=info + image: marker-api-distributed-gpu + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + depends_on: + - redis + - base + environment: + - REDIS_HOST=${REDIS_HOST} + deploy: + resources: + reservations: + devices: + - capabilities: [gpu] # Request GPU support + + app: + container_name: marker-api-gpu-distributed + image: marker-api-distributed-gpu + command: python distributed_server.py --host 0.0.0.0 --port 8080 + environment: + - ENV=production + ports: + - "8080:8080" + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + depends_on: + - redis + - celery_worker + - base + deploy: + resources: + reservations: + devices: + - capabilities: [gpu] # Request GPU support + + redis: + container_name: redis + image: redis:7.2.4-alpine + ports: + - "6379:6379" + + flower: + container_name: flower_gpu + image: marker-api-distributed-gpu + + command: celery -A marker_api.celery_worker.celery_app flower --port=5555 + ports: + - 5556:5555 + volumes: + - .:/app + - model_cache:/home/marker/.cache/huggingface + environment: + - REDIS_HOST=${REDIS_HOST} + depends_on: + - app + - redis + - celery_worker + - base + deploy: + resources: + reservations: + devices: + - capabilities: [gpu] # Request GPU support + +volumes: + model_cache: + +networks: + default: + name: marker-api-network \ No newline at end of file diff --git a/docker/Dockerfile.cpu.base b/docker/Dockerfile.cpu.base new file mode 100644 index 0000000..84903ef --- /dev/null +++ b/docker/Dockerfile.cpu.base @@ -0,0 +1,51 @@ +# Use a base image with Python +FROM python:3.11-slim + +# Install system dependencies required for libGL and DOCX conversion +RUN apt-get update && apt-get install -y \ + libgl1-mesa-glx \ + libglib2.0-0 \ + libglib2.0-dev \ + libcairo2-dev \ + libpango1.0-dev \ + libgdk-pixbuf2.0-dev \ + libffi-dev \ + shared-mime-info \ + libpango-1.0-0 \ + libharfbuzz0b \ + libpangoft2-1.0-0 \ + libfontconfig1 \ + libcairo2 \ + libgdk-pixbuf2.0-0 \ + fonts-liberation \ + build-essential \ + pkg-config \ + gcc \ + && rm -rf /var/lib/apt/lists/* + +# Create a non-root user +RUN useradd -m -u 1000 marker + +# Create font directory for marker user +RUN mkdir -p /home/marker/.local/share/fonts && \ + chown -R marker:marker /home/marker/.local + +# Set the working directory +WORKDIR /app + +# Copy the entire project +COPY . . 
+ +# Install Python dependencies with distributed extras +RUN pip install -e ".[distributed]" + +USER marker + +# Set font path for marker user +ENV FONT_PATH=/home/marker/.local/share/fonts/font.ttf + +# Pre-load models and save them to a shared location +RUN python -c 'from marker.models import create_model_dict; create_model_dict()' + +# The models will be cached in /home/marker/.cache/huggingface +# This directory will be used as a volume in the distributed setup \ No newline at end of file diff --git a/docker/Dockerfile.cpu.distributed-server b/docker/Dockerfile.cpu.distributed-server index 09e94ed..9b872d1 100644 --- a/docker/Dockerfile.cpu.distributed-server +++ b/docker/Dockerfile.cpu.distributed-server @@ -1,21 +1,9 @@ -# Use a base image with Python -FROM python:3.11-slim +# Use the base image that has models pre-loaded +FROM marker-api-cpu-base:latest -# Install system dependencies required for libGL -RUN apt-get update && apt-get install -y \ - libgl1-mesa-glx \ - libglib2.0-0 \ - && rm -rf /var/lib/apt/lists/* +# Copy the rest of the project files +COPY . . -# Set the working directory -WORKDIR /app - -# Copy the project files into the Docker image -COPY ../ ./ - -# Install Python dependencies -RUN pip install -e . - -RUN python -c 'from marker.models import load_all_models; load_all_models()' - -EXPOSE 8080 +# No need to install dependencies or pre-load models as they're in the base image +# Just expose the port +EXPOSE 8080 \ No newline at end of file diff --git a/docker/Dockerfile.cpu.server b/docker/Dockerfile.cpu.server index 879eb04..605c187 100644 --- a/docker/Dockerfile.cpu.server +++ b/docker/Dockerfile.cpu.server @@ -1,23 +1,43 @@ # Use a base image with Python FROM python:3.11-slim -# Install system dependencies required for libGL +# Install system dependencies required for libGL and DOCX conversion RUN apt-get update && apt-get install -y \ libgl1-mesa-glx \ libglib2.0-0 \ + libglib2.0-dev \ + libcairo2-dev \ + libpango1.0-dev \ + libgdk-pixbuf2.0-dev \ + libffi-dev \ + shared-mime-info \ + libpango-1.0-0 \ + libharfbuzz0b \ + libpangoft2-1.0-0 \ + libfontconfig1 \ + libcairo2 \ + libgdk-pixbuf2.0-0 \ + fonts-liberation \ + build-essential \ + pkg-config \ + gcc \ && rm -rf /var/lib/apt/lists/* # Set the working directory WORKDIR /app # Copy the project files into the Docker image -COPY ../ ./ +COPY . ./ # Install Python dependencies RUN pip install -e . 
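The next hunk swaps the removed `load_all_models` preload for `create_model_dict`, the marker 1.x API: it returns the artifact dictionary that `PdfConverter` consumes and, as a side effect, downloads the model weights into the image's Hugging Face cache at build time. A minimal sketch of what that preload step accomplishes (illustrative, shown outside the Dockerfile):

```python
# Sketch: the build-time model preload. Calling create_model_dict() once
# during `docker build` downloads the weights into the Hugging Face cache
# layer, so containers start without re-downloading models.
from marker.models import create_model_dict

artifacts = create_model_dict()   # triggers the downloads on first call
print(sorted(artifacts.keys()))   # the artifact names PdfConverter expects
```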
-RUN python -c 'from marker.models import load_all_models; load_all_models()'
+# Pre-load models to speed up container startup
+# RUN python -c 'from marker.models import load_all_models; load_all_models()'
+RUN python -c 'from marker.models import create_model_dict; create_model_dict()'
 
+# Expose the port
 EXPOSE 8080
 
-CMD ["python", "server.py", "--host", "0.0.0.0", "--port", "8080"]
\ No newline at end of file
+# Set the default command
+CMD ["python", "server.py", "--host", "0.0.0.0", "--port", "8080"]
\ No newline at end of file
diff --git a/docker/Dockerfile.gpu.base b/docker/Dockerfile.gpu.base
new file mode 100644
index 0000000..8a92015
--- /dev/null
+++ b/docker/Dockerfile.gpu.base
@@ -0,0 +1,64 @@
+ARG CUDA_VERSION="11.8.0"
+ARG CUDNN_VERSION="8"
+ARG UBUNTU_VERSION="22.04"
+ARG MAX_JOBS=4
+
+FROM nvidia/cuda:$CUDA_VERSION-cudnn$CUDNN_VERSION-devel-ubuntu$UBUNTU_VERSION
+
+# ARG PYTHON_VERSION="3.11"
+# ARG PYTORCH_VERSION="2.1.2"
+# ARG CUDA="118"
+# ARG TORCH_CUDA_ARCH_LIST="7.0 7.5 8.0 8.6 9.0+PTX"
+
+# ENV PYTHON_VERSION=$PYTHON_VERSION
+# ENV TORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST
+
+# RUN apt-get update \
+#     && apt-get install -y wget git build-essential ninja-build git-lfs libaio-dev && rm -rf /var/lib/apt/lists/* \
+#     && wget \
+#     https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
+#     && mkdir /root/.conda \
+#     && bash Miniconda3-latest-Linux-x86_64.sh -b \
+#     && rm -f Miniconda3-latest-Linux-x86_64.sh \
+#     && conda create -n "py${PYTHON_VERSION}" python="${PYTHON_VERSION}"
+
+# ENV PATH="/root/miniconda3/envs/py${PYTHON_VERSION}/bin:${PATH}"
+
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends \
+    wget \
+    curl \
+    unzip \
+    git \
+    python3 \
+    python3-pip \
+    libgl1 \
+    libglib2.0-0 \
+    curl \
+    gnupg2 \
+    ca-certificates \
+    apt-transport-https \
+    software-properties-common \
+    libreoffice \
+    ffmpeg \
+    git-lfs \
+    xvfb \
+    && ln -s /usr/bin/python3 /usr/bin/python \
+    && apt-get update \
+    && apt-get install -y python3-packaging \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN pip3 install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
+
+WORKDIR /app
+
+# Copy the project files into the Docker image
+COPY . ./
+
+# Install Python dependencies
+RUN pip3 install --no-cache-dir -e ".[distributed]"
+
+# RUN python -c 'from marker.models import load_all_models; load_all_models()'
+RUN python -c 'from marker.models import create_model_dict; create_model_dict()'
+
+EXPOSE 8080
diff --git a/docker/Dockerfile.gpu.distributed-server b/docker/Dockerfile.gpu.distributed-server
index 403ece8..a8b94ad 100644
--- a/docker/Dockerfile.gpu.distributed-server
+++ b/docker/Dockerfile.gpu.distributed-server
@@ -1,63 +1,7 @@
-ARG CUDA_VERSION="11.8.0"
-ARG CUDNN_VERSION="8"
-ARG UBUNTU_VERSION="22.04"
-ARG MAX_JOBS=4
-
-FROM nvidia/cuda:$CUDA_VERSION-cudnn$CUDNN_VERSION-devel-ubuntu$UBUNTU_VERSION
-
-# ARG PYTHON_VERSION="3.11"
-# ARG PYTORCH_VERSION="2.1.2"
-# ARG CUDA="118"
-# ARG TORCH_CUDA_ARCH_LIST="7.0 7.5 8.0 8.6 9.0+PTX"
-
-# ENV PYTHON_VERSION=$PYTHON_VERSION
-# ENV TORCH_CUDA_ARCH_LIST=$TORCH_CUDA_ARCH_LIST
-
-# RUN apt-get update \
-#     && apt-get install -y wget git build-essential ninja-build git-lfs libaio-dev && rm -rf /var/lib/apt/lists/* \
-#     && wget \
-#     https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \
-#     && mkdir /root/.conda \
-#     && bash Miniconda3-latest-Linux-x86_64.sh -b \
-#     && rm -f Miniconda3-latest-Linux-x86_64.sh \
-#     && conda create -n "py${PYTHON_VERSION}" python="${PYTHON_VERSION}"
-
-# ENV PATH="/root/miniconda3/envs/py${PYTHON_VERSION}/bin:${PATH}"
-
-RUN apt-get update && \
-    apt-get install -y --no-install-recommends \
-    wget \
-    curl \
-    unzip \
-    git \
-    python3 \
-    python3-pip \
-    libgl1 \
-    libglib2.0-0 \
-    curl \
-    gnupg2 \
-    ca-certificates \
-    apt-transport-https \
-    software-properties-common \
-    libreoffice \
-    ffmpeg \
-    git-lfs \
-    xvfb \
-    && ln -s /usr/bin/python3 /usr/bin/python \
-    && apt-get update \
-    && apt install python3-packaging \
-    && rm -rf /var/lib/apt/lists/*
-
-RUN pip3 install --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
-
-WORKDIR /app
+# Use the GPU base image (built by the `base` service as marker-api-gpu-base)
+FROM marker-api-gpu-base:latest
 
 # Copy the project files into the Docker image
-COPY ../ ./
-
-# Install Python dependencies
-RUN pip3 install --no-cache-dir -e .
-
-RUN python -c 'from marker.models import load_all_models; load_all_models()'
+COPY . ./
 
 EXPOSE 8080
diff --git a/docker/Dockerfile.gpu.server b/docker/Dockerfile.gpu.server
index d63d9ea..d0873ff 100644
--- a/docker/Dockerfile.gpu.server
+++ b/docker/Dockerfile.gpu.server
@@ -58,7 +58,8 @@ COPY ../ ./
 # Install Python dependencies
 RUN pip3 install --no-cache-dir -e .
 
-RUN python -c 'from marker.models import load_all_models; load_all_models()'
+# RUN python -c 'from marker.models import load_all_models; load_all_models()'
+RUN python -c 'from marker.models import create_model_dict; create_model_dict()'
 
 EXPOSE 8080
diff --git a/marker_api/celery_routes.py b/marker_api/celery_routes.py
index b26cb06..d2fd429 100644
--- a/marker_api/celery_routes.py
+++ b/marker_api/celery_routes.py
@@ -9,9 +9,9 @@ logger = logging.getLogger(__name__)
 
-async def celery_convert_pdf(pdf_file: UploadFile = File(...)):
+async def celery_convert_pdf(pdf_file: UploadFile = File(...), extract_images: bool = False, paginate: bool = True):
     contents = await pdf_file.read()
-    task_id = convert_pdf_to_markdown.delay(pdf_file.filename, contents)
+    task_id = convert_pdf_to_markdown.delay(pdf_file.filename, contents, extract_images, paginate)
     return {"task_id": str(task_id), "status": "Processing"}
 
@@ -29,18 +29,18 @@ async def celery_offline_root():
     return {"message": "Celery is offline.
No API is available."} -async def celery_convert_pdf_sync(pdf_file: UploadFile = File(...)): +async def celery_convert_pdf_sync(pdf_file: UploadFile = File(...), extract_images: bool = False, paginate: bool = True): contents = await pdf_file.read() - task = convert_pdf_to_markdown.delay(pdf_file.filename, contents) + task = convert_pdf_to_markdown.delay(pdf_file.filename, contents, extract_images, paginate) result = task.get(timeout=600) # 10-minute timeout return {"status": "Success", "result": result} -async def celery_convert_pdf_concurrent_await(pdf_file: UploadFile = File(...)): +async def celery_convert_pdf_concurrent_await(pdf_file: UploadFile = File(...), extract_images: bool = False, paginate: bool = True): contents = await pdf_file.read() # Start the Celery task - task = convert_pdf_to_markdown.delay(pdf_file.filename, contents) + task = convert_pdf_to_markdown.delay(pdf_file.filename, contents, extract_images, paginate) # Define an asynchronous function to check task status async def check_task_status(): @@ -62,56 +62,11 @@ async def check_task_status(): ) -# async def celery_batch_convert(pdf_files: List[UploadFile] = File(...)): -# batch_data = [] -# for pdf_file in pdf_files: -# contents = await pdf_file.read() -# batch_data.append((pdf_file.filename, contents)) - -# # Start a single task to process the entire batch -# task = process_batch.delay(batch_data) - -# return { -# "task_id": str(task.id), -# "status": "Processing", -# } - - -# async def celery_batch_result(task_id: str): -# task = AsyncResult(task_id) - -# if not task.ready(): -# return JSONResponse( -# status_code=202, -# content={ -# "task_id": str(task_id), -# "status": "Processing", -# }, -# ) - -# try: -# results = task.get() -# return JSONResponse( -# status_code=200, -# content={"task_id": task_id, "status": "Success", "results": results}, -# ) -# except Exception as e: -# logger.error(f"Error retrieving results for task {task_id}: {str(e)}") -# return JSONResponse( -# status_code=500, -# content={ -# "task_id": task_id, -# "status": "Error", -# "message": "An error occurred while retrieving the results", -# }, -# ) - - -async def celery_batch_convert(pdf_files: List[UploadFile] = File(...)): +async def celery_batch_convert(pdf_files: List[UploadFile] = File(...), extract_images: bool = False, paginate: bool = True): batch_data = [] for pdf_file in pdf_files: contents = await pdf_file.read() - batch_data.append((pdf_file.filename, contents)) + batch_data.append((pdf_file.filename, contents, extract_images, paginate)) # Start a single task to process the entire batch task = process_batch.delay(batch_data) diff --git a/marker_api/celery_tasks.py b/marker_api/celery_tasks.py index 3658ef7..a596c3a 100644 --- a/marker_api/celery_tasks.py +++ b/marker_api/celery_tasks.py @@ -1,87 +1,119 @@ from celery import Task from marker_api.celery_worker import celery_app -from marker.convert import convert_single_pdf -from marker.models import load_all_models +from marker.converters.pdf import PdfConverter +from marker.models import create_model_dict +from marker.output import text_from_rendered +from marker.logger import configure_logging import io +import os +import time +import base64 +import tempfile +from PIL import Image import logging from marker_api.utils import process_image_to_base64 from celery.signals import worker_process_init +from marker_api.routes import process_document_file +# Initialize logging +configure_logging() logger = logging.getLogger(__name__) -model_list = None - +converter = None 
 @worker_process_init.connect
 def initialize_models(**kwargs):
-    global model_list
-    if not model_list:
-        model_list = load_all_models()
-        print("Models loaded at worker startup")
-
+    global converter
+    if not converter:
+        model_dict = create_model_dict()
+        converter = PdfConverter(
+            artifact_dict=model_dict,
+        )
+        # Share the warmed-up models with DocumentConversionTask so each
+        # worker process loads them only once
+        DocumentConversionTask._model_dict = model_dict
+        logger.debug("Marker converter initialized at worker startup")
 
-class PDFConversionTask(Task):
-    abstract = True
 
-    def __init__(self):
-        super().__init__()
+class DocumentConversionTask(Task):
+    _model_dict = None
 
-    def __call__(self, *args, **kwargs):
-        # Use the global model_list initialized at worker startup
-        return self.run(*args, **kwargs)
+    @property
+    def model_dict(self):
+        if self._model_dict is None:
+            self._model_dict = create_model_dict()
+        return self._model_dict
 
 
 @celery_app.task(
-    ignore_result=False, bind=True, base=PDFConversionTask, name="convert_pdf"
+    ignore_result=False, bind=True, base=DocumentConversionTask, name="convert_document"
 )
-def convert_pdf_to_markdown(self, filename, pdf_content):
-    pdf_file = io.BytesIO(pdf_content)
-    markdown_text, images, metadata = convert_single_pdf(pdf_file, model_list)
-    image_data = {}
-    for i, (img_filename, image) in enumerate(images.items()):
-        logger.debug(f"Processing image {img_filename}")
-        image_base64 = process_image_to_base64(image, img_filename)
-        image_data[img_filename] = image_base64
-
-    return {
-        "filename": filename,
-        "markdown": markdown_text,
-        "metadata": metadata,
-        "images": image_data,
-        "status": "ok",
-    }
-
-
-# @celery_app.task(
-#     ignore_result=False, bind=True, base=PDFConversionTask, name="process_batch"
-# )
-# def process_batch(self, batch_data):
-#     results = []
-#     for filename, pdf_content in batch_data:
-#         try:
-#             result = convert_pdf_to_markdown(filename, pdf_content)
-#             results.append(result)
-#         except Exception as e:
-#             logger.error(f"Error processing {filename}: {str(e)}")
-#             results.append({"filename": filename, "status": "Error", "error": str(e)})
-#     return results
+def convert_pdf_to_markdown(self, filename, file_content, extract_images: bool = True, paginate: bool = True):
+    """
+    Convert a document (PDF, DOCX, etc.) to markdown.
+
+    Args:
+        filename (str): The original filename
+        file_content (bytes): The document file content
+        extract_images (bool): Whether to extract images
+        paginate (bool): Whether to split output into pages
+
+    Returns:
+        dict: Processing results matching PDFConversionResult schema
+    """
+    return process_document_file(
+        file_content,
+        filename=filename,
+        model_dict=self.model_dict,
+        extract_images=extract_images,
+        paginate=paginate
+    )
 
 
 @celery_app.task(
-    ignore_result=False, bind=True, base=PDFConversionTask, name="process_batch"
+    ignore_result=False, bind=True, base=DocumentConversionTask, name="process_batch"
 )
 def process_batch(self, batch_data):
+    """
+    Process multiple documents in batch.
+
+    Args:
+        batch_data (list): List of tuples (filename, content, extract_images, paginate)
+
+    Returns:
+        list: List of processing results
+    """
     results = []
-    total = len(batch_data)
-    for i, (filename, pdf_content) in enumerate(batch_data, start=1):
+    total_files = len(batch_data)
+
+    for i, (filename, content, extract_images, paginate) in enumerate(batch_data, 1):
         try:
-            result = convert_pdf_to_markdown(filename, pdf_content)
+            # Update progress
+            self.update_state(
+                state="PROGRESS",
+                meta={
+                    "current": i,
+                    "total": total_files,
+                    "status": f"Processing {filename}"
+                }
+            )
+
+            # Process the document
+            result = process_document_file(
+                content,
+                filename=filename,
+                model_dict=self.model_dict,
+                extract_images=extract_images,
+                paginate=paginate
+            )
             results.append(result)
+
         except Exception as e:
             logger.error(f"Error processing {filename}: {str(e)}")
-            results.append({"filename": filename, "status": "Error", "error": str(e)})
-
-        # Update progress
-        self.update_state(state="PROGRESS", meta={"current": i, "total": total})
-
-    return results
+            results.append({
+                "filename": filename,
+                "markdown": "" if not paginate else [],
+                "metadata": {
+                    "custom_metadata": {"error": str(e)},
+                },
+                "images": {},
+                "status": "error"
+            })
+
+    return results
\ No newline at end of file
diff --git a/marker_api/celery_worker.py b/marker_api/celery_worker.py
index d0f30d3..7e0fe49 100644
--- a/marker_api/celery_worker.py
+++ b/marker_api/celery_worker.py
@@ -1,21 +1,33 @@
 import os
 from celery import Celery
 from dotenv import load_dotenv
-import multiprocessing
+import logging
 
-multiprocessing.set_start_method("spawn")
+# Configure logging
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
 
 load_dotenv(".env")
 
+# Get Redis host from environment or use default
+redis_url = os.environ.get("REDIS_HOST", "redis://localhost:6379/0")
+logger.info(f"Connecting to Redis at: {redis_url}")
+
 celery_app = Celery(
     "celery_app",
-    broker=os.environ.get("REDIS_HOST", "redis://localhost:6379/0"),
-    backend=os.environ.get("REDIS_HOST", "redis://localhost:6379/0"),
+    broker=redis_url,
+    backend=redis_url,
     include=["marker_api.celery_tasks"],
 )
 
+# Add some debug configuration
+celery_app.conf.update(
+    task_track_started=True,
+    worker_send_task_events=True,
+    task_send_sent_event=True,
+)
 
 @celery_app.task(name="celery.ping")
 def ping():
-    print("Ping task received!")  # or use a logger
+    logger.info("Ping task received!")
     return "pong"
diff --git a/marker_api/model/schema.py b/marker_api/model/schema.py
index d090515..8a158d4 100644
--- a/marker_api/model/schema.py
+++ b/marker_api/model/schema.py
@@ -41,14 +41,15 @@ class GeneralMetadata(BaseModel):
     toc: Optional[List[Dict[str, Any]]] = None
     pages: Optional[int] = None
     custom_metadata: Dict[str, Any] = Field(default_factory=dict)
+    converted_pdf_base64: Optional[str] = None  # Base64 encoded PDF content
 
 
 class PDFConversionResult(BaseModel):
     filename: str
-    markdown: str
+    markdown: Union[str, List[str]]  # A single string, or a list of per-page strings when paginated
     metadata: GeneralMetadata
-    images: Dict[str, str]
-    status: str
+    images: Dict[str, str] = Field(default_factory=dict)
+    status: str = "success"
 
 
 class ConversionResponse(BaseModel):
@@ -68,8 +69,7 @@ class CeleryResultResponse(BaseModel):
 
 
 class BatchConversionResponse(BaseModel):
-    task_id: str
-    status: str
+    results: List[PDFConversionResult]
 
 
 class BatchResultResponse(BaseModel):
@@ -79,3 +79,14 @@ class BatchResultResponse(BaseModel):
     completed: Optional[int] = None
     total:
Optional[int] = None progress: Optional[str] = None + + +class TaskResponse(BaseModel): + task_id: str + status: str + + +class TaskStatusResponse(BaseModel): + task_id: str + status: str + result: Optional[Union[PDFConversionResult, str, Dict[str, Any]]] = None diff --git a/marker_api/routes.py b/marker_api/routes.py index 8f3b66a..c829572 100644 --- a/marker_api/routes.py +++ b/marker_api/routes.py @@ -1,78 +1,243 @@ import os import time import base64 -from marker.convert import convert_single_pdf +import tempfile +import io +import subprocess +from PIL import Image +from marker.converters.pdf import PdfConverter +from marker.models import create_model_dict +from marker.output import text_from_rendered from marker.logger import configure_logging import logging +from marker_api.utils import process_image_to_base64 +from marker.config.parser import ConfigParser # Initialize logging configure_logging() logger = logging.getLogger(__name__) +SUPPORTED_EXTENSIONS = ['.pdf', '.docx', '.doc'] # , '.pptx', '.xlsx', '.html', '.epub'] -# Function to parse PDF and return markdown, metadata, and image data -def parse_pdf_and_return_markdown(pdf_file: bytes, extract_images: bool, model_list): +def convert_docx_to_pdf(docx_path: str, pdf_path: str, original_filename: str = None) -> None: """ - Function to parse a PDF and extract text and images. - + Convert DOCX to PDF using LibreOffice. + Args: - pdf_file (bytes): The content of the PDF file. - extract_images (bool): Whether to extract images or not. - - Returns - tuple: A tuple containing the full text, metadata, and image data (if extracted). + docx_path (str): Path to input DOCX file + pdf_path (str): Path where to save the PDF file + original_filename (str): Original filename for logging purposes """ - logger.debug("Parsing PDF file") - full_text, images, out_meta = convert_single_pdf(pdf_file, model_list) - logger.debug(f"Images extracted: {list(images.keys())}") - image_data = {} - if extract_images: - for i, (filename, image) in enumerate(images.items()): - logger.debug(f"Processing image {filename}") - - # Save image as PNG - image.save(filename, "PNG") + try: + # Convert to absolute paths + docx_path = os.path.abspath(docx_path) + pdf_path = os.path.abspath(pdf_path) + + # Create output directory if it doesn't exist + os.makedirs(os.path.dirname(pdf_path), exist_ok=True) + + # Use LibreOffice to convert DOCX to PDF + # --headless: run without UI + # --convert-to pdf: convert to PDF format + # --outdir: specify output directory + cmd = [ + 'libreoffice', + '--headless', + '--convert-to', 'pdf', # Use the Writer PDF export filter + '--outdir', os.path.dirname(pdf_path), + docx_path + ] + + display_name = original_filename or os.path.basename(docx_path) + logger.info(f"Converting DOCX to PDF: {display_name}") + process = subprocess.run(cmd, capture_output=True, text=True) + + if process.returncode != 0: + raise Exception(f"LibreOffice conversion failed: {process.stderr}") + + # LibreOffice creates the PDF with the same name as input but .pdf extension + # Move it to the desired location if different + generated_pdf = os.path.join( + os.path.dirname(pdf_path), + os.path.splitext(os.path.basename(docx_path))[0] + '.pdf' + ) + if generated_pdf != pdf_path and os.path.exists(generated_pdf): + os.rename(generated_pdf, pdf_path) + + except subprocess.CalledProcessError as e: + raise Exception(f"Failed to convert DOCX to PDF: {str(e)}") + except Exception as e: + raise Exception(f"Error during DOCX to PDF conversion: {str(e)}") - # Read the saved 
image file as bytes
-        with open(filename, "rb") as f:
-            image_bytes = f.read()
-
-            # Convert image to base64
-            image_base64 = base64.b64encode(image_bytes).decode("utf-8")
-            image_data[f"{filename}"] = image_base64
+def parse_document_and_return_markdown(file_bytes: bytes, filename: str, extract_images: bool, model_dict, paginate: bool = True):
+    """
+    Function to parse a document (PDF, DOCX, etc.) and extract text and images.
 
-            # Remove the temporary image file
-            os.remove(filename)
+    Args:
+        file_bytes (bytes): The content of the document file.
+        filename (str): Original filename to determine file type
+        extract_images (bool): Whether to extract images or not.
+        model_dict: The model dictionary from create_model_dict()
+        paginate (bool): Whether to paginate the output based on original document pages.
 
-    return full_text, out_meta, image_data
+    Returns:
+        tuple: A tuple of (full_text, image_data, processing_time, converted_pdf_base64).
+    """
+    try:
+        start_time = time.time()
+
+        # Create a temporary file with the correct extension
+        file_extension = os.path.splitext(filename)[1].lower() if filename else '.pdf'
+
+        with tempfile.NamedTemporaryFile(suffix=file_extension, delete=False) as temp_file:
+            temp_file.write(file_bytes)
+            temp_file_path = temp_file.name
+
+        original_temp_path = temp_file_path  # Keep track of original file
+        try:
+            # If the file is a DOCX, convert it to PDF first
+            if file_extension == '.docx' or file_extension == '.doc':
+                pdf_temp_file = tempfile.NamedTemporaryFile(suffix='.pdf', delete=False)
+                pdf_temp_path = pdf_temp_file.name
+                pdf_temp_file.close()
+
+                try:
+                    convert_docx_to_pdf(temp_file_path, pdf_temp_path, filename)
+                    temp_file_path = pdf_temp_path  # Use the converted PDF
+                except Exception as e:
+                    raise Exception(f"Failed to convert DOCX to PDF: {str(e)}")
+
+            # Create converter with model dictionary and pagination config
+            config = {
+                "paginate_output": paginate,  # Add page breaks
+                # "format_lines": True,  # Better formatting
+                # "force_ocr": False,  # Don't force OCR for DOCX
+                # "strip_existing_ocr": False,  # Keep existing text
+                # "disable_image_extraction": True,  # Keep images
+            }
+            config_parser = ConfigParser(config)
+            # Initialize the converter with config
+            converter = PdfConverter(
+                artifact_dict=model_dict,
+                config=config_parser.generate_config_dict()
+            )
+
+            # Convert document using the new API - pass the file path directly
+            rendered = converter(temp_file_path)
+
+            # Extract text and metadata from rendered output
+            full_text, _, images = text_from_rendered(rendered)
+
+            end_time = time.time()
+            processing_time = end_time - start_time
+
+            display_name = filename or "document"
+            logger.info(f"Document processing completed for '{display_name}' in {processing_time:.2f} seconds")
+            if isinstance(full_text, list):
+                logger.info(f"Extracted {len(full_text)} pages from '{display_name}'")
+                logger.info(f"Total text length: {sum(len(page) for page in full_text)} characters")
+            else:
+                logger.info(f"Extracted text length: {len(full_text)} characters from '{display_name}'")
+            logger.info(f"Images extracted from '{display_name}': {len(images) if images else 0}")
+
+            # Process images if extraction is enabled
+            image_data = {}
+            if extract_images and images:
+                # text_from_rendered returns images as a dict of filename -> PIL image,
+                # so iterate over items(); enumerate() here would yield only the keys
+                for img_name, img in images.items():
+                    image_data[img_name] = process_image_to_base64(img)
+
+            # Get the converted PDF content
+            converted_pdf_content = None
+            if hasattr(rendered, 'pdf_bytes') and rendered.pdf_bytes:
+                converted_pdf_content =
base64.b64encode(rendered.pdf_bytes).decode('utf-8') + elif os.path.exists(temp_file_path): + # If pdf_bytes not available, read from the temporary file + with open(temp_file_path, 'rb') as f: + converted_pdf_content = base64.b64encode(f.read()).decode('utf-8') + + return full_text, image_data, processing_time, converted_pdf_content + + finally: + # Clean up the temporary files + for path in [original_temp_path, temp_file_path]: + try: + if path and os.path.exists(path): + os.unlink(path) + except OSError: + pass # File already deleted or doesn't exist + + except Exception as e: + logger.error(f"Error processing document: {str(e)}") + raise -# Function to process a single PDF file -def process_pdf_file(file_content: bytes, filename: str, model_list): +def process_document_file(file_bytes: bytes, filename: str = None, model_dict=None, extract_images: bool = True, paginate: bool = True): """ - Function to process a single PDF file. - + Process a document file (PDF, DOCX, etc.) and return markdown text with metadata. + Args: - file_content (bytes): The content of the PDF file. - filename (str): The name of the PDF file. - model_list: The list of loaded models. - + file_bytes (bytes): The document file content + filename (str): The original filename (optional) + model_dict: Pre-loaded model dictionary + extract_images (bool): Whether to extract images + paginate (bool): Whether to paginate the output based on original document pages + Returns: - dict: A dictionary containing the filename, markdown text, metadata, image data, status, and processing time. + dict: Processing results matching PDFConversionResult schema + """ + try: + # Determine file type from filename + file_extension = os.path.splitext(filename)[1].lower() if filename else '.pdf' + + if file_extension not in SUPPORTED_EXTENSIONS: + raise ValueError(f"Unsupported file format: {file_extension}. 
Supported formats: {SUPPORTED_EXTENSIONS}") + + logger.info(f"Processing {file_extension} file: {filename}") + + full_text, images, processing_time, converted_pdf = parse_document_and_return_markdown( + file_bytes, filename, extract_images, model_dict, paginate + ) + + # Convert metadata to match GeneralMetadata schema + general_metadata = { + "custom_metadata": { + "processing_time": processing_time, + "file_type": file_extension, + "text_length": len(full_text) if isinstance(full_text, str) else sum(len(page) for page in full_text), + "pages": len(full_text) if isinstance(full_text, list) else None + }, + "converted_pdf_base64": converted_pdf + } + + result = { + "filename": filename or f"document{file_extension}", + "markdown": full_text, # This will be either a string or a list of strings depending on pagination + "metadata": general_metadata, + "images": images if extract_images else {}, + "status": "success" + } + + logger.info(f"Successfully processed {filename}: {len(full_text)} chars, {len(images) if extract_images else 0} images") + return result + + except Exception as e: + logger.error(f"Error in process_document_file: {str(e)}") + return { + "filename": filename or "document.pdf", + "markdown": "" if not paginate else [], + "metadata": { + "custom_metadata": {"error": str(e), "file_type": file_extension if filename else ".pdf"}, + "converted_pdf_base64": None + }, + "images": {}, + "status": "error" + } + + +# Keep the old function for backward compatibility +def process_pdf_file(pdf_file: bytes, filename: str = None, model_dict=None, extract_images: bool = True): + """ + Process a PDF file and return markdown text with metadata. + Wrapper around process_document_file for backward compatibility. """ - entry_time = time.time() - logger.info(f"Entry time for {filename}: {entry_time}") - markdown_text, metadata, image_data = parse_pdf_and_return_markdown( - file_content, extract_images=True, model_list=model_list - ) - completion_time = time.time() - logger.info(f"Model processes complete time for {filename}: {completion_time}") - time_difference = completion_time - entry_time - return { - "filename": filename, - "markdown": markdown_text, - "metadata": metadata, - "images": image_data, - "status": "ok", - "time": time_difference, - } + return process_document_file(pdf_file, filename, model_dict, extract_images) diff --git a/marker_api/tasks.py b/marker_api/tasks.py new file mode 100644 index 0000000..11d020e --- /dev/null +++ b/marker_api/tasks.py @@ -0,0 +1,166 @@ +import os +from celery import Celery +from marker.models import create_model_dict +from marker_api.routes import process_document_file +import logging + +# Initialize logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Get Redis URL from environment variable +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0") + +# Initialize Celery +celery_app = Celery( + "marker_api", + broker=REDIS_URL, + backend=REDIS_URL +) + +# Celery configuration +celery_app.conf.update( + task_serializer="json", + accept_content=["json"], + result_serializer="json", + timezone="UTC", + enable_utc=True, + task_track_started=True, + task_time_limit=30 * 60, # 30 minutes + task_soft_time_limit=25 * 60, # 25 minutes + worker_prefetch_multiplier=1, + worker_max_tasks_per_child=1000, +) + +# Global variable to hold model dictionary +model_dict = None + +def load_models(): + """Load marker models once per worker""" + global model_dict + if model_dict is None: + logger.info("Loading marker 
models...") + model_dict = create_model_dict() + logger.info("Models loaded successfully!") + return model_dict + +@celery_app.task(bind=True) +def process_document_task(self, file_content: bytes, filename: str): + """ + Celery task to process a document (PDF, DOCX, etc.) and convert it to markdown. + + Args: + file_content (bytes): The document file content + filename (str): The original filename + + Returns: + dict: Processing results + """ + try: + # Update task state to PROGRESS + self.update_state( + state="PROGRESS", + meta={"status": "Loading models and processing document..."} + ) + + # Load models if not already loaded + models = load_models() + + # Update task state + self.update_state( + state="PROGRESS", + meta={"status": f"Converting {filename}..."} + ) + + # Process the document + result = process_document_file( + file_content, + filename=filename, + model_dict=models, + extract_images=True + ) + + logger.info(f"Successfully processed document: {filename}") + return result + + except Exception as e: + logger.error(f"Error processing document {filename}: {str(e)}") + self.update_state( + state="FAILURE", + meta={"error": str(e), "filename": filename} + ) + raise + +@celery_app.task(bind=True) +def process_batch_documents_task(self, files_data: list): + """ + Celery task to process multiple documents in batch. + + Args: + files_data (list): List of dictionaries with 'content' and 'filename' keys + + Returns: + list: List of processing results + """ + try: + # Update task state to PROGRESS + self.update_state( + state="PROGRESS", + meta={"status": "Loading models..."} + ) + + # Load models if not already loaded + models = load_models() + + results = [] + total_files = len(files_data) + + for i, file_data in enumerate(files_data): + try: + # Update progress + self.update_state( + state="PROGRESS", + meta={ + "status": f"Processing file {i+1}/{total_files}: {file_data['filename']}", + "current": i+1, + "total": total_files + } + ) + + # Process the document + result = process_document_file( + file_data['content'], + filename=file_data['filename'], + model_dict=models, + extract_images=True + ) + + results.append(result) + logger.info(f"Successfully processed document {i+1}/{total_files}: {file_data['filename']}") + + except Exception as e: + logger.error(f"Error processing document {file_data['filename']}: {str(e)}") + error_result = { + "filename": file_data['filename'], + "markdown": "", + "metadata": { + "languages": None, + "toc": None, + "pages": None, + "custom_metadata": {"error": str(e)} + }, + "images": {}, + "status": "error" + } + results.append(error_result) + + logger.info(f"Batch processing completed. 
Processed {len(results)} files.") + return results + + except Exception as e: + logger.error(f"Error in batch processing: {str(e)}") + self.update_state( + state="FAILURE", + meta={"error": str(e)} + ) + raise \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8d19e0e..2ddf7b5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,23 +8,36 @@ packages = [{include = "marker_api"}] [tool.poetry.dependencies] python = ">=3.10,<3.13" -fastapi = "^0.115.0" -uvicorn = "^0.31.1" -celery = {extras = ["redis"], version = "=5.3.6"} -flower = "^2.0.1" -hf-transfer = "^0.1.8" -huggingface-hub = "^0.25.1" -locust = "^2.31.8" -python-multipart = "^0.0.12" -redis = "^5.1.1" -requests = "^2.32.3" +fastapi = "0.115.0" +uvicorn = "0.31.1" +python-multipart = "0.0.12" +requests = "2.32.3" +marker-pdf = {version = "1.7.5", extras = ["full"]} +pillow = "^10.0.0" +torch = "^2.7.0" +pydantic = "^2.4.2" +pydantic-settings = "^2.0.3" +python-dotenv = "^1.0.0" rich = "^13.9.2" -marker-pdf = "^0.2.17" pynvml = "^11.5.3" art = "^6.3" -gradio = "^5.1.0" +starlette = "^0.37.2,<0.39.0" +# DOCX conversion dependencies +weasyprint = "^63.0" +cffi = "^1.15.0" +pycairo = "^1.20.0" +mammoth = "^1.9.0" +pypdf = "^4.0.0" +pdfplumber = "^0.11.0" +python-docx = "^1.1.0" +# Optional distributed server dependencies +celery = {extras = ["redis"], version = "^5.3.6", optional = true} +flower = {version = "^2.0.1", optional = true} +redis = {version = "^5.1.1", optional = true} +[tool.poetry.extras] +distributed = ["celery", "flower", "redis"] [build-system] requires = ["poetry-core"] diff --git a/server.py b/server.py index 81cd9df..dfdf42a 100644 --- a/server.py +++ b/server.py @@ -1,47 +1,114 @@ import os import asyncio import argparse -from fastapi import FastAPI, UploadFile, File +import base64 +import io +from PIL import Image +from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.middleware.cors import CORSMiddleware -from typing import List +from typing import List, Dict, Any import concurrent.futures from marker.logger import configure_logging # Import logging configuration -from marker.models import load_all_models # Import function to load models +from marker.models import create_model_dict # Import function to create model dict from marker_api.routes import ( - process_pdf_file, + process_document_file, # Updated to handle all document types ) from marker_api.utils import print_markerapi_text_art from contextlib import asynccontextmanager import logging -import gradio as gr +# import gradio as gr from marker_api.model.schema import ( BatchConversionResponse, ConversionResponse, HealthResponse, ServerType, + PDFConversionResult, ) -from marker_api.demo import demo_ui +# from marker_api.demo import demo_ui # Initialize logging configure_logging() -logger = logging.getLogger(__name__) -# Global variable to hold model list -model_list = None +def apply_marker_formatter_and_route_to_root( + fmt="[%(asctime)s]-[%(levelname)s]-[%(name)s]: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO, +) -> None: + formatter = logging.Formatter(fmt=fmt, datefmt=datefmt) + marker_logger = logging.getLogger("marker") + root = logging.getLogger() + + # If marker has no handlers, nothing to do + if not marker_logger.handlers: + return + + # Move marker handlers to root with new formatter (prevents duplicates) + handlers_to_move = list(marker_logger.handlers) + for h in handlers_to_move: + try: + h.setFormatter(formatter) + except Exception: + pass + 
marker_logger.removeHandler(h)
+        if h not in root.handlers:
+            root.addHandler(h)
+
+    root.setLevel(level)
+    marker_logger.propagate = True  # marker logs now flow to root
+
+apply_marker_formatter_and_route_to_root()
+
+logger = logging.getLogger(__name__)
+# Global variable to hold model dict
+model_dict = None
+
+
+# def clean_pil_images_from_dict(obj):
+#     """
+#     Recursively clean PIL Image objects from a dictionary/list structure,
+#     converting them to base64 strings.
+#     """
+#     if isinstance(obj, dict):
+#         cleaned = {}
+#         for key, value in obj.items():
+#             cleaned[key] = clean_pil_images_from_dict(value)
+#         return cleaned
+#     elif isinstance(obj, list):
+#         return [clean_pil_images_from_dict(item) for item in obj]
+#     elif isinstance(obj, Image.Image):
+#         # Convert PIL Image to base64 string
+#         try:
+#             img_byte_arr = io.BytesIO()
+#             obj.save(img_byte_arr, format="PNG")
+#             img_byte_arr = img_byte_arr.getvalue()
+#             return base64.b64encode(img_byte_arr).decode()
+#         except Exception as e:
+
+#             logger.error(f"Error converting PIL Image to base64: {str(e)}")
+#             return ""
+#     else:
+#         return obj
 
 
 # Event that runs on startup to load all models
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    global model_list
+    global model_dict
     logger.debug("--------------------- Loading OCR Model -----------------------")
     print_markerapi_text_art()
-    model_list = load_all_models()
+    model_dict = create_model_dict()
     yield
 
 
 # Initialize FastAPI app
-app = FastAPI(lifespan=lifespan)
+app = FastAPI(
+    title="Marker API",
+    description="Convert documents (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to Markdown with marker-pdf",
+    version="1.7.5",
+    docs_url="/docs",
+    redoc_url="/redoc",
+    lifespan=lifespan
+)
 
 # Add CORS middleware to allow cross-origin requests
 app.add_middleware(
@@ -52,49 +119,106 @@ async def lifespan(app: FastAPI):
     allow_credentials=True,
 )
 
-app = gr.mount_gradio_app(app, demo_ui, path="")
+# app = gr.mount_gradio_app(app, demo_ui, path="")
 
 
-@app.get("/health", response_model=HealthResponse)
-def server():
-    """
-    Root endpoint to check server status.
-    """
-    return HealthResponse(message="Welcome to Marker-api", type=ServerType.simple)
+@app.get("/healthz", response_model=HealthResponse)
+async def health_check():
+    # Return the typed model so the bundled client, which parses the payload
+    # as HealthResponse and reads its `type` field, keeps working
+    return HealthResponse(message="Marker API is running (v1.7.5)", type=ServerType.simple)
+
+@app.get("/")
+async def root():
+    return {"message": "Welcome to Marker-api", "version": "1.7.5"}
 
 
-# Endpoint to convert a single PDF to markdown
-@app.post("/convert", response_model=ConversionResponse)
-async def convert_pdf_to_markdown(pdf_file: UploadFile):
+# Endpoint to convert a single document to markdown
+@app.post("/convert", response_model=PDFConversionResult)
+async def convert_document_to_markdown(
+    file: UploadFile = File(...),
+    extract_images: bool = Form(False),
+    paginate: bool = Form(True)
+):
     """
-    Endpoint to convert a single PDF to markdown.
+    Endpoint to convert a document (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to markdown.
+ + Parameters: + - file: The document file to convert + - extract_images: Whether to extract and include images in the output + - paginate: Whether to split the output into pages based on the original document structure """ - logger.debug(f"Received file: {pdf_file.filename}") - file = await pdf_file.read() - response = process_pdf_file(file, pdf_file.filename, model_list) - return ConversionResponse(status="Success", result=response) - - -# Endpoint to convert multiple PDFs to markdown + global model_dict + + if model_dict is None: + raise HTTPException(status_code=503, detail="Models not loaded yet") + + logger.debug(f"Received file: {file.filename}") + + # Check file size (limit to 50MB) + if file.size and file.size > 50 * 1024 * 1024: + raise HTTPException(status_code=413, detail="File too large. Maximum size is 50MB.") + + # Check file type + allowed_extensions = {'.pdf', '.docx', '.pptx', '.xlsx', '.html', '.epub', '.doc'} + file_extension = os.path.splitext(file.filename)[1].lower() if file.filename else '' + + if file_extension not in allowed_extensions: + raise HTTPException( + status_code=400, + detail=f"Unsupported file type: {file_extension}. Supported types: {', '.join(allowed_extensions)}" + ) + + try: + file_content = await file.read() + response = process_document_file( + file_content, + filename=file.filename, + model_dict=model_dict, + extract_images=extract_images, + paginate=paginate + ) + # Clean any PIL Image objects from the response + cleaned_response = response + # cleaned_response = clean_pil_images_from_dict(response) + + return PDFConversionResult(**cleaned_response) + except Exception as e: + logger.error(f"Error processing document: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error processing document: {str(e)}") + + +# Endpoint to convert multiple documents to markdown @app.post("/batch_convert", response_model=BatchConversionResponse) -async def convert_pdfs_to_markdown(pdf_files: List[UploadFile] = File(...)): +async def convert_documents_to_markdown( + files: List[UploadFile] = File(...), + extract_images: bool = Form(False) +): """ - Endpoint to convert multiple PDFs to markdown. + Endpoint to convert multiple documents to markdown. """ - logger.debug(f"Received {len(pdf_files)} files for batch conversion") - - async def process_files(files): + global model_dict + + if model_dict is None: + raise HTTPException(status_code=503, detail="Models not loaded yet") + + logger.debug(f"Received {len(files)} files for batch conversion") + + async def process_files(file_list): loop = asyncio.get_event_loop() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool: coroutines = [ loop.run_in_executor( - pool, process_pdf_file, await file.read(), file.filename, model_list + pool, + process_document_file, + await file.read(), + file.filename, + model_dict, + extract_images ) - for file in files + for file in file_list ] return await asyncio.gather(*coroutines) - responses = await process_files(pdf_files) + responses = await process_files(files) return BatchConversionResponse(results=responses)
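End-to-end, the updated API accepts any supported document plus two form flags (`extract_images`, `paginate`) on both the synchronous and queued paths. A usage sketch (assumes a server reachable on localhost:8080 and a local sample.docx; adjust for your deployment):

```python
# Sketch: exercising the updated endpoints. Assumes a Marker API server on
# localhost:8080 and a local sample.docx; adjust to your deployment.
import time
import requests

BASE = "http://localhost:8080"
data = {"extract_images": "false", "paginate": "true"}

# Synchronous conversion
with open("sample.docx", "rb") as f:
    resp = requests.post(f"{BASE}/convert",
                         files={"file": ("sample.docx", f)},
                         data=data, timeout=600)
resp.raise_for_status()
result = resp.json()
print(result["status"], len(result["markdown"]))

# Distributed deployments can queue the task instead and poll for the result
with open("sample.docx", "rb") as f:
    task = requests.post(f"{BASE}/celery/convert",
                         files={"file": ("sample.docx", f)},
                         data=data).json()

while True:
    out = requests.get(f"{BASE}/celery/result/{task['task_id']}").json()
    if out.get("status") != "Processing":
        break
    time.sleep(2)
print(out.get("status"))
```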