Skip to content

Commit 540da41

Browse files
authored
perf(sync): batch file indexing in core (#726)
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 3e40cb9 commit 540da41

23 files changed

+2810
-656
lines changed

src/basic_memory/cli/commands/db.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Database management commands."""
22

3+
from dataclasses import dataclass
34
from pathlib import Path
45

56
import typer
@@ -12,13 +13,47 @@
1213
from basic_memory.cli.app import app
1314
from basic_memory.cli.commands.command_utils import run_with_cleanup
1415
from basic_memory.config import ConfigManager, ProjectMode
16+
from basic_memory.indexing import IndexProgress
1517
from basic_memory.repository import ProjectRepository
1618
from basic_memory.services.initialization import reconcile_projects_with_config
1719
from basic_memory.sync.sync_service import get_sync_service
1820

1921
console = Console()
2022

2123

24+
@dataclass(slots=True)
class EmbeddingProgress:
    """Typed CLI progress payload for embedding backfills."""

    # Database id of the entity whose embedding was most recently processed.
    entity_id: int
    # Count of entities processed so far; fed to Rich as the task's
    # "completed" value — NOTE(review): confirm 1-based vs 0-based at the caller.
    index: int
    # Total number of entities queued for embedding in this backfill run.
    total: int
31+
32+
33+
def _format_eta(seconds: float | None) -> str:
34+
"""Render a compact ETA string for CLI progress descriptions."""
35+
if seconds is None:
36+
return "--:--"
37+
38+
whole_seconds = max(int(seconds), 0)
39+
minutes, remaining_seconds = divmod(whole_seconds, 60)
40+
hours, remaining_minutes = divmod(minutes, 60)
41+
if hours:
42+
return f"{hours:d}:{remaining_minutes:02d}:{remaining_seconds:02d}"
43+
return f"{remaining_minutes:02d}:{remaining_seconds:02d}"
44+
45+
46+
def _format_index_progress(progress: IndexProgress) -> str:
47+
"""Render typed index progress as a compact Rich task description."""
48+
files_per_minute = int(progress.files_per_minute) if progress.files_per_minute else 0
49+
return (
50+
" Indexing files... "
51+
f"{progress.files_processed}/{progress.files_total} files | "
52+
f"{progress.batches_completed}/{progress.batches_total} batches | "
53+
f"{files_per_minute}/min | ETA {_format_eta(progress.eta_seconds)}"
54+
)
55+
56+
2257
async def _reindex_projects(app_config):
2358
"""Reindex all projects in a single async context.
2459
@@ -185,10 +220,34 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
185220
console.print(f"\n[bold]Project: [cyan]{proj.name}[/cyan][/bold]")
186221

187222
if search:
188-
console.print(" Rebuilding full-text search index...")
189223
sync_service = await get_sync_service(proj)
190224
sync_dir = Path(proj.path)
191-
await sync_service.sync(sync_dir, project_name=proj.name)
225+
with Progress(
226+
SpinnerColumn(),
227+
TextColumn("[progress.description]{task.description}"),
228+
BarColumn(),
229+
TaskProgressColumn(),
230+
console=console,
231+
) as progress:
232+
task = progress.add_task(" Indexing files... scanning changes", total=1)
233+
234+
async def on_index_progress(update: IndexProgress) -> None:
235+
total = update.files_total or 1
236+
completed = update.files_processed if update.files_total else 1
237+
progress.update(
238+
task,
239+
description=_format_index_progress(update),
240+
total=total,
241+
completed=min(completed, total),
242+
)
243+
244+
await sync_service.sync(
245+
sync_dir,
246+
project_name=proj.name,
247+
progress_callback=on_index_progress,
248+
)
249+
progress.update(task, completed=progress.tasks[task].total or 1)
250+
192251
console.print(" [green]✓[/green] Full-text search index rebuilt")
193252

194253
if embeddings:
@@ -213,7 +272,16 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
213272
task = progress.add_task(" Embedding entities...", total=None)
214273

215274
def on_progress(entity_id, index, total):
216-
progress.update(task, total=total, completed=index)
275+
embedding_progress = EmbeddingProgress(
276+
entity_id=entity_id,
277+
index=index,
278+
total=total,
279+
)
280+
progress.update(
281+
task,
282+
total=embedding_progress.total,
283+
completed=embedding_progress.index,
284+
)
217285

218286
stats = await search_service.reindex_vectors(progress_callback=on_progress)
219287
progress.update(task, completed=stats["total_entities"])

src/basic_memory/config.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ class BasicMemoryConfig(BaseSettings):
193193
description="Batch size for embedding generation.",
194194
gt=0,
195195
)
196+
semantic_embedding_request_concurrency: int = Field(
197+
default=4,
198+
description="Maximum number of concurrent provider requests for batched embedding generation when the active provider supports request-level concurrency.",
199+
gt=0,
200+
)
196201
semantic_embedding_sync_batch_size: int = Field(
197202
default=64,
198203
description="Batch size for vector sync orchestration flushes.",
@@ -286,6 +291,31 @@ class BasicMemoryConfig(BaseSettings):
286291
description="Maximum number of files to process concurrently during sync. Limits memory usage on large projects (2000+ files). Lower values reduce memory consumption.",
287292
gt=0,
288293
)
294+
index_batch_size: int = Field(
295+
default=32,
296+
description="Maximum number of changed files to load into one indexing batch.",
297+
gt=0,
298+
)
299+
index_batch_max_bytes: int = Field(
300+
default=8 * 1024 * 1024,
301+
description="Maximum total bytes to load into one indexing batch. Large files still run as single-file batches.",
302+
gt=0,
303+
)
304+
index_parse_max_concurrent: int = Field(
305+
default=8,
306+
description="Maximum number of markdown parse tasks to run concurrently inside one indexing batch.",
307+
gt=0,
308+
)
309+
index_entity_max_concurrent: int = Field(
310+
default=4,
311+
description="Maximum number of entity create/update tasks to run concurrently inside one indexing batch.",
312+
gt=0,
313+
)
314+
index_metadata_update_max_concurrent: int = Field(
315+
default=4,
316+
description="Maximum number of metadata/search refresh tasks to run concurrently inside one indexing batch.",
317+
gt=0,
318+
)
289319

