Skip to content

Commit 9738267

Browse files
committed
perf(core): skip unchanged vector sync work
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 3175e7c commit 9738267

File tree

7 files changed

+575
-17
lines changed

7 files changed

+575
-17
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Persist vector sync fingerprints on chunk metadata.

Revision ID: m6h7i8j9k0l1
Revises: l5g6h7i8j9k0
Create Date: 2026-04-07 00:00:00.000000

"""

from typing import Sequence, Union

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "m6h7i8j9k0l1"
down_revision: Union[str, None] = "l5g6h7i8j9k0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add entity fingerprint + embedding model metadata to Postgres chunk rows.

    Trigger: vector sync now fast-skips unchanged entities using persisted
    semantic fingerprints.
    Why: chunk rows already own the per-entity derived metadata we diff against,
    so persisting the fingerprint on that table avoids a second sync-state table.
    Outcome: existing rows get empty-string placeholders and will be refreshed on
    the next vector sync before they become eligible for skip checks.
    """
    connection = op.get_bind()
    # The chunk table targeted here only exists on Postgres deployments;
    # other dialects (e.g. SQLite) have nothing to migrate, so exit early.
    if connection.dialect.name != "postgresql":
        return

    # Step 1: add the columns as nullable so the ALTER succeeds on tables that
    # already hold rows. IF NOT EXISTS keeps the migration safely re-runnable.
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        ADD COLUMN IF NOT EXISTS entity_fingerprint TEXT
        """
    )
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        ADD COLUMN IF NOT EXISTS embedding_model TEXT
        """
    )
    # Step 2: backfill existing rows with empty-string placeholders. COALESCE
    # leaves any already-populated values untouched on re-run; the next vector
    # sync replaces placeholders before rows become skip-eligible.
    op.execute(
        """
        UPDATE search_vector_chunks
        SET entity_fingerprint = COALESCE(entity_fingerprint, ''),
            embedding_model = COALESCE(embedding_model, '')
        """
    )
    # Step 3: every row now has a value, so tighten the columns to NOT NULL to
    # match the runtime table definition.
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        ALTER COLUMN entity_fingerprint SET NOT NULL
        """
    )
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        ALTER COLUMN embedding_model SET NOT NULL
        """
    )
67+
def downgrade() -> None:
    """Remove vector sync fingerprint columns from Postgres chunk rows."""
    connection = op.get_bind()
    # upgrade() only touched Postgres; mirror the same dialect guard here.
    if connection.dialect.name != "postgresql":
        return

    # Drop in reverse order of creation; IF EXISTS keeps this idempotent if
    # the downgrade is re-run or the upgrade was only partially applied.
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        DROP COLUMN IF EXISTS embedding_model
        """
    )
    op.execute(
        """
        ALTER TABLE search_vector_chunks
        DROP COLUMN IF EXISTS entity_fingerprint
        """
    )

src/basic_memory/models/search.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@
104104
chunk_key TEXT NOT NULL,
105105
chunk_text TEXT NOT NULL,
106106
source_hash TEXT NOT NULL,
107+
entity_fingerprint TEXT NOT NULL,
108+
embedding_model TEXT NOT NULL,
107109
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
108110
UNIQUE (project_id, entity_id, chunk_key)
109111
)
@@ -124,6 +126,8 @@
124126
chunk_key TEXT NOT NULL,
125127
chunk_text TEXT NOT NULL,
126128
source_hash TEXT NOT NULL,
129+
entity_fingerprint TEXT NOT NULL,
130+
embedding_model TEXT NOT NULL,
127131
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
128132
)
129133
""")

src/basic_memory/repository/postgres_search_repository.py

