Skip to content

Commit 36ca070

Browse files
committed
wip: tune fastembed defaults
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 3709287 commit 36ca070

File tree

4 files changed

+218
-6
lines changed

4 files changed

+218
-6
lines changed

src/basic_memory/repository/embedding_provider_factory.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Factory for creating configured semantic embedding providers."""
22

3+
import os
34
from threading import Lock
45

56
from basic_memory.config import BasicMemoryConfig
@@ -20,17 +21,54 @@
2021
_EMBEDDING_PROVIDER_CACHE_LOCK = Lock()
2122

2223

24+
def _available_cpu_count() -> int | None:
25+
"""Return the CPU budget available to this process when the runtime exposes it."""
26+
process_cpu_count = getattr(os, "process_cpu_count", None)
27+
if callable(process_cpu_count):
28+
cpu_count = process_cpu_count()
29+
if cpu_count is not None and cpu_count > 0:
30+
return cpu_count
31+
32+
cpu_count = os.cpu_count()
33+
return cpu_count if cpu_count is not None and cpu_count > 0 else None
34+
35+
36+
def _resolve_fastembed_runtime_knobs(app_config: BasicMemoryConfig) -> tuple[int | None, int | None]:
37+
"""Resolve FastEmbed threads/parallel from explicit config or CPU-aware defaults."""
38+
configured_threads = app_config.semantic_embedding_threads
39+
configured_parallel = app_config.semantic_embedding_parallel
40+
if configured_threads is not None or configured_parallel is not None:
41+
return configured_threads, configured_parallel
42+
43+
available_cpus = _available_cpu_count()
44+
if available_cpus is None:
45+
return None, None
46+
47+
# Trigger: local laptops and cloud workers expose different CPU budgets.
48+
# Why: FastEmbed throughput wants enough ONNX threads to use the machine,
49+
# but the multiprocessing-style ``parallel`` fan-out can add a lot of
50+
# overhead for this workload and make full rebuilds slower instead of faster.
51+
# Outcome: when config leaves the knobs unset, each process uses a bounded
52+
# thread count and keeps FastEmbed on the simpler single-process path.
53+
if available_cpus <= 2:
54+
return available_cpus, 1
55+
56+
threads = min(8, available_cpus)
57+
return threads, 1
58+
59+
2360
def _provider_cache_key(app_config: BasicMemoryConfig) -> ProviderCacheKey:
    """Build a stable cache key from provider-relevant semantic embedding config."""
    # Key on the *resolved* thread/parallel values rather than the raw config
    # knobs so auto-tuned providers dedupe the same way explicit ones do.
    resolved_threads, resolved_parallel = _resolve_fastembed_runtime_knobs(app_config)
    key_parts: list[object] = [app_config.semantic_embedding_provider.strip().lower()]
    key_parts.extend(
        getattr(app_config, field_name)
        for field_name in (
            "semantic_embedding_model",
            "semantic_embedding_dimensions",
            "semantic_embedding_batch_size",
            "semantic_embedding_request_concurrency",
            "semantic_embedding_cache_dir",
        )
    )
    key_parts.append(resolved_threads)
    key_parts.append(resolved_parallel)
    return tuple(key_parts)
3573

3674

@@ -61,12 +99,13 @@ def create_embedding_provider(app_config: BasicMemoryConfig) -> EmbeddingProvide
6199
# Deferred import: fastembed (and its onnxruntime dep) may not be installed
62100
from basic_memory.repository.fastembed_provider import FastEmbedEmbeddingProvider
63101

102+
resolved_threads, resolved_parallel = _resolve_fastembed_runtime_knobs(app_config)
64103
if app_config.semantic_embedding_cache_dir is not None:
65104
extra_kwargs["cache_dir"] = app_config.semantic_embedding_cache_dir
66-
if app_config.semantic_embedding_threads is not None:
67-
extra_kwargs["threads"] = app_config.semantic_embedding_threads
68-
if app_config.semantic_embedding_parallel is not None:
69-
extra_kwargs["parallel"] = app_config.semantic_embedding_parallel
105+
if resolved_threads is not None:
106+
extra_kwargs["threads"] = resolved_threads
107+
if resolved_parallel is not None:
108+
extra_kwargs["parallel"] = resolved_parallel
70109

71110
provider = FastEmbedEmbeddingProvider(
72111
model_name=app_config.semantic_embedding_model,

src/basic_memory/repository/search_repository_base.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,7 @@ async def _sync_entity_vectors_internal(
800800
batch_start = time.perf_counter()
801801
backend_name = type(self).__name__.removesuffix("SearchRepository").lower()
802802

803+
self._log_vector_sync_runtime_settings(backend_name=backend_name, entities_total=total_entities)
803804
logger.info(
804805
"Vector batch sync start: project_id={project_id} entities_total={entities_total} "
805806
"sync_batch_size={sync_batch_size} prepare_window_size={prepare_window_size}",
@@ -1595,6 +1596,51 @@ def _finalize_completed_entity_syncs(
15951596

15961597
return queue_wait_seconds_total
15971598

1599+
def _log_vector_sync_runtime_settings(self, *, backend_name: str, entities_total: int) -> None:
    """Log the resolved embedding runtime knobs before the first prepare window.

    Trigger: a vector sync batch is about to start real work.
    Why: operators need one place to confirm the provider/runtime settings that
    this run will actually use, especially when threads/parallel are auto-tuned.
    Outcome: the log shows the resolved values once per batch without changing
    the hot-path control flow or adding more telemetry structure.
    """
    assert self._embedding_provider is not None

    # Deferred import: fastembed (and its onnxruntime dep) may not be installed.
    from basic_memory.repository.fastembed_provider import FastEmbedEmbeddingProvider

    active_provider = self._embedding_provider
    if not isinstance(active_provider, FastEmbedEmbeddingProvider):
        # Non-FastEmbed providers carry no local runtime knobs worth logging.
        logger.info(
            "Vector batch runtime settings: project_id={project_id} backend={backend} "
            "entities_total={entities_total} provider={provider} sync_batch_size={sync_batch_size}",
            project_id=self.project_id,
            backend=backend_name,
            entities_total=entities_total,
            provider=type(active_provider).__name__,
            sync_batch_size=self._semantic_embedding_sync_batch_size,
        )
        return

    logger.info(
        "Vector batch runtime settings: project_id={project_id} backend={backend} "
        "entities_total={entities_total} provider={provider} model_name={model_name} "
        "dimensions={dimensions} provider_batch_size={provider_batch_size} "
        "sync_batch_size={sync_batch_size} threads={threads} "
        "configured_parallel={configured_parallel} effective_parallel={effective_parallel}",
        project_id=self.project_id,
        backend=backend_name,
        entities_total=entities_total,
        provider=type(active_provider).__name__,
        model_name=active_provider.model_name,
        dimensions=active_provider.dimensions,
        provider_batch_size=active_provider.batch_size,
        sync_batch_size=self._semantic_embedding_sync_batch_size,
        threads=active_provider.threads,
        configured_parallel=active_provider.parallel,
        # NOTE(review): reaches into the provider's private helper; assumes
        # _effective_parallel() is cheap and side-effect free — confirm.
        effective_parallel=active_provider._effective_parallel(),
    )
1643+
15981644
def _log_vector_sync_complete(
15991645
self,
16001646
*,

tests/repository/test_openai_provider.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import pytest
99

1010
from basic_memory.config import BasicMemoryConfig
11+
import basic_memory.repository.embedding_provider_factory as embedding_provider_factory_module
1112
from basic_memory.repository.embedding_provider_factory import (
1213
create_embedding_provider,
1314
reset_embedding_provider_cache,
@@ -264,6 +265,52 @@ def test_embedding_provider_factory_forwards_fastembed_runtime_knobs():
264265
assert provider.parallel == 2
265266

266267

268+
def test_embedding_provider_factory_auto_tunes_fastembed_runtime_knobs_from_cpu_budget(monkeypatch):
    """Unset FastEmbed runtime knobs should resolve from available CPU budget."""
    # Pretend this process sees an 8-CPU budget regardless of the host machine.
    for cpu_probe in ("process_cpu_count", "cpu_count"):
        monkeypatch.setattr(embedding_provider_factory_module.os, cpu_probe, lambda: 8)

    config = BasicMemoryConfig(
        env="test",
        projects={"test-project": "/tmp/basic-memory-test"},
        default_project="test-project",
        semantic_search_enabled=True,
        semantic_embedding_provider="fastembed",
        semantic_embedding_threads=None,
        semantic_embedding_parallel=None,
    )
    provider = create_embedding_provider(config)

    # Auto-tuning should give the ONNX runtime the full 8-thread budget while
    # keeping FastEmbed on the single-process path.
    assert isinstance(provider, FastEmbedEmbeddingProvider)
    assert (provider.threads, provider.parallel) == (8, 1)
288+
289+
290+
def test_embedding_provider_factory_auto_tuning_stays_conservative_on_small_cpu_budget(
    monkeypatch,
):
    """Small workers should not get an oversized FastEmbed runtime footprint."""
    # Simulate a small 2-CPU cloud worker.
    for cpu_probe in ("process_cpu_count", "cpu_count"):
        monkeypatch.setattr(embedding_provider_factory_module.os, cpu_probe, lambda: 2)

    config = BasicMemoryConfig(
        env="test",
        projects={"test-project": "/tmp/basic-memory-test"},
        default_project="test-project",
        semantic_search_enabled=True,
        semantic_embedding_provider="fastembed",
        semantic_embedding_threads=None,
        semantic_embedding_parallel=None,
    )
    provider = create_embedding_provider(config)

    # The thread count must stay within the tiny budget and parallel fan-out
    # must remain disabled.
    assert isinstance(provider, FastEmbedEmbeddingProvider)
    assert (provider.threads, provider.parallel) == (2, 1)
312+
313+
267314
def test_embedding_provider_factory_reuses_provider_for_same_cache_key():
268315
"""Factory should reuse the same provider instance for identical config values."""
269316
config_a = BasicMemoryConfig(
@@ -289,6 +336,36 @@ def test_embedding_provider_factory_reuses_provider_for_same_cache_key():
289336
assert provider_a is provider_b
290337

291338

339+
def test_embedding_provider_factory_reuses_auto_tuned_provider_for_same_cpu_budget(monkeypatch):
    """Auto-tuned FastEmbed providers should still reuse the process cache."""
    monkeypatch.setattr(embedding_provider_factory_module.os, "process_cpu_count", lambda: 8)
    monkeypatch.setattr(embedding_provider_factory_module.os, "cpu_count", lambda: 8)

    def _fastembed_config() -> BasicMemoryConfig:
        # Fresh-but-identical configs must map to the same provider cache key;
        # building each one here removes the duplicated literal the original
        # test carried for config_a / config_b.
        return BasicMemoryConfig(
            env="test",
            projects={"test-project": "/tmp/basic-memory-test"},
            default_project="test-project",
            semantic_search_enabled=True,
            semantic_embedding_provider="fastembed",
            semantic_embedding_threads=None,
            semantic_embedding_parallel=None,
        )

    provider_a = create_embedding_provider(_fastembed_config())
    provider_b = create_embedding_provider(_fastembed_config())

    # Distinct config instances with equal values resolve to one shared provider.
    assert provider_a is provider_b
367+
368+
292369
@pytest.mark.asyncio
293370
async def test_openai_provider_runs_batches_concurrently_and_preserves_output_order(monkeypatch):
294371
"""Concurrent request fan-out should keep batch order stable."""

tests/repository/test_semantic_search_base.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import pytest
1313

1414
import basic_memory.repository.search_repository_base as search_repository_base_module
15+
from basic_memory.repository.fastembed_provider import FastEmbedEmbeddingProvider
1516
from basic_memory.repository.search_repository_base import (
1617
MAX_VECTOR_CHUNK_CHARS,
1718
SearchRepositoryBase,
@@ -702,3 +703,52 @@ async def _stub_flush(flush_jobs, entity_runtime, synced_entity_ids):
702703
assert histogram_names.count("vector_sync_write_seconds") == 2
703704
assert histogram_names.count("vector_sync_batch_total_seconds") == 1
704705
assert [name for name, _, _ in counter_calls].count("vector_sync_entities_total") == 1
706+
707+
708+
@pytest.mark.asyncio
async def test_sync_entity_vectors_batch_logs_resolved_fastembed_runtime_settings(monkeypatch):
    """Batch start should log the resolved FastEmbed knobs that shape this run."""
    # Arrange: a concrete repo with semantic sync enabled and a FastEmbed
    # provider whose knobs (threads=4, parallel=2) should surface in the log.
    repo = _ConcreteRepo()
    repo._semantic_enabled = True
    repo._embedding_provider = FastEmbedEmbeddingProvider(
        batch_size=128,
        dimensions=384,
        threads=4,
        parallel=2,
    )

    # Stub the prepare window so every entity is marked skipped — no real
    # embedding work runs, but the batch still passes the logging call site.
    async def _stub_prepare_window(entity_ids: list[int]):
        return [
            _PreparedEntityVectorSync(
                entity_id=entity_id,
                sync_start=0.0,
                source_rows_count=1,
                embedding_jobs=[],
                entity_skipped=True,
            )
            for entity_id in entity_ids
        ]

    # Capture every logger.info call as (message, kwargs) for later filtering.
    info_calls: list[tuple[str, dict]] = []

    def _capture_info(message: str, **kwargs):
        info_calls.append((message, kwargs))

    monkeypatch.setattr(repo, "_prepare_entity_vector_jobs_window", _stub_prepare_window)
    monkeypatch.setattr(search_repository_base_module.logger, "info", _capture_info)

    result = await repo.sync_entity_vectors_batch([1])

    assert result.entities_synced == 1
    # Exactly one runtime-settings line is expected per batch.
    runtime_logs = [
        kwargs
        for message, kwargs in info_calls
        if message.startswith("Vector batch runtime settings:")
    ]
    assert len(runtime_logs) == 1
    # The log must report the provider's own model/batch knobs...
    assert runtime_logs[0]["model_name"] == "bge-small-en-v1.5"
    assert runtime_logs[0]["provider_batch_size"] == 128
    # ...the repo-level sync batch size (presumably _ConcreteRepo's default of
    # 64 — confirm against the fixture), and the resolved thread/parallel pair.
    assert runtime_logs[0]["sync_batch_size"] == 64
    assert runtime_logs[0]["threads"] == 4
    assert runtime_logs[0]["configured_parallel"] == 2
    assert runtime_logs[0]["effective_parallel"] == 2

0 commit comments

Comments
 (0)