Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ concurrency:

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

Expand Down Expand Up @@ -144,7 +143,6 @@ jobs:
test-postgres-unit:
name: Test Postgres Unit (Python ${{ matrix.python-version }})
timeout-minutes: 30
if: github.event_name != 'pull_request' || matrix.python-version == '3.12'
strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -202,7 +200,6 @@ jobs:
test-postgres-integration:
name: Test Postgres Integration (Python ${{ matrix.python-version }})
timeout-minutes: 45
if: github.event_name != 'pull_request' || matrix.python-version == '3.12'
strategy:
fail-fast: false
matrix:
Expand Down
61 changes: 51 additions & 10 deletions src/basic_memory/api/v2/routers/knowledge_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,13 @@ async def create_entity(
):
if fast:
entity = await entity_service.fast_write_entity(data)
written_content = None
search_content = None
else:
entity = await entity_service.create_entity(data)
write_result = await entity_service.create_entity_with_content(data)
entity = write_result.entity
written_content = write_result.content
search_content = write_result.search_content

if fast:
with telemetry.scope(
Expand All @@ -329,7 +334,7 @@ async def create_entity(
action="create_entity",
phase="search_index",
):
await search_service.index_entity(entity)
await search_service.index_entity(entity, content=search_content)
with telemetry.scope(
"api.knowledge.create_entity.vector_sync",
domain="knowledge",
Expand All @@ -352,8 +357,15 @@ async def create_entity(
domain="knowledge",
action="create_entity",
phase="read_content",
source="file" if fast else "memory",
):
content = await file_service.read_file_content(entity.file_path)
if fast:
content = await file_service.read_file_content(entity.file_path)
else:
# Non-fast writes already captured the markdown in memory. Reuse it here
# instead of re-reading the file; format_on_save is the one config that can
# still make the persisted file diverge because write_file only returns a checksum.
content = written_content
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] Return persisted content after formatter rewrites

For non-fast writes, the response content now comes from the in-memory `written_content` instead of from reading the saved file. However, `FileService.write_file` can rewrite the file when `format_on_save` is enabled, so this branch can return pre-format text that no longer matches what was persisted. API clients would then observe stale content immediately after a successful write. Use the persisted file content (or the formatter output) for the response body in this branch.

Useful? React with 👍 / 👎.

result = result.model_copy(update={"content": content})

logger.info(
Expand Down Expand Up @@ -421,18 +433,28 @@ async def update_entity_by_id(
):
if fast:
entity = await entity_service.fast_write_entity(data, external_id=entity_id)
written_content = None
search_content = None
response.status_code = 200 if existing else 201
else:
if existing:
entity = await entity_service.update_entity(existing, data)
write_result = await entity_service.update_entity_with_content(existing, data)
entity = write_result.entity
written_content = write_result.content
search_content = write_result.search_content
response.status_code = 200
else:
entity = await entity_service.create_entity(data)
write_result = await entity_service.create_entity_with_content(data)
entity = write_result.entity
written_content = write_result.content
search_content = write_result.search_content
if entity.external_id != entity_id:
entity = await entity_repository.update(
entity.id,
{"external_id": entity_id},
)
# external_id fixup only changes the DB row. The file content is unchanged,
# so the markdown captured during the write remains valid downstream.
if not entity:
raise HTTPException(
status_code=404,
Expand Down Expand Up @@ -461,7 +483,7 @@ async def update_entity_by_id(
action="update_entity",
phase="search_index",
):
await search_service.index_entity(entity)
await search_service.index_entity(entity, content=search_content)
with telemetry.scope(
"api.knowledge.update_entity.vector_sync",
domain="knowledge",
Expand All @@ -484,8 +506,15 @@ async def update_entity_by_id(
domain="knowledge",
action="update_entity",
phase="read_content",
source="file" if fast else "memory",
):
content = await file_service.read_file_content(entity.file_path)
if fast:
content = await file_service.read_file_content(entity.file_path)
else:
# Non-fast writes already captured the markdown in memory. Reuse it here
# instead of re-reading the file; format_on_save is the one config that can
# still make the persisted file diverge because write_file only returns a checksum.
content = written_content
result = result.model_copy(update={"content": content})

logger.info(
Expand Down Expand Up @@ -563,16 +592,21 @@ async def edit_entity_by_id(
find_text=data.find_text,
expected_replacements=data.expected_replacements,
)
written_content = None
search_content = None
else:
identifier = entity.permalink or entity.file_path
updated_entity = await entity_service.edit_entity(
write_result = await entity_service.edit_entity_with_content(
identifier=identifier,
operation=data.operation,
content=data.content,
section=data.section,
find_text=data.find_text,
expected_replacements=data.expected_replacements,
)
updated_entity = write_result.entity
written_content = write_result.content
search_content = write_result.search_content

if fast:
with telemetry.scope(
Expand All @@ -594,7 +628,7 @@ async def edit_entity_by_id(
action="edit_entity",
phase="search_index",
):
await search_service.index_entity(updated_entity)
await search_service.index_entity(updated_entity, content=search_content)
with telemetry.scope(
"api.knowledge.edit_entity.vector_sync",
domain="knowledge",
Expand All @@ -617,8 +651,15 @@ async def edit_entity_by_id(
domain="knowledge",
action="edit_entity",
phase="read_content",
source="file" if fast else "memory",
):
content = await file_service.read_file_content(updated_entity.file_path)
if fast:
content = await file_service.read_file_content(updated_entity.file_path)
else:
# Non-fast writes already captured the markdown in memory. Reuse it here
# instead of re-reading the file; format_on_save is the one config that can
# still make the persisted file diverge because write_file only returns a checksum.
content = written_content
result = result.model_copy(update={"content": content})

logger.info(
Expand Down
90 changes: 70 additions & 20 deletions src/basic_memory/services/entity_service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Service for managing entities in the database."""

from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -50,6 +51,15 @@
from basic_memory.utils import build_canonical_permalink


@dataclass(frozen=True)
class EntityWriteResult:
    """Persisted entity plus the response/search content produced during this call.

    Returned by the ``*_with_content`` write methods so that callers can reuse the
    markdown produced by the write for the API response and for search indexing
    instead of re-reading the file afterwards.
    """

    # Entity row as returned by the repository after the checksum update.
    entity: EntityModel
    # Full markdown (frontmatter included) that was handed to the file write.
    # NOTE(review): this is the in-memory text; with format_on_save enabled the
    # file on disk may have been rewritten and differ from this value - confirm.
    content: str
    # ``content`` with the frontmatter removed (via remove_frontmatter); this is
    # the text passed to the search indexer.
    search_content: str


class EntityService(BaseService[EntityModel]):
"""Service for managing entities in the database."""

Expand Down Expand Up @@ -79,7 +89,7 @@ def __init__(

async def detect_file_path_conflicts(
self, file_path: str, skip_check: bool = False
) -> List[Entity]:
) -> List[str]:
"""Detect potential file path conflicts for a given file path.

This checks for entities with similar file paths that might cause conflicts:
Expand All @@ -93,28 +103,19 @@ async def detect_file_path_conflicts(
skip_check: If True, skip the check and return empty list (optimization for bulk operations)

Returns:
List of entities that might conflict with the given file path
List of file paths that might conflict with the given file path
"""
if skip_check:
return []

from basic_memory.utils import detect_potential_file_conflicts

conflicts = []

# Get all existing file paths
all_entities = await self.repository.find_all()
existing_paths = [entity.file_path for entity in all_entities]
# Load only file paths. Conflict detection is on the hot write path and
# does not need observations or relations.
existing_paths = await self.repository.get_all_file_paths()

# Use the enhanced conflict detection utility
conflicting_paths = detect_potential_file_conflicts(file_path, existing_paths)

# Find the entities corresponding to conflicting paths
for entity in all_entities:
if entity.file_path in conflicting_paths:
conflicts.append(entity)

return conflicts
return detect_potential_file_conflicts(file_path, existing_paths)

async def resolve_permalink(
self,
Expand Down Expand Up @@ -143,8 +144,7 @@ async def resolve_permalink(
)
if conflicts:
logger.warning(
f"Detected potential file path conflicts for '{file_path_str}': "
f"{[entity.file_path for entity in conflicts]}"
f"Detected potential file path conflicts for '{file_path_str}': {conflicts}"
)

# If markdown has explicit permalink, try to validate it
Expand Down Expand Up @@ -255,6 +255,10 @@ async def create_or_update_entity(self, schema: EntitySchema) -> Tuple[EntityMod

async def create_entity(self, schema: EntitySchema) -> EntityModel:
    """Create a new entity, write it to the filesystem, and return the entity row.

    Thin wrapper around ``create_entity_with_content`` for callers that only
    need the entity and not the markdown produced by the write.

    Args:
        schema: Entity data to create.

    Returns:
        The ``entity`` field of the write result.
    """
    write_result = await self.create_entity_with_content(schema)
    return write_result.entity

async def create_entity_with_content(self, schema: EntitySchema) -> EntityWriteResult:
"""Create a new entity and return both the entity row and written markdown."""
logger.debug(f"Creating entity: {schema.title}")

# Get file path and ensure it's a Path object
Expand Down Expand Up @@ -328,10 +332,23 @@ async def create_entity(self, schema: EntitySchema) -> EntityModel:
action="create",
phase="update_checksum",
):
return await self.repository.update(entity.id, {"checksum": checksum})
updated = await self.repository.update(entity.id, {"checksum": checksum})
if not updated: # pragma: no cover
raise ValueError(f"Failed to update entity checksum after create: {entity.id}")
return EntityWriteResult(
entity=updated,
content=final_content,
search_content=remove_frontmatter(final_content),
)

async def update_entity(self, entity: EntityModel, schema: EntitySchema) -> EntityModel:
    """Update an entity's content and metadata and return the entity row.

    Thin wrapper around ``update_entity_with_content`` for callers that only
    need the entity and not the markdown produced by the write.

    Args:
        entity: The existing entity to update.
        schema: New entity data to apply.

    Returns:
        The ``entity`` field of the write result.
    """
    write_result = await self.update_entity_with_content(entity, schema)
    return write_result.entity

async def update_entity_with_content(
self, entity: EntityModel, schema: EntitySchema
) -> EntityWriteResult:
"""Update an entity and return both the entity row and written markdown."""
logger.debug(
f"Updating entity with permalink: {entity.permalink} content-type: {schema.content_type}"
)
Expand Down Expand Up @@ -444,8 +461,14 @@ async def update_entity(self, entity: EntityModel, schema: EntitySchema) -> Enti
phase="update_checksum",
):
entity = await self.repository.update(entity.id, {"checksum": checksum})
if not entity: # pragma: no cover
raise ValueError(f"Failed to update entity checksum after update: {file_path}")

return entity
return EntityWriteResult(
entity=entity,
content=final_content,
search_content=remove_frontmatter(final_content),
)

async def fast_write_entity(
self,
Expand Down Expand Up @@ -988,6 +1011,27 @@ async def edit_entity(
EntityNotFoundError: If the entity cannot be found
ValueError: If required parameters are missing for the operation or replacement count doesn't match expected
"""
return (
await self.edit_entity_with_content(
identifier=identifier,
operation=operation,
content=content,
section=section,
find_text=find_text,
expected_replacements=expected_replacements,
)
).entity

async def edit_entity_with_content(
self,
identifier: str,
operation: str,
content: str,
section: Optional[str] = None,
find_text: Optional[str] = None,
expected_replacements: int = 1,
) -> EntityWriteResult:
"""Edit an entity and return both the entity row and written markdown."""
logger.debug(f"Editing entity: {identifier}, operation: {operation}")

with telemetry.scope(
Expand Down Expand Up @@ -1055,8 +1099,14 @@ async def edit_entity(
phase="update_checksum",
):
entity = await self.repository.update(entity.id, {"checksum": checksum})
if not entity: # pragma: no cover
raise ValueError(f"Failed to update entity checksum after edit: {file_path}")

return entity
return EntityWriteResult(
entity=entity,
content=new_content,
search_content=remove_frontmatter(new_content),
)

def apply_edit_operation(
self,
Expand Down
5 changes: 1 addition & 4 deletions test-int/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,7 @@ async def _reset_postgres_integration_schema(engine) -> None:
await conn.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE"))

await conn.execute(
text(
f"TRUNCATE TABLE {', '.join(_postgres_reset_tables())} "
"RESTART IDENTITY CASCADE"
)
text(f"TRUNCATE TABLE {', '.join(_postgres_reset_tables())} RESTART IDENTITY CASCADE")
)


Expand Down
6 changes: 6 additions & 0 deletions tests/api/v2/test_knowledge_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ async def test_update_entity_by_id(
response = await client.put(
f"{v2_project_url}/knowledge/entities/{original_external_id}",
json=update_data,
params={"fast": False},
)

assert response.status_code == 200
Expand All @@ -363,6 +364,8 @@ async def test_update_entity_by_id(
# V2 update must return external_id field
assert updated_entity.external_id is not None
assert updated_entity.api_version == "v2"
assert updated_entity.content is not None
assert "Updated content via V2" in updated_entity.content

# Verify file was updated
file_path = file_service.get_entity_path(updated_entity)
Expand Down Expand Up @@ -532,6 +535,7 @@ async def test_edit_entity_by_id_append(
response = await client.patch(
f"{v2_project_url}/knowledge/entities/{original_external_id}",
json=edit_data,
params={"fast": False},
)

assert response.status_code == 200
Expand All @@ -540,6 +544,8 @@ async def test_edit_entity_by_id_append(
# V2 patch must return external_id field
assert edited_entity.external_id is not None
assert edited_entity.api_version == "v2"
assert edited_entity.content is not None
assert "Appended content" in edited_entity.content

# Verify file has both original and appended content
file_path = file_service.get_entity_path(edited_entity)
Expand Down
Loading
Loading