Skip to content

Commit bac2792

Browse files
committed
fix: correct get_document_structure return type, real-time cloud streaming
- Fix return type annotation: dict -> list (tree structure is a list) - Fix not-found return: {} -> [] for consistency - Cloud streaming: replace batch-then-yield with asyncio.Queue for true real-time event delivery via background thread
1 parent 6d547ab commit bac2792

5 files changed

Lines changed: 41 additions & 35 deletions

File tree

pageindex/backend/cloud.py

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ def get_document(self, collection: str, doc_id: str) -> dict:
157157
"status": resp.get("status", ""),
158158
}
159159

160-
def get_document_structure(self, collection: str, doc_id: str) -> dict:
160+
def get_document_structure(self, collection: str, doc_id: str) -> list:
161161
resp = self._request("GET", f"/doc/{self._enc(doc_id)}/", params={"type": "tree", "summary": "true"})
162162
return resp.get("tree", resp.get("structure", []))
163163

@@ -205,27 +205,20 @@ async def query_stream(self, collection: str, question: str,
205205
doc_ids: list[str] | None = None) -> AsyncIterator[QueryEvent]:
206206
"""Streaming query via cloud chat/completions SSE.
207207
208-
block_metadata.type values:
209-
- text_block_start: start of text output block
210-
- text: text content delta
211-
- text_stop: end of text output block
212-
- mcp_tool_use_start: tool call started (has tool_name, server_name)
213-
- tool_use: tool call argument delta
214-
- tool_use_stop: tool call ended
215-
216-
Note: Uses synchronous ``requests`` under the hood. The blocking
217-
HTTP call and line iteration are offloaded to a thread via
218-
``asyncio.to_thread`` so the caller's event loop is not blocked.
219-
For full async streaming consider migrating to ``httpx.AsyncClient``.
208+
Events are yielded in real-time as they arrive from the server.
209+
A background thread handles the blocking HTTP stream and pushes
210+
events through an asyncio.Queue for true async streaming.
220211
"""
221212
import asyncio
213+
import threading
222214

223215
doc_id = doc_ids if doc_ids else self._get_all_doc_ids(collection)
224-
225216
headers = self._headers
217+
queue: asyncio.Queue[QueryEvent | None] = asyncio.Queue()
218+
loop = asyncio.get_event_loop()
226219