Lines changed: 133 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ async def _ensure_vector_tables(self) -> None:
305305
chunk_key TEXT NOT NULL,
306306
chunk_text TEXT NOT NULL,
307307
source_hash TEXT NOT NULL,
308+
entity_fingerprint TEXT NOT NULL,
309+
embedding_model TEXT NOT NULL,
308310
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
309311
UNIQUE (project_id, entity_id, chunk_key)
310312
)
@@ -319,6 +321,47 @@ async def _ensure_vector_tables(self) -> None:
319321
"""
320322
)
321323
)
324+
await session.execute(
325+
text(
326+
"""
327+
ALTER TABLE search_vector_chunks
328+
ADD COLUMN IF NOT EXISTS entity_fingerprint TEXT
329+
"""
330+
)
331+
)
332+
await session.execute(
333+
text(
334+
"""
335+
ALTER TABLE search_vector_chunks
336+
ADD COLUMN IF NOT EXISTS embedding_model TEXT
337+
"""
338+
)
339+
)
340+
await session.execute(
341+
text(
342+
"""
343+
UPDATE search_vector_chunks
344+
SET entity_fingerprint = COALESCE(entity_fingerprint, ''),
345+
embedding_model = COALESCE(embedding_model, '')
346+
"""
347+
)
348+
)
349+
await session.execute(
350+
text(
351+
"""
352+
ALTER TABLE search_vector_chunks
353+
ALTER COLUMN entity_fingerprint SET NOT NULL
354+
"""
355+
)
356+
)
357+
await session.execute(
358+
text(
359+
"""
360+
ALTER TABLE search_vector_chunks
361+
ALTER COLUMN embedding_model SET NOT NULL
362+
"""
363+
)
364+
)
322365

323366
# --- Embeddings table (dimension-dependent, created at runtime) ---
324367
# Trigger: provider dimensions may differ from what was previously deployed.
@@ -521,6 +564,10 @@ async def sync_entity_vectors_batch(
521564
prepared_sync = cast(_PreparedEntityVectorSync, prepared)
522565

523566
embedding_jobs_count = len(prepared_sync.embedding_jobs)
567+
result.chunks_total += prepared_sync.chunks_total
568+
result.chunks_skipped += prepared_sync.chunks_skipped
569+
if prepared_sync.entity_skipped:
570+
result.entities_skipped += 1
524571
result.embedding_jobs_total += embedding_jobs_count
525572
result.prepare_seconds_total += prepared_sync.prepare_seconds
526573

@@ -537,7 +584,10 @@ async def sync_entity_vectors_batch(
537584
embed_seconds=0.0,
538585
write_seconds=0.0,
539586
source_rows_count=prepared_sync.source_rows_count,
587+
chunks_total=prepared_sync.chunks_total,
588+
chunks_skipped=prepared_sync.chunks_skipped,
540589
embedding_jobs_count=0,
590+
entity_skipped=prepared_sync.entity_skipped,
541591
)
542592
continue
543593

@@ -546,6 +596,9 @@ async def sync_entity_vectors_batch(
546596
source_rows_count=prepared_sync.source_rows_count,
547597
embedding_jobs_count=embedding_jobs_count,
548598
remaining_jobs=embedding_jobs_count,
599+
chunks_total=prepared_sync.chunks_total,
600+
chunks_skipped=prepared_sync.chunks_skipped,
601+
entity_skipped=prepared_sync.entity_skipped,
549602
prepare_seconds=prepared_sync.prepare_seconds,
550603
)
551604
pending_jobs.extend(
@@ -634,13 +687,18 @@ async def sync_entity_vectors_batch(
634687
logger.info(
635688
"Vector batch sync complete: project_id={project_id} entities_total={entities_total} "
636689
"entities_synced={entities_synced} entities_failed={entities_failed} "
637-
"embedding_jobs_total={embedding_jobs_total} prepare_seconds_total={prepare_seconds_total:.3f} "
690+
"entities_skipped={entities_skipped} chunks_total={chunks_total} "
691+
"chunks_skipped={chunks_skipped} embedding_jobs_total={embedding_jobs_total} "
692+
"prepare_seconds_total={prepare_seconds_total:.3f} "
638693
"queue_wait_seconds_total={queue_wait_seconds_total:.3f} "
639694
"embed_seconds_total={embed_seconds_total:.3f} write_seconds_total={write_seconds_total:.3f}",
640695
project_id=self.project_id,
641696
entities_total=result.entities_total,
642697
entities_synced=result.entities_synced,
643698
entities_failed=result.entities_failed,
699+
entities_skipped=result.entities_skipped,
700+
chunks_total=result.chunks_total,
701+
chunks_skipped=result.chunks_skipped,
644702
embedding_jobs_total=result.embedding_jobs_total,
645703
prepare_seconds_total=result.prepare_seconds_total,
646704
queue_wait_seconds_total=result.queue_wait_seconds_total,
@@ -707,6 +765,8 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
707765

708766
chunk_records = self._build_chunk_records(rows)
709767
built_chunk_records_count = len(chunk_records)
768+
current_entity_fingerprint = self._build_entity_fingerprint(chunk_records)
769+
current_embedding_model = self._embedding_model_key()
710770
logger.info(
711771
"Vector sync source prepared: project_id={project_id} entity_id={entity_id} "
712772
"source_rows_count={source_rows_count} "
@@ -729,7 +789,8 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
729789

730790
existing_rows_result = await session.execute(
731791
text(
732-
"SELECT c.id, c.chunk_key, c.source_hash, "
792+
"SELECT c.id, c.chunk_key, c.source_hash, c.entity_fingerprint, "
793+
"c.embedding_model, "
733794
"(e.chunk_id IS NOT NULL) AS has_embedding "
734795
"FROM search_vector_chunks c "
735796
"LEFT JOIN search_vector_embeddings e ON e.chunk_id = c.id "
@@ -754,8 +815,43 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
754815
orphan_ids = {int(row["id"]) for row in existing_rows if not bool(row["has_embedding"])}
755816
orphan_chunks_count = len(orphan_ids)
756817

818+
skip_unchanged_entity = (
819+
existing_chunks_count == built_chunk_records_count
820+
and stale_chunks_count == 0
821+
and orphan_chunks_count == 0
822+
and existing_chunks_count > 0
823+
and all(
824+
row["entity_fingerprint"] == current_entity_fingerprint
825+
and row["embedding_model"] == current_embedding_model
826+
for row in existing_rows
827+
)
828+
)
829+
if skip_unchanged_entity:
830+
logger.info(
831+
"Vector sync skipped unchanged entity: project_id={project_id} "
832+
"entity_id={entity_id} chunks_skipped={chunks_skipped} "
833+
"entity_fingerprint={entity_fingerprint} embedding_model={embedding_model}",
834+
project_id=self.project_id,
835+
entity_id=entity_id,
836+
chunks_skipped=built_chunk_records_count,
837+
entity_fingerprint=current_entity_fingerprint,
838+
embedding_model=current_embedding_model,
839+
)
840+
prepare_seconds = time.perf_counter() - sync_start
841+
return _PreparedEntityVectorSync(
842+
entity_id=entity_id,
843+
sync_start=sync_start,
844+
source_rows_count=source_rows_count,
845+
embedding_jobs=[],
846+
chunks_total=built_chunk_records_count,
847+
chunks_skipped=built_chunk_records_count,
848+
entity_skipped=True,
849+
prepare_seconds=prepare_seconds,
850+
)
851+
757852
upsert_records: list[dict[str, str]] = []
758853
embedding_jobs: list[tuple[int, str]] = []
854+
skipped_chunks_count = 0
759855

760856
for record in chunk_records:
761857
current = existing_by_key.get(record["chunk_key"])
@@ -765,9 +861,29 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
765861

766862
row_id = int(current["id"])
767863
is_orphan = row_id in orphan_ids
768-
if current["source_hash"] == record["source_hash"]:
769-
if is_orphan:
770-
embedding_jobs.append((row_id, record["chunk_text"]))
864+
same_source_hash = current["source_hash"] == record["source_hash"]
865+
same_entity_fingerprint = (
866+
current["entity_fingerprint"] == current_entity_fingerprint
867+
)
868+
same_embedding_model = current["embedding_model"] == current_embedding_model
869+
870+
if same_source_hash and not is_orphan and same_embedding_model:
871+
if not same_entity_fingerprint:
872+
await session.execute(
873+
text(
874+
"UPDATE search_vector_chunks "
875+
"SET entity_fingerprint = :entity_fingerprint, "
876+
"embedding_model = :embedding_model, "
877+
"updated_at = NOW() "
878+
"WHERE id = :id"
879+
),
880+
{
881+
"id": row_id,
882+
"entity_fingerprint": current_entity_fingerprint,
883+
"embedding_model": current_embedding_model,
884+
},
885+
)
886+
skipped_chunks_count += 1
771887
continue
772888

773889
upsert_records.append(record)
@@ -782,10 +898,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
782898
upsert_params[f"chunk_key_{index}"] = record["chunk_key"]
783899
upsert_params[f"chunk_text_{index}"] = record["chunk_text"]
784900
upsert_params[f"source_hash_{index}"] = record["source_hash"]
901+
upsert_params[f"entity_fingerprint_{index}"] = current_entity_fingerprint
902+
upsert_params[f"embedding_model_{index}"] = current_embedding_model
785903
upsert_values.append(
786904
"("
787905
":entity_id, :project_id, "
788-
f":chunk_key_{index}, :chunk_text_{index}, :source_hash_{index}, NOW()"
906+
f":chunk_key_{index}, :chunk_text_{index}, :source_hash_{index}, "
907+
f":entity_fingerprint_{index}, :embedding_model_{index}, NOW()"
789908
")"
790909
)
791910

@@ -797,11 +916,15 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
797916
chunk_key,
798917
chunk_text,
799918
source_hash,
919+
entity_fingerprint,
920+
embedding_model,
800921
updated_at
801922
) VALUES {", ".join(upsert_values)}
802923
ON CONFLICT (project_id, entity_id, chunk_key) DO UPDATE SET
803924
chunk_text = EXCLUDED.chunk_text,
804925
source_hash = EXCLUDED.source_hash,
926+
entity_fingerprint = EXCLUDED.entity_fingerprint,
927+
embedding_model = EXCLUDED.embedding_model,
805928
updated_at = NOW()
806929
RETURNING id, chunk_key
807930
"""),
@@ -819,12 +942,14 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
819942
"existing_chunks_count={existing_chunks_count} "
820943
"stale_chunks_count={stale_chunks_count} "
821944
"orphan_chunks_count={orphan_chunks_count} "
945+
"chunks_skipped={chunks_skipped} "
822946
"embedding_jobs_count={embedding_jobs_count}",
823947
project_id=self.project_id,
824948
entity_id=entity_id,
825949
existing_chunks_count=existing_chunks_count,
826950
stale_chunks_count=stale_chunks_count,
827951
orphan_chunks_count=orphan_chunks_count,
952+
chunks_skipped=skipped_chunks_count,
828953
embedding_jobs_count=len(embedding_jobs),
829954
)
830955

@@ -834,6 +959,8 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
834959
sync_start=sync_start,
835960
source_rows_count=source_rows_count,
836961
embedding_jobs=embedding_jobs,
962+
chunks_total=built_chunk_records_count,
963+
chunks_skipped=skipped_chunks_count,
837964
prepare_seconds=prepare_seconds,
838965
)
839966

0 commit comments

Comments
 (0)