Skip to content

Commit f66658f

Browse files
committed
fix: code review fixes, cloud streaming, naming alignment
- Critical: preserve text in markdown structure for fallback retrieval
- Cloud: SSE response close, folder cache dict, truncate error body
- Cloud: filter internal tools, async-safe streaming via to_thread
- SQLite: multi-thread connection tracking, context manager
- Security: collection name validation, parse_pages range cap
- Polish: use count_tokens wrapper, _EXAMPLES_DIR naming, QueryStream public
- Backend protocol: add @runtime_checkable
1 parent 92974d3 commit f66658f

File tree

13 files changed

+178
-100
lines changed

13 files changed

+178
-100
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,5 +5,6 @@ __pycache__
55
.venv/
66
logs/
77
pageindex.egg-info/
8+
*.db
89
venv/
910
uv.lock

examples/cloud_demo.py

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,20 +1,26 @@
11
"""
2-
PageIndex Cloud Demo
2+
Agentic Vectorless RAG with PageIndex SDK - Cloud Demo
33
4-
Usage:
4+
Uses CloudClient for fully-managed document indexing and QA.
5+
No LLM API key needed — the cloud service handles everything.
6+
7+
Steps:
8+
1 — Upload and index a PDF via PageIndex cloud
9+
2 — Stream a question with tool call visibility
10+
11+
Requirements:
512
pip install pageindex
613
export PAGEINDEX_API_KEY=your-api-key
7-
python examples/cloud_demo.py
814
"""
915
import asyncio
1016
import os
1117
from pathlib import Path
1218
import requests
1319
from pageindex import CloudClient
1420

15-
_DIR = Path(__file__).parent
21+
_EXAMPLES_DIR = Path(__file__).parent
1622
PDF_URL = "https://arxiv.org/pdf/1706.03762.pdf"
17-
PDF_PATH = _DIR / "documents" / "attention.pdf"
23+
PDF_PATH = _EXAMPLES_DIR / "documents" / "attention.pdf"
1824

1925
# Download PDF if needed
2026
if not PDF_PATH.exists():

examples/local_demo.py

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,25 @@
11
"""
2-
PageIndex Local Demo
2+
Agentic Vectorless RAG with PageIndex SDK - Local Demo
33
4-
Usage:
5-
pip install pageindex
6-
python examples/local_demo.py
4+
A simple example of using LocalClient for self-hosted document indexing
5+
and agent-based QA. The agent uses OpenAI Agents SDK to reason over
6+
the document's tree structure index.
7+
8+
Steps:
9+
1 — Download and index a PDF
10+
2 — Stream a question with tool call visibility
11+
12+
Requirements: pip install pageindex openai-agents
713
"""
814
import asyncio
915
from pathlib import Path
1016
import requests
1117
from pageindex import LocalClient
1218

13-
_DIR = Path(__file__).parent
19+
_EXAMPLES_DIR = Path(__file__).parent
1420
PDF_URL = "https://arxiv.org/pdf/1706.03762.pdf"
15-
PDF_PATH = _DIR / "documents" / "attention.pdf"
16-
WORKSPACE = _DIR / "workspace"
21+
PDF_PATH = _EXAMPLES_DIR / "documents" / "attention.pdf"
22+
WORKSPACE = _EXAMPLES_DIR / "workspace"
1723

1824
# Download PDF if needed
1925
if not PDF_PATH.exists():

pageindex/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
# SDK exports
88
from .client import PageIndexClient, LocalClient, CloudClient
9+
from .config import IndexConfig
910
from .collection import Collection
1011
from .parser.protocol import ContentNode, ParsedDocument, DocumentParser
1112
from .storage.protocol import StorageEngine
@@ -23,6 +24,7 @@
2324
"PageIndexClient",
2425
"LocalClient",
2526
"CloudClient",
27+
"IndexConfig",
2628
"Collection",
2729
"ContentNode",
2830
"ParsedDocument",

pageindex/backend/cloud.py

Lines changed: 95 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -20,11 +20,14 @@
2020

2121
API_BASE = "https://api.pageindex.ai"
2222

