Skip to content

Commit f6a03bf

Browse files
authored
refactor: make more project-level operations clearly at Project level (#109)
1 parent ad61b25 commit f6a03bf

2 files changed

Lines changed: 174 additions & 193 deletions

File tree

src/cocoindex_code/daemon.py

Lines changed: 16 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import logging
77
import os
88
import signal
9-
import sqlite3
109
import sys
1110
import threading
1211
import time
@@ -24,34 +23,28 @@
2423
ErrorResponse,
2524
HandshakeRequest,
2625
HandshakeResponse,
27-
IndexingProgress,
28-
IndexProgressUpdate,
2926
IndexRequest,
30-
IndexResponse,
3127
IndexStreamResponse,
3228
IndexWaitingNotice,
3329
ProjectStatusRequest,
34-
ProjectStatusResponse,
3530
RemoveProjectRequest,
3631
RemoveProjectResponse,
3732
Request,
3833
Response,
3934
SearchRequest,
4035
SearchResponse,
41-
SearchResult,
4236
SearchStreamResponse,
4337
StopRequest,
4438
StopResponse,
4539
decode_request,
4640
encode_response,
4741
)
48-
from .query import query_codebase
4942
from .settings import (
5043
global_settings_mtime_us,
5144
load_user_settings,
5245
user_settings_dir,
5346
)
54-
from .shared import SQLITE_DB, Embedder, create_embedder
47+
from .shared import Embedder, create_embedder
5548

5649
logger = logging.getLogger(__name__)
5750

@@ -98,7 +91,7 @@ def daemon_log_path() -> Path:
9891

9992

10093
class ProjectRegistry:
101-
"""Manages loaded projects and their indexes."""
94+
"""Cache of loaded projects, keyed by project root path."""
10295

10396
_projects: dict[str, Project]
10497
_embedder: Embedder
@@ -108,175 +101,13 @@ def __init__(self, embedder: Embedder) -> None:
108101
self._embedder = embedder
109102

110103
async def get_project(self, project_root: str) -> Project:
111-
"""Get or create a Project for the given root. Lazy initialization.
112-
113-
Only loads the project — does **not** trigger indexing. Callers
114-
that need indexing should call ``ensure_indexing_started`` (for
115-
background auto-index) or ``update_index`` (for explicit streaming
116-
index) separately.
117-
"""
104+
"""Get or create a Project for the given root. Lazy initialization."""
118105
if project_root not in self._projects:
119106
root = Path(project_root)
120107
project = await Project.create(root, self._embedder)
121108
self._projects[project_root] = project
122109
return self._projects[project_root]
123110

