Commit 79b8254

refactor: Enhance sitemap extraction and summarization features (#185)
- Added support for configurable sitemap parsing via the environment variable `SITEMAP_PARSER` with options: `docusaurus`, `astro`, and `generic`.
- Introduced a new ConfigMap for extractor sitemap settings.
- Updated `PageSummaryEnhancer` to group documents by URL for non-numeric pages and keep paged documents separate.
- Enhanced `LangchainSummarizer` to respect max concurrency settings during summarization.
- Improved error logging for source uploads in `DefaultSourceUploader`.
- Added comprehensive tests for the new sitemap parsing functions and summarization logic.
- Updated the README and documentation to reflect the changes and provide guidance on memory management for backend pods.
1 parent fdd8570 commit 79b8254

17 files changed

Lines changed: 770 additions & 85 deletions

infrastructure/README.md

Lines changed: 2 additions & 0 deletions
@@ -257,6 +257,8 @@ frontend:
 
 The following values should be adjusted for the deployment:
 
+> ⓘ INFO: If the backend pod gets `OOMKilled` (exit code `137`) on local k3d/Tilt setups, reduce `backend.workers` (each uvicorn worker is a separate Python process), disable reranking (`RERANKER_ENABLED: false`) or pin a smaller Flashrank model (e.g. `RERANKER_MODEL: ms-marco-TinyBERT-L-2-v2`), and/or increase the memory available to Docker/k3d.
+
 ```yaml
 backend:
   secrets:

infrastructure/rag/templates/_admin_backend_and_extractor_helpers.tpl

Lines changed: 4 additions & 0 deletions
@@ -68,6 +68,10 @@
 {{- printf "%s-source-uploader-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
 {{- end -}}
 
+{{- define "configmap.extractorSitemapName" -}}
+{{- printf "%s-extractor-sitemap-configmap" .Release.Name | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
 # image
 {{- define "adminBackend.fullImageName" -}}
 {{- $tag := default .Chart.AppVersion .Values.adminBackend.image.tag -}}
Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: {{ template "configmap.extractorSitemapName" . }}
+data:
+{{- range $key, $value := .Values.extractor.envs.sitemap }}
+  {{ $key }}: {{ $value | quote }}
+{{- end }}

infrastructure/rag/templates/extractor/deployment.yaml

Lines changed: 2 additions & 0 deletions
@@ -110,6 +110,8 @@ spec:
 envFrom:
   - configMapRef:
       name: {{ template "configmap.s3Name" . }}
+  - configMapRef:
+      name: {{ template "configmap.extractorSitemapName" . }}
   - secretRef:
       name: {{ template "secret.s3Name" . }}
 {{- $hfCacheDir := include "extractor.huggingfaceCacheDir" . }}

infrastructure/rag/values.yaml

Lines changed: 11 additions & 0 deletions
@@ -100,6 +100,8 @@ backend:
     - "--loop"
     - "asyncio"
 
+  # Note: Each uvicorn worker is a separate Python process and can significantly
+  # increase memory usage.
   workers: 3
   wsMaxQueue: 6
 
@@ -222,6 +224,7 @@ backend:
     RERANKER_K_DOCUMENTS: 5
     RERANKER_MIN_RELEVANCE_SCORE: 0.001
    RERANKER_ENABLED: true
+    RERANKER_MODEL: "ms-marco-MultiBERT-L-12"
  chatHistory:
    CHAT_HISTORY_LIMIT: 4
    CHAT_HISTORY_REVERSE: true
@@ -355,6 +358,7 @@ adminBackend:
    USECASE_KEYVALUE_PORT: 6379
    USECASE_KEYVALUE_HOST: "rag-keydb"
  sourceUploader:
+    # Large sitemap ingestions (per-page summaries) can take > 1 hour.
    SOURCE_UPLOADER_TIMEOUT: 3600
 
 extractor:
@@ -408,6 +412,13 @@ extractor:
  # Directory inside the container to use as writable cache for ModelScope / OCR models
  modelscopeCacheDir: /var/modelscope
 
+  envs:
+    sitemap:
+      # Controls how HTML pages are parsed when loading from an XML sitemap.
+      # Options: "docusaurus" (default), "astro", "generic"
+      # Note: https://docs.stackit.cloud is built with Astro/Starlight -> use "astro".
+      SITEMAP_PARSER: docusaurus
+
 adminFrontend:
  name: admin-frontend
  replicaCount: 1
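
The `SITEMAP_PARSER` value above only sets an environment variable for the extractor container; the actual parsing functions live in the extractor library and are not part of this diff. As a purely illustrative sketch of how such an env-driven switch can be resolved (the function names below are placeholders, not the project's actual API):

```python
import os
from typing import Callable

# Hypothetical parser callables -- placeholders for whatever the extractor registers.
def parse_docusaurus(html: str) -> str: ...
def parse_astro(html: str) -> str: ...
def parse_generic(html: str) -> str: ...

_PARSERS: dict[str, Callable[[str], str]] = {
    "docusaurus": parse_docusaurus,
    "astro": parse_astro,
    "generic": parse_generic,
}

def resolve_sitemap_parser() -> Callable[[str], str]:
    # Fall back to "docusaurus", the documented default, for unknown or unset values.
    name = os.environ.get("SITEMAP_PARSER", "docusaurus").strip().lower()
    return _PARSERS.get(name, _PARSERS["docusaurus"])
```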

libs/README.md

Lines changed: 1 addition & 0 deletions
@@ -331,6 +331,7 @@ For sitemap sources, additional parameters can be provided, e.g.:
 
 Technically, all parameters of the `SitemapLoader` from LangChain can be provided.
 
+The HTML parsing logic can be tuned via the `SITEMAP_PARSER` environment variable (default: `docusaurus`; options: `docusaurus`, `astro`, `generic`). For Helm deployments, set `extractor.envs.sitemap.SITEMAP_PARSER` in `infrastructure/rag/values.yaml`. You can also override the parser per upload by passing a `sitemap_parser` key/value pair (same options) in the `/upload_source` request (available as a dropdown in the admin frontend).
 
 ### 3.3 Replaceable parts
 
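
For illustration, a per-upload override might look roughly like the sketch below. The `/upload_source` endpoint and the `sitemap_parser` key come from the README text above; the base URL, the payload field names, and the key/value list shape are assumptions, so check the admin API spec before relying on them:

```python
# Hedged sketch: overriding the sitemap parser for a single upload.
# ASSUMPTIONS: the base URL and the payload structure (name/type/kwargs fields,
# key/value entries) are illustrative only; the real schema is defined by the
# admin API spec. `web_path` is a standard LangChain SitemapLoader parameter.
import requests

payload = {
    "name": "stackit-docs",
    "type": "sitemap",
    "kwargs": [
        {"key": "web_path", "value": "https://docs.stackit.cloud/sitemap.xml"},
        {"key": "sitemap_parser", "value": "astro"},  # docs.stackit.cloud is Astro/Starlight
    ],
}

response = requests.post("http://localhost:8000/upload_source", json=payload, timeout=30)
response.raise_for_status()
```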

libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py

Lines changed: 5 additions & 1 deletion
@@ -149,7 +149,11 @@ def _thread_worker(self, source_name, source_type, kwargs, timeout):
                 )
             )
         except asyncio.TimeoutError:
-            logger.error("Upload of %s timed out after %s seconds", source_name, timeout)
+            logger.error(
+                "Upload of %s timed out after %s seconds (increase SOURCE_UPLOADER_TIMEOUT to allow longer ingestions)",
+                source_name,
+                timeout,
+            )
             self._key_value_store.upsert(source_name, Status.ERROR)
         except Exception:
             logger.exception("Error while uploading %s", source_name)
Lines changed: 72 additions & 21 deletions
@@ -1,8 +1,9 @@
 """Module for enhancing the summary of pages by grouping information by page and summarizing each page."""
 
-from asyncio import gather
+import asyncio
 from hashlib import sha256
 from typing import Optional
+from typing import Any
 
 from langchain_core.documents import Document
 from langchain_core.runnables import RunnableConfig
@@ -25,8 +26,36 @@ class PageSummaryEnhancer(SummaryEnhancer):
     """
 
     BASE64_IMAGE_KEY = "base64_image"
+    DOCUMENT_URL_KEY = "document_url"
     DEFAULT_PAGE_NR = 1
 
+    @staticmethod
+    def _parse_max_concurrency(config: Optional[RunnableConfig]) -> int:
+        if not config:
+            return 1
+        raw = config.get("max_concurrency")
+        if raw is None:
+            return 1
+        try:
+            return max(1, int(raw))
+        except (TypeError, ValueError):
+            return 1
+
+    def _group_key(self, piece: Document) -> tuple[Any, ...]:
+        document_url = piece.metadata.get(self.DOCUMENT_URL_KEY)
+        page = piece.metadata.get("page", self.DEFAULT_PAGE_NR)
+
+        # For paged documents (PDF/docling/etc.) keep per-page summaries even if a shared document URL exists.
+        if isinstance(page, int) or (isinstance(page, str) and page != "Unknown Title"):
+            return ("page_number", document_url, page)
+
+        # For sources like sitemaps/confluence, `page` can be a non-unique title (or missing),
+        # so group by the page URL when available to ensure one summary per page.
+        if document_url:
+            return ("document_url", document_url)
+
+        return ("page", page)
+
     async def _asummarize_page(self, page_pieces: list[Document], config: Optional[RunnableConfig]) -> Document:
         full_page_content = " ".join([piece.page_content for piece in page_pieces])
         summary = await self._summarizer.ainvoke(full_page_content, config)
@@ -39,24 +68,46 @@ async def _asummarize_page(self, page_pieces: list[Document], config: Optional[R
         return Document(metadata=meta, page_content=summary)
 
     async def _acreate_summary(self, information: list[Document], config: Optional[RunnableConfig]) -> list[Document]:
-        distinct_pages = []
+        grouped = self._group_information(information)
+        max_concurrency = self._parse_max_concurrency(config)
+        return await self._summarize_groups(grouped, config, max_concurrency=max_concurrency)
+
+    def _group_information(self, information: list[Document]) -> list[list[Document]]:
+        ordered_keys: list[tuple[Any, ...]] = []
+        groups: dict[tuple[Any, ...], list[Document]] = {}
         for info in information:
-            if info.metadata.get("page", self.DEFAULT_PAGE_NR) not in distinct_pages:
-                distinct_pages.append(info.metadata.get("page", self.DEFAULT_PAGE_NR))
-
-        grouped = []
-        for page in distinct_pages:
-            group = []
-            for compare_info in information:
-                if compare_info.metadata.get("page", self.DEFAULT_PAGE_NR) == page:
-                    group.append(compare_info)
-            if (
-                self._chunker_settings
-                and len(" ".join([item.page_content for item in group])) < self._chunker_settings.max_size
-            ):
-                continue
-            grouped.append(group)
-
-        summary_tasks = [self._asummarize_page(info_group, config) for info_group in tqdm(grouped)]
-
-        return await gather(*summary_tasks)
+            key = self._group_key(info)
+            if key not in groups:
+                ordered_keys.append(key)
+                groups[key] = []
+            groups[key].append(info)
+        return [groups[key] for key in ordered_keys]
+
+    async def _summarize_groups(
+        self,
+        grouped: list[list[Document]],
+        config: Optional[RunnableConfig],
+        *,
+        max_concurrency: int,
+    ) -> list[Document]:
+        if max_concurrency == 1:
+            summaries: list[Document] = []
+            for info_group in tqdm(grouped):
+                summaries.append(await self._asummarize_page(info_group, config))
+            return summaries
+
+        semaphore = asyncio.Semaphore(max_concurrency)
+        results: list[Document | None] = [None] * len(grouped)
+
+        async def _run(idx: int, info_group: list[Document]) -> tuple[int, Document]:
+            async with semaphore:
+                return idx, await self._asummarize_page(info_group, config)
+
+        tasks = [asyncio.create_task(_run(idx, info_group)) for idx, info_group in enumerate(grouped)]
+        with tqdm(total=len(tasks)) as pbar:
+            for task in asyncio.as_completed(tasks):
+                idx, summary = await task
+                results[idx] = summary
+                pbar.update(1)
+
+        return [summary for summary in results if summary is not None]
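
To make the new grouping rules concrete, here is a small illustrative sketch (not part of the commit) of the metadata shapes involved. The `document_url` and `page` keys and the "Unknown Title" sentinel come from the diff above; the sample URLs and contents are made up:

```python
from langchain_core.documents import Document

# Paged source (e.g. a PDF): integer page numbers -> one group (and one summary) per page.
pdf_chunks = [
    Document(page_content="intro text", metadata={"page": 1, "document_url": "file://guide.pdf"}),
    Document(page_content="more intro", metadata={"page": 1, "document_url": "file://guide.pdf"}),
    Document(page_content="details", metadata={"page": 2, "document_url": "file://guide.pdf"}),
]
# -> group keys ("page_number", "file://guide.pdf", 1) and ("page_number", "file://guide.pdf", 2)

# Sitemap source: `page` is the non-unique "Unknown Title", so chunks collapse by URL
# -> exactly one summary per crawled page.
sitemap_chunks = [
    Document(page_content="chunk a", metadata={"page": "Unknown Title", "document_url": "https://example.com/docs/a"}),
    Document(page_content="chunk b", metadata={"page": "Unknown Title", "document_url": "https://example.com/docs/a"}),
]
# -> single group key ("document_url", "https://example.com/docs/a")
```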

libs/admin-api-lib/src/admin_api_lib/impl/summarizer/langchain_summarizer.py

Lines changed: 48 additions & 3 deletions
@@ -44,6 +44,24 @@ def __init__(
         self._semaphore = semaphore
         self._retry_decorator_settings = create_retry_decorator_settings(summarizer_settings, retry_decorator_settings)
 
+    @staticmethod
+    def _parse_max_concurrency(config: RunnableConfig) -> Optional[int]:
+        """Parse max concurrency from a RunnableConfig.
+
+        Returns
+        -------
+        Optional[int]
+            An integer >= 1 if configured and valid, otherwise None.
+        """
+        max_concurrency = config.get("max_concurrency")
+        if max_concurrency is None:
+            return None
+
+        try:
+            return max(1, int(max_concurrency))
+        except (TypeError, ValueError):
+            return None
+
     async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig] = None) -> SummarizerOutput:
         """
         Asynchronously invokes the summarization process on the given query.
@@ -77,9 +95,8 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
         langchain_documents = self._chunker.split_documents([document])
         logger.debug("Summarizing %d chunk(s)...", len(langchain_documents))
 
-        # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
-        tasks = [asyncio.create_task(self._summarize_chunk(doc.page_content, config)) for doc in langchain_documents]
-        outputs = await asyncio.gather(*tasks)
+        max_concurrency = self._parse_max_concurrency(config)
+        outputs = await self._summarize_documents(langchain_documents, config, max_concurrency=max_concurrency)
 
         if len(outputs) == 1:
             return outputs[0]
@@ -93,6 +110,34 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
         )
         return await self._summarize_chunk(merged, config)
 
+    async def _summarize_documents(
+        self,
+        documents: list[Document],
+        config: RunnableConfig,
+        *,
+        max_concurrency: Optional[int],
+    ) -> list[SummarizerOutput]:
+        """Summarize a set of already-chunked documents.
+
+        Notes
+        -----
+        This optionally limits task fan-out using a per-call semaphore (max_concurrency).
+        The actual LLM call concurrency is always bounded by the instance semaphore held
+        inside `_summarize_chunk`.
+        """
+        if max_concurrency == 1:
+            return [await self._summarize_chunk(doc.page_content, config) for doc in documents]
+
+        limiter: asyncio.Semaphore | None = asyncio.Semaphore(max_concurrency) if max_concurrency is not None else None
+
+        async def _run(doc: Document) -> SummarizerOutput:
+            if limiter is None:
+                return await self._summarize_chunk(doc.page_content, config)
+            async with limiter:
+                return await self._summarize_chunk(doc.page_content, config)
+
+        return await asyncio.gather(*(_run(doc) for doc in documents))
+
     def _create_chain(self) -> Runnable:
         return self._langfuse_manager.get_base_prompt(self.__class__.__name__) | self._langfuse_manager.get_base_llm(
             self.__class__.__name__
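
The fan-out limiting above is the standard asyncio semaphore-plus-gather pattern. As a minimal standalone sketch of that pattern (generic names, independent of the summarizer classes):

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_gather(items: Iterable[T], worker: Callable[[T], Awaitable[R]], max_concurrency: int) -> list[R]:
    # A semaphore caps how many worker coroutines run at once; gather keeps input order.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _run(item: T) -> R:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(_run(item) for item in items))


async def _demo() -> None:
    async def fake_summarize(text: str) -> str:
        await asyncio.sleep(0.01)  # stand-in for an LLM call
        return text.upper()

    print(await bounded_gather(["alpha", "beta", "gamma", "delta"], fake_summarize, max_concurrency=2))


if __name__ == "__main__":
    asyncio.run(_demo())
```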
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+import asyncio
+
+import pytest
+from langchain_core.documents import Document
+
+from admin_api_lib.impl.settings.summarizer_settings import SummarizerSettings
+from admin_api_lib.impl.summarizer.langchain_summarizer import LangchainSummarizer
+from rag_core_lib.impl.settings.retry_decorator_settings import RetryDecoratorSettings
+from rag_core_lib.impl.utils.async_threadsafe_semaphore import AsyncThreadsafeSemaphore
+
+
+class _StaticChunker:
+    def __init__(self, docs: list[Document]):
+        self._docs = docs
+
+    def split_documents(self, _docs: list[Document]) -> list[Document]:
+        return self._docs
+
+
+class _ConcurrencyTrackingSummarizer(LangchainSummarizer):
+    def __init__(self, docs: list[Document]):
+        super().__init__(
+            langfuse_manager=object(),  # type: ignore[arg-type]
+            chunker=_StaticChunker(docs),  # type: ignore[arg-type]
+            semaphore=AsyncThreadsafeSemaphore(100),
+            summarizer_settings=SummarizerSettings(),
+            retry_decorator_settings=RetryDecoratorSettings(),
+        )
+        self.in_flight = 0
+        self.max_in_flight = 0
+
+    async def _summarize_chunk(self, text: str, config):  # type: ignore[override]
+        self.in_flight += 1
+        self.max_in_flight = max(self.max_in_flight, self.in_flight)
+        await asyncio.sleep(0.01)
+        self.in_flight -= 1
+        return text
+
+
+@pytest.mark.asyncio
+async def test_langchain_summarizer_respects_max_concurrency_one():
+    docs = [Document(page_content=f"chunk-{idx}") for idx in range(5)]
+    summarizer = _ConcurrencyTrackingSummarizer(docs)
+
+    await summarizer.ainvoke("input", config={"max_concurrency": 1})
+
+    assert summarizer.max_in_flight == 1
+
+
+@pytest.mark.asyncio
+async def test_langchain_summarizer_respects_max_concurrency_limit():
+    docs = [Document(page_content=f"chunk-{idx}") for idx in range(8)]
+    summarizer = _ConcurrencyTrackingSummarizer(docs)
+
+    await summarizer.ainvoke("input", config={"max_concurrency": 2})
+
+    assert summarizer.max_in_flight <= 2
