Skip to content

Commit 1cb99e1

Browse files
committed
wip: checkpoint full reindex parity and sqlite vector fixes
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 5ed358d commit 1cb99e1

File tree

11 files changed

+741
-66
lines changed

11 files changed

+741
-66
lines changed

src/basic_memory/cli/commands/db.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class EmbeddingProgress:
2626
"""Typed CLI progress payload for embedding backfills."""
2727

2828
entity_id: int
29-
index: int
29+
completed: int
3030
total: int
3131

3232

@@ -147,20 +147,30 @@ def reindex(
147147
False, "--embeddings", "-e", help="Rebuild vector embeddings (requires semantic search)"
148148
),
149149
search: bool = typer.Option(False, "--search", "-s", help="Rebuild full-text search index"),
150+
full: bool = typer.Option(
151+
False,
152+
"--full",
153+
help="Force a full filesystem scan and file reindex instead of the default incremental scan",
154+
),
150155
project: str = typer.Option(
151156
None, "--project", "-p", help="Reindex a specific project (default: all)"
152157
),
153158
): # pragma: no cover
154159
"""Rebuild search indexes and/or vector embeddings without dropping the database.
155160
156-
By default rebuilds everything (search + embeddings if semantic is enabled).
157-
Use --search or --embeddings to rebuild only one.
161+
By default runs incremental search + embeddings (if semantic search is enabled).
162+
Use --full to bypass incremental scan optimization, rebuild all file-backed search rows,
163+
and re-embed all eligible notes.
164+
Use --search or --embeddings to rebuild only one side.
158165
159166
Examples:
160-
bm reindex # Rebuild everything
167+
bm reindex # Incremental search + embeddings
168+
bm reindex --full # Full search + full re-embed
161169
bm reindex --embeddings # Only rebuild vector embeddings
162170
bm reindex --search # Only rebuild FTS index
163-
bm reindex -p claw # Reindex only the 'claw' project
171+
bm reindex --full --search # Full search only
172+
bm reindex --full --embeddings # Full re-embed only
173+
bm reindex -p claw --full # Full reindex for only the 'claw' project
164174
"""
165175
# If neither flag is set, do both
166176
if not embeddings and not search:
@@ -179,10 +189,19 @@ def reindex(
179189
if not search:
180190
raise typer.Exit(0)
181191

182-
run_with_cleanup(_reindex(app_config, search=search, embeddings=embeddings, project=project))
192+
run_with_cleanup(
193+
_reindex(app_config, search=search, embeddings=embeddings, full=full, project=project)
194+
)
183195

184196

185-
async def _reindex(app_config, search: bool, embeddings: bool, project: str | None):
197+
async def _reindex(
198+
app_config,
199+
*,
200+
search: bool,
201+
embeddings: bool,
202+
full: bool,
203+
project: str | None,
204+
):
186205
"""Run reindex operations."""
187206
from basic_memory.repository import EntityRepository
188207
from basic_memory.repository.search_repository import create_search_repository
@@ -220,6 +239,10 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
220239
console.print(f"\n[bold]Project: [cyan]{proj.name}[/cyan][/bold]")
221240

222241
if search:
242+
search_mode_label = "full scan" if full else "incremental scan"
243+
console.print(
244+
f" Rebuilding full-text search index ([cyan]{search_mode_label}[/cyan])..."
245+
)
223246
sync_service = await get_sync_service(proj)
224247
sync_dir = Path(proj.path)
225248
with Progress(
@@ -244,14 +267,19 @@ async def on_index_progress(update: IndexProgress) -> None:
244267
await sync_service.sync(
245268
sync_dir,
246269
project_name=proj.name,
270+
force_full=full,
271+
sync_embeddings=False,
247272
progress_callback=on_index_progress,
248273
)
249274
progress.update(task, completed=progress.tasks[task].total or 1)
250275

251276
console.print(" [green]✓[/green] Full-text search index rebuilt")
252277

253278
if embeddings:
254-
console.print(" Building vector embeddings...")
279+
embedding_mode_label = "full rebuild" if full else "incremental sync"
280+
console.print(
281+
f" Building vector embeddings ([cyan]{embedding_mode_label}[/cyan])..."
282+
)
255283
entity_repository = EntityRepository(session_maker, project_id=proj.id)
256284
search_repository = create_search_repository(
257285
session_maker, project_id=proj.id, app_config=app_config
@@ -274,16 +302,23 @@ async def on_index_progress(update: IndexProgress) -> None:
274302
def on_progress(entity_id, index, total):
275303
embedding_progress = EmbeddingProgress(
276304
entity_id=entity_id,
277-
index=index,
305+
completed=index,
278306
total=total,
279307
)
308+
# Trigger: repository progress now reports terminal entity completion.
309+
# Why: operators need to see finished embedding work rather than
310+
# entities merely entering prepare.
311+
# Outcome: the CLI bar advances steadily with real completed work.
280312
progress.update(
281313
task,
282314
total=embedding_progress.total,
283-
completed=embedding_progress.index,
315+
completed=embedding_progress.completed,
284316
)
285317

286-
stats = await search_service.reindex_vectors(progress_callback=on_progress)
318+
stats = await search_service.reindex_vectors(
319+
progress_callback=on_progress,
320+
force_full=full,
321+
)
287322
progress.update(task, completed=stats["total_entities"])
288323

289324
console.print(

src/basic_memory/repository/search_repository_base.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,22 @@ async def _sync_entity_vectors_internal(
814814
failed_entity_ids: set[int] = set()
815815
deferred_entity_ids: set[int] = set()
816816
synced_entity_ids: set[int] = set()
817+
completed_entities = 0
818+
819+
def emit_progress(entity_id: int) -> None:
820+
"""Report terminal entity progress to callers such as the CLI.
821+
822+
Trigger: an entity reaches a terminal state in this sync run.
823+
Why: operators need progress based on completed work, not the moment
824+
an entity merely enters prepare.
825+
Outcome: the progress bar advances when an entity is done for this
826+
run, whether it synced, skipped, deferred, or failed.
827+
"""
828+
nonlocal completed_entities
829+
if progress_callback is None:
830+
return
831+
completed_entities += 1
832+
progress_callback(entity_id, completed_entities, total_entities)
817833

818834
prepare_window_size = self._vector_prepare_window_size()
819835
with telemetry.started_span(
@@ -826,13 +842,6 @@ async def _sync_entity_vectors_internal(
826842
for window_start in range(0, total_entities, prepare_window_size):
827843
window_entity_ids = entity_ids[window_start : window_start + prepare_window_size]
828844

829-
if progress_callback is not None:
830-
# Trigger: prepare runs in bounded windows instead of strict one-by-one order.
831-
# Why: callbacks still need deterministic per-entity positions before the window starts.
832-
# Outcome: progress advances in prepare_window_size bursts.
833-
for offset, entity_id in enumerate(window_entity_ids, start=window_start):
834-
progress_callback(entity_id, offset, total_entities)
835-
836845
prepared_window = await self._prepare_entity_vector_jobs_window(window_entity_ids)
837846

838847
for entity_id, prepared in zip(window_entity_ids, prepared_window, strict=True):
@@ -847,6 +856,7 @@ async def _sync_entity_vectors_internal(
847856
entity_id=entity_id,
848857
error=str(prepared),
849858
)
859+
emit_progress(entity_id)
850860
continue
851861

852862
embedding_jobs_count = len(prepared.embedding_jobs)
@@ -886,6 +896,7 @@ async def _sync_entity_vectors_internal(
886896
shard_count=prepared.shard_count,
887897
remaining_jobs_after_shard=prepared.remaining_jobs_after_shard,
888898
)
899+
emit_progress(entity_id)
889900
continue
890901

891902
entity_runtime[entity_id] = _EntitySyncRuntime(
@@ -933,6 +944,7 @@ async def _sync_entity_vectors_internal(
933944
entity_runtime=entity_runtime,
934945
synced_entity_ids=synced_entity_ids,
935946
deferred_entity_ids=deferred_entity_ids,
947+
progress_callback=emit_progress,
936948
)
937949
except Exception as exc:
938950
if not continue_on_error:
@@ -952,6 +964,8 @@ async def _sync_entity_vectors_internal(
952964
chunk_count=len(flush_jobs),
953965
error=str(exc),
954966
)
967+
for failed_entity_id in affected_entity_ids:
968+
emit_progress(failed_entity_id)
955969

956970
if pending_jobs:
957971
flush_jobs = list(pending_jobs)
@@ -968,6 +982,7 @@ async def _sync_entity_vectors_internal(
968982
entity_runtime=entity_runtime,
969983
synced_entity_ids=synced_entity_ids,
970984
deferred_entity_ids=deferred_entity_ids,
985+
progress_callback=emit_progress,
971986
)
972987
except Exception as exc:
973988
if not continue_on_error:
@@ -987,6 +1002,8 @@ async def _sync_entity_vectors_internal(
9871002
chunk_count=len(flush_jobs),
9881003
error=str(exc),
9891004
)
1005+
for failed_entity_id in affected_entity_ids:
1006+
emit_progress(failed_entity_id)
9901007

9911008
# Trigger: this should never happen after all flushes succeed.
9921009
# Why: remaining jobs mean runtime tracking drifted from queued jobs.
@@ -1002,6 +1019,8 @@ async def _sync_entity_vectors_internal(
10021019
project_id=self.project_id,
10031020
unfinished_entities=orphan_runtime_entities,
10041021
)
1022+
for failed_entity_id in orphan_runtime_entities:
1023+
emit_progress(failed_entity_id)
10051024

10061025
# Keep result counters aligned with successful/failed terminal states.
10071026
synced_entity_ids.difference_update(failed_entity_ids)
@@ -1527,6 +1546,7 @@ def _finalize_completed_entity_syncs(
15271546
entity_runtime: dict[int, _EntitySyncRuntime],
15281547
synced_entity_ids: set[int],
15291548
deferred_entity_ids: set[int],
1549+
progress_callback: Callable[[int], None] | None = None,
15301550
) -> float:
15311551
"""Finalize completed entities and return cumulative queue wait seconds."""
15321552
queue_wait_seconds_total = 0.0
@@ -1570,6 +1590,8 @@ def _finalize_completed_entity_syncs(
15701590
remaining_jobs_after_shard=runtime.remaining_jobs_after_shard,
15711591
)
15721592
entity_runtime.pop(entity_id, None)
1593+
if progress_callback is not None:
1594+
progress_callback(entity_id)
15731595

15741596
return queue_wait_seconds_total
15751597

src/basic_memory/repository/sqlite_search_repository.py

Lines changed: 85 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ def __init__(
5656
self._app_config.semantic_embedding_sync_batch_size
5757
)
5858
self._embedding_provider = embedding_provider
59-
self._sqlite_vec_lock = asyncio.Lock()
59+
self._sqlite_vec_load_lock = asyncio.Lock()
60+
self._sqlite_prepare_write_lock = asyncio.Lock()
6061
self._vector_tables_initialized = False
6162
self._vector_dimensions = 384
6263

@@ -357,7 +358,13 @@ async def _ensure_sqlite_vec_loaded(self, session) -> None:
357358
"pip install -U basic-memory"
358359
) from exc
359360

360-
async with self._sqlite_vec_lock:
361+
# Trigger: sqlite-vec must be loaded on each SQLite connection before
362+
# vec tables and functions are visible.
363+
# Why: extension loading is connection-local, so we need one narrow
364+
# critical section to avoid racing two coroutines on the same step.
365+
# Outcome: connection setup stays serialized without blocking unrelated
366+
# prepare work behind the write-side lock.
367+
async with self._sqlite_vec_load_lock:
361368
try:
362369
await session.execute(text("SELECT vec_version()"))
363370
return
@@ -558,6 +565,76 @@ async def _delete_stale_chunks(
558565
stale_params,
559566
)
560567

568+
async def delete_entity_vector_rows(self, entity_id: int) -> None:
569+
"""Delete one entity's vec rows on a sqlite-vec-enabled connection."""
570+
await self._ensure_vector_tables()
571+
572+
async with db.scoped_session(self.session_maker) as session:
573+
await self._ensure_sqlite_vec_loaded(session)
574+
575+
# Constraint: sqlite-vec virtual tables are only visible after vec0 is
576+
# loaded on this exact connection.
577+
# Why: generic repository sessions can reach search_vector_chunks but still
578+
# fail with "no such module: vec0" when touching embeddings.
579+
# Outcome: service-level cleanup routes vec-table deletes through this helper.
580+
await self._delete_entity_chunks(session, entity_id)
581+
await session.commit()
582+
583+
async def delete_project_vector_rows(self) -> None:
584+
"""Delete all vector rows for this project on a sqlite-vec-enabled connection."""
585+
await self._ensure_vector_tables()
586+
587+
async with db.scoped_session(self.session_maker) as session:
588+
await self._ensure_sqlite_vec_loaded(session)
589+
590+
# Constraint: sqlite-vec stores embeddings separately with no cascade delete.
591+
# Why: full rebuild must clear embeddings before chunk rows or stale vectors remain.
592+
# Outcome: the next sync recreates the project's derived vectors from scratch.
593+
await session.execute(
594+
text(
595+
"DELETE FROM search_vector_embeddings WHERE rowid IN ("
596+
"SELECT id FROM search_vector_chunks WHERE project_id = :project_id)"
597+
),
598+
{"project_id": self.project_id},
599+
)
600+
await session.execute(
601+
text("DELETE FROM search_vector_chunks WHERE project_id = :project_id"),
602+
{"project_id": self.project_id},
603+
)
604+
await session.commit()
605+
606+
async def delete_stale_vector_rows(self) -> None:
607+
"""Delete vector rows whose source entities no longer exist."""
608+
await self._ensure_vector_tables()
609+
610+
async with db.scoped_session(self.session_maker) as session:
611+
await self._ensure_sqlite_vec_loaded(session)
612+
613+
stale_entity_filter = (
614+
"entity_id NOT IN (SELECT id FROM entity WHERE project_id = :project_id)"
615+
)
616+
params = {"project_id": self.project_id}
617+
618+
# Trigger: deleted entities left behind derived vector rows.
619+
# Why: sqlite-vec does not provide cascade cleanup from our chunk table.
620+
# Outcome: stale vector state disappears before coverage stats or reindex runs.
621+
await session.execute(
622+
text(
623+
"DELETE FROM search_vector_embeddings WHERE rowid IN ("
624+
"SELECT id FROM search_vector_chunks "
625+
f"WHERE project_id = :project_id AND {stale_entity_filter})"
626+
),
627+
params,
628+
)
629+
await session.execute(
630+
text(
631+
"DELETE FROM search_vector_chunks "
632+
f"WHERE project_id = :project_id AND {stale_entity_filter}"
633+
),
634+
params,
635+
)
636+
await session.commit()
637+
561638
def _distance_to_similarity(self, distance: float) -> float:
562639
"""Convert L2 distance to cosine similarity for normalized embeddings.
563640
@@ -569,7 +646,12 @@ def _distance_to_similarity(self, distance: float) -> float:
569646
@asynccontextmanager
570647
async def _prepare_entity_write_scope(self):
571648
"""SQLite keeps the shared read window, but funnels prepare writes through one lock."""
572-
async with self._sqlite_vec_lock:
649+
# Trigger: the shared prepare window fans out per entity after batched reads.
650+
# Why: SQLite still benefits from shared reads, but write transactions do
651+
# not get meaningfully faster when we open many at once.
652+
# Outcome: one entity at a time mutates chunk rows, while vec extension
653+
# loading uses its own separate lock and cannot deadlock this path.
654+
async with self._sqlite_prepare_write_lock:
573655
yield
574656

575657
def _prepare_window_existing_rows_sql(self, placeholders: str) -> str:

0 commit comments

Comments
 (0)