Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/basic_memory/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ class BasicMemoryConfig(BaseSettings):
gt=0,
)

# Tunable batch size for sync transactions; higher values trade memory for
# fewer round trips (mainly benefits remote databases such as Postgres).
sync_batch_size: int = Field(
    default=100,
    description="Number of files to process in a single database transaction during sync. Higher values improve performance with remote databases (Postgres) but increase memory usage. Typical values: 100 (conservative), 500 (balanced), 1000 (aggressive).",
    gt=0,
)

kebab_filenames: bool = Field(
default=False,
description="Format for generated filenames. False preserves spaces and special chars, True converts them to hyphens for consistency with permalinks",
Expand Down
109 changes: 109 additions & 0 deletions src/basic_memory/repository/entity_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,38 @@ async def get_by_file_path(self, file_path: Union[Path, str]) -> Optional[Entity
)
return await self.find_one(query)

async def get_by_file_paths_batch(
    self, file_paths: Sequence[Union[Path, str]]
) -> dict[str, Entity]:
    """Fetch many entities by file path in one query, relationships eager-loaded.

    Collapses what would otherwise be N per-path lookups into a single
    ``IN (...)`` query whose rows come back with relationships already
    populated via the repository's standard load options.

    Args:
        file_paths: Paths (``Path`` or ``str``) to look up.

    Returns:
        Mapping of posix-style file path -> Entity. Paths with no matching
        entity are simply absent from the mapping.
    """
    # Nothing to look up — avoid issuing an empty IN() query.
    if not file_paths:
        return {}

    # Normalize every path to its posix string form, the representation
    # stored in Entity.file_path.
    normalized = [Path(fp).as_posix() for fp in file_paths]

    stmt = (
        self.select()
        .where(Entity.file_path.in_(normalized))
        .options(*self.get_load_options())
    )
    rows = await self.execute_query(stmt)

    # Keyed by file_path so callers get O(1) membership/lookup.
    return {entity.file_path: entity for entity in rows.scalars().all()}

async def find_by_checksum(self, checksum: str) -> Sequence[Entity]:
"""Find entities with the given checksum.

Expand Down Expand Up @@ -338,3 +370,80 @@ async def _handle_permalink_conflict(self, entity: Entity, session: AsyncSession
# Re-raise if not a foreign key error
raise
return entity

async def upsert_entities(self, entities: List[Entity]) -> List[Entity]:
    """Bulk insert or update multiple entities in a single transaction.

    Optimized for batch operations with remote databases (Postgres).
    Handles conflicts the same way as upsert_entity() but processes
    all entities in one transaction. On any non-FK IntegrityError the
    whole batch is rolled back and each entity is retried individually.

    Args:
        entities: List of entities to upsert

    Returns:
        List of upserted entities with relationships loaded. May be shorter
        than the input if individual fallback upserts fail (those are
        logged and skipped, not raised).

    Raises:
        SyncFatalError: If any entity references a non-existent project_id
    """
    # Fast path: nothing to do.
    if not entities:
        return []

    async with db.scoped_session(self.session_maker) as session:
        # Set project_id on all entities if needed
        for entity in entities:
            self._set_project_id_if_needed(entity)

        # Try to add all entities
        for entity in entities:
            session.add(entity)

        try:
            # Flush so constraint violations surface here rather than at
            # commit time, while we can still fall back per-entity.
            await session.flush()

            # Fetch all entities with relationships loaded.
            # NOTE(review): this re-query matches on file_path only —
            # presumably self.select() already scopes to the current
            # project; confirm, otherwise same-named files in other
            # projects could be returned.
            file_paths = [e.file_path for e in entities]
            query = (
                self.select()
                .where(Entity.file_path.in_(file_paths))
                .options(*self.get_load_options())
            )
            result = await session.execute(query)
            return list(result.scalars().all())

        except IntegrityError as e:
            # Check for foreign key constraint failures; the two message
            # strings cover SQLite and Postgres respectively.
            error_str = str(e)
            if (
                "FOREIGN KEY constraint failed" in error_str
                or "violates foreign key constraint" in error_str
            ):
                # Local import avoids a module-level import cycle.
                from basic_memory.services.exceptions import SyncFatalError

                raise SyncFatalError(
                    "Cannot sync entities: project_id does not exist in database. "
                    "The project may have been deleted. This sync will be terminated."
                ) from e

            # For other integrity errors (file_path or permalink conflicts),
            # rollback and fall back to individual processing
            await session.rollback()

            # Process each entity individually to handle conflicts properly.
            # NOTE(review): upsert_entity() presumably opens its own session;
            # these calls run while the outer (rolled-back) scoped_session
            # context is still open — confirm the context manager tolerates
            # exiting a rolled-back session without re-committing.
            logger.debug(
                f"Batch upsert failed with IntegrityError, falling back to individual upserts for {len(entities)} entities"
            )

            result_entities = []
            for entity in entities:
                try:
                    upserted = await self.upsert_entity(entity)
                    result_entities.append(upserted)
                except Exception as individual_error:
                    # Best-effort: log and keep going so one bad file does
                    # not abort the rest of the batch.
                    logger.error(
                        f"Failed to upsert entity {entity.file_path}: {individual_error}"
                    )
                    # Continue with other entities

            return result_entities
30 changes: 30 additions & 0 deletions src/basic_memory/repository/observation_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,33 @@ async def find_by_entities(self, entity_ids: List[int]) -> Dict[int, List[Observ
observations_by_entity[obs.entity_id].append(obs)

return observations_by_entity

async def delete_by_entity_ids(self, entity_ids: List[int]) -> int:
    """Delete all observations for multiple entities in a single statement.

    Optimized for batch operations: issues one bulk ``DELETE ... WHERE
    entity_id IN (...)`` instead of loading every row and deleting it
    through the ORM one at a time (which cost a SELECT plus N per-row
    deletes). Mirrors RelationRepository.delete_outgoing_relations_from_entities.

    NOTE(review): a bulk DELETE bypasses ORM-level cascade/event hooks on
    Observation — presumably observations have no dependent rows; confirm
    before relying on this in paths that attach ORM cascades.

    Args:
        entity_ids: List of entity IDs whose observations should be deleted

    Returns:
        Number of observations deleted
    """
    # Avoid emitting a degenerate empty IN() clause.
    if not entity_ids:
        return 0

    # Local imports match this file's existing style for `db`.
    from basic_memory import db
    from sqlalchemy import delete

    async with db.scoped_session(self.session_maker) as session:
        # Single bulk delete; the driver reports affected rows via rowcount.
        result = await session.execute(
            delete(Observation).where(Observation.entity_id.in_(entity_ids))
        )
        return result.rowcount or 0
21 changes: 21 additions & 0 deletions src/basic_memory/repository/relation_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,27 @@ async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
async with db.scoped_session(self.session_maker) as session:
await session.execute(delete(Relation).where(Relation.from_id == entity_id))

async def delete_outgoing_relations_from_entities(self, entity_ids: List[int]) -> int:
    """Bulk-delete outgoing relations for a set of source entities.

    Batch counterpart of delete_outgoing_relations_from_entity(): removes,
    in one statement, every relation whose ``from_id`` is in *entity_ids*.
    Incoming relations (where these entities are the target) are untouched.

    Args:
        entity_ids: IDs of the source entities whose outgoing relations
            should be removed.

    Returns:
        Number of relations deleted.
    """
    # Guard against an empty IN() clause.
    if not entity_ids:
        return 0

    # Build the bulk-delete statement up front, then execute it inside
    # a scoped session so commit/cleanup is handled for us.
    stmt = delete(Relation).where(Relation.from_id.in_(entity_ids))
    async with db.scoped_session(self.session_maker) as session:
        outcome = await session.execute(stmt)
        return outcome.rowcount or 0

async def find_unresolved_relations(self) -> Sequence[Relation]:
"""Find all unresolved relations, where to_id is null."""
query = select(Relation).filter(Relation.to_id.is_(None))
Expand Down
Loading
Loading