23+
_INTERNAL_TOOLS = frozenset({"ToolSearch", "Read", "Grep", "Glob", "Bash", "Edit", "Write"})
24+
2325

2426
class CloudBackend:
2527
def __init__(self, api_key: str):
2628
self._api_key = api_key
2729
self._headers = {"api_key": api_key}
30+
self._folder_id_cache: dict[str, str | None] = {}
2831

2932
# ── HTTP helpers ──────────────────────────────────────────────────────
3033

@@ -38,7 +41,8 @@ def _request(self, method: str, path: str, **kwargs) -> dict:
3841
time.sleep(2 ** attempt)
3942
continue
4043
if resp.status_code != 200:
41-
raise CloudAPIError(f"Cloud API error {resp.status_code}: {resp.text}")
44+
body = resp.text[:500] if resp.text else ""
45+
raise CloudAPIError(f"Cloud API error {resp.status_code}: {body}")
4246
return resp.json() if resp.content else {}
4347
except requests.RequestException as e:
4448
if attempt == 2:
@@ -63,12 +67,14 @@ def _enc(value: str) -> str:
6367
def create_collection(self, name: str) -> None:
6468
self._validate_collection_name(name)
6569
try:
66-
self._request("POST", "/folder/", json={"name": name})
70+
resp = self._request("POST", "/folder/", json={"name": name})
71+
self._folder_id_cache[name] = resp.get("folder", {}).get("id")
6772
except CloudAPIError as e:
6873
if "403" in str(e):
6974
logger.warning(
7075
"Folders require a Max plan. Upgrade at https://dash.pageindex.ai/subscription"
7176
)
77+
self._folder_id_cache[name] = None
7278
else:
7379
raise
7480

@@ -78,31 +84,33 @@ def get_or_create_collection(self, name: str) -> None:
7884
data = self._request("GET", "/folders/")
7985
for folder in data.get("folders", []):
8086
if folder.get("name") == name:
81-
self._folder_id_cache = folder["id"]
87+
self._folder_id_cache[name] = folder["id"]
8288
return
8389
resp = self._request("POST", "/folder/", json={"name": name})
84-
self._folder_id_cache = resp.get("folder", {}).get("id")
90+
self._folder_id_cache[name] = resp.get("folder", {}).get("id")
8591
except CloudAPIError as e:
8692
if "403" in str(e):
8793
logger.warning(
8894
"Folders require a Max plan. Documents will be stored without folder organization. "
8995
"Upgrade at https://dash.pageindex.ai/subscription"
9096
)
91-
self._folder_id_cache = None
97+
self._folder_id_cache[name] = None
9298
else:
9399
raise
94100

95101
def _get_folder_id(self, name: str) -> str | None:
96102
"""Resolve collection name to folder ID. Returns None if folders not available."""
97-
if hasattr(self, '_folder_id_cache'):
98-
return self._folder_id_cache
103+
if name in self._folder_id_cache:
104+
return self._folder_id_cache.get(name)
99105
try:
100106
data = self._request("GET", "/folders/")
101107
for folder in data.get("folders", []):
102108
if folder.get("name") == name:
109+
self._folder_id_cache[name] = folder["id"]
103110
return folder["id"]
104111
except CloudAPIError:
105112
pass
113+
self._folder_id_cache[name] = None
106114
return None
107115

