diff --git a/docling/datamodel/pipeline_options.py b/docling/datamodel/pipeline_options.py index 202754fc62..69cc49be97 100644 --- a/docling/datamodel/pipeline_options.py +++ b/docling/datamodel/pipeline_options.py @@ -943,6 +943,21 @@ class PipelineOptions(BaseOptions): ), ] = None + def _get_compatibility_payload(self) -> dict[str, Any]: + """Get payload for compatibility hashing. + + Base implementation returns full model dump. Subclasses with do_* fields + should override to exclude them. + + Returns: + Dictionary suitable for compatibility hashing + """ + return self.model_dump(serialize_as_any=True) + + def _get_runtime_toggle_payload(self) -> dict[str, bool]: + """Get payload with runtime-togglable do_* fields.""" + return {} + class ConvertPipelineOptions(PipelineOptions): """Base configuration for document conversion pipelines.""" @@ -980,6 +995,22 @@ class ConvertPipelineOptions(PipelineOptions): False # True: extract data in tabular format from bar-, pie and line-charts ) + def _get_compatibility_payload(self) -> dict[str, Any]: + """Override to exclude do_* fields from compatibility check.""" + payload = super()._get_compatibility_payload() + # Explicitly exclude do_* fields owned by this class + payload.pop("do_picture_classification", None) + payload.pop("do_picture_description", None) + payload.pop("do_chart_extraction", None) + return payload + + def _get_runtime_toggle_payload(self) -> dict[str, bool]: + return { + "do_picture_classification": self.do_picture_classification, + "do_picture_description": self.do_picture_description, + "do_chart_extraction": self.do_chart_extraction, + } + class PaginatedPipelineOptions(ConvertPipelineOptions): """Configuration for pipelines processing paginated documents.""" @@ -1333,6 +1364,28 @@ class PdfPipelineOptions(PaginatedPipelineOptions): ), ] = 100 + def _get_compatibility_payload(self) -> dict[str, Any]: + """Override to exclude do_* fields from compatibility check.""" + payload = super()._get_compatibility_payload() + # Explicitly exclude do_* fields owned by this class + payload.pop("do_table_structure", None) + payload.pop("do_ocr", None) + payload.pop("do_code_enrichment", None) + payload.pop("do_formula_enrichment", None) + return payload + + def _get_runtime_toggle_payload(self) -> dict[str, bool]: + payload = super()._get_runtime_toggle_payload() + payload.update( + { + "do_table_structure": self.do_table_structure, + "do_ocr": self.do_ocr, + "do_code_enrichment": self.do_code_enrichment, + "do_formula_enrichment": self.do_formula_enrichment, + } + ) + return payload + class ProcessingPipeline(str, Enum): """Available document processing pipeline types for different use cases. diff --git a/docling/document_converter.py b/docling/document_converter.py index 5b9a269e2d..b304807ae6 100644 --- a/docling/document_converter.py +++ b/docling/document_converter.py @@ -253,13 +253,69 @@ def _get_initialized_pipelines( ) -> dict[tuple[Type[BasePipeline], str], BasePipeline]: return self.initialized_pipelines - def _get_pipeline_options_hash(self, pipeline_options: PipelineOptions) -> str: - """Generate a hash of pipeline options to use as part of the cache key.""" - options_str = str(pipeline_options.model_dump()) + def _get_pipeline_options_hash( + self, pipeline_options: PipelineOptions, for_compatibility: bool = False + ) -> str: + """Generate a hash of pipeline options. + + Args: + pipeline_options: Options to hash + for_compatibility: If True, use compatibility payload (excludes do_* fields) + + Returns: + MD5 hash string + """ + if for_compatibility: + options_str = str(pipeline_options._get_compatibility_payload()) + else: + options_str = str(pipeline_options.model_dump(serialize_as_any=True)) + return hashlib.md5( options_str.encode("utf-8"), usedforsecurity=False ).hexdigest() + def _check_options_compatibility( + self, initialized_options: PipelineOptions, override_options: PipelineOptions + ) -> bool: + """Check if override options are compatible with initialized pipeline. + + Compatible means: + - Same options class type + - Compatibility payloads match (non-do_* fields are identical) + - Override does not enable do_* flags that were disabled at init + + Args: + initialized_options: Options used to initialize pipeline + override_options: Options to use for this execution + + Returns: + True if compatible, False otherwise + """ + # Must be same class + if type(initialized_options) is not type(override_options): + return False + + # Compatibility hashes must match (all fields except do_*) + init_compat_hash = self._get_pipeline_options_hash( + initialized_options, for_compatibility=True + ) + override_compat_hash = self._get_pipeline_options_hash( + override_options, for_compatibility=True + ) + + if init_compat_hash != override_compat_hash: + return False + + initialized_toggles = initialized_options._get_runtime_toggle_payload() + override_toggles = override_options._get_runtime_toggle_payload() + + for toggle_name, override_value in override_toggles.items(): + init_value = initialized_toggles[toggle_name] + if override_value and not init_value: + return False + + return True + def initialize_pipeline(self, format: InputFormat): """Initialize the conversion pipeline for the selected format. @@ -289,6 +345,7 @@ def convert( max_num_pages: int = sys.maxsize, max_file_size: int = sys.maxsize, page_range: PageRange = DEFAULT_PAGE_RANGE, + format_options: Optional[dict[InputFormat, PipelineOptions]] = None, ) -> ConversionResult: """Convert one document fetched from a file path, URL, or DocumentStream. @@ -306,6 +363,9 @@ def convert( Documents exceeding this number will not be converted. max_file_size: Maximum file size to convert. page_range: Range of pages to convert. + format_options: Optional mapping of formats to pipeline options to override + initialized options. Must be compatible: same options class, identical + non-do_* fields, and do_* flags may only change from True to False. Returns: The conversion result, which contains a `DoclingDocument` in the `document` @@ -321,6 +381,7 @@ def convert( max_file_size=max_file_size, headers=headers, page_range=page_range, + format_options=format_options, ) return next(all_res) @@ -333,6 +394,7 @@ def convert_all( max_num_pages: int = sys.maxsize, max_file_size: int = sys.maxsize, page_range: PageRange = DEFAULT_PAGE_RANGE, + format_options: Optional[dict[InputFormat, PipelineOptions]] = None, ) -> Iterator[ConversionResult]: """Convert multiple documents from file paths, URLs, or DocumentStreams. @@ -346,6 +408,9 @@ def convert_all( max_file_size: Maximum number of pages accepted per document. Documents exceeding this number will be skipped. page_range: Range of pages to convert in each document. + format_options: Optional mapping of formats to pipeline options to override + initialized options. Must be compatible: same options class, identical + non-do_* fields, and do_* flags may only change from True to False. Yields: The conversion results, each containing a `DoclingDocument` in the @@ -362,7 +427,11 @@ def convert_all( conv_input = _DocumentConversionInput( path_or_stream_iterator=source, limits=limits, headers=headers ) - conv_res_iter = self._convert(conv_input, raises_on_error=raises_on_error) + conv_res_iter = self._convert( + conv_input, + raises_on_error=raises_on_error, + override_format_options=format_options, + ) had_result = False for conv_res in conv_res_iter: @@ -438,7 +507,10 @@ def convert_string( raise ValueError(f"format {format} is not supported in `convert_string`") def _convert( - self, conv_input: _DocumentConversionInput, raises_on_error: bool + self, + conv_input: _DocumentConversionInput, + raises_on_error: bool, + override_format_options: Optional[dict[InputFormat, PipelineOptions]] = None, ) -> Iterator[ConversionResult]: start_time = time.monotonic() @@ -448,7 +520,9 @@ def _convert( ): _log.info("Going to convert document batch...") process_func = partial( - self._process_document, raises_on_error=raises_on_error + self._process_document, + raises_on_error=raises_on_error, + override_format_options=override_format_options, ) if ( @@ -505,13 +579,20 @@ def _get_pipeline(self, doc_format: InputFormat) -> Optional[BasePipeline]: return self.initialized_pipelines[cache_key] def _process_document( - self, in_doc: InputDocument, raises_on_error: bool + self, + in_doc: InputDocument, + raises_on_error: bool, + override_format_options: Optional[dict[InputFormat, PipelineOptions]] = None, ) -> ConversionResult: valid = ( self.allowed_formats is not None and in_doc.format in self.allowed_formats ) if valid: - conv_res = self._execute_pipeline(in_doc, raises_on_error=raises_on_error) + conv_res = self._execute_pipeline( + in_doc, + raises_on_error=raises_on_error, + override_format_options=override_format_options, + ) else: error_message = f"File format not allowed: {in_doc.file}" if raises_on_error: @@ -529,12 +610,53 @@ def _process_document( return conv_res def _execute_pipeline( - self, in_doc: InputDocument, raises_on_error: bool + self, + in_doc: InputDocument, + raises_on_error: bool, + override_format_options: Optional[dict[InputFormat, PipelineOptions]] = None, ) -> ConversionResult: if in_doc.valid: pipeline = self._get_pipeline(in_doc.format) + + # Look up override options for this document's format + override_options = None + if override_format_options is not None: + override_options = override_format_options.get(in_doc.format) + + # If override options provided, check compatibility and handle accordingly + if override_options is not None and pipeline is not None: + is_compatible = self._check_options_compatibility( + pipeline.pipeline_options, override_options + ) + + if not is_compatible: + error_message = ( + "Pipeline override options are incompatible with the " + "initialized pipeline. Overrides may only change do_* " + "flags from True to False while keeping all non-do_* " + "fields unchanged." + ) + if raises_on_error: + raise ConversionError(error_message) + + return ConversionResult( + input=in_doc, + status=ConversionStatus.FAILURE, + errors=[ + ErrorItem( + component_type=DoclingComponentType.USER_INPUT, + module_name=self.__class__.__name__, + error_message=error_message, + ) + ], + ) + if pipeline is not None: - conv_res = pipeline.execute(in_doc, raises_on_error=raises_on_error) + conv_res = pipeline.execute( + in_doc, + raises_on_error=raises_on_error, + override_options=override_options, + ) else: if raises_on_error: raise ConversionError( diff --git a/docling/pipeline/base_pipeline.py b/docling/pipeline/base_pipeline.py index 88bf7d304e..050c28e8bb 100644 --- a/docling/pipeline/base_pipeline.py +++ b/docling/pipeline/base_pipeline.py @@ -1,3 +1,4 @@ +import contextvars import functools import logging import time @@ -42,6 +43,11 @@ _log = logging.getLogger(__name__) +# Thread-local context for override options +_override_options_context: contextvars.ContextVar[Optional[PipelineOptions]] = ( + contextvars.ContextVar("override_options", default=None) +) + class BasePipeline(ABC): def __init__(self, pipeline_options: PipelineOptions): @@ -62,11 +68,27 @@ def __init__(self, pipeline_options: PipelineOptions): "When defined, it must point to a folder containing all models required by the pipeline." ) - def execute(self, in_doc: InputDocument, raises_on_error: bool) -> ConversionResult: + def get_effective_options(self) -> PipelineOptions: + """Get effective options for current execution context. + + Returns override options if set in context, else initialized options. + """ + override = _override_options_context.get() + return override if override is not None else self.pipeline_options + + def execute( + self, + in_doc: InputDocument, + raises_on_error: bool, + override_options: Optional[PipelineOptions] = None, + ) -> ConversionResult: conv_res = ConversionResult(input=in_doc) - _log.info(f"Processing document {in_doc.file.name}") + # Set override options in thread-local context + token = _override_options_context.set(override_options) + try: + _log.info(f"Processing document {in_doc.file.name}") with TimeRecorder( conv_res, "pipeline_total", scope=ProfilingScope.DOCUMENT ): @@ -89,6 +111,8 @@ def execute(self, in_doc: InputDocument, raises_on_error: bool) -> ConversionRes else: raise RuntimeError(f"Pipeline {self.__class__.__name__} failed") from e finally: + # Reset context + _override_options_context.reset(token) self._unload(conv_res) return conv_res @@ -112,7 +136,7 @@ def _prepare_elements( yield prepared_element with TimeRecorder(conv_res, "doc_enrich", scope=ProfilingScope.DOCUMENT): - for model in self.enrichment_pipe: + for model in self._get_enrichment_pipe_for_execution(): for element_batch in chunkify( _prepare_elements(conv_res, model), model.elements_batch_size, @@ -124,6 +148,11 @@ def _prepare_elements( return conv_res + def _get_enrichment_pipe_for_execution( + self, + ) -> Iterable[GenericEnrichmentModel[Any]]: + return self.enrichment_pipe + @abstractmethod def _determine_status(self, conv_res: ConversionResult) -> ConversionStatus: pass @@ -196,6 +225,32 @@ def _get_picture_description_model( accelerator_options=self.pipeline_options.accelerator_options, ) + def _get_enrichment_pipe_for_execution( + self, + ) -> Iterable[GenericEnrichmentModel[Any]]: + effective_options = self.get_effective_options() + assert isinstance(effective_options, ConvertPipelineOptions) + + do_picture_classification = ( + effective_options.do_picture_classification + or effective_options.do_chart_extraction + ) + do_picture_description = effective_options.do_picture_description + do_chart_extraction = effective_options.do_chart_extraction + + for model in self.enrichment_pipe: + if isinstance(model, DocumentPictureClassifier): + if do_picture_classification: + yield model + elif isinstance(model, PictureDescriptionBaseModel): + if do_picture_description: + yield model + elif isinstance(model, ChartExtractionModelGraniteVision): + if do_chart_extraction: + yield model + else: + yield model + @classmethod @abstractmethod def get_default_options(cls) -> ConvertPipelineOptions: diff --git a/docling/pipeline/standard_pdf_pipeline.py b/docling/pipeline/standard_pdf_pipeline.py index 58026b47b5..45aee6cb0b 100644 --- a/docling/pipeline/standard_pdf_pipeline.py +++ b/docling/pipeline/standard_pdf_pipeline.py @@ -26,7 +26,15 @@ from typing import Any, Callable, Iterable, List, Optional, Sequence, Tuple, cast import numpy as np -from docling_core.types.doc import DocItem, ImageRef, PictureItem, TableItem +from docling_core.types.doc import ( + CodeItem, + DocItem, + DocItemLabel, + ImageRef, + PictureItem, + TableItem, + TextItem, +) from docling.backend.abstract_backend import AbstractDocumentBackend from docling.backend.pdf_backend import PdfDocumentBackend @@ -38,8 +46,15 @@ Page, ) from docling.datamodel.document import ConversionResult -from docling.datamodel.pipeline_options import ThreadedPdfPipelineOptions +from docling.datamodel.pipeline_options import ( + PdfPipelineOptions, + ThreadedPdfPipelineOptions, +) from docling.datamodel.settings import settings +from docling.models.base_model import ( + GenericEnrichmentModel, + ItemAndImageEnrichmentElement, +) from docling.models.factories import ( get_layout_factory, get_ocr_factory, @@ -108,6 +123,46 @@ def is_complete_failure(self) -> bool: return self.success_count == 0 and self.failure_count > 0 +class _PassthroughPageModel: + def __call__( + self, conv_res: ConversionResult, page_batch: Iterable[Page] + ) -> Iterable[Page]: + yield from page_batch + + +class _RuntimeCodeFormulaModel(GenericEnrichmentModel[ItemAndImageEnrichmentElement]): + def __init__( + self, + model: CodeFormulaVlmModel, + *, + do_code_enrichment: bool, + do_formula_enrichment: bool, + ) -> None: + self._model = model + self._do_code_enrichment = do_code_enrichment + self._do_formula_enrichment = do_formula_enrichment + self.elements_batch_size = model.elements_batch_size + + def is_processable(self, doc: Any, element: Any) -> bool: + if isinstance(element, CodeItem): + return self._do_code_enrichment + if isinstance(element, TextItem): + return self._do_formula_enrichment and element.label == DocItemLabel.FORMULA + return False + + def prepare_element( + self, conv_res: ConversionResult, element: Any + ) -> Optional[ItemAndImageEnrichmentElement]: + if not self.is_processable(conv_res.document, element): + return None + return self._model.prepare_element(conv_res, element) + + def __call__( + self, doc: Any, element_batch: Iterable[ItemAndImageEnrichmentElement] + ) -> Iterable[Any]: + yield from self._model(doc, element_batch) + + class ThreadedQueue: """Bounded queue with blocking put/ get_batch and explicit *close()* semantics.""" @@ -428,9 +483,9 @@ class RunContext: class StandardPdfPipeline(ConvertPipeline): """High-performance PDF pipeline with multi-threaded stages.""" - def __init__(self, pipeline_options: ThreadedPdfPipelineOptions) -> None: + def __init__(self, pipeline_options: PdfPipelineOptions) -> None: super().__init__(pipeline_options) - self.pipeline_options: ThreadedPdfPipelineOptions = pipeline_options + self.pipeline_options: PdfPipelineOptions = pipeline_options self._run_seq = itertools.count(1) # deterministic, monotonic run ids # initialise heavy models once @@ -513,6 +568,26 @@ def _make_ocr_model(self, art_path: Optional[Path]) -> Any: accelerator_options=self.pipeline_options.accelerator_options, ) + def _get_enrichment_pipe_for_execution( + self, + ) -> Iterable[GenericEnrichmentModel[Any]]: + effective_options = self.get_effective_options() + assert isinstance(effective_options, PdfPipelineOptions) + + for model in super()._get_enrichment_pipe_for_execution(): + if isinstance(model, CodeFormulaVlmModel): + if ( + effective_options.do_code_enrichment + or effective_options.do_formula_enrichment + ): + yield _RuntimeCodeFormulaModel( + model, + do_code_enrichment=effective_options.do_code_enrichment, + do_formula_enrichment=effective_options.do_formula_enrichment, + ) + else: + yield model + def _release_page_resources(self, item: ThreadedItem) -> None: page = item.payload if page is None: @@ -531,6 +606,18 @@ def _release_page_resources(self, item: ThreadedItem) -> None: def _create_run_ctx(self) -> RunContext: opts = self.pipeline_options + effective_options = self.get_effective_options() + assert isinstance(effective_options, PdfPipelineOptions) + + ocr_model: Any = ( + self.ocr_model if effective_options.do_ocr else _PassthroughPageModel() + ) + table_model: Any = ( + self.table_model + if effective_options.do_table_structure + else _PassthroughPageModel() + ) + timed_out_run_ids: set[int] = set() preprocess = PreprocessThreadedStage( batch_timeout=opts.batch_polling_interval_seconds, @@ -540,7 +627,7 @@ def _create_run_ctx(self) -> RunContext: ) ocr = ThreadedPipelineStage( name="ocr", - model=self.ocr_model, + model=ocr_model, batch_size=opts.ocr_batch_size, batch_timeout=opts.batch_polling_interval_seconds, queue_max_size=opts.queue_max_size, @@ -556,7 +643,7 @@ def _create_run_ctx(self) -> RunContext: ) table = ThreadedPipelineStage( name="table", - model=self.table_model, + model=table_model, batch_size=opts.table_batch_size, batch_timeout=opts.batch_polling_interval_seconds, queue_max_size=opts.queue_max_size, diff --git a/tests/test_options.py b/tests/test_options.py index 2286a5c493..0604216f3c 100644 --- a/tests/test_options.py +++ b/tests/test_options.py @@ -11,6 +11,7 @@ from docling.datamodel.base_models import ConversionStatus, InputFormat, QualityGrade from docling.datamodel.document import ConversionResult from docling.datamodel.pipeline_options import ( + ConvertPipelineOptions, PdfPipelineOptions, TableFormerMode, ) @@ -201,3 +202,56 @@ def test_confidence(test_doc_path): assert doc_result.confidence.mean_grade == QualityGrade.EXCELLENT assert doc_result.confidence.low_grade == QualityGrade.EXCELLENT + + +def test_override_compatibility_allows_disabling_do_flags(): + converter = DocumentConverter() + initialized = PdfPipelineOptions( + do_ocr=True, + do_table_structure=True, + do_code_enrichment=True, + do_formula_enrichment=True, + ) + override = initialized.model_copy(deep=True) + override.do_ocr = False + override.do_table_structure = False + override.do_code_enrichment = False + override.do_formula_enrichment = False + + assert converter._check_options_compatibility(initialized, override) + + +def test_override_compatibility_rejects_enabling_do_flags(): + converter = DocumentConverter() + initialized = PdfPipelineOptions( + do_ocr=False, + do_table_structure=False, + do_code_enrichment=False, + do_formula_enrichment=False, + ) + override = initialized.model_copy(deep=True) + override.do_ocr = True + + assert not converter._check_options_compatibility(initialized, override) + + +def test_override_compatibility_rejects_non_do_changes(): + converter = DocumentConverter() + initialized = PdfPipelineOptions() + override = initialized.model_copy(deep=True) + override.ocr_options.bitmap_area_threshold = 0.12 + + assert not converter._check_options_compatibility(initialized, override) + + +def test_override_compatibility_rejects_enabling_chart_extraction(): + converter = DocumentConverter() + initialized = ConvertPipelineOptions( + do_picture_classification=False, + do_picture_description=False, + do_chart_extraction=False, + ) + override = initialized.model_copy(deep=True) + override.do_chart_extraction = True + + assert not converter._check_options_compatibility(initialized, override)