Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/marker_api_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):

def check_health(self):
logger.info("Checking server health...")
response = self.session.get(f"{self.base_url}/health")
response = self.session.get(f"{self.base_url}/healthz")
response.raise_for_status()
health_data = HealthResponse(**response.json())
self.server_type = health_data.type
Expand All @@ -74,7 +74,7 @@ def check_health(self):

async def acheck_health(self):
logger.info("Checking server health asynchronously...")
async with self.async_session.get(f"{self.base_url}/health") as response:
async with self.async_session.get(f"{self.base_url}/healthz") as response:
response.raise_for_status()
health_data = HealthResponse(**(await response.json()))
self.server_type = health_data.type
Expand Down
94 changes: 74 additions & 20 deletions distributed_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import argparse
import uvicorn
import logging
from fastapi import FastAPI, UploadFile, File
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from celery.exceptions import TimeoutError
from fastapi.middleware.cors import CORSMiddleware
from marker_api.celery_worker import celery_app
Expand All @@ -14,8 +14,8 @@
celery_batch_convert,
celery_batch_result,
)
import gradio as gr
from marker_api.demo import demo_ui
# import gradio as gr
# from marker_api.demo import demo_ui
from marker_api.model.schema import (
BatchConversionResponse,
BatchResultResponse,
Expand All @@ -24,15 +24,21 @@
ConversionResponse,
HealthResponse,
ServerType,
PDFConversionResult,
)
from typing import List
import os

# Initialize logging
configure_logging()
logger = logging.getLogger(__name__)

# Global variable to hold model list
app = FastAPI()
app = FastAPI(
title="Marker API Distributed Server",
description="Distributed API for converting documents (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to Markdown using marker",
version="1.7.5"
)

logger.info("Configuring CORS middleware")
app.add_middleware(
Expand All @@ -44,7 +50,7 @@
)


