Skip to content

Commit 86fca84

Browse files
authored
feat: show indexing progress gracefully (#71)
1 parent 6477ce4 commit 86fca84

11 files changed

Lines changed: 368 additions & 43 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ classifiers = [
2323

2424
dependencies = [
2525
"mcp>=1.0.0",
26-
"cocoindex[litellm]==1.0.0a31",
26+
"cocoindex[litellm]==1.0.0a32",
2727
"sentence-transformers>=2.2.0",
2828
"sqlite-vec>=0.1.0",
2929
"pydantic>=2.0.0",

src/cocoindex_code/cli.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
if TYPE_CHECKING:
1111
from .client import DaemonClient
1212

13-
from .protocol import ProjectStatusResponse, SearchResponse
13+
from .protocol import IndexingProgress, ProjectStatusResponse, SearchResponse
1414
from .settings import (
1515
default_project_settings,
1616
default_user_settings,
@@ -80,8 +80,21 @@ def resolve_default_path(project_root: Path) -> str | None:
8080
return f"{rel.as_posix()}/*"
8181

8282

83+
def _format_progress(progress: IndexingProgress) -> str:
    """Render an indexing progress snapshot as a one-line summary.

    The line starts with the number of files listed, followed by a
    comma-separated breakdown of the per-file outcome counters.
    """
    listed = f"{progress.num_execution_starts} files listed"
    outcomes = ", ".join(
        (
            f"{progress.num_adds} added",
            f"{progress.num_deletes} deleted",
            f"{progress.num_reprocesses} reprocessed",
            f"{progress.num_unchanged} unchanged",
            f"error: {progress.num_errors}",
        )
    )
    return f"{listed} | {outcomes}"
92+
93+
8394
def print_index_stats(status: ProjectStatusResponse) -> None:
8495
"""Print formatted index statistics."""
96+
if status.progress is not None:
97+
_typer.echo(f"Indexing in progress: {_format_progress(status.progress)}")
8598
_typer.echo("\nIndex stats:")
8699
_typer.echo(f" Chunks: {status.total_chunks}")
87100
_typer.echo(f" Files: {status.total_files}")
@@ -153,13 +166,40 @@ def init(
153166
@app.command()
154167
def index() -> None:
155168
"""Create/update index for the codebase."""
169+
from rich.console import Console as _Console
170+
from rich.live import Live as _Live
171+
from rich.spinner import Spinner as _Spinner
172+
156173
client, project_root = require_daemon_for_project()
157-
_typer.echo("Indexing...")
158-
try:
159-
resp = client.index(project_root)
160-
except RuntimeError as e:
161-
_typer.echo(f"Indexing failed: {e}", err=True)
162-
raise _typer.Exit(code=1)
174+
err_console = _Console(stderr=True)
175+
last_progress_line: str | None = None
176+
177+
with _Live(_Spinner("dots", "Indexing..."), console=err_console, transient=True) as live:
178+
179+
def _on_waiting() -> None:
180+
live.update(
181+
_Spinner(
182+
"dots",
183+
"Another indexing is ongoing, waiting for it to finish...",
184+
)
185+
)
186+
187+
def _on_progress(progress: IndexingProgress) -> None:
188+
nonlocal last_progress_line
189+
last_progress_line = f"Indexing: {_format_progress(progress)}"
190+
live.update(_Spinner("dots", last_progress_line))
191+
192+
try:
193+
resp = client.index(project_root, on_progress=_on_progress, on_waiting=_on_waiting)
194+
except RuntimeError as e:
195+
live.stop()
196+
_typer.echo(f"Indexing failed: {e}", err=True)
197+
raise _typer.Exit(code=1)
198+
199+
# Print the final progress line so it remains visible after the spinner clears
200+
if last_progress_line is not None:
201+
_typer.echo(last_progress_line, err=True)
202+
163203
if not resp.success:
164204
_typer.echo(f"Indexing failed: {resp.message}", err=True)
165205
raise _typer.Exit(code=1)

src/cocoindex_code/client.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import subprocess
99
import sys
1010
import time
11+
from collections.abc import Callable
1112
from multiprocessing.connection import Client, Connection
1213
from pathlib import Path
1314

@@ -18,8 +19,11 @@
1819
ErrorResponse,
1920
HandshakeRequest,
2021
HandshakeResponse,
22+
IndexingProgress,
23+
IndexProgressUpdate,
2124
IndexRequest,
2225
IndexResponse,
26+
IndexWaitingNotice,
2327
ProjectStatusRequest,
2428
ProjectStatusResponse,
2529
Request,
@@ -59,9 +63,30 @@ def handshake(self) -> HandshakeResponse:
5963
"""Send version handshake."""
6064
return self._send(HandshakeRequest(version=__version__)) # type: ignore[return-value]
6165

62-
def index(self, project_root: str) -> IndexResponse:
63-
"""Request indexing. Blocks until complete."""
64-
return self._send(IndexRequest(project_root=project_root)) # type: ignore[return-value]
66+
def index(
    self,
    project_root: str,
    on_progress: Callable[[IndexingProgress], None] | None = None,
    on_waiting: Callable[[], None] | None = None,
) -> IndexResponse:
    """Ask the daemon to index a project and block until it reports completion.

    The daemon answers an IndexRequest with a stream of messages; this
    method consumes them until the final IndexResponse arrives.

    Args:
        project_root: Project root sent in the IndexRequest.
        on_progress: Optional callback invoked with the progress payload of
            every IndexProgressUpdate received.
        on_waiting: Optional callback invoked for every IndexWaitingNotice
            received.

    Returns:
        The IndexResponse that terminates the stream.

    Raises:
        RuntimeError: If the daemon sends an ErrorResponse, or a message of
            any type other than the ones handled here.
    """
    request = IndexRequest(project_root=project_root)
    self._conn.send_bytes(encode_request(request))
    while True:
        message = decode_response(self._conn.recv_bytes())
        # Terminal messages first: the final result or a daemon-side error.
        if isinstance(message, IndexResponse):
            return message
        if isinstance(message, ErrorResponse):
            raise RuntimeError(f"Daemon error: {message.message}")
        # Intermediate messages: notify the caller (if interested) and keep reading.
        if isinstance(message, IndexProgressUpdate):
            if on_progress is not None:
                on_progress(message.progress)
        elif isinstance(message, IndexWaitingNotice):
            if on_waiting is not None:
                on_waiting()
        else:
            raise RuntimeError(f"Unexpected response: {type(message).__name__}")
6590

6691
def search(
6792
self,

src/cocoindex_code/daemon.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import sys
1010
import threading
1111
import time
12+
from collections.abc import AsyncIterator
1213
from multiprocessing.connection import Connection, Listener
1314
from pathlib import Path
1415
from typing import Any
@@ -22,8 +23,12 @@
2223
ErrorResponse,
2324
HandshakeRequest,
2425
HandshakeResponse,
26+
IndexingProgress,
27+
IndexProgressUpdate,
2528
IndexRequest,
2629
IndexResponse,
30+
IndexStreamResponse,
31+
IndexWaitingNotice,
2732
ProjectStatusRequest,
2833
ProjectStatusResponse,
2934
Request,
@@ -113,14 +118,43 @@ async def get_project(self, project_root: str) -> Project:
113118
self._indexing[project_root] = False
114119
return self._projects[project_root]
115120

116-
async def update_index(self, project_root: str) -> None:
117-
"""Update index for project, serialized by per-project lock."""
121+
async def update_index(self, project_root: str) -> AsyncIterator[IndexStreamResponse]:
    """Update the index for a project, streaming progress to the caller.

    Yields, in order:
      * one IndexWaitingNotice if the per-project lock is already held on entry,
      * an IndexProgressUpdate for every progress snapshot reported by the
        project's update,
      * a final IndexResponse (``success=False`` with the error text as
        ``message`` if the update raised).

    Updates for the same project are serialized by the per-project lock, and
    the project's ``_indexing`` flag is set for the duration of the update.
    """
    project = await self.get_project(project_root)
    lock = self._index_locks[project_root]

    # If lock is already held, notify the client and block until released
    if lock.locked():
        yield IndexWaitingNotice()

    async with lock:
        self._indexing[project_root] = True
        try:
            progress_queue: asyncio.Queue[IndexingProgress] = asyncio.Queue()

            def on_progress(progress: IndexingProgress) -> None:
                progress_queue.put_nowait(progress)

            update_task = asyncio.create_task(project.update_index(on_progress=on_progress))

            # Drain the queue until the update completes. The short timeout
            # lets the loop notice task completion even when no progress arrives.
            while not update_task.done():
                try:
                    progress = await asyncio.wait_for(progress_queue.get(), timeout=0.1)
                except asyncio.TimeoutError:
                    # asyncio.TimeoutError is only an alias of the builtin
                    # TimeoutError on Python 3.11+; catching the asyncio name
                    # keeps the poll timeout from being reported as an
                    # indexing failure on older interpreters.
                    continue
                yield IndexProgressUpdate(progress=progress)

            # Drain any remaining items queued just before the task finished
            while not progress_queue.empty():
                yield IndexProgressUpdate(progress=progress_queue.get_nowait())

            # Propagate any exception from the update task
            update_task.result()

            yield IndexResponse(success=True)
        except Exception as e:
            yield IndexResponse(success=False, message=str(e))
        finally:
            self._indexing[project_root] = False
126160

@@ -177,11 +211,14 @@ def get_status(self, project_root: str) -> ProjectStatusResponse:
177211
" GROUP BY language ORDER BY cnt DESC"
178212
).fetchall()
179213

214+
is_indexing = self._indexing.get(project_root, False)
215+
progress = project.indexing_stats if is_indexing else None
180216
return ProjectStatusResponse(
181-
indexing=self._indexing.get(project_root, False),
217+
indexing=is_indexing,
182218
total_chunks=total_chunks,
183219
total_files=total_files,
184220
languages={lang: cnt for lang, cnt in lang_rows},
221+
progress=progress,
185222
)
186223

187224
def list_projects(self) -> list[DaemonProjectInfo]:
@@ -246,8 +283,12 @@ def _recv() -> bytes:
246283
handshake_done = True
247284
continue
248285

249-
resp = await _dispatch(req, registry, start_time, shutdown_event)
250-
conn.send_bytes(encode_response(resp))
286+
result = await _dispatch(req, registry, start_time, shutdown_event)
287+
if isinstance(result, AsyncIterator):
288+
async for resp in result:
289+
conn.send_bytes(encode_response(resp))
290+
else:
291+
conn.send_bytes(encode_response(result))
251292

252293
if isinstance(req, StopRequest):
253294
break
@@ -265,16 +306,21 @@ async def _dispatch(
265306
registry: ProjectRegistry,
266307
start_time: float,
267308
shutdown_event: asyncio.Event,
268-
) -> Response:
269-
"""Dispatch a request to the appropriate handler."""
309+
) -> Response | AsyncIterator[IndexStreamResponse]:
310+
"""Dispatch a request to the appropriate handler.
311+
312+
Returns a single Response for most requests, or an AsyncIterator for
313+
streaming requests (IndexRequest).
314+
"""
270315
try:
271316
if isinstance(req, IndexRequest):
272-
await registry.update_index(req.project_root)
273-
return IndexResponse(success=True)
317+
return registry.update_index(req.project_root)
274318

275319
if isinstance(req, SearchRequest):
276320
if req.refresh:
277-
await registry.update_index(req.project_root)
321+
# Consume the index stream silently for refresh
322+
async for _ in registry.update_index(req.project_root):
323+
pass
278324
results = await registry.search(
279325
project_root=req.project_root,
280326
query=req.query,

src/cocoindex_code/project.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
from __future__ import annotations
44

55
import asyncio
6+
from collections.abc import Callable
67
from pathlib import Path
78

89
import cocoindex as coco
910
from cocoindex.connectors import sqlite
1011

1112
from .indexer import indexer_main
13+
from .protocol import IndexingProgress
1214
from .settings import PROJECT_SETTINGS, ProjectSettings
1315
from .shared import CODEBASE_DIR, EMBEDDER, SQLITE_DB, Embedder
1416

@@ -18,14 +20,43 @@ class Project:
1820
_app: coco.App[[], None]
1921
_index_lock: asyncio.Lock
2022
_initial_index_done: bool = False
23+
_indexing_stats: IndexingProgress | None = None
2124

22-
async def update_index(self, *, report_to_stdout: bool = False) -> None:
23-
"""Update the index, serializing concurrent calls via lock."""
24-
async with self._index_lock:
25-
try:
26-
await self._app.update(report_to_stdout=report_to_stdout)
27-
finally:
28-
self._initial_index_done = True
25+
async def update_index(
    self,
    *,
    on_progress: Callable[[IndexingProgress], None] | None = None,
) -> None:
    """Update the index, streaming progress via callback.

    The lock is NOT acquired here — callers (e.g. ProjectRegistry) are
    responsible for serialization so they can inspect lock state and
    yield one-shot snapshots before blocking.

    Args:
        on_progress: Optional callback invoked with an IndexingProgress
            built from each watched snapshot that carries "process_file"
            stats.
    """
    try:
        handle = self._app.update()
        async for snapshot in handle.watch():
            # Only the stats of the "process_file" processor are reported;
            # snapshots without them produce no progress update.
            file_stats = snapshot.stats.by_processor.get("process_file")
            if file_stats is not None:
                progress = IndexingProgress(
                    num_execution_starts=file_stats.num_execution_starts,
                    num_unchanged=file_stats.num_unchanged,
                    num_adds=file_stats.num_adds,
                    num_deletes=file_stats.num_deletes,
                    num_reprocesses=file_stats.num_reprocesses,
                    num_errors=file_stats.num_errors,
                )
                # Remember the latest snapshot so it can be read through
                # the `indexing_stats` property while the update runs.
                self._indexing_stats = progress
                if on_progress is not None:
                    on_progress(progress)
            # NOTE(review): presumably throttles how often snapshots are
            # consumed; nesting level reconstructed from a flattened diff — confirm.
            await asyncio.sleep(0.1)
    finally:
        # Runs on success and on failure: clear the in-flight stats and
        # record that an index update has been attempted.
        self._indexing_stats = None
        self._initial_index_done = True
56+
57+
@property
def indexing_stats(self) -> IndexingProgress | None:
    """Latest progress snapshot recorded by `update_index`, or None.

    The value is reset to None whenever `update_index` finishes.
    """
    return self._indexing_stats
2960

3061
@property
3162
def env(self) -> coco.Environment:

src/cocoindex_code/protocol.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,29 @@ class IndexResponse(_msgspec.Struct, tag="index"):
6363
message: str | None = None
6464

6565

66+
class IndexingProgress(_msgspec.Struct):
    """Indexing stats snapshot, shared between progress updates and status responses."""

    # Shown by the CLI as "<n> files listed".
    num_execution_starts: int
    # Shown by the CLI as "<n> unchanged".
    num_unchanged: int
    # Shown by the CLI as "<n> added".
    num_adds: int
    # Shown by the CLI as "<n> deleted".
    num_deletes: int
    # Shown by the CLI as "<n> reprocessed".
    num_reprocesses: int
    # Shown by the CLI as "error: <n>".
    num_errors: int
75+
76+
77+
class IndexProgressUpdate(_msgspec.Struct, tag="index_progress"):
    """Streamed during indexing — one per stats change, before the final IndexResponse."""

    # Counters snapshot at the time this update was emitted.
    progress: IndexingProgress
81+
82+
83+
class IndexWaitingNotice(_msgspec.Struct, tag="index_waiting"):
    """Notice that another indexing run is in progress and the client has to wait.

    The message carries no fields; its tag alone conveys the meaning.
    """
87+
88+
6689
class SearchResult(_msgspec.Struct):
6790
file_path: str
6891
language: str
@@ -85,6 +108,7 @@ class ProjectStatusResponse(_msgspec.Struct, tag="project_status"):
85108
total_chunks: int
86109
total_files: int
87110
languages: dict[str, int]
111+
progress: IndexingProgress | None = None
88112

89113

90114
class DaemonProjectInfo(_msgspec.Struct):
@@ -109,13 +133,17 @@ class ErrorResponse(_msgspec.Struct, tag="error"):
109133
# Union of all response message types. IndexProgressUpdate and
# IndexWaitingNotice are intermediate messages that precede the final
# IndexResponse of an index request.
Response = (
    HandshakeResponse
    | IndexResponse
    | IndexProgressUpdate
    | IndexWaitingNotice
    | SearchResponse
    | ProjectStatusResponse
    | DaemonStatusResponse
    | StopResponse
    | ErrorResponse
)
118144

145+
# Message types that may appear in the response stream of an IndexRequest.
IndexStreamResponse = IndexProgressUpdate | IndexWaitingNotice | IndexResponse | ErrorResponse
146+
119147
# ---------------------------------------------------------------------------
120148
# Encode / decode helpers (msgpack binary)
121149
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)