Skip to content

Commit e6da408

Browse files
authored
chore: improve pdf rendering (#1102)
1 parent 6bc5b62 commit e6da408

8 files changed

Lines changed: 476 additions & 150 deletions

File tree

aperag/docparser/chunking.py

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,67 @@ def __init__(self, chunk_size: int, chunk_overlap: int, tokenizer: Callable[[str
4141

4242
def __call__(self, parts: list[Part]) -> list[Part]:
    """Run the chunking pipeline over ``parts`` and return the resulting parts.

    The parts are first grouped by ``_to_groups``; title-only groups are then
    folded into their neighbours by ``_merge_consecutive_title_groups``; the
    resulting groups are finally passed to ``_rechunk``.
    """
    groups = self._to_groups(parts)
    # Fold runs of title-only groups together before re-chunking.
    groups = self._merge_consecutive_title_groups(groups)
    return self._rechunk(groups)
4546

47+
def _is_pure_title_group(self, group: Group) -> bool:
    """Return True if ``group`` is a pure title group.

    A group counts as a pure title when its ``title_level`` is greater than 0
    and it holds exactly one item.
    """
    return group.title_level > 0 and len(group.items) == 1
50+
51+
def _merge_consecutive_title_groups(self, groups: list[Group]) -> list[Group]:
    """Merge runs of pure title groups, plus one following content group.

    Walks ``groups`` in order. A group that is not a pure title (see
    ``_is_pure_title_group``) is passed through unchanged. A pure title group
    starts a merge:

    1. Following pure title groups are absorbed for as long as their
       ``title_level`` is not smaller than the starting group's level.
    2. After that, at most one following non-pure-title group is absorbed,
       provided its ``title_level`` is 0 or not smaller than the starting
       group's level.

    The merged group keeps the ``title`` and ``title_level`` of the group that
    started the merge.

    Args:
        groups: Groups in document order.

    Returns:
        A new list of groups; an empty input yields an empty list.
    """
    merged_groups: list[Group] = []
    i = 0
    while i < len(groups):
        current_group = groups[i]

        if not self._is_pure_title_group(current_group):
            merged_groups.append(current_group)
            i += 1
            continue

        # The highest heading level is the smallest number; the first title
        # of the run fixes it, because anything higher ends the run below.
        base_level = current_group.title_level
        merged_items = list(current_group.items)

        # 1. Absorb consecutive pure title groups. Stop at the first group
        #    that has content or that is a higher-level title (e.g. never
        #    merge an H2 into an H3 run).
        j = i + 1
        while j < len(groups):
            next_group = groups[j]
            if not self._is_pure_title_group(next_group) or next_group.title_level < base_level:
                break
            merged_items.extend(next_group.items)
            j += 1

        # 2. Try to absorb one more group, but only if it carries content and
        #    does not belong to a higher-level heading.
        if j < len(groups):
            next_group = groups[j]
            if not self._is_pure_title_group(next_group) and (
                next_group.title_level == 0 or next_group.title_level >= base_level
            ):
                merged_items.extend(next_group.items)
                j += 1

        merged_groups.append(
            Group(
                title_level=base_level,
                title=current_group.title,
                items=merged_items,
            )
        )
        i = j  # Continue with the first group that was not consumed.

    return merged_groups
104+
46105
def _to_groups(self, parts: list[Part]) -> list[Group]:
47106
result: list[Group] = []
48107
curr_group: Group | None = None
@@ -120,27 +179,37 @@ def _rechunk(self, groups: list[Group]) -> list[Part]:
120179
# If the single part is too large, split it into smaller chunks
121180
splitter = SimpleSemanticSplitter(self.tokenizer)
122181
chunks = splitter.split(part.content, self.chunk_size, self.chunk_overlap)
123-
metadata = part.metadata
182+
metadata = part.metadata.copy()
124183
metadata.pop("tokens", None)
184+
metadata["splitted"] = True
125185
for chunk in chunks:
126186
parts.append(Part(content=chunk, metadata=metadata.copy()))
127187
else:
128188
parts.append(part)
129189

130190
# Rechunk the parts
131191
assert last_part is None
132-
highest_level_in_last_part = group.title_level # All parts are in the same group
133192
tokens_sum = 0
193+
prev_part_splitted = False
134194
for part in parts:
195+
curr_part_splitted = part.metadata.get("splitted", False)
135196
tokens = self._count_tokens(part)
136-
if tokens_sum + tokens > self.chunk_size:
197+
# Don't merge parts if too many tokens, or the previous part is splitted.
198+
if tokens_sum + tokens > self.chunk_size or (prev_part_splitted and not curr_part_splitted):
137199
if last_part is not None:
138200
result.append(last_part)
139201
last_part = None
140202
tokens_sum = 0
141203

142204
last_part = self._append_part_to_part(part, last_part, titles)
143205
tokens_sum += tokens
206+
prev_part_splitted = curr_part_splitted
207+
208+
# Don't merge any group into a partial group
209+
if last_part is not None:
210+
result.append(last_part)
211+
last_part = None
212+
highest_level_in_last_part = None
144213

145214
if last_part is not None:
146215
result.append(last_part)

aperag/index/document_parser.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import io
1516
import logging
1617
from pathlib import Path
1718
from typing import Any, Dict, List, Optional
1819

20+
import pikepdf
21+
22+
from aperag.docparser.base import AssetBinPart, MarkdownPart, PdfPart
1923
from aperag.docparser.doc_parser import DocParser
2024
from aperag.objectstore.base import get_object_store
2125

@@ -61,9 +65,22 @@ def parse_document(
6165
raise ValueError(f"unsupported file type: {filepath_obj.suffix}")
6266

6367
parts = parser.parse_file(filepath_obj, file_metadata)
68+
69+
# If there are no PdfPart in parts and the doc is a pdf, then add the doc itself as a PdfPart
70+
if filepath_obj.suffix.lower() == ".pdf":
71+
if not any(isinstance(p, PdfPart) for p in parts):
72+
with open(filepath_obj, "rb") as f:
73+
parts.append(PdfPart(data=f.read()))
74+
6475
logger.info(f"Parsed document {filepath} into {len(parts)} parts")
6576
return parts
6677

78+
def linearize_pdf(self, data: bytes) -> bytes:
    """Return a linearized copy of the PDF contained in ``data``.

    The bytes are opened with pikepdf and re-saved with ``linearize=True``
    into an in-memory buffer, whose contents are returned.

    Args:
        data: Raw bytes of the source PDF.

    Returns:
        The bytes of the re-saved, linearized PDF.
    """
    with pikepdf.open(io.BytesIO(data)) as pdf, io.BytesIO() as buffer:
        pdf.save(buffer, linearize=True)
        # Read the buffer before the ``with`` block closes it.
        return buffer.getvalue()
83+
6784
def save_processed_content_and_assets(self, doc_parts: List[Any], object_store_base_path: Optional[str]) -> str:
6885
"""
6986
Save processed content and assets to object storage.
@@ -78,7 +95,6 @@ def save_processed_content_and_assets(self, doc_parts: List[Any], object_store_b
7895
Raises:
7996
Exception: If object storage operations fail
8097
"""
81-
from aperag.docparser.base import AssetBinPart, MarkdownPart, PdfPart
8298

8399
content = ""
84100

@@ -104,8 +120,9 @@ def save_processed_content_and_assets(self, doc_parts: List[Any], object_store_b
104120

105121
if pdf_part is not None:
106122
converted_pdf_upload_path = f"{base_path}/converted.pdf"
107-
obj_store.put(converted_pdf_upload_path, pdf_part.data)
108-
logger.info(f"uploaded converted pdf to {md_upload_path}, size: {len(pdf_part.data)}")
123+
linearized_pdf_data = self.linearize_pdf(pdf_part.data)
124+
obj_store.put(converted_pdf_upload_path, linearized_pdf_data)
125+
logger.info(f"uploaded converted pdf to {converted_pdf_upload_path}, size: {len(linearized_pdf_data)}")
109126

110127
# Save assets
111128
asset_count = 0

aperag/service/document_service.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import logging
1717
import mimetypes
1818
import os
19+
import re
1920
from typing import List
2021

2122
from fastapi import HTTPException, UploadFile
@@ -575,14 +576,14 @@ async def _get_document_preview(session):
575576

576577
converted_pdf_object_path = None
577578
index_data = json.loads(doc_index.index_data) if doc_index and doc_index.index_data else {}
578-
if index_data.get("has_pdf_source_map") and not document.name.lower().endswith(".pdf"):
579+
if index_data.get("has_pdf_source_map"):
579580
# If the parsing result contains pdf_source_map metadata,
580581
# it means it is a PDF or has been converted to a PDF.
581-
# But only converted documents have a converted.pdf file.
582-
pdf_path = f"{document.object_store_base_path()}/converted.pdf"
582+
converted_pdf_name = "converted.pdf"
583+
pdf_path = f"{document.object_store_base_path()}/{converted_pdf_name}"
583584
exists = await async_obj_store.obj_exists(pdf_path)
584585
if exists:
585-
converted_pdf_object_path = "converted.pdf"
586+
converted_pdf_object_path = converted_pdf_name
586587

587588
# 5. Construct and return response
588589
return DocumentPreview(
@@ -596,9 +597,12 @@ async def _get_document_preview(session):
596597
# Execute query with proper session management
597598
return await self.db_ops._execute_query(_get_document_preview)
598599

599-
async def get_document_object(self, user_id: str, collection_id: str, document_id: str, path: str):
600+
async def get_document_object(
601+
self, user_id: str, collection_id: str, document_id: str, path: str, range_header: str = None
602+
):
600603
"""
601604
Get a file object associated with a document from the object store.
605+
Supports HTTP Range requests.
602606
"""
603607

604608
# Use database operations with proper session management
@@ -622,19 +626,49 @@ async def _get_document_object(session):
622626
# 2. Get the object from object store
623627
try:
624628
async_obj_store = get_async_object_store()
625-
get_obj_result = await async_obj_store.get(full_path)
629+
headers = {"Accept-Ranges": "bytes"}
630+
content_type, _ = mimetypes.guess_type(full_path)
631+
if content_type is None:
632+
content_type = "application/octet-stream"
633+
headers["Content-Type"] = content_type
634+
635+
if range_header:
636+
# For range requests, we need the total size first.
637+
total_size = await async_obj_store.get_obj_size(full_path)
638+
if total_size is None:
639+
raise HTTPException(status_code=404, detail="Object not found at specified path")
640+
641+
range_match = re.match(r"bytes=(\d+)-(\d*)", range_header)
642+
if not range_match:
643+
raise HTTPException(status_code=400, detail="Invalid range header format")
644+
645+
start_byte = int(range_match.group(1))
646+
end_byte_str = range_match.group(2)
647+
end_byte = int(end_byte_str) if end_byte_str else total_size - 1
648+
649+
if start_byte >= total_size or end_byte >= total_size or start_byte > end_byte:
650+
headers["Content-Range"] = f"bytes */{total_size}"
651+
raise HTTPException(status_code=416, headers=headers, detail="Requested range not satisfiable")
626652

653+
# Use stream_range to get the partial content
654+
range_result = await async_obj_store.stream_range(full_path, start=start_byte, end=end_byte)
655+
if not range_result:
656+
raise HTTPException(status_code=404, detail="Object not found at specified path")
657+
658+
data_stream, content_length = range_result
659+
headers["Content-Range"] = f"bytes {start_byte}-{end_byte}/{total_size}"
660+
headers["Content-Length"] = str(content_length)
661+
return StreamingResponse(data_stream, status_code=206, headers=headers)
662+
663+
# Full content response - optimized to use size from get()
664+
get_obj_result = await async_obj_store.get(full_path)
627665
if not get_obj_result:
628666
raise HTTPException(status_code=404, detail="Object not found at specified path")
629667

630-
data_stream, _ = get_obj_result
631-
632-
# 3. Stream the response
633-
content_type, _ = mimetypes.guess_type(full_path)
634-
if content_type is None:
635-
content_type = "application/octet-stream"
668+
data_stream, file_size = get_obj_result
669+
headers["Content-Length"] = str(file_size)
670+
return StreamingResponse(data_stream, headers=headers)
636671

637-
return StreamingResponse(data_stream, media_type=content_type)
638672
except Exception as e:
639673
logger.error(f"Failed to get object for document {document_id} at path {full_path}: {e}", exc_info=True)
640674
raise HTTPException(status_code=500, detail="Failed to get object from store")

aperag/views/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,12 +178,14 @@ async def get_document_preview(
178178
operation_id="get_document_object",
179179
)
180180
async def get_document_object(
181+
request: Request,
181182
collection_id: str,
182183
document_id: str,
183184
path: str,
184185
user: User = Depends(current_user),
185186
):
186-
return await document_service.get_document_object(user.id, collection_id, document_id, path)
187+
range_header = request.headers.get("range")
188+
return await document_service.get_document_object(user.id, collection_id, document_id, path, range_header)
187189

188190

189191
@router.post("/collections/{collection_id}/documents/{document_id}/rebuild_indexes", tags=["documents"])

0 commit comments

Comments
 (0)