Skip to content
102 changes: 101 additions & 1 deletion docling/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,65 @@ def print_external_plugins(factory: BaseFactory, factory_name: str):
raise typer.Exit()


# Temporary marker (wrapped in NUL characters) that is handed to
# save_as_markdown as the page break placeholder whenever the user-supplied
# placeholder contains {prev_page}/{next_page}; _apply_dynamic_page_breaks
# later splits the written file on it and substitutes the formatted text.
_PAGE_BREAK_SENTINEL = "\x00_DOCLING_PAGE_BREAK_\x00"


def _has_dynamic_page_vars(placeholder: str) -> bool:
    """Tell whether *placeholder* uses a dynamic page-number variable.

    Returns:
        True when the literal text ``{prev_page}`` or ``{next_page}`` occurs
        anywhere in the string, False otherwise.
    """
    dynamic_vars = ("{prev_page}", "{next_page}")
    return any(var in placeholder for var in dynamic_vars)


def _get_content_page_numbers(doc) -> list[int]:
    """Return the sorted, de-duplicated page numbers that carry content.

    Every item yielded by ``doc.iterate_items()`` that has a non-empty ``prov``
    attribute contributes the ``page_no`` of its first provenance entry. Items
    without provenance are ignored, so pages that hold no items never appear
    in the result.
    """
    page_numbers = {
        provenance[0].page_no
        for element, _ in doc.iterate_items()
        # Only the first provenance entry is consulted, as in the page lookup
        # used for numbering page breaks.
        if (provenance := getattr(element, "prov", None))
    }
    return sorted(page_numbers)


def _apply_dynamic_page_breaks(
file_path: Path,
placeholder: str,
content_page_numbers: list[int] | None = None,
) -> None:
"""Post-process a file to replace page break sentinels with formatted placeholders.

When the placeholder contains {prev_page} and/or {next_page} format variables,
each page break sentinel is replaced with the placeholder formatted with
the actual page numbers from the document.

Args:
file_path: Path to the file to process.
placeholder: The placeholder string with optional {prev_page}/{next_page} vars.
content_page_numbers: Sorted list of actual page numbers that have content.
When provided, uses these instead of sequential counting to handle
documents with blank pages correctly.
"""
text = file_path.read_text(encoding="utf-8")
parts = text.split(_PAGE_BREAK_SENTINEL)
if len(parts) <= 1:
return
result = [parts[0]]
for i, part in enumerate(parts[1:]):
if content_page_numbers is not None and i + 1 < len(content_page_numbers):
prev_page = content_page_numbers[i]
next_page = content_page_numbers[i + 1]
else:
prev_page = i + 1
next_page = i + 2
formatted = placeholder.replace("{prev_page}", str(prev_page)).replace(
"{next_page}", str(next_page)
)
result.append(formatted)
result.append(part)
file_path.write_text("".join(result), encoding="utf-8")


