diff --git a/CHANGELOG.md b/CHANGELOG.md index 48626ee88..e0c7061fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,19 @@ All notable changes to this project are documented in this file. Release notes are grouped by theme rather than listing every commit. +## [Unreleased] + +### Features + +- **Embedding cache rewritten on SQLite with per-utterance keys.** Embeddings are now cached one row per `(model, utterance, prompt)` in a single SQLite database (`/embeddings.db`) instead of one `.npy` file per call. Utterances shared across calls are embedded and stored once, so overlapping calls reuse the overlap — removing the old whole-list-or-nothing cache misses and the unbounded `.npy` inode growth. Writes are atomic and safe for concurrent processes/threads on one host (WAL). +- **`AUTOINTENT_CACHE_DIR`** environment variable to relocate the on-disk cache (defaults to the OS cache dir). It currently governs the embedding cache only; the structured-output cache is unchanged. + +### Notes + +- The new cache uses a different key scheme, so existing `.npy` embedding caches are not reused (a one-time recompute on first run). The old `embeddings/` directory is left untouched and may be deleted manually. + +--- + ## [0.3.2] — 2026-06-22 Compared to [0.3.1](https://github.com/deeppavlov/AutoIntent/releases/tag/v0.3.1). A maintenance release focused on caching correctness and CI/test coverage. No breaking changes. diff --git a/docs/superpowers/plans/2026-06-25-sqlite-embedding-cache.md b/docs/superpowers/plans/2026-06-25-sqlite-embedding-cache.md new file mode 100644 index 000000000..e577c2fdb --- /dev/null +++ b/docs/superpowers/plans/2026-06-25-sqlite-embedding-cache.md @@ -0,0 +1,1084 @@ +# SQLite per-utterance embedding cache — Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the `.npy`-file-per-call embedding cache with a single SQLite database keyed per utterance, lifting the triplicated cache code into one template method. + +**Architecture:** A new `SQLiteEmbeddingCache` stores one float32 vector per `(model, utterance, prompt)` key in `/embeddings.db`. `BaseEmbeddingBackend.embed` becomes a concrete template that splits a call into cache hits/misses, computes only misses via each backend's new `_embed_uncached`, and reassembles in input order. Cache location is configurable via `AUTOINTENT_CACHE_DIR`. + +**Tech Stack:** Python 3.10, stdlib `sqlite3` (no new dependency), numpy, xxhash (`Hasher`), pytest, ruff (`select=ALL`), mypy (strict). + +**Design spec:** `docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md` — read it before starting; it carries the rationale for every decision below. + +## Global Constraints + +- **Verification policy (maintainer rule):** Do **NOT** run heavy/exhaustive pytest locally — it can freeze the machine. The local gate for every task is **`ruff check`** + **`mypy src/autointent tests`** only. All pytest verification happens **on CI after the draft PR is pushed**. Each task's pytest commands are listed for reference / CI; the red→green TDD signal is: write the test first, gate locally on ruff+mypy, confirm on CI. +- **Scope:** Embedding cache only. Do **NOT** touch `src/autointent/generation/_cache.py` (structured-output cache) or any non-embedding subsystem. +- **No new dependency.** Stdlib `sqlite3` only. +- **mypy:** strict, `python_version = "3.10"`, covers **both** `src/autointent` and `tests`. Every new function/test needs full annotations. +- **ruff:** `select = ["ALL"]`, `target-version = "py310"`. New non-`utils` modules need module/class/function docstrings, `%`-style logging args (no f-strings in `logger.*`), named constants instead of magic numbers, `from __future__ import annotations`, `pathlib` for paths, `zip(..., strict=True)`. +- **No behavior change to per-utterance vector values** or to public `Embedder.embed` / backend `embed` signatures/return types. +- **Fresh start:** do not migrate or delete the old `.npy` cache. +- **Commit messages** end with: `Co-Authored-By: Claude Opus 4.8 `. + +--- + +## File Structure + +| File | Responsibility | +|---|---| +| `src/autointent/_cache_dir.py` (new) | `get_cache_dir()` — resolve cache base dir from `AUTOINTENT_CACHE_DIR` or appdirs | +| `src/autointent/_wrappers/embedder/_sqlite_cache.py` (new) | `SQLiteEmbeddingCache`, `utterance_key`, `get_embedding_cache`, constants | +| `src/autointent/_wrappers/embedder/base.py` (mod) | template `embed` + `_embed_cached` + `_to_tensor` + abstract `_embed_uncached` + `config`/`supports_cache` | +| `…/sentence_transformers.py`, `openai.py`, `vllm.py`, `hashing_vectorizer.py` (mod) | each: `config` narrowing + `_embed_uncached` | +| `…/utils.py` (delete) | obsolete `get_embeddings_path` | +| `tests/_fixtures/fake_openai_embedding.py` (mod) | `config` narrowing + `_embed_uncached` | +| `tests/conftest.py` (mod) | global autouse `AUTOINTENT_CACHE_DIR` isolation fixture | +| `tests/test_cache_dir.py` (new) | `get_cache_dir()` unit tests | +| `tests/embedder/test_sqlite_cache.py` (new) | `SQLiteEmbeddingCache` / `utterance_key` unit tests | +| `tests/embedder/test_caching.py` (mod) | per-utterance reuse / dedup / order / empty-input tests | +| `CHANGELOG.md` (mod) | Unreleased entry | + +--- + +## Task 1: Cache-dir helper + global test isolation fixture + +**Files:** +- Create: `src/autointent/_cache_dir.py` +- Modify: `tests/conftest.py` (add autouse fixture) +- Test: `tests/test_cache_dir.py` + +**Interfaces:** +- Produces: `get_cache_dir() -> pathlib.Path` (honors `AUTOINTENT_CACHE_DIR`, else `appdirs.user_cache_dir("autointent")`). + +- [ ] **Step 1: Write the failing test** — `tests/test_cache_dir.py` + +```python +from __future__ import annotations + +from typing import TYPE_CHECKING + +from autointent._cache_dir import get_cache_dir + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + + +def test_get_cache_dir_honors_env_var(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "custom")) + assert get_cache_dir() == tmp_path / "custom" + + +def test_get_cache_dir_falls_back_to_appdirs(monkeypatch: pytest.MonkeyPatch) -> None: + # The global autouse isolation fixture sets the env var for every test, so unset it here. + monkeypatch.delenv("AUTOINTENT_CACHE_DIR", raising=False) + result = get_cache_dir() + assert result.name == "autointent" or "autointent" in str(result) +``` + +- [ ] **Step 2: (reference) test command for CI** + +Run on CI: `pytest tests/test_cache_dir.py -v` → expected FAIL initially (`No module named autointent._cache_dir`). + +- [ ] **Step 3: Implement `src/autointent/_cache_dir.py`** + +```python +"""Resolution of the base directory for autointent on-disk caches.""" + +from __future__ import annotations + +import os +from pathlib import Path + +from appdirs import user_cache_dir + + +def get_cache_dir() -> Path: + """Return the base directory for autointent on-disk caches. + + Honors the ``AUTOINTENT_CACHE_DIR`` environment variable; otherwise falls back to + ``appdirs.user_cache_dir("autointent")``. Resolved fresh on each call so tests and + parallel workers can redirect it via the env var. + + Note: + Currently consumed only by the embedding cache. The structured-output cache + still uses ``user_cache_dir("autointent")`` directly and is unaffected by this + variable. + + Returns: + The cache base directory as a ``Path``. + """ + override = os.environ.get("AUTOINTENT_CACHE_DIR") + return Path(override) if override else Path(user_cache_dir("autointent")) +``` + +- [ ] **Step 4: Add the global autouse isolation fixture** to `tests/conftest.py` + +Append at the end of `tests/conftest.py` (it already imports `pytest` at runtime and `Path` under `TYPE_CHECKING`; the annotation stays unquoted because `from __future__ import annotations` is at the top — a quoted `"Path"` would trip ruff `UP037`): + +```python +@pytest.fixture(autouse=True) +def _isolate_embedding_cache(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Redirect the embedding SQLite cache to a per-test directory. + + Because ``use_cache`` defaults to True, any test that builds a default-config + embedder could otherwise write the embedding DB to the real OS cache dir. A unique + per-test ``tmp_path`` also keeps the per-utterance reuse test in + tests/embedder/test_caching.py hermetic (its two embeds must share one DB file). + """ + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "ai_cache")) +``` + +- [ ] **Step 5: Local gate** + +Run: `ruff check src/autointent/_cache_dir.py tests/test_cache_dir.py tests/conftest.py` +Run: `mypy src/autointent tests` +Expected: both clean. + +- [ ] **Step 6: Commit** + +```bash +git add src/autointent/_cache_dir.py tests/test_cache_dir.py tests/conftest.py +git commit -m "feat(cache): add get_cache_dir() + global embedding-cache test isolation + +Co-Authored-By: Claude Opus 4.8 " +``` + +--- + +## Task 2: `SQLiteEmbeddingCache` store + unit tests + +**Files:** +- Create: `src/autointent/_wrappers/embedder/_sqlite_cache.py` +- Test: `tests/embedder/test_sqlite_cache.py` + +**Interfaces:** +- Consumes: `get_cache_dir()` (Task 1), `autointent._hash.Hasher`. +- Produces: + - `SCHEMA_VERSION: int = 1`, `BUSY_TIMEOUT_MS: int = 30000` + - `utterance_key(model_hash: int, utterance: str, prompt: str | None) -> str` + - `SQLiteEmbeddingCache(db_path: Path)` with + `get_many(model_hash: int, keys: list[str]) -> dict[str, npt.NDArray[np.float32]]` and + `set_many(model_hash: int, entries: dict[str, npt.NDArray[np.float32]]) -> None` + - `get_embedding_cache() -> SQLiteEmbeddingCache` (memoized by resolved db path) + +- [ ] **Step 1: Write the failing tests** — `tests/embedder/test_sqlite_cache.py` + +```python +from __future__ import annotations + +import sqlite3 +from typing import TYPE_CHECKING + +import numpy as np + +from autointent._wrappers.embedder._sqlite_cache import ( + SCHEMA_VERSION, + SQLiteEmbeddingCache, + get_embedding_cache, + utterance_key, +) + +if TYPE_CHECKING: + from pathlib import Path + + import numpy.typing as npt + import pytest + + +def _vec(values: list[float]) -> npt.NDArray[np.float32]: + return np.asarray(values, dtype=np.float32) + + +def test_set_get_roundtrip(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(123, {"k1": _vec([1.0, 2.0, 3.0])}) + got = cache.get_many(123, ["k1"]) + assert set(got) == {"k1"} + np.testing.assert_array_equal(got["k1"], _vec([1.0, 2.0, 3.0])) + assert got["k1"].shape == (3,) + + +def test_get_partial_hit(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {"a": _vec([1.0, 1.0])}) + got = cache.get_many(1, ["a", "b"]) + assert set(got) == {"a"} + + +def test_get_empty_keys_returns_empty(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + assert cache.get_many(1, []) == {} + + +def test_set_empty_entries_is_noop(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {}) # must not create/raise + assert cache.get_many(1, ["anything"]) == {} + + +def test_model_hash_filter(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(111, {"shared": _vec([1.0, 2.0])}) + # A different model must not read model 111's row even for the same key string. + assert cache.get_many(222, ["shared"]) == {} + assert set(cache.get_many(111, ["shared"])) == {"shared"} + + +def test_insert_or_ignore_does_not_overwrite(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {"k": _vec([1.0, 2.0])}) + cache.set_many(1, {"k": _vec([9.0, 9.0])}) # ignored + np.testing.assert_array_equal(cache.get_many(1, ["k"])["k"], _vec([1.0, 2.0])) + + +def test_chunking_over_variable_limit(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + entries = {f"k{i}": _vec([float(i)]) for i in range(2000)} + cache.set_many(1, entries) + got = cache.get_many(1, list(entries)) + assert len(got) == 2000 + np.testing.assert_array_equal(got["k1999"], _vec([1999.0])) + + +def test_schema_version_and_columns(tmp_path: Path) -> None: + db = tmp_path / "e.db" + cache = SQLiteEmbeddingCache(db) + cache.set_many(1, {"k": _vec([1.0])}) # triggers schema init + with sqlite3.connect(db) as conn: + assert conn.execute("PRAGMA user_version").fetchone()[0] == SCHEMA_VERSION + cols = {row[1] for row in conn.execute("PRAGMA table_info(embeddings)")} + indexes = {row[1] for row in conn.execute("PRAGMA index_list(embeddings)")} + assert {"key", "model_hash", "dim", "vector", "size_bytes", "created_at", "last_accessed"} <= cols + assert { + "idx_embeddings_last_accessed", + "idx_embeddings_created_at", + "idx_embeddings_model_hash", + } <= indexes + + +def test_version_mismatch_triggers_rebuild(tmp_path: Path) -> None: + db = tmp_path / "e.db" + SQLiteEmbeddingCache(db).set_many(1, {"old": _vec([1.0])}) + # Simulate an older/newer schema: bump user_version so the next instance rebuilds. + with sqlite3.connect(db) as conn: + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION + 1}") + fresh = SQLiteEmbeddingCache(db) + fresh.set_many(1, {"new": _vec([2.0])}) # forces _ensure_schema -> rebuild + assert fresh.get_many(1, ["old"]) == {} # old row dropped by rebuild + + +def test_corrupted_db_degrades_to_miss(tmp_path: Path) -> None: + db = tmp_path / "e.db" + db.write_bytes(b"this is not a sqlite database") + cache = SQLiteEmbeddingCache(db) + # Must not raise; reads miss and writes no-op. + assert cache.get_many(1, ["k"]) == {} + cache.set_many(1, {"k": _vec([1.0])}) + + +def test_dim_mismatch_row_skipped(tmp_path: Path) -> None: + db = tmp_path / "e.db" + cache = SQLiteEmbeddingCache(db) + cache.set_many(1, {"k": _vec([1.0, 2.0])}) + # Corrupt the stored dim so blob length disagrees. + with sqlite3.connect(db) as conn: + conn.execute("UPDATE embeddings SET dim = 99 WHERE key = 'k'") + conn.commit() + assert cache.get_many(1, ["k"]) == {} # skipped, not raised + + +def test_get_embedding_cache_memoized_by_path(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "c")) + first = get_embedding_cache() + second = get_embedding_cache() + assert first is second + + +def test_utterance_key_distinctness() -> None: + base = utterance_key(1, "hello", None) + assert base == utterance_key(1, "hello", None) + assert base != utterance_key(2, "hello", None) + assert base != utterance_key(1, "world", None) + assert base != utterance_key(1, "hello", "Query:") +``` + +- [ ] **Step 2: (reference) test command for CI** + +Run on CI: `pytest tests/embedder/test_sqlite_cache.py -v` → expected FAIL initially (module missing). + +- [ ] **Step 3: Implement `src/autointent/_wrappers/embedder/_sqlite_cache.py`** + +```python +"""SQLite-backed per-utterance embedding cache. + +Stores one float32 vector per ``(model, utterance, prompt)`` key in a single SQLite +database, replacing the previous one-``.npy``-file-per-call cache. See +``docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md``. +""" + +from __future__ import annotations + +import logging +import sqlite3 +import threading +import time +from typing import TYPE_CHECKING, cast + +import numpy as np + +from autointent._cache_dir import get_cache_dir +from autointent._hash import Hasher + +if TYPE_CHECKING: + from pathlib import Path + + import numpy.typing as npt + +logger = logging.getLogger(__name__) + +SCHEMA_VERSION = 1 +BUSY_TIMEOUT_MS = 30_000 +_DB_FILENAME = "embeddings.db" +_FLOAT32_NBYTES = 4 +# SQLite's default SQLITE_MAX_VARIABLE_NUMBER is 999 on older builds; stay well under it. +_KEY_CHUNK_SIZE = 900 + +_CREATE_TABLE = """ +CREATE TABLE IF NOT EXISTS embeddings ( + key TEXT PRIMARY KEY, + model_hash TEXT NOT NULL, + dim INTEGER NOT NULL, + vector BLOB NOT NULL, + size_bytes INTEGER NOT NULL, + created_at REAL NOT NULL, + last_accessed REAL NOT NULL +) +""" +_CREATE_INDEXES = ( + "CREATE INDEX IF NOT EXISTS idx_embeddings_last_accessed ON embeddings(last_accessed)", + "CREATE INDEX IF NOT EXISTS idx_embeddings_created_at ON embeddings(created_at)", + "CREATE INDEX IF NOT EXISTS idx_embeddings_model_hash ON embeddings(model_hash)", +) +_INSERT = ( + "INSERT OR IGNORE INTO embeddings " + "(key, model_hash, dim, vector, size_bytes, created_at, last_accessed) " + "VALUES (?, ?, ?, ?, ?, ?, ?)" +) + + +def utterance_key(model_hash: int, utterance: str, prompt: str | None) -> str: + """Compute the per-utterance cache key from model identity, utterance, and prompt. + + Args: + model_hash: The backend's model-identity hash (``get_hash()``). + utterance: The original (non-prompted) utterance text. + prompt: The resolved task prompt, or ``None``. + + Returns: + A hex digest uniquely identifying ``(model_hash, utterance, prompt)``. + """ + hasher = Hasher() + hasher.update(model_hash) + hasher.update(utterance) + if prompt: + hasher.update(prompt) + return hasher.hexdigest() + + +class SQLiteEmbeddingCache: + """Per-utterance embedding cache backed by a single SQLite database. + + Thread-safe (a fresh short-lived connection per call) and process-safe on a local + filesystem (WAL + ``busy_timeout``). Never raises into callers: any cache I/O failure + degrades to a miss / no-op and is logged. + """ + + def __init__(self, db_path: Path) -> None: + """Initialize the cache bound to ``db_path`` (schema is created lazily).""" + self._db_path = db_path + self._initialized = False + self._init_lock = threading.Lock() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self._db_path, timeout=BUSY_TIMEOUT_MS / 1000, isolation_level=None) + conn.execute(f"PRAGMA busy_timeout = {BUSY_TIMEOUT_MS}") + conn.execute("PRAGMA synchronous = NORMAL") + return conn + + def _ensure_schema(self) -> None: + """Create the table/indexes once per instance; rebuild on a schema-version change. + + The version check + (re)create runs inside ``BEGIN IMMEDIATE`` with a post-lock + re-read of ``user_version`` so two processes opening a stale DB cannot double-drop. + """ + if self._initialized: + return + with self._init_lock: + if not self._initialized: # another thread may have initialized while we waited + self._db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._connect() + try: + mode = conn.execute("PRAGMA journal_mode = WAL").fetchone() + if mode is not None and str(mode[0]).lower() != "wal": + logger.debug("SQLite embedding cache: WAL unavailable (journal_mode=%s)", mode[0]) + conn.execute("BEGIN IMMEDIATE") + version = conn.execute("PRAGMA user_version").fetchone()[0] + if version != SCHEMA_VERSION: + conn.execute("DROP TABLE IF EXISTS embeddings") + conn.execute(_CREATE_TABLE) + for index_sql in _CREATE_INDEXES: + conn.execute(index_sql) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.execute("COMMIT") + finally: + conn.close() + self._initialized = True + + def get_many(self, model_hash: int, keys: list[str]) -> dict[str, npt.NDArray[np.float32]]: + """Return cached vectors for ``keys`` under ``model_hash`` (missing keys omitted).""" + if not keys: + return {} + model_hash_str = str(model_hash) + result: dict[str, npt.NDArray[np.float32]] = {} + try: + self._ensure_schema() + conn = self._connect() + try: + for start in range(0, len(keys), _KEY_CHUNK_SIZE): + chunk = keys[start : start + _KEY_CHUNK_SIZE] + placeholders = ",".join("?" * len(chunk)) + query = ( + "SELECT key, vector, dim FROM embeddings " # noqa: S608 - only '?' is interpolated; values are bound + f"WHERE model_hash = ? AND key IN ({placeholders})" + ) + for row_key, blob, dim in conn.execute(query, (model_hash_str, *chunk)): + vector = self._deserialize(blob, dim) + if vector is not None: + result[row_key] = vector + finally: + conn.close() + except (sqlite3.Error, OSError) as exc: + logger.warning("SQLite embedding cache read failed (%s); recomputing.", exc) + return {} + return result + + def set_many(self, model_hash: int, entries: dict[str, npt.NDArray[np.float32]]) -> None: + """Insert vectors for new keys under ``model_hash`` (existing keys are untouched).""" + if not entries: + return + model_hash_str = str(model_hash) + now = time.time() + rows: list[tuple[str, str, int, bytes, int, float, float]] = [] + for key, vector in entries.items(): + blob = np.ascontiguousarray(vector, dtype=np.float32).tobytes() + rows.append((key, model_hash_str, int(vector.shape[-1]), blob, len(blob), now, now)) + try: + self._ensure_schema() + conn = self._connect() + try: + conn.execute("BEGIN IMMEDIATE") + conn.executemany(_INSERT, rows) + conn.execute("COMMIT") + finally: + conn.close() + except (sqlite3.Error, OSError) as exc: + logger.warning("SQLite embedding cache write failed (%s); continuing uncached.", exc) + + @staticmethod + def _deserialize(blob: bytes, dim: int) -> npt.NDArray[np.float32] | None: + try: + if len(blob) != dim * _FLOAT32_NBYTES: + logger.warning("SQLite embedding cache: blob length %d != dim %d; skipping.", len(blob), dim) + return None + return cast("npt.NDArray[np.float32]", np.frombuffer(blob, dtype=np.float32)) + except Exception as exc: # noqa: BLE001 - a bad row must never break embed() + logger.warning("SQLite embedding cache: failed to deserialize a row (%s); skipping.", exc) + return None + + +_INSTANCES: dict[str, SQLiteEmbeddingCache] = {} +_INSTANCES_LOCK = threading.Lock() + + +def get_embedding_cache() -> SQLiteEmbeddingCache: + """Return the process-wide cache for the current cache dir (memoized by db path).""" + db_path = get_cache_dir() / _DB_FILENAME + key = str(db_path) + with _INSTANCES_LOCK: + cache = _INSTANCES.get(key) + if cache is None: + cache = SQLiteEmbeddingCache(db_path) + _INSTANCES[key] = cache + return cache +``` + +- [ ] **Step 4: Local gate** + +Run: `ruff check src/autointent/_wrappers/embedder/_sqlite_cache.py tests/embedder/test_sqlite_cache.py` +Run: `mypy src/autointent tests` +Expected: clean. If ruff flags `C901`/`PLR0912` on `_ensure_schema` or `get_many`, extract a small helper (e.g. `_run_schema_init(conn)`); do not add blanket noqas. + +- [ ] **Step 5: Commit** + +```bash +git add src/autointent/_wrappers/embedder/_sqlite_cache.py tests/embedder/test_sqlite_cache.py +git commit -m "feat(cache): add SQLiteEmbeddingCache per-utterance store + +Co-Authored-By: Claude Opus 4.8 " +``` + +--- + +## Task 3: Lift caching into the backend base + migrate all backends + +This is one atomic refactor: making `_embed_uncached` abstract forces every subclass to implement it in the same commit. Touches `base.py`, four backends, the test fake, and deletes `utils.py`. + +**Files:** +- Modify: `src/autointent/_wrappers/embedder/base.py` +- Modify: `…/sentence_transformers.py`, `…/openai.py`, `…/vllm.py`, `…/hashing_vectorizer.py` +- Modify: `tests/_fixtures/fake_openai_embedding.py` +- Delete: `src/autointent/_wrappers/embedder/utils.py` + +**Interfaces:** +- Consumes: `get_embedding_cache`, `utterance_key` (Task 2). +- Produces (on `BaseEmbeddingBackend`): concrete `embed(...)`; `_embed_cached(utterances, prompt)`; + `_to_tensor(embeddings) -> torch.Tensor`; abstract `_embed_uncached(utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]`; class attrs `config: EmbedderConfig`, `supports_cache: bool = True`. + +- [ ] **Step 1: Rewrite `base.py`** to the following full content + +```python +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Literal, cast, overload + +import numpy as np + +from ._sqlite_cache import get_embedding_cache, utterance_key + +if TYPE_CHECKING: + from pathlib import Path + + import numpy.typing as npt + import torch + + from autointent.configs import EmbedderConfig, TaskTypeEnum + + +class BaseEmbeddingBackend(ABC): + """Abstract base class for embedding backends.""" + + config: EmbedderConfig + supports_training: bool = False + supports_cache: bool = True + + @abstractmethod + def __init__(self, config: EmbedderConfig) -> None: + """Initialize the embedding backend with configuration.""" + ... + + @abstractmethod + def clear_ram(self) -> None: + """Clear the backend from RAM.""" + ... + + @overload + def embed( + self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] + ) -> torch.Tensor: ... + + @overload + def embed( + self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False + ) -> npt.NDArray[np.float32]: ... + + def embed( + self, + utterances: list[str], + task_type: TaskTypeEnum | None = None, + return_tensors: bool = False, + ) -> npt.NDArray[np.float32] | torch.Tensor: + """Calculate embeddings for a list of utterances, using a per-utterance cache. + + Empty input, ``use_cache=False``, or a backend that opts out of caching + (``supports_cache=False``) bypasses the cache and calls ``_embed_uncached`` + directly, preserving each backend's existing empty-input behavior. + + Args: + utterances: List of input texts to calculate embeddings for. + task_type: Type of task for which embeddings are calculated. + return_tensors: If True, return a PyTorch tensor; otherwise, a numpy array. + + Returns: + A numpy array or PyTorch tensor of embeddings. + """ + prompt = self.config.get_prompt(task_type) + if not utterances or not self.config.use_cache or not self.supports_cache: + embeddings = self._embed_uncached(utterances, prompt) + else: + embeddings = self._embed_cached(utterances, prompt) + if return_tensors: + return self._to_tensor(embeddings) + return embeddings + + def _embed_cached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Embed via the SQLite per-utterance cache: reuse hits, compute only misses.""" + cache = get_embedding_cache() + model_hash = self.get_hash() + keys = [utterance_key(model_hash, utterance, prompt) for utterance in utterances] + unique_keys = list(dict.fromkeys(keys)) + cached = cache.get_many(model_hash, unique_keys) + missing = [key for key in unique_keys if key not in cached] + if missing: + key_to_utterance: dict[str, str] = {} + for utterance, key in zip(utterances, keys, strict=True): + if key in cached or key in key_to_utterance: + continue + key_to_utterance[key] = utterance + missing_utterances = [key_to_utterance[key] for key in missing] + computed = self._embed_uncached(missing_utterances, prompt) + new_entries = {key: computed[index] for index, key in enumerate(missing)} + cache.set_many(model_hash, new_entries) + cached.update(new_entries) + return cast("npt.NDArray[np.float32]", np.stack([cached[key] for key in keys])) + + @abstractmethod + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute embeddings WITHOUT caching, returning a ``(N, dim)`` float32 array. + + The backend applies ``prompt`` in its own way (ST passes it to ``encode``; + OpenAI/vLLM prepend it; HashingVectorizer ignores it). Each backend keeps its + current empty-input behavior here (ST/OpenAI/vLLM raise; HV returns ``(0, dim)``). + """ + ... + + def _to_tensor(self, embeddings: npt.NDArray[np.float32]) -> torch.Tensor: + """Convert a numpy embedding matrix to a torch tensor (CPU by default).""" + import torch + + return torch.from_numpy(embeddings) + + @abstractmethod + def similarity( + self, embeddings1: npt.NDArray[np.float32], embeddings2: npt.NDArray[np.float32] + ) -> npt.NDArray[np.float32]: + """Calculate similarity between two sets of embeddings. + + Args: + embeddings1: First set of embeddings (size n). + embeddings2: Second set of embeddings (size m). + + Returns: + A numpy array of similarities (size n x m). + """ + ... + + @abstractmethod + def get_hash(self) -> int: + """Compute a hash value for the backend configuration and model state. + + Returns: + The hash value of the backend. + """ + ... + + @abstractmethod + def dump(self, path: Path) -> None: + """Save the backend state to disk. + + Args: + path: Path to the directory where the backend will be saved. + """ + ... + + @classmethod + @abstractmethod + def load(cls, path: Path) -> BaseEmbeddingBackend: + """Load the backend state from disk. + + Args: + path: Path to the directory where the backend is stored. + + Returns: + Loaded backend instance. + """ + ... +``` + +- [ ] **Step 2: Migrate `sentence_transformers.py`** + + 1. Remove the import `from .utils import get_embeddings_path` (line ~22). + 2. Add a narrowing class annotation just below the class docstring, beside `_model`: + ```python + class SentenceTransformerEmbeddingBackend(BaseEmbeddingBackend): + """SentenceTransformer-based embedding backend implementation.""" + + supports_training: bool = True + config: SentenceTransformerEmbeddingConfig + _model: SentenceTransformer | None + ``` + 3. Delete the entire `embed` method **and its two `@overload` stubs** (lines ~165–254) and replace with `_embed_uncached` + a `_to_tensor` override: + ```python + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute SentenceTransformer embeddings without caching.""" + if len(utterances) == 0: + msg = "Empty input" + logger.error(msg) + raise ValueError(msg) + + model = self._load_model() + logger.debug( + "Calculating embeddings with model %s, batch_size=%d, max_seq_length=%s, embedder_device=%s, prompt=%s", + self.config.model_name, + self.config.batch_size, + str(self.config.tokenizer_config.max_length), + self.config.device, + prompt, + ) + if self.config.tokenizer_config.max_length is not None: + model.max_seq_length = self.config.tokenizer_config.max_length + + embeddings = cast( + "npt.NDArray[np.float32]", + model.encode( + utterances, + convert_to_numpy=True, + batch_size=self.config.batch_size, + normalize_embeddings=True, + prompt=prompt, + ), + ) + return embeddings.astype(np.float32, copy=False) + + def _to_tensor(self, embeddings: npt.NDArray[np.float32]) -> torch.Tensor: + """Convert to a tensor on the configured device (preserves prior cache-hit behavior).""" + device = self.config.device or "cpu" + return torch.from_numpy(embeddings).to(device) + ``` + **Imports to remove (ruff F401):** drop `Literal, overload` from the `typing` import (keep `TYPE_CHECKING, cast`); drop `TaskTypeEnum` from the `if TYPE_CHECKING:` block (the old `embed` signature was its only user). **Keep** `cast` and `torch` (used by `_embed_uncached`/`_to_tensor`/`clear_ram`/`_set_training_seed`) and `npt`. + +- [ ] **Step 3: Migrate `openai.py`** + + 1. Remove `from .utils import get_embeddings_path` (line ~20). + 2. Add narrowing annotation under the class docstring: + ```python + class OpenaiEmbeddingBackend(BaseEmbeddingBackend): + """OpenAI-based embedding backend implementation.""" + + config: OpenaiEmbeddingConfig + _client: openai.OpenAI | None = None + _async_client: openai.AsyncOpenAI | None = None + ``` + 3. Replace the `embed` method **and its two `@overload` stubs** (lines ~169–241) with: + ```python + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute OpenAI embeddings without caching.""" + if len(utterances) == 0: + msg = "Empty input" + logger.error(msg) + raise ValueError(msg) + + if prompt: + utterances = [f"{prompt} {utterance}" for utterance in utterances] + + logger.debug( + "Calculating embeddings with OpenAI model %s, batch_size=%d, max_tokens_in_batch=%s, " + "dimensions=%s, prompt=%s, max_concurrent=%s", + self.config.model_name, + self.config.batch_size, + str(self.config.max_tokens_in_batch), + str(self.config.dimensions), + prompt, + self.config.max_concurrent, + ) + + if self.config.max_concurrent is not None: + return self._process_embeddings_async(utterances) + return self._process_embeddings_sync(utterances) + ``` + **Imports to remove (ruff F401):** drop `Literal, overload` from the `typing` import (keep `cast`, used by `similarity`); remove `import torch` (line ~13 — only the old `embed` used it); drop `TaskTypeEnum` from the `if TYPE_CHECKING:` block. **Keep** `np`, `npt`, and `Hasher` (used by `get_hash`). + +- [ ] **Step 4: Migrate `vllm.py`** + + 1. Remove `from .utils import get_embeddings_path` (line ~17). + 2. Add narrowing annotation under the class docstring: + ```python + class VllmEmbeddingBackend(BaseEmbeddingBackend): + """vLLM-based embedding backend implementation.""" + + supports_training: bool = False + config: VllmEmbeddingConfig + ``` + 3. Replace the `embed` method (lines ~80–139, no overloads in this file) with: + ```python + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute vLLM embeddings without caching.""" + if len(utterances) == 0: + msg = "Empty input" + logger.error(msg) + raise ValueError(msg) + + if prompt: + utterances = [f"{prompt} {utterance}" for utterance in utterances] + + model = self._load_model() + logger.debug( + "Calculating embeddings with vLLM model %s, batch_size=%d", + self.config.model_name, + self.config.batch_size, + ) + outputs = model.encode(utterances, pooling_task="embed", **self.config.extra_encode_kwargs) + all_embeddings = [output.outputs.embedding for output in outputs] + return np.array(all_embeddings, dtype=np.float32) + ``` + **Imports to remove (ruff F401):** drop `TaskTypeEnum` from the `if TYPE_CHECKING:` block (the old `embed` signature was its only user). **Keep** `cast` (used by `similarity`), `torch` (used by `clear_ram`), `np`, `npt`, and `Hasher` (used by `get_hash`). (This file has no `embed` overloads to remove.) + +- [ ] **Step 5: Migrate `hashing_vectorizer.py`** + + 1. Add narrowing annotation + `supports_cache = False` under the class docstring: + ```python + class HashingVectorizerEmbeddingBackend(BaseEmbeddingBackend): + """HashingVectorizer-based embedding backend implementation. + + This backend uses sklearn's HashingVectorizer for fast, stateless text vectorization. + Ideal for testing as it requires no model downloads and is very fast. + """ + + supports_training: bool = False + supports_cache: bool = False + config: HashingVectorizerEmbeddingConfig + ``` + 2. Replace the `embed` method **and its two `@overload` stubs** (lines ~77–109) with: + ```python + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: # noqa: ARG002 + """Compute HashingVectorizer embeddings (prompt is ignored; never cached).""" + embeddings_sparse = self._vectorizer.transform(utterances) + embeddings: npt.NDArray[np.float32] = embeddings_sparse.toarray().astype(np.float32) + return embeddings + ``` + **Imports to remove (ruff F401):** drop `Literal, overload` from the `typing` import; remove `import torch` (only the old `embed` used it); remove `from autointent.configs import TaskTypeEnum` (a runtime import on line ~15, now unused). **Keep** `np`, `npt`, and `Hasher` (used by `get_hash`). `# noqa: ARG002` on `_embed_uncached` covers the unused `prompt` parameter. + +- [ ] **Step 6: Migrate `tests/_fixtures/fake_openai_embedding.py`** + + 1. Add narrowing annotation under the class docstring: + ```python + class FakeOpenaiEmbeddingBackend(BaseEmbeddingBackend): + """In-process stand-in for OpenaiEmbeddingBackend. ... (keep existing docstring)""" + + supports_training = False + config: OpenaiEmbeddingConfig + ``` + (`OpenaiEmbeddingConfig` is already imported under `TYPE_CHECKING` in this file.) + 2. Replace the `embed` method **and its two `@overload` stubs** with: + ```python + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + # Touch the lazy attribute so test_client_lazy_loading observes the transition. + self._client = self._client or object() + dim = self.config.dimensions or 1536 + # Prompt is already resolved by the base; mirror BaseEmbedderConfig.get_prompt seeding. + seed_extra = f"{self.config.model_name}|{prompt or ''}" + vectors: npt.NDArray[np.float32] = np.stack( + [_seeded_vector(text, dim, seed_extra=seed_extra) for text in utterances] + ) + return vectors + ``` + **Imports to remove (ruff F401):** drop `Literal, overload` from the `typing` import; remove `import torch` (only the old `embed` used it); drop `TaskTypeEnum` from the `if TYPE_CHECKING:` block. **Keep** `np`, `npt`, `pytest` (used by the `patch_openai_embedding_backend` fixture), `hashlib`, `json`, and `OpenaiEmbeddingConfig`. The fake now inherits `embed`/`_to_tensor` from the base. + +- [ ] **Step 7: Delete the obsolete util** + +```bash +git rm src/autointent/_wrappers/embedder/utils.py +``` + +(Confirm no remaining importer: `grep -rn get_embeddings_path src tests` returns nothing.) + +- [ ] **Step 8: Local gate** + +Run: `grep -rn "get_embeddings_path" src tests` → expect no output. +Run: `ruff check src/autointent/_wrappers/embedder tests/_fixtures/fake_openai_embedding.py` +Run: `mypy src/autointent tests` +Expected: all clean. (mypy must show no `attr-defined` on `self.config.*`; this validates the narrowing.) + +- [ ] **Step 9: (reference) CI test commands** + +On CI: `pytest tests/embedder/test_caching.py tests/embedder/test_hash.py tests/embedder/test_memory.py tests/embedder/test_dump_load.py tests/embedder/test_openai_backend.py tests/embedder/test_prompts.py -v` → expect PASS (consistency preserved). + +- [ ] **Step 10: Commit** + +```bash +git add -A src/autointent/_wrappers/embedder tests/_fixtures/fake_openai_embedding.py +git commit -m "refactor(embedder): lift embedding cache into a per-utterance template method + +Move the triplicated .npy cache block out of the ST/OpenAI/vLLM backends into a +single BaseEmbeddingBackend.embed template backed by SQLiteEmbeddingCache. Backends +now implement _embed_uncached; HashingVectorizer opts out via supports_cache=False. + +Co-Authored-By: Claude Opus 4.8 " +``` + +--- + +## Task 4: Per-utterance behavior tests (reuse / dedup / order / empty) + +**Files:** +- Modify: `tests/embedder/test_caching.py` (append new tests) + +**Interfaces:** +- Consumes: `Embedder`, `create_sentence_transformer_config`, the global isolation fixture (Task 1), the SQLite store (Task 2), the refactored backends (Task 3). + +- [ ] **Step 1: Append the new tests** to `tests/embedder/test_caching.py` + +Add these runtime imports at the top (next to the existing ones — `os`, `sqlite3`, and `Path` are used at runtime here via `Path(os.environ[...])`): + +```python +import os +import sqlite3 +from pathlib import Path + +from autointent.configs import HashingVectorizerEmbeddingConfig +``` + +And add `import numpy.typing as npt` to the file's existing `if TYPE_CHECKING:` block (used only in the `spy` annotation below). + +Append: + +```python +def _embedding_row_count() -> int: + db_path = Path(os.environ["AUTOINTENT_CACHE_DIR"]) / "embeddings.db" + if not db_path.exists(): + return 0 + with sqlite3.connect(db_path) as conn: + return int(conn.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0]) + + +class TestPerUtteranceCaching: + """Per-utterance keying: shared utterances are stored once and reused across calls.""" + + def test_overlapping_calls_store_each_utterance_once(self) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + + embedder.embed(["alpha", "beta"]) + embedder.embed(["beta", "gamma"]) # 'beta' overlaps + + # Whole-list keying would store 2 list blobs; per-utterance stores 3 rows. + assert _embedding_row_count() == 3 + + def test_duplicate_in_list_computed_once(self, monkeypatch: pytest.MonkeyPatch) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + backend = embedder._backend + + computed: list[list[str]] = [] + original = backend._embed_uncached + + def spy(utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + computed.append(list(utterances)) + return original(utterances, prompt) + + monkeypatch.setattr(backend, "_embed_uncached", spy) + + result = embedder.embed(["dup", "dup"]) + + assert result.shape[0] == 2 + np.testing.assert_array_equal(result[0], result[1]) + assert computed == [["dup"]] # computed only once + + def test_order_preserved_after_partial_hit(self) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + + first = embedder.embed(["one", "two", "three"]) + second = embedder.embed(["three", "one", "two"]) # reordered, fully cached + + np.testing.assert_allclose(second[0], first[2], rtol=1e-5) + np.testing.assert_allclose(second[1], first[0], rtol=1e-5) + np.testing.assert_allclose(second[2], first[1], rtol=1e-5) + + def test_empty_input_hashing_vectorizer_returns_empty(self) -> None: + embedder = Embedder(HashingVectorizerEmbeddingConfig(n_features=512, use_cache=True)) + result = embedder.embed([]) + assert result.shape == (0, 512) + + def test_empty_input_sentence_transformer_raises(self) -> None: + embedder = Embedder(create_sentence_transformer_config(use_cache=True)) + with pytest.raises(ValueError, match="Empty input"): + embedder.embed([]) +``` + +- [ ] **Step 2: Local gate** + +Run: `ruff check tests/embedder/test_caching.py` +Run: `mypy src/autointent tests` +Expected: clean. (Accessing `embedder._backend` / `backend._embed_uncached` is fine in tests; if ruff flags `SLF001` here, add `# noqa: SLF001` on those lines — the tests/ ruff profile typically already relaxes it.) + +- [ ] **Step 3: (reference) CI test command** + +On CI: `pytest tests/embedder/test_caching.py -v` → expect PASS. + +- [ ] **Step 4: Commit** + +```bash +git add tests/embedder/test_caching.py +git commit -m "test(cache): cover per-utterance reuse, dedup, order, and empty input + +Co-Authored-By: Claude Opus 4.8 " +``` + +--- + +## Task 5: CHANGELOG entry + +**Files:** +- Modify: `CHANGELOG.md` (repo root) + +- [ ] **Step 1: Insert an Unreleased section** at the top of `CHANGELOG.md`, immediately after the intro paragraph and before `## [0.3.2] — 2026-06-22`: + +```markdown +## [Unreleased] + +### Features + +- **Embedding cache rewritten on SQLite with per-utterance keys.** Embeddings are now cached one row per `(model, utterance, prompt)` in a single SQLite database (`/embeddings.db`) instead of one `.npy` file per call. Utterances shared across calls are embedded and stored once, so overlapping calls reuse the overlap — removing the old whole-list-or-nothing cache misses and the unbounded `.npy` inode growth. Writes are atomic and safe for concurrent processes/threads on one host (WAL). +- **`AUTOINTENT_CACHE_DIR`** environment variable to relocate the on-disk cache (defaults to the OS cache dir). It currently governs the embedding cache only; the structured-output cache is unchanged. + +### Notes + +- The new cache uses a different key scheme, so existing `.npy` embedding caches are not reused (a one-time recompute on first run). The old `embeddings/` directory is left untouched and may be deleted manually. + +--- +``` + +- [ ] **Step 2: Local gate** + +Run: `git diff --stat CHANGELOG.md` → expect only additions. (No ruff/mypy on Markdown.) + +- [ ] **Step 3: Commit** + +```bash +git add CHANGELOG.md +git commit -m "docs(changelog): note SQLite embedding cache and AUTOINTENT_CACHE_DIR + +Co-Authored-By: Claude Opus 4.8 " +``` + +--- + +## Final verification (before opening the draft PR) + +- [ ] **Whole-tree static gate:** `ruff check .` and `mypy src/autointent tests` → both clean. +- [ ] **Grep guards:** `grep -rn "get_embeddings_path" src tests` (empty); `grep -rn "from .utils import" src/autointent/_wrappers/embedder` (empty). +- [ ] **Push branch + open draft PR**, then inspect CI (the only place pytest runs). Iterate on CI failures by pushing fixes. Key CI signals to watch: the `tests/embedder/*` suite (consistency + new behavior), the 85% **combined** coverage floor (Task 2 tests cover the main `_sqlite_cache.py` branches; a few defensive branches — the WAL-unavailable debug log, the double-checked-lock re-entry, the `_deserialize` `except` — are hard to hit single-threaded and may stay uncovered, which is fine against the combined total per the spec), and mypy on Python 3.10. + +--- + +## Self-Review (completed by plan author) + +**Spec coverage:** §4.1 cache-dir → Task 1. §4.2 utterance_key → Task 2. §4.3 SQLite store (schema, pragmas, versioning, degradation, model_hash filter, chunking, memoized accessor) → Task 2. §4.4 template method + `supports_cache` + per-subclass `config` narrowing + `_embed_uncached` per backend + fake → Task 3. §6.1 unit tests → Task 2. §6.2 global isolation fixture → Task 1. §6.3 reuse/dedup/order/empty tests → Task 4. §7 file list (incl. `utils.py` removal) → Tasks 1–4. CHANGELOG → Task 5. All covered. + +**Placeholder scan:** No TBD/TODO; all steps carry complete code or exact commands. + +**Type consistency:** `get_many(model_hash, keys)` / `set_many(model_hash, entries)` / `utterance_key(model_hash, utterance, prompt)` / `_embed_uncached(utterances, prompt)` / `_to_tensor(embeddings)` / `get_embedding_cache()` are used identically across Tasks 2, 3, and 4. `supports_cache` set on base (True) and HV (False) consistently. diff --git a/docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md b/docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md new file mode 100644 index 000000000..f8de633e4 --- /dev/null +++ b/docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md @@ -0,0 +1,569 @@ +# Design: SQLite per-utterance embedding cache + +**Date:** 2026-06-25 +**Status:** Approved (scope decisions confirmed by maintainer); revised after adversarial review round 1. +**Scope owner:** voorhs + +## 1. Motivation + +AutoIntent caches embeddings as one NumPy `.npy` file per `embed()` call, named by +`hash(model_identity + entire_utterance_list + prompt)`, under +`appdirs.user_cache_dir("autointent")/embeddings/`. This has three structural problems: + +1. **Whole-list keying = zero reuse.** Two calls whose utterance lists differ by even one + element (reorder, add, drop) produce completely different files and full cache misses. + A shared utterance embedded in 50 different lists is recomputed and re-stored 50 times. +2. **Inode explosion.** Every distinct list is its own file. Long-running optimization with many + search-space points and folds produces thousands of `.npy` files with no index, no bound, + no eviction. +3. **Triplicated, non-atomic cache code.** The identical read/key/write block is copy-pasted + across three backends (`sentence_transformers.py`, `openai.py`, `vllm.py`). Writes are a bare + `np.save` with no atomicity and no concurrency story for parallel Optuna workers. + +The fix is two coordinated changes: + +- **Per-utterance keying:** key each utterance by `hash(model_identity + utterance + prompt)`, + one row per utterance. Shared utterances are stored once; a call that overlaps a previous call + reuses the overlap (partial hits). +- **SQLite store:** move the embedding cache behind a single SQLite database. One file instead of + K inodes, atomic transactions, indexed point lookups, safe single-host concurrent access (WAL), + and the schema groundwork for future eviction/TTL. + +**Honest scoping (per maintainer):** the warm read path is already sub-millisecond; SQLite will +**not** make cache *hits* faster. Its value is **correctness** (atomic writes), **operability** +(one file, future eviction, concurrency), and **enabling per-utterance keys without an inode +explosion**. We justify it by operational pain, not hit latency. + +## 2. Goals and non-goals + +### Goals +- Replace the `.npy`-per-list embedding cache with a single SQLite database, keyed **per utterance**. +- Deduplicate within and across calls: a given `(model, utterance, prompt)` is computed and stored once. +- Eliminate the triplicated cache code by lifting caching into `BaseEmbeddingBackend` as a template method. +- Add a configurable cache location via the `AUTOINTENT_CACHE_DIR` environment variable + helper, + defaulting to today's `appdirs.user_cache_dir("autointent")`. +- Lay **schema groundwork** for eviction (`created_at`, `last_accessed`, `size_bytes`, `model_hash` + columns + indexes) without changing today's unbounded behavior. +- Safe concurrent access from multiple processes (parallel Optuna trials) and threads **on one host**. +- Graceful degradation: a cache I/O failure logs and falls back to recompute; it never breaks `embed()`. + +### Non-goals (explicitly out of scope) +- **The structured-output / LLM cache** (`generation/_cache.py`) is **not touched.** It keeps its + current directory-per-entry format and its direct `user_cache_dir("autointent")` calls. It does + **not** honor `AUTOINTENT_CACHE_DIR` in this PR (documented limitation; a later PR may adopt the helper). +- **No active eviction policy.** No size cap, no TTL enforcement, no LRU sweeping. Columns + indexes + only. The cache stays unbounded by default, matching today. +- **No migration of existing `.npy` caches.** Fresh start. The old whole-list hashes cannot be + decomposed into per-utterance rows (the original utterances were never stored), so migration is + infeasible. Old files are left as orphans (the user may delete them). +- **No change to the public `Embedder.embed` / backend `embed` signatures or return types**, and no + change to any backend's per-utterance vector values. +- **No new third-party dependency.** Uses the Python stdlib `sqlite3`. +- **No LMDB / Parquet / memmap sidecar.** Per-utterance vectors from the cached backends (ST, OpenAI, + vLLM) are small (≈1.5–4 KB); a BLOB column is the right fit. The one high-dimensional backend, + HashingVectorizer (default `n_features = 2**18` ≈ 1 MB/vector), is excluded from caching entirely via + `supports_cache = False` (§4.4), so it never reaches the BLOB store. A sidecar is future work only. +- **No widening of the hash to 128-bit.** The existing 64-bit `Hasher` (xxh64) keying strength is + retained (see §4.2 collision discussion); cross-model collisions are additionally defended. + +## 3. Current state (reference) + +- `BaseEmbeddingBackend` (`_wrappers/embedder/base.py`): abstract `embed`, `get_hash`, `similarity`, + `clear_ram`, `dump`, `load`. `__init__` is abstract with an empty body; **the ABC does not declare + a `config` attribute** (each concrete backend assigns `self.config`). +- Four backends implement `embed` independently: `SentenceTransformerEmbeddingBackend`, + `OpenaiEmbeddingBackend`, `VllmEmbeddingBackend`, `HashingVectorizerEmbeddingBackend`. + The first three contain the duplicated cache block; HashingVectorizer has no cache block. + A fifth subclass, `FakeOpenaiEmbeddingBackend` (`tests/_fixtures/fake_openai_embedding.py`), is the + test stand-in swapped in for the real OpenAI backend across `tests/embedder/` via an autouse fixture. + **These five are the complete set of `BaseEmbeddingBackend` subclasses** (verified by grep; no + cross-encoder/ranker/server subclass exists). +- Cache key: `Hasher()` (xxhash-64, pickle-based) over `get_hash()` (model identity) + the whole + `utterances` list + `prompt` (if non-empty). +- `get_hash()` differs per backend (model name + HF commit SHA + max_length for ST; model name + + dimensions + max_tokens for OpenAI; model name + max_model_len for vLLM; config params for HV). +- Prompt handling differs: ST passes `prompt=` to `model.encode`; OpenAI and vLLM **prepend** + `f"{prompt} {utterance}"` before encoding. +- Empty-input behavior differs: ST/OpenAI/vLLM raise `ValueError("Empty input")`; HashingVectorizer + returns a `(0, n_features)` array. +- Cache path: `get_embeddings_path(hexdigest)` → `user_cache_dir("autointent")/embeddings/.npy`. + Only the three backends import it; `utils.py` contains nothing else. +- **`use_cache` defaults to `True`** (`configs/_embedder.py:33`). It is **not** generally off: + `tests/callback/test_callback.py` and `tests/assets/configs/full_training.yaml` use HV/embedders + with caching on, and `tests/embedder/test_caching.py` flips it on. +- **Test gap:** several suites run with `use_cache=True` but **no fixture redirects the cache + directory**, so they write to the **real OS cache dir**. The new config seam + a global isolation + fixture will fix this for the whole test tree (§6.2). + +## 4. Design + +### 4.1 Cache directory resolution — `autointent/_cache_dir.py` + +```python +def get_cache_dir() -> Path: + """Base directory for autointent on-disk caches. + + Honors the AUTOINTENT_CACHE_DIR environment variable; otherwise falls back to + appdirs.user_cache_dir("autointent"). Resolved fresh on each call so tests and + parallel workers can point it at an isolated directory via the env var. + + NOTE: currently consumed only by the embedding cache. The structured-output + cache still uses user_cache_dir("autointent") directly and is unaffected by + this variable (documented limitation; see CHANGELOG). + """ + override = os.environ.get("AUTOINTENT_CACHE_DIR") + return Path(override) if override else Path(user_cache_dir("autointent")) +``` + +- `AUTOINTENT_CACHE_DIR` matches the existing `AUTOINTENT_`-prefixed env convention + (`AUTOINTENT_PATH`, `AUTOINTENT_EXTRA_VALIDATION`, server `env_prefix="AUTOINTENT_"`). +- The embedding DB lives at `get_cache_dir() / "embeddings.db"` (replacing the `embeddings/` dir of + `.npy` files). WAL adds `embeddings.db-wal` and `embeddings.db-shm` sidecars — still ~3 files vs. + K inodes. + +### 4.2 Per-utterance key — in `autointent/_wrappers/embedder/_sqlite_cache.py` + +```python +def utterance_key(model_hash: int, utterance: str, prompt: str | None) -> str: + hasher = Hasher() + hasher.update(model_hash) + hasher.update(utterance) + if prompt: + hasher.update(prompt) + return hasher.hexdigest() +``` + +Mirrors the existing scheme but on a single string instead of the whole list. `model_hash` is the +backend's existing `get_hash()` (so all model-identity stability work from #321/#334 is reused +unchanged). `prompt` is the resolved task prompt, included only when non-empty (matches current +behavior). The key is the **original** utterance text — not the prompt-prepended form — so a backend's +internal prompt application stays an implementation detail of `_embed_uncached`. + +**Collision discussion.** The key is a 64-bit xxhash hexdigest, the same strength as today's +whole-list key, so this is not a regression in hash kind. Per-utterance keying produces more distinct +keys than per-list keying, so the absolute collision probability rises, but remains negligible +(~5e-10 at 1e5 utterances). Two cases: +- **Cross-model collision** (two different `model_hash` values producing the same key string): defended + by storing `model_hash` and filtering reads with `AND model_hash = ?` (§4.3). A cross-model collision + becomes a cache **miss**, never a wrong vector. Because the primary key is `key` alone and writes use + `INSERT OR IGNORE`, the second model can never store its colliding key (the first model's row wins), so + for that one key the second model takes a **permanent** miss + recompute. This is documented and + accepted (probability ~5e-10); a composite `(key, model_hash)` PK is a possible future refinement. +- **Same-model collision** (same model+prompt, different utterance, same 64-bit digest): would return a + wrong vector, exactly as today's scheme could; accepted as astronomically rare. Not mitigated further + in this PR (widening to 128-bit is a non-goal). + +**Backward compatibility:** this is a brand-new keying scheme and a brand-new store. All existing +`.npy` caches are invalid and ignored (the approved fresh start). First run after upgrade recomputes; +subsequent runs hit the new cache. + +### 4.3 SQLite store — `SQLiteEmbeddingCache` + +**Schema (version 1):** + +```sql +CREATE TABLE IF NOT EXISTS embeddings ( + key TEXT PRIMARY KEY, -- utterance_key() hexdigest + model_hash TEXT NOT NULL, -- str(get_hash()); cross-model filter + per-model purge + dim INTEGER NOT NULL, -- vector length + vector BLOB NOT NULL, -- float32 bytes, C-contiguous, length dim + size_bytes INTEGER NOT NULL, -- len(vector blob); eviction groundwork + created_at REAL NOT NULL, -- time.time() at insert + last_accessed REAL NOT NULL -- = created_at at insert (see note) +); +CREATE INDEX IF NOT EXISTS idx_embeddings_last_accessed ON embeddings(last_accessed); +CREATE INDEX IF NOT EXISTS idx_embeddings_created_at ON embeddings(created_at); +CREATE INDEX IF NOT EXISTS idx_embeddings_model_hash ON embeddings(model_hash); +``` + +- `model_hash` is stored as **TEXT** because `Hasher.intdigest()` is an unsigned 64-bit value that can + exceed SQLite's signed-64-bit `INTEGER` range. The public methods take `model_hash: int` but **bind + `str(model_hash)` on every path** (the INSERT values *and* the `WHERE model_hash = ?` filter); binding a + raw int > 2**63-1 would raise `OverflowError`, so the `str()` is mandatory, not cosmetic. +- **HARD INVARIANT — vector serialization.** Always store + `np.ascontiguousarray(vec, dtype=np.float32).tobytes()`. The `dtype=np.float32` coercion is + load-bearing: if a float64 vector were stored, the blob would be `8*dim` bytes and every read would + fail the `len(blob)//4 == dim` check forever (permanent miss + repeated wasted writes). Reconstruct + with `cast("npt.NDArray[np.float32]", np.frombuffer(blob, dtype=np.float32))` and validate length + against `dim`. `np.frombuffer` returns a **read-only** array; this is safe only because + `_embed_cached` always `np.stack`s (copies) before `_to_tensor` — do **not** add a single-utterance + fast path that hands a frombuffer view to torch. + +**Connection & pragmas.** `_connect()` opens a connection with `isolation_level=None` (autocommit; we +issue explicit `BEGIN IMMEDIATE` / `COMMIT` for the write paths) and applies, **per connection in +autocommit mode** (never inside an open transaction): +- `PRAGMA busy_timeout=30000` (30 s) — writers wait instead of raising "database is locked". Generous to + absorb many parallel trials flushing a fold's rows at once. Module constant `BUSY_TIMEOUT_MS` + (could become configurable later; not in this PR). +- `PRAGMA synchronous=NORMAL` — safe with WAL, faster than FULL; on power loss you may lose the last + transaction but the DB does not corrupt — acceptable for a cache. + +`PRAGMA journal_mode=WAL` is set **once at schema init**, in autocommit before the `BEGIN IMMEDIATE` +(setting WAL inside a transaction fails). It persists in the DB file, so later connections inherit it. +If the underlying filesystem does not support WAL the PRAGMA returns the actual mode without raising; we +log a debug line and the cache still works, only with weaker concurrency. + +**Single-host assumption.** WAL's shared-memory index (`-shm`) means multi-process safety holds **only +for processes on the same host.** Pointing `AUTOINTENT_CACHE_DIR` at a network filesystem (NFS/SMB) +shared across nodes is unsupported and may corrupt. This is documented (helper docstring + CHANGELOG); +no NFS detection/fallback is implemented (out of scope). + +**Schema init + versioning (cross-process safe).** `PRAGMA user_version` holds `SCHEMA_VERSION` (1). +Schema-ensure runs **once per cache instance** (guarded by an instance flag + lock); after the WAL +pragma (autocommit), the version-check-and-create steps run inside a single `BEGIN IMMEDIATE` write +transaction to be atomic against other processes: +1. ensure WAL (autocommit, see above); +2. open `BEGIN IMMEDIATE` (acquire the write lock); +3. re-read `user_version` **after** acquiring the lock; +4. if `user_version == SCHEMA_VERSION`, do nothing (another process already initialized at this version); + otherwise `DROP TABLE IF EXISTS embeddings`, `CREATE TABLE` + indexes, and `PRAGMA user_version = + SCHEMA_VERSION`. (A fresh DB starts at `user_version == 0`, so it takes this branch and is created; + there is no separate "table absent" case to special-case.) +5. `COMMIT`. +Re-reading under the write lock closes the two-process race where both see a stale version and double-drop +(the second would otherwise destroy the first's fresh rows). A version bump = automatic, safe fresh start. +A rolling upgrade where two processes run different `SCHEMA_VERSION` values causes repeated rebuilds / +cache misses (never corruption); acceptable and noted. + +**Connection model.** Every public method opens a **short-lived connection** via `_connect()` and closes +it on exit. No connection is shared across threads, so the cache is inherently thread-safe (the stdlib +`sqlite3` `check_same_thread` guard is never tripped); WAL handles inter-process safety. `embed()` is +coarse-grained (one `get_many` + one `set_many` per call), so per-call connection overhead is negligible +next to model inference. + +**Instance lifecycle.** A module-level `get_embedding_cache() -> SQLiteEmbeddingCache` resolves the DB +path from `get_cache_dir()` and returns an instance **memoized by resolved path** (module dict + lock). +Schema init runs once per path per process. Tests that set `AUTOINTENT_CACHE_DIR` to a fresh `tmp_path` +naturally get a distinct, isolated instance — no global reset needed. + +**Public API:** + +```python +class SQLiteEmbeddingCache: + def __init__(self, db_path: Path) -> None: ... + # stores path; ensures parent dir + schema lazily on first connect (idempotent, locked) + + def get_many(self, model_hash: int, keys: list[str]) -> dict[str, npt.NDArray[np.float32]]: + # SELECT key, vector, dim WHERE model_hash = ? AND key IN (...), chunked to stay under + # SQLITE_MAX_VARIABLE_NUMBER (chunk size 900; the placeholder string is built with `?` + # only, annotated `# noqa: S608`). Returns only found+valid keys, each reconstructed to a + # (dim,) float32 array. A row whose blob length disagrees with `dim` is skipped (logged), + # treated as a miss. Read-only: does NOT update last_accessed (avoids read amplification; + # see note). On sqlite3.Error / unreadable DB: log warning, return {} (recompute). + + def set_many(self, model_hash: int, entries: dict[str, npt.NDArray[np.float32]]) -> None: + # INSERT OR IGNORE within a single transaction (executemany). + # OR IGNORE => two workers computing the same key never conflict, and an existing entry is + # never overwritten (entries are deterministic). created_at = last_accessed = time.time(); + # size_bytes = len(blob). On sqlite3.Error: log warning, return (uncached, never raises). +``` + +**Graceful degradation (control flow).** The try/except in `get_many`/`set_many` wraps +**connection-open + parent-dir creation + lazy schema-ensure + statement execution end-to-end**, not just +the SQL, so a corrupt header / permission error / "path is a directory" degrades to no-op rather than +raising into `embed()`. Caught exceptions at this outer level: **`(sqlite3.Error, OSError)`** — +`sqlite3.Error` for locking/corruption, and `OSError` (incl. `PermissionError`, `NotADirectoryError`) for +the `mkdir`/file-open path. Per-row blob reconstruction is additionally guarded with +`except Exception: # noqa: BLE001` (a malformed blob / `dim` mismatch raises `ValueError`, not +`sqlite3.Error`), skipping just that row. With `str(model_hash)` binding (above), no `OverflowError` path +exists. `embed()` never observes a cache failure as anything but a miss. + +**`last_accessed` note:** populated at insert but **not** updated on read. Updating it per read would +turn every cache hit into a write, defeating the WAL concurrency benefit. The column exists so a future +eviction PR can choose its own access-tracking policy; for now it equals `created_at`. Deliberate +groundwork, documented as such. + +### 4.4 Backend refactor — template method in `BaseEmbeddingBackend` + +Lift the whole cache+dedup+reassemble flow into the base class once; backends implement only the pure +model call. + +**ABC change (mypy-blocking, must do BOTH halves):** + +1. Declare `config` on the ABC so the base's concrete methods can type-check `self.config.use_cache` / + `self.config.get_prompt(...)` (the only fields the base touches, both on `BaseEmbedderConfig`): + + ```python + class BaseEmbeddingBackend(ABC): + config: EmbedderConfig # union; narrowed in each subclass (see #2) + supports_training: bool = False + supports_cache: bool = True # HV overrides to False (see below) + ``` + +2. **Re-declare `config` with the specific type in EVERY concrete subclass.** A base annotation of the + union type *overrides* mypy's previously-narrow per-`__init__` inference, which would break + `self.config.tokenizer_config`/`device` (ST), `model_name`/`dimensions` (OpenAI), `max_model_len` + (vLLM), `n_features`/`ngram_range`/… (HV), and `model_name` (fake) — empirically reproduced under the + repo's mypy config. Each subclass body must add a covariant narrowing re-declaration: + + ```python + class SentenceTransformerEmbeddingBackend(BaseEmbeddingBackend): + config: SentenceTransformerEmbeddingConfig # narrows the base union + ``` + + …and likewise `OpenaiEmbeddingConfig`, `VllmEmbeddingConfig`, `HashingVectorizerEmbeddingConfig`, and + (in the fake) `OpenaiEmbeddingConfig`. mypy permits a subclass to narrow an attribute to a subtype, so + this restores today's narrow access while satisfying the base's `self.config` reference. **All five + files must do this** (it is in the §7 list). + +**`supports_cache` flag.** HashingVectorizer's default `n_features = 2**18` makes each per-utterance +vector ≈ 1 MB as a float32 BLOB — far outside the "small vector" premise that justifies BLOB storage, and +HV is a fast stateless backend where recompute is cheap and caching provides ~no value. HV therefore sets +`supports_cache = False`, so the template routes it straight to `_embed_uncached` regardless of +`use_cache`. **This exactly preserves today's behavior** (HV has no cache block today and is never +cached, even when `use_cache=True` as in `tests/callback` / `full_training.yaml`). Real embedding models +(ST ≈ 0.3–1 k dims, OpenAI/fake ≈ 1.5 k dims) keep `supports_cache = True`. + +**Template `embed` (concrete; overloads preserved, `@abstractmethod` removed):** + +```python +def embed(self, utterances, task_type=None, return_tensors=False): + prompt = self.config.get_prompt(task_type) + # Empty input, cache disabled, or a backend that opts out of caching (HV) bypasses the cache and + # goes straight to the backend, preserving each backend's existing empty-input behavior + # (ST/OpenAI/vLLM raise; HV returns a (0, dim) array). np.stack is therefore only ever called on a + # non-empty key list. + if not utterances or not self.config.use_cache or not self.supports_cache: + arr = self._embed_uncached(utterances, prompt) + else: + arr = self._embed_cached(utterances, prompt) + return self._to_tensor(arr) if return_tensors else arr + +def _embed_cached(self, utterances, prompt) -> npt.NDArray[np.float32]: + cache = get_embedding_cache() + model_hash = self.get_hash() + keys = [utterance_key(model_hash, u, prompt) for u in utterances] + unique_keys = list(dict.fromkeys(keys)) # de-dup, preserve order + cached = cache.get_many(model_hash, unique_keys) + missing = [k for k in unique_keys if k not in cached] + if missing: + key_to_utt: dict[str, str] = {} + for u, k in zip(utterances, keys): + if k in cached or k in key_to_utt: + continue + key_to_utt[k] = u + missing_utts = [key_to_utt[k] for k in missing] + computed = self._embed_uncached(missing_utts, prompt) # (M, dim) float32 + new_entries = {k: computed[i] for i, k in enumerate(missing)} + cache.set_many(model_hash, new_entries) + cached.update(new_entries) # update regardless of write success + return np.stack([cached[k] for k in keys]) # (N, dim), original order + +@abstractmethod +def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute embeddings WITHOUT caching. Returns a (N, dim) float32 array, except for empty + input where each backend keeps its current behavior. The backend applies `prompt` in its own + way (ST: pass to encode; OpenAI/vLLM: prepend; HV: ignore).""" + +def _to_tensor(self, arr: npt.NDArray[np.float32]) -> "torch.Tensor": + import torch + return torch.from_numpy(arr) # ST overrides to move to its device +``` + +- `embed()` becomes **concrete** with one implementation; the two `@overload` stubs are kept on the ABC + (with `@abstractmethod` removed from both stubs and impl) so direct backend-level callers keep the + `Literal[True] -> torch.Tensor` narrowing (e.g. `tests/embedder/test_openai_real_backend.py`). + `get_hash`, `similarity`, `clear_ram`, `dump`, `load` stay abstract; `_embed_uncached` is new abstract. + **Every backend (ST, OpenAI, vLLM, HV) and the fake drops its own `embed` method AND its `@overload` + stubs** (a bare overload with no implementation is a mypy error) to inherit the ABC's concrete `embed` + + overloads. +- The dedup/order algorithm is collision-free for the reassembly: every `missing` key is in + `unique_keys ⊆ keys`, so it is reached in the zip and mapped exactly once (first occurrence wins); + `missing`, `missing_utts`, `computed[i]` share one index space; empty `missing` is guarded. +- Backends collapse their `embed` to `_embed_uncached` returning **float32 numpy**: + - **ST:** keep the `if self.config.tokenizer_config.max_length is not None:` guard before setting + `model.max_seq_length`; `model.encode(..., convert_to_numpy=True, normalize_embeddings=True, + prompt=prompt)`; cast float32. Override `_to_tensor` to `torch.from_numpy(arr).to(self.config.device + or "cpu")` (preserves the current cache-hit device behavior). Keep its `ValueError` on empty input. + - **OpenAI:** keep `ValueError` on empty; prepend prompt if present; run sync/async path; return float32. + - **vLLM:** keep `ValueError` on empty; prepend prompt if present; `model.encode`; stack float32. + - **HashingVectorizer:** ignore prompt (as today, so `_embed_uncached`'s `prompt` param is unused → + `# noqa: ARG002`); transform → dense float32. **Empty input returns a `(0, n_features)` array** via an + explicit guard — note sklearn's `HashingVectorizer.transform([])` actually raises `StopIteration` + (sklearn ≥1.5), which the old `embed` propagated; the guard makes empty input graceful (this is the + one small, deliberate behavior improvement, pinned by a regression test in §6.3). Sets + `supports_cache = False` so it is never cached (avoiding ~1 MB BLOBs). +- **`FakeOpenaiEmbeddingBackend`** is migrated to implement `_embed_uncached(utterances, prompt)` and + **inherit** the template `embed`. It also re-declares `config: OpenaiEmbeddingConfig` (the narrowing from + the ABC change). Its body uses the **passed `prompt`** directly (it must NOT call `get_prompt(task_type)` + again, and must NOT prepend the prompt — it keeps prompt-as-seed: + `seed_extra = f"{model_name}|{prompt or ''}"`), and moves the lazy `self._client` touch into + `_embed_uncached`. This keeps `test_client_lazy_loading`, `test_prompts_application`, and + `test_return_tensors_functionality` green (all use `use_cache=False`) while giving the fake genuine cache + coverage when caching is on (hermetic via §6.2). + +**`base.py` imports:** the concrete methods need runtime `numpy` (`np.stack`) and +`from ._sqlite_cache import get_embedding_cache, utterance_key`; `_to_tensor` imports `torch` lazily. +`numpy` therefore moves out of the `TYPE_CHECKING` block (ruff `TC` will require this). No import cycle: +`base → _sqlite_cache → {_hash, _cache_dir}` does not point back at `base`. + +**Tensor/device semantics:** the cache always stores/reconstructs CPU float32. When `return_tensors=True`, +the base converts via `_to_tensor`. This is equivalent to the current cache-hit behavior. The only nuance +is an ST **cache-miss** with `return_tensors=True`: previously the raw on-device encode tensor was +returned; now it round-trips through CPU numpy and back to the device. sentence-transformers returns +float32 for both `convert_to_*` modes and normalizes identically, and all CI configs use `device="cpu"`, +so values are byte-identical and `.to("cpu")` is a no-op — a documented, test-invisible unification. + +### 4.5 Data flow (one `embed(["a","b","a","c"])` call, partial hit, cache on) + +1. Resolve `prompt` from `task_type`. +2. Non-empty + `use_cache=True` → `_embed_cached`. +3. `model_hash = get_hash()`; `keys = [k_a, k_b, k_a, k_c]`; `unique = [k_a, k_b, k_c]`. +4. `get_many(model_hash, [k_a,k_b,k_c])` → say `{k_a: v_a}` (a was cached). `missing = [k_b, k_c]`. +5. `key_to_utt = {k_b:"b", k_c:"c"}`; `_embed_uncached(["b","c"], prompt)` → `[v_b, v_c]`. + `set_many(model_hash, {k_b:v_b, k_c:v_c})`; `cached = {k_a:v_a, k_b:v_b, k_c:v_c}`. +6. `np.stack([v_a, v_b, v_a, v_c])` → (4, dim) in input order. Convert to tensor if requested; return. + +## 5. Error handling and robustness + +- **Cache read failure** (locked beyond busy_timeout, corruption, unreadable file): `get_many` logs a + warning and returns `{}` → everything recomputed. `embed()` still succeeds. +- **Cache write failure**: `set_many` logs a warning and returns; `cached.update(new_entries)` already + ran, so the returned matrix is correct (just uncached). +- **Connect-time failure** (corrupt header, permission denied, path is a directory): caught because the + guard wraps `_connect()` + schema-ensure end-to-end; degrades to no-op for the process. +- **Schema version mismatch**: atomic drop+recreate under `BEGIN IMMEDIATE` with a post-lock re-read + (cross-process safe); a cache rebuild, not a crash. +- **Dimension/blob mismatch on read**: that row is skipped (logged) and treated as a miss; recomputed. +- **`_embed_uncached` raising** (real model/API error) propagates normally — that is not a cache failure. +- **Concurrency**: WAL + `busy_timeout` + `INSERT OR IGNORE` + single-transaction writes make concurrent + multi-process trials and multi-thread access safe **on one host** without external locking. +- **`get_hash()` cost**: called once per `embed` (cached or not) — the same frequency as today's inline + cache block, so no regression. (For local-path ST models `get_hash` hashes all parameters on every + call; memoizing it is a possible future optimization, §9, not in scope.) + +## 6. Testing strategy + +All tests are verified **via CI on the draft PR** (maintainer rule: no heavy/exhaustive pytest locally; +ruff + mypy run locally). New tests are fast and do **not** download models (pure-Python unit tests plus +the pinned tiny ST model already used in CI). + +### 6.1 New unit tests — `tests/embedder/test_sqlite_cache.py` (pure Python, no ML) +- `set_many` then `get_many` round-trips exact float32 bytes; reconstructed shape `(dim,)`. +- Miss returns absent keys; partial hit returns only present keys. +- `get_many` filters by `model_hash`: a key stored under model A is **not** returned for model B. +- `INSERT OR IGNORE`: re-inserting an existing key does not overwrite or error. +- Chunking: `get_many` with > 900 keys returns all matches (exercises the IN-chunk loop). +- Schema: WAL enabled (where supported); `user_version == SCHEMA_VERSION`; columns/indexes present; + a DB pre-set to a different `user_version` triggers a rebuild (table dropped+recreated). +- Graceful degradation: a corrupted/garbage DB file → `get_many` returns `{}`, `set_many` no-ops, + no exception; a `dim`/blob-length mismatch row is skipped, not raised. +- `get_cache_dir()`: honors `AUTOINTENT_CACHE_DIR`; falls back to appdirs when unset (the "unset" case + must `monkeypatch.delenv("AUTOINTENT_CACHE_DIR", raising=False)` because the global isolation fixture + in §6.2 sets it for every test). +- `utterance_key()`: stable; differs by utterance, by prompt, by model_hash; equal for equal inputs. + +### 6.2 Test isolation — **global** fixture in `tests/conftest.py` +Because `use_cache` defaults to **True**, any test that builds a default-config embedder (not just +`tests/embedder/`) can write the embedding DB to the real OS cache dir. Add a **function-scoped autouse** +fixture in the top-level `tests/conftest.py`: + +```python +@pytest.fixture(autouse=True) +def _isolate_embedding_cache(tmp_path, monkeypatch): + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "ai_cache")) +``` + +- Each test gets its own cache dir (unique `tmp_path`), so there is no cross-test bleed and no + real-OS-cache pollution anywhere in the suite (fixes today's gap, incl. `tests/callback` and + `full_training.yaml` runs). +- It only sets an env var; the structured-output cache tests (which monkeypatch + `autointent.generation._cache.user_cache_dir` directly and never read `AUTOINTENT_CACHE_DIR`) are + unaffected. Add a comment in the fixture noting the per-test isolation is load-bearing for the reuse test. + +### 6.3 Updated integration tests — `tests/embedder/test_caching.py` +- Keep existing parametrized consistency tests (cache on/off identical results). +- **Per-utterance reuse (the headline win, with a real signal):** embed `["x","y"]`, then `["y","z"]`, + on a cache-on ST (or fake) backend; assert the **SQLite DB contains exactly 3 rows** afterward (not 4). + This is a black-box assertion that is true **only** with per-utterance keying (the old whole-list + scheme would store 2 list-blobs, not 3 utterance rows), so it is a meaningful red→green signal that + does not depend on the new private method name. Optionally also wrap the backend's underlying + encode and assert the second call computes only `["z"]`. +- **Dedup-within-list:** embed `["x","x"]`; underlying encode receives a single `x`; output rows 0 and 1 + are byte-identical and in order. +- **Order-preservation:** a multi-element list returns rows in input order after a partial hit. +- Keep `test_cache_with_different_prompts` (different prompt ⇒ different key ⇒ different vector). +- **Empty-input behavior preserved (regression guard):** HV `embed([])` still returns a `(0, dim)` + array; an ST/fake `embed([])` still raises `ValueError`. (Confirms the refactor did not change it.) + +### 6.4 Regression / unchanged +- `tests/embedder/test_hash.py` (incl. offline #321 cases) is unaffected — `get_hash()` is unchanged. +- `tests/embedder/test_openai_backend.py` fake-contract tests (`test_client_lazy_loading`, + `test_prompts_application`, `test_return_tensors_functionality`) stay green per §4.4. +- `mypy src/autointent tests` stays green (strict, py3.10): annotate the new module fully; add the + `config: EmbedderConfig` ABC declaration; `cast` the `np.frombuffer` result; `sqlite3` is typed. +- ruff (`select = ["ALL"]`, strict): the new module satisfies the full ruleset like any other non-`utils` + module — module/class/function docstrings (D1xx), `%`-style logging args (no f-strings in `logger.*`, + G004), named constants instead of magic numbers (e.g. `_FLOAT32_NBYTES = 4` rather than `len(blob)//4`), + `from __future__ import annotations`, `pathlib` for paths. The **non-obvious** noqas that are expected and + deliberate: `# noqa: S608` (the chunked `IN (...)` placeholder string, built from `?` only), + `# noqa: BLE001` (the per-row blob `except Exception` for graceful degradation), and `# noqa: ARG002` + (HV's unused `prompt` parameter). Keep `get_many`/schema-init small enough to avoid `C901`/`PLR0912` + (extract helpers if needed). `np.frombuffer` is wrapped in `cast("npt.NDArray[np.float32]", ...)`. +- Coverage: §6.1 covers the new module's branches **including every `except`/skip branch**, keeping the + 85% combined floor. + +## 7. File-by-file change list + +**New** +- `src/autointent/_cache_dir.py` — `get_cache_dir()`. +- `src/autointent/_wrappers/embedder/_sqlite_cache.py` — `SQLiteEmbeddingCache`, `utterance_key`, + `get_embedding_cache`, `SCHEMA_VERSION`, `BUSY_TIMEOUT_MS`. +- `tests/embedder/test_sqlite_cache.py` — unit tests (6.1). + +**Modified** +- `src/autointent/_wrappers/embedder/base.py` — add `config: EmbedderConfig` annotation + `supports_cache` + class flag; concrete `embed` (+ kept overloads) / `_embed_cached` / `_to_tensor`; abstract + `_embed_uncached`; move `numpy` to runtime import (with `from ._sqlite_cache import …`; `torch` stays + lazy inside `_to_tensor`). +- `src/autointent/_wrappers/embedder/sentence_transformers.py` — re-declare + `config: SentenceTransformerEmbeddingConfig`; `embed` → `_embed_uncached` (preserve max_length guard); + override `_to_tensor`; drop the inline cache block and `get_embeddings_path` import. +- `src/autointent/_wrappers/embedder/openai.py` — re-declare `config: OpenaiEmbeddingConfig`; + `embed` → `_embed_uncached`; drop cache block/import. +- `src/autointent/_wrappers/embedder/vllm.py` — re-declare `config: VllmEmbeddingConfig`; + `embed` → `_embed_uncached`; drop cache block/import. +- `src/autointent/_wrappers/embedder/hashing_vectorizer.py` — re-declare + `config: HashingVectorizerEmbeddingConfig`; set `supports_cache = False`; `embed` → `_embed_uncached` + (keep empty → `(0, dim)`, `# noqa: ARG002` on unused `prompt`); remove its now-redundant `embed` overloads. +- `tests/_fixtures/fake_openai_embedding.py` — re-declare `config: OpenaiEmbeddingConfig`; + `embed` → `_embed_uncached` (use passed prompt, no prepend, keep prompt-as-seed, move `_client` touch); + inherit the template embed. +- `tests/conftest.py` — global autouse `AUTOINTENT_CACHE_DIR` → tmp_path isolation fixture (§6.2). +- `tests/embedder/test_caching.py` — reuse (row-count) / dedup / order / empty-input-preserved tests. +- `CHANGELOG.md` (repo root; latest section `[0.3.2]`) — add an Unreleased/next-version entry: new SQLite + per-utterance embedding cache, `AUTOINTENT_CACHE_DIR` (embedding cache only), fresh-start invalidation + of old `.npy` caches. + +**Removed** +- `src/autointent/_wrappers/embedder/utils.py` `get_embeddings_path` (and the file, since it becomes empty; + no other importer exists). + +## 8. Risks and mitigations + +| Risk | Mitigation | +|---|---| +| Refactor changes per-backend embed values subtly | Backends keep their exact encode calls inside `_embed_uncached`; only caching/reassembly moves. Parametrized consistency tests guard cache-on == cache-off; values unchanged. | +| `use_cache` defaults to True → broader real caching than before (incl. HV) writing to real OS cache | Global autouse isolation fixture (§6.2) redirects the cache dir for every test; production behavior is intended (caching on by default, as today). | +| `self.config` undeclared on ABC → mypy strict failure | Declare `config: EmbedderConfig` on the ABC **and** re-declare `config: ` in all five subclasses (narrowing). | +| HV default caching → ~1 MB BLOBs / DB bloat | `supports_cache = False` on HV; it is never cached (preserves today's behavior). | +| Connect/`mkdir` errors or int model_hash bind escaping `embed()` | Outer catch is `(sqlite3.Error, OSError)`; `model_hash` bound as `str()`. | +| Fake backend inheriting template embed breaks fake-contract tests | `_embed_uncached` keeps prompt-as-seed and the lazy `_client` touch; §6.4 lists the exact tests guarded. | +| Cross-process schema-rebuild race | `BEGIN IMMEDIATE` + post-lock re-read of `user_version`; rebuild only on version bump. | +| WAL on a network filesystem corrupts | Documented single-host assumption; WAL pragma degrades silently on unsupported FS. | +| "database is locked" under heavy parallel writes | WAL + 30 s `busy_timeout` + `INSERT OR IGNORE` + one transaction per `set_many`; a timed-out write degrades to uncached, never wrong. | +| float64 leaking into the blob → permanent miss | HARD INVARIANT: `np.ascontiguousarray(vec, dtype=np.float32).tobytes()`; `dim` validated on read. | +| Cross-model 64-bit key collision → wrong vector | `get_many` filters `AND model_hash = ?`; collision becomes a recompute. Residual same-model collision accepted as today. | +| vLLM path can't run in CI (no GPU) | `_embed_uncached` for vLLM is a thin wrapper; shared logic tested via ST + fake + pure unit tests; vLLM test stays `skipif`. | +| Empty-input behavior change | Avoided: empty input bypasses the cache to `_embed_uncached`, preserving each backend's current behavior; regression-guarded in §6.3. | + +## 9. Future work (not in this PR) +- Active eviction: size-cap LRU and/or TTL using the groundwork columns (and a read-time + `last_accessed` update policy). +- Route the structured-output cache through `get_cache_dir()` and/or migrate it onto the same + SQLite layer (then `AUTOINTENT_CACHE_DIR` would govern both). +- Memoize `get_hash()` on the backend instance (invalidated in `train()`) to cheapen cache hits for + local-path ST models. +- Optional LMDB/Parquet/memmap sidecar for very large vector volumes if BLOB storage ever becomes + a bottleneck. +- Optional 128-bit keys if same-model collision ever becomes a practical concern. diff --git a/src/autointent/_cache_dir.py b/src/autointent/_cache_dir.py new file mode 100644 index 000000000..2bf58b052 --- /dev/null +++ b/src/autointent/_cache_dir.py @@ -0,0 +1,27 @@ +"""Resolution of the base directory for autointent on-disk caches.""" + +from __future__ import annotations + +import os +from pathlib import Path + +from appdirs import user_cache_dir + + +def get_cache_dir() -> Path: + """Return the base directory for autointent on-disk caches. + + Honors the ``AUTOINTENT_CACHE_DIR`` environment variable; otherwise falls back to + ``appdirs.user_cache_dir("autointent")``. Resolved fresh on each call so tests and + parallel workers can redirect it via the env var. + + Note: + Currently consumed only by the embedding cache. The structured-output cache + still uses ``user_cache_dir("autointent")`` directly and is unaffected by this + variable. + + Returns: + The cache base directory as a ``Path``. + """ + override = os.environ.get("AUTOINTENT_CACHE_DIR") + return Path(override) if override else Path(user_cache_dir("autointent")) diff --git a/src/autointent/_wrappers/embedder/_sqlite_cache.py b/src/autointent/_wrappers/embedder/_sqlite_cache.py new file mode 100644 index 000000000..c64d8071d --- /dev/null +++ b/src/autointent/_wrappers/embedder/_sqlite_cache.py @@ -0,0 +1,201 @@ +"""SQLite-backed per-utterance embedding cache. + +Stores one float32 vector per ``(model, utterance, prompt)`` key in a single SQLite +database, replacing the previous one-``.npy``-file-per-call cache. See +``docs/superpowers/specs/2026-06-25-sqlite-embedding-cache-design.md``. +""" + +from __future__ import annotations + +import logging +import sqlite3 +import threading +import time +from typing import TYPE_CHECKING, cast + +import numpy as np + +from autointent._cache_dir import get_cache_dir +from autointent._hash import Hasher + +if TYPE_CHECKING: + from pathlib import Path + + import numpy.typing as npt + +logger = logging.getLogger(__name__) + +SCHEMA_VERSION = 1 +BUSY_TIMEOUT_MS = 30_000 +_DB_FILENAME = "embeddings.db" +_FLOAT32_NBYTES = 4 +# SQLite's default SQLITE_MAX_VARIABLE_NUMBER is 999 on older builds; stay well under it. +_KEY_CHUNK_SIZE = 900 + +_CREATE_TABLE = """ +CREATE TABLE IF NOT EXISTS embeddings ( + key TEXT PRIMARY KEY, + model_hash TEXT NOT NULL, + dim INTEGER NOT NULL, + vector BLOB NOT NULL, + size_bytes INTEGER NOT NULL, + created_at REAL NOT NULL, + last_accessed REAL NOT NULL +) +""" +_CREATE_INDEXES = ( + "CREATE INDEX IF NOT EXISTS idx_embeddings_last_accessed ON embeddings(last_accessed)", + "CREATE INDEX IF NOT EXISTS idx_embeddings_created_at ON embeddings(created_at)", + "CREATE INDEX IF NOT EXISTS idx_embeddings_model_hash ON embeddings(model_hash)", +) +_INSERT = ( + "INSERT OR IGNORE INTO embeddings " + "(key, model_hash, dim, vector, size_bytes, created_at, last_accessed) " + "VALUES (?, ?, ?, ?, ?, ?, ?)" +) + + +def utterance_key(model_hash: int, utterance: str, prompt: str | None) -> str: + """Compute the per-utterance cache key from model identity, utterance, and prompt. + + Args: + model_hash: The backend's model-identity hash (``get_hash()``). + utterance: The original (non-prompted) utterance text. + prompt: The resolved task prompt, or ``None``. + + Returns: + A hex digest uniquely identifying ``(model_hash, utterance, prompt)``. + """ + hasher = Hasher() + hasher.update(model_hash) + hasher.update(utterance) + if prompt: + hasher.update(prompt) + return hasher.hexdigest() + + +class SQLiteEmbeddingCache: + """Per-utterance embedding cache backed by a single SQLite database. + + Thread-safe (a fresh short-lived connection per call) and process-safe on a local + filesystem (WAL + ``busy_timeout``). Never raises into callers: any cache I/O failure + degrades to a miss / no-op and is logged. + """ + + def __init__(self, db_path: Path) -> None: + """Initialize the cache bound to ``db_path`` (schema is created lazily).""" + self._db_path = db_path + self._initialized = False + self._init_lock = threading.Lock() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self._db_path, timeout=BUSY_TIMEOUT_MS / 1000, isolation_level=None) + conn.execute(f"PRAGMA busy_timeout = {BUSY_TIMEOUT_MS}") + conn.execute("PRAGMA synchronous = NORMAL") + return conn + + def _ensure_schema(self) -> None: + """Create the table/indexes once per instance; rebuild on a schema-version change. + + The version check + (re)create runs inside ``BEGIN IMMEDIATE`` with a post-lock + re-read of ``user_version`` so two processes opening a stale DB cannot double-drop. + """ + if self._initialized: + return + with self._init_lock: + if not self._initialized: # another thread may have initialized while we waited + self._db_path.parent.mkdir(parents=True, exist_ok=True) + conn = self._connect() + try: + mode = conn.execute("PRAGMA journal_mode = WAL").fetchone() + if mode is not None and str(mode[0]).lower() != "wal": + logger.debug("SQLite embedding cache: WAL unavailable (journal_mode=%s)", mode[0]) + conn.execute("BEGIN IMMEDIATE") + version = conn.execute("PRAGMA user_version").fetchone()[0] + if version != SCHEMA_VERSION: + conn.execute("DROP TABLE IF EXISTS embeddings") + conn.execute(_CREATE_TABLE) + for index_sql in _CREATE_INDEXES: + conn.execute(index_sql) + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION}") + conn.execute("COMMIT") + finally: + conn.close() + self._initialized = True + + def get_many(self, model_hash: int, keys: list[str]) -> dict[str, npt.NDArray[np.float32]]: + """Return cached vectors for ``keys`` under ``model_hash`` (missing keys omitted).""" + if not keys: + return {} + model_hash_str = str(model_hash) + result: dict[str, npt.NDArray[np.float32]] = {} + try: + self._ensure_schema() + conn = self._connect() + try: + for start in range(0, len(keys), _KEY_CHUNK_SIZE): + chunk = keys[start : start + _KEY_CHUNK_SIZE] + placeholders = ",".join("?" * len(chunk)) + query = ( + "SELECT key, vector, dim FROM embeddings " # noqa: S608 - only '?' is interpolated; values are bound + f"WHERE model_hash = ? AND key IN ({placeholders})" + ) + for row_key, blob, dim in conn.execute(query, (model_hash_str, *chunk)): + vector = self._deserialize(blob, dim) + if vector is not None: + result[row_key] = vector + finally: + conn.close() + except (sqlite3.Error, OSError) as exc: + logger.warning("SQLite embedding cache read failed (%s); recomputing.", exc) + return {} + return result + + def set_many(self, model_hash: int, entries: dict[str, npt.NDArray[np.float32]]) -> None: + """Insert vectors for new keys under ``model_hash`` (existing keys are untouched).""" + if not entries: + return + model_hash_str = str(model_hash) + now = time.time() + rows: list[tuple[str, str, int, bytes, int, float, float]] = [] + for key, vector in entries.items(): + blob = np.ascontiguousarray(vector, dtype=np.float32).tobytes() + rows.append((key, model_hash_str, int(vector.shape[-1]), blob, len(blob), now, now)) + try: + self._ensure_schema() + conn = self._connect() + try: + conn.execute("BEGIN IMMEDIATE") + conn.executemany(_INSERT, rows) + conn.execute("COMMIT") + finally: + conn.close() + except (sqlite3.Error, OSError) as exc: + logger.warning("SQLite embedding cache write failed (%s); continuing uncached.", exc) + + @staticmethod + def _deserialize(blob: bytes, dim: int) -> npt.NDArray[np.float32] | None: + try: + if len(blob) != dim * _FLOAT32_NBYTES: + logger.warning("SQLite embedding cache: blob length %d != dim %d; skipping.", len(blob), dim) + return None + return cast("npt.NDArray[np.float32]", np.frombuffer(blob, dtype=np.float32)) + except Exception as exc: # noqa: BLE001 - a bad row must never break embed() + logger.warning("SQLite embedding cache: failed to deserialize a row (%s); skipping.", exc) + return None + + +_INSTANCES: dict[str, SQLiteEmbeddingCache] = {} +_INSTANCES_LOCK = threading.Lock() + + +def get_embedding_cache() -> SQLiteEmbeddingCache: + """Return the process-wide cache for the current cache dir (memoized by db path).""" + db_path = get_cache_dir() / _DB_FILENAME + key = str(db_path) + with _INSTANCES_LOCK: + cache = _INSTANCES.get(key) + if cache is None: + cache = SQLiteEmbeddingCache(db_path) + _INSTANCES[key] = cache + return cache diff --git a/src/autointent/_wrappers/embedder/base.py b/src/autointent/_wrappers/embedder/base.py index 5b93ea2ed..f82413c5f 100644 --- a/src/autointent/_wrappers/embedder/base.py +++ b/src/autointent/_wrappers/embedder/base.py @@ -1,12 +1,15 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Literal, overload +from typing import TYPE_CHECKING, Literal, cast, overload + +import numpy as np + +from ._sqlite_cache import get_embedding_cache, utterance_key if TYPE_CHECKING: from pathlib import Path - import numpy as np import numpy.typing as npt import torch @@ -16,7 +19,9 @@ class BaseEmbeddingBackend(ABC): """Abstract base class for embedding backends.""" + config: EmbedderConfig supports_training: bool = False + supports_cache: bool = True @abstractmethod def __init__(self, config: EmbedderConfig) -> None: @@ -29,36 +34,81 @@ def clear_ram(self) -> None: ... @overload - @abstractmethod def embed( self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] ) -> torch.Tensor: ... @overload - @abstractmethod def embed( self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False ) -> npt.NDArray[np.float32]: ... - @abstractmethod def embed( self, utterances: list[str], task_type: TaskTypeEnum | None = None, return_tensors: bool = False, ) -> npt.NDArray[np.float32] | torch.Tensor: - """Calculate embeddings for a list of utterances. + """Calculate embeddings for a list of utterances, using a per-utterance cache. + + Empty input, ``use_cache=False``, or a backend that opts out of caching + (``supports_cache=False``) bypasses the cache and calls ``_embed_uncached`` + directly, preserving each backend's existing empty-input behavior. Args: utterances: List of input texts to calculate embeddings for. task_type: Type of task for which embeddings are calculated. - return_tensors: If True, return a PyTorch tensor; otherwise, return a numpy array. + return_tensors: If True, return a PyTorch tensor; otherwise, a numpy array. Returns: A numpy array or PyTorch tensor of embeddings. """ + prompt = self.config.get_prompt(task_type) + if not utterances or not self.config.use_cache or not self.supports_cache: + embeddings = self._embed_uncached(utterances, prompt) + else: + embeddings = self._embed_cached(utterances, prompt) + if return_tensors: + return self._to_tensor(embeddings) + return embeddings + + def _embed_cached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Embed via the SQLite per-utterance cache: reuse hits, compute only misses.""" + cache = get_embedding_cache() + model_hash = self.get_hash() + keys = [utterance_key(model_hash, utterance, prompt) for utterance in utterances] + unique_keys = list(dict.fromkeys(keys)) + cached = cache.get_many(model_hash, unique_keys) + missing = [key for key in unique_keys if key not in cached] + if missing: + key_to_utterance: dict[str, str] = {} + for utterance, key in zip(utterances, keys, strict=True): + if key in cached or key in key_to_utterance: + continue + key_to_utterance[key] = utterance + missing_utterances = [key_to_utterance[key] for key in missing] + computed = self._embed_uncached(missing_utterances, prompt) + new_entries = {key: computed[index] for index, key in enumerate(missing)} + cache.set_many(model_hash, new_entries) + cached.update(new_entries) + return cast("npt.NDArray[np.float32]", np.stack([cached[key] for key in keys])) + + @abstractmethod + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute embeddings WITHOUT caching, returning a ``(N, dim)`` float32 array. + + The backend applies ``prompt`` in its own way (ST passes it to ``encode``; + OpenAI/vLLM prepend it; HashingVectorizer ignores it). Each backend keeps its + current empty-input behavior here (ST/OpenAI/vLLM raise; HV returns ``(0, dim)``). + """ ... + def _to_tensor(self, embeddings: npt.NDArray[np.float32]) -> torch.Tensor: + """Convert a numpy embedding matrix to a torch tensor (CPU by default).""" + import torch + + return torch.from_numpy(embeddings) + @abstractmethod def similarity( self, embeddings1: npt.NDArray[np.float32], embeddings2: npt.NDArray[np.float32] diff --git a/src/autointent/_wrappers/embedder/hashing_vectorizer.py b/src/autointent/_wrappers/embedder/hashing_vectorizer.py index 4d596975e..90c273b59 100644 --- a/src/autointent/_wrappers/embedder/hashing_vectorizer.py +++ b/src/autointent/_wrappers/embedder/hashing_vectorizer.py @@ -4,15 +4,13 @@ import json import logging -from typing import TYPE_CHECKING, Literal, overload +from typing import TYPE_CHECKING import numpy as np -import torch from sklearn.feature_extraction.text import HashingVectorizer from sklearn.metrics.pairwise import cosine_similarity from autointent._hash import Hasher -from autointent.configs import TaskTypeEnum from autointent.configs._embedder import HashingVectorizerEmbeddingConfig from .base import BaseEmbeddingBackend @@ -33,6 +31,8 @@ class HashingVectorizerEmbeddingBackend(BaseEmbeddingBackend): """ supports_training: bool = False + supports_cache: bool = False + config: HashingVectorizerEmbeddingConfig def __init__(self, config: HashingVectorizerEmbeddingConfig) -> None: """Initialize the HashingVectorizer backend. @@ -74,38 +74,14 @@ def get_hash(self) -> int: hasher.update(self.config.dtype) return int(hasher.hexdigest(), 16) - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] = True - ) -> torch.Tensor: ... - - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False - ) -> npt.NDArray[np.float32]: ... - - def embed( - self, - utterances: list[str], - task_type: TaskTypeEnum | None = None, # noqa: ARG002 - return_tensors: bool = False, - ) -> npt.NDArray[np.float32] | torch.Tensor: - """Calculate embeddings for a list of utterances. - - Args: - utterances: List of input texts to calculate embeddings for. - task_type: Type of task for which embeddings are calculated (ignored for HashingVectorizer). - return_tensors: If True, return a PyTorch tensor; otherwise, return a numpy array. - - Returns: - A numpy array or PyTorch tensor of embeddings. - """ - # Transform texts to sparse matrix, then convert to dense + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: # noqa: ARG002 + """Compute HashingVectorizer embeddings (prompt is ignored; never cached).""" + if not utterances: + # sklearn's HashingVectorizer.transform([]) raises StopIteration; return an + # empty (0, n_features) matrix instead so empty input is handled gracefully. + return np.empty((0, self.config.n_features), dtype=np.float32) embeddings_sparse = self._vectorizer.transform(utterances) embeddings: npt.NDArray[np.float32] = embeddings_sparse.toarray().astype(np.float32) - - if return_tensors: - return torch.from_numpy(embeddings) return embeddings def similarity( diff --git a/src/autointent/_wrappers/embedder/openai.py b/src/autointent/_wrappers/embedder/openai.py index c1ae6ee3f..c7bf8fbb5 100644 --- a/src/autointent/_wrappers/embedder/openai.py +++ b/src/autointent/_wrappers/embedder/openai.py @@ -5,19 +5,17 @@ import logging import os from functools import partial -from typing import TYPE_CHECKING, Literal, TypedDict, cast, overload +from typing import TYPE_CHECKING, TypedDict, cast import aiometer import numpy as np import numpy.typing as npt -import torch from autointent._deps import require from autointent._hash import Hasher from autointent.configs._embedder import OpenaiEmbeddingConfig from .base import BaseEmbeddingBackend -from .utils import get_embeddings_path if TYPE_CHECKING: from pathlib import Path @@ -27,8 +25,6 @@ from tiktoken import Encoding from typing_extensions import NotRequired - from autointent.configs import TaskTypeEnum - logger = logging.getLogger(__name__) @@ -101,6 +97,7 @@ class EmbeddingsCreateKwargs(TypedDict): class OpenaiEmbeddingBackend(BaseEmbeddingBackend): """OpenAI-based embedding backend implementation.""" + config: OpenaiEmbeddingConfig _client: openai.OpenAI | None = None _async_client: openai.AsyncOpenAI | None = None @@ -166,55 +163,17 @@ def get_hash(self) -> int: hasher.update(str(self.config.max_tokens_in_batch)) return hasher.intdigest() - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] - ) -> torch.Tensor: ... - - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False - ) -> npt.NDArray[np.float32]: ... - - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, return_tensors: bool = False - ) -> npt.NDArray[np.float32] | torch.Tensor: - """Calculate embeddings for a list of utterances. - - Args: - utterances: List of input texts to calculate embeddings for. - task_type: Type of task for which embeddings are calculated. - return_tensors: If True, return a PyTorch tensor; otherwise, return a numpy array. - - Returns: - A numpy array or PyTorch tensor of embeddings. - """ + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute OpenAI embeddings without caching.""" if len(utterances) == 0: msg = "Empty input" logger.error(msg) raise ValueError(msg) # Apply task-specific prompt - prompt = self.config.get_prompt(task_type) if prompt: utterances = [f"{prompt} {utterance}" for utterance in utterances] - if self.config.use_cache: - logger.debug("Using cached embeddings for %s", self.config.model_name) - hasher = Hasher() - hasher.update(self.get_hash()) - hasher.update(utterances) - if prompt: - hasher.update(prompt) - - embeddings_path = get_embeddings_path(hasher.hexdigest()) - if embeddings_path.exists(): - logger.debug("loading embeddings from %s", str(embeddings_path)) - embeddings_np = cast("npt.NDArray[np.float32]", np.load(embeddings_path)) - if return_tensors: - return torch.from_numpy(embeddings_np) - return embeddings_np - logger.debug( "Calculating embeddings with OpenAI model %s, batch_size=%d, max_tokens_in_batch=%s, " "dimensions=%s, prompt=%s, max_concurrent=%s", @@ -228,17 +187,8 @@ def embed( # Use async processing if max_concurrent is specified if self.config.max_concurrent is not None: - embeddings_np = self._process_embeddings_async(utterances) - else: - embeddings_np = self._process_embeddings_sync(utterances) - - if self.config.use_cache: - embeddings_path.parent.mkdir(parents=True, exist_ok=True) - np.save(embeddings_path, embeddings_np) - - if return_tensors: - return torch.from_numpy(embeddings_np) - return embeddings_np + return self._process_embeddings_async(utterances) + return self._process_embeddings_sync(utterances) def _embedding_request_batches(self, utterances: list[str]) -> list[list[str]]: """Slice utterances into batches for each embeddings API call.""" diff --git a/src/autointent/_wrappers/embedder/sentence_transformers.py b/src/autointent/_wrappers/embedder/sentence_transformers.py index 772737fed..d84b2a8b4 100644 --- a/src/autointent/_wrappers/embedder/sentence_transformers.py +++ b/src/autointent/_wrappers/embedder/sentence_transformers.py @@ -5,7 +5,7 @@ import tempfile from functools import lru_cache from pathlib import Path -from typing import TYPE_CHECKING, Literal, cast, overload +from typing import TYPE_CHECKING, cast from uuid import uuid4 import huggingface_hub @@ -19,14 +19,13 @@ from autointent.configs._embedder import SentenceTransformerEmbeddingConfig from .base import BaseEmbeddingBackend -from .utils import get_embeddings_path if TYPE_CHECKING: import numpy.typing as npt from sentence_transformers import SentenceTransformer from transformers import TrainerCallback - from autointent.configs import EmbedderFineTuningConfig, TaskTypeEnum + from autointent.configs import EmbedderFineTuningConfig from autointent.custom_types import ListOfLabels @@ -105,6 +104,7 @@ class SentenceTransformerEmbeddingBackend(BaseEmbeddingBackend): """SentenceTransformer-based embedding backend implementation.""" supports_training: bool = True + config: SentenceTransformerEmbeddingConfig _model: SentenceTransformer | None def __init__(self, config: SentenceTransformerEmbeddingConfig) -> None: @@ -162,53 +162,13 @@ def get_hash(self) -> int: hasher.update(self.config.tokenizer_config.max_length) return hasher.intdigest() - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] - ) -> torch.Tensor: ... - - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False - ) -> npt.NDArray[np.float32]: ... - - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, return_tensors: bool = False - ) -> npt.NDArray[np.float32] | torch.Tensor: - """Calculate embeddings for a list of utterances. - - Args: - utterances: List of input texts to calculate embeddings for. - task_type: Type of task for which embeddings are calculated. - return_tensors: If True, return a PyTorch tensor; otherwise, return a numpy array. - - Returns: - A numpy array or PyTorch tensor of embeddings. - """ + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute SentenceTransformer embeddings without caching.""" if len(utterances) == 0: msg = "Empty input" logger.error(msg) raise ValueError(msg) - prompt = self.config.get_prompt(task_type) - - if self.config.use_cache: - logger.debug("Using cached embeddings for %s", self.config.model_name) - hasher = Hasher() - hasher.update(self.get_hash()) - hasher.update(utterances) - if prompt: - hasher.update(prompt) - - embeddings_path = get_embeddings_path(hasher.hexdigest()) - if embeddings_path.exists(): - logger.debug("loading embeddings from %s", str(embeddings_path)) - embeddings_np = cast("npt.NDArray[np.float32]", np.load(embeddings_path)) - if return_tensors: - device = self.config.device or "cpu" - return torch.from_numpy(embeddings_np).to(device) - return embeddings_np - model = self._load_model() logger.debug( @@ -223,35 +183,22 @@ def embed( if self.config.tokenizer_config.max_length is not None: model.max_seq_length = self.config.tokenizer_config.max_length - embeddings: npt.NDArray[np.float32] | torch.Tensor - if return_tensors: - embeddings = model.encode( + embeddings = cast( + "npt.NDArray[np.float32]", + model.encode( utterances, - convert_to_tensor=True, + convert_to_numpy=True, batch_size=self.config.batch_size, normalize_embeddings=True, prompt=prompt, - ) - else: - embeddings = cast( - "npt.NDArray[np.float32]", - model.encode( - utterances, - convert_to_numpy=True, - batch_size=self.config.batch_size, - normalize_embeddings=True, - prompt=prompt, - ), - ) - - if self.config.use_cache: - embeddings_path.parent.mkdir(parents=True, exist_ok=True) - if isinstance(embeddings, torch.Tensor): - np.save(embeddings_path, embeddings.cpu().numpy()) - else: - np.save(embeddings_path, embeddings) + ), + ) + return embeddings.astype(np.float32, copy=False) - return embeddings + def _to_tensor(self, embeddings: npt.NDArray[np.float32]) -> torch.Tensor: + """Convert to a tensor on the configured device (preserves prior cache-hit behavior).""" + device = self.config.device or "cpu" + return torch.from_numpy(embeddings).to(device) def similarity( self, embeddings1: npt.NDArray[np.float32], embeddings2: npt.NDArray[np.float32] diff --git a/src/autointent/_wrappers/embedder/utils.py b/src/autointent/_wrappers/embedder/utils.py deleted file mode 100644 index 93f5274d7..000000000 --- a/src/autointent/_wrappers/embedder/utils.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Utility functions for the embedder module.""" - -from pathlib import Path - -from appdirs import user_cache_dir - - -def get_embeddings_path(filename: str) -> Path: - """Get the path to the embeddings file. - - This function constructs the full path to an embeddings file stored - in a specific directory under the user's home directory. The embeddings - file is named based on the provided filename, with the `.npy` extension - added. - - Args: - filename: The name of the embeddings file (without extension). - - Returns: - The full path to the embeddings file. - """ - return Path(user_cache_dir("autointent")) / "embeddings" / f"{filename}.npy" diff --git a/src/autointent/_wrappers/embedder/vllm.py b/src/autointent/_wrappers/embedder/vllm.py index 691686ca3..6b44378f3 100644 --- a/src/autointent/_wrappers/embedder/vllm.py +++ b/src/autointent/_wrappers/embedder/vllm.py @@ -14,7 +14,6 @@ from autointent.configs._embedder import VllmEmbeddingConfig from .base import BaseEmbeddingBackend -from .utils import get_embeddings_path if TYPE_CHECKING: from pathlib import Path @@ -22,8 +21,6 @@ import numpy.typing as npt from vllm import LLM # type: ignore[import-not-found] - from autointent.configs import TaskTypeEnum - logger = logging.getLogger(__name__) @@ -31,6 +28,7 @@ class VllmEmbeddingBackend(BaseEmbeddingBackend): """vLLM-based embedding backend implementation.""" supports_training: bool = False + config: VllmEmbeddingConfig def __init__(self, config: VllmEmbeddingConfig) -> None: """Initialize the vLLM backend. @@ -77,46 +75,16 @@ def get_hash(self) -> int: hasher.update(str(self.config.max_model_len)) return hasher.intdigest() - def embed( - self, - utterances: list[str], - task_type: TaskTypeEnum | None = None, - return_tensors: bool = False, - ) -> npt.NDArray[np.float32] | torch.Tensor: - """Calculate embeddings for a list of utterances. - - Args: - utterances: List of input texts to calculate embeddings for. - task_type: Type of task for which embeddings are calculated. - return_tensors: If True, return a PyTorch tensor; otherwise, return a numpy array. - - Returns: - A numpy array or PyTorch tensor of embeddings. - """ + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + """Compute vLLM embeddings without caching.""" if len(utterances) == 0: msg = "Empty input" logger.error(msg) raise ValueError(msg) - prompt = self.config.get_prompt(task_type) if prompt: utterances = [f"{prompt} {utterance}" for utterance in utterances] - if self.config.use_cache: - hasher = Hasher() - hasher.update(self.get_hash()) - hasher.update(utterances) - if prompt: - hasher.update(prompt) - - embeddings_path = get_embeddings_path(hasher.hexdigest()) - if embeddings_path.exists(): - logger.debug("Loading cached vLLM embeddings from %s", embeddings_path) - embeddings_np = cast("npt.NDArray[np.float32]", np.load(embeddings_path)) - if return_tensors: - return torch.from_numpy(embeddings_np) - return embeddings_np - model = self._load_model() logger.debug( @@ -127,16 +95,7 @@ def embed( outputs = model.encode(utterances, pooling_task="embed", **self.config.extra_encode_kwargs) all_embeddings = [output.outputs.embedding for output in outputs] - - embeddings_np = np.array(all_embeddings, dtype=np.float32) - - if self.config.use_cache: - embeddings_path.parent.mkdir(parents=True, exist_ok=True) - np.save(embeddings_path, embeddings_np) - - if return_tensors: - return torch.from_numpy(embeddings_np) - return embeddings_np + return np.array(all_embeddings, dtype=np.float32) def similarity( self, embeddings1: npt.NDArray[np.float32], embeddings2: npt.NDArray[np.float32] diff --git a/tests/_fixtures/fake_openai_embedding.py b/tests/_fixtures/fake_openai_embedding.py index acf3ceed7..1e43ac55a 100644 --- a/tests/_fixtures/fake_openai_embedding.py +++ b/tests/_fixtures/fake_openai_embedding.py @@ -4,11 +4,10 @@ import hashlib import json -from typing import TYPE_CHECKING, Literal, overload +from typing import TYPE_CHECKING import numpy as np import pytest -import torch from autointent._wrappers.embedder.base import BaseEmbeddingBackend @@ -17,7 +16,7 @@ import numpy.typing as npt - from autointent.configs import OpenaiEmbeddingConfig, TaskTypeEnum + from autointent.configs import OpenaiEmbeddingConfig def _seeded_vector(text: str, dim: int, *, seed_extra: str = "") -> npt.NDArray[np.float32]: @@ -57,6 +56,7 @@ class FakeOpenaiEmbeddingBackend(BaseEmbeddingBackend): """ supports_training = False + config: OpenaiEmbeddingConfig def __init__(self, config: OpenaiEmbeddingConfig) -> None: self.config = config @@ -68,34 +68,18 @@ def clear_ram(self) -> None: self._client = None self._async_client = None - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[True] - ) -> torch.Tensor: ... - - @overload - def embed( - self, utterances: list[str], task_type: TaskTypeEnum | None = None, *, return_tensors: Literal[False] = False - ) -> npt.NDArray[np.float32]: ... - - def embed( - self, - utterances: list[str], - task_type: TaskTypeEnum | None = None, - return_tensors: bool = False, - ) -> npt.NDArray[np.float32] | torch.Tensor: - # Touch the lazy attributes so test_client_lazy_loading observes the transition. + def _embed_uncached(self, utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + # Touch the lazy attribute so test_client_lazy_loading observes the transition. self._client = self._client or object() - dim = getattr(self.config, "dimensions", None) or 1536 + dim = self.config.dimensions or 1536 - # Prompt seed mirrors BaseEmbedderConfig.get_prompt() so that two task types - # sharing the same default_prompt produce identical vectors. - prompt = self.config.get_prompt(task_type) + # Prompt is already resolved by the base; seeding mirrors BaseEmbedderConfig.get_prompt() + # so two task types sharing the same default_prompt produce identical vectors. seed_extra = f"{self.config.model_name}|{prompt or ''}" - vectors = np.stack([_seeded_vector(text, dim, seed_extra=seed_extra) for text in utterances]) - if return_tensors: - return torch.from_numpy(vectors) + vectors: npt.NDArray[np.float32] = np.stack( + [_seeded_vector(text, dim, seed_extra=seed_extra) for text in utterances] + ) return vectors def similarity( diff --git a/tests/conftest.py b/tests/conftest.py index 0845a3e40..1fb68b5c0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -352,3 +352,15 @@ def _guarded_api_model_info( ) from tests._fixtures.opensearch_container import opensearch_container # noqa: E402, F401 from tests._fixtures.respx_openai import respx_openai # noqa: E402, F401 + + +@pytest.fixture(autouse=True) +def _isolate_embedding_cache(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Redirect the embedding SQLite cache to a per-test directory. + + Because ``use_cache`` defaults to True, any test that builds a default-config + embedder could otherwise write the embedding DB to the real OS cache dir. A unique + per-test ``tmp_path`` also keeps the per-utterance reuse test in + tests/embedder/test_caching.py hermetic (its two embeds must share one DB file). + """ + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "ai_cache")) diff --git a/tests/embedder/test_caching.py b/tests/embedder/test_caching.py index 52d39cb9f..f0a6d95ad 100644 --- a/tests/embedder/test_caching.py +++ b/tests/embedder/test_caching.py @@ -1,16 +1,21 @@ from __future__ import annotations +import os +import sqlite3 +from pathlib import Path from typing import TYPE_CHECKING import numpy as np import pytest from autointent._wrappers.embedder import Embedder -from autointent.configs import TaskTypeEnum +from autointent.configs import HashingVectorizerEmbeddingConfig, TaskTypeEnum from .conftest import backend_configs, create_sentence_transformer_config if TYPE_CHECKING: + import numpy.typing as npt + from autointent.configs import EmbedderConfig @@ -109,3 +114,66 @@ def test_cache_with_different_prompts(self) -> None: # Should produce different embeddings due to different prompts assert not np.allclose(query_emb, passage_emb, rtol=1e-3) + + +def _embedding_row_count() -> int: + db_path = Path(os.environ["AUTOINTENT_CACHE_DIR"]) / "embeddings.db" + if not db_path.exists(): + return 0 + with sqlite3.connect(db_path) as conn: + return int(conn.execute("SELECT COUNT(*) FROM embeddings").fetchone()[0]) + + +class TestPerUtteranceCaching: + """Per-utterance keying: shared utterances are stored once and reused across calls.""" + + def test_overlapping_calls_store_each_utterance_once(self) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + + embedder.embed(["alpha", "beta"]) + embedder.embed(["beta", "gamma"]) # 'beta' overlaps + + # Whole-list keying would store 2 list blobs; per-utterance stores 3 rows. + assert _embedding_row_count() == 3 + + def test_duplicate_in_list_computed_once(self, monkeypatch: pytest.MonkeyPatch) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + backend = embedder._backend + + computed: list[list[str]] = [] + original = backend._embed_uncached + + def spy(utterances: list[str], prompt: str | None) -> npt.NDArray[np.float32]: + computed.append(list(utterances)) + return original(utterances, prompt) + + monkeypatch.setattr(backend, "_embed_uncached", spy) + + result = embedder.embed(["dup", "dup"]) + + assert result.shape[0] == 2 + np.testing.assert_array_equal(result[0], result[1]) + assert computed == [["dup"]] # computed only once + + def test_order_preserved_after_partial_hit(self) -> None: + config = create_sentence_transformer_config(use_cache=True) + embedder = Embedder(config) + + first = embedder.embed(["one", "two", "three"]) + second = embedder.embed(["three", "one", "two"]) # reordered, fully cached + + np.testing.assert_allclose(second[0], first[2], rtol=1e-5) + np.testing.assert_allclose(second[1], first[0], rtol=1e-5) + np.testing.assert_allclose(second[2], first[1], rtol=1e-5) + + def test_empty_input_hashing_vectorizer_returns_empty(self) -> None: + embedder = Embedder(HashingVectorizerEmbeddingConfig(n_features=512, use_cache=True)) + result = embedder.embed([]) + assert result.shape == (0, 512) + + def test_empty_input_sentence_transformer_raises(self) -> None: + embedder = Embedder(create_sentence_transformer_config(use_cache=True)) + with pytest.raises(ValueError, match="Empty input"): + embedder.embed([]) diff --git a/tests/embedder/test_sqlite_cache.py b/tests/embedder/test_sqlite_cache.py new file mode 100644 index 000000000..df0829f42 --- /dev/null +++ b/tests/embedder/test_sqlite_cache.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import sqlite3 +from typing import TYPE_CHECKING + +import numpy as np + +from autointent._wrappers.embedder._sqlite_cache import ( + SCHEMA_VERSION, + SQLiteEmbeddingCache, + get_embedding_cache, + utterance_key, +) + +if TYPE_CHECKING: + from pathlib import Path + + import numpy.typing as npt + import pytest + + +def _vec(values: list[float]) -> npt.NDArray[np.float32]: + return np.asarray(values, dtype=np.float32) + + +def test_set_get_roundtrip(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(123, {"k1": _vec([1.0, 2.0, 3.0])}) + got = cache.get_many(123, ["k1"]) + assert set(got) == {"k1"} + np.testing.assert_array_equal(got["k1"], _vec([1.0, 2.0, 3.0])) + assert got["k1"].shape == (3,) + + +def test_get_partial_hit(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {"a": _vec([1.0, 1.0])}) + got = cache.get_many(1, ["a", "b"]) + assert set(got) == {"a"} + + +def test_get_empty_keys_returns_empty(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + assert cache.get_many(1, []) == {} + + +def test_set_empty_entries_is_noop(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {}) # must not create/raise + assert cache.get_many(1, ["anything"]) == {} + + +def test_model_hash_filter(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(111, {"shared": _vec([1.0, 2.0])}) + # A different model must not read model 111's row even for the same key string. + assert cache.get_many(222, ["shared"]) == {} + assert set(cache.get_many(111, ["shared"])) == {"shared"} + + +def test_insert_or_ignore_does_not_overwrite(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + cache.set_many(1, {"k": _vec([1.0, 2.0])}) + cache.set_many(1, {"k": _vec([9.0, 9.0])}) # ignored + np.testing.assert_array_equal(cache.get_many(1, ["k"])["k"], _vec([1.0, 2.0])) + + +def test_chunking_over_variable_limit(tmp_path: Path) -> None: + cache = SQLiteEmbeddingCache(tmp_path / "e.db") + entries = {f"k{i}": _vec([float(i)]) for i in range(2000)} + cache.set_many(1, entries) + got = cache.get_many(1, list(entries)) + assert len(got) == 2000 + np.testing.assert_array_equal(got["k1999"], _vec([1999.0])) + + +def test_schema_version_and_columns(tmp_path: Path) -> None: + db = tmp_path / "e.db" + cache = SQLiteEmbeddingCache(db) + cache.set_many(1, {"k": _vec([1.0])}) # triggers schema init + with sqlite3.connect(db) as conn: + assert conn.execute("PRAGMA user_version").fetchone()[0] == SCHEMA_VERSION + cols = {row[1] for row in conn.execute("PRAGMA table_info(embeddings)")} + indexes = {row[1] for row in conn.execute("PRAGMA index_list(embeddings)")} + assert {"key", "model_hash", "dim", "vector", "size_bytes", "created_at", "last_accessed"} <= cols + assert { + "idx_embeddings_last_accessed", + "idx_embeddings_created_at", + "idx_embeddings_model_hash", + } <= indexes + + +def test_version_mismatch_triggers_rebuild(tmp_path: Path) -> None: + db = tmp_path / "e.db" + SQLiteEmbeddingCache(db).set_many(1, {"old": _vec([1.0])}) + # Simulate an older/newer schema: bump user_version so the next instance rebuilds. + with sqlite3.connect(db) as conn: + conn.execute(f"PRAGMA user_version = {SCHEMA_VERSION + 1}") + fresh = SQLiteEmbeddingCache(db) + fresh.set_many(1, {"new": _vec([2.0])}) # forces _ensure_schema -> rebuild + assert fresh.get_many(1, ["old"]) == {} # old row dropped by rebuild + + +def test_corrupted_db_degrades_to_miss(tmp_path: Path) -> None: + db = tmp_path / "e.db" + db.write_bytes(b"this is not a sqlite database") + cache = SQLiteEmbeddingCache(db) + # Must not raise; reads miss and writes no-op. + assert cache.get_many(1, ["k"]) == {} + cache.set_many(1, {"k": _vec([1.0])}) + + +def test_dim_mismatch_row_skipped(tmp_path: Path) -> None: + db = tmp_path / "e.db" + cache = SQLiteEmbeddingCache(db) + cache.set_many(1, {"k": _vec([1.0, 2.0])}) + # Corrupt the stored dim so blob length disagrees. + with sqlite3.connect(db) as conn: + conn.execute("UPDATE embeddings SET dim = 99 WHERE key = 'k'") + conn.commit() + assert cache.get_many(1, ["k"]) == {} # skipped, not raised + + +def test_get_embedding_cache_memoized_by_path(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "c")) + first = get_embedding_cache() + second = get_embedding_cache() + assert first is second + + +def test_utterance_key_distinctness() -> None: + base = utterance_key(1, "hello", None) + assert base == utterance_key(1, "hello", None) + assert base != utterance_key(2, "hello", None) + assert base != utterance_key(1, "world", None) + assert base != utterance_key(1, "hello", "Query:") diff --git a/tests/test_cache_dir.py b/tests/test_cache_dir.py new file mode 100644 index 000000000..17e8205be --- /dev/null +++ b/tests/test_cache_dir.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from autointent._cache_dir import get_cache_dir + +if TYPE_CHECKING: + from pathlib import Path + + import pytest + + +def test_get_cache_dir_honors_env_var(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("AUTOINTENT_CACHE_DIR", str(tmp_path / "custom")) + assert get_cache_dir() == tmp_path / "custom" + + +def test_get_cache_dir_falls_back_to_appdirs(monkeypatch: pytest.MonkeyPatch) -> None: + # The global autouse isolation fixture sets the env var for every test, so unset it here. + monkeypatch.delenv("AUTOINTENT_CACHE_DIR", raising=False) + result = get_cache_dir() + assert result.name == "autointent" or "autointent" in str(result)