Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions opencontractserver/tasks/doc_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ def ingest_doc(self, user_id: int, doc_id: int) -> dict[str, Any]:
# threshold (single request) and >= 2 descriptors otherwise — there is
# no one-element case — so a truthiness check expresses the contract.
chunk_inputs = parser_instance.prepare_chunk_inputs(doc_id)
if chunk_inputs:
chunk_count = len(chunk_inputs)
max_concurrent_chunks = parser_instance.max_concurrent_chunks
if chunk_inputs and chunk_count <= max_concurrent_chunks:
from opencontractserver.tasks.chunk_tasks import (
parse_document_chunk,
reassemble_and_save_chunks,
Expand All @@ -670,12 +672,18 @@ def ingest_doc(self, user_id: int, doc_id: int) -> dict[str, Any]:
)
logger.info(
f"[ingest_doc] Document {doc_id}: dispatching "
f"{len(chunk_inputs)} chunks via chord"
f"{chunk_count} chunks via chord"
)
# Replaces this task with the chord; Celery appends the remaining
# ingest chain (remap, unlock) after the callback, and the chain's
# link_error errback still rescues failures.
return self.replace(chord(header, callback))
if chunk_inputs:
logger.info(
f"[ingest_doc] Document {doc_id}: parsing {chunk_count} "
"chunks in-process because it exceeds max_concurrent_chunks="
f"{max_concurrent_chunks}"
)

# Call the parser's process_document method (synchronous / non-chunked path)
try:
Expand Down
30 changes: 30 additions & 0 deletions opencontractserver/tests/test_chunk_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,33 @@ def test_ingest_doc_large_pdf_replaces_with_chord_when_not_eager(self):
).get()
replace_mock.assert_called_once()
inline_parse.assert_not_called()

def test_ingest_doc_large_pdf_falls_back_when_chunk_count_exceeds_limit(self):
    """Fall back to inline parsing when the chunk count is over the parser's cap.

    The fake parser is capped at 3 concurrent chunks while the document
    splits into 4, so ``ingest_doc`` must not replace itself with a chord
    and must instead call ``process_document`` exactly once.
    """
    from unittest.mock import patch

    from opencontractserver.tasks import doc_tasks

    # max_pages_per_chunk=2, min=2 → 8 pages yield 4 chunks (one over the cap).
    doc, user = self._doc(8)
    parser = _FakeChunkedParser()
    parser.max_concurrent_chunks = 3

    # Triple handed back by the patched parser resolver.
    resolved_parser = (
        "opencontractserver.tests.test_chunked_parser._FakeChunkedParser",
        parser,
        {},
    )
    task_kwargs = dict(user_id=user.id, doc_id=doc.id)

    # Eager mode is switched off for the duration of the task call.
    with override_settings(CELERY_TASK_ALWAYS_EAGER=False):
        resolver_patch = patch.object(
            doc_tasks, "_resolve_parser_for_ingest", return_value=resolved_parser
        )
        replace_patch = patch.object(doc_tasks.ingest_doc, "replace")
        inline_patch = patch.object(
            _FakeChunkedParser, "process_document", return_value=None
        )
        with resolver_patch, replace_patch as replace_mock, inline_patch as inline_parse:
            async_result = doc_tasks.ingest_doc.apply(kwargs=task_kwargs)
            result = async_result.get()

            self.assertEqual(result["status"], "success")
            # No chord hand-off: the task was never replaced ...
            replace_mock.assert_not_called()
            # ... and the synchronous parser path ran exactly once.
            inline_parse.assert_called_once()