124-
async def ensure_indexing_started(self, project_root: str) -> None:
125-
"""Kick off background indexing and wait until it has actually started.
126-
127-
Returns once the indexing task holds the lock. Safe to call multiple
128-
times — only the first call spawns a task; subsequent calls return
129-
immediately.
130-
131-
``IndexRequest`` callers should skip this and use ``update_index``
132-
instead so they can stream progress.
133-
"""
134-
project = self._projects[project_root]
135-
if project._initial_index_done.is_set() or project._index_lock.locked():
136-
return
137-
started = asyncio.Event()
138-
asyncio.create_task(project.run_index(on_started=started))
139-
await started.wait()
140-
141-
def should_wait_for_indexing(self, project_root: str) -> bool:
142-
"""Check if search should wait before querying.
143-
144-
Returns True if indexing has been started but not yet completed.
145-
"""
146-
project = self._projects.get(project_root)
147-
return project is not None and not project._initial_index_done.is_set()
148-
149-
async def wait_for_indexing_done(self, project_root: str) -> None:
150-
"""Wait until initial indexing is complete and no indexing is running."""
151-
project = self._projects.get(project_root)
152-
if project is None:
153-
return
154-
await project._initial_index_done.wait()
155-
if project._index_lock.locked():
156-
async with project._index_lock:
157-
pass
158-
159-
async def update_index(
160-
self,
161-
project_root: str,
162-
) -> AsyncIterator[IndexStreamResponse]:
163-
"""Update index, yielding progress updates and a final IndexResponse.
164-
165-
Streams ``IndexProgressUpdate`` messages while indexing is in progress,
166-
ending with a terminal ``IndexResponse``. If the lock is already held,
167-
yields ``IndexWaitingNotice`` first.
168-
169-
The actual indexing runs in a separate task (``_run_index``) so that
170-
client disconnects (``GeneratorExit``) do not abort the indexing.
171-
"""
172-
project = await self.get_project(project_root)
173-
174-
# If lock is already held, notify the client before blocking
175-
if project._index_lock.locked():
176-
yield IndexWaitingNotice()
177-
178-
progress_queue: asyncio.Queue[IndexingProgress] = asyncio.Queue()
179-
index_task = asyncio.create_task(
180-
project.run_index(
181-
on_progress=lambda p: progress_queue.put_nowait(p),
182-
)
183-
)
184-
185-
try:
186-
# Drain the queue until the task completes
187-
while not index_task.done():
188-
try:
189-
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.1)
190-
yield IndexProgressUpdate(progress=progress)
191-
except TimeoutError:
192-
continue
193-
194-
# Drain any remaining items
195-
while not progress_queue.empty():
196-
yield IndexProgressUpdate(progress=progress_queue.get_nowait())
197-
198-
# Propagate any exception from the index task
199-
index_task.result()
200-
201-
yield IndexResponse(success=True)
202-
except GeneratorExit:
203-
# Client disconnected — _run_index continues in background and
204-
# handles cleanup (release lock, clear _indexing) when done.
205-
return
206-
except Exception as e:
207-
yield IndexResponse(success=False, message=str(e))
208-
209-
async def search(
210-
self,
211-
project_root: str,
212-
query: str,
213-
languages: list[str] | None = None,
214-
paths: list[str] | None = None,
215-
limit: int = 5,
216-
offset: int = 0,
217-
) -> list[SearchResult]:
218-
"""Search within a project."""
219-
project = await self.get_project(project_root)
220-
root = Path(project_root)
221-
target_db = root / ".cocoindex_code" / "target_sqlite.db"
222-
results = await query_codebase(
223-
query=query,
224-
target_sqlite_db_path=target_db,
225-
env=project.env,
226-
limit=limit,
227-
offset=offset,
228-
languages=languages,
229-
paths=paths,
230-
)
231-
return [
232-
SearchResult(
233-
file_path=r.file_path,
234-
language=r.language,
235-
content=r.content,
236-
start_line=r.start_line,
237-
end_line=r.end_line,
238-
score=r.score,
239-
)
240-
for r in results
241-
]
242-
243-
def get_status(self, project_root: str) -> ProjectStatusResponse:
244-
"""Get index stats for a project."""
245-
project = self._projects.get(project_root)
246-
if project is None:
247-
return ProjectStatusResponse(
248-
indexing=False, total_chunks=0, total_files=0, languages={}
249-
)
250-
251-
db = project.env.get_context(SQLITE_DB)
252-
index_exists = True
253-
try:
254-
with db.readonly() as conn:
255-
total_chunks = conn.execute("SELECT COUNT(*) FROM code_chunks_vec").fetchone()[0]
256-
total_files = conn.execute(
257-
"SELECT COUNT(DISTINCT file_path) FROM code_chunks_vec"
258-
).fetchone()[0]
259-
lang_rows = conn.execute(
260-
"SELECT language, COUNT(*) as cnt FROM code_chunks_vec"
261-
" GROUP BY language ORDER BY cnt DESC"
262-
).fetchall()
263-
except sqlite3.OperationalError:
264-
index_exists = False
265-
total_chunks = 0
266-
total_files = 0
267-
lang_rows = []
268-
269-
is_indexing = project._index_lock.locked()
270-
progress = project.indexing_stats if is_indexing else None
271-
return ProjectStatusResponse(
272-
indexing=is_indexing,
273-
total_chunks=total_chunks,
274-
total_files=total_files,
275-
languages={lang: cnt for lang, cnt in lang_rows},
276-
progress=progress,
277-
index_exists=index_exists,
278-
)
279-
280111
def remove_project(self, project_root: str) -> bool:
281112
"""Remove a project from the registry. Returns True if it was loaded."""
282113
import gc
@@ -377,14 +208,13 @@ async def handle_connection(
377208

378209

379210
async def _search_with_wait(
380-
registry: ProjectRegistry, req: SearchRequest
211+
project: Project, req: SearchRequest
381212
) -> AsyncIterator[SearchStreamResponse]:
382213
"""Stream search response, waiting for ongoing indexing first."""
383214
yield IndexWaitingNotice()
384-
await registry.wait_for_indexing_done(req.project_root)
215+
await project.wait_for_indexing_done()
385216
try:
386-
results = await registry.search(
387-
project_root=req.project_root,
217+
results = await project.search(
388218
query=req.query,
389219
languages=req.languages,
390220
paths=req.paths,
@@ -415,18 +245,17 @@ async def _dispatch(
415245
"""
416246
try:
417247
if isinstance(req, IndexRequest):
418-
return registry.update_index(req.project_root)
248+
project = await registry.get_project(req.project_root)
249+
return project.stream_index()
419250

420251
if isinstance(req, SearchRequest):
421-
await registry.get_project(req.project_root)
422-
await registry.ensure_indexing_started(req.project_root)
252+
project = await registry.get_project(req.project_root)
253+
await project.ensure_indexing_started()
423254

424-
# If indexing is in progress, return a streaming response
425-
if registry.should_wait_for_indexing(req.project_root):
426-
return _search_with_wait(registry, req)
255+
if project.should_wait_for_indexing:
256+
return _search_with_wait(project, req)
427257

428-
results = await registry.search(
429-
project_root=req.project_root,
258+
results = await project.search(
430259
query=req.query,
431260
languages=req.languages,
432261
paths=req.paths,
@@ -441,9 +270,9 @@ async def _dispatch(
441270
)
442271

443272
if isinstance(req, ProjectStatusRequest):
444-
await registry.get_project(req.project_root)
445-
await registry.ensure_indexing_started(req.project_root)
446-
return registry.get_status(req.project_root)
273+
project = await registry.get_project(req.project_root)
274+
await project.ensure_indexing_started()
275+
return project.get_status()
447276

448277
if isinstance(req, DaemonStatusRequest):
449278
return DaemonStatusResponse(

0 commit comments

Comments
 (0)