227-
def _stream() -> list[tuple[str, object]]:
228-
"""Execute the blocking SSE request in a worker thread."""
220+
def _stream():
221+
"""Background thread: read SSE and push events to queue."""
229222
resp = requests.post(
230223
f"{API_BASE}/chat/completions/",
231224
headers=headers,
@@ -241,11 +234,13 @@ def _stream() -> list[tuple[str, object]]:
241234
try:
242235
if resp.status_code != 200:
243236
body = resp.text[:500] if resp.text else ""
244-
raise CloudAPIError(
245-
f"Cloud streaming error {resp.status_code}: {body}"
237+
loop.call_soon_threadsafe(
238+
queue.put_nowait,
239+
QueryEvent(type="answer_done",
240+
data=f"Cloud streaming error {resp.status_code}: {body}"),
246241
)
242+
return
247243

248-
events: list[tuple[str, object]] = []
249244
current_tool_name = None
250245
current_tool_args: list[str] = []
251246

@@ -275,27 +270,38 @@ def _stream() -> list[tuple[str, object]]:
275270
current_tool_args.append(content)
276271

277272
elif block_type == "tool_use_stop":
278-
# Skip internal tools (ToolSearch, Read, Grep, etc.)
279273
if current_tool_name and current_tool_name not in _INTERNAL_TOOLS:
280274
args_str = "".join(current_tool_args)
281-
events.append(("tool_call", {
282-
"name": current_tool_name,
283-
"args": args_str,
284-
}))
275+
loop.call_soon_threadsafe(
276+
queue.put_nowait,
277+
QueryEvent(type="tool_call", data={
278+
"name": current_tool_name,
279+
"args": args_str,
280+
}),
281+
)
285282
current_tool_name = None
286283
current_tool_args = []
287284

288285
elif block_type == "text" and content:
289-
events.append(("answer_delta", content))
286+
loop.call_soon_threadsafe(
287+
queue.put_nowait,
288+
QueryEvent(type="answer_delta", data=content),
289+
)
290290

291-
events.append(("answer_done", ""))
292-
return events
293291
finally:
294292
resp.close()
293+
loop.call_soon_threadsafe(queue.put_nowait, None) # sentinel
294+
295+
thread = threading.Thread(target=_stream, daemon=True)
296+
thread.start()
297+
298+
while True:
299+
event = await queue.get()
300+
if event is None:
301+
break
302+
yield event
295303

296-
events = await asyncio.to_thread(_stream)
297-
for event_type, event_data in events:
298-
yield QueryEvent(type=event_type, data=event_data)
304+
thread.join(timeout=5)
299305

300306
def _get_all_doc_ids(self, collection: str) -> list[str]:
301307
"""Get all document IDs in a collection."""

pageindex/backend/local.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def add_document(self, collection: str, file_path: str) -> str:
128128
def get_document(self, collection: str, doc_id: str) -> dict:
129129
return self._storage.get_document(collection, doc_id)
130130

131-
def get_document_structure(self, collection: str, doc_id: str) -> dict:
131+
def get_document_structure(self, collection: str, doc_id: str) -> list:
132132
return self._storage.get_document_structure(collection, doc_id)
133133

134134
def get_page_content(self, collection: str, doc_id: str, pages: str) -> list:

pageindex/backend/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def delete_collection(self, name: str) -> None: ...
2323
# Document management
2424
def add_document(self, collection: str, file_path: str) -> str: ...
2525
def get_document(self, collection: str, doc_id: str) -> dict: ...
26-
def get_document_structure(self, collection: str, doc_id: str) -> dict: ...
26+
def get_document_structure(self, collection: str, doc_id: str) -> list: ...
2727
def get_page_content(self, collection: str, doc_id: str, pages: str) -> list: ...
2828
def list_documents(self, collection: str) -> list[dict]: ...
2929
def delete_document(self, collection: str, doc_id: str) -> None: ...

pageindex/storage/protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def delete_collection(self, name: str) -> None: ...
1111
def save_document(self, collection: str, doc_id: str, doc: dict) -> None: ...
1212
def find_document_by_hash(self, collection: str, file_hash: str) -> str | None: ...
1313
def get_document(self, collection: str, doc_id: str) -> dict: ...
14-
def get_document_structure(self, collection: str, doc_id: str) -> dict: ...
14+
def get_document_structure(self, collection: str, doc_id: str) -> list: ...
1515
def get_pages(self, collection: str, doc_id: str) -> list | None: ...
1616
def list_documents(self, collection: str) -> list[dict]: ...
1717
def delete_document(self, collection: str, doc_id: str) -> None: ...

pageindex/storage/sqlite.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,14 @@ def get_document(self, collection: str, doc_id: str) -> dict:
101101
return {"doc_id": row[0], "doc_name": row[1], "doc_description": row[2],
102102
"file_path": row[3], "doc_type": row[4]}
103103

104-
def get_document_structure(self, collection: str, doc_id: str) -> dict:
104+
def get_document_structure(self, collection: str, doc_id: str) -> list:
105105
conn = self._get_conn()
106106
row = conn.execute(
107107
"SELECT structure FROM documents WHERE doc_id = ? AND collection_name = ?",
108108
(doc_id, collection),
109109
).fetchone()
110110
if not row:
111-
return {}
111+
return []
112112
return json.loads(row[0])
113113

114114
def get_pages(self, collection: str, doc_id: str) -> list | None:

0 commit comments

Comments
 (0)