def export_documents(
conv_results: Iterable[ConversionResult],
output_dir: Path,
Expand All @@ -216,6 +275,7 @@ def export_documents(
print_timings: bool,
export_timings: bool,
image_export_mode: ImageRefMode,
page_break_placeholder: str | None = None,
):
success_count = 0
failure_count = 0
Expand All @@ -225,6 +285,13 @@ def export_documents(
success_count += 1
doc_filename = conv_res.input.file.stem

# Pre-compute content page numbers for dynamic page breaks
content_page_numbers = None
if page_break_placeholder is not None and _has_dynamic_page_vars(
page_break_placeholder
):
content_page_numbers = _get_content_page_numbers(conv_res.document)

# Export JSON format:
if export_json:
fname = output_dir / f"{doc_filename}.json"
Expand Down Expand Up @@ -279,19 +346,44 @@ def export_documents(
if export_txt:
fname = output_dir / f"{doc_filename}.txt"
_log.info(f"writing TXT output to {fname}")
use_dynamic = (
page_break_placeholder is not None
and _has_dynamic_page_vars(page_break_placeholder)
)
conv_res.document.save_as_markdown(
filename=fname,
strict_text=True,
image_mode=ImageRefMode.PLACEHOLDER,
page_break_placeholder=(
_PAGE_BREAK_SENTINEL if use_dynamic else page_break_placeholder
),
)
if use_dynamic:
assert page_break_placeholder is not None
_apply_dynamic_page_breaks(
fname, page_break_placeholder, content_page_numbers
)

# Export Markdown format:
if export_md:
fname = output_dir / f"{doc_filename}.md"
_log.info(f"writing Markdown output to {fname}")
use_dynamic = (
page_break_placeholder is not None
and _has_dynamic_page_vars(page_break_placeholder)
)
conv_res.document.save_as_markdown(
filename=fname, image_mode=image_export_mode
filename=fname,
image_mode=image_export_mode,
page_break_placeholder=(
_PAGE_BREAK_SENTINEL if use_dynamic else page_break_placeholder
),
)
if use_dynamic:
assert page_break_placeholder is not None
_apply_dynamic_page_breaks(
fname, page_break_placeholder, content_page_numbers
)

# Export Document Tags format:
if export_doctags:
Expand Down Expand Up @@ -426,6 +518,13 @@ def convert( # noqa: C901
help="Image export mode for image-capable document outputs (JSON, YAML, HTML, HTML split-page, and Markdown). Text, DocTags, and WebVTT outputs do not export images. With `placeholder`, only the position of the image is marked in the output. In `embedded` mode, the image is embedded as base64 encoded string. In `referenced` mode, the image is exported in PNG format and referenced from the main exported document.",
),
] = ImageRefMode.EMBEDDED,
page_break_placeholder: Annotated[
str | None,
typer.Option(
...,
help="Specify a custom page break placeholder string for Markdown and Text exports. When set, this string is inserted between pages in the output. Supports {prev_page} and {next_page} format variables for dynamic page numbers. Examples: '---', '<!-- page-break -->', '---\\n*[Page {next_page}]*\\n---'.",
),
] = None,
pipeline: Annotated[
ProcessingPipeline,
typer.Option(..., help="Choose the pipeline to process PDF or image files."),
Expand Down Expand Up @@ -968,6 +1067,7 @@ def convert( # noqa: C901
print_timings=profiling,
export_timings=save_profiling,
image_export_mode=image_export_mode,
page_break_placeholder=page_break_placeholder,
)

end_time = time.time() - start_time
Expand Down
171 changes: 170 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
from docling_core.types.doc import ImageRefMode
from typer.testing import CliRunner

from docling.cli.main import _should_generate_export_images, app
from docling.cli.main import (
_PAGE_BREAK_SENTINEL,
_apply_dynamic_page_breaks,
_get_content_page_numbers,
_has_dynamic_page_vars,
_should_generate_export_images,
app,
)
from docling.datamodel.base_models import OutputFormat

runner = CliRunner()
Expand All @@ -30,6 +37,26 @@ def test_cli_convert(tmp_path):
assert converted.exists()


def test_cli_page_break_placeholder(tmp_path):
    """A static page break placeholder is accepted and Markdown is written."""
    source = "./tests/data/pdf/2305.03393v1-pg9.pdf"
    placeholder = "<!-- page-break -->"
    output = tmp_path / "out"
    output.mkdir()

    cli_args = [
        source,
        "--output",
        str(output),
        "--page-break-placeholder",
        placeholder,
    ]
    result = runner.invoke(app, cli_args)

    assert result.exit_code == 0
    markdown_file = output / f"{Path(source).stem}.md"
    assert markdown_file.exists()


@pytest.mark.parametrize(
("image_export_mode", "to_formats", "expected"),
[
Expand Down Expand Up @@ -122,3 +149,145 @@ def test_cli_audio_extensions_coverage():
assert ext in audio_extensions, (
f"Audio extension {ext} not found in FormatToExtensions[InputFormat.AUDIO]"
)


@pytest.mark.parametrize(
    "placeholder, expected",
    [
        ("---", False),
        ("<!-- page-break -->", False),
        ("Page {next_page}", True),
        ("End of page {prev_page}", True),
        ("---\n*[Page {next_page}]*\n---", True),
        ("{prev_page} -> {next_page}", True),
    ],
)
def test_has_dynamic_page_vars(placeholder, expected):
    """Static placeholders are rejected; any page variable is detected."""
    detected = _has_dynamic_page_vars(placeholder)
    assert detected is expected


def test_apply_dynamic_page_breaks(tmp_path):
    """{next_page} is filled with sequential numbers when no page list is given."""
    separator = f"{_PAGE_BREAK_SENTINEL}\n\n"
    pages = [
        "Content from page 1\n\n",
        "Content from page 2\n\n",
        "Content from page 3",
    ]
    file_path = tmp_path / "test.md"
    file_path.write_text(separator.join(pages), encoding="utf-8")

    _apply_dynamic_page_breaks(file_path, "---\n*[Page {next_page}]*\n---")

    result = file_path.read_text(encoding="utf-8")
    assert _PAGE_BREAK_SENTINEL not in result
    for page_number in (2, 3):
        assert f"---\n*[Page {page_number}]*\n---" in result


def test_apply_dynamic_page_breaks_prev_page(tmp_path):
    """{prev_page} resolves to the page that ends at the break."""
    file_path = tmp_path / "test.md"
    file_path.write_text(
        "Content from page 1\n\n" + _PAGE_BREAK_SENTINEL + "\n\nContent from page 2",
        encoding="utf-8",
    )

    _apply_dynamic_page_breaks(file_path, "[End of page {prev_page}]")

    rewritten = file_path.read_text(encoding="utf-8")
    assert _PAGE_BREAK_SENTINEL not in rewritten
    assert "[End of page 1]" in rewritten


def test_apply_dynamic_page_breaks_both_vars(tmp_path):
    """Both variables can be combined in a single placeholder."""
    file_path = tmp_path / "test.md"
    file_path.write_text(
        "\n".join(["Page 1", _PAGE_BREAK_SENTINEL, "Page 2"]),
        encoding="utf-8",
    )

    _apply_dynamic_page_breaks(file_path, "({prev_page} -> {next_page})")

    rewritten = file_path.read_text(encoding="utf-8")
    assert "(1 -> 2)" in rewritten


def test_apply_dynamic_page_breaks_no_sentinel(tmp_path):
    """A file without any sentinel is left exactly as it was."""
    original_text = "Content with no page breaks"
    file_path = tmp_path / "test.md"
    file_path.write_text(original_text, encoding="utf-8")

    _apply_dynamic_page_breaks(file_path, "---\n*[Page {next_page}]*\n---")

    assert file_path.read_text(encoding="utf-8") == original_text


def test_cli_dynamic_page_break_placeholder(tmp_path):
    """End-to-end: a dynamic placeholder yields real page numbers in Markdown."""
    source = "./tests/data/pdf/normal_4pages.pdf"
    placeholder = "--- Page {next_page} ---"
    output = tmp_path / "out"
    output.mkdir()

    cli_args = [
        source,
        "--output",
        str(output),
        "--page-break-placeholder",
        placeholder,
    ]
    result = runner.invoke(app, cli_args)
    assert result.exit_code == 0

    converted = output / f"{Path(source).stem}.md"
    assert converted.exists()
    content = converted.read_text(encoding="utf-8")

    # Neither the raw page break marker nor the sentinel may leak into the output.
    for leftover in ("#_#_DOCLING_DOC_PAGE_BREAK", _PAGE_BREAK_SENTINEL):
        assert leftover not in content
    # A multi-page PDF must show the placeholder with an actual page number.
    assert "--- Page 2 ---" in content


def test_apply_dynamic_page_breaks_with_blank_pages(tmp_path):
    """Page numbering follows the supplied page list when a page is missing."""
    # Page 4 is blank, so content pages are [1, 2, 3, 5].
    content_pages = [1, 2, 3, 5]
    separator = f"\n\n{_PAGE_BREAK_SENTINEL}\n\n"
    file_path = tmp_path / "test.md"
    file_path.write_text(
        separator.join(f"Content from page {page}" for page in content_pages),
        encoding="utf-8",
    )

    _apply_dynamic_page_breaks(
        file_path,
        "---\n*[Page {next_page}]*\n---",
        content_page_numbers=content_pages,
    )

    result = file_path.read_text(encoding="utf-8")
    assert _PAGE_BREAK_SENTINEL not in result
    for page in (2, 3, 5):
        assert f"---\n*[Page {page}]*\n---" in result
    assert "---\n*[Page 4]*\n---" not in result


def test_apply_dynamic_page_breaks_prev_page_with_blank_pages(tmp_path):
    """Both variables follow the supplied page list when pages are skipped."""
    separator = f"\n{_PAGE_BREAK_SENTINEL}\n"
    file_path = tmp_path / "test.md"
    file_path.write_text(
        separator.join(["Page 1", "Page 3", "Page 5"]),
        encoding="utf-8",
    )

    _apply_dynamic_page_breaks(
        file_path,
        "({prev_page} -> {next_page})",
        content_page_numbers=[1, 3, 5],
    )

    result = file_path.read_text(encoding="utf-8")
    for expected in ("(1 -> 3)", "(3 -> 5)"):
        assert expected in result
    for unexpected in ("(1 -> 2)", "(2 -> 3)"):
        assert unexpected not in result
Loading