Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 48 additions & 13 deletions src/basic_memory/cli/commands/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class EmbeddingProgress:
"""Typed CLI progress payload for embedding backfills."""

entity_id: int
index: int
completed: int
total: int


Expand Down Expand Up @@ -147,20 +147,30 @@ def reindex(
False, "--embeddings", "-e", help="Rebuild vector embeddings (requires semantic search)"
),
search: bool = typer.Option(False, "--search", "-s", help="Rebuild full-text search index"),
full: bool = typer.Option(
False,
"--full",
help="Force a full filesystem scan and file reindex instead of the default incremental scan",
),
project: str = typer.Option(
None, "--project", "-p", help="Reindex a specific project (default: all)"
),
): # pragma: no cover
"""Rebuild search indexes and/or vector embeddings without dropping the database.

By default rebuilds everything (search + embeddings if semantic is enabled).
Use --search or --embeddings to rebuild only one.
By default runs incremental search + embeddings (if semantic search is enabled).
Use --full to bypass incremental scan optimization, rebuild all file-backed search rows,
and re-embed all eligible notes.
Use --search or --embeddings to rebuild only one side.

Examples:
bm reindex # Rebuild everything
bm reindex # Incremental search + embeddings
bm reindex --full # Full search + full re-embed
bm reindex --embeddings # Only rebuild vector embeddings
bm reindex --search # Only rebuild FTS index
bm reindex -p claw # Reindex only the 'claw' project
bm reindex --full --search # Full search only
bm reindex --full --embeddings # Full re-embed only
bm reindex -p claw --full # Full reindex for only the 'claw' project
"""
# If neither flag is set, do both
if not embeddings and not search:
Expand All @@ -179,10 +189,19 @@ def reindex(
if not search:
raise typer.Exit(0)

run_with_cleanup(_reindex(app_config, search=search, embeddings=embeddings, project=project))
run_with_cleanup(
_reindex(app_config, search=search, embeddings=embeddings, full=full, project=project)
)


async def _reindex(app_config, search: bool, embeddings: bool, project: str | None):
async def _reindex(
app_config,
*,
search: bool,
embeddings: bool,
full: bool,
project: str | None,
):
"""Run reindex operations."""
from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import create_search_repository
Expand Down Expand Up @@ -220,6 +239,10 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
console.print(f"\n[bold]Project: [cyan]{proj.name}[/cyan][/bold]")

if search:
search_mode_label = "full scan" if full else "incremental scan"
console.print(
f" Rebuilding full-text search index ([cyan]{search_mode_label}[/cyan])..."
)
sync_service = await get_sync_service(proj)
sync_dir = Path(proj.path)
with Progress(
Expand All @@ -244,14 +267,19 @@ async def on_index_progress(update: IndexProgress) -> None:
await sync_service.sync(
sync_dir,
project_name=proj.name,
force_full=full,
sync_embeddings=False,
progress_callback=on_index_progress,
)
progress.update(task, completed=progress.tasks[task].total or 1)

console.print(" [green][/green] Full-text search index rebuilt")
console.print(" [green]done[/green] Full-text search index rebuilt")

if embeddings:
console.print(" Building vector embeddings...")
embedding_mode_label = "full rebuild" if full else "incremental sync"
console.print(
f" Building vector embeddings ([cyan]{embedding_mode_label}[/cyan])..."
)
entity_repository = EntityRepository(session_maker, project_id=proj.id)
search_repository = create_search_repository(
session_maker, project_id=proj.id, app_config=app_config
Expand All @@ -274,20 +302,27 @@ async def on_index_progress(update: IndexProgress) -> None:
def on_progress(entity_id, index, total):
embedding_progress = EmbeddingProgress(
entity_id=entity_id,
index=index,
completed=index,
total=total,
)
# Trigger: repository progress now reports terminal entity completion.
# Why: operators need to see finished embedding work rather than
# entities merely entering prepare.
# Outcome: the CLI bar advances steadily with real completed work.
progress.update(
task,
total=embedding_progress.total,
completed=embedding_progress.index,
completed=embedding_progress.completed,
)

stats = await search_service.reindex_vectors(progress_callback=on_progress)
stats = await search_service.reindex_vectors(
progress_callback=on_progress,
force_full=full,
)
progress.update(task, completed=stats["total_entities"])

console.print(
f" [green][/green] Embeddings complete: "
f" [green]done[/green] Embeddings complete: "
f"{stats['embedded']} entities embedded, "
f"{stats['skipped']} skipped, "
f"{stats['errors']} errors"
Expand Down
10 changes: 8 additions & 2 deletions src/basic_memory/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,14 @@ class BasicMemoryConfig(BaseSettings):
default=None,
description="Embedding vector dimensions. Auto-detected from provider if not set (384 for FastEmbed, 1536 for OpenAI).",
)
# Trigger: full local rebuilds spend most of their time waiting behind shared
# embed flushes, not constructing vectors themselves.
# Why: in our benchmark runs, smaller FastEmbed batches cut queue wait far more
# than they increased write overhead on real-world projects, making full
# reindex materially faster end-to-end.
# Outcome: default to the smaller batch size measured as best in the shared
# vector sync pipeline.
# NOTE(review): dropping this default from 64 to 2 is a drastic change that
# will hurt raw embedding throughput wherever the flush bottleneck is absent
# (e.g. the OpenAI provider pays per-request overhead on every 2-item batch) —
# confirm the benchmark covered both providers before shipping this default.
semantic_embedding_batch_size: int = Field(
default=64,
default=2,
description="Batch size for embedding generation.",
gt=0,
)
Expand All @@ -199,7 +205,7 @@ class BasicMemoryConfig(BaseSettings):
gt=0,
)
semantic_embedding_sync_batch_size: int = Field(
default=64,
default=2,
description="Batch size for vector sync orchestration flushes.",
gt=0,
)
Expand Down
15 changes: 14 additions & 1 deletion src/basic_memory/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ async def write_file_atomic(path: FilePath, content: str) -> None:
temp_path = path_obj.with_suffix(".tmp")

try:
# Use aiofiles for non-blocking write
# Trigger: callers hand us normalized Python text, but the final bytes are allowed
# to use the host platform's native newline convention during the write.
# Why: preserving CRLF on Windows keeps local files aligned with editors like
# Obsidian, while FileService now hashes the persisted file bytes instead of
# the pre-write string.
# Outcome: this async write stays editor-friendly across platforms without
# reintroducing checksum drift in sync or move detection.
async with aiofiles.open(temp_path, mode="w", encoding="utf-8") as f:
await f.write(content)

Expand Down Expand Up @@ -168,6 +174,13 @@ async def format_markdown_builtin(path: Path) -> Optional[str]:

# Only write if content changed
if formatted_content != content:
# Trigger: mdformat may rewrite markdown content, then the host platform
# decides the newline bytes for the follow-up async text write.
# Why: we want formatter output to preserve native newlines instead of
# forcing LF, and the authoritative checksum comes from rereading the
# stored file bytes later in FileService.
# Outcome: formatting remains compatible with local editors on Windows while
# checksum-based sync logic stays anchored to on-disk bytes.
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(formatted_content)

Expand Down
6 changes: 5 additions & 1 deletion src/basic_memory/repository/embedding_provider.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Embedding provider protocol for pluggable semantic backends."""

from typing import Protocol
from typing import Any, Protocol


class EmbeddingProvider(Protocol):
Expand All @@ -16,3 +16,7 @@ async def embed_query(self, text: str) -> list[float]:
async def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""Embed a list of document chunks."""
...

def runtime_log_attrs(self) -> dict[str, Any]:
    """Return provider-specific runtime settings suitable for startup logs.

    Implementations surface the throughput-shaping knobs they actually use
    (e.g. batch size and thread/parallel settings for FastEmbed, batch size
    and request concurrency for the OpenAI provider) so operators can confirm
    the effective configuration from the logs.
    """
    ...
54 changes: 48 additions & 6 deletions src/basic_memory/repository/embedding_provider_factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Factory for creating configured semantic embedding providers."""

import os
from threading import Lock

from basic_memory.config import BasicMemoryConfig
Expand All @@ -18,19 +19,59 @@

_EMBEDDING_PROVIDER_CACHE: dict[ProviderCacheKey, EmbeddingProvider] = {}
_EMBEDDING_PROVIDER_CACHE_LOCK = Lock()
_FASTEMBED_MAX_THREADS = 8


def _available_cpu_count() -> int | None:
"""Return the CPU budget available to this process when the runtime exposes it."""
process_cpu_count = getattr(os, "process_cpu_count", None)
if callable(process_cpu_count):
cpu_count = process_cpu_count()
if isinstance(cpu_count, int) and cpu_count > 0:
return cpu_count

cpu_count = os.cpu_count()
return cpu_count if cpu_count is not None and cpu_count > 0 else None


def _resolve_fastembed_runtime_knobs(
    app_config: BasicMemoryConfig,
) -> tuple[int | None, int | None]:
    """Resolve FastEmbed threads/parallel from explicit config or CPU-aware defaults.

    Returns ``(threads, parallel)``. Explicitly configured values always win,
    even if only one of the two knobs is set; CPU-aware defaults apply only
    when both are unset.
    """
    explicit_threads = app_config.semantic_embedding_threads
    explicit_parallel = app_config.semantic_embedding_parallel
    if not (explicit_threads is None and explicit_parallel is None):
        return explicit_threads, explicit_parallel

    cpu_budget = _available_cpu_count()
    if cpu_budget is None:
        # No usable CPU information: leave both knobs unset so FastEmbed
        # applies its own defaults.
        return None, None

    # Leave a small CPU cushion for the rest of the pipeline while keeping
    # FastEmbed on the simpler single-process path (parallel=1). Hosts with
    # two or fewer CPUs have no headroom to spare, so they use everything.
    if cpu_budget <= 2:
        return cpu_budget, 1

    thread_budget = min(_FASTEMBED_MAX_THREADS, max(2, cpu_budget - 2))
    return thread_budget, 1


def _provider_cache_key(app_config: BasicMemoryConfig) -> ProviderCacheKey:
    """Build a stable cache key from provider-relevant semantic embedding config.

    Threads/parallel enter the key in *resolved* form so that two configs
    which resolve to the same effective FastEmbed runtime share one cached
    provider instance.
    """
    resolved_threads, resolved_parallel = _resolve_fastembed_runtime_knobs(app_config)
    normalized_provider = app_config.semantic_embedding_provider.strip().lower()
    return (
        normalized_provider,
        app_config.semantic_embedding_model,
        app_config.semantic_embedding_dimensions,
        app_config.semantic_embedding_batch_size,
        app_config.semantic_embedding_request_concurrency,
        app_config.semantic_embedding_cache_dir,
        resolved_threads,
        resolved_parallel,
    )


Expand Down Expand Up @@ -61,12 +102,13 @@ def create_embedding_provider(app_config: BasicMemoryConfig) -> EmbeddingProvide
# Deferred import: fastembed (and its onnxruntime dep) may not be installed
from basic_memory.repository.fastembed_provider import FastEmbedEmbeddingProvider

resolved_threads, resolved_parallel = _resolve_fastembed_runtime_knobs(app_config)
if app_config.semantic_embedding_cache_dir is not None:
extra_kwargs["cache_dir"] = app_config.semantic_embedding_cache_dir
if app_config.semantic_embedding_threads is not None:
extra_kwargs["threads"] = app_config.semantic_embedding_threads
if app_config.semantic_embedding_parallel is not None:
extra_kwargs["parallel"] = app_config.semantic_embedding_parallel
if resolved_threads is not None:
extra_kwargs["threads"] = resolved_threads
if resolved_parallel is not None:
extra_kwargs["parallel"] = resolved_parallel

provider = FastEmbedEmbeddingProvider(
model_name=app_config.semantic_embedding_model,
Expand Down
9 changes: 9 additions & 0 deletions src/basic_memory/repository/fastembed_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ class FastEmbedEmbeddingProvider(EmbeddingProvider):
def _effective_parallel(self) -> int | None:
return self.parallel if self.parallel is not None and self.parallel > 1 else None

def runtime_log_attrs(self) -> dict[str, int | str | None]:
"""Return the resolved runtime knobs that shape FastEmbed throughput."""
return {
"provider_batch_size": self.batch_size,
"threads": self.threads,
"configured_parallel": self.parallel,
"effective_parallel": self._effective_parallel(),
}

def __init__(
self,
model_name: str = "bge-small-en-v1.5",
Expand Down
7 changes: 7 additions & 0 deletions src/basic_memory/repository/openai_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ def __init__(
self._client: Any | None = None
self._client_lock = asyncio.Lock()

def runtime_log_attrs(self) -> dict[str, int]:
    """Return the request fan-out knobs that shape API embedding batches."""
    return dict(
        provider_batch_size=self.batch_size,
        request_concurrency=self.request_concurrency,
    )

async def _get_client(self) -> Any:
if self._client is not None:
return self._client
Expand Down
Loading
Loading