Skip to content

Commit 1fee913

Browse files
committed
fix(core): parallelize stale vector cleanup
Signed-off-by: phernandez <paul@basicmachines.co>
1 parent a83bee9 commit 1fee913

2 files changed

Lines changed: 33 additions & 17 deletions

File tree

src/basic_memory/services/search_service.py

Lines changed: 22 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -469,25 +469,31 @@ async def sync_entity_vectors_batch(
469469
*(self._clear_entity_vectors(entity_id) for entity_id in opted_out_ids)
470470
)
471471

472-
repository_results: list[VectorSyncBatchResult] = []
473-
if unknown_ids:
474-
# Trigger: a caller passes entity IDs that were deleted after the batch was built.
475-
# Why: repository sync still owns stale chunk cleanup for IDs with no source rows.
476-
# Outcome: deleted entities do not silently keep orphaned vector rows forever.
477-
repository_results.append(await self.repository.sync_entity_vectors_batch(unknown_ids))
478-
479472
eligible_entity_ids = [
480473
entity_id
481474
for entity_id in entity_ids
482475
if entity_id in entities_by_id and entity_id not in opted_out_ids
483476
]
484-
if eligible_entity_ids:
485-
repository_results.append(
486-
await self.repository.sync_entity_vectors_batch(
487-
eligible_entity_ids,
488-
progress_callback=progress_callback,
489-
)
477+
478+
cleanup_task = (
479+
self.repository.sync_entity_vectors_batch(unknown_ids) if unknown_ids else None
480+
)
481+
eligible_task = (
482+
self.repository.sync_entity_vectors_batch(
483+
eligible_entity_ids,
484+
progress_callback=progress_callback,
490485
)
486+
if eligible_entity_ids
487+
else None
488+
)
489+
repository_results = [
490+
result
491+
for result in await asyncio.gather(
492+
cleanup_task if cleanup_task is not None else asyncio.sleep(0, result=None),
493+
eligible_task if eligible_task is not None else asyncio.sleep(0, result=None),
494+
)
495+
if result is not None
496+
]
491497

492498
if not repository_results:
493499
return VectorSyncBatchResult(
@@ -503,7 +509,9 @@ async def sync_entity_vectors_batch(
503509
entities_failed=sum(result.entities_failed for result in repository_results),
504510
entities_deferred=sum(result.entities_deferred for result in repository_results),
505511
entities_skipped=(
506-
len(opted_out_ids) + sum(result.entities_skipped for result in repository_results)
512+
len(opted_out_ids)
513+
+ sum(result.entities_skipped for result in repository_results)
514+
- len(unknown_ids)
507515
),
508516
failed_entity_ids=[
509517
failed_entity_id

tests/services/test_semantic_search.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -273,6 +273,7 @@ async def test_semantic_vector_sync_batch_cleans_up_unknown_ids(search_service,
273273
entities_total=1,
274274
entities_synced=1,
275275
entities_failed=0,
276+
entities_skipped=1,
276277
),
277278
VectorSyncBatchResult(
278279
entities_total=1,
@@ -282,12 +283,19 @@ async def test_semantic_vector_sync_batch_cleans_up_unknown_ids(search_service,
282283
]
283284
)
284285
monkeypatch.setattr(repository, "sync_entity_vectors_batch", sync_batch)
286+
progress_callback = AsyncMock()
285287

286-
result = await search_service.sync_entity_vectors_batch([41, 42])
288+
result = await search_service.sync_entity_vectors_batch([41, 42], progress_callback)
287289

288290
assert sync_batch.await_count == 2
289-
assert sync_batch.await_args_list[0].args[0] == [41]
290-
assert sync_batch.await_args_list[1].args[0] == [42]
291+
called_entity_ids = {tuple(call.args[0]) for call in sync_batch.await_args_list}
292+
assert called_entity_ids == {(41,), (42,)}
293+
progress_callback_calls = [
294+
call for call in sync_batch.await_args_list if call.kwargs.get("progress_callback") is not None
295+
]
296+
assert len(progress_callback_calls) == 1
297+
assert progress_callback_calls[0].args[0] == [42]
298+
assert progress_callback_calls[0].kwargs["progress_callback"] is progress_callback
291299
assert result.entities_total == 2
292300
assert result.entities_synced == 2
293301
assert result.entities_failed == 0

0 commit comments

Comments
 (0)