Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions src/basic_memory/repository/entity_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ async def get_by_id(self, entity_id: int) -> Optional[Entity]: # pragma: no cov
async with db.scoped_session(self.session_maker) as session:
return await self.select_by_id(session, entity_id)

async def get_by_external_id(self, external_id: str) -> Optional[Entity]:
async def _find_one_by_query(self, query, *, load_relations: bool) -> Optional[Entity]:
    """Return at most one Entity row for *query*, with optional eager loading.

    Args:
        query: A SQLAlchemy select statement for Entity rows.
        load_relations: When True, delegate to ``self.find_one`` (which,
            per the callers in this diff, applies the repository's eager
            load options). When False, run the query with
            ``use_query_options=False`` so related rows are not eagerly
            loaded — a lighter fetch for identity-only lookups.

    Returns:
        The single matching Entity, or None if no row matched.

    Raises:
        sqlalchemy.exc.MultipleResultsFound: via ``one_or_none()`` if more
            than one row matches on the non-eager path.
    """
    if load_relations:
        return await self.find_one(query)

    # Lightweight path: execute without the repository's query options
    # (no eager loading of relations/observations).
    result = await self.execute_query(query, use_query_options=False)
    return result.scalars().one_or_none()

async def get_by_external_id(
self, external_id: str, *, load_relations: bool = True
) -> Optional[Entity]:
"""Get entity by external UUID.

Args:
Expand All @@ -54,21 +64,21 @@ async def get_by_external_id(self, external_id: str) -> Optional[Entity]:
Returns:
Entity if found, None otherwise
"""
query = (
self.select().where(Entity.external_id == external_id).options(*self.get_load_options())
)
return await self.find_one(query)
query = self.select().where(Entity.external_id == external_id)
return await self._find_one_by_query(query, load_relations=load_relations)

async def get_by_permalink(self, permalink: str) -> Optional[Entity]:
async def get_by_permalink(
self, permalink: str, *, load_relations: bool = True
) -> Optional[Entity]:
"""Get entity by permalink.

Args:
permalink: Unique identifier for the entity
"""
query = self.select().where(Entity.permalink == permalink).options(*self.get_load_options())
return await self.find_one(query)
query = self.select().where(Entity.permalink == permalink)
return await self._find_one_by_query(query, load_relations=load_relations)

async def get_by_title(self, title: str) -> Sequence[Entity]:
async def get_by_title(self, title: str, *, load_relations: bool = True) -> Sequence[Entity]:
"""Get entities by title, ordered by shortest path first.

When multiple entities share the same title (in different folders),
Expand All @@ -82,23 +92,20 @@ async def get_by_title(self, title: str) -> Sequence[Entity]:
self.select()
.where(Entity.title == title)
.order_by(func.length(Entity.file_path), Entity.file_path)
.options(*self.get_load_options())
)
result = await self.execute_query(query)
result = await self.execute_query(query, use_query_options=load_relations)
return list(result.scalars().all())

async def get_by_file_path(self, file_path: Union[Path, str]) -> Optional[Entity]:
async def get_by_file_path(
self, file_path: Union[Path, str], *, load_relations: bool = True
) -> Optional[Entity]:
"""Get entity by file_path.

Args:
file_path: Path to the entity file (will be converted to string internally)
"""
query = (
self.select()
.where(Entity.file_path == Path(file_path).as_posix())
.options(*self.get_load_options())
)
return await self.find_one(query)
query = self.select().where(Entity.file_path == Path(file_path).as_posix())
return await self._find_one_by_query(query, load_relations=load_relations)

# -------------------------------------------------------------------------
# Lightweight methods for permalink resolution (no eager loading)
Expand Down
143 changes: 108 additions & 35 deletions src/basic_memory/services/entity_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,17 @@ async def create_or_update_entity(self, schema: EntitySchema) -> Tuple[EntityMod

# Try to find existing entity using strict resolution (no fuzzy search)
# This prevents incorrectly matching similar file paths like "Node A.md" and "Node C.md"
existing = await self.link_resolver.resolve_link(schema.file_path, strict=True)
existing = await self.link_resolver.resolve_link(
schema.file_path,
strict=True,
load_relations=False,
)
if not existing and schema.permalink:
existing = await self.link_resolver.resolve_link(schema.permalink, strict=True)
existing = await self.link_resolver.resolve_link(
schema.permalink,
strict=True,
load_relations=False,
)

if existing:
logger.debug(f"Found existing entity: {existing.file_path}")
Expand Down Expand Up @@ -840,10 +848,22 @@ async def update_entity_and_observations(
"""
logger.debug(f"Updating entity and observations: {file_path}")

db_entity = await self.repository.get_by_file_path(file_path.as_posix())
with telemetry.scope(
"upsert.update.fetch_entity",
domain="entity_service",
action="upsert",
phase="fetch_entity",
):
db_entity = await self.repository.get_by_file_path(file_path.as_posix())

# Clear observations for entity
await self.observation_repository.delete_by_fields(entity_id=db_entity.id)
with telemetry.scope(
"upsert.update.delete_observations",
domain="entity_service",
action="upsert",
phase="delete_observations",
):
await self.observation_repository.delete_by_fields(entity_id=db_entity.id)

