Skip to content

Commit ad61b25

Browse files
authored
fix: trigger load-time indexing correctly (#108)
1 parent 70c84f2 commit ad61b25

3 files changed

Lines changed: 75 additions & 92 deletions

File tree

src/cocoindex_code/cli.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -81,7 +81,7 @@ def print_index_stats(status: ProjectStatusResponse) -> None:
8181
if status.progress is not None:
8282
_typer.echo(f"Indexing in progress: {_format_progress(status.progress)}")
8383
if not status.index_exists:
84-
_typer.echo("\nIndex not created yet. Run `ccc index` to build the index.")
84+
_typer.echo("\nIndex not created yet.")
8585
return
8686
_typer.echo("\nIndex stats:")
8787
_typer.echo(f" Chunks: {status.total_chunks}")

src/cocoindex_code/daemon.py

Lines changed: 47 additions & 79 deletions
Original file line number | Diff line number | Diff line change
@@ -101,91 +101,64 @@ class ProjectRegistry:
101101
"""Manages loaded projects and their indexes."""
102102

103103
_projects: dict[str, Project]
104-
_index_locks: dict[str, asyncio.Lock]
105104
_embedder: Embedder
106105

107106
def __init__(self, embedder: Embedder) -> None:
108107
self._projects = {}
109-
self._index_locks = {}
110-
self._load_time_done: dict[str, asyncio.Event] = {}
111108
self._embedder = embedder
112109

113-
async def get_project(self, project_root: str, *, suppress_auto_index: bool = False) -> Project:
110+
async def get_project(self, project_root: str) -> Project:
114111
"""Get or create a Project for the given root. Lazy initialization.
115112
116-
When a project is newly loaded and *suppress_auto_index* is False,
117-
a background indexing task (load-time indexing) is fired so the project
118-
is indexed immediately. Callers that will index right away (e.g.
119-
IndexRequest, SearchRequest with refresh) should pass
120-
``suppress_auto_index=True``.
113+
Only loads the project — does **not** trigger indexing. Callers
114+
that need indexing should call ``ensure_indexing_started`` (for
115+
background auto-index) or ``update_index`` (for explicit streaming
116+
index) separately.
121117
"""
122118
if project_root not in self._projects:
123119
root = Path(project_root)
124120
project = await Project.create(root, self._embedder)
125121
self._projects[project_root] = project
126-
self._index_locks[project_root] = asyncio.Lock()
127-
self._load_time_done[project_root] = asyncio.Event()
128-
if not suppress_auto_index:
129-
asyncio.create_task(self._run_index(project_root))
130122
return self._projects[project_root]
131123

132-
def should_wait_for_indexing(self, project_root: str) -> bool:
133-
"""Check if search should wait before querying.
124+
async def ensure_indexing_started(self, project_root: str) -> None:
125+
"""Kick off background indexing and wait until it has actually started.
134126
135-
Returns True if the index lock is held (indexing actively running)
136-
or the initial indexing hasn't completed yet (covers the window
137-
between task creation and lock acquisition).
138-
"""
139-
lock = self._index_locks.get(project_root)
140-
if lock is not None and lock.locked():
141-
return True
142-
event = self._load_time_done.get(project_root)
143-
return event is not None and not event.is_set()
127+
Returns once the indexing task holds the lock. Safe to call multiple
128+
times — only the first call spawns a task; subsequent calls return
129+
immediately.
144130
145-
async def wait_for_indexing_done(self, project_root: str) -> None:
146-
"""Wait until no indexing is in progress and initial indexing is complete."""
147-
# Wait for the initial indexing to complete (if pending)
148-
event = self._load_time_done.get(project_root)
149-
if event is not None:
150-
await event.wait()
151-
# Wait for any ongoing indexing to finish (lock released)
152-
lock = self._index_locks.get(project_root)
153-
if lock is not None and lock.locked():
154-
await lock.acquire()
155-
lock.release()
156-
157-
async def _run_index(
158-
self,
159-
project_root: str,
160-
on_progress: Callable[[IndexingProgress], None] | None = None,
161-
) -> None:
162-
"""Run indexing for a project, acquiring and releasing the per-project lock.
131+
``IndexRequest`` callers should skip this and use ``update_index``
132+
instead so they can stream progress.
133+
"""
134+
project = self._projects[project_root]
135+
if project._initial_index_done.is_set() or project._index_lock.locked():
136+
return
137+
started = asyncio.Event()
138+
asyncio.create_task(project.run_index(on_started=started))
139+
await started.wait()
163140

164-
This is the single place where indexing actually happens. It is used
165-
both as a fire-and-forget background task (load-time indexing) and as a
166-
spawned task inside ``update_index`` (client-driven indexing).
141+
def should_wait_for_indexing(self, project_root: str) -> bool:
142+
"""Check if search should wait before querying.
167143
168-
On completion (success or failure) it marks load-time as done
169-
(idempotent) and releases the lock.
144+
Returns True if indexing has been started but not yet completed.
170145
"""
171-
project = self._projects[project_root]
172-
lock = self._index_locks[project_root]
146+
project = self._projects.get(project_root)
147+
return project is not None and not project._initial_index_done.is_set()
173148

174-
await lock.acquire()
175-
try:
176-
await project.update_index(
177-
on_progress=on_progress,
178-
)
179-
except Exception:
180-
logger.exception("Indexing failed for %s", project_root)
181-
finally:
182-
event = self._load_time_done.get(project_root)
183-
if event is not None:
184-
event.set()
185-
lock.release()
149+
async def wait_for_indexing_done(self, project_root: str) -> None:
150+
"""Wait until initial indexing is complete and no indexing is running."""
151+
project = self._projects.get(project_root)
152+
if project is None:
153+
return
154+
await project._initial_index_done.wait()
155+
if project._index_lock.locked():
156+
async with project._index_lock:
157+
pass
186158

187159
async def update_index(
188-
self, project_root: str, *, suppress_auto_index: bool = True
160+
self,
161+
project_root: str,
189162
) -> AsyncIterator[IndexStreamResponse]:
190163
"""Update index, yielding progress updates and a final IndexResponse.
191164
@@ -196,17 +169,15 @@ async def update_index(
196169
The actual indexing runs in a separate task (``_run_index``) so that
197170
client disconnects (``GeneratorExit``) do not abort the indexing.
198171
"""
199-
await self.get_project(project_root, suppress_auto_index=suppress_auto_index)
200-
lock = self._index_locks[project_root]
172+
project = await self.get_project(project_root)
201173

202174
# If lock is already held, notify the client before blocking
203-
if lock.locked():
175+
if project._index_lock.locked():
204176
yield IndexWaitingNotice()
205177

206178
progress_queue: asyncio.Queue[IndexingProgress] = asyncio.Queue()
207179
index_task = asyncio.create_task(
208-
self._run_index(
209-
project_root,
180+
project.run_index(
210181
on_progress=lambda p: progress_queue.put_nowait(p),
211182
)
212183
)
@@ -295,8 +266,7 @@ def get_status(self, project_root: str) -> ProjectStatusResponse:
295266
total_files = 0
296267
lang_rows = []
297268

298-
lock = self._index_locks.get(project_root)
299-
is_indexing = lock is not None and lock.locked()
269+
is_indexing = project._index_lock.locked()
300270
progress = project.indexing_stats if is_indexing else None
301271
return ProjectStatusResponse(
302272
indexing=is_indexing,
@@ -311,15 +281,13 @@ def remove_project(self, project_root: str) -> bool:
311281
"""Remove a project from the registry. Returns True if it was loaded."""
312282
import gc
313283

314-
was_loaded = project_root in self._projects
315284
project = self._projects.pop(project_root, None)
316-
self._index_locks.pop(project_root, None)
317-
self._load_time_done.pop(project_root, None)
318285
if project is not None:
319286
project.close()
320287
del project
321288
gc.collect()
322-
return was_loaded
289+
return True
290+
return False
323291

324292
def close_all(self) -> None:
325293
"""Close all loaded projects and release resources."""
@@ -328,18 +296,16 @@ def close_all(self) -> None:
328296
for project in self._projects.values():
329297
project.close()
330298
self._projects.clear()
331-
self._index_locks.clear()
332-
self._load_time_done.clear()
333299
gc.collect()
334300

335301
def list_projects(self) -> list[DaemonProjectInfo]:
336302
"""List all loaded projects with their indexing state."""
337303
return [
338304
DaemonProjectInfo(
339305
project_root=root,
340-
indexing=self._index_locks[root].locked(),
306+
indexing=project._index_lock.locked(),
341307
)
342-
for root in self._projects
308+
for root, project in self._projects.items()
343309
]
344310

345311

@@ -452,10 +418,10 @@ async def _dispatch(
452418
return registry.update_index(req.project_root)
453419

454420
if isinstance(req, SearchRequest):
455-
# Ensure the project is loaded (may trigger load-time indexing)
456421
await registry.get_project(req.project_root)
422+
await registry.ensure_indexing_started(req.project_root)
457423

458-
# If load-time indexing is in progress, return a streaming response
424+
# If indexing is in progress, return a streaming response
459425
if registry.should_wait_for_indexing(req.project_root):
460426
return _search_with_wait(registry, req)
461427

@@ -475,6 +441,8 @@ async def _dispatch(
475441
)
476442

477443
if isinstance(req, ProjectStatusRequest):
444+
await registry.get_project(req.project_root)
445+
await registry.ensure_indexing_started(req.project_root)
478446
return registry.get_status(req.project_root)
479447

480448
if isinstance(req, DaemonStatusRequest):

src/cocoindex_code/project.py

Lines changed: 27 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ class Project:
2323
_env: coco.Environment
2424
_app: coco.App[[], None]
2525
_index_lock: asyncio.Lock
26-
_initial_index_done: bool = False
26+
_initial_index_done: asyncio.Event
2727
_indexing_stats: IndexingProgress | None = None
2828

2929
def close(self) -> None:
@@ -34,17 +34,35 @@ def close(self) -> None:
3434
except Exception:
3535
pass
3636

37-
async def update_index(
37+
async def run_index(
3838
self,
39-
*,
4039
on_progress: Callable[[IndexingProgress], None] | None = None,
40+
on_started: asyncio.Event | None = None,
4141
) -> None:
42-
"""Update the index, streaming progress via callback.
42+
"""Acquire the index lock, run indexing, and release.
4343
44-
The lock is NOT acquired here — callers (e.g. ProjectRegistry) are
45-
responsible for serialization so they can inspect lock state and
46-
yield one-shot snapshots before blocking.
44+
If *on_started* is provided, it is set once the lock is acquired
45+
(i.e. indexing has truly begun). On completion (success or failure)
46+
``_initial_index_done`` is set.
4747
"""
48+
async with self._index_lock:
49+
self._indexing_stats = IndexingProgress(
50+
num_execution_starts=0,
51+
num_unchanged=0,
52+
num_adds=0,
53+
num_deletes=0,
54+
num_reprocesses=0,
55+
num_errors=0,
56+
)
57+
if on_started is not None:
58+
on_started.set()
59+
await self._update_index(on_progress=on_progress)
60+
61+
async def _update_index(
62+
self,
63+
on_progress: Callable[[IndexingProgress], None] | None = None,
64+
) -> None:
65+
"""Run indexing (lock must already be held)."""
4866
try:
4967
handle = self._app.update()
5068
async for snapshot in handle.watch():
@@ -63,8 +81,8 @@ async def update_index(
6381
on_progress(progress)
6482
await asyncio.sleep(0.1)
6583
finally:
84+
self._initial_index_done.set()
6685
self._indexing_stats = None
67-
self._initial_index_done = True
6886

6987
@property
7088
def indexing_stats(self) -> IndexingProgress | None:
@@ -74,10 +92,6 @@ def indexing_stats(self) -> IndexingProgress | None:
7492
def env(self) -> coco.Environment:
7593
return self._env
7694

77-
@property
78-
def is_initial_index_done(self) -> bool:
79-
return self._initial_index_done
80-
8195
@staticmethod
8296
async def create(
8397
project_root: Path,
@@ -115,4 +129,5 @@ async def create(
115129
result._env = env
116130
result._app = app
117131
result._index_lock = asyncio.Lock()
132+
result._initial_index_done = asyncio.Event()
118133
return result

0 commit comments

Comments
 (0)