11"""SQLite-backed content store for Cortex.
22
33Handles content storage, FTS5 full-text search, config, and query logging.
4- Uses synchronous sqlite3 (async wrapper can be added later via aiosqlite) .
4+ Uses synchronous sqlite3.
55"""
66
77from __future__ import annotations
88
99import json
1010import sqlite3
11+ from collections .abc import Callable
1112from datetime import UTC , datetime
1213from pathlib import Path
1314from typing import Any
99100CREATE INDEX IF NOT EXISTS idx_query_log_timestamp ON query_log(timestamp);
100101"""
101102
103+ # Columns that callers may update via ContentStore.update().
104+ # When a migration adds a new column, add it here too.
105+ UPDATABLE_COLUMNS : frozenset [str ] = frozenset ({
106+ "title" ,
107+ "content" ,
108+ "raw_markdown" ,
109+ "type" ,
110+ "project" ,
111+ "tags" ,
112+ "summary" ,
113+ "tier" ,
114+ "pipeline_stage" ,
115+ "confidence" ,
116+ "captured_by" ,
117+ "updated_at" ,
118+ })
119+
120+ _IMMUTABLE_COLUMNS : frozenset [str ] = frozenset ({"id" , "created_at" })
121+
122+ # Schema versioning — bump SCHEMA_VERSION and add a migration function
123+ # when the schema changes. See MIGRATIONS below.
124+ SCHEMA_VERSION = 1
125+
126+ # List of (target_version, migration_function) tuples.
127+ # Each function receives a sqlite3.Connection and mutates the schema.
128+ MIGRATIONS : list [tuple [int , Callable [[sqlite3 .Connection ], None ]]] = []
129+
102130
103131class ContentStore :
104132 """SQLite store for document content, FTS5 search, config, and query logs."""
@@ -121,8 +149,32 @@ def __init__(self, path: Path | None = None):
121149 self ._init_schema ()
122150
123151 def _init_schema (self ) -> None :
124- self ._db .executescript (SCHEMA_SQL )
125- self ._db .commit ()
152+ current_version = self ._db .execute ("PRAGMA user_version" ).fetchone ()[0 ]
153+
154+ if current_version == 0 :
155+ # Check if tables already exist (pre-migration install)
156+ has_tables = self ._db .execute (
157+ "SELECT name FROM sqlite_master WHERE type='table' AND name='documents'"
158+ ).fetchone () is not None
159+
160+ if not has_tables :
161+ # Brand new database: create full schema
162+ self ._db .executescript (SCHEMA_SQL )
163+ self ._db .commit ()
164+
165+ # Stamp as version 1 (baseline)
166+ self ._db .execute (f"PRAGMA user_version = { SCHEMA_VERSION } " )
167+ self ._db .commit ()
168+ current_version = SCHEMA_VERSION
169+
170+ # Run any pending migrations
171+ for target_version , migrate_fn in MIGRATIONS :
172+ if current_version < target_version :
173+ migrate_fn (self ._db )
174+ self ._db .execute (f"PRAGMA user_version = { target_version } " )
175+ self ._db .commit ()
176+ current_version = target_version
177+ logger .info ("Migrated schema to version %d" , target_version )
126178
127179 def close (self ) -> None :
128180 self ._db .close ()
@@ -213,6 +265,21 @@ def update(self, doc_id: str, **updates: Any) -> bool:
213265 return True
214266
215267 updates ["updated_at" ] = datetime .now (UTC ).isoformat ()
268+
269+ # Validate column names against allowlist
270+ invalid_keys = set (updates .keys ()) - UPDATABLE_COLUMNS
271+ if invalid_keys :
272+ immutable = invalid_keys & _IMMUTABLE_COLUMNS
273+ if immutable :
274+ raise StoreError (
275+ f"Cannot update immutable column(s): { ', ' .join (sorted (immutable ))} " ,
276+ context = {"columns" : sorted (immutable )},
277+ )
278+ raise StoreError (
279+ f"Unknown column(s): { ', ' .join (sorted (invalid_keys ))} " ,
280+ context = {"columns" : sorted (invalid_keys )},
281+ )
282+
216283 set_clause = ", " .join (f"{ k } = ?" for k in updates )
217284 values = [* updates .values (), doc_id ]
218285
@@ -361,6 +428,21 @@ def get_embedding(self, doc_id: str) -> bytes | None:
361428 ).fetchone ()
362429 return row ["embedding" ] if row else None
363430
431+ def get_all_embeddings (self , * , limit : int = 10000 ) -> list [dict [str , Any ]]:
432+ """Return all embeddings for similarity search.
433+
434+ Args:
435+ limit: Maximum number of embeddings to return (safety cap).
436+
437+ Returns:
438+ List of dicts with keys: doc_id, embedding (bytes), dimensions (int).
439+ """
440+ rows = self ._db .execute (
441+ "SELECT doc_id, embedding, dimensions FROM embeddings LIMIT ?" ,
442+ (limit ,),
443+ ).fetchall ()
444+ return [dict (r ) for r in rows ]
445+
364446 # -------------------------------------------------------------------------
365447 # Config
366448 # -------------------------------------------------------------------------
0 commit comments