# add new observations
observations = [
Expand All @@ -857,7 +877,14 @@ async def update_entity_and_observations(
)
for obs in markdown.observations
]
await self.observation_repository.add_all(observations)
with telemetry.scope(
"upsert.update.insert_observations",
domain="entity_service",
action="upsert",
phase="insert_observations",
count=len(observations),
):
await self.observation_repository.add_all(observations)

# update values from markdown
db_entity = entity_model_from_markdown(file_path, markdown, db_entity)
Expand All @@ -871,10 +898,16 @@ async def update_entity_and_observations(
db_entity.last_updated_by = user_id

# update entity
return await self.repository.update(
db_entity.id,
db_entity,
)
with telemetry.scope(
"upsert.update.save_entity",
domain="entity_service",
action="upsert",
phase="save_entity",
):
return await self.repository.update(
db_entity.id,
db_entity,
)

async def upsert_entity_from_markdown(
self,
Expand All @@ -888,20 +921,30 @@ async def upsert_entity_from_markdown(
created = await self.create_entity_from_markdown(file_path, markdown)
else:
created = await self.update_entity_and_observations(file_path, markdown)
return await self.update_entity_relations(created.file_path, markdown)
# Pass entity directly — avoids redundant get_by_file_path inside update_entity_relations
return await self.update_entity_relations(created, markdown)

async def update_entity_relations(
self,
path: str,
entity: EntityModel,
markdown: EntityMarkdown,
) -> EntityModel:
"""Update relations for entity"""
logger.debug(f"Updating relations for entity: {path}")
"""Update relations for entity.

db_entity = await self.repository.get_by_file_path(path)
Accepts the entity object directly to avoid a redundant DB fetch.
Only entity.id and entity.permalink are used from the passed-in object.
"""
entity_id = entity.id
logger.debug(f"Updating relations for entity: {entity.file_path}")

# Clear existing relations first
await self.relation_repository.delete_outgoing_relations_from_entity(db_entity.id)
with telemetry.scope(
"upsert.relations.delete_existing",
domain="entity_service",
action="upsert",
phase="delete_relations",
):
await self.relation_repository.delete_outgoing_relations_from_entity(entity_id)

# Batch resolve all relation targets in parallel
if markdown.relations:
Expand All @@ -911,12 +954,23 @@ async def update_entity_relations(
# Use strict=True to disable fuzzy search - only exact matches should create resolved relations
# This ensures forward references (links to non-existent entities) remain unresolved (to_id=NULL)
lookup_tasks = [
self.link_resolver.resolve_link(rel.target, strict=True)
self.link_resolver.resolve_link(
rel.target,
strict=True,
load_relations=False,
)
for rel in markdown.relations
]

# Execute all lookups in parallel
resolved_entities = await asyncio.gather(*lookup_tasks, return_exceptions=True)
with telemetry.scope(
"upsert.relations.resolve_links",
domain="entity_service",
action="upsert",
phase="resolve_links",
count=len(lookup_tasks),
):
resolved_entities = await asyncio.gather(*lookup_tasks, return_exceptions=True)

# Process results and create relation records
relations_to_add = []
Expand All @@ -935,7 +989,7 @@ async def update_entity_relations(
# Create the relation
relation = Relation(
project_id=self.relation_repository.project_id,
from_id=db_entity.id,
from_id=entity_id,
to_id=target_id,
to_name=target_name,
relation_type=rel.type,
Expand All @@ -945,22 +999,37 @@ async def update_entity_relations(

# Batch insert all relations
if relations_to_add:
try:
await self.relation_repository.add_all(relations_to_add)
except IntegrityError:
# Some relations might be duplicates - fall back to individual inserts
logger.debug("Batch relation insert failed, trying individual inserts")
for relation in relations_to_add:
try:
await self.relation_repository.add(relation)
except IntegrityError:
# Unique constraint violation - relation already exists
logger.debug(
f"Skipping duplicate relation {relation.relation_type} from {db_entity.permalink}"
)
continue

return await self.repository.get_by_file_path(path)
with telemetry.scope(
"upsert.relations.insert_relations",
domain="entity_service",
action="upsert",
phase="insert_relations",
count=len(relations_to_add),
):
try:
await self.relation_repository.add_all(relations_to_add)
except IntegrityError:
# Some relations might be duplicates - fall back to individual inserts
logger.debug("Batch relation insert failed, trying individual inserts")
for relation in relations_to_add:
try:
await self.relation_repository.add(relation)
except IntegrityError:
# Unique constraint violation - relation already exists
logger.debug(
f"Skipping duplicate relation {relation.relation_type} from {entity.permalink}"
)
continue

# Reload entity with relations via PK lookup (faster than get_by_file_path string match)
with telemetry.scope(
"upsert.relations.reload_entity",
domain="entity_service",
action="upsert",
phase="reload_entity",
):
reloaded = await self.repository.find_by_ids([entity_id])
return reloaded[0]

async def edit_entity(
self,
Expand Down Expand Up @@ -996,7 +1065,11 @@ async def edit_entity(
action="edit",
phase="resolve_entity",
):
entity = await self.link_resolver.resolve_link(identifier, strict=True)
entity = await self.link_resolver.resolve_link(
identifier,
strict=True,
load_relations=False,
)
if not entity:
raise EntityNotFoundError(f"Entity not found: {identifier}")

Expand Down
Loading
Loading