290320
kebab_filenames: bool = Field(
291321
default=False,

src/basic_memory/db.py

Lines changed: 0 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -44,101 +44,6 @@
4444
_session_maker: Optional[async_sessionmaker[AsyncSession]] = None
4545

4646

47-
async def _needs_semantic_embedding_backfill(
    app_config: BasicMemoryConfig,
    session_maker: async_sessionmaker[AsyncSession],
) -> bool:
    """Check if entities exist but vector embeddings are empty.

    This is the reliable way to detect that embeddings need to be generated,
    regardless of how migrations were applied (fresh DB, upgrade, reset, etc.).

    Args:
        app_config: Active configuration; the check short-circuits to False
            when ``semantic_search_enabled`` is off.
        session_maker: Factory used to open a short-lived database session.

    Returns:
        True only when at least one ``entity`` row exists and
        ``search_vector_chunks`` is empty; False otherwise, including when
        any query fails (e.g. the chunks table predates its migration).
    """
    if not app_config.semantic_search_enabled:
        return False

    try:
        async with scoped_session(session_maker) as session:
            # ``or 0`` guards against a NULL scalar from an empty result.
            entity_count = (
                await session.execute(text("SELECT COUNT(*) FROM entity"))
            ).scalar() or 0
            if entity_count == 0:
                # Nothing to embed — a backfill would be a no-op.
                return False

            # Check if vector chunks table exists and is empty
            embedding_count = (
                await session.execute(text("SELECT COUNT(*) FROM search_vector_chunks"))
            ).scalar() or 0

            return embedding_count == 0
    except Exception as exc:
        # Table might not exist yet (pre-migration)
        logger.debug(f"Could not check embedding status: {exc}")
        return False
77-
78-
79-
async def _run_semantic_embedding_backfill(
    app_config: BasicMemoryConfig,
    session_maker: async_sessionmaker[AsyncSession],
) -> None:
    """Backfill semantic embeddings for all active projects/entities.

    Iterates every active project, collects its entity ids, and delegates
    batch vector generation to the backend-appropriate search repository.
    Per-entity failures are logged as warnings; they do not abort the run.

    Args:
        app_config: Active configuration; a no-op (with an info log) when
            ``semantic_search_enabled`` is off.
        session_maker: Factory for the short-lived sessions used to query
            projects and entities, and passed on to the repository.
    """
    if not app_config.semantic_search_enabled:
        logger.info("Skipping automatic semantic embedding backfill: semantic search is disabled.")
        return

    async with scoped_session(session_maker) as session:
        project_result = await session.execute(
            text("SELECT id, name FROM project WHERE is_active = :is_active ORDER BY id"),
            {"is_active": True},
        )
        projects = [(int(row[0]), str(row[1])) for row in project_result.fetchall()]

    if not projects:
        logger.info("Skipping automatic semantic embedding backfill: no active projects found.")
        return

    # Pick the repository implementation matching the configured backend.
    repository_class = (
        PostgresSearchRepository
        if app_config.database_backend == DatabaseBackend.POSTGRES
        else SQLiteSearchRepository
    )

    total_entities = 0
    for project_id, project_name in projects:
        # A fresh session per project keeps each id fetch short-lived.
        async with scoped_session(session_maker) as session:
            entity_result = await session.execute(
                text("SELECT id FROM entity WHERE project_id = :project_id ORDER BY id"),
                {"project_id": project_id},
            )
            entity_ids = [int(row[0]) for row in entity_result.fetchall()]

        if not entity_ids:
            continue

        total_entities += len(entity_ids)
        logger.info(
            "Automatic semantic embedding backfill: "
            f"project={project_name}, entities={len(entity_ids)}"
        )

        search_repository = repository_class(
            session_maker,
            project_id=project_id,
            app_config=app_config,
        )
        batch_result = await search_repository.sync_entity_vectors_batch(entity_ids)
        # Failures are surfaced but tolerated so other projects still run.
        if batch_result.entities_failed > 0:
            logger.warning(
                "Automatic semantic embedding backfill encountered entity failures: "
                f"project={project_name}, failed={batch_result.entities_failed}, "
                f"failed_entity_ids={batch_result.failed_entity_ids}"
            )

    logger.info(
        "Automatic semantic embedding backfill complete: "
        f"projects={len(projects)}, entities={total_entities}"
    )
140-
141-
14247
class DatabaseType(Enum):
14348
"""Types of supported databases."""
14449

@@ -521,14 +426,6 @@ async def run_migrations(
521426
else:
522427
await SQLiteSearchRepository(session_maker, 1).init_search_index()
523428

524-
# Check if backfill is needed — actual backfill runs in background
525-
# from the MCP server lifespan to avoid blocking startup.
526-
if await _needs_semantic_embedding_backfill(app_config, session_maker):
527-
logger.info(
528-
"Semantic embeddings missing — backfill will run in background after startup"
529-
)
530-
else:
531-
logger.info("Semantic embeddings: up to date")
532429
except Exception as e: # pragma: no cover
533430
logger.error(f"Error running migrations: {e}")
534431
raise
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Reusable indexing primitives shared by local sync and future remote callers."""
2+
3+
from basic_memory.indexing.batch_indexer import BatchIndexer
4+
from basic_memory.indexing.batching import build_index_batches
5+
from basic_memory.indexing.models import (
6+
IndexedEntity,
7+
IndexBatch,
8+
IndexFileMetadata,
9+
IndexFileWriter,
10+
IndexFrontmatterUpdate,
11+
IndexFrontmatterWriteResult,
12+
IndexingBatchResult,
13+
IndexInputFile,
14+
IndexProgress,
15+
)
16+
17+
__all__ = [
18+
"BatchIndexer",
19+
"IndexedEntity",
20+
"IndexBatch",
21+
"IndexFileMetadata",
22+
"IndexFileWriter",
23+
"IndexFrontmatterUpdate",
24+
"IndexFrontmatterWriteResult",
25+
"IndexingBatchResult",
26+
"IndexInputFile",
27+
"IndexProgress",
28+
"build_index_batches",
29+
]

0 commit comments

Comments
 (0)