108116
def list_collections(self) -> list[str]:
@@ -204,67 +212,90 @@ async def query_stream(self, collection: str, question: str,
204212
- mcp_tool_use_start: tool call started (has tool_name, server_name)
205213
- tool_use: tool call argument delta
206214
- tool_use_stop: tool call ended
215+
216+
Note: Uses synchronous ``requests`` under the hood. The blocking
217+
HTTP call and line iteration are offloaded to a thread via
218+
``asyncio.to_thread`` so the caller's event loop is not blocked.
219+
For full async streaming consider migrating to ``httpx.AsyncClient``.
207220
"""
221+
import asyncio
222+
208223
doc_id = doc_ids if doc_ids else self._get_all_doc_ids(collection)
209-
resp = requests.post(
210-
f"{API_BASE}/chat/completions/",
211-
headers=self._headers,
212-
json={
213-
"messages": [{"role": "user", "content": question}],
214-
"doc_id": doc_id,
215-
"stream": True,
216-
"stream_metadata": True,
217-
},
218-
stream=True,
219-
timeout=120,
220-
)
221-
if resp.status_code != 200:
222-
raise CloudAPIError(f"Cloud streaming error {resp.status_code}: {resp.text}")
223-
224-
current_tool_name = None
225-
current_tool_args = []
226-
227-
for line in resp.iter_lines(decode_unicode=True):
228-
if not line or not line.startswith("data: "):
229-
continue
230-
data_str = line[6:]
231-
if data_str.strip() == "[DONE]":
232-
break
233-
try:
234-
chunk = json.loads(data_str)
235-
except json.JSONDecodeError:
236-
continue
237-
238-
meta = chunk.get("block_metadata", {})
239-
block_type = meta.get("type", "")
240-
choices = chunk.get("choices", [])
241-
delta = choices[0].get("delta", {}) if choices else {}
242-
content = delta.get("content", "")
243-
244-
if block_type == "mcp_tool_use_start":
245-
current_tool_name = meta.get("tool_name", "")
246-
current_tool_args = []
247-
248-
elif block_type == "tool_use":
249-
if content:
250-
current_tool_args.append(content)
251-
252-
elif block_type == "tool_use_stop":
253-
# Skip internal tools (ToolSearch, Read, Grep, etc.)
254-
_INTERNAL_TOOLS = {"ToolSearch", "Read", "Grep", "Glob", "Bash", "Edit", "Write"}
255-
if current_tool_name and current_tool_name not in _INTERNAL_TOOLS:
256-
args_str = "".join(current_tool_args)
257-
yield QueryEvent(type="tool_call", data={
258-
"name": current_tool_name,
259-
"args": args_str,
260-
})
261-
current_tool_name = None
262-
current_tool_args = []
263224

264-
elif block_type == "text" and content:
265-
yield QueryEvent(type="answer_delta", data=content)
225+
headers = self._headers
226+
227+
def _stream() -> list[tuple[str, object]]:
228+
"""Execute the blocking SSE request in a worker thread."""
229+
resp = requests.post(
230+
f"{API_BASE}/chat/completions/",
231+
headers=headers,
232+
json={
233+
"messages": [{"role": "user", "content": question}],
234+
"doc_id": doc_id,
235+
"stream": True,
236+
"stream_metadata": True,
237+
},
238+
stream=True,
239+
timeout=120,
240+
)
241+
try:
242+
if resp.status_code != 200:
243+
body = resp.text[:500] if resp.text else ""
244+
raise CloudAPIError(
245+
f"Cloud streaming error {resp.status_code}: {body}"
246+
)
266247

267-
yield QueryEvent(type="answer_done", data="")
248+
events: list[tuple[str, object]] = []
249+
current_tool_name = None
250+
current_tool_args: list[str] = []
251+
252+
for line in resp.iter_lines(decode_unicode=True):
253+
if not line or not line.startswith("data: "):
254+
continue
255+
data_str = line[6:]
256+
if data_str.strip() == "[DONE]":
257+
break
258+
try:
259+
chunk = json.loads(data_str)
260+
except json.JSONDecodeError:
261+
continue
262+
263+
meta = chunk.get("block_metadata", {})
264+
block_type = meta.get("type", "")
265+
choices = chunk.get("choices", [])
266+
delta = choices[0].get("delta", {}) if choices else {}
267+
content = delta.get("content", "")
268+
269+
if block_type == "mcp_tool_use_start":
270+
current_tool_name = meta.get("tool_name", "")
271+
current_tool_args = []
272+
273+
elif block_type == "tool_use":
274+
if content:
275+
current_tool_args.append(content)
276+
277+
elif block_type == "tool_use_stop":
278+
# Skip internal tools (ToolSearch, Read, Grep, etc.)
279+
if current_tool_name and current_tool_name not in _INTERNAL_TOOLS:
280+
args_str = "".join(current_tool_args)
281+
events.append(("tool_call", {
282+
"name": current_tool_name,
283+
"args": args_str,
284+
}))
285+
current_tool_name = None
286+
current_tool_args = []
287+
288+
elif block_type == "text" and content:
289+
events.append(("answer_delta", content))
290+
291+
events.append(("answer_done", ""))
292+
return events
293+
finally:
294+
resp.close()
295+
296+
events = await asyncio.to_thread(_stream)
297+
for event_type, event_data in events:
298+
yield QueryEvent(type=event_type, data=event_data)
268299

269300
def _get_all_doc_ids(self, collection: str) -> list[str]:
270301
"""Get all document IDs in a collection."""

pageindex/backend/local.py

Lines changed: 21 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
# pageindex/backend/local.py
22
import hashlib
33
import os
4+
import re
45
import uuid
56
import shutil
67
from pathlib import Path
@@ -12,17 +13,19 @@
1213
from ..index.pipeline import build_index
1314
from ..index.utils import parse_pages, get_pdf_page_content, get_md_page_content, remove_fields
1415
from ..backend.protocol import AgentTools
15-
from ..errors import FileTypeError, DocumentNotFoundError, IndexingError
16+
from ..errors import FileTypeError, DocumentNotFoundError, IndexingError, PageIndexError
17+
18+
_COLLECTION_NAME_RE = re.compile(r'^[a-zA-Z0-9_-]{1,128}$')
1619

1720

1821
class LocalBackend:
1922
def __init__(self, storage: StorageEngine, files_dir: str, model: str = None,
20-
retrieve_model: str = None, api_key: str = None):
23+
retrieve_model: str = None, index_config=None):
2124
self._storage = storage
2225
self._files_dir = Path(files_dir)
2326
self._model = model
2427
self._retrieve_model = retrieve_model or model
25-
self._api_key = api_key
28+
self._index_config = index_config
2629
self._parsers: list[DocumentParser] = [PdfParser(), MarkdownParser()]
2730

2831
def register_parser(self, parser: DocumentParser) -> None:
@@ -39,10 +42,16 @@ def _resolve_parser(self, file_path: str) -> DocumentParser:
3942
raise FileTypeError(f"No parser for extension: {ext}")
4043

4144
# Collection management
45+
def _validate_collection_name(self, name: str) -> None:
46+
if not _COLLECTION_NAME_RE.match(name):
47+
raise PageIndexError(f"Invalid collection name: {name!r}. Must be 1-128 chars of [a-zA-Z0-9_-].")
48+
4249
def create_collection(self, name: str) -> None:
50+
self._validate_collection_name(name)
4351
self._storage.create_collection(name)
4452

4553
def get_or_create_collection(self, name: str) -> None:
54+
self._validate_collection_name(name)
4655
self._storage.get_or_create_collection(name)
4756

4857
def list_collections(self) -> list[str]:
@@ -87,21 +96,26 @@ def add_document(self, collection: str, file_path: str) -> str:
8796

8897
try:
8998
parsed = parser.parse(file_path, model=self._model)
90-
result = build_index(parsed, model=self._model)
99+
result = build_index(parsed, model=self._model, opt=self._index_config)
91100

92101
# Cache page text for fast retrieval (avoids re-reading files)
93102
pages = [{"page": n.index, "content": n.content}
94103
for n in parsed.nodes if n.content]
95104

96-
# Strip text from structure to save storage space
97-
clean_structure = remove_fields(result["structure"], fields=["text"])
105+
# Strip text from structure to save storage space (PDF only;
106+
# markdown needs text in structure for fallback retrieval)
107+
doc_type = ext.lstrip(".")
108+
if doc_type == "pdf":
109+
clean_structure = remove_fields(result["structure"], fields=["text"])
110+
else:
111+
clean_structure = result["structure"]
98112

99113
self._storage.save_document(collection, doc_id, {
100114
"doc_name": parsed.doc_name,
101115
"doc_description": result.get("doc_description", ""),
102116
"file_path": str(managed_path),
103117
"file_hash": file_hash,
104-
"doc_type": ext.lstrip("."),
118+
"doc_type": doc_type,
105119
"structure": clean_structure,
106120
"pages": pages,
107121
})

0 commit comments

Comments
 (0)