@app.get("/health", response_model=HealthResponse)
@app.get("/healthz", response_model=HealthResponse)
def server():
"""
Root endpoint to check server status.
Expand All @@ -58,19 +64,48 @@ def server():
message="Welcome to Marker-api",
type=server_type,
workers=worker_count if server_type == ServerType.distributed else None,
supported_formats=["PDF", "DOCX", "PPTX", "XLSX", "HTML", "EPUB"]
)


def is_celery_alive() -> bool:
logger.debug("Checking if Celery is alive")
try:
result = celery_app.send_task("celery.ping")
result.get(timeout=3)
logger.info("Celery is alive")
return True
except (TimeoutError, Exception) as e:
logger.warning(f"Celery is not responding: {str(e)}")
return False
max_retries = 3
timeout = 10 # seconds

for attempt in range(max_retries):
try:
result = celery_app.send_task("celery.ping")
result.get(timeout=timeout)
logger.info("Celery is alive")
return True
except (TimeoutError, Exception) as e:
logger.warning(f"Celery connection attempt {attempt + 1}/{max_retries} failed: {str(e)}")
if attempt < max_retries - 1:
import time
time.sleep(2) # Wait 2 seconds before retrying

logger.error("All Celery connection attempts failed")
return False


def validate_file(file: UploadFile) -> str:
    """Validate an uploaded file and return its lowercased extension.

    Performs two checks:
      * size  -- rejects uploads larger than 50 MB with HTTP 413;
      * type  -- rejects extensions outside the supported document set
                 with HTTP 400.

    Args:
        file: The FastAPI/Starlette ``UploadFile`` to validate.

    Returns:
        The file extension including the leading dot (e.g. ``".pdf"``),
        or ``''`` when the upload carries no filename.

    Raises:
        HTTPException: 413 when the file exceeds the size limit,
            400 when the extension is not supported.
    """
    # Check file size (limit to 50MB).
    # NOTE(review): ``file.size`` can be None for streamed uploads, in which
    # case this check is silently skipped -- confirm that is acceptable.
    if file.size and file.size > 50 * 1024 * 1024:
        raise HTTPException(status_code=413, detail="File too large. Maximum size is 50MB.")

    # Check file type against the formats the converter supports.
    allowed_extensions = {'.pdf', '.docx', '.pptx', '.xlsx', '.html', '.epub'}
    file_extension = os.path.splitext(file.filename)[1].lower() if file.filename else ''

    if file_extension not in allowed_extensions:
        # sorted() so the error message lists the extensions deterministically;
        # set iteration order varies between processes, which previously made
        # the 400 detail text unstable.
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported file type: {file_extension}. Supported types: {', '.join(sorted(allowed_extensions))}"
        )

    return file_extension


def setup_routes(app: FastAPI, celery_live: bool):
Expand All @@ -79,20 +114,39 @@ def setup_routes(app: FastAPI, celery_live: bool):
logger.info("Adding Celery routes")

@app.post("/convert", response_model=ConversionResponse)
async def convert_pdf(pdf_file: UploadFile = File(...)):
return await celery_convert_pdf_concurrent_await(pdf_file)
async def convert_document(
file: UploadFile = File(...),
extract_images: bool = Form(True),
paginate: bool = Form(True)
):
"""Convert a document (PDF, DOCX, PPTX, XLSX, HTML, EPUB) to markdown"""
validate_file(file)
return await celery_convert_pdf_concurrent_await(file, extract_images, paginate)

@app.post("/celery/convert", response_model=CeleryTaskResponse)
async def celery_convert(pdf_file: UploadFile = File(...)):
return await celery_convert_pdf(pdf_file)
async def celery_convert_document(
file: UploadFile = File(...),
extract_images: bool = Form(False),
paginate: bool = Form(True)
):
"""Submit a document conversion task to Celery queue"""
validate_file(file)
return await celery_convert_pdf(file, extract_images, paginate)

@app.get("/celery/result/{task_id}", response_model=CeleryResultResponse)
async def get_celery_result(task_id: str):
return await celery_result(task_id)

@app.post("/batch_convert", response_model=BatchConversionResponse)
async def batch_convert(pdf_files: List[UploadFile] = File(...)):
return await celery_batch_convert(pdf_files)
async def batch_convert(
files: List[UploadFile] = File(...),
extract_images: bool = Form(False),
paginate: bool = Form(True)
):
"""Convert multiple documents to markdown"""
for file in files:
validate_file(file)
return await celery_batch_convert(files, extract_images, paginate)

@app.get("/batch_convert/result/{task_id}", response_model=BatchResultResponse)
async def get_batch_result(task_id: str):
Expand All @@ -101,7 +155,7 @@ async def get_batch_result(task_id: str):
logger.info("Adding real-time conversion route")
else:
logger.warning("Celery routes not added as Celery is not alive")
app = gr.mount_gradio_app(app, demo_ui, path="")
# app = gr.mount_gradio_app(app, demo_ui, path="")


def parse_args():
Expand Down
71 changes: 71 additions & 0 deletions docker-compose.cpu.distributed.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Docker Compose stack for running the distributed Marker API on CPU-only hosts.
# Topology: FastAPI front-end (app) -> Redis broker -> Celery worker, with
# Flower for monitoring. All Python services share one built image and one
# HuggingFace model-cache volume.
version: "3.8"

services:
  # Shared base image: built once (docker/Dockerfile.cpu.base), provides the
  # Python environment and pre-downloaded models for the other images.
  base:
    image: marker-api-cpu-base:latest
    build:
      context: .
      dockerfile: docker/Dockerfile.cpu.base

  # Celery worker that performs the actual document-to-markdown conversions.
  celery_worker:
    build:
      context: . # Keep the build context as the root directory
      dockerfile: docker/Dockerfile.cpu.distributed-server
    image: marker-api-distributed-cpu
    # --pool=solo runs one task at a time per worker; -n worker@%h gives the
    # node a hostname-unique name.
    command: celery -A marker_api.celery_worker.celery_app worker --pool=solo -n worker@%h --loglevel=info
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface  # share downloaded models
    environment:
      - REDIS_HOST=${REDIS_HOST}  # broker/backend host, supplied via .env
    links:
      - redis
    depends_on:
      - redis
      - base

  # FastAPI front-end (distributed_server.py) that enqueues work onto Celery.
  # Reuses the image built by celery_worker above.
  app:
    container_name: marker-api-cpu-distributed
    image: marker-api-distributed-cpu
    command: python distributed_server.py --host 0.0.0.0 --port 8080
    environment:
      - ENV=production
    ports:
      - "8086:8080"  # host 8086 -> container 8080
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface
    depends_on:
      - redis
      - base

  # Redis acts as both the Celery broker and the result backend.
  redis:
    container_name: redis
    image: redis:7.2.4-alpine
    ports:
      - "6379:6379"


  # Flower: web UI for monitoring Celery tasks and workers.
  flower:
    container_name: flower_cpu
    image: marker-api-distributed-cpu
    command: celery -A marker_api.celery_worker.celery_app flower --port=5555
    ports:
      - 5556:5555  # host 5556 -> Flower UI on 5555
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface
    environment:
      - REDIS_HOST=${REDIS_HOST}
    depends_on:
      - app
      - redis
      - celery_worker
      - base

# Named volume so the HuggingFace model cache survives container restarts
# and is shared between the app, worker, and flower services.
volumes:
  model_cache:

networks:
  default:
    name: marker-api-network
90 changes: 90 additions & 0 deletions docker-compose.gpu.distributed.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Docker Compose stack for running the distributed Marker API with GPU support.
# Mirrors the CPU stack (app -> Redis -> Celery worker, plus Flower) but each
# compute service carries a deploy.resources GPU reservation.
version: "3.8"

services:
  # Shared base image (docker/Dockerfile.gpu.base): Python environment plus
  # pre-downloaded models, reused by the other images.
  base:
    image: marker-api-gpu-base:latest
    build:
      context: .
      dockerfile: docker/Dockerfile.gpu.base
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu]

  # Celery worker performing the document-to-markdown conversions.
  celery_worker:
    build:
      context: . # Keep the build context as the root directory
      dockerfile: docker/Dockerfile.gpu.distributed-server # Specify the new path to the GPU Dockerfile
    # --pool=solo runs one task at a time per worker; -n worker@%h gives the
    # node a hostname-unique name.
    command: celery -A marker_api.celery_worker.celery_app worker --pool=solo -n worker@%h --loglevel=info
    image: marker-api-distributed-gpu
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface  # share downloaded models
    depends_on:
      - redis
      - base
    environment:
      - REDIS_HOST=${REDIS_HOST}  # broker/backend host, supplied via .env
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu] # Request GPU support

  # FastAPI front-end (distributed_server.py) that enqueues work onto Celery.
  app:
    container_name: marker-api-gpu-distributed
    image: marker-api-distributed-gpu
    command: python distributed_server.py --host 0.0.0.0 --port 8080
    environment:
      - ENV=production
    ports:
      - "8080:8080"  # host 8080 -> container 8080
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface
    depends_on:
      - redis
      - celery_worker
      - base
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu] # Request GPU support

  # Redis acts as both the Celery broker and the result backend.
  redis:
    container_name: redis
    image: redis:7.2.4-alpine
    ports:
      - "6379:6379"

  # Flower: web UI for monitoring Celery tasks and workers.
  flower:
    container_name: flower_gpu
    image: marker-api-distributed-gpu

    command: celery -A marker_api.celery_worker.celery_app flower --port=5555
    ports:
      - 5556:5555  # host 5556 -> Flower UI on 5555
    volumes:
      - .:/app
      - model_cache:/home/marker/.cache/huggingface
    environment:
      - REDIS_HOST=${REDIS_HOST}
    depends_on:
      - app
      - redis
      - celery_worker
      - base
    deploy:
      resources:
        reservations:
          devices:
            - capabilities: [gpu] # Request GPU support

# Named volume so the HuggingFace model cache survives container restarts
# and is shared between the app, worker, and flower services.
volumes:
  model_cache:

networks:
  default:
    name: marker-api-network
51 changes: 51 additions & 0 deletions docker/Dockerfile.cpu.base
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Base CPU image for the Marker API: Python 3.11 runtime, the native
# rendering libraries the converters need, and pre-downloaded ML models.
# Other images in the distributed stack build on top of this one.

# Use a base image with Python
FROM python:3.11-slim

# Install system dependencies required for libGL and DOCX conversion.
# cairo/pango/gdk-pixbuf/harfbuzz/fontconfig are native text/graphics
# rendering libraries (presumably for the HTML/EPUB pipeline -- confirm);
# build-essential, pkg-config and gcc are needed to compile Python C
# extensions during pip install. The apt lists are removed afterwards to
# keep the layer small.
RUN apt-get update && apt-get install -y \
    libgl1-mesa-glx \
    libglib2.0-0 \
    libglib2.0-dev \
    libcairo2-dev \
    libpango1.0-dev \
    libgdk-pixbuf2.0-dev \
    libffi-dev \
    shared-mime-info \
    libpango-1.0-0 \
    libharfbuzz0b \
    libpangoft2-1.0-0 \
    libfontconfig1 \
    libcairo2 \
    libgdk-pixbuf2.0-0 \
    fonts-liberation \
    build-essential \
    pkg-config \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Create a non-root user (UID 1000) so the application does not run as root
RUN useradd -m -u 1000 marker

# Create font directory for marker user
RUN mkdir -p /home/marker/.local/share/fonts && \
    chown -R marker:marker /home/marker/.local

# Set the working directory
WORKDIR /app

# Copy the entire project
COPY . .

# Install Python dependencies with distributed extras
RUN pip install -e ".[distributed]"

# Drop privileges: everything below (model download) runs as the marker user,
# so the cache lands in /home/marker/.cache
USER marker

# Set font path for marker user
ENV FONT_PATH=/home/marker/.local/share/fonts/font.ttf

# Pre-load models and save them to a shared location
RUN python -c 'from marker.models import create_model_dict; create_model_dict()'

# The models will be cached in /home/marker/.cache/huggingface
# This directory will be used as a volume in the distributed setup
Loading