Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [

dependencies = [
"mcp>=1.0.0",
"cocoindex[litellm]==1.0.0a31",
"cocoindex[litellm]==1.0.0a32",
"sentence-transformers>=2.2.0",
"sqlite-vec>=0.1.0",
"pydantic>=2.0.0",
Expand Down
54 changes: 47 additions & 7 deletions src/cocoindex_code/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
if TYPE_CHECKING:
from .client import DaemonClient

from .protocol import ProjectStatusResponse, SearchResponse
from .protocol import IndexingProgress, ProjectStatusResponse, SearchResponse
from .settings import (
default_project_settings,
default_user_settings,
Expand Down Expand Up @@ -80,8 +80,21 @@ def resolve_default_path(project_root: Path) -> str | None:
return f"{rel.as_posix()}/*"


def _format_progress(progress: IndexingProgress) -> str:
"""Format an IndexingProgress snapshot as a human-readable string."""
return (
f"{progress.num_execution_starts} files listed"
f" | {progress.num_adds} added, {progress.num_deletes} deleted,"
f" {progress.num_reprocesses} reprocessed,"
f" {progress.num_unchanged} unchanged,"
f" error: {progress.num_errors}"
)


def print_index_stats(status: ProjectStatusResponse) -> None:
"""Print formatted index statistics."""
if status.progress is not None:
_typer.echo(f"Indexing in progress: {_format_progress(status.progress)}")
_typer.echo("\nIndex stats:")
_typer.echo(f" Chunks: {status.total_chunks}")
_typer.echo(f" Files: {status.total_files}")
Expand Down Expand Up @@ -153,13 +166,40 @@ def init(
@app.command()
def index() -> None:
"""Create/update index for the codebase."""
from rich.console import Console as _Console
from rich.live import Live as _Live
from rich.spinner import Spinner as _Spinner

client, project_root = require_daemon_for_project()
_typer.echo("Indexing...")
try:
resp = client.index(project_root)
except RuntimeError as e:
_typer.echo(f"Indexing failed: {e}", err=True)
raise _typer.Exit(code=1)
err_console = _Console(stderr=True)
last_progress_line: str | None = None

with _Live(_Spinner("dots", "Indexing..."), console=err_console, transient=True) as live:

def _on_waiting() -> None:
live.update(
_Spinner(
"dots",
"Another indexing is ongoing, waiting for it to finish...",
)
)

def _on_progress(progress: IndexingProgress) -> None:
nonlocal last_progress_line
last_progress_line = f"Indexing: {_format_progress(progress)}"
live.update(_Spinner("dots", last_progress_line))

try:
resp = client.index(project_root, on_progress=_on_progress, on_waiting=_on_waiting)
except RuntimeError as e:
live.stop()
_typer.echo(f"Indexing failed: {e}", err=True)
raise _typer.Exit(code=1)

# Print the final progress line so it remains visible after the spinner clears
if last_progress_line is not None:
_typer.echo(last_progress_line, err=True)

if not resp.success:
_typer.echo(f"Indexing failed: {resp.message}", err=True)
raise _typer.Exit(code=1)
Expand Down
31 changes: 28 additions & 3 deletions src/cocoindex_code/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import subprocess
import sys
import time
from collections.abc import Callable
from multiprocessing.connection import Client, Connection
from pathlib import Path

Expand All @@ -18,8 +19,11 @@
ErrorResponse,
HandshakeRequest,
HandshakeResponse,
IndexingProgress,
IndexProgressUpdate,
IndexRequest,
IndexResponse,
IndexWaitingNotice,
ProjectStatusRequest,
ProjectStatusResponse,
Request,
Expand Down Expand Up @@ -59,9 +63,30 @@ def handshake(self) -> HandshakeResponse:
"""Send version handshake."""
return self._send(HandshakeRequest(version=__version__)) # type: ignore[return-value]

def index(self, project_root: str) -> IndexResponse:
"""Request indexing. Blocks until complete."""
return self._send(IndexRequest(project_root=project_root)) # type: ignore[return-value]
def index(
self,
project_root: str,
on_progress: Callable[[IndexingProgress], None] | None = None,
on_waiting: Callable[[], None] | None = None,
) -> IndexResponse:
"""Request indexing with streaming progress. Blocks until complete."""
self._conn.send_bytes(encode_request(IndexRequest(project_root=project_root)))
while True:
data = self._conn.recv_bytes()
resp = decode_response(data)
if isinstance(resp, ErrorResponse):
raise RuntimeError(f"Daemon error: {resp.message}")
if isinstance(resp, IndexWaitingNotice):
if on_waiting is not None:
on_waiting()
continue
if isinstance(resp, IndexProgressUpdate):
if on_progress is not None:
on_progress(resp.progress)
continue
if isinstance(resp, IndexResponse):
return resp
raise RuntimeError(f"Unexpected response: {type(resp).__name__}")

def search(
self,
Expand Down
68 changes: 57 additions & 11 deletions src/cocoindex_code/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import threading
import time
from collections.abc import AsyncIterator
from multiprocessing.connection import Connection, Listener
from pathlib import Path
from typing import Any
Expand All @@ -22,8 +23,12 @@
ErrorResponse,
HandshakeRequest,
HandshakeResponse,
IndexingProgress,
IndexProgressUpdate,
IndexRequest,
IndexResponse,
IndexStreamResponse,
IndexWaitingNotice,
ProjectStatusRequest,
ProjectStatusResponse,
Request,
Expand Down Expand Up @@ -113,14 +118,43 @@ async def get_project(self, project_root: str) -> Project:
self._indexing[project_root] = False
return self._projects[project_root]

async def update_index(self, project_root: str) -> None:
"""Update index for project, serialized by per-project lock."""
async def update_index(self, project_root: str) -> AsyncIterator[IndexStreamResponse]:
"""Update index, yielding progress updates and a final IndexResponse."""
project = await self.get_project(project_root)
lock = self._index_locks[project_root]

# If lock is already held, notify the client and block until released
if lock.locked():
yield IndexWaitingNotice()

async with lock:
self._indexing[project_root] = True
try:
await project.update_index()
progress_queue: asyncio.Queue[IndexingProgress] = asyncio.Queue()

def on_progress(progress: IndexingProgress) -> None:
progress_queue.put_nowait(progress)

update_task = asyncio.create_task(project.update_index(on_progress=on_progress))

# Drain the queue until the update completes
while not update_task.done():
try:
progress = await asyncio.wait_for(progress_queue.get(), timeout=0.1)
yield IndexProgressUpdate(progress=progress)
except TimeoutError:
continue

# Drain any remaining items
while not progress_queue.empty():
yield IndexProgressUpdate(progress=progress_queue.get_nowait())

# Propagate any exception from the update task
update_task.result()

yield IndexResponse(success=True)
except Exception as e:
yield IndexResponse(success=False, message=str(e))
finally:
self._indexing[project_root] = False

Expand Down Expand Up @@ -177,11 +211,14 @@ def get_status(self, project_root: str) -> ProjectStatusResponse:
" GROUP BY language ORDER BY cnt DESC"
).fetchall()

is_indexing = self._indexing.get(project_root, False)
progress = project.indexing_stats if is_indexing else None
return ProjectStatusResponse(
indexing=self._indexing.get(project_root, False),
indexing=is_indexing,
total_chunks=total_chunks,
total_files=total_files,
languages={lang: cnt for lang, cnt in lang_rows},
progress=progress,
)

def list_projects(self) -> list[DaemonProjectInfo]:
Expand Down Expand Up @@ -246,8 +283,12 @@ def _recv() -> bytes:
handshake_done = True
continue

resp = await _dispatch(req, registry, start_time, shutdown_event)
conn.send_bytes(encode_response(resp))
result = await _dispatch(req, registry, start_time, shutdown_event)
if isinstance(result, AsyncIterator):
async for resp in result:
conn.send_bytes(encode_response(resp))
else:
conn.send_bytes(encode_response(result))

if isinstance(req, StopRequest):
break
Expand All @@ -265,16 +306,21 @@ async def _dispatch(
registry: ProjectRegistry,
start_time: float,
shutdown_event: asyncio.Event,
) -> Response:
"""Dispatch a request to the appropriate handler."""
) -> Response | AsyncIterator[IndexStreamResponse]:
"""Dispatch a request to the appropriate handler.

Returns a single Response for most requests, or an AsyncIterator for
streaming requests (IndexRequest).
"""
try:
if isinstance(req, IndexRequest):
await registry.update_index(req.project_root)
return IndexResponse(success=True)
return registry.update_index(req.project_root)

if isinstance(req, SearchRequest):
if req.refresh:
await registry.update_index(req.project_root)
# Consume the index stream silently for refresh
async for _ in registry.update_index(req.project_root):
pass
results = await registry.search(
project_root=req.project_root,
query=req.query,
Expand Down
45 changes: 38 additions & 7 deletions src/cocoindex_code/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from pathlib import Path

import cocoindex as coco
from cocoindex.connectors import sqlite

from .indexer import indexer_main
from .protocol import IndexingProgress
from .settings import PROJECT_SETTINGS, ProjectSettings
from .shared import CODEBASE_DIR, EMBEDDER, SQLITE_DB, Embedder

Expand All @@ -18,14 +20,43 @@ class Project:
_app: coco.App[[], None]
_index_lock: asyncio.Lock
_initial_index_done: bool = False
_indexing_stats: IndexingProgress | None = None

async def update_index(self, *, report_to_stdout: bool = False) -> None:
"""Update the index, serializing concurrent calls via lock."""
async with self._index_lock:
try:
await self._app.update(report_to_stdout=report_to_stdout)
finally:
self._initial_index_done = True
async def update_index(
self,
*,
on_progress: Callable[[IndexingProgress], None] | None = None,
) -> None:
"""Update the index, streaming progress via callback.

The lock is NOT acquired here — callers (e.g. ProjectRegistry) are
responsible for serialization so they can inspect lock state and
yield one-shot snapshots before blocking.
"""
try:
handle = self._app.update()
async for snapshot in handle.watch():
file_stats = snapshot.stats.by_processor.get("process_file")
if file_stats is not None:
progress = IndexingProgress(
num_execution_starts=file_stats.num_execution_starts,
num_unchanged=file_stats.num_unchanged,
num_adds=file_stats.num_adds,
num_deletes=file_stats.num_deletes,
num_reprocesses=file_stats.num_reprocesses,
num_errors=file_stats.num_errors,
)
self._indexing_stats = progress
if on_progress is not None:
on_progress(progress)
await asyncio.sleep(0.1)
finally:
self._indexing_stats = None
self._initial_index_done = True

@property
def indexing_stats(self) -> IndexingProgress | None:
return self._indexing_stats

@property
def env(self) -> coco.Environment:
Expand Down
28 changes: 28 additions & 0 deletions src/cocoindex_code/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,29 @@ class IndexResponse(_msgspec.Struct, tag="index"):
message: str | None = None


class IndexingProgress(_msgspec.Struct):
"""Indexing stats snapshot, shared between progress updates and status responses."""

num_execution_starts: int
num_unchanged: int
num_adds: int
num_deletes: int
num_reprocesses: int
num_errors: int


class IndexProgressUpdate(_msgspec.Struct, tag="index_progress"):
"""Streamed during indexing — one per stats change, before the final IndexResponse."""

progress: IndexingProgress


class IndexWaitingNotice(_msgspec.Struct, tag="index_waiting"):
"""Sent when another indexing is already in progress and the client must wait."""

pass


class SearchResult(_msgspec.Struct):
file_path: str
language: str
Expand All @@ -85,6 +108,7 @@ class ProjectStatusResponse(_msgspec.Struct, tag="project_status"):
total_chunks: int
total_files: int
languages: dict[str, int]
progress: IndexingProgress | None = None


class DaemonProjectInfo(_msgspec.Struct):
Expand All @@ -109,13 +133,17 @@ class ErrorResponse(_msgspec.Struct, tag="error"):
Response = (
HandshakeResponse
| IndexResponse
| IndexProgressUpdate
| IndexWaitingNotice
| SearchResponse
| ProjectStatusResponse
| DaemonStatusResponse
| StopResponse
| ErrorResponse
)

IndexStreamResponse = IndexProgressUpdate | IndexWaitingNotice | IndexResponse | ErrorResponse

# ---------------------------------------------------------------------------
# Encode / decode helpers (msgpack binary)
# ---------------------------------------------------------------------------
Expand Down
Loading
Loading