Skip to content

Commit 3175e7c

Browse files
committed
refactor timing for indexing
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent 901eb00 commit 3175e7c

File tree

5 files changed

+232
-19
lines changed

5 files changed

+232
-19
lines changed

src/basic_memory/repository/postgres_search_repository.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -522,13 +522,18 @@ async def sync_entity_vectors_batch(
522522

523523
embedding_jobs_count = len(prepared_sync.embedding_jobs)
524524
result.embedding_jobs_total += embedding_jobs_count
525+
result.prepare_seconds_total += prepared_sync.prepare_seconds
525526

526527
if embedding_jobs_count == 0:
527528
synced_entity_ids.add(entity_id)
528529
total_seconds = time.perf_counter() - prepared_sync.sync_start
530+
queue_wait_seconds = max(0.0, total_seconds - prepared_sync.prepare_seconds)
531+
result.queue_wait_seconds_total += queue_wait_seconds
529532
self._log_vector_sync_complete(
530533
entity_id=entity_id,
531534
total_seconds=total_seconds,
535+
prepare_seconds=prepared_sync.prepare_seconds,
536+
queue_wait_seconds=queue_wait_seconds,
532537
embed_seconds=0.0,
533538
write_seconds=0.0,
534539
source_rows_count=prepared_sync.source_rows_count,
@@ -541,6 +546,7 @@ async def sync_entity_vectors_batch(
541546
source_rows_count=prepared_sync.source_rows_count,
542547
embedding_jobs_count=embedding_jobs_count,
543548
remaining_jobs=embedding_jobs_count,
549+
prepare_seconds=prepared_sync.prepare_seconds,
544550
)
545551
pending_jobs.extend(
546552
_PendingEmbeddingJob(
@@ -562,6 +568,10 @@ async def sync_entity_vectors_batch(
562568
)
563569
result.embed_seconds_total += embed_seconds
564570
result.write_seconds_total += write_seconds
571+
(result.queue_wait_seconds_total) += self._finalize_completed_entity_syncs(
572+
entity_runtime=entity_runtime,
573+
synced_entity_ids=synced_entity_ids,
574+
)
565575
except Exception as exc:
566576
affected_entity_ids = sorted({job.entity_id for job in flush_jobs})
567577
failed_entity_ids.update(affected_entity_ids)
@@ -588,6 +598,10 @@ async def sync_entity_vectors_batch(
588598
)
589599
result.embed_seconds_total += embed_seconds
590600
result.write_seconds_total += write_seconds
601+
(result.queue_wait_seconds_total) += self._finalize_completed_entity_syncs(
602+
entity_runtime=entity_runtime,
603+
synced_entity_ids=synced_entity_ids,
604+
)
591605
except Exception as exc:
592606
affected_entity_ids = sorted({job.entity_id for job in flush_jobs})
593607
failed_entity_ids.update(affected_entity_ids)
@@ -620,13 +634,16 @@ async def sync_entity_vectors_batch(
620634
logger.info(
621635
"Vector batch sync complete: project_id={project_id} entities_total={entities_total} "
622636
"entities_synced={entities_synced} entities_failed={entities_failed} "
623-
"embedding_jobs_total={embedding_jobs_total} embed_seconds_total={embed_seconds_total:.3f} "
624-
"write_seconds_total={write_seconds_total:.3f}",
637+
"embedding_jobs_total={embedding_jobs_total} prepare_seconds_total={prepare_seconds_total:.3f} "
638+
"queue_wait_seconds_total={queue_wait_seconds_total:.3f} "
639+
"embed_seconds_total={embed_seconds_total:.3f} write_seconds_total={write_seconds_total:.3f}",
625640
project_id=self.project_id,
626641
entities_total=result.entities_total,
627642
entities_synced=result.entities_synced,
628643
entities_failed=result.entities_failed,
629644
embedding_jobs_total=result.embedding_jobs_total,
645+
prepare_seconds_total=result.prepare_seconds_total,
646+
queue_wait_seconds_total=result.queue_wait_seconds_total,
630647
embed_seconds_total=result.embed_seconds_total,
631648
write_seconds_total=result.write_seconds_total,
632649
)
@@ -679,11 +696,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
679696
source_rows_count=source_rows_count,
680697
)
681698
await self._delete_entity_chunks(session, entity_id)
699+
prepare_seconds = time.perf_counter() - sync_start
682700
return _PreparedEntityVectorSync(
683701
entity_id=entity_id,
684702
sync_start=sync_start,
685703
source_rows_count=source_rows_count,
686704
embedding_jobs=[],
705+
prepare_seconds=prepare_seconds,
687706
)
688707

689708
chunk_records = self._build_chunk_records(rows)
@@ -699,11 +718,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
699718
)
700719
if not chunk_records:
701720
await self._delete_entity_chunks(session, entity_id)
721+
prepare_seconds = time.perf_counter() - sync_start
702722
return _PreparedEntityVectorSync(
703723
entity_id=entity_id,
704724
sync_start=sync_start,
705725
source_rows_count=source_rows_count,
706726
embedding_jobs=[],
727+
prepare_seconds=prepare_seconds,
707728
)
708729

709730
existing_rows_result = await session.execute(
@@ -807,11 +828,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
807828
embedding_jobs_count=len(embedding_jobs),
808829
)
809830

831+
prepare_seconds = time.perf_counter() - sync_start
810832
return _PreparedEntityVectorSync(
811833
entity_id=entity_id,
812834
sync_start=sync_start,
813835
source_rows_count=source_rows_count,
814836
embedding_jobs=embedding_jobs,
837+
prepare_seconds=prepare_seconds,
815838
)
816839

817840
async def _write_embeddings(

src/basic_memory/repository/search_repository_base.py

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class VectorSyncBatchResult:
4444
entities_failed: int
4545
failed_entity_ids: list[int] = field(default_factory=list)
4646
embedding_jobs_total: int = 0
47+
prepare_seconds_total: float = 0.0
48+
queue_wait_seconds_total: float = 0.0
4749
embed_seconds_total: float = 0.0
4850
write_seconds_total: float = 0.0
4951

@@ -56,6 +58,7 @@ class _PreparedEntityVectorSync:
5658
sync_start: float
5759
source_rows_count: int
5860
embedding_jobs: list[tuple[int, str]]
61+
prepare_seconds: float = 0.0
5962

6063

6164
@dataclass
@@ -75,6 +78,7 @@ class _EntitySyncRuntime:
7578
source_rows_count: int
7679
embedding_jobs_count: int
7780
remaining_jobs: int
81+
prepare_seconds: float = 0.0
7882
embed_seconds: float = 0.0
7983
write_seconds: float = 0.0
8084

@@ -696,13 +700,18 @@ async def _sync_entity_vectors_internal(
696700

697701
embedding_jobs_count = len(prepared.embedding_jobs)
698702
result.embedding_jobs_total += embedding_jobs_count
703+
result.prepare_seconds_total += prepared.prepare_seconds
699704

700705
if embedding_jobs_count == 0:
701706
synced_entity_ids.add(entity_id)
702707
total_seconds = time.perf_counter() - prepared.sync_start
708+
queue_wait_seconds = max(0.0, total_seconds - prepared.prepare_seconds)
709+
result.queue_wait_seconds_total += queue_wait_seconds
703710
self._log_vector_sync_complete(
704711
entity_id=entity_id,
705712
total_seconds=total_seconds,
713+
prepare_seconds=prepared.prepare_seconds,
714+
queue_wait_seconds=queue_wait_seconds,
706715
embed_seconds=0.0,
707716
write_seconds=0.0,
708717
source_rows_count=prepared.source_rows_count,
@@ -715,6 +724,7 @@ async def _sync_entity_vectors_internal(
715724
source_rows_count=prepared.source_rows_count,
716725
embedding_jobs_count=embedding_jobs_count,
717726
remaining_jobs=embedding_jobs_count,
727+
prepare_seconds=prepared.prepare_seconds,
718728
)
719729
pending_jobs.extend(
720730
_PendingEmbeddingJob(
@@ -734,6 +744,10 @@ async def _sync_entity_vectors_internal(
734744
)
735745
result.embed_seconds_total += embed_seconds
736746
result.write_seconds_total += write_seconds
747+
(result.queue_wait_seconds_total) += self._finalize_completed_entity_syncs(
748+
entity_runtime=entity_runtime,
749+
synced_entity_ids=synced_entity_ids,
750+
)
737751
except Exception as exc:
738752
if not continue_on_error:
739753
raise
@@ -761,6 +775,10 @@ async def _sync_entity_vectors_internal(
761775
)
762776
result.embed_seconds_total += embed_seconds
763777
result.write_seconds_total += write_seconds
778+
(result.queue_wait_seconds_total) += self._finalize_completed_entity_syncs(
779+
entity_runtime=entity_runtime,
780+
synced_entity_ids=synced_entity_ids,
781+
)
764782
except Exception as exc:
765783
if not continue_on_error:
766784
raise
@@ -799,13 +817,16 @@ async def _sync_entity_vectors_internal(
799817
logger.info(
800818
"Vector batch sync complete: project_id={project_id} entities_total={entities_total} "
801819
"entities_synced={entities_synced} entities_failed={entities_failed} "
802-
"embedding_jobs_total={embedding_jobs_total} embed_seconds_total={embed_seconds_total:.3f} "
803-
"write_seconds_total={write_seconds_total:.3f}",
820+
"embedding_jobs_total={embedding_jobs_total} prepare_seconds_total={prepare_seconds_total:.3f} "
821+
"queue_wait_seconds_total={queue_wait_seconds_total:.3f} "
822+
"embed_seconds_total={embed_seconds_total:.3f} write_seconds_total={write_seconds_total:.3f}",
804823
project_id=self.project_id,
805824
entities_total=result.entities_total,
806825
entities_synced=result.entities_synced,
807826
entities_failed=result.entities_failed,
808827
embedding_jobs_total=result.embedding_jobs_total,
828+
prepare_seconds_total=result.prepare_seconds_total,
829+
queue_wait_seconds_total=result.queue_wait_seconds_total,
809830
embed_seconds_total=result.embed_seconds_total,
810831
write_seconds_total=result.write_seconds_total,
811832
)
@@ -863,11 +884,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
863884
)
864885
await self._delete_entity_chunks(session, entity_id)
865886
await session.commit()
887+
prepare_seconds = time.perf_counter() - sync_start
866888
return _PreparedEntityVectorSync(
867889
entity_id=entity_id,
868890
sync_start=sync_start,
869891
source_rows_count=source_rows_count,
870892
embedding_jobs=[],
893+
prepare_seconds=prepare_seconds,
871894
)
872895

873896
chunk_records = self._build_chunk_records(rows)
@@ -884,11 +907,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
884907
if not chunk_records:
885908
await self._delete_entity_chunks(session, entity_id)
886909
await session.commit()
910+
prepare_seconds = time.perf_counter() - sync_start
887911
return _PreparedEntityVectorSync(
888912
entity_id=entity_id,
889913
sync_start=sync_start,
890914
source_rows_count=source_rows_count,
891915
embedding_jobs=[],
916+
prepare_seconds=prepare_seconds,
892917
)
893918

894919
# --- Diff existing chunks against incoming ---
@@ -994,11 +1019,13 @@ async def _prepare_entity_vector_jobs(self, entity_id: int) -> _PreparedEntityVe
9941019
)
9951020
await session.commit()
9961021

1022+
prepare_seconds = time.perf_counter() - sync_start
9971023
return _PreparedEntityVectorSync(
9981024
entity_id=entity_id,
9991025
sync_start=sync_start,
10001026
source_rows_count=source_rows_count,
10011027
embedding_jobs=embedding_jobs,
1028+
prepare_seconds=prepare_seconds,
10021029
)
10031030

10041031
async def _flush_embedding_jobs(
@@ -1063,24 +1090,52 @@ async def _flush_embedding_jobs(
10631090

10641091
if runtime.remaining_jobs <= 0:
10651092
synced_entity_ids.add(entity_id)
1066-
total_seconds = time.perf_counter() - runtime.sync_start
1067-
self._log_vector_sync_complete(
1068-
entity_id=entity_id,
1069-
total_seconds=total_seconds,
1070-
embed_seconds=runtime.embed_seconds,
1071-
write_seconds=runtime.write_seconds,
1072-
source_rows_count=runtime.source_rows_count,
1073-
embedding_jobs_count=runtime.embedding_jobs_count,
1074-
)
1075-
entity_runtime.pop(entity_id, None)
10761093

10771094
return embed_seconds, write_seconds
10781095

1096+
def _finalize_completed_entity_syncs(
1097+
self,
1098+
*,
1099+
entity_runtime: dict[int, _EntitySyncRuntime],
1100+
synced_entity_ids: set[int],
1101+
) -> float:
1102+
"""Finalize completed entities and return cumulative queue wait seconds."""
1103+
queue_wait_seconds_total = 0.0
1104+
for entity_id, runtime in list(entity_runtime.items()):
1105+
if runtime.remaining_jobs > 0:
1106+
continue
1107+
1108+
synced_entity_ids.add(entity_id)
1109+
total_seconds = time.perf_counter() - runtime.sync_start
1110+
queue_wait_seconds = max(
1111+
0.0,
1112+
total_seconds
1113+
- runtime.prepare_seconds
1114+
- runtime.embed_seconds
1115+
- runtime.write_seconds,
1116+
)
1117+
queue_wait_seconds_total += queue_wait_seconds
1118+
self._log_vector_sync_complete(
1119+
entity_id=entity_id,
1120+
total_seconds=total_seconds,
1121+
prepare_seconds=runtime.prepare_seconds,
1122+
queue_wait_seconds=queue_wait_seconds,
1123+
embed_seconds=runtime.embed_seconds,
1124+
write_seconds=runtime.write_seconds,
1125+
source_rows_count=runtime.source_rows_count,
1126+
embedding_jobs_count=runtime.embedding_jobs_count,
1127+
)
1128+
entity_runtime.pop(entity_id, None)
1129+
1130+
return queue_wait_seconds_total
1131+
10791132
def _log_vector_sync_complete(
10801133
self,
10811134
*,
10821135
entity_id: int,
10831136
total_seconds: float,
1137+
prepare_seconds: float,
1138+
queue_wait_seconds: float,
10841139
embed_seconds: float,
10851140
write_seconds: float,
10861141
source_rows_count: int,
@@ -1089,12 +1144,15 @@ def _log_vector_sync_complete(
10891144
"""Log completion and slow-entity warnings with a consistent format."""
10901145
logger.info(
10911146
"Vector sync complete: project_id={project_id} entity_id={entity_id} "
1092-
"total_seconds={total_seconds:.3f} embed_seconds={embed_seconds:.3f} "
1147+
"total_seconds={total_seconds:.3f} prepare_seconds={prepare_seconds:.3f} "
1148+
"queue_wait_seconds={queue_wait_seconds:.3f} embed_seconds={embed_seconds:.3f} "
10931149
"write_seconds={write_seconds:.3f} source_rows_count={source_rows_count} "
10941150
"embedding_jobs_count={embedding_jobs_count}",
10951151
project_id=self.project_id,
10961152
entity_id=entity_id,
10971153
total_seconds=total_seconds,
1154+
prepare_seconds=prepare_seconds,
1155+
queue_wait_seconds=queue_wait_seconds,
10981156
embed_seconds=embed_seconds,
10991157
write_seconds=write_seconds,
11001158
source_rows_count=source_rows_count,
@@ -1103,12 +1161,15 @@ def _log_vector_sync_complete(
11031161
if total_seconds > 10:
11041162
logger.warning(
11051163
"Vector sync slow entity: project_id={project_id} entity_id={entity_id} "
1106-
"total_seconds={total_seconds:.3f} embed_seconds={embed_seconds:.3f} "
1164+
"total_seconds={total_seconds:.3f} prepare_seconds={prepare_seconds:.3f} "
1165+
"queue_wait_seconds={queue_wait_seconds:.3f} embed_seconds={embed_seconds:.3f} "
11071166
"write_seconds={write_seconds:.3f} source_rows_count={source_rows_count} "
11081167
"embedding_jobs_count={embedding_jobs_count}",
11091168
project_id=self.project_id,
11101169
entity_id=entity_id,
11111170
total_seconds=total_seconds,
1171+
prepare_seconds=prepare_seconds,
1172+
queue_wait_seconds=queue_wait_seconds,
11121173
embed_seconds=embed_seconds,
11131174
write_seconds=write_seconds,
11141175
source_rows_count=source_rows_count,

test-int/semantic/test_semantic_quality.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,8 @@
5151
("sqlite-fastembed", "paraphrase", "hybrid"): 0.25,
5252
("postgres-fastembed", "lexical", "hybrid"): 0.37,
5353
("postgres-fastembed", "paraphrase", "hybrid"): 0.25,
54-
# OpenAI hybrid should handle paraphrases better than FastEmbed
55-
("postgres-openai", "lexical", "hybrid"): 0.37,
56-
("postgres-openai", "paraphrase", "hybrid"): 0.25,
54+
# OpenAI metrics are still recorded, but we do not gate on them yet.
55+
# The current benchmark corpus is too small to make that combo stable.
5756
}
5857

5958

0 commit comments

Comments
 (0)